diff --git a/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/routes/WSSongRoutes.kt b/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/routes/WSSongRoutes.kt index cecac0d..9c36651 100644 --- a/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/routes/WSSongRoutes.kt +++ b/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/routes/WSSongRoutes.kt @@ -6,6 +6,7 @@ import io.ktor.websocket.* import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.channels.ClosedReceiveChannelException +import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.serialization.Serializable import org.koin.java.KoinJavaComponent.inject @@ -31,6 +32,41 @@ fun Routing.wsSongRoutes() { } } + suspend fun sendWithRetry( + session: WebSocketServerSession, + message: SongResponse, + maxRetries: Int = 3, + delayMillis: Long = 2000L + ): Boolean { + var attempt = 0 + while (attempt < maxRetries) { + try { + session.sendSerialized(message) // 메시지 전송 시도 + return true // 성공하면 true 반환 + } catch (e: Exception) { + attempt++ + logger.info("Failed to send message on attempt $attempt. Retrying in $delayMillis ms.") + e.printStackTrace() + delay(delayMillis) // 재시도 전 대기 + } + } + return false // 재시도 실패 시 false 반환 + } + + fun broadcastMessage(userId: String, message: SongResponse) { + val userSessions = sessions[userId] + + userSessions?.forEach { session -> + CoroutineScope(Dispatchers.Default).launch { + val success = sendWithRetry(session, message) + if (!success) { + println("Removing session for user $userId due to repeated failures.") + userSessions.remove(session) // 실패 시 세션 제거 + } + } + } + } + webSocket("/song/{uid}") { val uid = call.parameters["uid"] val user = uid?.let { UserService.getUser(it) } @@ -85,33 +121,29 @@ fun Routing.wsSongRoutes() { dispatcher.subscribe(SongEvent::class) { logger.debug("SongEvent: {} / {} {}", it.uid, it.type, it.name) CoroutineScope(Dispatchers.Default).launch { - sessions[it.uid]?.forEach { ws -> - ws.sendSerialized(SongResponse( - it.type.value, - it.uid, - it.reqUid, - it.name, - it.author, - it.time, - it.url - )) - } + broadcastMessage(it.uid, SongResponse( + it.type.value, + it.uid, + it.reqUid, + it.name, + it.author, + it.time, + it.url + )) } } dispatcher.subscribe(TimerEvent::class) { if(it.type == TimerType.STREAM_OFF) { CoroutineScope(Dispatchers.Default).launch { - sessions[it.uid]?.forEach { ws -> - ws.sendSerialized(SongResponse( - it.type.value, - it.uid, - null, - null, - null, - null, - null - )) - } + broadcastMessage(it.uid, SongResponse( + it.type.value, + it.uid, + null, + null, + null, + null, + null + )) } } }