mirror of
https://github.com/dalbodeule/chibot-chzzk-bot.git
synced 2025-06-09 07:18:22 +00:00
debug on WSSongListRoutes.kt
- add re-tx logics - add ACK enum value - else other improve
This commit is contained in:
parent
3e0246771e
commit
94e226fcab
@ -7,7 +7,8 @@ enum class SongType(var value: Int) {
|
|||||||
REMOVE(1),
|
REMOVE(1),
|
||||||
NEXT(2),
|
NEXT(2),
|
||||||
|
|
||||||
STREAM_OFF(50)
|
STREAM_OFF(50),
|
||||||
|
ACK(51)
|
||||||
}
|
}
|
||||||
|
|
||||||
class SongEvent(
|
class SongEvent(
|
||||||
|
@ -24,9 +24,8 @@ import kotlinx.serialization.Serializable
|
|||||||
import kotlinx.serialization.json.Json
|
import kotlinx.serialization.json.Json
|
||||||
import space.mori.chzzk_bot.common.services.UserService
|
import space.mori.chzzk_bot.common.services.UserService
|
||||||
import space.mori.chzzk_bot.webserver.routes.*
|
import space.mori.chzzk_bot.webserver.routes.*
|
||||||
import space.mori.chzzk_bot.webserver.utils.DiscordGuildCache
|
|
||||||
import space.mori.chzzk_bot.webserver.utils.DiscordRatelimits
|
import space.mori.chzzk_bot.webserver.utils.DiscordRatelimits
|
||||||
import space.mori.chzzk_bot.webserver.utils.Guild
|
import wsSongListRoutes
|
||||||
import java.time.Duration
|
import java.time.Duration
|
||||||
|
|
||||||
val dotenv = dotenv {
|
val dotenv = dotenv {
|
||||||
|
@ -1,9 +1,9 @@
|
|||||||
package space.mori.chzzk_bot.webserver.routes
|
|
||||||
|
|
||||||
import io.ktor.server.routing.*
|
import io.ktor.server.routing.*
|
||||||
import io.ktor.server.sessions.*
|
import io.ktor.server.sessions.*
|
||||||
import io.ktor.server.websocket.*
|
import io.ktor.server.websocket.*
|
||||||
|
import io.ktor.util.logging.Logger
|
||||||
import io.ktor.websocket.*
|
import io.ktor.websocket.*
|
||||||
|
import io.ktor.websocket.Frame.*
|
||||||
import kotlinx.coroutines.CoroutineScope
|
import kotlinx.coroutines.CoroutineScope
|
||||||
import kotlinx.coroutines.Dispatchers
|
import kotlinx.coroutines.Dispatchers
|
||||||
import kotlinx.coroutines.channels.ClosedReceiveChannelException
|
import kotlinx.coroutines.channels.ClosedReceiveChannelException
|
||||||
@ -15,12 +15,16 @@ import org.koin.java.KoinJavaComponent.inject
|
|||||||
import org.slf4j.LoggerFactory
|
import org.slf4j.LoggerFactory
|
||||||
import space.mori.chzzk_bot.common.events.*
|
import space.mori.chzzk_bot.common.events.*
|
||||||
import space.mori.chzzk_bot.common.models.SongList
|
import space.mori.chzzk_bot.common.models.SongList
|
||||||
|
import space.mori.chzzk_bot.common.models.SongLists.uid
|
||||||
|
import space.mori.chzzk_bot.common.models.User
|
||||||
import space.mori.chzzk_bot.common.services.SongConfigService
|
import space.mori.chzzk_bot.common.services.SongConfigService
|
||||||
import space.mori.chzzk_bot.common.services.SongListService
|
import space.mori.chzzk_bot.common.services.SongListService
|
||||||
import space.mori.chzzk_bot.common.services.UserService
|
import space.mori.chzzk_bot.common.services.UserService
|
||||||
import space.mori.chzzk_bot.common.utils.YoutubeVideo
|
import space.mori.chzzk_bot.common.utils.YoutubeVideo
|
||||||
import space.mori.chzzk_bot.common.utils.getYoutubeVideo
|
import space.mori.chzzk_bot.common.utils.getYoutubeVideo
|
||||||
import space.mori.chzzk_bot.webserver.UserSession
|
import space.mori.chzzk_bot.webserver.UserSession
|
||||||
|
import space.mori.chzzk_bot.webserver.routes.SongResponse
|
||||||
|
import space.mori.chzzk_bot.webserver.routes.toSerializable
|
||||||
import space.mori.chzzk_bot.webserver.utils.CurrentSong
|
import space.mori.chzzk_bot.webserver.utils.CurrentSong
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
|
|
||||||
@ -35,9 +39,8 @@ fun Routing.wsSongListRoutes() {
|
|||||||
if (sessions[uid] != null) {
|
if (sessions[uid] != null) {
|
||||||
CoroutineScope(Dispatchers.Default).launch {
|
CoroutineScope(Dispatchers.Default).launch {
|
||||||
sessions[uid]?.close(
|
sessions[uid]?.close(
|
||||||
CloseReason(CloseReason.Codes.VIOLATED_POLICY,
|
CloseReason(CloseReason.Codes.VIOLATED_POLICY, "Duplicated sessions.")
|
||||||
"Duplicated sessions."
|
)
|
||||||
))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
sessions[uid] = session
|
sessions[uid] = session
|
||||||
@ -47,6 +50,24 @@ fun Routing.wsSongListRoutes() {
|
|||||||
sessions.remove(uid)
|
sessions.remove(uid)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
suspend fun waitForAck(ws: WebSocketServerSession, expectedType: Int): Boolean {
|
||||||
|
val timeout = 5000L // 5 seconds timeout
|
||||||
|
val startTime = System.currentTimeMillis()
|
||||||
|
while (System.currentTimeMillis() - startTime < timeout) {
|
||||||
|
for (frame in ws.incoming) {
|
||||||
|
if (frame is Text) {
|
||||||
|
val message = frame.readText()
|
||||||
|
val data = Json.decodeFromString<SongRequest>(message)
|
||||||
|
if (data.type == SongType.ACK.value) {
|
||||||
|
return true // ACK received
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
delay(100) // Check every 100 ms
|
||||||
|
}
|
||||||
|
return false // Timeout
|
||||||
|
}
|
||||||
|
|
||||||
suspend fun sendWithRetry(uid: String, res: SongResponse, maxRetries: Int = 5, delayMillis: Long = 3000L) {
|
suspend fun sendWithRetry(uid: String, res: SongResponse, maxRetries: Int = 5, delayMillis: Long = 3000L) {
|
||||||
var attempt = 0
|
var attempt = 0
|
||||||
var sentSuccessfully = false
|
var sentSuccessfully = false
|
||||||
@ -56,11 +77,17 @@ fun Routing.wsSongListRoutes() {
|
|||||||
try {
|
try {
|
||||||
// Attempt to send the message
|
// Attempt to send the message
|
||||||
ws?.sendSerialized(res)
|
ws?.sendSerialized(res)
|
||||||
sentSuccessfully = true // If no exception, mark as sent successfully
|
logger.debug("Message sent successfully to $uid on attempt $attempt")
|
||||||
logger.debug("Message sent successfully on attempt $attempt")
|
// Wait for ACK
|
||||||
|
val ackReceived = waitForAck(ws!!, res.type)
|
||||||
|
if (ackReceived) {
|
||||||
|
sentSuccessfully = true
|
||||||
|
} else {
|
||||||
|
logger.warn("ACK not received for message to $uid on attempt $attempt.")
|
||||||
|
}
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
attempt++
|
attempt++
|
||||||
logger.warn("Failed to send message on attempt $attempt. Retrying in $delayMillis ms.")
|
logger.warn("Failed to send message to $uid on attempt $attempt. Retrying in $delayMillis ms.")
|
||||||
logger.warn(e.stackTraceToString())
|
logger.warn(e.stackTraceToString())
|
||||||
|
|
||||||
// Wait before retrying
|
// Wait before retrying
|
||||||
@ -69,7 +96,7 @@ fun Routing.wsSongListRoutes() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (!sentSuccessfully) {
|
if (!sentSuccessfully) {
|
||||||
logger.error("Failed to send message after $maxRetries attempts.")
|
logger.error("Failed to send message to $uid after $maxRetries attempts.")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -101,35 +128,95 @@ fun Routing.wsSongListRoutes() {
|
|||||||
try {
|
try {
|
||||||
for (frame in incoming) {
|
for (frame in incoming) {
|
||||||
when (frame) {
|
when (frame) {
|
||||||
is Frame.Text -> {
|
is Text -> {
|
||||||
if (frame.readText() == "ping") {
|
if (frame.readText() == "ping") {
|
||||||
send("pong")
|
send("pong")
|
||||||
} else {
|
} else {
|
||||||
val data = frame.readText().let { Json.decodeFromString<SongRequest>(it) }
|
val data = frame.readText().let { Json.decodeFromString<SongRequest>(it) }
|
||||||
|
|
||||||
if (data.maxQueue != null && data.maxQueue > 0) SongConfigService.updateQueueLimit(
|
// Handle song requests
|
||||||
user,
|
handleSongRequest(data, user, dispatcher, logger)
|
||||||
data.maxQueue
|
|
||||||
|
// Send ACK after handling request
|
||||||
|
send("ACK: ${data.url}")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
is Ping -> send(Pong(frame.data))
|
||||||
|
else -> ""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (e: ClosedReceiveChannelException) {
|
||||||
|
logger.error("Error in WebSocket: ${e.message}")
|
||||||
|
} finally {
|
||||||
|
removeSession(uid)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
dispatcher.subscribe(SongEvent::class) {
|
||||||
|
logger.debug("SongEvent: {} / {} {}", it.uid, it.type, it.current?.name)
|
||||||
|
CoroutineScope(Dispatchers.Default).launch {
|
||||||
|
val user = UserService.getUser(it.uid)
|
||||||
|
if (user != null) {
|
||||||
|
user.token?.let { token ->
|
||||||
|
sendWithRetry(
|
||||||
|
token, SongResponse(
|
||||||
|
it.type.value,
|
||||||
|
it.uid,
|
||||||
|
it.reqUid,
|
||||||
|
it.current?.toSerializable(),
|
||||||
|
it.next?.toSerializable(),
|
||||||
|
it.delUrl
|
||||||
)
|
)
|
||||||
if (data.maxUserLimit != null && data.maxUserLimit > 0) SongConfigService.updatePersonalLimit(
|
|
||||||
user,
|
|
||||||
data.maxUserLimit
|
|
||||||
)
|
)
|
||||||
if (data.isStreamerOnly != null) SongConfigService.updateStreamerOnly(
|
}
|
||||||
user,
|
}
|
||||||
data.isStreamerOnly
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
dispatcher.subscribe(TimerEvent::class) {
|
||||||
|
if (it.type == TimerType.STREAM_OFF) {
|
||||||
|
CoroutineScope(Dispatchers.Default).launch {
|
||||||
|
val user = UserService.getUser(it.uid)
|
||||||
|
if (user != null) {
|
||||||
|
user.token?.let { token ->
|
||||||
|
sendWithRetry(
|
||||||
|
token, SongResponse(
|
||||||
|
it.type.value,
|
||||||
|
it.uid,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
)
|
)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
suspend fun handleSongRequest(
|
||||||
|
data: SongRequest,
|
||||||
|
user: User,
|
||||||
|
dispatcher: CoroutinesEventBus,
|
||||||
|
logger: Logger
|
||||||
|
) {
|
||||||
|
if (data.maxQueue != null && data.maxQueue > 0) SongConfigService.updateQueueLimit(user, data.maxQueue)
|
||||||
|
if (data.maxUserLimit != null && data.maxUserLimit > 0) SongConfigService.updatePersonalLimit(user, data.maxUserLimit)
|
||||||
|
if (data.isStreamerOnly != null) SongConfigService.updateStreamerOnly(user, data.isStreamerOnly)
|
||||||
if (data.isDisabled != null) SongConfigService.updateDisabled(user, data.isDisabled)
|
if (data.isDisabled != null) SongConfigService.updateDisabled(user, data.isDisabled)
|
||||||
|
|
||||||
if (data.type == SongType.ADD.value && data.url != null) {
|
when (data.type) {
|
||||||
|
SongType.ADD.value -> {
|
||||||
|
data.url?.let { url ->
|
||||||
try {
|
try {
|
||||||
val youtubeVideo = getYoutubeVideo(data.url)
|
val youtubeVideo = getYoutubeVideo(url)
|
||||||
if (youtubeVideo != null) {
|
if (youtubeVideo != null) {
|
||||||
CoroutineScope(Dispatchers.Default).launch {
|
CoroutineScope(Dispatchers.Default).launch {
|
||||||
SongListService.saveSong(
|
SongListService.saveSong(
|
||||||
user,
|
user,
|
||||||
user.token!!,
|
user.token!!,
|
||||||
data.url,
|
url,
|
||||||
youtubeVideo.name,
|
youtubeVideo.name,
|
||||||
youtubeVideo.author,
|
youtubeVideo.author,
|
||||||
youtubeVideo.length,
|
youtubeVideo.length,
|
||||||
@ -149,14 +236,15 @@ fun Routing.wsSongListRoutes() {
|
|||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
logger.debug("SongType.ADD Error: $uid $e")
|
logger.debug("SongType.ADD Error: $uid $e")
|
||||||
}
|
}
|
||||||
} else if (data.type == SongType.REMOVE.value && data.url != null) {
|
}
|
||||||
|
}
|
||||||
|
SongType.REMOVE.value -> {
|
||||||
|
data.url?.let { url ->
|
||||||
val songs = SongListService.getSong(user)
|
val songs = SongListService.getSong(user)
|
||||||
|
val exactSong = songs.firstOrNull { it.url == url }
|
||||||
val exactSong = songs.firstOrNull { it.url == data.url }
|
|
||||||
if (exactSong != null) {
|
if (exactSong != null) {
|
||||||
SongListService.deleteSong(user, exactSong.uid, exactSong.name)
|
SongListService.deleteSong(user, exactSong.uid, exactSong.name)
|
||||||
}
|
}
|
||||||
|
|
||||||
dispatcher.post(
|
dispatcher.post(
|
||||||
SongEvent(
|
SongEvent(
|
||||||
user.token!!,
|
user.token!!,
|
||||||
@ -164,10 +252,12 @@ fun Routing.wsSongListRoutes() {
|
|||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
data.url
|
url
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
} else if (data.type == SongType.NEXT.value) {
|
}
|
||||||
|
}
|
||||||
|
SongType.NEXT.value -> {
|
||||||
val songList = SongListService.getSong(user)
|
val songList = SongListService.getSong(user)
|
||||||
var song: SongList? = null
|
var song: SongList? = null
|
||||||
var youtubeVideo: YoutubeVideo? = null
|
var youtubeVideo: YoutubeVideo? = null
|
||||||
@ -198,58 +288,6 @@ fun Routing.wsSongListRoutes() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
is Frame.Ping -> send(Frame.Pong(frame.data))
|
|
||||||
else -> {
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch(e: ClosedReceiveChannelException) {
|
|
||||||
logger.error("Error in WebSocket: ${e.message}")
|
|
||||||
} finally {
|
|
||||||
removeSession(uid)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
dispatcher.subscribe(SongEvent::class) {
|
|
||||||
logger.debug("SongEvent: {} / {} {}", it.uid, it.type, it.current?.name)
|
|
||||||
CoroutineScope(Dispatchers.Default).launch {
|
|
||||||
val user = UserService.getUser(it.uid)
|
|
||||||
if(user != null) {
|
|
||||||
user.token?.let { token ->
|
|
||||||
sendWithRetry(
|
|
||||||
token, SongResponse(
|
|
||||||
it.type.value,
|
|
||||||
it.uid,
|
|
||||||
it.reqUid,
|
|
||||||
it.current?.toSerializable(),
|
|
||||||
it.next?.toSerializable(),
|
|
||||||
it.delUrl
|
|
||||||
))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
dispatcher.subscribe(TimerEvent::class) {
|
|
||||||
if(it.type == TimerType.STREAM_OFF) {
|
|
||||||
CoroutineScope(Dispatchers.Default).launch {
|
|
||||||
val user = UserService.getUser(it.uid)
|
|
||||||
if(user != null) {
|
|
||||||
user.token?.let { token ->
|
|
||||||
sendWithRetry(
|
|
||||||
token, SongResponse(
|
|
||||||
it.type.value,
|
|
||||||
it.uid,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Serializable
|
@Serializable
|
||||||
data class SongRequest(
|
data class SongRequest(
|
||||||
|
Loading…
x
Reference in New Issue
Block a user