diff --git a/android/CHANGELOG.md b/android/CHANGELOG.md index 7b33be1..04d23b6 100644 --- a/android/CHANGELOG.md +++ b/android/CHANGELOG.md @@ -37,3 +37,10 @@ - Implemented `NetworkChatRepository` with cache-first flow strategy (Room first, then server sync). - Added chat domain contracts/use-cases (`ChatRepository`, observe/refresh use-cases). - Wired chat API/repository via Hilt modules. + +### Step 7 - Realtime manager and chat list updates +- Added a unified realtime manager abstraction and WebSocket implementation for `/api/v1/realtime/ws?token=...`. +- Implemented auto-reconnect with exponential backoff and max cap. +- Added realtime event parser for `receive_message`, `message_updated`, `message_deleted`, `chat_updated`, `chat_deleted`, `user_online`, `user_offline`. +- Added use-case level realtime event handling that updates Room and triggers repository refreshes when needed. +- Wired realtime manager into DI. diff --git a/android/app/src/main/java/ru/daemonlord/messenger/data/realtime/RealtimeEventParser.kt b/android/app/src/main/java/ru/daemonlord/messenger/data/realtime/RealtimeEventParser.kt new file mode 100644 index 0000000..bbfe6f3 --- /dev/null +++ b/android/app/src/main/java/ru/daemonlord/messenger/data/realtime/RealtimeEventParser.kt @@ -0,0 +1,89 @@ +package ru.daemonlord.messenger.data.realtime + +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.JsonElement +import kotlinx.serialization.json.JsonObject +import kotlinx.serialization.json.contentOrNull +import kotlinx.serialization.json.jsonObject +import kotlinx.serialization.json.jsonPrimitive +import ru.daemonlord.messenger.domain.realtime.model.RealtimeEvent +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +class RealtimeEventParser @Inject constructor( + private val json: Json, +) { + + fun parse(raw: String): RealtimeEvent { + val root = runCatching { json.parseToJsonElement(raw).jsonObject }.getOrNull() + ?: return RealtimeEvent.Ignored + val event = root["event"].stringOrNull() ?: return RealtimeEvent.Ignored + val payload = root["payload"]?.jsonObject ?: JsonObject(emptyMap()) + + return when (event) { + "receive_message" -> { + val chatId = payload["chat_id"].longOrNull() ?: return RealtimeEvent.Ignored + val messageObject = payload["message"]?.jsonObject + RealtimeEvent.ReceiveMessage( + chatId = chatId, + text = messageObject?.get("text").stringOrNull(), + type = messageObject?.get("type").stringOrNull(), + createdAt = messageObject?.get("created_at").stringOrNull(), + ) + } + + "message_updated" -> { + val chatId = payload["chat_id"].longOrNull() ?: return RealtimeEvent.Ignored + val messageObject = payload["message"]?.jsonObject + RealtimeEvent.MessageUpdated( + chatId = chatId, + text = messageObject?.get("text").stringOrNull(), + type = messageObject?.get("type").stringOrNull(), + updatedAt = messageObject?.get("updated_at").stringOrNull(), + ) + } + + "message_deleted" -> { + val chatId = payload["chat_id"].longOrNull() ?: return RealtimeEvent.Ignored + RealtimeEvent.MessageDeleted( + chatId = chatId, + messageId = payload["message_id"].longOrNull(), + ) + } + + "chat_updated" -> { + val chatId = payload["chat_id"].longOrNull() ?: return RealtimeEvent.Ignored + RealtimeEvent.ChatUpdated(chatId = chatId) + } + + "chat_deleted" -> { + val chatId = payload["chat_id"].longOrNull() ?: return RealtimeEvent.Ignored + RealtimeEvent.ChatDeleted(chatId = chatId) + } + + "user_online" -> { + val chatId = payload["chat_id"].longOrNull() ?: return RealtimeEvent.Ignored + RealtimeEvent.UserOnline(chatId = chatId) + } + + "user_offline" -> { + val chatId = payload["chat_id"].longOrNull() ?: return RealtimeEvent.Ignored + RealtimeEvent.UserOffline( + chatId = chatId, + lastSeenAt = payload["last_seen_at"].stringOrNull(), + ) + } + + else -> RealtimeEvent.Ignored + } + } + + private fun JsonElement?.stringOrNull(): String? { + return this?.jsonPrimitive?.contentOrNull + } + + private fun JsonElement?.longOrNull(): Long? { + return this?.jsonPrimitive?.contentOrNull?.toLongOrNull() + } +} diff --git a/android/app/src/main/java/ru/daemonlord/messenger/data/realtime/WsRealtimeManager.kt b/android/app/src/main/java/ru/daemonlord/messenger/data/realtime/WsRealtimeManager.kt new file mode 100644 index 0000000..33d48c1 --- /dev/null +++ b/android/app/src/main/java/ru/daemonlord/messenger/data/realtime/WsRealtimeManager.kt @@ -0,0 +1,112 @@ +package ru.daemonlord.messenger.data.realtime + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.asSharedFlow +import kotlinx.coroutines.launch +import okhttp3.OkHttpClient +import okhttp3.Request +import okhttp3.Response +import okhttp3.WebSocket +import okhttp3.WebSocketListener +import ru.daemonlord.messenger.BuildConfig +import ru.daemonlord.messenger.core.token.TokenRepository +import ru.daemonlord.messenger.di.RefreshClient +import ru.daemonlord.messenger.domain.realtime.RealtimeManager +import ru.daemonlord.messenger.domain.realtime.model.RealtimeEvent +import java.util.concurrent.atomic.AtomicBoolean +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +class WsRealtimeManager @Inject constructor( + @RefreshClient private val okHttpClient: OkHttpClient, + private val tokenRepository: TokenRepository, + private val parser: RealtimeEventParser, +) : RealtimeManager { + + private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO) + private val eventFlow = MutableSharedFlow(extraBufferCapacity = 64) + private var socket: WebSocket? = null + private val isConnected = AtomicBoolean(false) + private val manualDisconnect = AtomicBoolean(false) + private var reconnectDelayMs: Long = INITIAL_RECONNECT_MS + + override val events: Flow = eventFlow.asSharedFlow() + + override fun connect() { + if (isConnected.get()) return + manualDisconnect.set(false) + scope.launch { openSocket() } + } + + override fun disconnect() { + manualDisconnect.set(true) + isConnected.set(false) + socket?.close(1000, "Client disconnect") + socket = null + } + + private suspend fun openSocket() { + val accessToken = tokenRepository.getTokens()?.accessToken ?: return + val wsUrl = BuildConfig.API_BASE_URL + .replace("http://", "ws://") + .replace("https://", "wss://") + .trimEnd('/') + "/api/v1/realtime/ws?token=$accessToken" + val request = Request.Builder() + .url(wsUrl) + .build() + socket = okHttpClient.newWebSocket(request, listener) + } + + private fun scheduleReconnect() { + if (manualDisconnect.get()) return + scope.launch { + delay(reconnectDelayMs) + reconnectDelayMs = (reconnectDelayMs * 2).coerceAtMost(MAX_RECONNECT_MS) + openSocket() + } + } + + private val listener = object : WebSocketListener() { + override fun onOpen(webSocket: WebSocket, response: Response) { + isConnected.set(true) + reconnectDelayMs = INITIAL_RECONNECT_MS + } + + override fun onMessage(webSocket: WebSocket, text: String) { + eventFlow.tryEmit(parser.parse(text)) + } + + override fun onClosing(webSocket: WebSocket, code: Int, reason: String) { + isConnected.set(false) + webSocket.close(code, reason) + } + + override fun onClosed(webSocket: WebSocket, code: Int, reason: String) { + isConnected.set(false) + scheduleReconnect() + } + + override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) { + isConnected.set(false) + scheduleReconnect() + } + } + + @Suppress("unused") + fun shutdown() { + disconnect() + scope.cancel() + } + + private companion object { + const val INITIAL_RECONNECT_MS = 1_000L + const val MAX_RECONNECT_MS = 30_000L + } +} diff --git a/android/app/src/main/java/ru/daemonlord/messenger/di/RealtimeModule.kt b/android/app/src/main/java/ru/daemonlord/messenger/di/RealtimeModule.kt new file mode 100644 index 0000000..9b9dbf7 --- /dev/null +++ b/android/app/src/main/java/ru/daemonlord/messenger/di/RealtimeModule.kt @@ -0,0 +1,20 @@ +package ru.daemonlord.messenger.di + +import dagger.Binds +import dagger.Module +import dagger.hilt.InstallIn +import dagger.hilt.components.SingletonComponent +import ru.daemonlord.messenger.data.realtime.WsRealtimeManager +import ru.daemonlord.messenger.domain.realtime.RealtimeManager +import javax.inject.Singleton + +@Module +@InstallIn(SingletonComponent::class) +abstract class RealtimeModule { + + @Binds + @Singleton + abstract fun bindRealtimeManager( + manager: WsRealtimeManager, + ): RealtimeManager +} diff --git a/android/app/src/main/java/ru/daemonlord/messenger/domain/realtime/RealtimeManager.kt b/android/app/src/main/java/ru/daemonlord/messenger/domain/realtime/RealtimeManager.kt new file mode 100644 index 0000000..d03daa5 --- /dev/null +++ b/android/app/src/main/java/ru/daemonlord/messenger/domain/realtime/RealtimeManager.kt @@ -0,0 +1,10 @@ +package ru.daemonlord.messenger.domain.realtime + +import kotlinx.coroutines.flow.Flow +import ru.daemonlord.messenger.domain.realtime.model.RealtimeEvent + +interface RealtimeManager { + val events: Flow + fun connect() + fun disconnect() +} diff --git a/android/app/src/main/java/ru/daemonlord/messenger/domain/realtime/model/RealtimeEvent.kt b/android/app/src/main/java/ru/daemonlord/messenger/domain/realtime/model/RealtimeEvent.kt new file mode 100644 index 0000000..d047a2b --- /dev/null +++ b/android/app/src/main/java/ru/daemonlord/messenger/domain/realtime/model/RealtimeEvent.kt @@ -0,0 +1,41 @@ +package ru.daemonlord.messenger.domain.realtime.model + +sealed interface RealtimeEvent { + data class ReceiveMessage( + val chatId: Long, + val text: String?, + val type: String?, + val createdAt: String?, + ) : RealtimeEvent + + data class MessageUpdated( + val chatId: Long, + val text: String?, + val type: String?, + val updatedAt: String?, + ) : RealtimeEvent + + data class MessageDeleted( + val chatId: Long, + val messageId: Long?, + ) : RealtimeEvent + + data class ChatUpdated( + val chatId: Long, + ) : RealtimeEvent + + data class ChatDeleted( + val chatId: Long, + ) : RealtimeEvent + + data class UserOnline( + val chatId: Long, + ) : RealtimeEvent + + data class UserOffline( + val chatId: Long, + val lastSeenAt: String?, + ) : RealtimeEvent + + data object Ignored : RealtimeEvent +} diff --git a/android/app/src/main/java/ru/daemonlord/messenger/domain/realtime/usecase/HandleRealtimeEventsUseCase.kt b/android/app/src/main/java/ru/daemonlord/messenger/domain/realtime/usecase/HandleRealtimeEventsUseCase.kt new file mode 100644 index 0000000..b4c411c --- /dev/null +++ b/android/app/src/main/java/ru/daemonlord/messenger/domain/realtime/usecase/HandleRealtimeEventsUseCase.kt @@ -0,0 +1,93 @@ +package ru.daemonlord.messenger.domain.realtime.usecase + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.flow.collectLatest +import kotlinx.coroutines.launch +import ru.daemonlord.messenger.data.chat.local.dao.ChatDao +import ru.daemonlord.messenger.domain.chat.repository.ChatRepository +import ru.daemonlord.messenger.domain.realtime.RealtimeManager +import ru.daemonlord.messenger.domain.realtime.model.RealtimeEvent +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +class HandleRealtimeEventsUseCase @Inject constructor( + private val realtimeManager: RealtimeManager, + private val chatRepository: ChatRepository, + private val chatDao: ChatDao, +) { + + private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO) + private var collectionJob: Job? = null + + fun start() { + if (collectionJob?.isActive == true) return + realtimeManager.connect() + collectionJob = scope.launch { + realtimeManager.events.collectLatest { event -> + when (event) { + is RealtimeEvent.ReceiveMessage -> { + chatDao.updateLastMessage( + chatId = event.chatId, + lastMessageText = event.text, + lastMessageType = event.type, + lastMessageCreatedAt = event.createdAt, + updatedSortAt = event.createdAt, + ) + chatDao.incrementUnread(chatId = event.chatId) + } + + is RealtimeEvent.MessageUpdated -> { + chatDao.updateLastMessage( + chatId = event.chatId, + lastMessageText = event.text, + lastMessageType = event.type, + lastMessageCreatedAt = event.updatedAt, + updatedSortAt = event.updatedAt, + ) + } + + is RealtimeEvent.MessageDeleted -> { + chatRepository.refreshChats(archived = false) + chatRepository.refreshChats(archived = true) + } + + is RealtimeEvent.ChatUpdated -> { + chatRepository.refreshChat(chatId = event.chatId) + } + + is RealtimeEvent.ChatDeleted -> { + chatRepository.deleteChat(chatId = event.chatId) + } + + is RealtimeEvent.UserOnline -> { + chatDao.updatePresence( + chatId = event.chatId, + isOnline = true, + lastSeenAt = null, + ) + } + + is RealtimeEvent.UserOffline -> { + chatDao.updatePresence( + chatId = event.chatId, + isOnline = false, + lastSeenAt = event.lastSeenAt, + ) + } + + RealtimeEvent.Ignored -> Unit + } + } + } + } + + fun stop() { + collectionJob?.cancel() + collectionJob = null + realtimeManager.disconnect() + } +}