Merge pull request #109 from dalbodeule/develop

fix wsSongRoutes
This commit is contained in:
JinU Choi 2024-08-20 11:31:48 +09:00 committed by GitHub
commit e855566193
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -6,6 +6,7 @@ import io.ktor.websocket.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.serialization.Serializable
import org.koin.java.KoinJavaComponent.inject
@ -31,6 +32,41 @@ fun Routing.wsSongRoutes() {
}
}
suspend fun sendWithRetry(
session: WebSocketServerSession,
message: SongResponse,
maxRetries: Int = 3,
delayMillis: Long = 2000L
): Boolean {
var attempt = 0
while (attempt < maxRetries) {
try {
session.sendSerialized(message) // 메시지 전송 시도
return true // 성공하면 true 반환
} catch (e: Exception) {
attempt++
logger.info("Failed to send message on attempt $attempt. Retrying in $delayMillis ms.")
e.printStackTrace()
delay(delayMillis) // 재시도 전 대기
}
}
return false // 재시도 실패 시 false 반환
}
fun broadcastMessage(userId: String, message: SongResponse) {
val userSessions = sessions[userId]
userSessions?.forEach { session ->
CoroutineScope(Dispatchers.Default).launch {
val success = sendWithRetry(session, message)
if (!success) {
println("Removing session for user $userId due to repeated failures.")
userSessions.remove(session) // 실패 시 세션 제거
}
}
}
}
webSocket("/song/{uid}") {
val uid = call.parameters["uid"]
val user = uid?.let { UserService.getUser(it) }
@ -85,8 +121,7 @@ fun Routing.wsSongRoutes() {
dispatcher.subscribe(SongEvent::class) {
logger.debug("SongEvent: {} / {} {}", it.uid, it.type, it.name)
CoroutineScope(Dispatchers.Default).launch {
sessions[it.uid]?.forEach { ws ->
ws.sendSerialized(SongResponse(
broadcastMessage(it.uid, SongResponse(
it.type.value,
it.uid,
it.reqUid,
@ -97,12 +132,10 @@ fun Routing.wsSongRoutes() {
))
}
}
}
dispatcher.subscribe(TimerEvent::class) {
if(it.type == TimerType.STREAM_OFF) {
CoroutineScope(Dispatchers.Default).launch {
sessions[it.uid]?.forEach { ws ->
ws.sendSerialized(SongResponse(
broadcastMessage(it.uid, SongResponse(
it.type.value,
it.uid,
null,
@ -115,7 +148,6 @@ fun Routing.wsSongRoutes() {
}
}
}
}
@Serializable
data class SongResponse(