android: harden realtime heartbeat and reconnect reconcile
Some checks failed
CI / test (push) Failing after 2m10s

This commit is contained in:
Codex
2026-03-09 13:01:06 +03:00
parent e91884e14a
commit 08815bac7b
6 changed files with 49 additions and 1 deletions

View File

@@ -136,3 +136,9 @@
- Added domain model/use-cases for invite-link creation and join-by-invite.
- Extended chat repository with invite operations and local chat upsert on successful join.
- Added minimal Chat List UI flow for join-by-invite token input with loading/error handling and auto-open of joined chat.
### Step 22 - Sprint P0 / 5) Realtime stability and reconcile
- Added heartbeat in WebSocket manager (`ping` interval + `pong` timeout detection) with forced reconnect on stale link.
- Improved socket lifecycle hygiene by cancelling heartbeat on close/failure/disconnect paths.
- Added `connect` event mapping and centralized reconcile trigger in realtime handler.
- On realtime reconnect, chat repository now refreshes `all` and `archived` snapshots to reduce stale state after transient disconnects.

View File

@@ -22,6 +22,8 @@ class RealtimeEventParser @Inject constructor(
val payload = root["payload"]?.jsonObject ?: JsonObject(emptyMap())
return when (event) {
"connect" -> RealtimeEvent.Connected
"receive_message" -> {
val chatId = payload["chat_id"].longOrNull() ?: return RealtimeEvent.Ignored
val messageObject = payload["message"]?.jsonObject
@@ -104,6 +106,8 @@ class RealtimeEventParser @Inject constructor(
RealtimeEvent.TypingStop(chatId = chatId, userId = payload["user_id"].longOrNull())
}
"pong" -> RealtimeEvent.Ignored
else -> RealtimeEvent.Ignored
}
}

View File

@@ -2,6 +2,7 @@ package ru.daemonlord.messenger.data.realtime
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
@@ -20,6 +21,7 @@ import ru.daemonlord.messenger.di.RefreshClient
import ru.daemonlord.messenger.domain.realtime.RealtimeManager
import ru.daemonlord.messenger.domain.realtime.model.RealtimeEvent
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong
import javax.inject.Inject
import javax.inject.Singleton
@@ -36,6 +38,8 @@ class WsRealtimeManager @Inject constructor(
private val isConnected = AtomicBoolean(false)
private val manualDisconnect = AtomicBoolean(false)
private var reconnectDelayMs: Long = INITIAL_RECONNECT_MS
private val lastPongAtMs = AtomicLong(0L)
private var heartbeatJob: Job? = null
override val events: Flow<RealtimeEvent> = eventFlow.asSharedFlow()
@@ -48,6 +52,8 @@ class WsRealtimeManager @Inject constructor(
override fun disconnect() {
manualDisconnect.set(true)
isConnected.set(false)
heartbeatJob?.cancel()
heartbeatJob = null
socket?.close(1000, "Client disconnect")
socket = null
}
@@ -73,28 +79,51 @@ class WsRealtimeManager @Inject constructor(
}
}
private fun startHeartbeat(webSocket: WebSocket) {
heartbeatJob?.cancel()
lastPongAtMs.set(System.currentTimeMillis())
heartbeatJob = scope.launch {
while (isConnected.get() && !manualDisconnect.get()) {
val now = System.currentTimeMillis()
if (now - lastPongAtMs.get() > PONG_TIMEOUT_MS) {
webSocket.close(1001, "Heartbeat timeout")
break
}
webSocket.send("""{"event":"ping","payload":{}}""")
delay(PING_INTERVAL_MS)
}
}
}
private val listener = object : WebSocketListener() {
override fun onOpen(webSocket: WebSocket, response: Response) {
isConnected.set(true)
reconnectDelayMs = INITIAL_RECONNECT_MS
startHeartbeat(webSocket)
}
override fun onMessage(webSocket: WebSocket, text: String) {
if (text.contains("\"event\":\"pong\"")) {
lastPongAtMs.set(System.currentTimeMillis())
}
eventFlow.tryEmit(parser.parse(text))
}
override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
isConnected.set(false)
heartbeatJob?.cancel()
webSocket.close(code, reason)
}
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
isConnected.set(false)
heartbeatJob?.cancel()
scheduleReconnect()
}
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
isConnected.set(false)
heartbeatJob?.cancel()
scheduleReconnect()
}
}
@@ -108,5 +137,7 @@ class WsRealtimeManager @Inject constructor(
private companion object {
const val INITIAL_RECONNECT_MS = 1_000L
const val MAX_RECONNECT_MS = 30_000L
const val PING_INTERVAL_MS = 25_000L
const val PONG_TIMEOUT_MS = 65_000L
}
}

View File

@@ -1,6 +1,8 @@
package ru.daemonlord.messenger.domain.realtime.model
sealed interface RealtimeEvent {
data object Connected : RealtimeEvent
data class ReceiveMessage(
val chatId: Long,
val messageId: Long,

View File

@@ -32,6 +32,11 @@ class HandleRealtimeEventsUseCase @Inject constructor(
collectionJob = scope.launch {
realtimeManager.events.collectLatest { event ->
when (event) {
RealtimeEvent.Connected -> {
chatRepository.refreshChats(archived = false)
chatRepository.refreshChats(archived = true)
}
is RealtimeEvent.ReceiveMessage -> {
messageDao.upsertMessages(
listOf(

View File

@@ -23,7 +23,7 @@
- [ ] Кэш медиа (Coil/Exo cache)
- [ ] Offline-first чтение истории
- [ ] Очередь отложенных действий (send/edit/delete)
- [ ] Конфликт-резолв и reconcile после reconnect
- [x] Конфликт-резолв и reconcile после reconnect
## 4. Авторизация и аккаунт
- [x] Login/Register flow (email-first)