android: extend realtime pipeline for message stream updates
Some checks failed
CI / test (push) Failing after 2m16s

This commit is contained in:
Codex
2026-03-09 02:08:13 +03:00
parent 5a0add4d5c
commit c63f063726
4 changed files with 103 additions and 0 deletions

View File

@@ -78,3 +78,9 @@
- Added `MessageRepository` contracts/use-cases for observe/sync/pagination/send/edit/delete.
- Implemented `NetworkMessageRepository` with cache-first observation and optimistic text send.
- Wired message API and repository into Hilt modules.
### Step 13 - Sprint A / 3) Message realtime integration
- Extended realtime event model/parser with message-focused events (`message_delivered`, `message_read`, `typing_start`, `typing_stop`) and richer message payload mapping.
- Updated unified realtime handler to write `receive_message`, `message_updated`, `message_deleted` into `messages` Room state.
- Added delivery/read status updates in Room for message status events.
- Kept chat list sync updates in the same manager/use-case pipeline for consistency.

View File

@@ -25,8 +25,13 @@ class RealtimeEventParser @Inject constructor(
"receive_message" -> {
val chatId = payload["chat_id"].longOrNull() ?: return RealtimeEvent.Ignored
val messageObject = payload["message"]?.jsonObject
val messageId = messageObject?.get("id").longOrNull() ?: return RealtimeEvent.Ignored
val senderId = messageObject?.get("sender_id").longOrNull() ?: 0L
RealtimeEvent.ReceiveMessage(
chatId = chatId,
messageId = messageId,
senderId = senderId,
replyToMessageId = messageObject?.get("reply_to_message_id").longOrNull(),
text = messageObject?.get("text").stringOrNull(),
type = messageObject?.get("type").stringOrNull(),
createdAt = messageObject?.get("created_at").stringOrNull(),
@@ -36,8 +41,10 @@ class RealtimeEventParser @Inject constructor(
"message_updated" -> {
val chatId = payload["chat_id"].longOrNull() ?: return RealtimeEvent.Ignored
val messageObject = payload["message"]?.jsonObject
val messageId = messageObject?.get("id").longOrNull() ?: return RealtimeEvent.Ignored
RealtimeEvent.MessageUpdated(
chatId = chatId,
messageId = messageId,
text = messageObject?.get("text").stringOrNull(),
type = messageObject?.get("type").stringOrNull(),
updatedAt = messageObject?.get("updated_at").stringOrNull(),
@@ -75,6 +82,28 @@ class RealtimeEventParser @Inject constructor(
)
}
"message_delivered" -> {
val chatId = payload["chat_id"].longOrNull() ?: return RealtimeEvent.Ignored
val messageId = payload["message_id"].longOrNull() ?: return RealtimeEvent.Ignored
RealtimeEvent.MessageDelivered(chatId = chatId, messageId = messageId)
}
"message_read" -> {
val chatId = payload["chat_id"].longOrNull() ?: return RealtimeEvent.Ignored
val messageId = payload["message_id"].longOrNull() ?: return RealtimeEvent.Ignored
RealtimeEvent.MessageRead(chatId = chatId, messageId = messageId)
}
"typing_start" -> {
val chatId = payload["chat_id"].longOrNull() ?: return RealtimeEvent.Ignored
RealtimeEvent.TypingStart(chatId = chatId, userId = payload["user_id"].longOrNull())
}
"typing_stop" -> {
val chatId = payload["chat_id"].longOrNull() ?: return RealtimeEvent.Ignored
RealtimeEvent.TypingStop(chatId = chatId, userId = payload["user_id"].longOrNull())
}
else -> RealtimeEvent.Ignored
}
}

View File

@@ -3,6 +3,9 @@ package ru.daemonlord.messenger.domain.realtime.model
sealed interface RealtimeEvent {
data class ReceiveMessage(
val chatId: Long,
val messageId: Long,
val senderId: Long,
val replyToMessageId: Long?,
val text: String?,
val type: String?,
val createdAt: String?,
@@ -10,6 +13,7 @@ sealed interface RealtimeEvent {
data class MessageUpdated(
val chatId: Long,
val messageId: Long,
val text: String?,
val type: String?,
val updatedAt: String?,
@@ -37,5 +41,25 @@ sealed interface RealtimeEvent {
val lastSeenAt: String?,
) : RealtimeEvent
data class MessageDelivered(
val chatId: Long,
val messageId: Long,
) : RealtimeEvent
data class MessageRead(
val chatId: Long,
val messageId: Long,
) : RealtimeEvent
data class TypingStart(
val chatId: Long,
val userId: Long?,
) : RealtimeEvent
data class TypingStop(
val chatId: Long,
val userId: Long?,
) : RealtimeEvent
data object Ignored : RealtimeEvent
}

View File

@@ -7,6 +7,8 @@ import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.launch
import ru.daemonlord.messenger.data.chat.local.dao.ChatDao
import ru.daemonlord.messenger.data.message.local.dao.MessageDao
import ru.daemonlord.messenger.data.message.local.entity.MessageEntity
import ru.daemonlord.messenger.domain.chat.repository.ChatRepository
import ru.daemonlord.messenger.domain.realtime.RealtimeManager
import ru.daemonlord.messenger.domain.realtime.model.RealtimeEvent
@@ -18,6 +20,7 @@ class HandleRealtimeEventsUseCase @Inject constructor(
private val realtimeManager: RealtimeManager,
private val chatRepository: ChatRepository,
private val chatDao: ChatDao,
private val messageDao: MessageDao,
) {
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
@@ -30,6 +33,24 @@ class HandleRealtimeEventsUseCase @Inject constructor(
realtimeManager.events.collectLatest { event ->
when (event) {
is RealtimeEvent.ReceiveMessage -> {
messageDao.upsertMessages(
listOf(
MessageEntity(
id = event.messageId,
chatId = event.chatId,
senderId = event.senderId,
senderDisplayName = null,
senderUsername = null,
senderAvatarUrl = null,
replyToMessageId = event.replyToMessageId,
type = event.type ?: "text",
text = event.text,
status = null,
createdAt = event.createdAt ?: java.time.Instant.now().toString(),
updatedAt = null,
)
)
)
chatDao.updateLastMessage(
chatId = event.chatId,
lastMessageText = event.text,
@@ -41,6 +62,11 @@ class HandleRealtimeEventsUseCase @Inject constructor(
}
is RealtimeEvent.MessageUpdated -> {
messageDao.updateMessageText(
messageId = event.messageId,
text = event.text,
updatedAt = event.updatedAt,
)
chatDao.updateLastMessage(
chatId = event.chatId,
lastMessageText = event.text,
@@ -51,6 +77,7 @@ class HandleRealtimeEventsUseCase @Inject constructor(
}
is RealtimeEvent.MessageDeleted -> {
event.messageId?.let { messageDao.deleteMessage(it) }
chatRepository.refreshChats(archived = false)
chatRepository.refreshChats(archived = true)
}
@@ -79,6 +106,23 @@ class HandleRealtimeEventsUseCase @Inject constructor(
)
}
is RealtimeEvent.MessageDelivered -> {
messageDao.updateMessageStatus(
messageId = event.messageId,
status = "delivered",
)
}
is RealtimeEvent.MessageRead -> {
messageDao.updateMessageStatus(
messageId = event.messageId,
status = "read",
)
}
is RealtimeEvent.TypingStart -> Unit
is RealtimeEvent.TypingStop -> Unit
RealtimeEvent.Ignored -> Unit
}
}