Merge pull request #118 from dalbodeule/debug

debug on WSSongListRoutes.kt
This commit is contained in:
JinU Choi 2024-09-24 11:28:58 +09:00 committed by GitHub
commit 99686496b4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 162 additions and 121 deletions

View File

@ -7,7 +7,8 @@ enum class SongType(var value: Int) {
REMOVE(1),
NEXT(2),
STREAM_OFF(50)
STREAM_OFF(50),
ACK(51)
}
class SongEvent(

View File

@ -12,7 +12,7 @@ object SongLists: IntIdTable("song_list") {
val uid = varchar("uid", 64)
val url = varchar("url", 128)
val name = text("name")
val reqName = varchar("req_name", 20)
val reqName = varchar("req_name", 80)
val author = text("author")
val time = integer("time")
val created_at = datetime("created_at").default(LocalDateTime.now())

View File

@ -24,9 +24,8 @@ import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import space.mori.chzzk_bot.common.services.UserService
import space.mori.chzzk_bot.webserver.routes.*
import space.mori.chzzk_bot.webserver.utils.DiscordGuildCache
import space.mori.chzzk_bot.webserver.utils.DiscordRatelimits
import space.mori.chzzk_bot.webserver.utils.Guild
import wsSongListRoutes
import java.time.Duration
val dotenv = dotenv {

View File

@ -69,8 +69,10 @@ fun Routing.apiCommandRoutes() {
commandRequest.failContent ?: ""
)
CoroutineScope(Dispatchers.Default).launch {
for(i: Int in 0..3) {
dispatcher.post(CommandReloadEvent(user.token ?: ""))
}
}
call.respond(HttpStatusCode.OK)
}
@ -104,8 +106,10 @@ fun Routing.apiCommandRoutes() {
commandRequest.failContent ?: ""
)
CoroutineScope(Dispatchers.Default).launch {
for(i: Int in 0..3) {
dispatcher.post(CommandReloadEvent(user.token ?: ""))
}
}
call.respond(HttpStatusCode.OK)
} catch(e: Exception) {
call.respond(HttpStatusCode.BadRequest)
@ -137,8 +141,10 @@ fun Routing.apiCommandRoutes() {
try {
CommandService.removeCommand(user, commandRequest.label)
CoroutineScope(Dispatchers.Default).launch {
for(i: Int in 0..3) {
dispatcher.post(CommandReloadEvent(user.token ?: ""))
}
}
call.respond(HttpStatusCode.OK)
} catch(e: Exception) {
call.respond(HttpStatusCode.BadRequest)

View File

@ -1,9 +1,9 @@
package space.mori.chzzk_bot.webserver.routes
import io.ktor.server.routing.*
import io.ktor.server.sessions.*
import io.ktor.server.websocket.*
import io.ktor.util.logging.Logger
import io.ktor.websocket.*
import io.ktor.websocket.Frame.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.ClosedReceiveChannelException
@ -15,12 +15,16 @@ import org.koin.java.KoinJavaComponent.inject
import org.slf4j.LoggerFactory
import space.mori.chzzk_bot.common.events.*
import space.mori.chzzk_bot.common.models.SongList
import space.mori.chzzk_bot.common.models.SongLists.uid
import space.mori.chzzk_bot.common.models.User
import space.mori.chzzk_bot.common.services.SongConfigService
import space.mori.chzzk_bot.common.services.SongListService
import space.mori.chzzk_bot.common.services.UserService
import space.mori.chzzk_bot.common.utils.YoutubeVideo
import space.mori.chzzk_bot.common.utils.getYoutubeVideo
import space.mori.chzzk_bot.webserver.UserSession
import space.mori.chzzk_bot.webserver.routes.SongResponse
import space.mori.chzzk_bot.webserver.routes.toSerializable
import space.mori.chzzk_bot.webserver.utils.CurrentSong
import java.util.concurrent.ConcurrentHashMap
@ -35,9 +39,8 @@ fun Routing.wsSongListRoutes() {
if (sessions[uid] != null) {
CoroutineScope(Dispatchers.Default).launch {
sessions[uid]?.close(
CloseReason(CloseReason.Codes.VIOLATED_POLICY,
"Duplicated sessions."
))
CloseReason(CloseReason.Codes.VIOLATED_POLICY, "Duplicated sessions.")
)
}
}
sessions[uid] = session
@ -47,6 +50,24 @@ fun Routing.wsSongListRoutes() {
sessions.remove(uid)
}
suspend fun waitForAck(ws: WebSocketServerSession, expectedType: Int): Boolean {
val timeout = 5000L // 5 seconds timeout
val startTime = System.currentTimeMillis()
while (System.currentTimeMillis() - startTime < timeout) {
for (frame in ws.incoming) {
if (frame is Text) {
val message = frame.readText()
val data = Json.decodeFromString<SongRequest>(message)
if (data.type == SongType.ACK.value) {
return true // ACK received
}
}
}
delay(100) // Check every 100 ms
}
return false // Timeoutㅌ
}
suspend fun sendWithRetry(uid: String, res: SongResponse, maxRetries: Int = 5, delayMillis: Long = 3000L) {
var attempt = 0
var sentSuccessfully = false
@ -56,11 +77,17 @@ fun Routing.wsSongListRoutes() {
try {
// Attempt to send the message
ws?.sendSerialized(res)
sentSuccessfully = true // If no exception, mark as sent successfully
logger.debug("Message sent successfully on attempt $attempt")
logger.debug("Message sent successfully to $uid on attempt $attempt")
// Wait for ACK
val ackReceived = waitForAck(ws!!, res.type)
if (ackReceived) {
sentSuccessfully = true
} else {
logger.warn("ACK not received for message to $uid on attempt $attempt.")
}
} catch (e: Exception) {
attempt++
logger.warn("Failed to send message on attempt $attempt. Retrying in $delayMillis ms.")
logger.warn("Failed to send message to $uid on attempt $attempt. Retrying in $delayMillis ms.")
logger.warn(e.stackTraceToString())
// Wait before retrying
@ -69,7 +96,7 @@ fun Routing.wsSongListRoutes() {
}
if (!sentSuccessfully) {
logger.error("Failed to send message after $maxRetries attempts.")
logger.error("Failed to send message to $uid after $maxRetries attempts.")
}
}
@ -101,35 +128,92 @@ fun Routing.wsSongListRoutes() {
try {
for (frame in incoming) {
when (frame) {
is Frame.Text -> {
is Text -> {
if (frame.readText() == "ping") {
send("pong")
} else {
val data = frame.readText().let { Json.decodeFromString<SongRequest>(it) }
if (data.maxQueue != null && data.maxQueue > 0) SongConfigService.updateQueueLimit(
user,
data.maxQueue
// Handle song requests
handleSongRequest(data, user, dispatcher, logger)
}
}
is Ping -> send(Pong(frame.data))
else -> ""
}
}
} catch (e: ClosedReceiveChannelException) {
logger.error("Error in WebSocket: ${e.message}")
} finally {
removeSession(uid)
}
}
dispatcher.subscribe(SongEvent::class) {
logger.debug("SongEvent: {} / {} {}", it.uid, it.type, it.current?.name)
CoroutineScope(Dispatchers.Default).launch {
val user = UserService.getUser(it.uid)
if (user != null) {
user.token?.let { token ->
sendWithRetry(
token, SongResponse(
it.type.value,
it.uid,
it.reqUid,
it.current?.toSerializable(),
it.next?.toSerializable(),
it.delUrl
)
if (data.maxUserLimit != null && data.maxUserLimit > 0) SongConfigService.updatePersonalLimit(
user,
data.maxUserLimit
)
if (data.isStreamerOnly != null) SongConfigService.updateStreamerOnly(
user,
data.isStreamerOnly
}
}
}
}
dispatcher.subscribe(TimerEvent::class) {
if (it.type == TimerType.STREAM_OFF) {
CoroutineScope(Dispatchers.Default).launch {
val user = UserService.getUser(it.uid)
if (user != null) {
user.token?.let { token ->
sendWithRetry(
token, SongResponse(
it.type.value,
it.uid,
null,
null,
null,
)
)
}
}
}
}
}
}
suspend fun handleSongRequest(
data: SongRequest,
user: User,
dispatcher: CoroutinesEventBus,
logger: Logger
) {
if (data.maxQueue != null && data.maxQueue > 0) SongConfigService.updateQueueLimit(user, data.maxQueue)
if (data.maxUserLimit != null && data.maxUserLimit > 0) SongConfigService.updatePersonalLimit(user, data.maxUserLimit)
if (data.isStreamerOnly != null) SongConfigService.updateStreamerOnly(user, data.isStreamerOnly)
if (data.isDisabled != null) SongConfigService.updateDisabled(user, data.isDisabled)
if (data.type == SongType.ADD.value && data.url != null) {
when (data.type) {
SongType.ADD.value -> {
data.url?.let { url ->
try {
val youtubeVideo = getYoutubeVideo(data.url)
val youtubeVideo = getYoutubeVideo(url)
if (youtubeVideo != null) {
CoroutineScope(Dispatchers.Default).launch {
SongListService.saveSong(
user,
user.token!!,
data.url,
url,
youtubeVideo.name,
youtubeVideo.author,
youtubeVideo.length,
@ -149,14 +233,15 @@ fun Routing.wsSongListRoutes() {
} catch (e: Exception) {
logger.debug("SongType.ADD Error: $uid $e")
}
} else if (data.type == SongType.REMOVE.value && data.url != null) {
}
}
SongType.REMOVE.value -> {
data.url?.let { url ->
val songs = SongListService.getSong(user)
val exactSong = songs.firstOrNull { it.url == data.url }
val exactSong = songs.firstOrNull { it.url == url }
if (exactSong != null) {
SongListService.deleteSong(user, exactSong.uid, exactSong.name)
}
dispatcher.post(
SongEvent(
user.token!!,
@ -164,10 +249,12 @@ fun Routing.wsSongListRoutes() {
null,
null,
null,
data.url
url
)
)
} else if (data.type == SongType.NEXT.value) {
}
}
SongType.NEXT.value -> {
val songList = SongListService.getSong(user)
var song: SongList? = null
var youtubeVideo: YoutubeVideo? = null
@ -198,58 +285,6 @@ fun Routing.wsSongListRoutes() {
}
}
}
is Frame.Ping -> send(Frame.Pong(frame.data))
else -> {
}
}
}
} catch(e: ClosedReceiveChannelException) {
logger.error("Error in WebSocket: ${e.message}")
} finally {
removeSession(uid)
}
}
dispatcher.subscribe(SongEvent::class) {
logger.debug("SongEvent: {} / {} {}", it.uid, it.type, it.current?.name)
CoroutineScope(Dispatchers.Default).launch {
val user = UserService.getUser(it.uid)
if(user != null) {
user.token?.let { token ->
sendWithRetry(
token, SongResponse(
it.type.value,
it.uid,
it.reqUid,
it.current?.toSerializable(),
it.next?.toSerializable(),
it.delUrl
))
}
}
}
}
dispatcher.subscribe(TimerEvent::class) {
if(it.type == TimerType.STREAM_OFF) {
CoroutineScope(Dispatchers.Default).launch {
val user = UserService.getUser(it.uid)
if(user != null) {
user.token?.let { token ->
sendWithRetry(
token, SongResponse(
it.type.value,
it.uid,
null,
null,
null,
))
}
}
}
}
}
}
@Serializable
data class SongRequest(