android: add websocket realtime manager and room event handling
This commit is contained in:
@@ -37,3 +37,10 @@
|
||||
- Implemented `NetworkChatRepository` with cache-first flow strategy (Room first, then server sync).
|
||||
- Added chat domain contracts/use-cases (`ChatRepository`, observe/refresh use-cases).
|
||||
- Wired chat API/repository via Hilt modules.
|
||||
|
||||
### Step 7 - Realtime manager and chat list updates
|
||||
- Added a unified realtime manager abstraction and WebSocket implementation for `/api/v1/realtime/ws?token=...`.
|
||||
- Implemented auto-reconnect with exponential backoff and max cap.
|
||||
- Added realtime event parser for `receive_message`, `message_updated`, `message_deleted`, `chat_updated`, `chat_deleted`, `user_online`, `user_offline`.
|
||||
- Added use-case level realtime event handling that updates Room and triggers repository refreshes when needed.
|
||||
- Wired realtime manager into DI.
|
||||
|
||||
@@ -0,0 +1,89 @@
|
||||
package ru.daemonlord.messenger.data.realtime
|
||||
|
||||
import kotlinx.serialization.json.Json
|
||||
import kotlinx.serialization.json.JsonElement
|
||||
import kotlinx.serialization.json.JsonObject
|
||||
import kotlinx.serialization.json.contentOrNull
|
||||
import kotlinx.serialization.json.jsonObject
|
||||
import kotlinx.serialization.json.jsonPrimitive
|
||||
import ru.daemonlord.messenger.domain.realtime.model.RealtimeEvent
|
||||
import javax.inject.Inject
|
||||
import javax.inject.Singleton
|
||||
|
||||
@Singleton
|
||||
class RealtimeEventParser @Inject constructor(
|
||||
private val json: Json,
|
||||
) {
|
||||
|
||||
fun parse(raw: String): RealtimeEvent {
|
||||
val root = runCatching { json.parseToJsonElement(raw).jsonObject }.getOrNull()
|
||||
?: return RealtimeEvent.Ignored
|
||||
val event = root["event"].stringOrNull() ?: return RealtimeEvent.Ignored
|
||||
val payload = root["payload"]?.jsonObject ?: JsonObject(emptyMap())
|
||||
|
||||
return when (event) {
|
||||
"receive_message" -> {
|
||||
val chatId = payload["chat_id"].longOrNull() ?: return RealtimeEvent.Ignored
|
||||
val messageObject = payload["message"]?.jsonObject
|
||||
RealtimeEvent.ReceiveMessage(
|
||||
chatId = chatId,
|
||||
text = messageObject?.get("text").stringOrNull(),
|
||||
type = messageObject?.get("type").stringOrNull(),
|
||||
createdAt = messageObject?.get("created_at").stringOrNull(),
|
||||
)
|
||||
}
|
||||
|
||||
"message_updated" -> {
|
||||
val chatId = payload["chat_id"].longOrNull() ?: return RealtimeEvent.Ignored
|
||||
val messageObject = payload["message"]?.jsonObject
|
||||
RealtimeEvent.MessageUpdated(
|
||||
chatId = chatId,
|
||||
text = messageObject?.get("text").stringOrNull(),
|
||||
type = messageObject?.get("type").stringOrNull(),
|
||||
updatedAt = messageObject?.get("updated_at").stringOrNull(),
|
||||
)
|
||||
}
|
||||
|
||||
"message_deleted" -> {
|
||||
val chatId = payload["chat_id"].longOrNull() ?: return RealtimeEvent.Ignored
|
||||
RealtimeEvent.MessageDeleted(
|
||||
chatId = chatId,
|
||||
messageId = payload["message_id"].longOrNull(),
|
||||
)
|
||||
}
|
||||
|
||||
"chat_updated" -> {
|
||||
val chatId = payload["chat_id"].longOrNull() ?: return RealtimeEvent.Ignored
|
||||
RealtimeEvent.ChatUpdated(chatId = chatId)
|
||||
}
|
||||
|
||||
"chat_deleted" -> {
|
||||
val chatId = payload["chat_id"].longOrNull() ?: return RealtimeEvent.Ignored
|
||||
RealtimeEvent.ChatDeleted(chatId = chatId)
|
||||
}
|
||||
|
||||
"user_online" -> {
|
||||
val chatId = payload["chat_id"].longOrNull() ?: return RealtimeEvent.Ignored
|
||||
RealtimeEvent.UserOnline(chatId = chatId)
|
||||
}
|
||||
|
||||
"user_offline" -> {
|
||||
val chatId = payload["chat_id"].longOrNull() ?: return RealtimeEvent.Ignored
|
||||
RealtimeEvent.UserOffline(
|
||||
chatId = chatId,
|
||||
lastSeenAt = payload["last_seen_at"].stringOrNull(),
|
||||
)
|
||||
}
|
||||
|
||||
else -> RealtimeEvent.Ignored
|
||||
}
|
||||
}
|
||||
|
||||
private fun JsonElement?.stringOrNull(): String? {
|
||||
return this?.jsonPrimitive?.contentOrNull
|
||||
}
|
||||
|
||||
private fun JsonElement?.longOrNull(): Long? {
|
||||
return this?.jsonPrimitive?.contentOrNull?.toLongOrNull()
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,112 @@
|
||||
package ru.daemonlord.messenger.data.realtime
|
||||
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.SupervisorJob
|
||||
import kotlinx.coroutines.cancel
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.flow.asSharedFlow
|
||||
import kotlinx.coroutines.launch
|
||||
import okhttp3.OkHttpClient
|
||||
import okhttp3.Request
|
||||
import okhttp3.Response
|
||||
import okhttp3.WebSocket
|
||||
import okhttp3.WebSocketListener
|
||||
import ru.daemonlord.messenger.BuildConfig
|
||||
import ru.daemonlord.messenger.core.token.TokenRepository
|
||||
import ru.daemonlord.messenger.di.RefreshClient
|
||||
import ru.daemonlord.messenger.domain.realtime.RealtimeManager
|
||||
import ru.daemonlord.messenger.domain.realtime.model.RealtimeEvent
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import javax.inject.Inject
|
||||
import javax.inject.Singleton
|
||||
|
||||
@Singleton
|
||||
class WsRealtimeManager @Inject constructor(
|
||||
@RefreshClient private val okHttpClient: OkHttpClient,
|
||||
private val tokenRepository: TokenRepository,
|
||||
private val parser: RealtimeEventParser,
|
||||
) : RealtimeManager {
|
||||
|
||||
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
|
||||
private val eventFlow = MutableSharedFlow<RealtimeEvent>(extraBufferCapacity = 64)
|
||||
private var socket: WebSocket? = null
|
||||
private val isConnected = AtomicBoolean(false)
|
||||
private val manualDisconnect = AtomicBoolean(false)
|
||||
private var reconnectDelayMs: Long = INITIAL_RECONNECT_MS
|
||||
|
||||
override val events: Flow<RealtimeEvent> = eventFlow.asSharedFlow()
|
||||
|
||||
override fun connect() {
|
||||
if (isConnected.get()) return
|
||||
manualDisconnect.set(false)
|
||||
scope.launch { openSocket() }
|
||||
}
|
||||
|
||||
override fun disconnect() {
|
||||
manualDisconnect.set(true)
|
||||
isConnected.set(false)
|
||||
socket?.close(1000, "Client disconnect")
|
||||
socket = null
|
||||
}
|
||||
|
||||
private suspend fun openSocket() {
|
||||
val accessToken = tokenRepository.getTokens()?.accessToken ?: return
|
||||
val wsUrl = BuildConfig.API_BASE_URL
|
||||
.replace("http://", "ws://")
|
||||
.replace("https://", "wss://")
|
||||
.trimEnd('/') + "/api/v1/realtime/ws?token=$accessToken"
|
||||
val request = Request.Builder()
|
||||
.url(wsUrl)
|
||||
.build()
|
||||
socket = okHttpClient.newWebSocket(request, listener)
|
||||
}
|
||||
|
||||
private fun scheduleReconnect() {
|
||||
if (manualDisconnect.get()) return
|
||||
scope.launch {
|
||||
delay(reconnectDelayMs)
|
||||
reconnectDelayMs = (reconnectDelayMs * 2).coerceAtMost(MAX_RECONNECT_MS)
|
||||
openSocket()
|
||||
}
|
||||
}
|
||||
|
||||
private val listener = object : WebSocketListener() {
|
||||
override fun onOpen(webSocket: WebSocket, response: Response) {
|
||||
isConnected.set(true)
|
||||
reconnectDelayMs = INITIAL_RECONNECT_MS
|
||||
}
|
||||
|
||||
override fun onMessage(webSocket: WebSocket, text: String) {
|
||||
eventFlow.tryEmit(parser.parse(text))
|
||||
}
|
||||
|
||||
override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
|
||||
isConnected.set(false)
|
||||
webSocket.close(code, reason)
|
||||
}
|
||||
|
||||
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
|
||||
isConnected.set(false)
|
||||
scheduleReconnect()
|
||||
}
|
||||
|
||||
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
|
||||
isConnected.set(false)
|
||||
scheduleReconnect()
|
||||
}
|
||||
}
|
||||
|
||||
@Suppress("unused")
|
||||
fun shutdown() {
|
||||
disconnect()
|
||||
scope.cancel()
|
||||
}
|
||||
|
||||
private companion object {
|
||||
const val INITIAL_RECONNECT_MS = 1_000L
|
||||
const val MAX_RECONNECT_MS = 30_000L
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
package ru.daemonlord.messenger.di
|
||||
|
||||
import dagger.Binds
|
||||
import dagger.Module
|
||||
import dagger.hilt.InstallIn
|
||||
import dagger.hilt.components.SingletonComponent
|
||||
import ru.daemonlord.messenger.data.realtime.WsRealtimeManager
|
||||
import ru.daemonlord.messenger.domain.realtime.RealtimeManager
|
||||
import javax.inject.Singleton
|
||||
|
||||
@Module
|
||||
@InstallIn(SingletonComponent::class)
|
||||
abstract class RealtimeModule {
|
||||
|
||||
@Binds
|
||||
@Singleton
|
||||
abstract fun bindRealtimeManager(
|
||||
manager: WsRealtimeManager,
|
||||
): RealtimeManager
|
||||
}
|
||||
@@ -0,0 +1,10 @@
|
||||
package ru.daemonlord.messenger.domain.realtime
|
||||
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import ru.daemonlord.messenger.domain.realtime.model.RealtimeEvent
|
||||
|
||||
interface RealtimeManager {
|
||||
val events: Flow<RealtimeEvent>
|
||||
fun connect()
|
||||
fun disconnect()
|
||||
}
|
||||
@@ -0,0 +1,41 @@
|
||||
package ru.daemonlord.messenger.domain.realtime.model
|
||||
|
||||
sealed interface RealtimeEvent {
|
||||
data class ReceiveMessage(
|
||||
val chatId: Long,
|
||||
val text: String?,
|
||||
val type: String?,
|
||||
val createdAt: String?,
|
||||
) : RealtimeEvent
|
||||
|
||||
data class MessageUpdated(
|
||||
val chatId: Long,
|
||||
val text: String?,
|
||||
val type: String?,
|
||||
val updatedAt: String?,
|
||||
) : RealtimeEvent
|
||||
|
||||
data class MessageDeleted(
|
||||
val chatId: Long,
|
||||
val messageId: Long?,
|
||||
) : RealtimeEvent
|
||||
|
||||
data class ChatUpdated(
|
||||
val chatId: Long,
|
||||
) : RealtimeEvent
|
||||
|
||||
data class ChatDeleted(
|
||||
val chatId: Long,
|
||||
) : RealtimeEvent
|
||||
|
||||
data class UserOnline(
|
||||
val chatId: Long,
|
||||
) : RealtimeEvent
|
||||
|
||||
data class UserOffline(
|
||||
val chatId: Long,
|
||||
val lastSeenAt: String?,
|
||||
) : RealtimeEvent
|
||||
|
||||
data object Ignored : RealtimeEvent
|
||||
}
|
||||
@@ -0,0 +1,93 @@
|
||||
package ru.daemonlord.messenger.domain.realtime.usecase
|
||||
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.SupervisorJob
|
||||
import kotlinx.coroutines.flow.collectLatest
|
||||
import kotlinx.coroutines.launch
|
||||
import ru.daemonlord.messenger.data.chat.local.dao.ChatDao
|
||||
import ru.daemonlord.messenger.domain.chat.repository.ChatRepository
|
||||
import ru.daemonlord.messenger.domain.realtime.RealtimeManager
|
||||
import ru.daemonlord.messenger.domain.realtime.model.RealtimeEvent
|
||||
import javax.inject.Inject
|
||||
import javax.inject.Singleton
|
||||
|
||||
@Singleton
|
||||
class HandleRealtimeEventsUseCase @Inject constructor(
|
||||
private val realtimeManager: RealtimeManager,
|
||||
private val chatRepository: ChatRepository,
|
||||
private val chatDao: ChatDao,
|
||||
) {
|
||||
|
||||
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
|
||||
private var collectionJob: Job? = null
|
||||
|
||||
fun start() {
|
||||
if (collectionJob?.isActive == true) return
|
||||
realtimeManager.connect()
|
||||
collectionJob = scope.launch {
|
||||
realtimeManager.events.collectLatest { event ->
|
||||
when (event) {
|
||||
is RealtimeEvent.ReceiveMessage -> {
|
||||
chatDao.updateLastMessage(
|
||||
chatId = event.chatId,
|
||||
lastMessageText = event.text,
|
||||
lastMessageType = event.type,
|
||||
lastMessageCreatedAt = event.createdAt,
|
||||
updatedSortAt = event.createdAt,
|
||||
)
|
||||
chatDao.incrementUnread(chatId = event.chatId)
|
||||
}
|
||||
|
||||
is RealtimeEvent.MessageUpdated -> {
|
||||
chatDao.updateLastMessage(
|
||||
chatId = event.chatId,
|
||||
lastMessageText = event.text,
|
||||
lastMessageType = event.type,
|
||||
lastMessageCreatedAt = event.updatedAt,
|
||||
updatedSortAt = event.updatedAt,
|
||||
)
|
||||
}
|
||||
|
||||
is RealtimeEvent.MessageDeleted -> {
|
||||
chatRepository.refreshChats(archived = false)
|
||||
chatRepository.refreshChats(archived = true)
|
||||
}
|
||||
|
||||
is RealtimeEvent.ChatUpdated -> {
|
||||
chatRepository.refreshChat(chatId = event.chatId)
|
||||
}
|
||||
|
||||
is RealtimeEvent.ChatDeleted -> {
|
||||
chatRepository.deleteChat(chatId = event.chatId)
|
||||
}
|
||||
|
||||
is RealtimeEvent.UserOnline -> {
|
||||
chatDao.updatePresence(
|
||||
chatId = event.chatId,
|
||||
isOnline = true,
|
||||
lastSeenAt = null,
|
||||
)
|
||||
}
|
||||
|
||||
is RealtimeEvent.UserOffline -> {
|
||||
chatDao.updatePresence(
|
||||
chatId = event.chatId,
|
||||
isOnline = false,
|
||||
lastSeenAt = event.lastSeenAt,
|
||||
)
|
||||
}
|
||||
|
||||
RealtimeEvent.Ignored -> Unit
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun stop() {
|
||||
collectionJob?.cancel()
|
||||
collectionJob = null
|
||||
realtimeManager.disconnect()
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user