From 9c15c8f10d737e02887f252c23a0ba1a5469abf9 Mon Sep 17 00:00:00 2001 From: dalbodeule <11470513+dalbodeule@users.noreply.github.com> Date: Thu, 24 Apr 2025 16:26:25 +0900 Subject: [PATCH] Remove SongListWebSocketManager and simplify wsSongListRoutes The SongListWebSocketManager class and its associated logic were removed to streamline the codebase. The wsSongListRoutes function was updated accordingly to no longer require the manager as a parameter. --- .../space/mori/chzzk_bot/webserver/Main.kt | 3 +- .../utils/SongListWebSocketManager.kt | 271 ------------------ 2 files changed, 1 insertion(+), 273 deletions(-) delete mode 100644 webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/utils/SongListWebSocketManager.kt diff --git a/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/Main.kt b/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/Main.kt index 0411a03..49e956c 100644 --- a/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/Main.kt +++ b/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/Main.kt @@ -26,7 +26,6 @@ import org.slf4j.LoggerFactory import space.mori.chzzk_bot.common.services.UserService import space.mori.chzzk_bot.webserver.routes.* import space.mori.chzzk_bot.webserver.utils.DiscordRatelimits -import space.mori.chzzk_bot.webserver.utils.SongListWebSocketManager import wsSongListRoutes import java.math.BigInteger import java.security.SecureRandom @@ -225,7 +224,7 @@ val server = embeddedServer(Netty, port = 8080, ) { wsTimerRoutes() wsSongRoutes() - wsSongListRoutes(SongListWebSocketManager(LoggerFactory.getLogger("wsSongListRoutes"))) + wsSongListRoutes() swaggerUI("swagger-ui/index.html", "openapi/documentation.yaml") { options { diff --git a/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/utils/SongListWebSocketManager.kt b/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/utils/SongListWebSocketManager.kt deleted file mode 100644 index 57c9097..0000000 --- a/webserver/src/main/kotlin/space/mori/chzzk_bot/webserver/utils/SongListWebSocketManager.kt +++ /dev/null @@ -1,271 +0,0 @@ -package space.mori.chzzk_bot.webserver.utils - -import io.ktor.server.websocket.WebSocketServerSession -import io.ktor.util.logging.Logger -import io.ktor.websocket.CloseReason -import io.ktor.websocket.Frame -import io.ktor.websocket.close -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.channels.ClosedReceiveChannelException -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import kotlinx.serialization.json.Json -import space.mori.chzzk_bot.common.events.SongType -import space.mori.chzzk_bot.webserver.routes.SongResponse -import java.util.concurrent.ConcurrentHashMap -import kotlin.collections.set - -class SongListWebSocketManager(internal val logger: Logger) { - companion object { - private const val ACK_TIMEOUT_MS = 5000L - private const val ACK_CHECK_INTERVAL_MS = 100L - private const val MAX_RETRY_ATTEMPTS = 5 - private const val RETRY_DELAY_MS = 3000L - internal const val PING_MESSAGE = "ping" - internal const val PONG_MESSAGE = "pong" - private const val KEEP_ALIVE_INTERVAL_MS = 30000L // 30초 간격으로 핑-퐁 메시지 전송 - } - - private val sessions = ConcurrentHashMap() - private val status = ConcurrentHashMap() - private val lastActivity = ConcurrentHashMap() - private val pendingAcks = ConcurrentHashMap() - - // 핑-퐁 상태 관리를 위한 맵 - private val pingStatus = ConcurrentHashMap() - private val pingJobs = ConcurrentHashMap() - private val mutex = kotlinx.coroutines.sync.Mutex() - - init { - // 정기적으로 비활성 연결을 체크하는 백그라운드 작업 - CoroutineScope(Dispatchers.Default).launch { - while (true) { - try { - checkInactiveSessions() - delay(60000) // 1분마다 체크 - } catch (e: Exception) { - logger.error("비활성 세션 체크 중 오류 발생: ${e.message}") - } - } - } - } - - suspend fun addSession(uid: String, session: WebSocketServerSession) { - mutex.lock() - try { - sessions[uid] = session - lastActivity[uid] = System.currentTimeMillis() - - // 핑-퐁 작업 시작 - startPingPongJob(uid, session) - - logger.info("웹소켓 세션 추가됨: $uid") - } finally { - mutex.unlock() - } - } - - suspend fun removeSession(uid: String) { - mutex.lock() - try { - sessions.remove(uid) - status.remove(uid) - lastActivity.remove(uid) - pendingAcks.remove(uid) - - // 핑-퐁 작업 중지 - stopPingPongJob(uid) - - logger.info("웹소켓 세션 제거됨: $uid") - } finally { - mutex.unlock() - } - } - - private fun startPingPongJob(uid: String, session: WebSocketServerSession) { - pingStatus[uid] = true - pingJobs[uid] = CoroutineScope(Dispatchers.IO).launch { - while (pingStatus[uid] == true && sessions[uid] != null) { - try { - session.send(Frame.Text(PING_MESSAGE)) - logger.debug("핑 메시지 전송: $uid") - - // 응답 대기 (타임아웃 처리는 메시지 수신부에서) - mutex.lock() - try { - lastActivity[uid] = System.currentTimeMillis() - } finally { - mutex.unlock() - } - - delay(KEEP_ALIVE_INTERVAL_MS) - } catch (e: Exception) { - logger.error("핑-퐁 작업 중 오류 발생: ${e.message}") - - // 연결에 문제가 있으면 세션을 제거 - if (e is ClosedReceiveChannelException || e is java.io.IOException) { - logger.warn("연결 문제로 세션 제거: $uid") - removeSession(uid) - break - } - - delay(5000) // 오류 발생 시 잠시 대기 후 재시도 - } - } - } - } - - private fun stopPingPongJob(uid: String) { - pingStatus[uid] = false - pingJobs[uid]?.cancel() - pingJobs.remove(uid) - } - - // 비활성 세션 점검 및 제거 - private suspend fun checkInactiveSessions() { - val currentTime = System.currentTimeMillis() - val inactiveTimeout = 3 * KEEP_ALIVE_INTERVAL_MS // 3번의 핑-퐁 주기 이상 응답이 없으면 비활성으로 간주 - - val inactiveSessions = lastActivity.entries - .filter { currentTime - it.value > inactiveTimeout } - .map { it.key } - - mutex.lock() - try { - inactiveSessions.forEach { uid -> - logger.warn("비활성 세션 감지됨. 제거 중: $uid (마지막 활동: ${(currentTime - lastActivity[uid]!!) / 1000}초 전)") - - try { - sessions[uid]?.close(CloseReason(CloseReason.Codes.GOING_AWAY, "비활성 연결 감지")) - } catch (e: Exception) { - logger.error("비활성 세션 닫기 실패: ${e.message}") - } finally { - removeSession(uid) - } - } - } finally { - mutex.unlock() - } - - if (inactiveSessions.isNotEmpty()) { - logger.info("총 ${inactiveSessions.size}개의 비활성 세션이 제거됨") - } - } - - // 활동 기록 업데이트 - suspend fun updateActivity(uid: String) { - mutex.lock() - try { - lastActivity[uid] = System.currentTimeMillis() - } finally { - mutex.unlock() - } - } - - // 퐁 메시지 처리 - suspend fun handlePong(uid: String) { - updateActivity(uid) - logger.debug("퐁 메시지 수신: $uid") - } - - suspend fun waitForAck(ws: WebSocketServerSession): Boolean { - val sessionId = ws.hashCode().toString() - mutex.lock() - try { - pendingAcks[sessionId] = false - } finally { - mutex.unlock() - } - - val startTime = System.currentTimeMillis() - while (System.currentTimeMillis() - startTime < ACK_TIMEOUT_MS) { - mutex.lock() - try { - if (pendingAcks[sessionId] == true) { - pendingAcks.remove(sessionId) - return true - } - } finally { - mutex.unlock() - } - delay(ACK_CHECK_INTERVAL_MS) - } - - mutex.lock() - try { - pendingAcks.remove(sessionId) - } finally { - mutex.unlock() - } - return false - } - - suspend fun acknowledgeMessage(sessionId: String) { - mutex.lock() - try { - pendingAcks[sessionId] = true - } finally { - mutex.unlock() - } - } - - suspend fun sendWithRetry(uid: String, response: SongResponse) { - val session = sessions[uid] ?: run { - logger.warn("세션을 찾을 수 없음: $uid") - return - } - - var attempts = 0 - val jsonResponse = Json.encodeToString(SongResponse.serializer(), response) - - while (attempts < MAX_RETRY_ATTEMPTS) { - try { - session.send(Frame.Text(jsonResponse)) - updateActivity(uid) - - if (waitForAck(session)) { - logger.debug("메시지 전송 성공 (시도 ${attempts + 1}): $uid") - return - } else { - logger.warn("확인 응답 없음 (시도 ${attempts + 1}): $uid") - } - } catch (e: Exception) { - logger.error("메시지 전송 실패 (시도 ${attempts + 1}): ${e.message}") - - if (e is ClosedReceiveChannelException || e is java.io.IOException) { - logger.warn("연결 끊김으로 인한 세션 제거: $uid") - removeSession(uid) - return - } - } - - attempts++ - if (attempts < MAX_RETRY_ATTEMPTS) { - delay(RETRY_DELAY_MS) - } - } - - logger.error("최대 재시도 횟수 초과 후 메시지 전송 실패: $uid") - // 여러 번 실패 후 연결이 불안정한 것으로 판단하여 세션 제거 - removeSession(uid) - } - - suspend fun getUserStatus(uid: String): SongType? { - mutex.lock() - return try { - status[uid] - } finally { - mutex.unlock() - } - } - - suspend fun updateUserStatus(uid: String, songType: SongType) { - mutex.lock() - try { - status[uid] = songType - } finally { - mutex.unlock() - } - } -} \ No newline at end of file