Compare commits

...

15 Commits

Author SHA1 Message Date
dalbodeule
3c3b9a79a2
debug chzzk login 4 2025-05-13 21:25:57 +09:00
dalbodeule
61a5f985c1
Refactor: replace songListScope with appropriate scopes
Replaced `songListScope` with `songScope` in `WSSongRoutes` and `timerScope` in `WSTimerRoutes` to better reflect their respective purposes. Improves code clarity and consistency in scope usage.
2025-04-24 17:48:11 +09:00
dalbodeule
aa95976005
Refactor WebSocket song list handling and improve session logic
Replaced individual WebSocket session management with `SessionHandler` to centralize and streamline logic. Improved code readability, reliability, and maintainability by reducing redundancy and encapsulating session and request handling in dedicated classes. Added retry mechanisms, acknowledgment handling, and better application shutdown handling.
2025-04-24 17:45:25 +09:00
dalbodeule
c5a98943c0
Refactor WebSocket ACK handling and improve message retries
Introduced `waitForAck` to centralize ACK handling logic and updated retry mechanism in `sendWithRetry` to improve reliability and readability. Cleaned up error handling in WebSocket session management and ensured proper cleanup of resources. These changes enhance maintainability and robustness of the WebSocket song list routes.
2025-04-24 17:08:12 +09:00
dalbodeule
8230762053
Refactor WebSocket handlers and add ACK-based message flow
Consolidated coroutine scopes into `songListScope` and `timerScope` for better management across WebSocket routes. Introduced ACK (acknowledgment) handling for reliable message delivery with retries and timeouts. Updated session handling for multiple WebSocket routes to improve code maintainability and consistency.
2025-04-24 16:56:49 +09:00
dalbodeule
d07cdb6ae8
Cancel route scope on application stop and simplify ACK handling.
Added a monitor to cancel the route scope when the application stops, ensuring proper resource cleanup. Removed the timeout logic in the ACK handling method, simplifying the flow while maintaining error handling.
2025-04-24 16:37:11 +09:00
dalbodeule
9c15c8f10d
Remove SongListWebSocketManager and simplify wsSongListRoutes
The SongListWebSocketManager class and its associated logic were removed to streamline the codebase. The wsSongListRoutes function was updated accordingly to no longer require the manager as a parameter.
2025-04-24 16:26:25 +09:00
dalbodeule
5a7f78ff3e
Refactor WebSocket route to use shared CoroutineScope
Introduced a shared `routeScope` with `SupervisorJob` for better coroutine management across WebSocket routes. This replaces ad-hoc CoroutineScope creation, preventing unnecessary scope overhead and supporting centralized cancellation. Mutexes were added for session and song-related operations to ensure thread safety.
2025-04-24 16:23:55 +09:00
dalbodeule
7a84a9e437
Configure SongListWebSocketManager in wsSongListRoutes.
This change adds a `SongListWebSocketManager` instance with a logger to the `wsSongListRoutes` setup. It improves manageability and ensures better logging for WebSocket interactions in the song list route.
2025-04-24 16:01:16 +09:00
dalbodeule
02cede87f8
Add SongListWebSocketManager and refactor WebSocket routes
Introduced SongListWebSocketManager for managing WebSocket sessions, including ping-pong handling and retry mechanisms. Refactored WSSongListRoutes to delegate session management and simplify logic by leveraging the new manager class.
2025-04-24 15:58:56 +09:00
dalbodeule
17d8065a34
Fix session cleanup in WebSocket routes
Add missing `finally` blocks to ensure session removal in WebSocket routes after exceptions. This prevents potential memory leaks and ensures proper resource cleanup.
2025-04-24 15:01:12 +09:00
dalbodeule
0e8462eaf1
Handle WebSocket session removal on channel closure
Add `removeSession` calls in WebSocket exception handling blocks to ensure proper session cleanup when a `ClosedReceiveChannelException` occurs. Prevents potential resource leaks and ensures consistency across WebSocket routes.
2025-04-24 14:56:00 +09:00
dalbodeule
83cb68b63f
**Remove redundant session cleanup in WebSocket error handlers**
Removed unnecessary `removeSession` calls from WebSocket `finally` blocks as they are either handled elsewhere or no longer needed. This simplifies the error handling flow and ensures consistency across WebSocket route implementations.
2025-04-24 14:51:28 +09:00
JinU Choi
c2bb653ee1
Merge pull request #124 from dalbodeule/develop
if account deleted?
2025-03-31 18:47:13 +09:00
dalbodeule
8ab1dc585e
if account deleted? 2025-03-31 18:43:12 +09:00
8 changed files with 459 additions and 303 deletions

View File

