Cancel route scope on application stop and simplify ACK handling.

Added a monitor to cancel the route scope when the application stops, ensuring proper resource cleanup. Removed the timeout logic in the ACK handling method, simplifying the flow while maintaining error handling.
This commit is contained in:
dalbodeule 2025-04-24 16:37:11 +09:00
parent 9c15c8f10d
commit d07cdb6ae8
No known key found for this signature in database
GPG Key ID: EFA860D069C9FA65
2 changed files with 22 additions and 19 deletions

View File

@ -16,7 +16,6 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withTimeoutOrNull
import kotlinx.io.IOException import kotlinx.io.IOException
import kotlinx.serialization.Serializable import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
@ -78,26 +77,23 @@ fun Routing.wsSongListRoutes() {
} }
suspend fun waitForAck(ws: WebSocketServerSession, expectedType: Int): Boolean { suspend fun waitForAck(ws: WebSocketServerSession, expectedType: Int): Boolean {
return withTimeoutOrNull(5000L) { // 5초 타임아웃
try { try {
for (frame in ws.incoming) { for (frame in ws.incoming) {
if (frame is Text) { if (frame is Text) {
val message = frame.readText() val message = frame.readText()
if (message == "ping") { if (message == "ping") {
return@withTimeoutOrNull true continue // Keep the loop running if a ping is received
} }
val data = Json.decodeFromString<SongRequest>(message) val data = Json.decodeFromString<SongRequest>(message)
if (data.type == SongType.ACK.value) { if (data.type == SongType.ACK.value) {
return@withTimeoutOrNull true // ACK 받음 return true // ACK received
} }
} }
} }
false // 채널이 닫힘
} catch (e: Exception) { } catch (e: Exception) {
logger.warn("Error waiting for ACK: ${e.message}") logger.warn("Error waiting for ACK: ${e.message}")
false
} }
} ?: false // 타임아웃 시 false 반환 return false // Return false if no ACK received
} }
@ -124,6 +120,7 @@ fun Routing.wsSongListRoutes() {
// ACK 대기 // ACK 대기
val ackReceived = waitForAck(ws, res.type) val ackReceived = waitForAck(ws, res.type)
if (ackReceived) { if (ackReceived) {
logger.debug("ACK received for message to $uid on attempt $attempt.")
sentSuccessfully = true sentSuccessfully = true
} else { } else {
logger.warn("ACK not received for message to $uid on attempt $attempt.") logger.warn("ACK not received for message to $uid on attempt $attempt.")

View File

@ -1,11 +1,13 @@
package space.mori.chzzk_bot.webserver.routes package space.mori.chzzk_bot.webserver.routes
import io.ktor.server.application.ApplicationStopped
import io.ktor.server.routing.* import io.ktor.server.routing.*
import io.ktor.server.websocket.* import io.ktor.server.websocket.*
import io.ktor.websocket.* import io.ktor.websocket.*
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.ClosedReceiveChannelException import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
@ -21,6 +23,10 @@ import java.util.concurrent.ConcurrentLinkedQueue
val routeScope = CoroutineScope(SupervisorJob() + Dispatchers.Default) val routeScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
fun Routing.wsSongRoutes() { fun Routing.wsSongRoutes() {
environment.monitor.subscribe(ApplicationStopped) {
routeScope.cancel()
}
val sessions = ConcurrentHashMap<String, ConcurrentLinkedQueue<WebSocketServerSession>>() val sessions = ConcurrentHashMap<String, ConcurrentLinkedQueue<WebSocketServerSession>>()
val status = ConcurrentHashMap<String, SongType>() val status = ConcurrentHashMap<String, SongType>()
val logger = LoggerFactory.getLogger("WSSongRoutes") val logger = LoggerFactory.getLogger("WSSongRoutes")