Merge pull request #129 from dalbodeule/develop

[feature] song list websocket service fixed.
This commit is contained in:
JinU Choi 2025-05-18 08:56:41 +09:00 committed by GitHub
commit 2c0c887ba1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -37,9 +37,10 @@ fun Routing.wsSongRoutes() {
fun addSession(uid: String, session: WebSocketServerSession) { fun addSession(uid: String, session: WebSocketServerSession) {
sessions.computeIfAbsent(uid) { ConcurrentLinkedQueue() }.add(session) sessions.computeIfAbsent(uid) { ConcurrentLinkedQueue() }.add(session)
} }
fun removeSession(uid: String, session: WebSocketServerSession) { fun removeSession(uid: String, session: WebSocketServerSession) {
sessions[uid]?.remove(session) sessions[uid]?.remove(session)
if(sessions[uid]?.isEmpty() == true) { if (sessions[uid]?.isEmpty() == true) {
sessions.remove(uid) sessions.remove(uid)
} }
} }
@ -88,78 +89,111 @@ fun Routing.wsSongRoutes() {
} }
webSocket("/song/{uid}") { webSocket("/song/{uid}") {
logger.info("WebSocket connection attempt received")
val uid = call.parameters["uid"] val uid = call.parameters["uid"]
val user = uid?.let { UserService.getUser(it) } val user = uid?.let { UserService.getUser(it) }
if (uid == null || user == null) { if (uid == null || user == null) {
logger.warn("Invalid UID: $uid")
close(CloseReason(CloseReason.Codes.CANNOT_ACCEPT, "Invalid UID")) close(CloseReason(CloseReason.Codes.CANNOT_ACCEPT, "Invalid UID"))
return@webSocket return@webSocket
} }
addSession(uid, this)
if(status[uid] == SongType.STREAM_OFF) {
songScope.launch {
sendSerialized(SongResponse(
SongType.STREAM_OFF.value,
uid,
null,
null,
null,
))
}
}
try { try {
for (frame in incoming) { addSession(uid, this)
when(frame) { logger.info("WebSocket connection established for user: $uid")
is Frame.Text -> {
val text = frame.readText().trim() // Start heartbeat
if(text == "ping") { val heartbeatJob = songScope.launch {
send("pong") while (true) {
} else { try {
val data = Json.decodeFromString<SongRequest>(text) send(Frame.Ping(ByteArray(0)))
if (data.type == SongType.ACK.value) { delay(30000) // 30 seconds
ackMap[data.uid]?.get(this)?.complete(true) } catch (e: Exception) {
ackMap[data.uid]?.remove(this) logger.error("Heartbeat failed for user $uid", e)
} break
}
} }
is Frame.Ping -> send(Frame.Pong(frame.data))
else -> {}
} }
} }
} catch(e: ClosedReceiveChannelException) {
logger.error("Error in WebSocket: ${e.message}") if (status[uid] == SongType.STREAM_OFF) {
} finally { songScope.launch {
removeSession(uid, this) sendSerialized(
ackMap[uid]?.remove(this) SongResponse(
SongType.STREAM_OFF.value,
uid,
null,
null,
null,
)
)
}
}
try {
for (frame in incoming) {
when (frame) {
is Frame.Text -> {
val text = frame.readText().trim()
if (text == "ping") {
send("pong")
} else {
val data = Json.decodeFromString<SongRequest>(text)
if (data.type == SongType.ACK.value) {
ackMap[data.uid]?.get(this)?.complete(true)
ackMap[data.uid]?.remove(this)
}
}
}
is Frame.Ping -> send(Frame.Pong(frame.data))
else -> {}
}
}
} catch (e: ClosedReceiveChannelException) {
logger.error("WebSocket connection closed for user $uid: ${e.message}")
} catch (e: Exception) {
logger.error("Unexpected error in WebSocket for user $uid", e)
} finally {
logger.info("Cleaning up WebSocket connection for user $uid")
removeSession(uid, this)
ackMap[uid]?.remove(this)
heartbeatJob.cancel()
}
} catch(e: Exception) {
logger.error("Unexpected error in WebSocket for user $uid", e)
} }
} }
dispatcher.subscribe(SongEvent::class) { dispatcher.subscribe(SongEvent::class) {
logger.debug("SongEvent: {} / {} {}", it.uid, it.type, it.current?.name) logger.debug("SongEvent: {} / {} {}", it.uid, it.type, it.current?.name)
songScope.launch { songScope.launch {
broadcastMessage(it.uid, SongResponse( broadcastMessage(
it.type.value, it.uid, SongResponse(
it.uid, it.type.value,
it.reqUid, it.uid,
it.current?.toSerializable(), it.reqUid,
it.next?.toSerializable(), it.current?.toSerializable(),
it.delUrl it.next?.toSerializable(),
)) it.delUrl
)
)
} }
} }
dispatcher.subscribe(TimerEvent::class) { dispatcher.subscribe(TimerEvent::class) {
if(it.type == TimerType.STREAM_OFF) { if (it.type == TimerType.STREAM_OFF) {
songScope.launch { songScope.launch {
broadcastMessage(it.uid, SongResponse( broadcastMessage(
it.type.value, it.uid, SongResponse(
it.uid, it.type.value,
null, it.uid,
null, null,
null, null,
)) null,
)
)
} }
} }
} }
} }
@Serializable @Serializable
data class SerializableYoutubeVideo( data class SerializableYoutubeVideo(
val url: String, val url: String,