Refactor WebSocket ACK handling and improve message retries

Introduced `waitForAck` to centralize ACK handling logic and updated retry mechanism in `sendWithRetry` to improve reliability and readability. Cleaned up error handling in WebSocket session management and ensured proper cleanup of resources. These changes enhance maintainability and robustness of the WebSocket song list routes.
This commit is contained in:
dalbodeule 2025-04-24 17:08:12 +09:00
parent 8230762053
commit c5a98943c0
No known key found for this signature in database
GPG Key ID: EFA860D069C9FA65

View File

@ -49,6 +49,8 @@ fun Routing.wsSongListRoutes() {
songListScope.cancel() songListScope.cancel()
} }
val ackMap = ConcurrentHashMap<String, CompletableDeferred<Boolean>>()
suspend fun addSession(uid: String, session: WebSocketServerSession) { suspend fun addSession(uid: String, session: WebSocketServerSession) {
val oldSession = sessionMutex.withLock { val oldSession = sessionMutex.withLock {
val old = sessions[uid] val old = sessions[uid]
@ -62,22 +64,33 @@ fun Routing.wsSongListRoutes() {
CloseReason.Codes.VIOLATED_POLICY, "Another session is already active.")) CloseReason.Codes.VIOLATED_POLICY, "Another session is already active."))
} catch(e: Exception) { } catch(e: Exception) {
logger.warn("Error closing old session: ${e.message}") logger.warn("Error closing old session: ${e.message}")
e.printStackTrace()
} }
} }
} }
} }
suspend fun removeSession(uid: String) { suspend fun removeSession(uid: String) {
sessionMutex.withLock { sessionMutex.withLock {
sessions.remove(uid) sessions.remove(uid)
} }
} }
val ackMap = ConcurrentHashMap<String, CompletableDeferred<Boolean>>() suspend fun waitForAck(ws: WebSocketServerSession, expectedUid: String): Boolean {
val ackDeferred = CompletableDeferred<Boolean>()
ackMap[expectedUid] = ackDeferred
return try {
withTimeoutOrNull(5000L) { ackDeferred.await() } ?: false
} catch (e: CancellationException) {
false
} finally {
ackMap.remove(expectedUid)
}
}
suspend fun sendWithRetry(uid: String, res: SongResponse, maxRetries: Int = 5, delayMillis: Long = 3000L) { suspend fun sendWithRetry(uid: String, res: SongResponse, maxRetries: Int = 5, delayMillis: Long = 3000L) {
var attempt = 0 var attempt = 0
while (attempt < maxRetries) { var sentSuccessfully = false
while (attempt < maxRetries && !sentSuccessfully) {
val ws: WebSocketServerSession? = sessionMutex.withLock { sessions[uid] } val ws: WebSocketServerSession? = sessionMutex.withLock { sessions[uid] }
if (ws == null) { if (ws == null) {
logger.debug("No active session for $uid. Retrying in $delayMillis ms.") logger.debug("No active session for $uid. Retrying in $delayMillis ms.")
@ -87,13 +100,10 @@ fun Routing.wsSongListRoutes() {
} }
try { try {
ws.sendSerialized(res) ws.sendSerialized(res)
logger.debug("Message sent successfully to $uid on attempt $attempt") val ackReceived = waitForAck(ws, res.uid)
val ackDeferred = CompletableDeferred<Boolean>()
ackMap[res.uid] = ackDeferred
val ackReceived = withTimeoutOrNull(delayMillis) { ackDeferred.await() } ?: false
if (ackReceived) { if (ackReceived) {
logger.debug("ACK received for message to $uid on attempt $attempt.") logger.debug("ACK received for message to $uid on attempt $attempt.")
return sentSuccessfully = true
} else { } else {
logger.warn("ACK not received for message to $uid on attempt $attempt.") logger.warn("ACK not received for message to $uid on attempt $attempt.")
attempt++ attempt++
@ -104,16 +114,17 @@ fun Routing.wsSongListRoutes() {
attempt++ attempt++
logger.warn("Failed to send message to $uid on attempt $attempt: ${e.message}") logger.warn("Failed to send message to $uid on attempt $attempt: ${e.message}")
if (e is WebSocketException || e is IOException) { if (e is WebSocketException || e is IOException) {
logger.warn("Connection issue detected, session may be invalid")
removeSession(uid) removeSession(uid)
} }
} }
if (attempt < maxRetries) { if (!sentSuccessfully && attempt < maxRetries) {
delay(delayMillis) delay(delayMillis)
} }
} }
if (!sentSuccessfully) {
logger.error("Failed to send message to $uid after $maxRetries attempts.") logger.error("Failed to send message to $uid after $maxRetries attempts.")
} }
}
webSocket("/songlist") { webSocket("/songlist") {
val session = call.sessions.get<UserSession>() val session = call.sessions.get<UserSession>()
@ -126,6 +137,7 @@ fun Routing.wsSongListRoutes() {
addSession(uid, this) addSession(uid, this)
if (status[uid] == SongType.STREAM_OFF) { if (status[uid] == SongType.STREAM_OFF) {
songListScope.launch { songListScope.launch {
try {
sendSerialized(SongResponse( sendSerialized(SongResponse(
SongType.STREAM_OFF.value, SongType.STREAM_OFF.value,
uid, uid,
@ -133,11 +145,15 @@ fun Routing.wsSongListRoutes() {
null, null,
null, null,
)) ))
} } catch (e: Exception) {
logger.warn("Error sending STREAM_OFF: ${e.message}")
} finally {
removeSession(uid) removeSession(uid)
} }
}
songListScope.launch { return@webSocket
}
try {
for (frame in incoming) { for (frame in incoming) {
when (frame) { when (frame) {
is Frame.Text -> { is Frame.Text -> {
@ -148,7 +164,6 @@ fun Routing.wsSongListRoutes() {
val data = Json.decodeFromString<SongRequest>(text) val data = Json.decodeFromString<SongRequest>(text)
if (data.type == SongType.ACK.value) { if (data.type == SongType.ACK.value) {
ackMap[data.uid]?.complete(true) ackMap[data.uid]?.complete(true)
ackMap.remove(data.uid)
} else { } else {
handleSongRequest(data, user, dispatcher, logger) handleSongRequest(data, user, dispatcher, logger)
} }
@ -158,17 +173,13 @@ fun Routing.wsSongListRoutes() {
else -> {} else -> {}
} }
} }
}
try {
// Keep the connection alive
suspendCancellableCoroutine<Unit> {}
} catch (e: ClosedReceiveChannelException) { } catch (e: ClosedReceiveChannelException) {
logger.error("WebSocket connection closed: ${e.message}") logger.error("WebSocket connection closed: ${e.message}")
} catch (e: Exception) { } catch (e: Exception) {
logger.error("Error in WebSocket: ${e.message}") logger.error("Error in WebSocket: ${e.message}")
} finally { } finally {
removeSession(uid) removeSession(uid)
ackMap.remove(uid)
} }
} }
@ -219,7 +230,7 @@ fun Routing.wsSongListRoutes() {
} }
// 노래 처리를 위한 Mutex 추가 // 노래 처리를 위한 Mutex 추가
private val songMutex = Mutex() private val songMutex = Mutex()
suspend fun handleSongRequest( fun handleSongRequest(
data: SongRequest, data: SongRequest,
user: User, user: User,
dispatcher: CoroutinesEventBus, dispatcher: CoroutinesEventBus,