android: add deferred offline queue for send edit delete actions
Some checks failed
CI / test (push) Failing after 2m21s
Some checks failed
CI / test (push) Failing after 2m21s
This commit is contained in:
@@ -347,3 +347,9 @@
|
||||
### Step 58 - Keep authenticated session when offline at app start
|
||||
- Updated auth restore flow in `AuthViewModel`: network errors during session restore no longer force logout when local tokens exist.
|
||||
- App now opens authenticated flow in offline mode instead of redirecting to login.
|
||||
|
||||
### Step 59 - Deferred message action queue (send/edit/delete)
|
||||
- Added Room-backed pending action queue (`pending_message_actions`) for message operations that fail due to network issues.
|
||||
- Implemented enqueue + optimistic behavior for `sendText`, `editMessage`, and `deleteMessage` on network failures.
|
||||
- Added automatic pending-action flush on chat sync/load-more and before new message operations.
|
||||
- Kept non-network server failures as immediate errors (no queueing), while allowing offline continuation.
|
||||
|
||||
@@ -6,8 +6,10 @@ import ru.daemonlord.messenger.data.chat.local.dao.ChatDao
|
||||
import ru.daemonlord.messenger.data.chat.local.entity.ChatEntity
|
||||
import ru.daemonlord.messenger.data.chat.local.entity.UserShortEntity
|
||||
import ru.daemonlord.messenger.data.message.local.dao.MessageDao
|
||||
import ru.daemonlord.messenger.data.message.local.dao.PendingMessageActionDao
|
||||
import ru.daemonlord.messenger.data.message.local.entity.MessageAttachmentEntity
|
||||
import ru.daemonlord.messenger.data.message.local.entity.MessageEntity
|
||||
import ru.daemonlord.messenger.data.message.local.entity.PendingMessageActionEntity
|
||||
|
||||
@Database(
|
||||
entities = [
|
||||
@@ -15,11 +17,13 @@ import ru.daemonlord.messenger.data.message.local.entity.MessageEntity
|
||||
UserShortEntity::class,
|
||||
MessageEntity::class,
|
||||
MessageAttachmentEntity::class,
|
||||
PendingMessageActionEntity::class,
|
||||
],
|
||||
version = 8,
|
||||
version = 9,
|
||||
exportSchema = false,
|
||||
)
|
||||
abstract class MessengerDatabase : RoomDatabase() {
|
||||
abstract fun chatDao(): ChatDao
|
||||
abstract fun messageDao(): MessageDao
|
||||
abstract fun pendingMessageActionDao(): PendingMessageActionDao
|
||||
}
|
||||
|
||||
@@ -30,6 +30,9 @@ interface MessageDao {
|
||||
@Query("SELECT COUNT(*) FROM messages WHERE chat_id = :chatId")
|
||||
suspend fun countMessages(chatId: Long): Int
|
||||
|
||||
@Query("SELECT chat_id FROM messages WHERE id = :messageId LIMIT 1")
|
||||
suspend fun getChatIdByMessageId(messageId: Long): Long?
|
||||
|
||||
@Query(
|
||||
"""
|
||||
SELECT * FROM messages
|
||||
|
||||
@@ -0,0 +1,36 @@
|
||||
package ru.daemonlord.messenger.data.message.local.dao
|
||||
|
||||
import androidx.room.Dao
|
||||
import androidx.room.Insert
|
||||
import androidx.room.OnConflictStrategy
|
||||
import androidx.room.Query
|
||||
import ru.daemonlord.messenger.data.message.local.entity.PendingMessageActionEntity
|
||||
|
||||
@Dao
|
||||
interface PendingMessageActionDao {
|
||||
|
||||
@Insert(onConflict = OnConflictStrategy.REPLACE)
|
||||
suspend fun enqueue(action: PendingMessageActionEntity): Long
|
||||
|
||||
@Query(
|
||||
"""
|
||||
SELECT * FROM pending_message_actions
|
||||
ORDER BY created_at ASC, id ASC
|
||||
LIMIT :limit
|
||||
"""
|
||||
)
|
||||
suspend fun listPending(limit: Int = 50): List<PendingMessageActionEntity>
|
||||
|
||||
@Query("DELETE FROM pending_message_actions WHERE id = :id")
|
||||
suspend fun deleteById(id: Long)
|
||||
|
||||
@Query(
|
||||
"""
|
||||
UPDATE pending_message_actions
|
||||
SET attempts = attempts + 1
|
||||
WHERE id = :id
|
||||
"""
|
||||
)
|
||||
suspend fun incrementAttempts(id: Long)
|
||||
}
|
||||
|
||||
@@ -0,0 +1,35 @@
|
||||
package ru.daemonlord.messenger.data.message.local.entity
|
||||
|
||||
import androidx.room.ColumnInfo
|
||||
import androidx.room.Entity
|
||||
import androidx.room.Index
|
||||
import androidx.room.PrimaryKey
|
||||
|
||||
@Entity(
|
||||
tableName = "pending_message_actions",
|
||||
indices = [Index(value = ["created_at"]), Index(value = ["chat_id"])],
|
||||
)
|
||||
data class PendingMessageActionEntity(
|
||||
@PrimaryKey(autoGenerate = true)
|
||||
@ColumnInfo(name = "id")
|
||||
val id: Long = 0L,
|
||||
@ColumnInfo(name = "type")
|
||||
val type: String,
|
||||
@ColumnInfo(name = "chat_id")
|
||||
val chatId: Long,
|
||||
@ColumnInfo(name = "message_id")
|
||||
val messageId: Long?,
|
||||
@ColumnInfo(name = "local_message_id")
|
||||
val localMessageId: Long?,
|
||||
@ColumnInfo(name = "text")
|
||||
val text: String?,
|
||||
@ColumnInfo(name = "reply_to_message_id")
|
||||
val replyToMessageId: Long?,
|
||||
@ColumnInfo(name = "for_all")
|
||||
val forAll: Boolean?,
|
||||
@ColumnInfo(name = "attempts")
|
||||
val attempts: Int,
|
||||
@ColumnInfo(name = "created_at")
|
||||
val createdAt: String,
|
||||
)
|
||||
|
||||
@@ -13,7 +13,9 @@ import ru.daemonlord.messenger.data.message.api.MessageApiService
|
||||
import ru.daemonlord.messenger.core.token.TokenRepository
|
||||
import ru.daemonlord.messenger.data.common.toAppError
|
||||
import ru.daemonlord.messenger.data.message.local.dao.MessageDao
|
||||
import ru.daemonlord.messenger.data.message.local.dao.PendingMessageActionDao
|
||||
import ru.daemonlord.messenger.data.message.local.entity.MessageEntity
|
||||
import ru.daemonlord.messenger.data.message.local.entity.PendingMessageActionEntity
|
||||
import ru.daemonlord.messenger.data.message.mapper.toDomain
|
||||
import ru.daemonlord.messenger.data.message.mapper.toEntity
|
||||
import ru.daemonlord.messenger.data.media.api.MediaApiService
|
||||
@@ -43,6 +45,7 @@ import kotlinx.serialization.json.jsonPrimitive
|
||||
class NetworkMessageRepository @Inject constructor(
|
||||
private val messageApiService: MessageApiService,
|
||||
private val messageDao: MessageDao,
|
||||
private val pendingMessageActionDao: PendingMessageActionDao,
|
||||
private val chatDao: ChatDao,
|
||||
private val mediaRepository: MediaRepository,
|
||||
private val mediaApiService: MediaApiService,
|
||||
@@ -69,6 +72,7 @@ class NetworkMessageRepository @Inject constructor(
|
||||
}
|
||||
|
||||
override suspend fun syncRecentMessages(chatId: Long): AppResult<Unit> = withContext(ioDispatcher) {
|
||||
flushPendingActions(chatId = chatId)
|
||||
try {
|
||||
val remoteMessages = messageApiService.getMessages(chatId = chatId)
|
||||
val messageEntities = remoteMessages.map { it.toEntity() }
|
||||
@@ -92,6 +96,7 @@ class NetworkMessageRepository @Inject constructor(
|
||||
}
|
||||
|
||||
override suspend fun loadMoreMessages(chatId: Long, beforeMessageId: Long): AppResult<Unit> = withContext(ioDispatcher) {
|
||||
flushPendingActions(chatId = chatId)
|
||||
try {
|
||||
val page = messageApiService.getMessages(
|
||||
chatId = chatId,
|
||||
@@ -128,6 +133,7 @@ class NetworkMessageRepository @Inject constructor(
|
||||
text: String,
|
||||
replyToMessageId: Long?,
|
||||
): AppResult<Unit> = withContext(ioDispatcher) {
|
||||
flushPendingActions(chatId = chatId)
|
||||
val tempId = -System.currentTimeMillis()
|
||||
val tempMessage = MessageEntity(
|
||||
id = tempId,
|
||||
@@ -177,12 +183,39 @@ class NetworkMessageRepository @Inject constructor(
|
||||
)
|
||||
AppResult.Success(Unit)
|
||||
} catch (error: Throwable) {
|
||||
val mapped = error.toAppError()
|
||||
if (mapped is AppError.Network) {
|
||||
pendingMessageActionDao.enqueue(
|
||||
PendingMessageActionEntity(
|
||||
type = PendingActionType.SEND_TEXT.name,
|
||||
chatId = chatId,
|
||||
messageId = null,
|
||||
localMessageId = tempId,
|
||||
text = text,
|
||||
replyToMessageId = replyToMessageId,
|
||||
forAll = null,
|
||||
attempts = 0,
|
||||
createdAt = java.time.Instant.now().toString(),
|
||||
)
|
||||
)
|
||||
AppResult.Success(Unit)
|
||||
} else {
|
||||
messageDao.deleteMessage(tempId)
|
||||
AppResult.Error(error.toAppError())
|
||||
AppResult.Error(mapped)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun editMessage(messageId: Long, newText: String): AppResult<Unit> = withContext(ioDispatcher) {
|
||||
val chatId = messageDao.getChatIdByMessageId(messageId) ?: 0L
|
||||
if (chatId > 0L) {
|
||||
flushPendingActions(chatId = chatId)
|
||||
}
|
||||
messageDao.updateMessageText(
|
||||
messageId = messageId,
|
||||
text = newText,
|
||||
updatedAt = java.time.Instant.now().toString(),
|
||||
)
|
||||
try {
|
||||
val updated = messageApiService.editMessage(
|
||||
messageId = messageId,
|
||||
@@ -191,11 +224,33 @@ class NetworkMessageRepository @Inject constructor(
|
||||
messageDao.upsertMessages(listOf(updated.toEntity()))
|
||||
AppResult.Success(Unit)
|
||||
} catch (error: Throwable) {
|
||||
AppResult.Error(error.toAppError())
|
||||
val mapped = error.toAppError()
|
||||
if (mapped is AppError.Network) {
|
||||
pendingMessageActionDao.enqueue(
|
||||
PendingMessageActionEntity(
|
||||
type = PendingActionType.EDIT.name,
|
||||
chatId = chatId,
|
||||
messageId = messageId,
|
||||
localMessageId = null,
|
||||
text = newText,
|
||||
replyToMessageId = null,
|
||||
forAll = null,
|
||||
attempts = 0,
|
||||
createdAt = java.time.Instant.now().toString(),
|
||||
)
|
||||
)
|
||||
AppResult.Success(Unit)
|
||||
} else {
|
||||
AppResult.Error(mapped)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun deleteMessage(messageId: Long, forAll: Boolean): AppResult<Unit> = withContext(ioDispatcher) {
|
||||
val chatId = messageDao.getChatIdByMessageId(messageId) ?: 0L
|
||||
if (chatId > 0L) {
|
||||
flushPendingActions(chatId = chatId)
|
||||
}
|
||||
try {
|
||||
messageApiService.deleteMessage(
|
||||
messageId = messageId,
|
||||
@@ -204,7 +259,26 @@ class NetworkMessageRepository @Inject constructor(
|
||||
messageDao.deleteMessage(messageId)
|
||||
AppResult.Success(Unit)
|
||||
} catch (error: Throwable) {
|
||||
AppResult.Error(error.toAppError())
|
||||
val mapped = error.toAppError()
|
||||
if (mapped is AppError.Network) {
|
||||
messageDao.deleteMessage(messageId)
|
||||
pendingMessageActionDao.enqueue(
|
||||
PendingMessageActionEntity(
|
||||
type = PendingActionType.DELETE.name,
|
||||
chatId = chatId,
|
||||
messageId = messageId,
|
||||
localMessageId = null,
|
||||
text = null,
|
||||
replyToMessageId = null,
|
||||
forAll = forAll,
|
||||
attempts = 0,
|
||||
createdAt = java.time.Instant.now().toString(),
|
||||
)
|
||||
)
|
||||
AppResult.Success(Unit)
|
||||
} else {
|
||||
AppResult.Error(mapped)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -397,6 +471,98 @@ class NetworkMessageRepository @Inject constructor(
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun flushPendingActions(chatId: Long) {
|
||||
val pending = pendingMessageActionDao.listPending(limit = 100)
|
||||
for (action in pending) {
|
||||
if (action.chatId != chatId && action.chatId != 0L) continue
|
||||
when (val result = performPendingAction(action)) {
|
||||
is AppResult.Success -> pendingMessageActionDao.deleteById(action.id)
|
||||
is AppResult.Error -> {
|
||||
pendingMessageActionDao.incrementAttempts(action.id)
|
||||
if (result.reason is AppError.Network) {
|
||||
return
|
||||
} else {
|
||||
pendingMessageActionDao.deleteById(action.id)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun performPendingAction(action: PendingMessageActionEntity): AppResult<Unit> {
|
||||
val type = runCatching { PendingActionType.valueOf(action.type) }.getOrNull()
|
||||
?: return AppResult.Success(Unit)
|
||||
return when (type) {
|
||||
PendingActionType.SEND_TEXT -> {
|
||||
val text = action.text ?: return AppResult.Success(Unit)
|
||||
val chatId = action.chatId
|
||||
runCatching {
|
||||
messageApiService.sendMessage(
|
||||
request = MessageCreateRequestDto(
|
||||
chatId = chatId,
|
||||
type = "text",
|
||||
text = text,
|
||||
clientMessageId = UUID.randomUUID().toString(),
|
||||
replyToMessageId = action.replyToMessageId,
|
||||
)
|
||||
)
|
||||
}.fold(
|
||||
onSuccess = { sent ->
|
||||
action.localMessageId?.let { messageDao.deleteMessage(it) }
|
||||
messageDao.upsertMessages(listOf(sent.toEntity()))
|
||||
chatDao.updateLastMessage(
|
||||
chatId = chatId,
|
||||
lastMessageText = sent.text,
|
||||
lastMessageType = sent.type,
|
||||
lastMessageCreatedAt = sent.createdAt,
|
||||
updatedSortAt = sent.createdAt,
|
||||
)
|
||||
AppResult.Success(Unit)
|
||||
},
|
||||
onFailure = { error -> AppResult.Error(error.toAppError()) }
|
||||
)
|
||||
}
|
||||
|
||||
PendingActionType.EDIT -> {
|
||||
val messageId = action.messageId ?: return AppResult.Success(Unit)
|
||||
val text = action.text ?: return AppResult.Success(Unit)
|
||||
runCatching {
|
||||
messageApiService.editMessage(
|
||||
messageId = messageId,
|
||||
request = MessageUpdateRequestDto(text = text),
|
||||
)
|
||||
}.fold(
|
||||
onSuccess = { updated ->
|
||||
messageDao.upsertMessages(listOf(updated.toEntity()))
|
||||
AppResult.Success(Unit)
|
||||
},
|
||||
onFailure = { error -> AppResult.Error(error.toAppError()) }
|
||||
)
|
||||
}
|
||||
|
||||
PendingActionType.DELETE -> {
|
||||
val messageId = action.messageId ?: return AppResult.Success(Unit)
|
||||
runCatching {
|
||||
messageApiService.deleteMessage(
|
||||
messageId = messageId,
|
||||
forAll = action.forAll == true,
|
||||
)
|
||||
}.fold(
|
||||
onSuccess = {
|
||||
AppResult.Success(Unit)
|
||||
},
|
||||
onFailure = { error -> AppResult.Error(error.toAppError()) }
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private enum class PendingActionType {
|
||||
SEND_TEXT,
|
||||
EDIT,
|
||||
DELETE,
|
||||
}
|
||||
|
||||
private fun String.extractUserIdFromJwt(): Long? {
|
||||
val payloadPart = split('.').getOrNull(1) ?: return null
|
||||
val normalized = payloadPart
|
||||
|
||||
@@ -10,6 +10,7 @@ import dagger.hilt.components.SingletonComponent
|
||||
import ru.daemonlord.messenger.data.chat.local.dao.ChatDao
|
||||
import ru.daemonlord.messenger.data.chat.local.db.MessengerDatabase
|
||||
import ru.daemonlord.messenger.data.message.local.dao.MessageDao
|
||||
import ru.daemonlord.messenger.data.message.local.dao.PendingMessageActionDao
|
||||
import javax.inject.Singleton
|
||||
|
||||
@Module
|
||||
@@ -36,4 +37,9 @@ object DatabaseModule {
|
||||
@Provides
|
||||
@Singleton
|
||||
fun provideMessageDao(database: MessengerDatabase): MessageDao = database.messageDao()
|
||||
|
||||
@Provides
|
||||
@Singleton
|
||||
fun providePendingMessageActionDao(database: MessengerDatabase): PendingMessageActionDao =
|
||||
database.pendingMessageActionDao()
|
||||
}
|
||||
|
||||
@@ -107,6 +107,7 @@ class NetworkMessageRepositoryTest {
|
||||
return NetworkMessageRepository(
|
||||
messageApiService = fakeApi,
|
||||
messageDao = db.messageDao(),
|
||||
pendingMessageActionDao = db.pendingMessageActionDao(),
|
||||
chatDao = db.chatDao(),
|
||||
mediaRepository = FakeMediaRepository(),
|
||||
mediaApiService = FakeMediaApiService(),
|
||||
|
||||
@@ -22,7 +22,7 @@
|
||||
- [x] DataStore для настроек
|
||||
- [ ] Кэш медиа (Coil/Exo cache)
|
||||
- [x] Offline-first чтение истории
|
||||
- [ ] Очередь отложенных действий (send/edit/delete)
|
||||
- [x] Очередь отложенных действий (send/edit/delete)
|
||||
- [x] Конфликт-резолв и reconcile после reconnect
|
||||
|
||||
## 4. Авторизация и аккаунт
|
||||
|
||||
Reference in New Issue
Block a user