From e4a2d28b3c1c371d6f231d3f2824c76f50f8241a Mon Sep 17 00:00:00 2001 From: dalbodeule <11470513+dalbodeule@users.noreply.github.com> Date: Fri, 2 Aug 2024 15:42:07 +0900 Subject: [PATCH] in WSTimerRoutes.kt, send data two or many session on one uid. --- .../webserver/routes/WSTimerRoutes.kt | 27 ++++++++++++++----- 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/routes/WSTimerRoutes.kt b/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/routes/WSTimerRoutes.kt index 1d7c3cb..215df6a 100644 --- a/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/routes/WSTimerRoutes.kt +++ b/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/routes/WSTimerRoutes.kt @@ -14,13 +14,25 @@ import org.slf4j.LoggerFactory import space.mori.chzzk_bot.common.events.* import space.mori.chzzk_bot.common.services.UserService import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ConcurrentLinkedQueue val logger: Logger = LoggerFactory.getLogger("WSTimerRoutes") fun Routing.wsTimerRoutes() { - val sessions = ConcurrentHashMap() + val sessions = ConcurrentHashMap>() val status = ConcurrentHashMap() + fun addSession(uid: String, session: WebSocketServerSession) { + sessions.computeIfAbsent(uid) { ConcurrentLinkedQueue() }.add(session) + } + + fun removeSession(uid: String, session: WebSocketServerSession) { + sessions[uid]?.remove(session) + if(sessions[uid]?.isEmpty() == true) { + sessions.remove(uid) + } + } + webSocket("/timer/{uid}") { val uid = call.parameters["uid"] val user = uid?.let { UserService.getUser(it) } @@ -33,7 +45,8 @@ fun Routing.wsTimerRoutes() { return@webSocket } - sessions[uid] = this + addSession(uid, this) + if(status[uid] == TimerType.STREAM_OFF) { CoroutineScope(Dispatchers.Default).launch { sendSerialized(TimerResponse(TimerType.STREAM_OFF.value, "")) @@ -52,10 +65,10 @@ fun Routing.wsTimerRoutes() { } } } - } catch(_: ClosedReceiveChannelException) { - + } catch(e: ClosedReceiveChannelException) { + logger.error("Error in WebSocket: ${e.message}") } finally { - sessions.remove(uid) + removeSession(uid, this) } } @@ -65,7 +78,9 @@ fun Routing.wsTimerRoutes() { logger.debug("TimerEvent: {} / {}", it.uid, it.type) status[it.uid] = it.type CoroutineScope(Dispatchers.Default).launch { - sessions[it.uid]?.sendSerialized(TimerResponse(it.type.value, it.time ?: "")) + sessions[it.uid]?.forEach { ws -> + ws.sendSerialized(TimerResponse(it.type.value, it.time ?: "")) + } } } }