74 Commits

Author SHA1 Message Date
dalbodeule
f27c0da775 [feature] StreamInfo Thread sleep time changed (3min -> 10min) 2025-07-20 23:11:09 +09:00
dalbodeule
c528448b5e [feature] StreamInfo Thread sleep time added. 2025-07-20 23:04:24 +09:00
dalbodeule
db78bf3ff6 [feature] metric related fixed. (3x)
- active streamer gauge connected the wrong variable.
2025-07-16 19:23:50 +09:00
dalbodeule
d7d4228063 [feature] metric related fixed. (2x) 2025-07-16 19:09:10 +09:00
dalbodeule
b1432c662f [feature] metric related fixed. 2025-07-16 18:52:04 +09:00
dalbodeule
19fcc15fe1 [feature] loki handler fix (3x) 2025-07-16 13:17:04 +09:00
dalbodeule
b497c690af [feature] loki handler fix (2x) 2025-07-16 13:14:43 +09:00
dalbodeule
866fe19cb9 [feature] loki handler fix 2025-07-16 13:10:06 +09:00
dalbodeule
e21641da7b [feature] add loki handler. 2025-07-16 13:01:39 +09:00
dalbodeule
66df771cb7 [feature] add some api, etc..
- add some api
- add /metrics routing
2025-07-15 21:52:41 +09:00
dalbodeule
9e3a79a613 [hotfix] temporary debug. 2025-07-13 18:05:29 +09:00
dalbodeule
945d3fd5e4 [hotfix] add title change command; restrict usage to moderators. 2025-06-24 16:59:18 +09:00
dalbodeule
af9c3a2cf5 [hotfix] refactor badge checks; adjust listener initialization logic. 2025-06-24 16:46:46 +09:00
dalbodeule
bd31039f2b [hotfix] ensure listener unsubscribes on user offline. 2025-06-24 16:19:53 +09:00
dalbodeule
c5c115f6e6 [hotfix] make listener and messageHandler non-nullable; update related logic. 2025-06-24 16:11:42 +09:00
dalbodeule
5683edaa5e [hotfix] make listener and messageHandler nullable; add null-safe checks. 2025-06-18 15:28:15 +09:00
dalbodeule
c86f0b2ab3 [hotfix] role hotfix 2025-06-08 16:20:27 +09:00
dalbodeule
27cfbd9087 [hotfix] category command announcement added. 2025-06-08 16:16:17 +09:00
dalbodeule
9ce07a025a [hotfix] category command announcement added. 2025-06-08 16:15:36 +09:00
dalbodeule
073b50b9cd [hotfix] category command some fixed. 2025-06-08 16:11:11 +09:00
dalbodeule
f37dd2c59a [hotfix] add category command. 2025-06-08 16:02:22 +09:00
dalbodeule
864ba3748d [hotfix] readme and some url changed. 2025-06-08 15:50:49 +09:00
dalbodeule
593b98b7fb [hotfix] coroutine related fixed. (4x) 2025-06-08 15:36:33 +09:00
dalbodeule
95676e9b39 [hotfix] coroutine related fixed. (3x) 2025-06-08 15:28:50 +09:00
dalbodeule
722b5972d9 [hotfix] coroutine related fixed. (2x) 2025-06-08 15:24:33 +09:00
dalbodeule
a88f994ccd [hotfix] coroutine related fixed. 2025-06-08 15:10:41 +09:00
dalbodeule
49541f7289 [hotfix] connect function added. 2025-06-08 15:03:35 +09:00
dalbodeule
ba9fb052cd [hotfix] fixed logout redirect and removed unused debug print. 2025-06-06 15:37:52 +09:00
dalbodeule
51232ad593 [hotfix] please, this patch is the final patch! 2025-06-06 15:23:52 +09:00
dalbodeule
77eecaca34 [hotfix] token claim fixed. (2x) 2025-06-04 16:27:32 +09:00
dalbodeule
90230c4691 [hotfix] token claim fixed. 2025-06-04 16:18:02 +09:00
dalbodeule
b2f449bf65 [hotfix] ChzzkClient.refreshAccessToken added. 2025-06-04 15:48:28 +09:00
dalbodeule
5e3a350e15 Merge branch 'develop'
# Conflicts:
#	chatbot/src/main/kotlin/space/mori/chzzk_bot/chatbot/chzzk/ChzzkHandler.kt
#	chatbot/src/main/kotlin/space/mori/chzzk_bot/chatbot/utils/accessTokenRefresh.kt
2025-06-04 15:46:43 +09:00
dalbodeule
8a0a507e5b [feature] some logic fixed. 2025-06-04 15:42:31 +09:00
dalbodeule
1c4b818a85 Revert "Merge pull request #133 from dalbodeule/develop"
This reverts commit 83b5eaf345, reversing
changes made to a99f3b342a.
2025-05-27 13:18:52 +09:00
JinU Choi
83b5eaf345 Merge pull request #133 from dalbodeule/develop
[feature] accessToken refresh logic fix.
2025-05-27 13:13:24 +09:00
dalbodeule
b0be81df20 [feature] accessToken refresh logic fix. 2025-05-27 13:11:17 +09:00
JinU Choi
a99f3b342a Merge pull request #132 from dalbodeule/develop
[feature] manager detect logic fixed.
2025-05-20 11:21:51 +09:00
dalbodeule
a9d3ad436b [feature] manager detect logic fixed. 2025-05-20 11:17:36 +09:00
JinU Choi
53757476a7 Merge pull request #131 from dalbodeule/debug
[feature] timer debugs. (2x)
2025-05-18 09:36:27 +09:00
dalbodeule
27810c0b7f [feature] timer debugs. (2x) 2025-05-18 09:31:36 +09:00
JinU Choi
7257100adc Merge pull request #130 from dalbodeule/develop
[feature] timer debugs.
2025-05-18 09:27:14 +09:00
dalbodeule
f29370a31f [feature] timer debugs. 2025-05-18 09:24:11 +09:00
JinU Choi
2c0c887ba1 Merge pull request #129 from dalbodeule/develop
[feature] song list websocket service fixed.
2025-05-18 08:56:41 +09:00
dalbodeule
5223cbe2b2 [feature] song list websocket service fixed. 2025-05-18 08:55:01 +09:00
JinU Choi
11f9895198 Merge pull request #128 from dalbodeule/develop
[feature] thumbnail, etc. fixed
2025-05-18 08:17:15 +09:00
dalbodeule
a18b83fcc8 [feature] thumbnail, etc. fixed 2025-05-18 08:14:46 +09:00
JinU Choi
30d5edc5fe Merge pull request #127 from dalbodeule/develop
[feature] text size limited 100, 100ms delay added.
2025-05-17 14:47:18 +09:00
dalbodeule
0709b8f526 [feature] text size limited 100, 100ms delay added. 2025-05-17 14:37:39 +09:00
dalbodeule
1465716e72 [hotfix] hotfix on register and activate logics. 2025-05-16 00:54:43 +09:00
dalbodeule
d0292e0aa6 [hotfix] hotfix on alert embed tags. 2025-05-16 00:42:28 +09:00
dalbodeule
b2ffd18126 [hotfix] hotfix on lateinit botuid is not initialized 2025-05-16 00:38:26 +09:00
dalbodeule
5fa04a6725 [hotfix] hotfix on manage users. 2025-05-16 00:29:50 +09:00
dalbodeule
f65c446bed [hotfix] hotfix on some codes. 2025-05-16 00:25:28 +09:00
JinU Choi
729a88a2b3 Merge pull request #126 from dalbodeule/develop
[refactor] user and live stream handling logic
2025-05-16 00:01:07 +09:00
dalbodeule
a896269087 [refactor] user and live stream handling logic
Replaced ChzzkUserCache with event-based user fetching for cleaner architecture. Integrated new ChzzkUserFindEvent and ChzzkUserReceiveEvent to handle user data retrieval. Removed old utility methods and streamlined live stream status checks with updated APIs.
2025-05-15 04:57:17 +09:00
JinU Choi
d92ad1cc51 Merge pull request #125 from dalbodeule/develop
[feature] chzzk api applied.
2025-05-14 15:51:36 +09:00
dalbodeule
8d54d21620 [refactor] some refactor on UserService.setRefreshToken 2025-05-14 15:49:04 +09:00
dalbodeule
101db7d20c [refactor] chzzk4j update to 0.1.1 2025-05-13 21:31:17 +09:00
dalbodeule
3c3b9a79a2 debug chzzk login 4 2025-05-13 21:25:57 +09:00
dalbodeule
61a5f985c1 Refactor: replace songListScope with appropriate scopes
Replaced `songListScope` with `songScope` in `WSSongRoutes` and `timerScope` in `WSTimerRoutes` to better reflect their respective purposes. Improves code clarity and consistency in scope usage.
2025-04-24 17:48:11 +09:00
dalbodeule
aa95976005 Refactor WebSocket song list handling and improve session logic
Replaced individual WebSocket session management with `SessionHandler` to centralize and streamline logic. Improved code readability, reliability, and maintainability by reducing redundancy and encapsulating session and request handling in dedicated classes. Added retry mechanisms, acknowledgment handling, and better application shutdown handling.
2025-04-24 17:45:25 +09:00
dalbodeule
c5a98943c0 Refactor WebSocket ACK handling and improve message retries
Introduced `waitForAck` to centralize ACK handling logic and updated retry mechanism in `sendWithRetry` to improve reliability and readability. Cleaned up error handling in WebSocket session management and ensured proper cleanup of resources. These changes enhance maintainability and robustness of the WebSocket song list routes.
2025-04-24 17:08:12 +09:00
dalbodeule
8230762053 Refactor WebSocket handlers and add ACK-based message flow
Consolidated coroutine scopes into `songListScope` and `timerScope` for better management across WebSocket routes. Introduced ACK (acknowledgment) handling for reliable message delivery with retries and timeouts. Updated session handling for multiple WebSocket routes to improve code maintainability and consistency.
2025-04-24 16:56:49 +09:00
dalbodeule
d07cdb6ae8 Cancel route scope on application stop and simplify ACK handling.
Added a monitor to cancel the route scope when the application stops, ensuring proper resource cleanup. Removed the timeout logic in the ACK handling method, simplifying the flow while maintaining error handling.
2025-04-24 16:37:11 +09:00
dalbodeule
9c15c8f10d Remove SongListWebSocketManager and simplify wsSongListRoutes
The SongListWebSocketManager class and its associated logic were removed to streamline the codebase. The wsSongListRoutes function was updated accordingly to no longer require the manager as a parameter.
2025-04-24 16:26:25 +09:00
dalbodeule
5a7f78ff3e Refactor WebSocket route to use shared CoroutineScope
Introduced a shared `routeScope` with `SupervisorJob` for better coroutine management across WebSocket routes. This replaces ad-hoc CoroutineScope creation, preventing unnecessary scope overhead and supporting centralized cancellation. Mutexes were added for session and song-related operations to ensure thread safety.
2025-04-24 16:23:55 +09:00
dalbodeule
7a84a9e437 Configure SongListWebSocketManager in wsSongListRoutes.
This change adds a `SongListWebSocketManager` instance with a logger to the `wsSongListRoutes` setup. It improves manageability and ensures better logging for WebSocket interactions in the song list route.
2025-04-24 16:01:16 +09:00
dalbodeule
02cede87f8 Add SongListWebSocketManager and refactor WebSocket routes
Introduced SongListWebSocketManager for managing WebSocket sessions, including ping-pong handling and retry mechanisms. Refactored WSSongListRoutes to delegate session management and simplify logic by leveraging the new manager class.
2025-04-24 15:58:56 +09:00
dalbodeule
17d8065a34 Fix session cleanup in WebSocket routes
Add missing `finally` blocks to ensure session removal in WebSocket routes after exceptions. This prevents potential memory leaks and ensures proper resource cleanup.
2025-04-24 15:01:12 +09:00
dalbodeule
0e8462eaf1 Handle WebSocket session removal on channel closure
Add `removeSession` calls in WebSocket exception handling blocks to ensure proper session cleanup when a `ClosedReceiveChannelException` occurs. Prevents potential resource leaks and ensures consistency across WebSocket routes.
2025-04-24 14:56:00 +09:00
dalbodeule
83cb68b63f **Remove redundant session cleanup in WebSocket error handlers**
Removed unnecessary `removeSession` calls from WebSocket `finally` blocks as they are either handled elsewhere or no longer needed. This simplifies the error handling flow and ensures consistency across WebSocket route implementations.
2025-04-24 14:51:28 +09:00
JinU Choi
c2bb653ee1 Merge pull request #124 from dalbodeule/develop
if account deleted?
2025-03-31 18:47:13 +09:00
dalbodeule
8ab1dc585e if account deleted? 2025-03-31 18:43:12 +09:00
24 changed files with 1155 additions and 672 deletions

View File

