From 5223cbe2b234652a4341109051fa0bc26bd8d3bd Mon Sep 17 00:00:00 2001 From: dalbodeule <11470513+dalbodeule@users.noreply.github.com> Date: Sun, 18 May 2025 08:55:01 +0900 Subject: [PATCH] [feature] song list websocket service fixed. --- .../webserver/routes/WSSongRoutes.kt | 132 +++++++++++------- 1 file changed, 83 insertions(+), 49 deletions(-) diff --git a/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/routes/WSSongRoutes.kt b/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/routes/WSSongRoutes.kt index 47632b7..b4f98fa 100644 --- a/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/routes/WSSongRoutes.kt +++ b/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/routes/WSSongRoutes.kt @@ -37,9 +37,10 @@ fun Routing.wsSongRoutes() { fun addSession(uid: String, session: WebSocketServerSession) { sessions.computeIfAbsent(uid) { ConcurrentLinkedQueue() }.add(session) } + fun removeSession(uid: String, session: WebSocketServerSession) { sessions[uid]?.remove(session) - if(sessions[uid]?.isEmpty() == true) { + if (sessions[uid]?.isEmpty() == true) { sessions.remove(uid) } } @@ -88,78 +89,111 @@ fun Routing.wsSongRoutes() { } webSocket("/song/{uid}") { + logger.info("WebSocket connection attempt received") val uid = call.parameters["uid"] val user = uid?.let { UserService.getUser(it) } if (uid == null || user == null) { + logger.warn("Invalid UID: $uid") close(CloseReason(CloseReason.Codes.CANNOT_ACCEPT, "Invalid UID")) return@webSocket } - addSession(uid, this) - if(status[uid] == SongType.STREAM_OFF) { - songScope.launch { - sendSerialized(SongResponse( - SongType.STREAM_OFF.value, - uid, - null, - null, - null, - )) - } - } try { - for (frame in incoming) { - when(frame) { - is Frame.Text -> { - val text = frame.readText().trim() - if(text == "ping") { - send("pong") - } else { - val data = Json.decodeFromString(text) - if (data.type == SongType.ACK.value) { - ackMap[data.uid]?.get(this)?.complete(true) - ackMap[data.uid]?.remove(this) - } - } + addSession(uid, this) + logger.info("WebSocket connection established for user: $uid") + + // Start heartbeat + val heartbeatJob = songScope.launch { + while (true) { + try { + send(Frame.Ping(ByteArray(0))) + delay(30000) // 30 seconds + } catch (e: Exception) { + logger.error("Heartbeat failed for user $uid", e) + break } - is Frame.Ping -> send(Frame.Pong(frame.data)) - else -> {} } } - } catch(e: ClosedReceiveChannelException) { - logger.error("Error in WebSocket: ${e.message}") - } finally { - removeSession(uid, this) - ackMap[uid]?.remove(this) + + if (status[uid] == SongType.STREAM_OFF) { + songScope.launch { + sendSerialized( + SongResponse( + SongType.STREAM_OFF.value, + uid, + null, + null, + null, + ) + ) + } + } + try { + for (frame in incoming) { + when (frame) { + is Frame.Text -> { + val text = frame.readText().trim() + if (text == "ping") { + send("pong") + } else { + val data = Json.decodeFromString(text) + if (data.type == SongType.ACK.value) { + ackMap[data.uid]?.get(this)?.complete(true) + ackMap[data.uid]?.remove(this) + } + } + } + + is Frame.Ping -> send(Frame.Pong(frame.data)) + else -> {} + } + } + } catch (e: ClosedReceiveChannelException) { + logger.error("WebSocket connection closed for user $uid: ${e.message}") + } catch (e: Exception) { + logger.error("Unexpected error in WebSocket for user $uid", e) + } finally { + logger.info("Cleaning up WebSocket connection for user $uid") + removeSession(uid, this) + ackMap[uid]?.remove(this) + heartbeatJob.cancel() + } + } catch(e: Exception) { + logger.error("Unexpected error in WebSocket for user $uid", e) } } dispatcher.subscribe(SongEvent::class) { logger.debug("SongEvent: {} / {} {}", it.uid, it.type, it.current?.name) songScope.launch { - broadcastMessage(it.uid, SongResponse( - it.type.value, - it.uid, - it.reqUid, - it.current?.toSerializable(), - it.next?.toSerializable(), - it.delUrl - )) + broadcastMessage( + it.uid, SongResponse( + it.type.value, + it.uid, + it.reqUid, + it.current?.toSerializable(), + it.next?.toSerializable(), + it.delUrl + ) + ) } } dispatcher.subscribe(TimerEvent::class) { - if(it.type == TimerType.STREAM_OFF) { + if (it.type == TimerType.STREAM_OFF) { songScope.launch { - broadcastMessage(it.uid, SongResponse( - it.type.value, - it.uid, - null, - null, - null, - )) + broadcastMessage( + it.uid, SongResponse( + it.type.value, + it.uid, + null, + null, + null, + ) + ) } } } } + @Serializable data class SerializableYoutubeVideo( val url: String,