From c5a98943c049c5ffca46a7700d02c40b033f71fe Mon Sep 17 00:00:00 2001 From: dalbodeule <11470513+dalbodeule@users.noreply.github.com> Date: Thu, 24 Apr 2025 17:08:12 +0900 Subject: [PATCH] Refactor WebSocket ACK handling and improve message retries Introduced `waitForAck` to centralize ACK handling logic and updated retry mechanism in `sendWithRetry` to improve reliability and readability. Cleaned up error handling in WebSocket session management and ensured proper cleanup of resources. These changes enhance maintainability and robustness of the WebSocket song list routes. --- .../webserver/routes/WSSongListRoutes.kt | 69 +++++++++++-------- 1 file changed, 40 insertions(+), 29 deletions(-) diff --git a/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/routes/WSSongListRoutes.kt b/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/routes/WSSongListRoutes.kt index 2f566a5..479587c 100644 --- a/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/routes/WSSongListRoutes.kt +++ b/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/routes/WSSongListRoutes.kt @@ -49,6 +49,8 @@ fun Routing.wsSongListRoutes() { songListScope.cancel() } + val ackMap = ConcurrentHashMap>() + suspend fun addSession(uid: String, session: WebSocketServerSession) { val oldSession = sessionMutex.withLock { val old = sessions[uid] @@ -62,22 +64,33 @@ fun Routing.wsSongListRoutes() { CloseReason.Codes.VIOLATED_POLICY, "Another session is already active.")) } catch(e: Exception) { logger.warn("Error closing old session: ${e.message}") - e.printStackTrace() } } } } + suspend fun removeSession(uid: String) { sessionMutex.withLock { sessions.remove(uid) } } - val ackMap = ConcurrentHashMap>() + suspend fun waitForAck(ws: WebSocketServerSession, expectedUid: String): Boolean { + val ackDeferred = CompletableDeferred() + ackMap[expectedUid] = ackDeferred + return try { + withTimeoutOrNull(5000L) { ackDeferred.await() } ?: false + } catch (e: CancellationException) { + false + } finally { + ackMap.remove(expectedUid) + } + } suspend fun sendWithRetry(uid: String, res: SongResponse, maxRetries: Int = 5, delayMillis: Long = 3000L) { var attempt = 0 - while (attempt < maxRetries) { + var sentSuccessfully = false + while (attempt < maxRetries && !sentSuccessfully) { val ws: WebSocketServerSession? = sessionMutex.withLock { sessions[uid] } if (ws == null) { logger.debug("No active session for $uid. Retrying in $delayMillis ms.") @@ -87,13 +100,10 @@ fun Routing.wsSongListRoutes() { } try { ws.sendSerialized(res) - logger.debug("Message sent successfully to $uid on attempt $attempt") - val ackDeferred = CompletableDeferred() - ackMap[res.uid] = ackDeferred - val ackReceived = withTimeoutOrNull(delayMillis) { ackDeferred.await() } ?: false + val ackReceived = waitForAck(ws, res.uid) if (ackReceived) { logger.debug("ACK received for message to $uid on attempt $attempt.") - return + sentSuccessfully = true } else { logger.warn("ACK not received for message to $uid on attempt $attempt.") attempt++ @@ -104,15 +114,16 @@ fun Routing.wsSongListRoutes() { attempt++ logger.warn("Failed to send message to $uid on attempt $attempt: ${e.message}") if (e is WebSocketException || e is IOException) { - logger.warn("Connection issue detected, session may be invalid") removeSession(uid) } } - if (attempt < maxRetries) { + if (!sentSuccessfully && attempt < maxRetries) { delay(delayMillis) } } - logger.error("Failed to send message to $uid after $maxRetries attempts.") + if (!sentSuccessfully) { + logger.error("Failed to send message to $uid after $maxRetries attempts.") + } } webSocket("/songlist") { @@ -126,18 +137,23 @@ fun Routing.wsSongListRoutes() { addSession(uid, this) if (status[uid] == SongType.STREAM_OFF) { songListScope.launch { - sendSerialized(SongResponse( - SongType.STREAM_OFF.value, - uid, - null, - null, - null, - )) + try { + sendSerialized(SongResponse( + SongType.STREAM_OFF.value, + uid, + null, + null, + null, + )) + } catch (e: Exception) { + logger.warn("Error sending STREAM_OFF: ${e.message}") + } finally { + removeSession(uid) + } } - removeSession(uid) + return@webSocket } - - songListScope.launch { + try { for (frame in incoming) { when (frame) { is Frame.Text -> { @@ -148,7 +164,6 @@ fun Routing.wsSongListRoutes() { val data = Json.decodeFromString(text) if (data.type == SongType.ACK.value) { ackMap[data.uid]?.complete(true) - ackMap.remove(data.uid) } else { handleSongRequest(data, user, dispatcher, logger) } @@ -158,17 +173,13 @@ fun Routing.wsSongListRoutes() { else -> {} } } - } - - try { - // Keep the connection alive - suspendCancellableCoroutine {} } catch (e: ClosedReceiveChannelException) { logger.error("WebSocket connection closed: ${e.message}") - } catch(e: Exception) { + } catch (e: Exception) { logger.error("Error in WebSocket: ${e.message}") } finally { removeSession(uid) + ackMap.remove(uid) } } @@ -219,7 +230,7 @@ fun Routing.wsSongListRoutes() { } // 노래 처리를 위한 Mutex 추가 private val songMutex = Mutex() -suspend fun handleSongRequest( +fun handleSongRequest( data: SongRequest, user: User, dispatcher: CoroutinesEventBus,