@ -40,7 +40,11 @@ object ChzzkHandler {
botUid = chzzk.loggedUser.userId botUid = chzzk.loggedUser.userId
UserService.getAllUsers().map { UserService.getAllUsers().map {
if(!it.isDisabled) if(!it.isDisabled)
try {
chzzk.getChannel(it.token)?.let { token -> addUser(token, it) } chzzk.getChannel(it.token)?.let { token -> addUser(token, it) }
} catch(e: Exception) {
logger.info("Exception: ${it.token}(${it.username}) not found. ${e.stackTraceToString()}")
}
} }
handlers.forEach { handler -> handlers.forEach { handler ->

View File

@ -5,7 +5,8 @@ enum class TimerType(var value: Int) {
TIMER(1), TIMER(1),
REMOVE(2), REMOVE(2),
STREAM_OFF(50) STREAM_OFF(50),
ACK(51)
} }
class TimerEvent( class TimerEvent(

View File

@ -15,6 +15,8 @@ object Users: IntIdTable("users") {
val liveAlertMessage = text("live_alert_message").nullable() val liveAlertMessage = text("live_alert_message").nullable()
val isDisableStartupMsg = bool("is_disable_startup_msg").default(false) val isDisableStartupMsg = bool("is_disable_startup_msg").default(false)
val isDisabled = bool("is_disabled").default(false) val isDisabled = bool("is_disabled").default(false)
val accessToken = varchar("access_token", 255).nullable()
val refreshToken = varchar("refresh_token", 255).nullable()
} }
class User(id: EntityID<Int>) : IntEntity(id) { class User(id: EntityID<Int>) : IntEntity(id) {
@ -29,6 +31,9 @@ class User(id: EntityID<Int>) : IntEntity(id) {
var isDisableStartupMsg by Users.isDisableStartupMsg var isDisableStartupMsg by Users.isDisableStartupMsg
var isDisabled by Users.isDisabled var isDisabled by Users.isDisabled
var accessToken by Users.accessToken
var refreshToken by Users.refreshToken
// 유저가 가진 매니저들 // 유저가 가진 매니저들
var managers by User.via(UserManagers.user, UserManagers.manager) var managers by User.via(UserManagers.user, UserManagers.manager)

View File

@ -97,4 +97,19 @@ object UserService {
user user
} }
} }
fun setAccessToken(user: User, accessToken: String): User {
return transaction {
user.accessToken = accessToken
user
}
}
fun setRefreshToken(user: User, accessToken: String, refreshToken: String): User {
return transaction {
user.accessToken = accessToken
user.refreshToken = refreshToken
user
}
}
} }

View File

@ -25,7 +25,6 @@ import kotlinx.serialization.json.Json
import space.mori.chzzk_bot.common.services.UserService import space.mori.chzzk_bot.common.services.UserService
import space.mori.chzzk_bot.webserver.routes.* import space.mori.chzzk_bot.webserver.routes.*
import space.mori.chzzk_bot.webserver.utils.DiscordRatelimits import space.mori.chzzk_bot.webserver.utils.DiscordRatelimits
import wsSongListRoutes
import java.math.BigInteger import java.math.BigInteger
import java.security.SecureRandom import java.security.SecureRandom
import java.time.Duration import java.time.Duration
@ -192,6 +191,7 @@ val server = embeddedServer(Netty, port = 8080, ) {
val userInfo = getChzzkUser(tokenResponse.content.accessToken) val userInfo = getChzzkUser(tokenResponse.content.accessToken)
if(userInfo.content != null) { if(userInfo.content != null) {
val user = UserService.getUser(userInfo.content.channelId)
call.sessions.set( call.sessions.set(
UserSession( UserSession(
session.state, session.state,
@ -199,6 +199,7 @@ val server = embeddedServer(Netty, port = 8080, ) {
listOf() listOf()
) )
) )
user?.let { UserService.setRefreshToken(it, tokenResponse.content.accessToken, tokenResponse.content.refreshToken ?: "") }
call.respondRedirect(getFrontendURL("")) call.respondRedirect(getFrontendURL(""))
} }
} catch (e: Exception) { } catch (e: Exception) {

View File

@ -1,115 +1,54 @@
package space.mori.chzzk_bot.webserver.routes
import io.ktor.client.plugins.websocket.WebSocketException
import io.ktor.server.application.*
import io.ktor.server.routing.* import io.ktor.server.routing.*
import io.ktor.server.sessions.* import io.ktor.server.sessions.*
import io.ktor.server.websocket.* import io.ktor.server.websocket.*
import io.ktor.util.logging.Logger
import io.ktor.websocket.* import io.ktor.websocket.*
import io.ktor.websocket.Frame.* import kotlinx.coroutines.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.ClosedReceiveChannelException import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.delay import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.launch import kotlinx.coroutines.sync.withLock
import kotlinx.serialization.Serializable import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
import org.koin.java.KoinJavaComponent.inject import org.koin.java.KoinJavaComponent.inject
import org.slf4j.Logger
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import space.mori.chzzk_bot.common.events.* import space.mori.chzzk_bot.common.events.*
import space.mori.chzzk_bot.common.models.SongList import space.mori.chzzk_bot.common.models.SongList
import space.mori.chzzk_bot.common.models.SongLists.uid
import space.mori.chzzk_bot.common.models.User import space.mori.chzzk_bot.common.models.User
import space.mori.chzzk_bot.common.services.SongConfigService
import space.mori.chzzk_bot.common.services.SongListService import space.mori.chzzk_bot.common.services.SongListService
import space.mori.chzzk_bot.common.services.UserService import space.mori.chzzk_bot.common.services.UserService
import space.mori.chzzk_bot.common.utils.YoutubeVideo import space.mori.chzzk_bot.common.utils.YoutubeVideo
import space.mori.chzzk_bot.common.utils.getYoutubeVideo import space.mori.chzzk_bot.common.utils.getYoutubeVideo
import space.mori.chzzk_bot.webserver.UserSession import space.mori.chzzk_bot.webserver.UserSession
import space.mori.chzzk_bot.webserver.routes.SongResponse
import space.mori.chzzk_bot.webserver.routes.toSerializable
import space.mori.chzzk_bot.webserver.utils.CurrentSong import space.mori.chzzk_bot.webserver.utils.CurrentSong
import java.io.IOException
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
fun Routing.wsSongListRoutes() { fun Routing.wsSongListRoutes() {
val sessions = ConcurrentHashMap<String, WebSocketServerSession>()
val status = ConcurrentHashMap<String, SongType>()
val logger = LoggerFactory.getLogger("WSSongListRoutes") val logger = LoggerFactory.getLogger("WSSongListRoutes")
val dispatcher: CoroutinesEventBus by inject(CoroutinesEventBus::class.java) val dispatcher: CoroutinesEventBus by inject(CoroutinesEventBus::class.java)
val songListScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
fun addSession(uid: String, session: WebSocketServerSession) { // Manage all active sessions
if (sessions[uid] != null) { val sessionHandlers = ConcurrentHashMap<String, SessionHandler>()
CoroutineScope(Dispatchers.Default).launch {
sessions[uid]?.close(
CloseReason(CloseReason.Codes.VIOLATED_POLICY, "Duplicated sessions.")
)
}
}
sessions[uid] = session
}
fun removeSession(uid: String) { // Handle application shutdown
sessions.remove(uid) environment.monitor.subscribe(ApplicationStopped) {
sessionHandlers.values.forEach {
songListScope.launch {
it.close(CloseReason(CloseReason.Codes.NORMAL, "Server shutting down"))
} }
suspend fun waitForAck(ws: WebSocketServerSession, expectedType: Int): Boolean {
val timeout = 5000L // 5 seconds timeout
val startTime = System.currentTimeMillis()
while (System.currentTimeMillis() - startTime < timeout) {
for (frame in ws.incoming) {
if (frame is Text) {
val message = frame.readText()
if(message == "ping") {
return true
}
val data = Json.decodeFromString<SongRequest>(message)
if (data.type == SongType.ACK.value) {
return true // ACK received
}
}
}
delay(100) // Check every 100 ms
}
return false // Timeout
}
suspend fun sendWithRetry(uid: String, res: SongResponse, maxRetries: Int = 5, delayMillis: Long = 3000L) {
var attempt = 0
var sentSuccessfully = false
while (attempt < maxRetries && !sentSuccessfully) {
val ws = sessions[uid]
try {
if(ws == null) {
delay(delayMillis)
continue
}
// Attempt to send the message
ws.sendSerialized(res)
logger.debug("Message sent successfully to $uid on attempt $attempt")
// Wait for ACK
val ackReceived = waitForAck(ws, res.type)
if (ackReceived == true) {
sentSuccessfully = true
} else {
logger.warn("ACK not received for message to $uid on attempt $attempt.")
}
} catch (e: Exception) {
attempt++
logger.warn("Failed to send message to $uid on attempt $attempt. Retrying in $delayMillis ms.")
logger.warn(e.stackTraceToString())
} finally {
// Wait before retrying
delay(delayMillis)
}
}
if (!sentSuccessfully) {
logger.error("Failed to send message to $uid after $maxRetries attempts.")
} }
} }
// WebSocket endpoint
webSocket("/songlist") { webSocket("/songlist") {
val session = call.sessions.get<UserSession>() val session = call.sessions.get<UserSession>()
val user = session?.id?.let { UserService.getUser(it) } val user: User? = session?.id?.let { UserService.getUser(it) }
if (user == null) { if (user == null) {
close(CloseReason(CloseReason.Codes.CANNOT_ACCEPT, "Invalid SID")) close(CloseReason(CloseReason.Codes.CANNOT_ACCEPT, "Invalid SID"))
return@webSocket return@webSocket
@ -117,102 +56,204 @@ fun Routing.wsSongListRoutes() {
val uid = user.token val uid = user.token
addSession(uid, this) // Ensure only one session per user
sessionHandlers[uid]?.close(CloseReason(CloseReason.Codes.VIOLATED_POLICY, "Another session is already active."))
if (status[uid] == SongType.STREAM_OFF) { val handler = SessionHandler(uid, this, dispatcher, logger)
CoroutineScope(Dispatchers.Default).launch { sessionHandlers[uid] = handler
sendSerialized(SongResponse(
SongType.STREAM_OFF.value,
uid,
null,
null,
null,
))
}
removeSession(uid)
}
// Initialize session
handler.initialize()
// Listen for incoming frames
try { try {
for (frame in incoming) { for (frame in incoming) {
when (frame) { when (frame) {
is Text -> { is Frame.Text -> handler.handleTextFrame(frame.readText())
if (frame.readText().trim() == "ping") { is Frame.Ping -> send(Frame.Pong(frame.data))
send("pong") else -> Unit
} else {
val data = frame.readText().let { Json.decodeFromString<SongRequest>(it) }
// Handle song requests
handleSongRequest(data, user, dispatcher, logger)
}
}
is Ping -> send(Pong(frame.data))
else -> ""
} }
} }
} catch (e: ClosedReceiveChannelException) { } catch (e: ClosedReceiveChannelException) {
logger.error("Error in WebSocket: ${e.message}") logger.info("Session closed: ${e.message}")
} catch (e: IOException) {
logger.error("IO error: ${e.message}")
} catch (e: Exception) {
logger.error("Unexpected error: ${e.message}")
} finally { } finally {
removeSession(uid) sessionHandlers.remove(uid)
handler.close(CloseReason(CloseReason.Codes.NORMAL, "Session ended"))
} }
} }
dispatcher.subscribe(SongEvent::class) { // Subscribe to SongEvents
logger.debug("SongEvent: {} / {} {}", it.uid, it.type, it.current?.name) dispatcher.subscribe(SongEvent::class) { event ->
CoroutineScope(Dispatchers.Default).launch { val handler = sessionHandlers[event.uid]
val user = UserService.getUser(it.uid) songListScope.launch {
if (user != null) { handler?.sendSongResponse(event)
sendWithRetry(
user.token, SongResponse(
it.type.value,
it.uid,
it.reqUid,
it.current?.toSerializable(),
it.next?.toSerializable(),
it.delUrl
)
)
}
} }
} }
dispatcher.subscribe(TimerEvent::class) { // Subscribe to TimerEvents
if (it.type == TimerType.STREAM_OFF) { dispatcher.subscribe(TimerEvent::class) { event ->
CoroutineScope(Dispatchers.Default).launch { if (event.type == TimerType.STREAM_OFF) {
val user = UserService.getUser(it.uid) val handler = sessionHandlers[event.uid]
if (user != null) { songListScope.launch {
sendWithRetry( handler?.sendTimerOff()
user.token, SongResponse(
it.type.value,
it.uid,
null,
null,
null,
)
)
}
} }
} }
} }
} }
suspend fun handleSongRequest( class SessionHandler(
private val uid: String,
private val session: WebSocketServerSession,
private val dispatcher: CoroutinesEventBus,
private val logger: Logger
) {
private val ackMap = ConcurrentHashMap<String, CompletableDeferred<Boolean>>()
private val sessionMutex = Mutex()
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
suspend fun initialize() {
// Send initial status if needed,
// For example, send STREAM_OFF if applicable
// This can be extended based on your requirements
}
suspend fun handleTextFrame(text: String) {
if (text.trim() == "ping") {
session.send("pong")
return
}
val data = try {
Json.decodeFromString<SongRequest>(text)
} catch (e: Exception) {
logger.warn("Failed to decode SongRequest: ${e.message}")
return
}
when (data.type) {
SongType.ACK.value -> handleAck(data.uid)
else -> handleSongRequest(data)
}
}
private fun handleAck(requestUid: String) {
ackMap[requestUid]?.complete(true)
ackMap.remove(requestUid)
}
private fun handleSongRequest(data: SongRequest) {
scope.launch {
SongRequestProcessor.process(data, uid, dispatcher, this@SessionHandler, logger)
}
}
suspend fun sendSongResponse(event: SongEvent) {
val response = SongResponse(
type = event.type.value,
uid = event.uid,
reqUid = event.reqUid,
current = event.current?.toSerializable(),
next = event.next?.toSerializable(),
delUrl = event.delUrl
)
sendWithRetry(response)
}
suspend fun sendTimerOff() {
val response = SongResponse(
type = TimerType.STREAM_OFF.value,
uid = uid,
reqUid = null,
current = null,
next = null,
delUrl = null
)
sendWithRetry(response)
}
private suspend fun sendWithRetry(res: SongResponse, maxRetries: Int = 5, delayMillis: Long = 3000L) {
var attempt = 0
while (attempt < maxRetries) {
try {
session.sendSerialized(res)
val ackDeferred = CompletableDeferred<Boolean>()
ackMap[res.uid] = ackDeferred
val ackReceived = withTimeoutOrNull(5000L) { ackDeferred.await() } ?: false
if (ackReceived) {
logger.debug("ACK received for message to $uid on attempt $attempt.")
return
} else {
logger.warn("ACK not received for message to $uid on attempt $attempt.")
}
} catch (e: IOException) {
logger.warn("Failed to send message to $uid on attempt $attempt: ${e.message}")
if (e is WebSocketException) {
close(CloseReason(CloseReason.Codes.PROTOCOL_ERROR, "WebSocket error"))
return
}
} catch (e: CancellationException) {
throw e
} catch (e: Exception) {
logger.warn("Unexpected error while sending message to $uid on attempt $attempt: ${e.message}")
}
attempt++
delay(delayMillis)
}
logger.error("Failed to send message to $uid after $maxRetries attempts.")
}
suspend fun close(reason: CloseReason) {
try {
session.close(reason)
} catch (e: Exception) {
logger.warn("Error closing session: ${e.message}")
}
}
}
object SongRequestProcessor {
private val songMutex = Mutex()
suspend fun process(
data: SongRequest,
uid: String,
dispatcher: CoroutinesEventBus,
handler: SessionHandler,
logger: Logger
) {
val user = UserService.getUser(uid) ?: return
when (data.type) {
SongType.ADD.value -> handleAdd(data, user, dispatcher, handler, logger)
SongType.REMOVE.value -> handleRemove(data, user, dispatcher, logger)
SongType.NEXT.value -> handleNext(user, dispatcher, logger)
else -> {
// Handle other types if necessary
}
}
}
private suspend fun handleAdd(
data: SongRequest, data: SongRequest,
user: User, user: User,
dispatcher: CoroutinesEventBus, dispatcher: CoroutinesEventBus,
handler: SessionHandler,
logger: Logger logger: Logger
) { ) {
if (data.maxQueue != null && data.maxQueue > 0) SongConfigService.updateQueueLimit(user, data.maxQueue) val url = data.url ?: return
if (data.maxUserLimit != null && data.maxUserLimit > 0) SongConfigService.updatePersonalLimit(user, data.maxUserLimit) val youtubeVideo = getYoutubeVideo(url) ?: run {
if (data.isStreamerOnly != null) SongConfigService.updateStreamerOnly(user, data.isStreamerOnly) logger.warn("Failed to fetch YouTube video for URL: $url")
if (data.isDisabled != null) SongConfigService.updateDisabled(user, data.isDisabled) return
}
when (data.type) { songMutex.withLock {
SongType.ADD.value -> {
data.url?.let { url ->
try {
val youtubeVideo = getYoutubeVideo(url)
if (youtubeVideo != null) {
CoroutineScope(Dispatchers.Default).launch {
SongListService.saveSong( SongListService.saveSong(
user, user,
user.token, user.token,
@ -222,81 +263,95 @@ suspend fun handleSongRequest(
youtubeVideo.length, youtubeVideo.length,
user.username user.username
) )
}
dispatcher.post( dispatcher.post(
SongEvent( SongEvent(
user.token, uid = user.token,
SongType.ADD, type = SongType.ADD,
user.token, reqUid = user.token,
CurrentSong.getSong(user), current = CurrentSong.getSong(user),
youtubeVideo next = youtubeVideo
) )
) )
} }
}
} catch (e: Exception) { private suspend fun handleRemove(
logger.debug("SongType.ADD Error: $uid $e") data: SongRequest,
} user: User,
} dispatcher: CoroutinesEventBus,
} logger: Logger
SongType.REMOVE.value -> { ) {
data.url?.let { url -> val url = data.url ?: return
songMutex.withLock {
val songs = SongListService.getSong(user) val songs = SongListService.getSong(user)
val exactSong = songs.firstOrNull { it.url == url } val exactSong = songs.firstOrNull { it.url == url }
if (exactSong != null) { if (exactSong != null) {
SongListService.deleteSong(user, exactSong.uid, exactSong.name) SongListService.deleteSong(user, exactSong.uid, exactSong.name)
} }
}
dispatcher.post( dispatcher.post(
SongEvent( SongEvent(
user.token, uid = user.token,
SongType.REMOVE, type = SongType.REMOVE,
null, delUrl = url,
null, reqUid = null,
null, current = null,
url next = null,
) )
) )
} }
}
SongType.NEXT.value -> { private suspend fun handleNext(
val songList = SongListService.getSong(user) user: User,
dispatcher: CoroutinesEventBus,
logger: Logger
) {
var song: SongList? = null var song: SongList? = null
var youtubeVideo: YoutubeVideo? = null var youtubeVideo: YoutubeVideo? = null
songMutex.withLock {
val songList = SongListService.getSong(user)
if (songList.isNotEmpty()) { if (songList.isNotEmpty()) {
song = songList[0] song = songList[0]
SongListService.deleteSong(user, song.uid, song.name) SongListService.deleteSong(user, song.uid, song.name)
} }
}
song?.let { song?.let {
youtubeVideo = YoutubeVideo( youtubeVideo = YoutubeVideo(
song.url, it.url,
song.name, it.name,
song.author, it.author,
song.time it.time
) )
} }
dispatcher.post( dispatcher.post(
SongEvent( SongEvent(
user.token, uid = user.token,
SongType.NEXT, type = SongType.NEXT,
song?.uid, current = null,
youtubeVideo next = youtubeVideo,
reqUid = null,
delUrl = null
) )
) )
CurrentSong.setSong(user, youtubeVideo) CurrentSong.setSong(user, youtubeVideo)
} }
} }
}
@Serializable @Serializable
data class SongRequest( data class SongRequest(
val type: Int, val type: Int,
val uid: String, val uid: String,
val url: String?, val url: String? = null,
val maxQueue: Int?, val maxQueue: Int? = null,
val maxUserLimit: Int?, val maxUserLimit: Int? = null,
val isStreamerOnly: Boolean?, val isStreamerOnly: Boolean? = null,
val remove: Int?, val remove: Int? = null,
val isDisabled: Boolean?, val isDisabled: Boolean? = null
) )

View File

@ -1,14 +1,20 @@
package space.mori.chzzk_bot.webserver.routes package space.mori.chzzk_bot.webserver.routes
import io.ktor.server.application.ApplicationStopped
import io.ktor.server.routing.* import io.ktor.server.routing.*
import io.ktor.server.websocket.* import io.ktor.server.websocket.*
import io.ktor.websocket.* import io.ktor.websocket.*
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.ClosedReceiveChannelException import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.withTimeoutOrNull
import kotlinx.serialization.Serializable import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import org.koin.java.KoinJavaComponent.inject import org.koin.java.KoinJavaComponent.inject
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import space.mori.chzzk_bot.common.events.* import space.mori.chzzk_bot.common.events.*
@ -17,15 +23,20 @@ import space.mori.chzzk_bot.common.utils.YoutubeVideo
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.ConcurrentLinkedQueue
val songScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
fun Routing.wsSongRoutes() { fun Routing.wsSongRoutes() {
environment.monitor.subscribe(ApplicationStopped) {
songScope.cancel()
}
val sessions = ConcurrentHashMap<String, ConcurrentLinkedQueue<WebSocketServerSession>>() val sessions = ConcurrentHashMap<String, ConcurrentLinkedQueue<WebSocketServerSession>>()
val status = ConcurrentHashMap<String, SongType>() val status = ConcurrentHashMap<String, SongType>()
val logger = LoggerFactory.getLogger("WSSongRoutes") val logger = LoggerFactory.getLogger("WSSongRoutes")
val dispatcher: CoroutinesEventBus by inject(CoroutinesEventBus::class.java)
val ackMap = ConcurrentHashMap<String, ConcurrentHashMap<WebSocketServerSession, CompletableDeferred<Boolean>>>()
fun addSession(uid: String, session: WebSocketServerSession) { fun addSession(uid: String, session: WebSocketServerSession) {
sessions.computeIfAbsent(uid) { ConcurrentLinkedQueue() }.add(session) sessions.computeIfAbsent(uid) { ConcurrentLinkedQueue() }.add(session)
} }
fun removeSession(uid: String, session: WebSocketServerSession) { fun removeSession(uid: String, session: WebSocketServerSession) {
sessions[uid]?.remove(session) sessions[uid]?.remove(session)
if(sessions[uid]?.isEmpty() == true) { if(sessions[uid]?.isEmpty() == true) {
@ -42,27 +53,35 @@ fun Routing.wsSongRoutes() {
var attempt = 0 var attempt = 0
while (attempt < maxRetries) { while (attempt < maxRetries) {
try { try {
session.sendSerialized(message) // 메시지 전송 시도 session.sendSerialized(message)
return true // 성공하면 true 반환 val ackDeferred = CompletableDeferred<Boolean>()
ackMap.computeIfAbsent(message.uid) { ConcurrentHashMap() }[session] = ackDeferred
val ackReceived = withTimeoutOrNull(delayMillis) { ackDeferred.await() } ?: false
if (ackReceived) {
ackMap[message.uid]?.remove(session)
return true
} else {
attempt++
logger.warn("ACK not received for message to ${message.uid} on attempt $attempt.")
}
} catch (e: Exception) { } catch (e: Exception) {
attempt++ attempt++
logger.info("Failed to send message on attempt $attempt. Retrying in $delayMillis ms.") logger.info("Failed to send message on attempt $attempt. Retrying in $delayMillis ms.")
e.printStackTrace() e.printStackTrace()
delay(delayMillis) // 재시도 전 대기 delay(delayMillis)
} }
} }
return false // 재시도 실패 시 false 반환 return false
} }
fun broadcastMessage(userId: String, message: SongResponse) { fun broadcastMessage(userId: String, message: SongResponse) {
val userSessions = sessions[userId] val userSessions = sessions[userId]
userSessions?.forEach { session -> userSessions?.forEach { session ->
CoroutineScope(Dispatchers.Default).launch { songScope.launch {
val success = sendWithRetry(session, message) val success = sendWithRetry(session, message)
if (!success) { if (!success) {
println("Removing session for user $userId due to repeated failures.") logger.info("Removing session for user $userId due to repeated failures.")
userSessions.remove(session) // 실패 시 세션 제거 removeSession(userId, session)
} }
} }
} }
@ -71,19 +90,13 @@ fun Routing.wsSongRoutes() {
webSocket("/song/{uid}") { webSocket("/song/{uid}") {
val uid = call.parameters["uid"] val uid = call.parameters["uid"]
val user = uid?.let { UserService.getUser(it) } val user = uid?.let { UserService.getUser(it) }
if (uid == null) { if (uid == null || user == null) {
close(CloseReason(CloseReason.Codes.CANNOT_ACCEPT, "Invalid UID")) close(CloseReason(CloseReason.Codes.CANNOT_ACCEPT, "Invalid UID"))
return@webSocket return@webSocket
} }
if (user == null) {
close(CloseReason(CloseReason.Codes.CANNOT_ACCEPT, "Invalid UID"))
return@webSocket
}
addSession(uid, this) addSession(uid, this)
if(status[uid] == SongType.STREAM_OFF) { if(status[uid] == SongType.STREAM_OFF) {
CoroutineScope(Dispatchers.Default).launch { songScope.launch {
sendSerialized(SongResponse( sendSerialized(SongResponse(
SongType.STREAM_OFF.value, SongType.STREAM_OFF.value,
uid, uid,
@ -93,33 +106,36 @@ fun Routing.wsSongRoutes() {
)) ))
} }
} }
try { try {
for (frame in incoming) { for (frame in incoming) {
when(frame) { when(frame) {
is Frame.Text -> { is Frame.Text -> {
if(frame.readText().trim() == "ping") { val text = frame.readText().trim()
if(text == "ping") {
send("pong") send("pong")
} else {
val data = Json.decodeFromString<SongRequest>(text)
if (data.type == SongType.ACK.value) {
ackMap[data.uid]?.get(this)?.complete(true)
ackMap[data.uid]?.remove(this)
}
} }
} }
is Frame.Ping -> send(Frame.Pong(frame.data)) is Frame.Ping -> send(Frame.Pong(frame.data))
else -> { else -> {}
}
} }
} }
} catch(e: ClosedReceiveChannelException) { } catch(e: ClosedReceiveChannelException) {
logger.error("Error in WebSocket: ${e.message}") logger.error("Error in WebSocket: ${e.message}")
} finally { } finally {
removeSession(uid, this) removeSession(uid, this)
ackMap[uid]?.remove(this)
} }
} }
val dispatcher: CoroutinesEventBus by inject(CoroutinesEventBus::class.java)
dispatcher.subscribe(SongEvent::class) { dispatcher.subscribe(SongEvent::class) {
logger.debug("SongEvent: {} / {} {}", it.uid, it.type, it.current?.name) logger.debug("SongEvent: {} / {} {}", it.uid, it.type, it.current?.name)
CoroutineScope(Dispatchers.Default).launch { songScope.launch {
broadcastMessage(it.uid, SongResponse( broadcastMessage(it.uid, SongResponse(
it.type.value, it.type.value,
it.uid, it.uid,
@ -132,7 +148,7 @@ fun Routing.wsSongRoutes() {
} }
dispatcher.subscribe(TimerEvent::class) { dispatcher.subscribe(TimerEvent::class) {
if(it.type == TimerType.STREAM_OFF) { if(it.type == TimerType.STREAM_OFF) {
CoroutineScope(Dispatchers.Default).launch { songScope.launch {
broadcastMessage(it.uid, SongResponse( broadcastMessage(it.uid, SongResponse(
it.type.value, it.type.value,
it.uid, it.uid,
@ -144,7 +160,6 @@ fun Routing.wsSongRoutes() {
} }
} }
} }
@Serializable @Serializable
data class SerializableYoutubeVideo( data class SerializableYoutubeVideo(
val url: String, val url: String,
@ -152,9 +167,7 @@ data class SerializableYoutubeVideo(
val author: String, val author: String,
val length: Int val length: Int
) )
fun YoutubeVideo.toSerializable() = SerializableYoutubeVideo(url, name, author, length) fun YoutubeVideo.toSerializable() = SerializableYoutubeVideo(url, name, author, length)
@Serializable @Serializable
data class SongResponse( data class SongResponse(
val type: Int, val type: Int,

View File

@ -1,13 +1,20 @@
package space.mori.chzzk_bot.webserver.routes package space.mori.chzzk_bot.webserver.routes
import io.ktor.server.application.ApplicationStopped
import io.ktor.server.routing.* import io.ktor.server.routing.*
import io.ktor.server.websocket.* import io.ktor.server.websocket.*
import io.ktor.websocket.* import io.ktor.websocket.*
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.ClosedReceiveChannelException import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.withTimeoutOrNull
import kotlinx.serialization.Serializable import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import org.koin.java.KoinJavaComponent.inject import org.koin.java.KoinJavaComponent.inject
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import space.mori.chzzk_bot.common.events.* import space.mori.chzzk_bot.common.events.*
@ -17,14 +24,19 @@ import space.mori.chzzk_bot.webserver.utils.CurrentTimer
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.ConcurrentLinkedQueue
val timerScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
fun Routing.wsTimerRoutes() { fun Routing.wsTimerRoutes() {
environment.monitor.subscribe(ApplicationStopped) {
timerScope.cancel()
}
val sessions = ConcurrentHashMap<String, ConcurrentLinkedQueue<WebSocketServerSession>>() val sessions = ConcurrentHashMap<String, ConcurrentLinkedQueue<WebSocketServerSession>>()
val logger = LoggerFactory.getLogger("WSTimerRoutes") val logger = LoggerFactory.getLogger("WSTimerRoutes")
val dispatcher: CoroutinesEventBus by inject(CoroutinesEventBus::class.java)
val ackMap = ConcurrentHashMap<String, ConcurrentHashMap<WebSocketServerSession, CompletableDeferred<Boolean>>>()
fun addSession(uid: String, session: WebSocketServerSession) { fun addSession(uid: String, session: WebSocketServerSession) {
sessions.computeIfAbsent(uid) { ConcurrentLinkedQueue() }.add(session) sessions.computeIfAbsent(uid) { ConcurrentLinkedQueue() }.add(session)
} }
fun removeSession(uid: String, session: WebSocketServerSession) { fun removeSession(uid: String, session: WebSocketServerSession) {
sessions[uid]?.remove(session) sessions[uid]?.remove(session)
if(sessions[uid]?.isEmpty() == true) { if(sessions[uid]?.isEmpty() == true) {
@ -32,82 +44,132 @@ fun Routing.wsTimerRoutes() {
} }
} }
suspend fun sendWithRetry(
session: WebSocketServerSession,
message: TimerResponse,
maxRetries: Int = 3,
delayMillis: Long = 2000L
): Boolean {
var attempt = 0
while (attempt < maxRetries) {
try {
session.sendSerialized(message)
val ackDeferred = CompletableDeferred<Boolean>()
ackMap.computeIfAbsent(message.uid) { ConcurrentHashMap() }[session] = ackDeferred
val ackReceived = withTimeoutOrNull(delayMillis) { ackDeferred.await() } ?: false
if (ackReceived) {
ackMap[message.uid]?.remove(session)
return true
} else {
attempt++
logger.warn("ACK not received for message to ${message.uid} on attempt $attempt.")
}
} catch (e: Exception) {
attempt++
logger.info("Failed to send message on attempt $attempt. Retrying in $delayMillis ms.")
e.printStackTrace()
delay(delayMillis)
}
}
return false
}
fun broadcastMessage(uid: String, message: TimerResponse) {
val userSessions = sessions[uid]
userSessions?.forEach { session ->
timerScope.launch {
val success = sendWithRetry(session, message.copy(uid = uid))
if (!success) {
logger.info("Removing session for user $uid due to repeated failures.")
removeSession(uid, session)
}
}
}
}
webSocket("/timer/{uid}") { webSocket("/timer/{uid}") {
val uid = call.parameters["uid"] val uid = call.parameters["uid"]
val user = uid?.let { UserService.getUser(it) } val user = uid?.let { UserService.getUser(it) }
if (uid == null) { if (uid == null || user == null) {
close(CloseReason(CloseReason.Codes.CANNOT_ACCEPT, "Invalid UID")) close(CloseReason(CloseReason.Codes.CANNOT_ACCEPT, "Invalid UID"))
return@webSocket return@webSocket
} }
if (user == null) {
close(CloseReason(CloseReason.Codes.CANNOT_ACCEPT, "Invalid UID"))
return@webSocket
}
addSession(uid, this) addSession(uid, this)
val timer = CurrentTimer.getTimer(user) val timer = CurrentTimer.getTimer(user)
if (timer?.type == TimerType.STREAM_OFF) { if (timer?.type == TimerType.STREAM_OFF) {
CoroutineScope(Dispatchers.Default).launch { timerScope.launch {
sendSerialized(TimerResponse(TimerType.STREAM_OFF.value, null)) sendSerialized(TimerResponse(TimerType.STREAM_OFF.value, null, uid))
} }
} else { } else {
CoroutineScope(Dispatchers.Default).launch { timerScope.launch {
if(timer?.type == TimerType.STREAM_OFF) {
sendSerialized(TimerResponse(TimerType.STREAM_OFF.value, null, uid))
} else {
if (timer == null) { if (timer == null) {
sendSerialized( sendSerialized(
TimerResponse( TimerResponse(
TimerConfigService.getConfig(user)?.option ?: TimerType.REMOVE.value, TimerConfigService.getConfig(user)?.option ?: TimerType.REMOVE.value,
null null,
uid
) )
) )
} else { } else {
sendSerialized( sendSerialized(
TimerResponse( TimerResponse(
timer.type.value, timer.type.value,
timer.time timer.time,
uid
) )
) )
} }
} }
} }
}
try { try {
for (frame in incoming) { for (frame in incoming) {
when(frame) { when(frame) {
is Frame.Text -> { is Frame.Text -> {
if(frame.readText().trim() == "ping") { val text = frame.readText().trim()
if(text == "ping") {
send("pong") send("pong")
} else {
val data = Json.decodeFromString<TimerRequest>(text)
if (data.type == TimerType.ACK.value) {
ackMap[data.uid]?.get(this)?.complete(true)
ackMap[data.uid]?.remove(this)
}
} }
} }
is Frame.Ping -> send(Frame.Pong(frame.data)) is Frame.Ping -> send(Frame.Pong(frame.data))
else -> { else -> {}
}
} }
} }
} catch(e: ClosedReceiveChannelException) { } catch(e: ClosedReceiveChannelException) {
logger.error("Error in WebSocket: ${e.message}") logger.error("Error in WebSocket: ${e.message}")
} finally { } finally {
removeSession(uid, this) removeSession(uid, this)
ackMap[uid]?.remove(this)
} }
} }
val dispatcher: CoroutinesEventBus by inject(CoroutinesEventBus::class.java)
dispatcher.subscribe(TimerEvent::class) { dispatcher.subscribe(TimerEvent::class) {
logger.debug("TimerEvent: {} / {}", it.uid, it.type) logger.debug("TimerEvent: {} / {}", it.uid, it.type)
val user = UserService.getUser(it.uid) val user = UserService.getUser(it.uid)
CurrentTimer.setTimer(user!!, it) CurrentTimer.setTimer(user!!, it)
CoroutineScope(Dispatchers.Default).launch { timerScope.launch {
sessions[it.uid]?.forEach { ws -> broadcastMessage(it.uid, TimerResponse(it.type.value, it.time ?: "", it.uid))
ws.sendSerialized(TimerResponse(it.type.value, it.time ?: ""))
} }
} }
} }
}
@Serializable @Serializable
data class TimerResponse( data class TimerResponse(
val type: Int, val type: Int,
val time: String? val time: String?,
val uid: String
)
@Serializable
data class TimerRequest(
val type: Int,
val uid: String
) )