Merge pull request #21 from dalbodeule/develop

in WSTimerRoutes.kt, send data two or many session on one uid.
This commit is contained in:
JinU Choi 2024-08-02 15:44:35 +09:00 committed by GitHub
commit 65fb2ac3e3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -14,13 +14,25 @@ import org.slf4j.LoggerFactory
import space.mori.chzzk_bot.common.events.* import space.mori.chzzk_bot.common.events.*
import space.mori.chzzk_bot.common.services.UserService import space.mori.chzzk_bot.common.services.UserService
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue
val logger: Logger = LoggerFactory.getLogger("WSTimerRoutes") val logger: Logger = LoggerFactory.getLogger("WSTimerRoutes")
fun Routing.wsTimerRoutes() { fun Routing.wsTimerRoutes() {
val sessions = ConcurrentHashMap<String, WebSocketServerSession>() val sessions = ConcurrentHashMap<String, ConcurrentLinkedQueue<WebSocketServerSession>>()
val status = ConcurrentHashMap<String, TimerType>() val status = ConcurrentHashMap<String, TimerType>()
fun addSession(uid: String, session: WebSocketServerSession) {
sessions.computeIfAbsent(uid) { ConcurrentLinkedQueue() }.add(session)
}
fun removeSession(uid: String, session: WebSocketServerSession) {
sessions[uid]?.remove(session)
if(sessions[uid]?.isEmpty() == true) {
sessions.remove(uid)
}
}
webSocket("/timer/{uid}") { webSocket("/timer/{uid}") {
val uid = call.parameters["uid"] val uid = call.parameters["uid"]
val user = uid?.let { UserService.getUser(it) } val user = uid?.let { UserService.getUser(it) }
@ -33,7 +45,8 @@ fun Routing.wsTimerRoutes() {
return@webSocket return@webSocket
} }
sessions[uid] = this addSession(uid, this)
if(status[uid] == TimerType.STREAM_OFF) { if(status[uid] == TimerType.STREAM_OFF) {
CoroutineScope(Dispatchers.Default).launch { CoroutineScope(Dispatchers.Default).launch {
sendSerialized(TimerResponse(TimerType.STREAM_OFF.value, "")) sendSerialized(TimerResponse(TimerType.STREAM_OFF.value, ""))
@ -52,10 +65,10 @@ fun Routing.wsTimerRoutes() {
} }
} }
} }
} catch(_: ClosedReceiveChannelException) { } catch(e: ClosedReceiveChannelException) {
logger.error("Error in WebSocket: ${e.message}")
} finally { } finally {
sessions.remove(uid) removeSession(uid, this)
} }
} }
@ -65,7 +78,9 @@ fun Routing.wsTimerRoutes() {
logger.debug("TimerEvent: {} / {}", it.uid, it.type) logger.debug("TimerEvent: {} / {}", it.uid, it.type)
status[it.uid] = it.type status[it.uid] = it.type
CoroutineScope(Dispatchers.Default).launch { CoroutineScope(Dispatchers.Default).launch {
sessions[it.uid]?.sendSerialized(TimerResponse(it.type.value, it.time ?: "")) sessions[it.uid]?.forEach { ws ->
ws.sendSerialized(TimerResponse(it.type.value, it.time ?: ""))
}
} }
} }
} }