Merge pull request #108 from dalbodeule/develop

fix wsSongListRoutes
This commit is contained in:
JinU Choi 2024-08-19 17:27:29 +09:00 committed by GitHub
commit 3972530c79
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -7,6 +7,7 @@ import io.ktor.websocket.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
@ -19,23 +20,52 @@ import space.mori.chzzk_bot.common.services.UserService
import space.mori.chzzk_bot.common.utils.getYoutubeVideo
import space.mori.chzzk_bot.webserver.UserSession
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue
fun Routing.wsSongListRoutes() {
val sessions = ConcurrentHashMap<String, ConcurrentLinkedQueue<WebSocketServerSession>>()
val sessions = ConcurrentHashMap<String, WebSocketServerSession>()
val status = ConcurrentHashMap<String, SongType>()
val logger = LoggerFactory.getLogger("WSSongListRoutes")
val dispatcher: CoroutinesEventBus by inject(CoroutinesEventBus::class.java)
fun addSession(uid: String, session: WebSocketServerSession) {
sessions.computeIfAbsent(uid) { ConcurrentLinkedQueue() }.add(session)
if(sessions[uid] != null) {
CoroutineScope(Dispatchers.Default).launch {
sessions[uid]?.close(
CloseReason(CloseReason.Codes.VIOLATED_POLICY,
"Duplicated sessions."
))
}
}
sessions[uid] = session
}
fun removeSession(uid: String, session: WebSocketServerSession) {
sessions[uid]?.remove(session)
if(sessions[uid]?.isEmpty() == true) {
sessions.remove(uid)
fun removeSession(uid: String) {
sessions.remove(uid)
}
suspend fun sendWithRetry(ws: WebSocketServerSession, res: SongResponse, maxRetries: Int, delayMillis: Long = 3000L) {
var attempt = 0
var sentSuccessfully = false
while (attempt < maxRetries && !sentSuccessfully) {
try {
// Attempt to send the message
ws.sendSerialized(res)
sentSuccessfully = true // If no exception, mark as sent successfully
logger.debug("Message sent successfully on attempt $attempt")
} catch (e: Exception) {
attempt++
logger.warn("Failed to send message on attempt $attempt. Retrying in $delayMillis ms.")
logger.warn(e.stackTraceToString())
// Wait before retrying
delay(delayMillis)
}
}
if (!sentSuccessfully) {
logger.error("Failed to send message after $maxRetries attempts.")
}
}
@ -63,7 +93,7 @@ fun Routing.wsSongListRoutes() {
null,
))
}
removeSession(uid, this)
removeSession(uid)
}
try {
@ -164,7 +194,7 @@ fun Routing.wsSongListRoutes() {
} catch(e: ClosedReceiveChannelException) {
logger.error("Error in WebSocket: ${e.message}")
} finally {
removeSession(uid, this)
removeSession(uid)
}
}
@ -173,18 +203,16 @@ fun Routing.wsSongListRoutes() {
CoroutineScope(Dispatchers.Default).launch {
val user = UserService.getUser(it.uid)
if(user != null) {
sessions[user.token ?: ""]?.forEach { ws ->
ws.sendSerialized(
SongResponse(
it.type.value,
it.uid,
it.reqUid,
it.name,
it.author,
it.time,
it.url
)
)
sessions[user.token ?: ""]?.let { ws ->
sendWithRetry(ws, SongResponse(
it.type.value,
it.uid,
it.reqUid,
it.name,
it.author,
it.time,
it.url
), 3)
}
}
}
@ -194,9 +222,8 @@ fun Routing.wsSongListRoutes() {
CoroutineScope(Dispatchers.Default).launch {
val user = UserService.getUser(it.uid)
if(user != null) {
sessions[user.token]?.forEach { ws ->
ws.sendSerialized(
SongResponse(
sessions[user.token ?: ""]?.let { ws ->
sendWithRetry(ws, SongResponse(
it.type.value,
it.uid,
null,
@ -204,9 +231,8 @@ fun Routing.wsSongListRoutes() {
null,
null,
null
)
)
removeSession(user.token ?: "", ws)
), 3)
removeSession(user.token ?: "")
}
}
}