From 94e226fcabf746da767d70c10d046e66dc0032c7 Mon Sep 17 00:00:00 2001 From: dalbodeule <11470513+dalbodeule@users.noreply.github.com> Date: Tue, 24 Sep 2024 10:35:11 +0900 Subject: [PATCH] debug on WSSongListRoutes.kt - add re-tx logics - add ACK enum value - else other improve --- .../chzzk_bot/common/events/SongEvents.kt | 3 +- .../space/mori/chzzk_bot/webserver/Main.kt | 3 +- .../webserver/routes/WSSongListRoutes.kt | 264 ++++++++++-------- 3 files changed, 154 insertions(+), 116 deletions(-) diff --git a/common/src/main/kotlin/space/mori/chzzk_bot/common/events/SongEvents.kt b/common/src/main/kotlin/space/mori/chzzk_bot/common/events/SongEvents.kt index cbe27f9..c3f319c 100644 --- a/common/src/main/kotlin/space/mori/chzzk_bot/common/events/SongEvents.kt +++ b/common/src/main/kotlin/space/mori/chzzk_bot/common/events/SongEvents.kt @@ -7,7 +7,8 @@ enum class SongType(var value: Int) { REMOVE(1), NEXT(2), - STREAM_OFF(50) + STREAM_OFF(50), + ACK(51) } class SongEvent( diff --git a/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/Main.kt b/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/Main.kt index 4757779..819f6cb 100644 --- a/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/Main.kt +++ b/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/Main.kt @@ -24,9 +24,8 @@ import kotlinx.serialization.Serializable import kotlinx.serialization.json.Json import space.mori.chzzk_bot.common.services.UserService import space.mori.chzzk_bot.webserver.routes.* -import space.mori.chzzk_bot.webserver.utils.DiscordGuildCache import space.mori.chzzk_bot.webserver.utils.DiscordRatelimits -import space.mori.chzzk_bot.webserver.utils.Guild +import wsSongListRoutes import java.time.Duration val dotenv = dotenv { diff --git a/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/routes/WSSongListRoutes.kt b/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/routes/WSSongListRoutes.kt index 187e148..7ffce84 100644 --- a/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/routes/WSSongListRoutes.kt +++ b/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/routes/WSSongListRoutes.kt @@ -1,9 +1,9 @@ -package space.mori.chzzk_bot.webserver.routes - import io.ktor.server.routing.* import io.ktor.server.sessions.* import io.ktor.server.websocket.* +import io.ktor.util.logging.Logger import io.ktor.websocket.* +import io.ktor.websocket.Frame.* import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.channels.ClosedReceiveChannelException @@ -15,12 +15,16 @@ import org.koin.java.KoinJavaComponent.inject import org.slf4j.LoggerFactory import space.mori.chzzk_bot.common.events.* import space.mori.chzzk_bot.common.models.SongList +import space.mori.chzzk_bot.common.models.SongLists.uid +import space.mori.chzzk_bot.common.models.User import space.mori.chzzk_bot.common.services.SongConfigService import space.mori.chzzk_bot.common.services.SongListService import space.mori.chzzk_bot.common.services.UserService import space.mori.chzzk_bot.common.utils.YoutubeVideo import space.mori.chzzk_bot.common.utils.getYoutubeVideo import space.mori.chzzk_bot.webserver.UserSession +import space.mori.chzzk_bot.webserver.routes.SongResponse +import space.mori.chzzk_bot.webserver.routes.toSerializable import space.mori.chzzk_bot.webserver.utils.CurrentSong import java.util.concurrent.ConcurrentHashMap @@ -32,12 +36,11 @@ fun Routing.wsSongListRoutes() { val dispatcher: CoroutinesEventBus by inject(CoroutinesEventBus::class.java) fun addSession(uid: String, session: WebSocketServerSession) { - if(sessions[uid] != null) { + if (sessions[uid] != null) { CoroutineScope(Dispatchers.Default).launch { sessions[uid]?.close( - CloseReason(CloseReason.Codes.VIOLATED_POLICY, - "Duplicated sessions." - )) + CloseReason(CloseReason.Codes.VIOLATED_POLICY, "Duplicated sessions.") + ) } } sessions[uid] = session @@ -47,6 +50,24 @@ fun Routing.wsSongListRoutes() { sessions.remove(uid) } + suspend fun waitForAck(ws: WebSocketServerSession, expectedType: Int): Boolean { + val timeout = 5000L // 5 seconds timeout + val startTime = System.currentTimeMillis() + while (System.currentTimeMillis() - startTime < timeout) { + for (frame in ws.incoming) { + if (frame is Text) { + val message = frame.readText() + val data = Json.decodeFromString(message) + if (data.type == SongType.ACK.value) { + return true // ACK received + } + } + } + delay(100) // Check every 100 ms + } + return false // Timeout + } + suspend fun sendWithRetry(uid: String, res: SongResponse, maxRetries: Int = 5, delayMillis: Long = 3000L) { var attempt = 0 var sentSuccessfully = false @@ -56,11 +77,17 @@ fun Routing.wsSongListRoutes() { try { // Attempt to send the message ws?.sendSerialized(res) - sentSuccessfully = true // If no exception, mark as sent successfully - logger.debug("Message sent successfully on attempt $attempt") + logger.debug("Message sent successfully to $uid on attempt $attempt") + // Wait for ACK + val ackReceived = waitForAck(ws!!, res.type) + if (ackReceived) { + sentSuccessfully = true + } else { + logger.warn("ACK not received for message to $uid on attempt $attempt.") + } } catch (e: Exception) { attempt++ - logger.warn("Failed to send message on attempt $attempt. Retrying in $delayMillis ms.") + logger.warn("Failed to send message to $uid on attempt $attempt. Retrying in $delayMillis ms.") logger.warn(e.stackTraceToString()) // Wait before retrying @@ -69,13 +96,13 @@ fun Routing.wsSongListRoutes() { } if (!sentSuccessfully) { - logger.error("Failed to send message after $maxRetries attempts.") + logger.error("Failed to send message to $uid after $maxRetries attempts.") } } webSocket("/songlist") { val session = call.sessions.get() - val user = session?.id?.let { UserService.getUserWithNaverId( it ) } + val user = session?.id?.let { UserService.getUserWithNaverId(it) } if (user == null) { close(CloseReason(CloseReason.Codes.CANNOT_ACCEPT, "Invalid SID")) return@webSocket @@ -85,7 +112,7 @@ fun Routing.wsSongListRoutes() { addSession(uid!!, this) - if(status[uid] == SongType.STREAM_OFF) { + if (status[uid] == SongType.STREAM_OFF) { CoroutineScope(Dispatchers.Default).launch { sendSerialized(SongResponse( SongType.STREAM_OFF.value, @@ -100,111 +127,25 @@ fun Routing.wsSongListRoutes() { try { for (frame in incoming) { - when(frame) { - is Frame.Text -> { + when (frame) { + is Text -> { if (frame.readText() == "ping") { send("pong") } else { val data = frame.readText().let { Json.decodeFromString(it) } - if (data.maxQueue != null && data.maxQueue > 0) SongConfigService.updateQueueLimit( - user, - data.maxQueue - ) - if (data.maxUserLimit != null && data.maxUserLimit > 0) SongConfigService.updatePersonalLimit( - user, - data.maxUserLimit - ) - if (data.isStreamerOnly != null) SongConfigService.updateStreamerOnly( - user, - data.isStreamerOnly - ) - if (data.isDisabled != null) SongConfigService.updateDisabled(user, data.isDisabled) + // Handle song requests + handleSongRequest(data, user, dispatcher, logger) - if (data.type == SongType.ADD.value && data.url != null) { - try { - val youtubeVideo = getYoutubeVideo(data.url) - if (youtubeVideo != null) { - CoroutineScope(Dispatchers.Default).launch { - SongListService.saveSong( - user, - user.token!!, - data.url, - youtubeVideo.name, - youtubeVideo.author, - youtubeVideo.length, - user.username - ) - dispatcher.post( - SongEvent( - user.token!!, - SongType.ADD, - user.token, - CurrentSong.getSong(user), - youtubeVideo - ) - ) - } - } - } catch (e: Exception) { - logger.debug("SongType.ADD Error: $uid $e") - } - } else if (data.type == SongType.REMOVE.value && data.url != null) { - val songs = SongListService.getSong(user) - - val exactSong = songs.firstOrNull { it.url == data.url } - if (exactSong != null) { - SongListService.deleteSong(user, exactSong.uid, exactSong.name) - } - - dispatcher.post( - SongEvent( - user.token!!, - SongType.REMOVE, - null, - null, - null, - data.url - ) - ) - } else if (data.type == SongType.NEXT.value) { - val songList = SongListService.getSong(user) - var song: SongList? = null - var youtubeVideo: YoutubeVideo? = null - - if (songList.isNotEmpty()) { - song = songList[0] - SongListService.deleteSong(user, song.uid, song.name) - } - - song?.let { - youtubeVideo = YoutubeVideo( - song.url, - song.name, - song.author, - song.time - ) - } - dispatcher.post( - SongEvent( - user.token!!, - SongType.NEXT, - song?.uid, - youtubeVideo - ) - ) - - CurrentSong.setSong(user, youtubeVideo) - } + // Send ACK after handling request + send("ACK: ${data.url}") } } - is Frame.Ping -> send(Frame.Pong(frame.data)) - else -> { - - } + is Ping -> send(Pong(frame.data)) + else -> "" } } - } catch(e: ClosedReceiveChannelException) { + } catch (e: ClosedReceiveChannelException) { logger.error("Error in WebSocket: ${e.message}") } finally { removeSession(uid) @@ -215,7 +156,7 @@ fun Routing.wsSongListRoutes() { logger.debug("SongEvent: {} / {} {}", it.uid, it.type, it.current?.name) CoroutineScope(Dispatchers.Default).launch { val user = UserService.getUser(it.uid) - if(user != null) { + if (user != null) { user.token?.let { token -> sendWithRetry( token, SongResponse( @@ -225,16 +166,18 @@ fun Routing.wsSongListRoutes() { it.current?.toSerializable(), it.next?.toSerializable(), it.delUrl - )) + ) + ) } } } } + dispatcher.subscribe(TimerEvent::class) { - if(it.type == TimerType.STREAM_OFF) { + if (it.type == TimerType.STREAM_OFF) { CoroutineScope(Dispatchers.Default).launch { val user = UserService.getUser(it.uid) - if(user != null) { + if (user != null) { user.token?.let { token -> sendWithRetry( token, SongResponse( @@ -243,7 +186,8 @@ fun Routing.wsSongListRoutes() { null, null, null, - )) + ) + ) } } } @@ -251,6 +195,100 @@ fun Routing.wsSongListRoutes() { } } +suspend fun handleSongRequest( + data: SongRequest, + user: User, + dispatcher: CoroutinesEventBus, + logger: Logger +) { + if (data.maxQueue != null && data.maxQueue > 0) SongConfigService.updateQueueLimit(user, data.maxQueue) + if (data.maxUserLimit != null && data.maxUserLimit > 0) SongConfigService.updatePersonalLimit(user, data.maxUserLimit) + if (data.isStreamerOnly != null) SongConfigService.updateStreamerOnly(user, data.isStreamerOnly) + if (data.isDisabled != null) SongConfigService.updateDisabled(user, data.isDisabled) + + when (data.type) { + SongType.ADD.value -> { + data.url?.let { url -> + try { + val youtubeVideo = getYoutubeVideo(url) + if (youtubeVideo != null) { + CoroutineScope(Dispatchers.Default).launch { + SongListService.saveSong( + user, + user.token!!, + url, + youtubeVideo.name, + youtubeVideo.author, + youtubeVideo.length, + user.username + ) + dispatcher.post( + SongEvent( + user.token!!, + SongType.ADD, + user.token, + CurrentSong.getSong(user), + youtubeVideo + ) + ) + } + } + } catch (e: Exception) { + logger.debug("SongType.ADD Error: $uid $e") + } + } + } + SongType.REMOVE.value -> { + data.url?.let { url -> + val songs = SongListService.getSong(user) + val exactSong = songs.firstOrNull { it.url == url } + if (exactSong != null) { + SongListService.deleteSong(user, exactSong.uid, exactSong.name) + } + dispatcher.post( + SongEvent( + user.token!!, + SongType.REMOVE, + null, + null, + null, + url + ) + ) + } + } + SongType.NEXT.value -> { + val songList = SongListService.getSong(user) + var song: SongList? = null + var youtubeVideo: YoutubeVideo? = null + + if (songList.isNotEmpty()) { + song = songList[0] + SongListService.deleteSong(user, song.uid, song.name) + } + + song?.let { + youtubeVideo = YoutubeVideo( + song.url, + song.name, + song.author, + song.time + ) + } + dispatcher.post( + SongEvent( + user.token!!, + SongType.NEXT, + song?.uid, + youtubeVideo + ) + ) + + CurrentSong.setSong(user, youtubeVideo) + } + } +} + @Serializable data class SongRequest( val type: Int, @@ -261,4 +299,4 @@ data class SongRequest( val isStreamerOnly: Boolean?, val remove: Int?, val isDisabled: Boolean?, -) \ No newline at end of file +)