From e3c0266253e7fa814b91b6934f2d4700ab406ba8 Mon Sep 17 00:00:00 2001 From: dalbodeule <11470513+dalbodeule@users.noreply.github.com> Date: Mon, 19 Aug 2024 17:24:42 +0900 Subject: [PATCH] fix wsSongListRoutes - duplicated session is not permited. - re-tx logic add --- .../webserver/routes/WSSongListRoutes.kt | 80 ++++++++++++------- 1 file changed, 53 insertions(+), 27 deletions(-) diff --git a/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/routes/WSSongListRoutes.kt b/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/routes/WSSongListRoutes.kt index 8247486..44745ab 100644 --- a/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/routes/WSSongListRoutes.kt +++ b/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/routes/WSSongListRoutes.kt @@ -7,6 +7,7 @@ import io.ktor.websocket.* import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.channels.ClosedReceiveChannelException +import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.serialization.Serializable import kotlinx.serialization.json.Json @@ -19,23 +20,52 @@ import space.mori.chzzk_bot.common.services.UserService import space.mori.chzzk_bot.common.utils.getYoutubeVideo import space.mori.chzzk_bot.webserver.UserSession import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.ConcurrentLinkedQueue fun Routing.wsSongListRoutes() { - val sessions = ConcurrentHashMap>() + val sessions = ConcurrentHashMap() val status = ConcurrentHashMap() val logger = LoggerFactory.getLogger("WSSongListRoutes") val dispatcher: CoroutinesEventBus by inject(CoroutinesEventBus::class.java) fun addSession(uid: String, session: WebSocketServerSession) { - sessions.computeIfAbsent(uid) { ConcurrentLinkedQueue() }.add(session) + if(sessions[uid] != null) { + CoroutineScope(Dispatchers.Default).launch { + sessions[uid]?.close( + CloseReason(CloseReason.Codes.VIOLATED_POLICY, + "Duplicated sessions." + )) + } + } + sessions[uid] = session } - fun removeSession(uid: String, session: WebSocketServerSession) { - sessions[uid]?.remove(session) - if(sessions[uid]?.isEmpty() == true) { - sessions.remove(uid) + fun removeSession(uid: String) { + sessions.remove(uid) + } + + suspend fun sendWithRetry(ws: WebSocketServerSession, res: SongResponse, maxRetries: Int, delayMillis: Long = 3000L) { + var attempt = 0 + var sentSuccessfully = false + + while (attempt < maxRetries && !sentSuccessfully) { + try { + // Attempt to send the message + ws.sendSerialized(res) + sentSuccessfully = true // If no exception, mark as sent successfully + logger.debug("Message sent successfully on attempt $attempt") + } catch (e: Exception) { + attempt++ + logger.warn("Failed to send message on attempt $attempt. Retrying in $delayMillis ms.") + logger.warn(e.stackTraceToString()) + + // Wait before retrying + delay(delayMillis) + } + } + + if (!sentSuccessfully) { + logger.error("Failed to send message after $maxRetries attempts.") } } @@ -63,7 +93,7 @@ fun Routing.wsSongListRoutes() { null, )) } - removeSession(uid, this) + removeSession(uid) } try { @@ -164,7 +194,7 @@ fun Routing.wsSongListRoutes() { } catch(e: ClosedReceiveChannelException) { logger.error("Error in WebSocket: ${e.message}") } finally { - removeSession(uid, this) + removeSession(uid) } } @@ -173,18 +203,16 @@ fun Routing.wsSongListRoutes() { CoroutineScope(Dispatchers.Default).launch { val user = UserService.getUser(it.uid) if(user != null) { - sessions[user.token ?: ""]?.forEach { ws -> - ws.sendSerialized( - SongResponse( - it.type.value, - it.uid, - it.reqUid, - it.name, - it.author, - it.time, - it.url - ) - ) + sessions[user.token ?: ""]?.let { ws -> + sendWithRetry(ws, SongResponse( + it.type.value, + it.uid, + it.reqUid, + it.name, + it.author, + it.time, + it.url + ), 3) } } } @@ -194,9 +222,8 @@ fun Routing.wsSongListRoutes() { CoroutineScope(Dispatchers.Default).launch { val user = UserService.getUser(it.uid) if(user != null) { - sessions[user.token]?.forEach { ws -> - ws.sendSerialized( - SongResponse( + sessions[user.token ?: ""]?.let { ws -> + sendWithRetry(ws, SongResponse( it.type.value, it.uid, null, @@ -204,9 +231,8 @@ fun Routing.wsSongListRoutes() { null, null, null - ) - ) - removeSession(user.token ?: "", ws) + ), 3) + removeSession(user.token ?: "") } } }