fix wsSongListRoutes

- duplicated session is not permited.
- re-tx logic add
This commit is contained in:
dalbodeule 2024-08-19 17:24:42 +09:00
parent 7553e8be7e
commit e3c0266253
No known key found for this signature in database
GPG Key ID: EFA860D069C9FA65

View File

@ -7,6 +7,7 @@ import io.ktor.websocket.*
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.ClosedReceiveChannelException import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.serialization.Serializable import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
@ -19,24 +20,53 @@ import space.mori.chzzk_bot.common.services.UserService
import space.mori.chzzk_bot.common.utils.getYoutubeVideo import space.mori.chzzk_bot.common.utils.getYoutubeVideo
import space.mori.chzzk_bot.webserver.UserSession import space.mori.chzzk_bot.webserver.UserSession
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue
fun Routing.wsSongListRoutes() { fun Routing.wsSongListRoutes() {
val sessions = ConcurrentHashMap<String, ConcurrentLinkedQueue<WebSocketServerSession>>() val sessions = ConcurrentHashMap<String, WebSocketServerSession>()
val status = ConcurrentHashMap<String, SongType>() val status = ConcurrentHashMap<String, SongType>()
val logger = LoggerFactory.getLogger("WSSongListRoutes") val logger = LoggerFactory.getLogger("WSSongListRoutes")
val dispatcher: CoroutinesEventBus by inject(CoroutinesEventBus::class.java) val dispatcher: CoroutinesEventBus by inject(CoroutinesEventBus::class.java)
fun addSession(uid: String, session: WebSocketServerSession) { fun addSession(uid: String, session: WebSocketServerSession) {
sessions.computeIfAbsent(uid) { ConcurrentLinkedQueue() }.add(session) if(sessions[uid] != null) {
CoroutineScope(Dispatchers.Default).launch {
sessions[uid]?.close(
CloseReason(CloseReason.Codes.VIOLATED_POLICY,
"Duplicated sessions."
))
}
}
sessions[uid] = session
} }
fun removeSession(uid: String, session: WebSocketServerSession) { fun removeSession(uid: String) {
sessions[uid]?.remove(session)
if(sessions[uid]?.isEmpty() == true) {
sessions.remove(uid) sessions.remove(uid)
} }
suspend fun sendWithRetry(ws: WebSocketServerSession, res: SongResponse, maxRetries: Int, delayMillis: Long = 3000L) {
var attempt = 0
var sentSuccessfully = false
while (attempt < maxRetries && !sentSuccessfully) {
try {
// Attempt to send the message
ws.sendSerialized(res)
sentSuccessfully = true // If no exception, mark as sent successfully
logger.debug("Message sent successfully on attempt $attempt")
} catch (e: Exception) {
attempt++
logger.warn("Failed to send message on attempt $attempt. Retrying in $delayMillis ms.")
logger.warn(e.stackTraceToString())
// Wait before retrying
delay(delayMillis)
}
}
if (!sentSuccessfully) {
logger.error("Failed to send message after $maxRetries attempts.")
}
} }
webSocket("/songlist") { webSocket("/songlist") {
@ -63,7 +93,7 @@ fun Routing.wsSongListRoutes() {
null, null,
)) ))
} }
removeSession(uid, this) removeSession(uid)
} }
try { try {
@ -164,7 +194,7 @@ fun Routing.wsSongListRoutes() {
} catch(e: ClosedReceiveChannelException) { } catch(e: ClosedReceiveChannelException) {
logger.error("Error in WebSocket: ${e.message}") logger.error("Error in WebSocket: ${e.message}")
} finally { } finally {
removeSession(uid, this) removeSession(uid)
} }
} }
@ -173,9 +203,8 @@ fun Routing.wsSongListRoutes() {
CoroutineScope(Dispatchers.Default).launch { CoroutineScope(Dispatchers.Default).launch {
val user = UserService.getUser(it.uid) val user = UserService.getUser(it.uid)
if(user != null) { if(user != null) {
sessions[user.token ?: ""]?.forEach { ws -> sessions[user.token ?: ""]?.let { ws ->
ws.sendSerialized( sendWithRetry(ws, SongResponse(
SongResponse(
it.type.value, it.type.value,
it.uid, it.uid,
it.reqUid, it.reqUid,
@ -183,8 +212,7 @@ fun Routing.wsSongListRoutes() {
it.author, it.author,
it.time, it.time,
it.url it.url
) ), 3)
)
} }
} }
} }
@ -194,9 +222,8 @@ fun Routing.wsSongListRoutes() {
CoroutineScope(Dispatchers.Default).launch { CoroutineScope(Dispatchers.Default).launch {
val user = UserService.getUser(it.uid) val user = UserService.getUser(it.uid)
if(user != null) { if(user != null) {
sessions[user.token]?.forEach { ws -> sessions[user.token ?: ""]?.let { ws ->
ws.sendSerialized( sendWithRetry(ws, SongResponse(
SongResponse(
it.type.value, it.type.value,
it.uid, it.uid,
null, null,
@ -204,9 +231,8 @@ fun Routing.wsSongListRoutes() {
null, null,
null, null,
null null
) ), 3)
) removeSession(user.token ?: "")
removeSession(user.token ?: "", ws)
} }
} }
} }