diff --git a/android/CHANGELOG.md b/android/CHANGELOG.md index 6559b3c..8d90b96 100644 --- a/android/CHANGELOG.md +++ b/android/CHANGELOG.md @@ -136,3 +136,9 @@ - Added domain model/use-cases for invite-link creation and join-by-invite. - Extended chat repository with invite operations and local chat upsert on successful join. - Added minimal Chat List UI flow for join-by-invite token input with loading/error handling and auto-open of joined chat. + +### Step 22 - Sprint P0 / 5) Realtime stability and reconcile +- Added heartbeat in WebSocket manager (`ping` interval + `pong` timeout detection) with forced reconnect on stale link. +- Improved socket lifecycle hygiene by cancelling heartbeat on close/failure/disconnect paths. +- Added `connect` event mapping and centralized reconcile trigger in realtime handler. +- On realtime reconnect, chat repository now refreshes `all` and `archived` snapshots to reduce stale state after transient disconnects. diff --git a/android/app/src/main/java/ru/daemonlord/messenger/data/realtime/RealtimeEventParser.kt b/android/app/src/main/java/ru/daemonlord/messenger/data/realtime/RealtimeEventParser.kt index 4297b8b..ddcb8e7 100644 --- a/android/app/src/main/java/ru/daemonlord/messenger/data/realtime/RealtimeEventParser.kt +++ b/android/app/src/main/java/ru/daemonlord/messenger/data/realtime/RealtimeEventParser.kt @@ -22,6 +22,8 @@ class RealtimeEventParser @Inject constructor( val payload = root["payload"]?.jsonObject ?: JsonObject(emptyMap()) return when (event) { + "connect" -> RealtimeEvent.Connected + "receive_message" -> { val chatId = payload["chat_id"].longOrNull() ?: return RealtimeEvent.Ignored val messageObject = payload["message"]?.jsonObject @@ -104,6 +106,8 @@ class RealtimeEventParser @Inject constructor( RealtimeEvent.TypingStop(chatId = chatId, userId = payload["user_id"].longOrNull()) } + "pong" -> RealtimeEvent.Ignored + else -> RealtimeEvent.Ignored } } diff --git a/android/app/src/main/java/ru/daemonlord/messenger/data/realtime/WsRealtimeManager.kt b/android/app/src/main/java/ru/daemonlord/messenger/data/realtime/WsRealtimeManager.kt index 33d48c1..9fd2f16 100644 --- a/android/app/src/main/java/ru/daemonlord/messenger/data/realtime/WsRealtimeManager.kt +++ b/android/app/src/main/java/ru/daemonlord/messenger/data/realtime/WsRealtimeManager.kt @@ -2,6 +2,7 @@ package ru.daemonlord.messenger.data.realtime import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.cancel import kotlinx.coroutines.delay @@ -20,6 +21,7 @@ import ru.daemonlord.messenger.di.RefreshClient import ru.daemonlord.messenger.domain.realtime.RealtimeManager import ru.daemonlord.messenger.domain.realtime.model.RealtimeEvent import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicLong import javax.inject.Inject import javax.inject.Singleton @@ -36,6 +38,8 @@ class WsRealtimeManager @Inject constructor( private val isConnected = AtomicBoolean(false) private val manualDisconnect = AtomicBoolean(false) private var reconnectDelayMs: Long = INITIAL_RECONNECT_MS + private val lastPongAtMs = AtomicLong(0L) + private var heartbeatJob: Job? = null override val events: Flow = eventFlow.asSharedFlow() @@ -48,6 +52,8 @@ class WsRealtimeManager @Inject constructor( override fun disconnect() { manualDisconnect.set(true) isConnected.set(false) + heartbeatJob?.cancel() + heartbeatJob = null socket?.close(1000, "Client disconnect") socket = null } @@ -73,28 +79,51 @@ class WsRealtimeManager @Inject constructor( } } + private fun startHeartbeat(webSocket: WebSocket) { + heartbeatJob?.cancel() + lastPongAtMs.set(System.currentTimeMillis()) + heartbeatJob = scope.launch { + while (isConnected.get() && !manualDisconnect.get()) { + val now = System.currentTimeMillis() + if (now - lastPongAtMs.get() > PONG_TIMEOUT_MS) { + webSocket.close(1001, "Heartbeat timeout") + break + } + webSocket.send("""{"event":"ping","payload":{}}""") + delay(PING_INTERVAL_MS) + } + } + } + private val listener = object : WebSocketListener() { override fun onOpen(webSocket: WebSocket, response: Response) { isConnected.set(true) reconnectDelayMs = INITIAL_RECONNECT_MS + startHeartbeat(webSocket) } override fun onMessage(webSocket: WebSocket, text: String) { + if (text.contains("\"event\":\"pong\"")) { + lastPongAtMs.set(System.currentTimeMillis()) + } eventFlow.tryEmit(parser.parse(text)) } override fun onClosing(webSocket: WebSocket, code: Int, reason: String) { isConnected.set(false) + heartbeatJob?.cancel() webSocket.close(code, reason) } override fun onClosed(webSocket: WebSocket, code: Int, reason: String) { isConnected.set(false) + heartbeatJob?.cancel() scheduleReconnect() } override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) { isConnected.set(false) + heartbeatJob?.cancel() scheduleReconnect() } } @@ -108,5 +137,7 @@ class WsRealtimeManager @Inject constructor( private companion object { const val INITIAL_RECONNECT_MS = 1_000L const val MAX_RECONNECT_MS = 30_000L + const val PING_INTERVAL_MS = 25_000L + const val PONG_TIMEOUT_MS = 65_000L } } diff --git a/android/app/src/main/java/ru/daemonlord/messenger/domain/realtime/model/RealtimeEvent.kt b/android/app/src/main/java/ru/daemonlord/messenger/domain/realtime/model/RealtimeEvent.kt index bf12c87..ba12d95 100644 --- a/android/app/src/main/java/ru/daemonlord/messenger/domain/realtime/model/RealtimeEvent.kt +++ b/android/app/src/main/java/ru/daemonlord/messenger/domain/realtime/model/RealtimeEvent.kt @@ -1,6 +1,8 @@ package ru.daemonlord.messenger.domain.realtime.model sealed interface RealtimeEvent { + data object Connected : RealtimeEvent + data class ReceiveMessage( val chatId: Long, val messageId: Long, diff --git a/android/app/src/main/java/ru/daemonlord/messenger/domain/realtime/usecase/HandleRealtimeEventsUseCase.kt b/android/app/src/main/java/ru/daemonlord/messenger/domain/realtime/usecase/HandleRealtimeEventsUseCase.kt index cf39262..9a330ff 100644 --- a/android/app/src/main/java/ru/daemonlord/messenger/domain/realtime/usecase/HandleRealtimeEventsUseCase.kt +++ b/android/app/src/main/java/ru/daemonlord/messenger/domain/realtime/usecase/HandleRealtimeEventsUseCase.kt @@ -32,6 +32,11 @@ class HandleRealtimeEventsUseCase @Inject constructor( collectionJob = scope.launch { realtimeManager.events.collectLatest { event -> when (event) { + RealtimeEvent.Connected -> { + chatRepository.refreshChats(archived = false) + chatRepository.refreshChats(archived = true) + } + is RealtimeEvent.ReceiveMessage -> { messageDao.upsertMessages( listOf( diff --git a/docs/android-checklist.md b/docs/android-checklist.md index 9382418..b3a5190 100644 --- a/docs/android-checklist.md +++ b/docs/android-checklist.md @@ -23,7 +23,7 @@ - [ ] Кэш медиа (Coil/Exo cache) - [ ] Offline-first чтение истории - [ ] Очередь отложенных действий (send/edit/delete) -- [ ] Конфликт-резолв и reconcile после reconnect +- [x] Конфликт-резолв и reconcile после reconnect ## 4. Авторизация и аккаунт - [x] Login/Register flow (email-first)