@@ -1,6 +1,6 @@
# nabot_chzzk_bot # chibot_chzzk_bot
[![Discord](https://img.shields.io/discord/1250093195870867577)](https://discord.gg/up8ANZegmy)   [![Build Status](https://teamcity.mori.space/app/rest/builds/buildType:NabotChzzkBot_Build/statusIcon)](https://teamcity.mori.space/project/NabotChzzkBot)   [![Docker Image Version](https://img.shields.io/docker/v/dalbodeule/chzzkbot)](https://hub.docker.com/repository/docker/dalbodeule/chzzkbot/general) [![Discord](https://img.shields.io/discord/1250093195870867577)](https://discord.gg/up8ANZegmy)   [![Build Status](https://teamcity.mori.space/app/rest/builds/buildType:NabotChzzkBot_Build/statusIcon)](https://teamcity.mori.space/project/NabotChzzkBot)
## Chzzk Chatbot with [JDA5](https://github.com/discord-jda/JDA), [chzzk4j](https://github.com/R2turnTrue/chzzk4j) ## Chzzk Chatbot with [JDA5](https://github.com/discord-jda/JDA), [chzzk4j](https://github.com/R2turnTrue/chzzk4j)
@@ -17,17 +17,6 @@
- [x] \<daily_counter:counter_name> - [x] \<daily_counter:counter_name>
- [x] \<days:yyyy-mm-dd> - [x] \<days:yyyy-mm-dd>
### 관리 명령어 (on Discord)
- [x] /hook token: \[디스코드 연동 페이지에서 받은 Token]
- [x] /alert channel: \[디스코드 Channel ID] content: \[알림 내용]
- [x] /add label: \[명령어] content: \[내용]
- [ ] /list
- [x] /update label: \[명령어] content: \[내용]
- [x] /delete label: \[명령어]
### 매니저 명령어 (on Discord)
- [x] /addmanager user: \[Discord user]
- [x] /listmanager
- [x] /removemanager user: \[Discord user]
### 관리 명령어 (on Chzzk chat) ### 관리 명령어 (on Chzzk chat)
- [x] !명령어추가 \[명령어] \[내용] - [x] !명령어추가 \[명령어] \[내용]
- [x] !명령어수정 \[명령어] \[내용] - [x] !명령어수정 \[명령어] \[내용]
@@ -42,32 +31,6 @@
- [ ] !노래삭제 \[번호] - [ ] !노래삭제 \[번호]
- [ ] !노래설정 \[내용] \[켜기/끄기] - [ ] !노래설정 \[내용] \[켜기/끄기]
### Envs
- DISCORD_TOKEN
- DB_URL
- DB_USER
- DB_PASS
- RUN_AGENT = `false`
- NID_AUT
- NID_SES
### 사용 예시
- 팔로우
- `/add label: !팔로우 content: <name>님은 오늘로 <following>일째 팔로우네요!`
- 출첵
- `/add label: !출첵 content: <name>님의 <daily_counter:attendance>번째 출석! fail_content: <name>님은 오늘 이미 출석했어요! <daily_counter:attendance>번 했네요?`
- `/add label: ? content: <name>님이 <counter:hook>개째 갈고리 수집`
- ㄱㅇㅇ
- `/add label: ㄱㅇㅇ content: <counter:cute>번째 ㄱㅇㅇ`
- `/add label: ㄱㅇㅇ content: 나누 귀여움 +<counter:cute>`
-
- `/add label: 풉 content: <counter:poop>번째 비웃음?`
- `/add label: 풉키풉키 content: <counter:poop>번째 비웃음?`
- 바보
- `/add label: 바보 content: 나 바보 아니다?`
- `/add label: 바보 content: <counter:fool> 번째 바보? 나 바보 아니다?`
- 첫방송
- `/add label: 첫방송 content: 24년 7월 23일부터 <days:2024-07-23>일 째 방송중!`
## 사용 기술스택 ## 사용 기술스택
- [Exposed](https://github.com/JetBrains/Exposed) - [Exposed](https://github.com/JetBrains/Exposed)

View File

@@ -28,7 +28,8 @@ repositories {
dependencies { dependencies {
// https://mvnrepository.com/artifact/ch.qos.logback/logback-classic // https://mvnrepository.com/artifact/ch.qos.logback/logback-classic
implementation("ch.qos.logback:logback-classic:1.5.12") implementation("ch.qos.logback:logback-classic:1.5.13")
implementation("com.github.loki4j:loki-logback-appender:2.0.0")
// https://mvnrepository.com/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core // https://mvnrepository.com/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.9.0") implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.9.0")

View File

@@ -16,10 +16,10 @@ dependencies {
} }
// https://mvnrepository.com/artifact/io.github.R2turnTrue/chzzk4j // https://mvnrepository.com/artifact/io.github.R2turnTrue/chzzk4j
implementation("io.github.R2turnTrue:chzzk4j:0.0.12") implementation("io.github.R2turnTrue:chzzk4j:0.1.1")
// https://mvnrepository.com/artifact/ch.qos.logback/logback-classic // https://mvnrepository.com/artifact/ch.qos.logback/logback-classic
implementation("ch.qos.logback:logback-classic:1.5.12") implementation("ch.qos.logback:logback-classic:1.5.13")
// https://mvnrepository.com/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core // https://mvnrepository.com/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.9.0") implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.9.0")

View File

@@ -1,29 +1,34 @@
package space.mori.chzzk_bot.chatbot.chzzk package space.mori.chzzk_bot.chatbot.chzzk
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.*
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.future.await import kotlinx.coroutines.future.await
import kotlinx.coroutines.launch
import org.koin.java.KoinJavaComponent.inject import org.koin.java.KoinJavaComponent.inject
import org.slf4j.Logger import org.slf4j.Logger
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import space.mori.chzzk_bot.chatbot.chzzk.Connector.chzzk
import space.mori.chzzk_bot.chatbot.chzzk.Connector.getChannel import space.mori.chzzk_bot.chatbot.chzzk.Connector.getChannel
import space.mori.chzzk_bot.chatbot.discord.Discord import space.mori.chzzk_bot.chatbot.discord.Discord
import space.mori.chzzk_bot.chatbot.utils.refreshAccessToken
import space.mori.chzzk_bot.common.events.* import space.mori.chzzk_bot.common.events.*
import space.mori.chzzk_bot.common.metrics.Metrics
import space.mori.chzzk_bot.common.models.User import space.mori.chzzk_bot.common.models.User
import space.mori.chzzk_bot.common.services.LiveStatusService import space.mori.chzzk_bot.common.services.LiveStatusService
import space.mori.chzzk_bot.common.services.TimerConfigService import space.mori.chzzk_bot.common.services.TimerConfigService
import space.mori.chzzk_bot.common.services.UserService import space.mori.chzzk_bot.common.services.UserService
import space.mori.chzzk_bot.common.utils.* import space.mori.chzzk_bot.common.utils.getChzzkChannelId
import xyz.r2turntrue.chzzk4j.chat.ChatEventListener import space.mori.chzzk_bot.common.utils.getUptime
import xyz.r2turntrue.chzzk4j.chat.ChatMessage import xyz.r2turntrue.chzzk4j.ChzzkClient
import xyz.r2turntrue.chzzk4j.chat.ChzzkChat import xyz.r2turntrue.chzzk4j.session.ChzzkSessionBuilder
import xyz.r2turntrue.chzzk4j.session.ChzzkSessionSubscriptionType
import xyz.r2turntrue.chzzk4j.session.ChzzkUserSession
import xyz.r2turntrue.chzzk4j.session.event.SessionChatMessageEvent
import xyz.r2turntrue.chzzk4j.types.channel.ChzzkChannel import xyz.r2turntrue.chzzk4j.types.channel.ChzzkChannel
import java.lang.Exception import xyz.r2turntrue.chzzk4j.types.channel.live.ChzzkLiveDetail
import java.lang.Runnable
import java.lang.Thread
import java.net.SocketTimeoutException import java.net.SocketTimeoutException
import java.nio.charset.Charset
import java.time.LocalDateTime import java.time.LocalDateTime
import java.util.concurrent.ConcurrentHashMap
object ChzzkHandler { object ChzzkHandler {
private val handlers = mutableListOf<UserHandler>() private val handlers = mutableListOf<UserHandler>()
@@ -32,20 +37,27 @@ object ChzzkHandler {
@Volatile private var running: Boolean = false @Volatile private var running: Boolean = false
private val dispatcher: CoroutinesEventBus by inject(CoroutinesEventBus::class.java) private val dispatcher: CoroutinesEventBus by inject(CoroutinesEventBus::class.java)
private val lastRunMap = ConcurrentHashMap<String, Long>()
private val requiredWait = 300_000L
fun addUser(chzzkChannel: ChzzkChannel, user: User) { fun addUser(chzzkChannel: ChzzkChannel, user: User) {
handlers.add(UserHandler(chzzkChannel, logger, user, streamStartTime = null)) handlers.add(UserHandler(chzzkChannel, logger, user, streamStartTime = LocalDateTime.now()))
} }
fun enable() { fun enable() {
botUid = chzzk.loggedUser.userId botUid = Connector.client.fetchLoggedUser().userId
UserService.getAllUsers().map { UserService.getAllUsers().map {
if(!it.isDisabled) if(!it.isDisabled)
chzzk.getChannel(it.token)?.let { token -> addUser(token, it) } try {
getChannel(it.token)?.let { token -> addUser(token, it) }
} catch(e: Exception) {
logger.info("Exception: ${it.token}(${it.username}) not found. ${e.stackTraceToString()}")
}
} }
handlers.forEach { handler -> handlers.forEach { handler ->
val streamInfo = getStreamInfo(handler.listener.channelId) val streamInfo = Connector.getLive(handler.channel.channelId)
if (streamInfo.content?.status == "OPEN") handler.isActive(true, streamInfo) if (streamInfo?.isOnline == true) handler.isActive(true, streamInfo)
} }
dispatcher.subscribe(UserRegisterEvent::class) { dispatcher.subscribe(UserRegisterEvent::class) {
@@ -74,8 +86,10 @@ object ChzzkHandler {
} }
fun disable() { fun disable() {
handlers.forEach { handler -> CoroutineScope(Dispatchers.Default).launch {
handler.disable() handlers.forEach { handler ->
handler.disable()
}
} }
} }
@@ -104,20 +118,21 @@ object ChzzkHandler {
handlers.forEach { handlers.forEach {
if (!running) return@forEach if (!running) return@forEach
try { try {
val streamInfo = getStreamInfo(it.channel.channelId) val streamInfo = Connector.getLive(it.channel.channelId)
if (streamInfo.content?.status == "OPEN" && !it.isActive) { if (streamInfo?.isOnline == true && !it.isActive) {
try { try {
it.isActive(true, streamInfo) it.isActive(true, streamInfo)
} catch(e: Exception) { } catch(e: Exception) {
logger.info("Exception: ${e.stackTraceToString()}") logger.info("Thread 1 Exception: ${e.stackTraceToString()}")
} }
} }
if (streamInfo.content?.status == "CLOSE" && it.isActive) it.isActive(false, streamInfo) if (streamInfo?.isOnline == false && it.isActive) it.isActive(false, streamInfo)
} catch (e: SocketTimeoutException) { } catch (e: SocketTimeoutException) {
logger.info("Thread 1 Timeout: ${it.channel.channelName} / ${e.stackTraceToString()}") logger.info("Thread 1 Timeout: ${it.channel.channelName} / ${e.stackTraceToString()}")
} catch (e: Exception) { } catch (e: Exception) {
logger.info("Thread 1 Exception: ${it.channel.channelName} / ${e.stackTraceToString()}") logger.info("Thread 1 Exception: ${it.channel.channelName} / ${e.stackTraceToString()}")
} finally { } finally {
lastRunMap[it.channel.channelId] = System.currentTimeMillis()
Thread.sleep(5000) Thread.sleep(5000)
} }
} }
@@ -126,21 +141,30 @@ object ChzzkHandler {
} }
val threadRunner2 = Runnable { val threadRunner2 = Runnable {
logger.info("Thread 2 started!")
logger.info("Thread 2 started!") logger.info("Thread 2 started!")
while (running) { while (running) {
handlers.forEach { handlers.forEach {
if (!running) return@forEach if (!running) return@forEach
try { try {
val streamInfo = getStreamInfo(it.channel.channelId) val now = System.currentTimeMillis()
if (streamInfo.content?.status == "OPEN" && !it.isActive) { val lastRun = lastRunMap[it.channel.channelId] ?: 0L
val waitedTime = now - lastRun
if(waitedTime < requiredWait) {
val sleepTime = requiredWait - waitedTime
logger.info("Thread 2 Sleep: ${it.channel.channelName} / ${sleepTime}ms")
Thread.sleep(sleepTime)
}
val streamInfo = Connector.getLive(it.channel.channelId)
if (streamInfo?.isOnline == true && !it.isActive) {
try { try {
it.isActive(true, streamInfo) it.isActive(true, streamInfo)
} catch(e: Exception) { } catch(e: Exception) {
logger.info("Exception: ${e.stackTraceToString()}") logger.info("Thread 2 Exception: ${e.stackTraceToString()}")
} }
} }
if (streamInfo.content?.status == "CLOSE" && it.isActive) it.isActive(false, streamInfo) if (streamInfo?.isOnline == false && it.isActive) it.isActive(false, streamInfo)
} catch (e: SocketTimeoutException) { } catch (e: SocketTimeoutException) {
logger.info("Thread 2 Timeout: ${it.channel.channelName} / ${e.stackTraceToString()}") logger.info("Thread 2 Timeout: ${it.channel.channelName} / ${e.stackTraceToString()}")
} catch (e: Exception) { } catch (e: Exception) {
@@ -188,14 +212,18 @@ object ChzzkHandler {
} }
} }
@OptIn(DelicateCoroutinesApi::class)
class UserHandler( class UserHandler(
val channel: ChzzkChannel, val channel: ChzzkChannel,
val logger: Logger, val logger: Logger,
private var user: User, private var user: User,
var streamStartTime: LocalDateTime?, var streamStartTime: LocalDateTime?,
val chatLogger: Logger = LoggerFactory.getLogger("${channel.channelName}-chat"),
) { ) {
var messageHandler: MessageHandler lateinit var client: ChzzkClient
var listener: ChzzkChat lateinit var chatChannelId: String
var listener: ChzzkUserSession? = null
lateinit var messageHandler: MessageHandler
private val dispatcher: CoroutinesEventBus by inject(CoroutinesEventBus::class.java) private val dispatcher: CoroutinesEventBus by inject(CoroutinesEventBus::class.java)
private var _isActive: Boolean private var _isActive: Boolean
@@ -204,36 +232,56 @@ class UserHandler(
LiveStatusService.updateOrCreate(user, value) LiveStatusService.updateOrCreate(user, value)
} }
init { private suspend fun connect() {
listener = chzzk.chat(channel.channelId) val user = UserService.getUser(channel.channelId)
.withAutoReconnect(true)
.withChatListener(object : ChatEventListener {
override fun onConnect(chat: ChzzkChat, isReconnecting: Boolean) {
logger.info("${channel.channelName} - ${channel.channelId} / reconnected: $isReconnecting")
}
override fun onError(ex: Exception) { if(user?.accessToken == null || user.refreshToken == null) {
logger.info("ChzzkChat error. ${channel.channelName} - ${channel.channelId}") throw RuntimeException("AccessToken or RefreshToken is not valid.")
logger.info(ex.stackTraceToString()) }
}
override fun onChat(msg: ChatMessage) { val tokens = user.refreshToken?.let { token -> Connector.client.refreshAccessToken(token)}
if(!_isActive) return if(tokens == null) {
messageHandler.handle(msg, user) throw RuntimeException("AccessToken is not valid.")
} }
client = Connector.getClient(tokens.first, tokens.second)
UserService.setRefreshToken(user, tokens.first, tokens.second)
chatChannelId = getChzzkChannelId(channel.channelId) ?: throw RuntimeException("Chat Channel ID is not found.")
client.loginAsync().await()
listener = ChzzkSessionBuilder(client).buildUserSession()
delay(5000L)
override fun onConnectionClosed(code: Int, reason: String?, remote: Boolean, tryingToReconnect: Boolean) {
logger.info("ChzzkChat closed. ${channel.channelName} - ${channel.channelId}")
logger.info("Reason: $reason / $tryingToReconnect")
}
})
.build()
messageHandler = MessageHandler(this@UserHandler) messageHandler = MessageHandler(this@UserHandler)
logger.info("${user.username} message handler init.")
listener?.on(SessionChatMessageEvent::class.java) {
messageHandler.handle(it.message, user)
}
logger.info("${user.username} is connected.")
val timer = TimerConfigService.getConfig(user)
if (timer?.option == TimerType.UPTIME.value)
dispatcher.post(
TimerEvent(
channel.channelId,
TimerType.UPTIME,
getUptime(streamStartTime!!)
)
)
else dispatcher.post(
TimerEvent(
channel.channelId,
TimerType.entries.firstOrNull { it.value == timer?.option } ?: TimerType.REMOVE,
null
)
)
} }
internal suspend fun disable() {
listener?.unsubscribeAsync(ChzzkSessionSubscriptionType.CHAT)?.await()
listener?.disconnectAsync()?.await()
internal fun disable() { _isActive = false
listener.closeAsync()
} }
internal fun reloadCommand() { internal fun reloadCommand() {
@@ -247,17 +295,21 @@ class UserHandler(
internal val isActive: Boolean internal val isActive: Boolean
get() = _isActive get() = _isActive
internal fun isActive(value: Boolean, status: IData<IStreamInfo?>) { internal fun isActive(value: Boolean, status: ChzzkLiveDetail) {
if(value) { if(value) {
CoroutineScope(Dispatchers.Default).launch { CoroutineScope(Dispatchers.Default).launch {
if(listener == null)
connect()
listener!!.createAndConnectAsync()?.await()
listener!!.subscribeAsync(ChzzkSessionSubscriptionType.CHAT)?.await()
logger.info("${user.username} is live.") logger.info("${user.username} is live.")
reloadUser(UserService.getUser(user.id.value)!!) reloadUser(UserService.getUser(user.id.value)!!)
logger.info("ChzzkChat connecting... ${channel.channelName} - ${channel.channelId}") logger.info("ChzzkChat connecting... ${channel.channelName} - ${channel.channelId}")
listener.connectAsync().await() streamStartTime = LocalDateTime.now()
Metrics.increaseStreaming()
streamStartTime = status.content?.openDate?.let { convertChzzkDateToLocalDateTime(it) }
if(!_isActive) { if(!_isActive) {
_isActive = true _isActive = true
@@ -281,7 +333,7 @@ class UserHandler(
delay(5000L) delay(5000L)
try { try {
if(!user.isDisableStartupMsg) if(!user.isDisableStartupMsg)
listener.sendChat("${user.username} 님! 오늘도 열심히 방송하세요!") sendChat("${user.username} 님! 오늘도 열심히 방송하세요!")
Discord.sendDiscord(user, status) Discord.sendDiscord(user, status)
} catch(e: Exception) { } catch(e: Exception) {
logger.info("Stream on logic has some error: ${e.stackTraceToString()}") logger.info("Stream on logic has some error: ${e.stackTraceToString()}")
@@ -291,8 +343,10 @@ class UserHandler(
} else { } else {
logger.info("${user.username} is offline.") logger.info("${user.username} is offline.")
streamStartTime = null streamStartTime = null
listener.closeAsync() listener?.unsubscribeAsync(ChzzkSessionSubscriptionType.CHAT)?.join()
listener?.disconnectAsync()?.join()
_isActive = false _isActive = false
Metrics.decreaseStreaming()
CoroutineScope(Dispatchers.Default).launch { CoroutineScope(Dispatchers.Default).launch {
val events = listOf( val events = listOf(
@@ -315,4 +369,23 @@ class UserHandler(
} }
} }
} }
private fun String.limitUtf8Length(maxBytes: Int): String {
val bytes = this.toByteArray(Charset.forName("UTF-8"))
if (bytes.size <= maxBytes) return this
var truncatedString = this
while (truncatedString.toByteArray(Charset.forName("UTF-8")).size > maxBytes) {
truncatedString = truncatedString.substring(0, truncatedString.length - 1)
}
return truncatedString
}
@OptIn(DelicateCoroutinesApi::class)
internal fun sendChat(msg: String) {
GlobalScope.launch {
delay(100L)
client.sendChatToLoggedInChannel(msg.limitUtf8Length(100))
chatLogger.info("[SEND]${channel.channelName}: ${msg.limitUtf8Length(100)}{${msg.length} / 100}")
}
}
} }

View File

@@ -1,24 +1,66 @@
package space.mori.chzzk_bot.chatbot.chzzk package space.mori.chzzk_bot.chatbot.chzzk
import io.github.cdimascio.dotenv.dotenv import io.github.cdimascio.dotenv.dotenv
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.koin.java.KoinJavaComponent.inject
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import xyz.r2turntrue.chzzk4j.Chzzk import space.mori.chzzk_bot.common.events.ChzzkUserFindEvent
import xyz.r2turntrue.chzzk4j.ChzzkBuilder import space.mori.chzzk_bot.common.events.ChzzkUserReceiveEvent
import space.mori.chzzk_bot.common.events.CoroutinesEventBus
import xyz.r2turntrue.chzzk4j.ChzzkClient
import xyz.r2turntrue.chzzk4j.ChzzkClientBuilder
import xyz.r2turntrue.chzzk4j.auth.ChzzkLegacyLoginAdapter
import xyz.r2turntrue.chzzk4j.auth.ChzzkSimpleUserLoginAdapter
import xyz.r2turntrue.chzzk4j.types.channel.ChzzkChannel import xyz.r2turntrue.chzzk4j.types.channel.ChzzkChannel
import xyz.r2turntrue.chzzk4j.types.channel.live.ChzzkLiveDetail
import kotlin.getValue
val dotenv = dotenv { val dotenv = dotenv {
ignoreIfMissing = true ignoreIfMissing = true
} }
@OptIn(DelicateCoroutinesApi::class)
object Connector { object Connector {
val chzzk: Chzzk = ChzzkBuilder() val adapter = ChzzkLegacyLoginAdapter(dotenv["NID_AUT"], dotenv["NID_SES"])
.withAuthorization(dotenv["NID_AUT"], dotenv["NID_SES"]) val client: ChzzkClient = ChzzkClientBuilder(dotenv["NAVER_CLIENT_ID"], dotenv["NAVER_CLIENT_SECRET"])
.withLoginAdapter(adapter)
.build() .build()
private val logger = LoggerFactory.getLogger(this::class.java) private val logger = LoggerFactory.getLogger(this::class.java)
private val dispatcher: CoroutinesEventBus by inject(CoroutinesEventBus::class.java)
fun getChannel(channelId: String): ChzzkChannel? = chzzk.getChannel(channelId) fun getChannel(channelId: String): ChzzkChannel? = client.fetchChannel(channelId)
fun getLive(channelId: String): ChzzkLiveDetail? = client.fetchLiveDetail(channelId)
init { init {
logger.info("chzzk logged: ${chzzk.isLoggedIn} / ${chzzk.loggedUser?.nickname ?: "----"}") logger.info("chzzk logged: ${client.isLoggedIn}")
client.loginAsync().join()
dispatcher.subscribe(ChzzkUserFindEvent::class) { event ->
GlobalScope.launch {
val user = getChannel(event.uid)
dispatcher.post(ChzzkUserReceiveEvent(
find = user != null,
uid = user?.channelId,
nickname = user?.channelName,
isStreamOn = user?.isBroadcasting,
avatarUrl = user?.channelImageUrl
))
}
}
} }
fun getClient(accessToken: String, refreshToken: String): ChzzkClient {
val adapter = ChzzkSimpleUserLoginAdapter(accessToken, refreshToken)
val client = ChzzkClientBuilder(dotenv["NAVER_CLIENT_ID"], dotenv["NAVER_CLIENT_SECRET"])
.withLoginAdapter(adapter)
.build()
return client
}
} }

View File

@@ -11,17 +11,16 @@ import space.mori.chzzk_bot.common.services.*
import space.mori.chzzk_bot.common.utils.getFollowDate import space.mori.chzzk_bot.common.utils.getFollowDate
import space.mori.chzzk_bot.common.utils.getUptime import space.mori.chzzk_bot.common.utils.getUptime
import space.mori.chzzk_bot.common.utils.getYoutubeVideo import space.mori.chzzk_bot.common.utils.getYoutubeVideo
import xyz.r2turntrue.chzzk4j.chat.ChatMessage import xyz.r2turntrue.chzzk4j.session.message.SessionChatMessage
import xyz.r2turntrue.chzzk4j.chat.ChzzkChat import xyz.r2turntrue.chzzk4j.types.channel.live.ChzzkLiveSettings
import java.time.LocalDateTime import java.time.LocalDateTime
import java.time.format.DateTimeFormatter import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit import java.time.temporal.ChronoUnit
class MessageHandler( class MessageHandler(
private val handler: UserHandler private val handler: UserHandler
) { ) {
private val commands = mutableMapOf<String, (msg: ChatMessage, user: User) -> Unit>() private val commands = mutableMapOf<String, (msg: SessionChatMessage, user: User) -> Unit>()
private val counterPattern = Regex("<counter:([^>]+)>") private val counterPattern = Regex("<counter:([^>]+)>")
private val personalCounterPattern = Regex("<counter_personal:([^>]+)>") private val personalCounterPattern = Regex("<counter_personal:([^>]+)>")
@@ -32,17 +31,20 @@ class MessageHandler(
private val channel = handler.channel private val channel = handler.channel
private val logger = handler.logger private val logger = handler.logger
private val chatLogger = handler.chatLogger
private val listener = handler.listener private val listener = handler.listener
private val dispatcher: CoroutinesEventBus by inject(CoroutinesEventBus::class.java) private val dispatcher: CoroutinesEventBus by inject(CoroutinesEventBus::class.java)
init { init {
reloadCommand() reloadCommand()
dispatcher.subscribe(SongEvent::class) { CoroutineScope(Dispatchers.Default).launch {
if(it.type == SongType.STREAM_OFF) { dispatcher.subscribe(SongEvent::class) {
val user = UserService.getUser(channel.channelId) if(it.type == SongType.STREAM_OFF) {
if(! user?.let { usr -> SongListService.getSong(usr) }.isNullOrEmpty()) { val user = UserService.getUser(channel.channelId)
SongListService.deleteUser(user) if(! user?.let { usr -> SongListService.getSong(usr) }.isNullOrEmpty()) {
SongListService.deleteUser(user)
}
} }
} }
} }
@@ -61,6 +63,8 @@ class MessageHandler(
"!신청곡" to this::songAddCommand, "!신청곡" to this::songAddCommand,
"!노래목록" to this::songListCommand, "!노래목록" to this::songListCommand,
"!노래시작" to this::songStartCommand, "!노래시작" to this::songStartCommand,
"!카테고리" to this::categoryChangeCommand,
"!방제" to this::titleChangeCommand,
) )
manageCommands.forEach { (commandName, command) -> manageCommands.forEach { (commandName, command) ->
@@ -71,85 +75,90 @@ class MessageHandler(
this.commands.put(it.command.lowercase()) { msg, user -> this.commands.put(it.command.lowercase()) { msg, user ->
logger.debug("${channel.channelName} - ${it.command} - ${it.content}/${it.failContent}") logger.debug("${channel.channelName} - ${it.command} - ${it.content}/${it.failContent}")
val result = replaceCounters(Pair(it.content, it.failContent), user, msg, listener, msg.profile?.nickname ?: "") val result = replaceCounters(
listener.sendChat(result) Pair(it.content, it.failContent),
user,
msg,
msg.profile?.nickname ?: ""
)
handler.sendChat(result)
} }
} }
} }
private fun commandListCommand(msg: ChatMessage, user: User) { private fun commandListCommand(msg: SessionChatMessage, user: User) {
listener.sendChat("리스트는 여기입니다. https://nabot.mori.space/commands/${user.token}") handler.sendChat("리스트는 여기입니다. https://chibot.mori.space/commands/${user.token}")
} }
private fun manageAddCommand(msg: ChatMessage, user: User) { private fun manageAddCommand(msg: SessionChatMessage, user: User) {
if (msg.profile?.userRoleCode == "common_user") { if (msg.profile.badges.none { it.isModerator() }) {
listener.sendChat("매니저만 명령어를 추가할 수 있습니다.") handler.sendChat("매니저만 명령어를 추가할 수 있습니다.")
return return
} }
val parts = msg.content.split(" ", limit = 3) val parts = msg.content.split(" ", limit = 3)
if (parts.size < 3) { if (parts.size < 3) {
listener.sendChat("명령어 추가 형식은 '!명령어추가 명령어 내용'입니다.") handler.sendChat("명령어 추가 형식은 '!명령어추가 명령어 내용'입니다.")
return return
} }
if (commands.containsKey(parts[1])) { if (commands.containsKey(parts[1])) {
listener.sendChat("${parts[1]} 명령어는 이미 있는 명령어입니다.") handler.sendChat("${parts[1]} 명령어는 이미 있는 명령어입니다.")
return return
} }
val command = parts[1] val command = parts[1]
val content = parts[2] val content = parts[2]
CommandService.saveCommand(user, command, content, "") CommandService.saveCommand(user, command, content, "")
listener.sendChat("명령어 '$command' 추가되었습니다.") handler.sendChat("명령어 '$command' 추가되었습니다.")
} }
private fun manageUpdateCommand(msg: ChatMessage, user: User) { private fun manageUpdateCommand(msg: SessionChatMessage, user: User) {
if (msg.profile?.userRoleCode == "common_user") { if (msg.profile.badges.none { it.isModerator() }) {
listener.sendChat("매니저만 명령어를 추가할 수 있습니다.") handler.sendChat("매니저만 명령어를 추가할 수 있습니다.")
return return
} }
val parts = msg.content.split(" ", limit = 3) val parts = msg.content.split(" ", limit = 3)
if (parts.size < 3) { if (parts.size < 3) {
listener.sendChat("명령어 수정 형식은 '!명령어수정 명령어 내용'입니다.") handler.sendChat("명령어 수정 형식은 '!명령어수정 명령어 내용'입니다.")
return return
} }
if (!commands.containsKey(parts[1])) { if (!commands.containsKey(parts[1])) {
listener.sendChat("${parts[1]} 명령어는 없는 명령어입니다.") handler.sendChat("${parts[1]} 명령어는 없는 명령어입니다.")
return return
} }
val command = parts[1] val command = parts[1]
val content = parts[2] val content = parts[2]
CommandService.updateCommand(user, command, content, "") CommandService.updateCommand(user, command, content, "")
listener.sendChat("명령어 '$command' 수정되었습니다.") handler.sendChat("명령어 '$command' 수정되었습니다.")
ChzzkHandler.reloadCommand(channel) ChzzkHandler.reloadCommand(channel)
} }
private fun manageRemoveCommand(msg: ChatMessage, user: User) { private fun manageRemoveCommand(msg: SessionChatMessage, user: User) {
if (msg.profile?.userRoleCode == "common_user") { if (msg.profile.badges.none { it.isModerator() }) {
listener.sendChat("매니저만 명령어를 삭제할 수 있습니다.") handler.sendChat("매니저만 명령어를 삭제할 수 있습니다.")
return return
} }
val parts = msg.content.split(" ", limit = 2) val parts = msg.content.split(" ", limit = 2)
if (parts.size < 2) { if (parts.size < 2) {
listener.sendChat("명령어 삭제 형식은 '!명령어삭제 명령어'입니다.") handler.sendChat("명령어 삭제 형식은 '!명령어삭제 명령어'입니다.")
return return
} }
val command = parts[1] val command = parts[1]
CommandService.removeCommand(user, command) CommandService.removeCommand(user, command)
listener.sendChat("명령어 '$command' 삭제되었습니다.") handler.sendChat("명령어 '$command' 삭제되었습니다.")
ChzzkHandler.reloadCommand(channel) ChzzkHandler.reloadCommand(channel)
} }
private fun timerCommand(msg: ChatMessage, user: User) { private fun timerCommand(msg: SessionChatMessage, user: User) {
if (msg.profile?.userRoleCode == "common_user") { if (msg.profile.badges.none { it.isModerator() }) {
listener.sendChat("매니저만 이 명령어를 사용할 수 있습니다.") handler.sendChat("매니저만 이 명령어를 사용할 수 있습니다.")
return return
} }
val parts = msg.content.split(" ", limit = 3) val parts = msg.content.split(" ", limit = 3)
if (parts.size < 2) { if (parts.size < 2) {
listener.sendChat("타이머 명령어 형식을 잘 찾아봐주세요!") handler.sendChat("타이머 명령어 형식을 잘 찾아봐주세요!")
return return
} }
@@ -178,13 +187,13 @@ class MessageHandler(
when (parts[2]) { when (parts[2]) {
"업타임" -> { "업타임" -> {
TimerConfigService.saveOrUpdateConfig(user, TimerType.UPTIME) TimerConfigService.saveOrUpdateConfig(user, TimerType.UPTIME)
listener.sendChat("기본 타이머 설정이 업타임으로 바뀌었습니다.") handler.sendChat("기본 타이머 설정이 업타임으로 바뀌었습니다.")
} }
"삭제" -> { "삭제" -> {
TimerConfigService.saveOrUpdateConfig(user, TimerType.REMOVE) TimerConfigService.saveOrUpdateConfig(user, TimerType.REMOVE)
listener.sendChat("기본 타이머 설정이 삭제로 바뀌었습니다.") handler.sendChat("기본 타이머 설정이 삭제로 바뀌었습니다.")
} }
else -> listener.sendChat("!타이머 설정 (업타임/삭제) 형식으로 써주세요!") else -> handler.sendChat("!타이머 설정 (업타임/삭제) 형식으로 써주세요!")
} }
} }
else -> { else -> {
@@ -198,9 +207,9 @@ class MessageHandler(
dispatcher.post(TimerEvent(user.token, TimerType.TIMER, timestamp.toString())) dispatcher.post(TimerEvent(user.token, TimerType.TIMER, timestamp.toString()))
} }
} catch (e: NumberFormatException) { } catch (e: NumberFormatException) {
listener.sendChat("!타이머/숫자 형식으로 적어주세요! 단위: 분") handler.sendChat("!타이머/숫자 형식으로 적어주세요! 단위: 분")
} catch (e: Exception) { } catch (e: Exception) {
listener.sendChat("타이머 설정 중 오류가 발생했습니다.") handler.sendChat("타이머 설정 중 오류가 발생했습니다.")
logger.error("Error processing timer command: ${e.message}", e) logger.error("Error processing timer command: ${e.message}", e)
} }
} }
@@ -208,21 +217,21 @@ class MessageHandler(
} }
// songs // songs
private fun songAddCommand(msg: ChatMessage, user: User) { private fun songAddCommand(msg: SessionChatMessage, user: User) {
if(SongConfigService.getConfig(user).disabled) { if(SongConfigService.getConfig(user).disabled) {
return return
} }
val parts = msg.content.split(" ", limit = 2) val parts = msg.content.split(" ", limit = 2)
if (parts.size < 2) { if (parts.size < 2) {
listener.sendChat("유튜브 URL을 입력해주세요!") handler.sendChat("유튜브 URL을 입력해주세요!")
return return
} }
val config = SongConfigService.getConfig(user) val config = SongConfigService.getConfig(user)
if(config.streamerOnly && msg.profile?.userRoleCode == "common_user") { if(config.streamerOnly && msg.profile.badges.none { it.imageUrl.contains("manager") || it.imageUrl.contains("streamer") }) {
listener.sendChat("매니저만 이 명령어를 사용할 수 있습니다.") handler.sendChat("매니저만 이 명령어를 사용할 수 있습니다.")
return return
} }
@@ -230,34 +239,34 @@ class MessageHandler(
val songs = SongListService.getSong(user) val songs = SongListService.getSong(user)
if(songs.size >= config.queueLimit) { if(songs.size >= config.queueLimit) {
listener.sendChat("더이상 노래를 신청할 수 없습니다. 잠시 뒤 다시 시도해주세요!") handler.sendChat("더이상 노래를 신청할 수 없습니다. 잠시 뒤 다시 시도해주세요!")
return return
} }
if(songs.filter { it.uid == msg.userId }.size >= config.personalLimit) { if(songs.filter { it.uid == msg.senderChannelId }.size >= config.personalLimit) {
listener.sendChat("더이상 노래를 신청할 수 없습니다. 잠시 뒤 다시 시도해주세요!") handler.sendChat("더이상 노래를 신청할 수 없습니다. 잠시 뒤 다시 시도해주세요!")
return return
} }
try { try {
val video = getYoutubeVideo(url) val video = getYoutubeVideo(url)
if (video == null) { if (video == null) {
listener.sendChat("유튜브에서 찾을 수 없어요!") handler.sendChat("유튜브에서 찾을 수 없어요!")
return return
} }
if (songs.any { it.url == video.url }) { if (songs.any { it.url == video.url }) {
listener.sendChat("같은 노래가 이미 신청되어 있습니다.") handler.sendChat("같은 노래가 이미 신청되어 있습니다.")
return return
} }
if (video.length > 600) { if (video.length > 600) {
listener.sendChat("10분이 넘는 노래는 신청할 수 없습니다.") handler.sendChat("10분이 넘는 노래는 신청할 수 없습니다.")
return return
} }
SongListService.saveSong( SongListService.saveSong(
user, user,
msg.userId, msg.senderChannelId,
video.url, video.url,
video.name, video.name,
video.author, video.author,
@@ -269,31 +278,31 @@ class MessageHandler(
SongEvent( SongEvent(
user.token, user.token,
SongType.ADD, SongType.ADD,
msg.userId, msg.senderChannelId,
null, null,
video, video,
) )
) )
} }
listener.sendChat("노래가 추가되었습니다. ${video.name} - ${video.author}") handler.sendChat("노래가 추가되었습니다. ${video.name} - ${video.author}")
} catch(e: Exception) { } catch(e: Exception) {
listener.sendChat("유튜브 영상 주소로 다시 신청해주세요!") handler.sendChat("유튜브 영상 주소로 다시 신청해주세요!")
logger.info(e.stackTraceToString()) logger.info(e.stackTraceToString())
} }
} }
private fun songListCommand(msg: ChatMessage, user: User) { private fun songListCommand(msg: SessionChatMessage, user: User) {
if(SongConfigService.getConfig(user).disabled) { if(SongConfigService.getConfig(user).disabled) {
return return
} }
listener.sendChat("리스트는 여기입니다. https://nabot.mori.space/songs/${user.token}") handler.sendChat("리스트는 여기입니다. https://chibot.mori.space/songs/${user.token}")
} }
private fun songStartCommand(msg: ChatMessage, user: User) { private fun songStartCommand(msg: SessionChatMessage, user: User) {
if (msg.profile?.userRoleCode == "common_user") { if (msg.profile.badges.none { it.isModerator() }) {
listener.sendChat("매니저만 이 명령어를 사용할 수 있습니다.") handler.sendChat("매니저만 이 명령어를 사용할 수 있습니다.")
return return
} }
@@ -301,33 +310,85 @@ class MessageHandler(
if(user.discord != null) { if(user.discord != null) {
bot.retrieveUserById(user.discord!!).queue { discordUser -> bot.retrieveUserById(user.discord!!).queue { discordUser ->
discordUser?.openPrivateChannel()?.queue { channel -> discordUser?.openPrivateChannel()?.queue { channel ->
channel.sendMessage("여기로 접속해주세요! ||https://nabot.mori.space/songlist||.") channel.sendMessage("여기로 접속해주세요! ||https://chibot.mori.space/songlist||.")
.queue() .queue()
} }
} }
} else { } else {
listener.sendChat("나봇 홈페이지의 노래목록 페이지를 이용해주세요! 디스코드 연동을 하시면 DM으로 바로 전송됩니다.") handler.sendChat("나봇 홈페이지의 노래목록 페이지를 이용해주세요! 디스코드 연동을 하시면 DM으로 바로 전송됩니다.")
} }
} }
internal fun handle(msg: ChatMessage, user: User) { private fun categoryChangeCommand(msg: SessionChatMessage, user: User) {
if(msg.userId == ChzzkHandler.botUid) return if (msg.profile.badges.none { it.isModerator() }) {
handler.sendChat("매니저만 이 명령어를 사용할 수 있습니다.")
return
}
val parts = msg.content.split(" ", limit = 2)
if(parts.size <= 1) {
handler.sendChat("카테고리가 없습니다.")
return
}
val category = parts[1].let {
return@let when(it) {
"저챗", "토크", "JustChatting", "Just Chatting", "" -> "talk"
else -> it
}
}
handler.sendChat("$category 카테고리로 수정을 시도해볼게요!")
handler.client.searchCategories(category).handle { result, _ ->
if(result.size == 0) {
handler.sendChat("$category 카테고리는 없습니다.")
return@handle
}
val settings = ChzzkLiveSettings()
settings.category = result.first {
it.categoryValue == category
} ?: result[0]
handler.client.modifyLiveSettings(settings)
handler.sendChat("$category 로 수정했어요!")
}
}
private fun titleChangeCommand(msg: SessionChatMessage, user: User) {
if (msg.profile.badges.none { it.isModerator() }) {
handler.sendChat("매니저만 이 명령어를 사용할 수 있습니다.")
return
}
val parts = msg.content.split(" ", limit = 2)
if(parts.size <= 1) {
handler.sendChat("입력된 방송 제목이 없습니다.")
return
}
val title = parts[1]
val settings = ChzzkLiveSettings()
settings.defaultLiveTitle = title
handler.client.modifyLiveSettings(settings)
handler.sendChat("$title 로 수정했어요!")
}
internal fun handle(msg: SessionChatMessage, user: User) {
if(msg.senderChannelId == ChzzkHandler.botUid) return
chatLogger.info("[RECV]${channel.channelName}: ${msg.content}{${msg.content.length} / 100}")
val commandKey = msg.content.split(' ')[0] val commandKey = msg.content.split(' ')[0]
commands[commandKey.lowercase()]?.let { it(msg, user) } commands[commandKey.lowercase()]?.let { it(msg, user) }
} }
private fun replaceCounters(chat: Pair<String, String>, user: User, msg: ChatMessage, listener: ChzzkChat, userName: String): String { private fun replaceCounters(chat: Pair<String, String>, user: User, msg: SessionChatMessage, userName: String): String {
var result = chat.first var result = chat.first
var isFail = false var isFail = false
// Replace dailyCounterPattern // Replace dailyCounterPattern
result = dailyCounterPattern.replace(result) { matchResult -> result = dailyCounterPattern.replace(result) { matchResult ->
val name = matchResult.groupValues[1] val name = matchResult.groupValues[1]
val dailyCounter = CounterService.getDailyCounterValue(name, msg.userId, user) val dailyCounter = CounterService.getDailyCounterValue(name, msg.senderChannelId, user)
if (dailyCounter.second) { if (dailyCounter.second) {
CounterService.updateDailyCounterValue(name, msg.userId, 1, user).first.toString() CounterService.updateDailyCounterValue(name, msg.senderChannelId, 1, user).first.toString()
} else { } else {
isFail = true isFail = true
dailyCounter.first.toString() dailyCounter.first.toString()
@@ -339,7 +400,7 @@ class MessageHandler(
result = chat.second result = chat.second
result = dailyCounterPattern.replace(result) { matchResult -> result = dailyCounterPattern.replace(result) { matchResult ->
val name = matchResult.groupValues[1] val name = matchResult.groupValues[1]
val dailyCounter = CounterService.getDailyCounterValue(name, msg.userId, user) val dailyCounter = CounterService.getDailyCounterValue(name, msg.senderChannelId, user)
dailyCounter.first.toString() dailyCounter.first.toString()
} }
} }
@@ -347,15 +408,15 @@ class MessageHandler(
// Replace followPattern // Replace followPattern
result = followPattern.replace(result) { _ -> result = followPattern.replace(result) { _ ->
try { try {
val followingDate = getFollowDate(listener.chatId, msg.userId) val followingDate = handler.chatChannelId?.let { getFollowDate(it, msg.senderChannelId) }
.content?.streamingProperty?.following?.followDate ?.content?.streamingProperty?.following?.followDate ?: LocalDateTime.now().minusDays(1).toString()
val period = followingDate?.let { val period = followingDate.let {
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss") val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
val pastDate = LocalDateTime.parse(it, formatter) val pastDate = LocalDateTime.parse(it, formatter)
val today = LocalDateTime.now() val today = LocalDateTime.now()
ChronoUnit.DAYS.between(pastDate, today) ChronoUnit.DAYS.between(pastDate, today)
} ?: 0 } + 1
period.toString() period.toString()
} catch (e: Exception) { } catch (e: Exception) {
@@ -383,7 +444,7 @@ class MessageHandler(
// Replace personalCounterPattern // Replace personalCounterPattern
result = personalCounterPattern.replace(result) { matchResult -> result = personalCounterPattern.replace(result) { matchResult ->
val name = matchResult.groupValues[1] val name = matchResult.groupValues[1]
CounterService.updatePersonalCounterValue(name, msg.userId, 1, user).toString() CounterService.updatePersonalCounterValue(name, msg.senderChannelId, 1, user).toString()
} }
// Replace namePattern // Replace namePattern
@@ -391,5 +452,8 @@ class MessageHandler(
return result return result
} }
} }
fun SessionChatMessage.Profile.Badge.isManager() = imageUrl.contains("manager")
fun SessionChatMessage.Profile.Badge.isStreamer() = imageUrl.contains("streamer")
fun SessionChatMessage.Profile.Badge.isModerator() = isManager() || isStreamer()

View File

@@ -12,11 +12,12 @@ import net.dv8tion.jda.api.events.interaction.command.SlashCommandInteractionEve
import net.dv8tion.jda.api.hooks.ListenerAdapter import net.dv8tion.jda.api.hooks.ListenerAdapter
import net.dv8tion.jda.api.utils.messages.MessageCreateBuilder import net.dv8tion.jda.api.utils.messages.MessageCreateBuilder
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import space.mori.chzzk_bot.common.utils.IData
import space.mori.chzzk_bot.common.utils.IStreamInfo
import space.mori.chzzk_bot.chatbot.discord.commands.* import space.mori.chzzk_bot.chatbot.discord.commands.*
import space.mori.chzzk_bot.common.models.User import space.mori.chzzk_bot.common.models.User
import xyz.r2turntrue.chzzk4j.types.channel.live.ChzzkLiveDetail
import xyz.r2turntrue.chzzk4j.types.channel.live.Resolution
import java.time.Instant import java.time.Instant
import kotlin.jvm.optionals.getOrNull
val dotenv = dotenv { val dotenv = dotenv {
ignoreIfMissing = true ignoreIfMissing = true
@@ -33,20 +34,26 @@ class Discord: ListenerAdapter() {
return bot.getGuildById(guildId)?.getTextChannelById(channelId) return bot.getGuildById(guildId)?.getTextChannelById(channelId)
} }
fun sendDiscord(user: User, status: IData<IStreamInfo?>) { fun sendDiscord(user: User, status: ChzzkLiveDetail) {
if(status.content == null) return
if(user.liveAlertMessage != null && user.liveAlertGuild != null && user.liveAlertChannel != null) { if(user.liveAlertMessage != null && user.liveAlertGuild != null && user.liveAlertChannel != null) {
val channel = getChannel(user.liveAlertGuild ?: 0, user.liveAlertChannel ?: 0) val channel = getChannel(user.liveAlertGuild ?: 0, user.liveAlertChannel ?: 0)
?: throw RuntimeException("${user.liveAlertChannel} is not valid.") ?: throw RuntimeException("${user.liveAlertChannel} is not valid.")
val embed = EmbedBuilder() val embed = EmbedBuilder()
embed.setTitle(status.content!!.liveTitle, "https://chzzk.naver.com/live/${user.token}") embed.setTitle(status.title, "https://chzzk.naver.com/live/${user.token}")
embed.setDescription("${user.username} 님이 방송을 시작했습니다.") embed.setDescription("${user.username} 님이 방송을 시작했습니다.")
embed.setTimestamp(Instant.now()) embed.setTimestamp(Instant.now())
embed.setAuthor(user.username, "https://chzzk.naver.com/live/${user.token}", status.content!!.channel.channelImageUrl) embed.setAuthor(user.username, "https://chzzk.naver.com/live/${user.token}")
embed.addField("카테고리", status.content!!.liveCategoryValue, true) embed.addField("카테고리", status.liveCategoryValue, true)
embed.addField("태그", status.content!!.tags.joinToString(", "), true) embed.addField("태그", status.tags.joinToString(", ") { it.trim() }, true)
embed.setImage(status.content!!.liveImageUrl.replace("{type}", "1080")) status.defaultThumbnailImageUrl.getOrNull()?.let { embed.setImage(it) }
?: Resolution.entries.reversed().forEach {
val thumbnail = status.getLiveImageUrl(it)
if (thumbnail != null) {
embed.setImage(thumbnail)
return@forEach
}
}
channel.sendMessage( channel.sendMessage(
MessageCreateBuilder() MessageCreateBuilder()

View File

@@ -0,0 +1,46 @@
package space.mori.chzzk_bot.chatbot.utils
import com.google.gson.reflect.TypeToken
import okhttp3.MediaType.Companion.toMediaType
import okhttp3.Request
import okhttp3.RequestBody.Companion.toRequestBody
import space.mori.chzzk_bot.chatbot.chzzk.dotenv
import space.mori.chzzk_bot.common.utils.IData
import space.mori.chzzk_bot.common.utils.client
import xyz.r2turntrue.chzzk4j.ChzzkClient
import java.io.IOException
data class RefreshTokenResponse(
val accessToken: String,
val refreshToken: String,
val expiresIn: Int,
val tokenType: String = "Bearer",
val scope: String
)
fun ChzzkClient.refreshAccessToken(refreshToken: String): Pair<String, String> {
val url = "https://openapi.chzzk.naver.com/auth/v1/token"
val request = Request.Builder()
.url(url)
.header("Content-Type", "application/json")
.post(gson.toJson(mapOf(
"grantType" to "refresh_token",
"refreshToken" to refreshToken,
"clientId" to dotenv["NAVER_CLIENT_ID"],
"clientSecret" to dotenv["NAVER_CLIENT_SECRET"]
)).toRequestBody("application/json; charset=utf-8".toMediaType()))
.build()
client.newCall(request).execute().use { response ->
try {
if(!response.isSuccessful) throw IOException("Unexpected code ${response.code}")
val body = response.body?.string()
val data = gson.fromJson(body, object: TypeToken<IData<RefreshTokenResponse>>() {})
return Pair(data.content.accessToken, data.content.refreshToken)
} catch(e: Exception) {
throw e
}
}
}

View File

@@ -23,7 +23,7 @@ dependencies {
api("com.zaxxer:HikariCP:6.1.0") api("com.zaxxer:HikariCP:6.1.0")
// https://mvnrepository.com/artifact/ch.qos.logback/logback-classic // https://mvnrepository.com/artifact/ch.qos.logback/logback-classic
implementation("ch.qos.logback:logback-classic:1.5.12") implementation("ch.qos.logback:logback-classic:1.5.13")
// https://mvnrepository.com/artifact/org.mariadb.jdbc/mariadb-java-client // https://mvnrepository.com/artifact/org.mariadb.jdbc/mariadb-java-client
implementation("org.mariadb.jdbc:mariadb-java-client:3.5.0") implementation("org.mariadb.jdbc:mariadb-java-client:3.5.0")
@@ -37,6 +37,11 @@ dependencies {
// https://mvnrepository.com/artifact/com.google.code.gson/gson // https://mvnrepository.com/artifact/com.google.code.gson/gson
implementation("com.google.code.gson:gson:2.11.0") implementation("com.google.code.gson:gson:2.11.0")
api("io.micrometer:micrometer-registry-prometheus:1.15.1")
// https://mvnrepository.com/artifact/io.insert-koin/koin-core
api("io.insert-koin:koin-core:4.0.0")
testImplementation(kotlin("test")) testImplementation(kotlin("test"))
} }

View File

@@ -0,0 +1,7 @@
package space.mori.chzzk_bot.common.events
data class ChzzkUserFindEvent(
val uid: String
): Event {
val TAG = javaClass.simpleName
}

View File

@@ -0,0 +1,11 @@
package space.mori.chzzk_bot.common.events
data class ChzzkUserReceiveEvent(
val find: Boolean = true,
val uid: String? = null,
val nickname: String? = null,
val isStreamOn: Boolean? = null,
val avatarUrl: String? = null,
): Event {
val TAG = javaClass.simpleName
}

View File

@@ -5,7 +5,8 @@ enum class TimerType(var value: Int) {
TIMER(1), TIMER(1),
REMOVE(2), REMOVE(2),
STREAM_OFF(50) STREAM_OFF(50),
ACK(51)
} }
class TimerEvent( class TimerEvent(

View File

@@ -0,0 +1,43 @@
package space.mori.chzzk_bot.common.metrics
import io.micrometer.core.instrument.Gauge
import io.micrometer.prometheusmetrics.PrometheusConfig
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
import space.mori.chzzk_bot.common.services.UserService
object Metrics {
val registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
var streamer = 0.0
val streamerGauge: Gauge = Gauge.builder("streamer_gauge", this) { streamer }
.description("Current All Streamer Count")
.register(registry)
var activeStreamer = 0.0
val activateGauge: Gauge = Gauge.builder("active_streamer_gauge", this) { activeStreamer }
.description("Current Active Streamer Count")
.register(registry)
var streaming: Double = 0.0
val streamingGauge: Gauge = Gauge.builder("streaming_gauge", this) { streaming }
.description("Current Streaming Streamer Count")
.register(registry)
fun refreshStreamerMetrics() {
val streamers = UserService.getAllUsers()
streamer = streamers.size.toDouble()
activeStreamer = streamers.filter { !it.isDisabled }.size.toDouble()
}
fun increaseStreaming(inc: Int = 1) {
streaming += inc
}
fun decreaseStreaming(dec: Int = 1) {
streaming -= dec
}
init {
refreshStreamerMetrics()
}
}

View File

@@ -52,47 +52,18 @@ data class NicknameColor(
val colorCode: String = "" val colorCode: String = ""
) )
// Stream info data class LiveStatus(
data class IStreamInfo( val liveTitle: String,
val liveId: Int = 0, val status: String,
val liveTitle: String = "", val concurrentUserCount: Int,
val status: String = "", val accumulateCount: Int,
val liveImageUrl: String = "", val paidPromotion: Boolean,
val defaultThumbnailImageUrl: String? = null, val adult: Boolean,
val concurrentUserCount: Int = 0, val krOnlyViewing: Boolean,
val accumulateCount: Int = 0, val openDate: String,
val openDate: String = "", val closeDate: String?,
val closeDate: String = "", val clipActive: Boolean,
val adult: Boolean = false, val chatChannelId: String
val clipActive: Boolean = false,
val tags: List<String> = emptyList(),
val chatChannelId: String = "",
val categoryType: String = "",
val liveCategory: String = "",
val liveCategoryValue: String = "",
val chatActive: Boolean = true,
val chatAvailableGroup: String = "",
val paidPromotion: Boolean = false,
val chatAvailableCondition: String = "",
val minFollowerMinute: Int = 0,
val livePlaybackJson: String = "",
val p2pQuality: List<Any> = emptyList(),
val channel: Channel = Channel(),
val livePollingStatusJson: String = "",
val userAdultStatus: String? = null,
val chatDonationRankingExposure: Boolean = true,
val adParameter: AdParameter = AdParameter()
)
data class Channel(
val channelId: String = "",
val channelName: String = "",
val channelImageUrl: String = "",
val verifiedMark: Boolean = false
)
data class AdParameter(
val tag: String = ""
) )
// OkHttpClient에 Interceptor 추가 // OkHttpClient에 Interceptor 추가
@@ -128,38 +99,21 @@ fun getFollowDate(chatID: String, userId: String) : IData<IFollowContent?> {
} }
} }
fun getStreamInfo(userId: String) : IData<IStreamInfo?> { fun getChzzkChannelId(channelId: String): String? {
val url = "https://api.chzzk.naver.com/service/v3/channels/${userId}/live-detail" val url = "https://api.chzzk.naver.com/polling/v3/channels/$channelId/live-status?includePlayerRecommendContent=false"
val request = Request.Builder() val request = Request.Builder()
.url(url) .url(url)
.header("Content-Type", "application/json")
.get()
.build() .build()
client.newCall(request).execute().use { response -> client.newCall(request).execute().use { response ->
try { try {
if(!response.isSuccessful) throw IOException("Unexpected code ${response.code}") if(!response.isSuccessful) throw IOException("Unexpected code ${response.code}")
val body = response.body?.string() val body = response.body?.string()
val follow = gson.fromJson(body, object: TypeToken<IData<IStreamInfo?>>() {}) val data = gson.fromJson(body, object: TypeToken<IData<LiveStatus?>>() {})
return follow return data.content?.chatChannelId
} catch(e: Exception) {
throw e
}
}
}
fun getUserInfo(userId: String): IData<Channel?> {
val url = "https://api.chzzk.naver.com/service/v1/channels/${userId}"
val request = Request.Builder()
.url(url)
.build()
client.newCall(request).execute().use { response ->
try {
if(!response.isSuccessful) throw IOException("Unexpected code ${response.code}")
val body = response.body?.string()
val channel = gson.fromJson(body, object: TypeToken<IData<Channel?>>() {})
return channel
} catch(e: Exception) { } catch(e: Exception) {
throw e throw e
} }

View File

@@ -12,6 +12,7 @@ import space.mori.chzzk_bot.chatbot.discord.Discord
import space.mori.chzzk_bot.chatbot.chzzk.Connector as ChzzkConnector import space.mori.chzzk_bot.chatbot.chzzk.Connector as ChzzkConnector
import space.mori.chzzk_bot.common.Connector import space.mori.chzzk_bot.common.Connector
import space.mori.chzzk_bot.common.events.CoroutinesEventBus import space.mori.chzzk_bot.common.events.CoroutinesEventBus
import space.mori.chzzk_bot.common.metrics.Metrics
import space.mori.chzzk_bot.webserver.start import space.mori.chzzk_bot.webserver.start
import space.mori.chzzk_bot.webserver.stop import space.mori.chzzk_bot.webserver.stop
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
@@ -25,6 +26,7 @@ val logger: Logger = LoggerFactory.getLogger("main")
fun main(args: Array<String>) { fun main(args: Array<String>) {
val dispatcher = module { val dispatcher = module {
single { CoroutinesEventBus() } single { CoroutinesEventBus() }
single { Metrics.registry }
} }
startKoin { startKoin {
modules(dispatcher) modules(dispatcher)

View File

@@ -1,5 +1,5 @@
<configuration> <configuration>
<property name="lokiUrl" value="${LOKI_ENDPOINT}"/>
<!-- 콘솔에 출력하는 기본 로그 설정 --> <!-- 콘솔에 출력하는 기본 로그 설정 -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter"> <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
@@ -10,6 +10,15 @@
</encoder> </encoder>
</appender> </appender>
<appender name="LOKI" class="com.github.loki4j.logback.Loki4jAppender">
<http>
<url>${lokiUrl}</url>
</http>
<labels>
app = chibot-chzzk-bot
</labels>
</appender>
<!-- HikariCP 로그 레벨 설정 --> <!-- HikariCP 로그 레벨 설정 -->
<logger name="com.zaxxer.hikari" level="INFO" /> <logger name="com.zaxxer.hikari" level="INFO" />
<logger name="o.m.jdbc.client.impl.StandardClient" level="INFO" /> <logger name="o.m.jdbc.client.impl.StandardClient" level="INFO" />
@@ -31,5 +40,8 @@
<root level="DEBUG"> <root level="DEBUG">
<appender-ref ref="STDOUT" /> <appender-ref ref="STDOUT" />
</root> </root>
<root level="INFO">
<appender-ref ref="LOKI" />
</root>
</configuration> </configuration>

View File

@@ -10,7 +10,7 @@ repositories {
mavenCentral() mavenCentral()
} }
val ktorVersion = "3.0.1" val ktorVersion = "3.1.3"
dependencies { dependencies {
implementation("io.ktor:ktor-server-core:$ktorVersion") implementation("io.ktor:ktor-server-core:$ktorVersion")
@@ -34,16 +34,16 @@ dependencies {
// https://mvnrepository.com/artifact/org.jetbrains.kotlin/kotlin-reflect // https://mvnrepository.com/artifact/org.jetbrains.kotlin/kotlin-reflect
implementation("org.jetbrains.kotlin:kotlin-reflect:2.0.21") implementation("org.jetbrains.kotlin:kotlin-reflect:2.0.21")
// https://mvnrepository.com/artifact/io.insert-koin/koin-core
implementation("io.insert-koin:koin-core:4.0.0")
// https://mvnrepository.com/artifact/ch.qos.logback/logback-classic // https://mvnrepository.com/artifact/ch.qos.logback/logback-classic
implementation("ch.qos.logback:logback-classic:1.5.12") implementation("ch.qos.logback:logback-classic:1.5.12")
// https://mvnrepository.com/artifact/io.github.cdimascio/dotenv-kotlin // https://mvnrepository.com/artifact/io.github.cdimascio/dotenv-kotlin
implementation("io.github.cdimascio:dotenv-kotlin:6.4.2") implementation("io.github.cdimascio:dotenv-kotlin:6.4.2")
implementation("io.ktor:ktor-server-metrics-micrometer:$ktorVersion")
implementation(project(":common")) implementation(project(":common"))
implementation("io.ktor:ktor-server-metrics-micrometer:3.1.3")
testImplementation(kotlin("test")) testImplementation(kotlin("test"))
} }

View File

@@ -11,6 +11,7 @@ import io.ktor.serialization.kotlinx.json.*
import io.ktor.server.application.* import io.ktor.server.application.*
import io.ktor.server.auth.* import io.ktor.server.auth.*
import io.ktor.server.engine.* import io.ktor.server.engine.*
import io.ktor.server.metrics.micrometer.MicrometerMetrics
import io.ktor.server.netty.* import io.ktor.server.netty.*
import io.ktor.server.plugins.contentnegotiation.* import io.ktor.server.plugins.contentnegotiation.*
import io.ktor.server.plugins.cors.routing.* import io.ktor.server.plugins.cors.routing.*
@@ -19,16 +20,26 @@ import io.ktor.server.response.*
import io.ktor.server.routing.* import io.ktor.server.routing.*
import io.ktor.server.sessions.* import io.ktor.server.sessions.*
import io.ktor.server.websocket.* import io.ktor.server.websocket.*
import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics
import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics
import io.micrometer.core.instrument.binder.system.ProcessorMetrics
import io.micrometer.prometheusmetrics.PrometheusConfig
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.serialization.Serializable import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
import org.koin.core.context.startKoin
import org.koin.dsl.module
import org.koin.java.KoinJavaComponent.inject
import space.mori.chzzk_bot.common.events.CoroutinesEventBus
import space.mori.chzzk_bot.common.events.UserRegisterEvent
import space.mori.chzzk_bot.common.services.UserService import space.mori.chzzk_bot.common.services.UserService
import space.mori.chzzk_bot.webserver.routes.* import space.mori.chzzk_bot.webserver.routes.*
import space.mori.chzzk_bot.webserver.utils.DiscordRatelimits import space.mori.chzzk_bot.webserver.utils.DiscordRatelimits
import wsSongListRoutes
import java.math.BigInteger import java.math.BigInteger
import java.security.SecureRandom import java.security.SecureRandom
import java.time.Duration import java.time.Duration
import kotlin.getValue
import kotlin.time.toKotlinDuration import kotlin.time.toKotlinDuration
val dotenv = dotenv { val dotenv = dotenv {
@@ -82,6 +93,8 @@ val server = embeddedServer(Netty, port = 8080, ) {
} }
} }
routing { routing {
val dispatcher: CoroutinesEventBus by inject(CoroutinesEventBus::class.java)
route("/auth") { route("/auth") {
// discord login // discord login
authenticate("auth-oauth-discord") { authenticate("auth-oauth-discord") {
@@ -176,7 +189,7 @@ val server = embeddedServer(Netty, port = 8080, ) {
clientSecret = dotenv["NAVER_CLIENT_SECRET"] clientSecret = dotenv["NAVER_CLIENT_SECRET"]
) )
val response = applicationHttpClient.post("https://chzzk.naver.com/auth/v1/token") { val response = applicationHttpClient.post("https://openapi.chzzk.naver.com/auth/v1/token") {
contentType(ContentType.Application.Json) contentType(ContentType.Application.Json)
setBody(tokenRequest) setBody(tokenRequest)
} }
@@ -192,7 +205,12 @@ val server = embeddedServer(Netty, port = 8080, ) {
val userInfo = getChzzkUser(tokenResponse.content.accessToken) val userInfo = getChzzkUser(tokenResponse.content.accessToken)
if(userInfo.content != null) { if(userInfo.content != null) {
val user = UserService.getUser(userInfo.content.channelId) var user = UserService.getUser(userInfo.content.channelId)
if(user == null) {
user = UserService.saveUser(userInfo.content.channelName , userInfo.content.channelId)
}
call.sessions.set( call.sessions.set(
UserSession( UserSession(
session.state, session.state,
@@ -200,7 +218,13 @@ val server = embeddedServer(Netty, port = 8080, ) {
listOf() listOf()
) )
) )
user?.let { UserService.setRefreshToken(it, tokenResponse.content.accessToken, tokenResponse.content.refreshToken ?: "") } UserService.setRefreshToken(user,
tokenResponse.content.accessToken,
tokenResponse.content.refreshToken ?: ""
)
dispatcher.post(UserRegisterEvent(user.token))
call.respondRedirect(getFrontendURL("")) call.respondRedirect(getFrontendURL(""))
} }
} catch (e: Exception) { } catch (e: Exception) {
@@ -212,11 +236,22 @@ val server = embeddedServer(Netty, port = 8080, ) {
// common: logout // common: logout
get("/logout") { get("/logout") {
call.sessions.clear<UserSession>() call.sessions.clear<UserSession>()
call.response.status(HttpStatusCode.OK) call.respondRedirect(getFrontendURL(""))
return@get
} }
} }
val appMicrometerRegistry: PrometheusMeterRegistry by inject(PrometheusMeterRegistry::class.java)
install(MicrometerMetrics) {
registry = appMicrometerRegistry
meterBinders = listOf(
JvmMemoryMetrics(),
JvmGcMetrics(),
ProcessorMetrics()
)
}
apiRoutes() apiRoutes()
apiSongRoutes() apiSongRoutes()
apiCommandRoutes() apiCommandRoutes()
@@ -227,6 +262,8 @@ val server = embeddedServer(Netty, port = 8080, ) {
wsSongRoutes() wsSongRoutes()
wsSongListRoutes() wsSongListRoutes()
metricRoutes()
swaggerUI("swagger-ui/index.html", "openapi/documentation.yaml") { swaggerUI("swagger-ui/index.html", "openapi/documentation.yaml") {
options { options {
version = "1.2.0" version = "1.2.0"

View File

@@ -1,6 +1,7 @@
package space.mori.chzzk_bot.webserver.routes package space.mori.chzzk_bot.webserver.routes
import io.ktor.http.* import io.ktor.http.*
import io.ktor.server.request.receive
import io.ktor.server.response.* import io.ktor.server.response.*
import io.ktor.server.routing.* import io.ktor.server.routing.*
import io.ktor.server.sessions.* import io.ktor.server.sessions.*
@@ -11,7 +12,11 @@ import space.mori.chzzk_bot.common.events.CoroutinesEventBus
import space.mori.chzzk_bot.common.services.SongConfigService import space.mori.chzzk_bot.common.services.SongConfigService
import space.mori.chzzk_bot.common.services.UserService import space.mori.chzzk_bot.common.services.UserService
import space.mori.chzzk_bot.webserver.UserSession import space.mori.chzzk_bot.webserver.UserSession
import space.mori.chzzk_bot.webserver.utils.ChzzkUserCache import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.withTimeoutOrNull
import space.mori.chzzk_bot.common.events.ChzzkUserFindEvent
import space.mori.chzzk_bot.common.events.ChzzkUserReceiveEvent
import space.mori.chzzk_bot.common.metrics.Metrics
@Serializable @Serializable
data class GetUserDTO( data class GetUserDTO(
@@ -36,6 +41,20 @@ data class GetSessionDTO(
fun Routing.apiRoutes() { fun Routing.apiRoutes() {
val dispatcher: CoroutinesEventBus by inject(CoroutinesEventBus::class.java) val dispatcher: CoroutinesEventBus by inject(CoroutinesEventBus::class.java)
suspend fun getChzzkUserWithId(uid: String): ChzzkUserReceiveEvent? {
val completableDeferred = CompletableDeferred<ChzzkUserReceiveEvent>()
dispatcher.subscribe(ChzzkUserReceiveEvent::class) { event ->
if (event.uid == uid) {
completableDeferred.complete(event)
}
}
val user = withTimeoutOrNull(5000) {
dispatcher.post(ChzzkUserFindEvent(uid))
completableDeferred.await()
}
return user
}
route("/") { route("/") {
get { get {
call.respondText("Hello World!", status = call.respondText("Hello World!", status =
@@ -50,21 +69,21 @@ fun Routing.apiRoutes() {
route("/user/{uid}") { route("/user/{uid}") {
get { get {
val uid = call.parameters["uid"] val uid = call.parameters["uid"]
if(uid == null) { if(uid == null) {
call.respondText("Require UID", status = HttpStatusCode.NotFound) call.respondText("Require UID", status = HttpStatusCode.NotFound)
return@get return@get
} }
val user = ChzzkUserCache.getCachedUser(uid) val user = getChzzkUserWithId(uid)
if(user?.content == null) { if (user?.find == false) {
call.respondText("User not found", status = HttpStatusCode.NotFound) call.respondText("User not found", status = HttpStatusCode.NotFound)
return@get return@get
} else { } else {
call.respond(HttpStatusCode.OK, GetUserDTO( call.respond(HttpStatusCode.OK, GetUserDTO(
user.content!!.channel.channelId, user?.uid ?: "",
user.content!!.channel.channelName, user?.nickname ?: "",
user.content!!.status == "OPEN", user?.isStreamOn ?: false,
user.content!!.channel.channelImageUrl user?.avatarUrl ?: ""
)) ))
} }
} }
@@ -82,7 +101,7 @@ fun Routing.apiRoutes() {
user = UserService.saveUser("임시닉네임", session.id) user = UserService.saveUser("임시닉네임", session.id)
} }
val songConfig = SongConfigService.getConfig(user) val songConfig = SongConfigService.getConfig(user)
val status = ChzzkUserCache.getCachedUser(session.id) val status = getChzzkUserWithId(user.token)
val returnUsers = mutableListOf<GetSessionDTO>() val returnUsers = mutableListOf<GetSessionDTO>()
if(status == null) { if(status == null) {
@@ -91,14 +110,14 @@ fun Routing.apiRoutes() {
} }
if (user.username == "임시닉네임") { if (user.username == "임시닉네임") {
status.content?.channel?.let { it1 -> UserService.updateUser(user, it1.channelId, it1.channelName) } status.let { stats -> UserService.updateUser(user, stats.uid ?: "", stats.nickname ?: "") }
} }
returnUsers.add(GetSessionDTO( returnUsers.add(GetSessionDTO(
status.content?.channel?.channelId ?: user.username, status.uid ?: user.token,
status.content?.channel?.channelName ?: user.token, status.nickname ?: user.username,
status.content?.status == "OPEN", status.isStreamOn == true,
status.content?.channel?.channelImageUrl ?: "", status.avatarUrl ?: "",
songConfig.queueLimit, songConfig.queueLimit,
songConfig.personalLimit, songConfig.personalLimit,
songConfig.streamerOnly, songConfig.streamerOnly,
@@ -109,15 +128,15 @@ fun Routing.apiRoutes() {
user.subordinates.toList() user.subordinates.toList()
} }
returnUsers.addAll(subordinates.map { returnUsers.addAll(subordinates.map {
val subStatus = ChzzkUserCache.getCachedUser(it.token) val subStatus = getChzzkUserWithId(it.token)
return@map if (subStatus?.content == null) { return@map if (subStatus == null) {
null null
} else { } else {
GetSessionDTO( GetSessionDTO(
subStatus.content!!.channel.channelId, subStatus.uid ?: "",
subStatus.content!!.channel.channelName, subStatus.nickname ?: "",
subStatus.content!!.status == "OPEN", subStatus.isStreamOn == true,
subStatus.content!!.channel.channelImageUrl, subStatus.avatarUrl ?: "",
0, 0,
0, 0,
false, false,
@@ -129,4 +148,50 @@ fun Routing.apiRoutes() {
call.respond(HttpStatusCode.OK, returnUsers) call.respond(HttpStatusCode.OK, returnUsers)
} }
} }
route("/settings") {
get {
val session = call.sessions.get<UserSession>()
if(session == null) {
call.respondText("No session found", status = HttpStatusCode.Unauthorized)
return@get
}
val user = UserService.getUser(session.id)
if(user == null) {
call.respondText("No user found", status = HttpStatusCode.NotFound)
return@get
}
call.respond(HttpStatusCode.OK, IUserSettingsDTO(
user.isDisabled,
user.isDisableStartupMsg
))
}
post {
val session = call.sessions.get<UserSession>()
val body: IUserSettingsDTO = call.receive()
if(session == null) {
call.respondText("No session found", status = HttpStatusCode.Unauthorized)
return@post
}
val user = UserService.getUser(session.id)
if(user == null) {
call.respondText("No user found", status = HttpStatusCode.NotFound)
return@post
}
UserService.setIsDisabled(user, body.isBotDisabled)
UserService.setIsStartupDisabled(user, body.isBotMsgDisabled)
Metrics.refreshStreamerMetrics()
call.respond(HttpStatusCode.OK, body)
}
}
} }
data class IUserSettingsDTO(
val isBotDisabled: Boolean,
val isBotMsgDisabled: Boolean
)

View File

@@ -0,0 +1,36 @@
package space.mori.chzzk_bot.webserver.routes
import io.ktor.server.application.ApplicationStopped
import io.ktor.server.response.respondText
import io.ktor.server.routing.Routing
import io.ktor.server.routing.get
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import org.koin.java.KoinJavaComponent.inject
import space.mori.chzzk_bot.common.events.CoroutinesEventBus
import space.mori.chzzk_bot.common.events.UserRegisterEvent
import space.mori.chzzk_bot.common.metrics.Metrics
import kotlin.getValue
val metricScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
fun Routing.metricRoutes() {
environment.monitor.subscribe(ApplicationStopped) {
metricScope.cancel()
}
val dispatcher: CoroutinesEventBus by inject(CoroutinesEventBus::class.java)
val registry: PrometheusMeterRegistry by inject(PrometheusMeterRegistry::class.java)
dispatcher.subscribe(UserRegisterEvent::class) {
Metrics.refreshStreamerMetrics()
}
get("/metrics") {
call.respondText(registry.scrape())
}
}

View File

@@ -1,115 +1,54 @@
package space.mori.chzzk_bot.webserver.routes
import io.ktor.client.plugins.websocket.WebSocketException
import io.ktor.server.application.*
import io.ktor.server.routing.* import io.ktor.server.routing.*
import io.ktor.server.sessions.* import io.ktor.server.sessions.*
import io.ktor.server.websocket.* import io.ktor.server.websocket.*
import io.ktor.util.logging.Logger
import io.ktor.websocket.* import io.ktor.websocket.*
import io.ktor.websocket.Frame.* import kotlinx.coroutines.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.ClosedReceiveChannelException import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.delay import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.launch import kotlinx.coroutines.sync.withLock
import kotlinx.serialization.Serializable import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
import org.koin.java.KoinJavaComponent.inject import org.koin.java.KoinJavaComponent.inject
import org.slf4j.Logger
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import space.mori.chzzk_bot.common.events.* import space.mori.chzzk_bot.common.events.*
import space.mori.chzzk_bot.common.models.SongList import space.mori.chzzk_bot.common.models.SongList
import space.mori.chzzk_bot.common.models.SongLists.uid
import space.mori.chzzk_bot.common.models.User import space.mori.chzzk_bot.common.models.User
import space.mori.chzzk_bot.common.services.SongConfigService
import space.mori.chzzk_bot.common.services.SongListService import space.mori.chzzk_bot.common.services.SongListService
import space.mori.chzzk_bot.common.services.UserService import space.mori.chzzk_bot.common.services.UserService
import space.mori.chzzk_bot.common.utils.YoutubeVideo import space.mori.chzzk_bot.common.utils.YoutubeVideo
import space.mori.chzzk_bot.common.utils.getYoutubeVideo import space.mori.chzzk_bot.common.utils.getYoutubeVideo
import space.mori.chzzk_bot.webserver.UserSession import space.mori.chzzk_bot.webserver.UserSession
import space.mori.chzzk_bot.webserver.routes.SongResponse
import space.mori.chzzk_bot.webserver.routes.toSerializable
import space.mori.chzzk_bot.webserver.utils.CurrentSong import space.mori.chzzk_bot.webserver.utils.CurrentSong
import java.io.IOException
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
fun Routing.wsSongListRoutes() { fun Routing.wsSongListRoutes() {
val sessions = ConcurrentHashMap<String, WebSocketServerSession>()
val status = ConcurrentHashMap<String, SongType>()
val logger = LoggerFactory.getLogger("WSSongListRoutes") val logger = LoggerFactory.getLogger("WSSongListRoutes")
val dispatcher: CoroutinesEventBus by inject(CoroutinesEventBus::class.java) val dispatcher: CoroutinesEventBus by inject(CoroutinesEventBus::class.java)
val songListScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
fun addSession(uid: String, session: WebSocketServerSession) { // Manage all active sessions
if (sessions[uid] != null) { val sessionHandlers = ConcurrentHashMap<String, SessionHandler>()
CoroutineScope(Dispatchers.Default).launch {
sessions[uid]?.close( // Handle application shutdown
CloseReason(CloseReason.Codes.VIOLATED_POLICY, "Duplicated sessions.") environment.monitor.subscribe(ApplicationStopped) {
) sessionHandlers.values.forEach {
songListScope.launch {
it.close(CloseReason(CloseReason.Codes.NORMAL, "Server shutting down"))
} }
} }
sessions[uid] = session
}
fun removeSession(uid: String) {
sessions.remove(uid)
}
suspend fun waitForAck(ws: WebSocketServerSession, expectedType: Int): Boolean {
val timeout = 5000L // 5 seconds timeout
val startTime = System.currentTimeMillis()
while (System.currentTimeMillis() - startTime < timeout) {
for (frame in ws.incoming) {
if (frame is Text) {
val message = frame.readText()
if(message == "ping") {
return true
}
val data = Json.decodeFromString<SongRequest>(message)
if (data.type == SongType.ACK.value) {
return true // ACK received
}
}
}
delay(100) // Check every 100 ms
}
return false // Timeout
}
suspend fun sendWithRetry(uid: String, res: SongResponse, maxRetries: Int = 5, delayMillis: Long = 3000L) {
var attempt = 0
var sentSuccessfully = false
while (attempt < maxRetries && !sentSuccessfully) {
val ws = sessions[uid]
try {
if(ws == null) {
delay(delayMillis)
continue
}
// Attempt to send the message
ws.sendSerialized(res)
logger.debug("Message sent successfully to $uid on attempt $attempt")
// Wait for ACK
val ackReceived = waitForAck(ws, res.type)
if (ackReceived == true) {
sentSuccessfully = true
} else {
logger.warn("ACK not received for message to $uid on attempt $attempt.")
}
} catch (e: Exception) {
attempt++
logger.warn("Failed to send message to $uid on attempt $attempt. Retrying in $delayMillis ms.")
logger.warn(e.stackTraceToString())
} finally {
// Wait before retrying
delay(delayMillis)
}
}
if (!sentSuccessfully) {
logger.error("Failed to send message to $uid after $maxRetries attempts.")
}
} }
// WebSocket endpoint
webSocket("/songlist") { webSocket("/songlist") {
val session = call.sessions.get<UserSession>() val session = call.sessions.get<UserSession>()
val user = session?.id?.let { UserService.getUser(it) } val user: User? = session?.id?.let { UserService.getUser(it) }
if (user == null) { if (user == null) {
close(CloseReason(CloseReason.Codes.CANNOT_ACCEPT, "Invalid SID")) close(CloseReason(CloseReason.Codes.CANNOT_ACCEPT, "Invalid SID"))
return@webSocket return@webSocket
@@ -117,175 +56,291 @@ fun Routing.wsSongListRoutes() {
val uid = user.token val uid = user.token
addSession(uid, this) // Ensure only one session per user
sessionHandlers[uid]?.close(CloseReason(CloseReason.Codes.VIOLATED_POLICY, "Another session is already active."))
if (status[uid] == SongType.STREAM_OFF) { val handler = SessionHandler(uid, this, dispatcher, logger)
CoroutineScope(Dispatchers.Default).launch { sessionHandlers[uid] = handler
sendSerialized(SongResponse(
SongType.STREAM_OFF.value,
uid,
null,
null,
null,
))
}
removeSession(uid)
}
// Initialize session
handler.initialize()
// Listen for incoming frames
try { try {
for (frame in incoming) { for (frame in incoming) {
when (frame) { when (frame) {
is Text -> { is Frame.Text -> handler.handleTextFrame(frame.readText())
if (frame.readText().trim() == "ping") { is Frame.Ping -> send(Frame.Pong(frame.data))
send("pong") else -> Unit
} else {
val data = frame.readText().let { Json.decodeFromString<SongRequest>(it) }
// Handle song requests
handleSongRequest(data, user, dispatcher, logger)
}
}
is Ping -> send(Pong(frame.data))
else -> ""
} }
} }
} catch (e: ClosedReceiveChannelException) { } catch (e: ClosedReceiveChannelException) {
logger.error("Error in WebSocket: ${e.message}") logger.info("Session closed: ${e.message}")
} catch (e: IOException) {
logger.error("IO error: ${e.message}")
} catch (e: Exception) {
logger.error("Unexpected error: ${e.message}")
} finally { } finally {
removeSession(uid) sessionHandlers.remove(uid)
handler.close(CloseReason(CloseReason.Codes.NORMAL, "Session ended"))
} }
} }
dispatcher.subscribe(SongEvent::class) { // Subscribe to SongEvents
logger.debug("SongEvent: {} / {} {}", it.uid, it.type, it.current?.name) dispatcher.subscribe(SongEvent::class) { event ->
CoroutineScope(Dispatchers.Default).launch { val handler = sessionHandlers[event.uid]
val user = UserService.getUser(it.uid) songListScope.launch {
if (user != null) { handler?.sendSongResponse(event)
sendWithRetry(
user.token, SongResponse(
it.type.value,
it.uid,
it.reqUid,
it.current?.toSerializable(),
it.next?.toSerializable(),
it.delUrl
)
)
}
} }
} }
dispatcher.subscribe(TimerEvent::class) { // Subscribe to TimerEvents
if (it.type == TimerType.STREAM_OFF) { dispatcher.subscribe(TimerEvent::class) { event ->
CoroutineScope(Dispatchers.Default).launch { if (event.type == TimerType.STREAM_OFF) {
val user = UserService.getUser(it.uid) val handler = sessionHandlers[event.uid]
if (user != null) { songListScope.launch {
sendWithRetry( handler?.sendTimerOff()
user.token, SongResponse(
it.type.value,
it.uid,
null,
null,
null,
)
)
}
} }
} }
} }
} }
suspend fun handleSongRequest( class SessionHandler(
data: SongRequest, private val uid: String,
user: User, private val session: WebSocketServerSession,
dispatcher: CoroutinesEventBus, private val dispatcher: CoroutinesEventBus,
logger: Logger private val logger: Logger
) { ) {
if (data.maxQueue != null && data.maxQueue > 0) SongConfigService.updateQueueLimit(user, data.maxQueue) private val ackMap = ConcurrentHashMap<String, CompletableDeferred<Boolean>>()
if (data.maxUserLimit != null && data.maxUserLimit > 0) SongConfigService.updatePersonalLimit(user, data.maxUserLimit) private val sessionMutex = Mutex()
if (data.isStreamerOnly != null) SongConfigService.updateStreamerOnly(user, data.isStreamerOnly) private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
if (data.isDisabled != null) SongConfigService.updateDisabled(user, data.isDisabled)
when (data.type) { suspend fun initialize() {
SongType.ADD.value -> { // Send initial status if needed,
data.url?.let { url -> // For example, send STREAM_OFF if applicable
try { // This can be extended based on your requirements
val youtubeVideo = getYoutubeVideo(url) }
if (youtubeVideo != null) {
CoroutineScope(Dispatchers.Default).launch { suspend fun handleTextFrame(text: String) {
SongListService.saveSong( if (text.trim() == "ping") {
user, session.send("pong")
user.token, return
url, }
youtubeVideo.name,
youtubeVideo.author, val data = try {
youtubeVideo.length, Json.decodeFromString<SongRequest>(text)
user.username } catch (e: Exception) {
) logger.warn("Failed to decode SongRequest: ${e.message}")
dispatcher.post( return
SongEvent( }
user.token,
SongType.ADD, when (data.type) {
user.token, SongType.ACK.value -> handleAck(data.uid)
CurrentSong.getSong(user), else -> handleSongRequest(data)
youtubeVideo }
) }
)
} private fun handleAck(requestUid: String) {
} ackMap[requestUid]?.complete(true)
} catch (e: Exception) { ackMap.remove(requestUid)
logger.debug("SongType.ADD Error: $uid $e") }
private fun handleSongRequest(data: SongRequest) {
scope.launch {
SongRequestProcessor.process(data, uid, dispatcher, this@SessionHandler, logger)
}
}
suspend fun sendSongResponse(event: SongEvent) {
val response = SongResponse(
type = event.type.value,
uid = event.uid,
reqUid = event.reqUid,
current = event.current?.toSerializable(),
next = event.next?.toSerializable(),
delUrl = event.delUrl
)
sendWithRetry(response)
}
suspend fun sendTimerOff() {
val response = SongResponse(
type = TimerType.STREAM_OFF.value,
uid = uid,
reqUid = null,
current = null,
next = null,
delUrl = null
)
sendWithRetry(response)
}
private suspend fun sendWithRetry(res: SongResponse, maxRetries: Int = 5, delayMillis: Long = 3000L) {
var attempt = 0
while (attempt < maxRetries) {
try {
session.sendSerialized(res)
val ackDeferred = CompletableDeferred<Boolean>()
ackMap[res.uid] = ackDeferred
val ackReceived = withTimeoutOrNull(5000L) { ackDeferred.await() } ?: false
if (ackReceived) {
logger.debug("ACK received for message to $uid on attempt $attempt.")
return
} else {
logger.warn("ACK not received for message to $uid on attempt $attempt.")
} }
} catch (e: IOException) {
logger.warn("Failed to send message to $uid on attempt $attempt: ${e.message}")
if (e is WebSocketException) {
close(CloseReason(CloseReason.Codes.PROTOCOL_ERROR, "WebSocket error"))
return
}
} catch (e: CancellationException) {
throw e
} catch (e: Exception) {
logger.warn("Unexpected error while sending message to $uid on attempt $attempt: ${e.message}")
}
attempt++
delay(delayMillis)
}
logger.error("Failed to send message to $uid after $maxRetries attempts.")
}
suspend fun close(reason: CloseReason) {
try {
session.close(reason)
} catch (e: Exception) {
logger.warn("Error closing session: ${e.message}")
}
}
}
object SongRequestProcessor {
private val songMutex = Mutex()
suspend fun process(
data: SongRequest,
uid: String,
dispatcher: CoroutinesEventBus,
handler: SessionHandler,
logger: Logger
) {
val user = UserService.getUser(uid) ?: return
when (data.type) {
SongType.ADD.value -> handleAdd(data, user, dispatcher, handler, logger)
SongType.REMOVE.value -> handleRemove(data, user, dispatcher, logger)
SongType.NEXT.value -> handleNext(user, dispatcher, logger)
else -> {
// Handle other types if necessary
} }
} }
SongType.REMOVE.value -> { }
data.url?.let { url ->
val songs = SongListService.getSong(user) private suspend fun handleAdd(
val exactSong = songs.firstOrNull { it.url == url } data: SongRequest,
if (exactSong != null) { user: User,
SongListService.deleteSong(user, exactSong.uid, exactSong.name) dispatcher: CoroutinesEventBus,
} handler: SessionHandler,
dispatcher.post( logger: Logger
SongEvent( ) {
user.token, val url = data.url ?: return
SongType.REMOVE, val youtubeVideo = getYoutubeVideo(url) ?: run {
null, logger.warn("Failed to fetch YouTube video for URL: $url")
null, return
null, }
url
) songMutex.withLock {
) SongListService.saveSong(
user,
user.token,
url,
youtubeVideo.name,
youtubeVideo.author,
youtubeVideo.length,
user.username
)
}
dispatcher.post(
SongEvent(
uid = user.token,
type = SongType.ADD,
reqUid = user.token,
current = CurrentSong.getSong(user),
next = youtubeVideo
)
)
}
private suspend fun handleRemove(
data: SongRequest,
user: User,
dispatcher: CoroutinesEventBus,
logger: Logger
) {
val url = data.url ?: return
songMutex.withLock {
val songs = SongListService.getSong(user)
val exactSong = songs.firstOrNull { it.url == url }
if (exactSong != null) {
SongListService.deleteSong(user, exactSong.uid, exactSong.name)
} }
} }
SongType.NEXT.value -> {
dispatcher.post(
SongEvent(
uid = user.token,
type = SongType.REMOVE,
delUrl = url,
reqUid = null,
current = null,
next = null,
)
)
}
private suspend fun handleNext(
user: User,
dispatcher: CoroutinesEventBus,
logger: Logger
) {
var song: SongList? = null
var youtubeVideo: YoutubeVideo? = null
songMutex.withLock {
val songList = SongListService.getSong(user) val songList = SongListService.getSong(user)
var song: SongList? = null
var youtubeVideo: YoutubeVideo? = null
if (songList.isNotEmpty()) { if (songList.isNotEmpty()) {
song = songList[0] song = songList[0]
SongListService.deleteSong(user, song.uid, song.name) SongListService.deleteSong(user, song.uid, song.name)
} }
song?.let {
youtubeVideo = YoutubeVideo(
song.url,
song.name,
song.author,
song.time
)
}
dispatcher.post(
SongEvent(
user.token,
SongType.NEXT,
song?.uid,
youtubeVideo
)
)
CurrentSong.setSong(user, youtubeVideo)
} }
song?.let {
youtubeVideo = YoutubeVideo(
it.url,
it.name,
it.author,
it.time
)
}
dispatcher.post(
SongEvent(
uid = user.token,
type = SongType.NEXT,
current = null,
next = youtubeVideo,
reqUid = null,
delUrl = null
)
)
CurrentSong.setSong(user, youtubeVideo)
} }
} }
@@ -293,10 +348,10 @@ suspend fun handleSongRequest(
data class SongRequest( data class SongRequest(
val type: Int, val type: Int,
val uid: String, val uid: String,
val url: String?, val url: String? = null,
val maxQueue: Int?, val maxQueue: Int? = null,
val maxUserLimit: Int?, val maxUserLimit: Int? = null,
val isStreamerOnly: Boolean?, val isStreamerOnly: Boolean? = null,
val remove: Int?, val remove: Int? = null,
val isDisabled: Boolean?, val isDisabled: Boolean? = null
) )

View File

@@ -1,14 +1,20 @@
package space.mori.chzzk_bot.webserver.routes package space.mori.chzzk_bot.webserver.routes
import io.ktor.server.application.ApplicationStopped
import io.ktor.server.routing.* import io.ktor.server.routing.*
import io.ktor.server.websocket.* import io.ktor.server.websocket.*
import io.ktor.websocket.* import io.ktor.websocket.*
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.ClosedReceiveChannelException import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.withTimeoutOrNull
import kotlinx.serialization.Serializable import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import org.koin.java.KoinJavaComponent.inject import org.koin.java.KoinJavaComponent.inject
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import space.mori.chzzk_bot.common.events.* import space.mori.chzzk_bot.common.events.*
@@ -17,10 +23,16 @@ import space.mori.chzzk_bot.common.utils.YoutubeVideo
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.ConcurrentLinkedQueue
val songScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
fun Routing.wsSongRoutes() { fun Routing.wsSongRoutes() {
environment.monitor.subscribe(ApplicationStopped) {
songScope.cancel()
}
val sessions = ConcurrentHashMap<String, ConcurrentLinkedQueue<WebSocketServerSession>>() val sessions = ConcurrentHashMap<String, ConcurrentLinkedQueue<WebSocketServerSession>>()
val status = ConcurrentHashMap<String, SongType>() val status = ConcurrentHashMap<String, SongType>()
val logger = LoggerFactory.getLogger("WSSongRoutes") val logger = LoggerFactory.getLogger("WSSongRoutes")
val dispatcher: CoroutinesEventBus by inject(CoroutinesEventBus::class.java)
val ackMap = ConcurrentHashMap<String, ConcurrentHashMap<WebSocketServerSession, CompletableDeferred<Boolean>>>()
fun addSession(uid: String, session: WebSocketServerSession) { fun addSession(uid: String, session: WebSocketServerSession) {
sessions.computeIfAbsent(uid) { ConcurrentLinkedQueue() }.add(session) sessions.computeIfAbsent(uid) { ConcurrentLinkedQueue() }.add(session)
@@ -28,7 +40,7 @@ fun Routing.wsSongRoutes() {
fun removeSession(uid: String, session: WebSocketServerSession) { fun removeSession(uid: String, session: WebSocketServerSession) {
sessions[uid]?.remove(session) sessions[uid]?.remove(session)
if(sessions[uid]?.isEmpty() == true) { if (sessions[uid]?.isEmpty() == true) {
sessions.remove(uid) sessions.remove(uid)
} }
} }
@@ -42,104 +54,141 @@ fun Routing.wsSongRoutes() {
var attempt = 0 var attempt = 0
while (attempt < maxRetries) { while (attempt < maxRetries) {
try { try {
session.sendSerialized(message) // 메시지 전송 시도 session.sendSerialized(message)
return true // 성공하면 true 반환 val ackDeferred = CompletableDeferred<Boolean>()
ackMap.computeIfAbsent(message.uid) { ConcurrentHashMap() }[session] = ackDeferred
val ackReceived = withTimeoutOrNull(delayMillis) { ackDeferred.await() } ?: false
if (ackReceived) {
ackMap[message.uid]?.remove(session)
return true
} else {
attempt++
logger.warn("ACK not received for message to ${message.uid} on attempt $attempt.")
}
} catch (e: Exception) { } catch (e: Exception) {
attempt++ attempt++
logger.info("Failed to send message on attempt $attempt. Retrying in $delayMillis ms.") logger.info("Failed to send message on attempt $attempt. Retrying in $delayMillis ms.")
e.printStackTrace() e.printStackTrace()
delay(delayMillis) // 재시도 전 대기 delay(delayMillis)
} }
} }
return false // 재시도 실패 시 false 반환 return false
} }
fun broadcastMessage(userId: String, message: SongResponse) { fun broadcastMessage(userId: String, message: SongResponse) {
val userSessions = sessions[userId] val userSessions = sessions[userId]
userSessions?.forEach { session -> userSessions?.forEach { session ->
CoroutineScope(Dispatchers.Default).launch { songScope.launch {
val success = sendWithRetry(session, message) val success = sendWithRetry(session, message)
if (!success) { if (!success) {
println("Removing session for user $userId due to repeated failures.") logger.info("Removing session for user $userId due to repeated failures.")
userSessions.remove(session) // 실패 시 세션 제거 removeSession(userId, session)
} }
} }
} }
} }
webSocket("/song/{uid}") { webSocket("/song/{uid}") {
logger.info("WebSocket connection attempt received")
val uid = call.parameters["uid"] val uid = call.parameters["uid"]
val user = uid?.let { UserService.getUser(it) } val user = uid?.let { UserService.getUser(it) }
if (uid == null) { if (uid == null || user == null) {
logger.warn("Invalid UID: $uid")
close(CloseReason(CloseReason.Codes.CANNOT_ACCEPT, "Invalid UID")) close(CloseReason(CloseReason.Codes.CANNOT_ACCEPT, "Invalid UID"))
return@webSocket return@webSocket
} }
if (user == null) {
close(CloseReason(CloseReason.Codes.CANNOT_ACCEPT, "Invalid UID"))
return@webSocket
}
addSession(uid, this)
if(status[uid] == SongType.STREAM_OFF) {
CoroutineScope(Dispatchers.Default).launch {
sendSerialized(SongResponse(
SongType.STREAM_OFF.value,
uid,
null,
null,
null,
))
}
}
try { try {
for (frame in incoming) { addSession(uid, this)
when(frame) { logger.info("WebSocket connection established for user: $uid")
is Frame.Text -> {
if(frame.readText().trim() == "ping") {
send("pong")
}
}
is Frame.Ping -> send(Frame.Pong(frame.data))
else -> {
// Start heartbeat
val heartbeatJob = songScope.launch {
while (true) {
try {
send(Frame.Ping(ByteArray(0)))
delay(30000) // 30 seconds
} catch (e: Exception) {
logger.error("Heartbeat failed for user $uid", e)
break
} }
} }
} }
} catch(e: ClosedReceiveChannelException) {
logger.error("Error in WebSocket: ${e.message}") if (status[uid] == SongType.STREAM_OFF) {
} finally { songScope.launch {
removeSession(uid, this) sendSerialized(
SongResponse(
SongType.STREAM_OFF.value,
uid,
null,
null,
null,
)
)
}
}
try {
for (frame in incoming) {
when (frame) {
is Frame.Text -> {
val text = frame.readText().trim()
if (text == "ping") {
send("pong")
} else {
val data = Json.decodeFromString<SongRequest>(text)
if (data.type == SongType.ACK.value) {
ackMap[data.uid]?.get(this)?.complete(true)
ackMap[data.uid]?.remove(this)
}
}
}
is Frame.Ping -> send(Frame.Pong(frame.data))
else -> {}
}
}
} catch (e: ClosedReceiveChannelException) {
logger.error("WebSocket connection closed for user $uid: ${e.message}")
} catch (e: Exception) {
logger.error("Unexpected error in WebSocket for user $uid", e)
} finally {
logger.info("Cleaning up WebSocket connection for user $uid")
removeSession(uid, this)
ackMap[uid]?.remove(this)
heartbeatJob.cancel()
}
} catch(e: Exception) {
logger.error("Unexpected error in WebSocket for user $uid", e)
} }
} }
val dispatcher: CoroutinesEventBus by inject(CoroutinesEventBus::class.java)
dispatcher.subscribe(SongEvent::class) { dispatcher.subscribe(SongEvent::class) {
logger.debug("SongEvent: {} / {} {}", it.uid, it.type, it.current?.name) logger.debug("SongEvent: {} / {} {}", it.uid, it.type, it.current?.name)
CoroutineScope(Dispatchers.Default).launch { songScope.launch {
broadcastMessage(it.uid, SongResponse( broadcastMessage(
it.type.value, it.uid, SongResponse(
it.uid, it.type.value,
it.reqUid, it.uid,
it.current?.toSerializable(), it.reqUid,
it.next?.toSerializable(), it.current?.toSerializable(),
it.delUrl it.next?.toSerializable(),
)) it.delUrl
)
)
} }
} }
dispatcher.subscribe(TimerEvent::class) { dispatcher.subscribe(TimerEvent::class) {
if(it.type == TimerType.STREAM_OFF) { if (it.type == TimerType.STREAM_OFF) {
CoroutineScope(Dispatchers.Default).launch { songScope.launch {
broadcastMessage(it.uid, SongResponse( broadcastMessage(
it.type.value, it.uid, SongResponse(
it.uid, it.type.value,
null, it.uid,
null, null,
null, null,
)) null,
)
)
} }
} }
} }
@@ -152,9 +201,7 @@ data class SerializableYoutubeVideo(
val author: String, val author: String,
val length: Int val length: Int
) )
fun YoutubeVideo.toSerializable() = SerializableYoutubeVideo(url, name, author, length) fun YoutubeVideo.toSerializable() = SerializableYoutubeVideo(url, name, author, length)
@Serializable @Serializable
data class SongResponse( data class SongResponse(
val type: Int, val type: Int,

View File

@@ -1,13 +1,20 @@
package space.mori.chzzk_bot.webserver.routes package space.mori.chzzk_bot.webserver.routes
import io.ktor.server.application.ApplicationStopped
import io.ktor.server.routing.* import io.ktor.server.routing.*
import io.ktor.server.websocket.* import io.ktor.server.websocket.*
import io.ktor.websocket.* import io.ktor.websocket.*
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.ClosedReceiveChannelException import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.withTimeoutOrNull
import kotlinx.serialization.Serializable import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import org.koin.java.KoinJavaComponent.inject import org.koin.java.KoinJavaComponent.inject
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import space.mori.chzzk_bot.common.events.* import space.mori.chzzk_bot.common.events.*
@@ -17,14 +24,19 @@ import space.mori.chzzk_bot.webserver.utils.CurrentTimer
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.ConcurrentLinkedQueue
val timerScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
fun Routing.wsTimerRoutes() { fun Routing.wsTimerRoutes() {
environment.monitor.subscribe(ApplicationStopped) {
timerScope.cancel()
}
val sessions = ConcurrentHashMap<String, ConcurrentLinkedQueue<WebSocketServerSession>>() val sessions = ConcurrentHashMap<String, ConcurrentLinkedQueue<WebSocketServerSession>>()
val logger = LoggerFactory.getLogger("WSTimerRoutes") val logger = LoggerFactory.getLogger("WSTimerRoutes")
val dispatcher: CoroutinesEventBus by inject(CoroutinesEventBus::class.java)
val ackMap = ConcurrentHashMap<String, ConcurrentHashMap<WebSocketServerSession, CompletableDeferred<Boolean>>>()
fun addSession(uid: String, session: WebSocketServerSession) { fun addSession(uid: String, session: WebSocketServerSession) {
sessions.computeIfAbsent(uid) { ConcurrentLinkedQueue() }.add(session) sessions.computeIfAbsent(uid) { ConcurrentLinkedQueue() }.add(session)
} }
fun removeSession(uid: String, session: WebSocketServerSession) { fun removeSession(uid: String, session: WebSocketServerSession) {
sessions[uid]?.remove(session) sessions[uid]?.remove(session)
if(sessions[uid]?.isEmpty() == true) { if(sessions[uid]?.isEmpty() == true) {
@@ -32,82 +44,132 @@ fun Routing.wsTimerRoutes() {
} }
} }
webSocket("/timer/{uid}") { suspend fun sendWithRetry(
val uid = call.parameters["uid"] session: WebSocketServerSession,
val user = uid?.let { UserService.getUser(it) } message: TimerResponse,
if (uid == null) { maxRetries: Int = 3,
close(CloseReason(CloseReason.Codes.CANNOT_ACCEPT, "Invalid UID")) delayMillis: Long = 2000L
return@webSocket ): Boolean {
} var attempt = 0
if (user == null) { while (attempt < maxRetries) {
close(CloseReason(CloseReason.Codes.CANNOT_ACCEPT, "Invalid UID")) try {
return@webSocket session.sendSerialized(message)
} val ackDeferred = CompletableDeferred<Boolean>()
ackMap.computeIfAbsent(message.uid) { ConcurrentHashMap() }[session] = ackDeferred
addSession(uid, this) val ackReceived = withTimeoutOrNull(delayMillis) { ackDeferred.await() } ?: false
val timer = CurrentTimer.getTimer(user) if (ackReceived) {
ackMap[message.uid]?.remove(session)
if(timer?.type == TimerType.STREAM_OFF) { return true
CoroutineScope(Dispatchers.Default).launch {
sendSerialized(TimerResponse(TimerType.STREAM_OFF.value, null))
}
} else {
CoroutineScope(Dispatchers.Default).launch {
if (timer == null) {
sendSerialized(
TimerResponse(
TimerConfigService.getConfig(user)?.option ?: TimerType.REMOVE.value,
null
)
)
} else { } else {
sendSerialized( attempt++
TimerResponse( logger.warn("ACK not received for message to ${message.uid} on attempt $attempt.")
timer.type.value, }
timer.time } catch (e: Exception) {
) attempt++
) logger.info("Failed to send message on attempt $attempt. Retrying in $delayMillis ms.")
e.printStackTrace()
delay(delayMillis)
}
}
return false
}
fun broadcastMessage(uid: String, message: TimerResponse) {
val userSessions = sessions[uid]
userSessions?.forEach { session ->
timerScope.launch {
val success = sendWithRetry(session, message.copy(uid = uid))
if (!success) {
logger.info("Removing session for user $uid due to repeated failures.")
removeSession(uid, session)
} }
} }
} }
}
webSocket("/timer/{uid}") {
val uid = call.parameters["uid"]
val user = uid?.let { UserService.getUser(it) }
if (uid == null || user == null) {
close(CloseReason(CloseReason.Codes.CANNOT_ACCEPT, "Invalid UID"))
return@webSocket
}
addSession(uid, this)
val timer = CurrentTimer.getTimer(user)
if (timer?.type == TimerType.STREAM_OFF) {
timerScope.launch {
sendSerialized(TimerResponse(TimerType.STREAM_OFF.value, null, uid))
}
} else {
timerScope.launch {
if(timer?.type == TimerType.STREAM_OFF) {
sendSerialized(TimerResponse(TimerType.STREAM_OFF.value, null, uid))
} else {
if (timer == null) {
sendSerialized(
TimerResponse(
TimerConfigService.getConfig(user)?.option ?: TimerType.REMOVE.value,
null,
uid
)
)
} else {
sendSerialized(
TimerResponse(
timer.type.value,
timer.time,
uid
)
)
}
}
}
}
try { try {
for (frame in incoming) { for (frame in incoming) {
when(frame) { when(frame) {
is Frame.Text -> { is Frame.Text -> {
if(frame.readText().trim() == "ping") { val text = frame.readText().trim()
if(text == "ping") {
send("pong") send("pong")
} else {
val data = Json.decodeFromString<TimerRequest>(text)
if (data.type == TimerType.ACK.value) {
ackMap[data.uid]?.get(this)?.complete(true)
ackMap[data.uid]?.remove(this)
}
} }
} }
is Frame.Ping -> send(Frame.Pong(frame.data)) is Frame.Ping -> send(Frame.Pong(frame.data))
else -> { else -> {}
}
} }
} }
} catch(e: ClosedReceiveChannelException) { } catch(e: ClosedReceiveChannelException) {
logger.error("Error in WebSocket: ${e.message}") logger.error("Error in WebSocket: ${e.message}")
} finally { } finally {
removeSession(uid, this) removeSession(uid, this)
ackMap[uid]?.remove(this)
} }
} }
val dispatcher: CoroutinesEventBus by inject(CoroutinesEventBus::class.java)
dispatcher.subscribe(TimerEvent::class) { dispatcher.subscribe(TimerEvent::class) {
logger.debug("TimerEvent: {} / {}", it.uid, it.type) logger.debug("TimerEvent: {} / {}", it.uid, it.type)
val user = UserService.getUser(it.uid) val user = UserService.getUser(it.uid)
CurrentTimer.setTimer(user!!, it) CurrentTimer.setTimer(user!!, it)
CoroutineScope(Dispatchers.Default).launch { timerScope.launch {
sessions[it.uid]?.forEach { ws -> broadcastMessage(it.uid, TimerResponse(it.type.value, it.time ?: "", it.uid))
ws.sendSerialized(TimerResponse(it.type.value, it.time ?: ""))
}
} }
} }
} }
@Serializable @Serializable
data class TimerResponse( data class TimerResponse(
val type: Int, val type: Int,
val time: String? val time: String?,
val uid: String
)
@Serializable
data class TimerRequest(
val type: Int,
val uid: String
) )

View File

@@ -1,50 +0,0 @@
package space.mori.chzzk_bot.webserver.utils
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.slf4j.LoggerFactory
import space.mori.chzzk_bot.common.utils.IData
import space.mori.chzzk_bot.common.utils.IStreamInfo
import space.mori.chzzk_bot.common.utils.getStreamInfo
import space.mori.chzzk_bot.common.utils.getUserInfo
import java.time.Instant
import java.util.concurrent.ConcurrentHashMap
object ChzzkUserCache {
private val cache = ConcurrentHashMap<String, CachedUser>()
private const val EXP_SECONDS = 600L
private val mutex = Mutex()
private val logger = LoggerFactory.getLogger(this::class.java)
suspend fun getCachedUser(id: String): IData<IStreamInfo?>? {
val now = Instant.now()
var user = cache[id]
if(user == null || user.timestamp.plusSeconds(EXP_SECONDS).isBefore(now)) {
mutex.withLock {
if(user == null || user.timestamp.plusSeconds(EXP_SECONDS)?.isBefore(now) != false) {
var findUser = getStreamInfo(id)
if(findUser.content == null) {
val userInfo = getUserInfo(id)
if(userInfo.content == null) return null
findUser = IData(200, null, IStreamInfo(
channel = userInfo.content!!
))
}
user = CachedUser(findUser)
user.let { cache[id] = user }
}
}
}
return cache[id]?.user
}
}
data class CachedUser(
val user: IData<IStreamInfo?>,
val timestamp: Instant = Instant.now(),
)