From 65c20faecde7e531c6b26cd5762e28a6ef389565 Mon Sep 17 00:00:00 2001 From: benya Date: Sun, 8 Mar 2026 19:55:32 +0300 Subject: [PATCH] fix(realtime): clear typing and recording indicators on disconnect --- app/realtime/service.py | 48 +++++++++++++++++++++++++++++++++++++++++ docs/api-reference.md | 2 ++ 2 files changed, 50 insertions(+) diff --git a/app/realtime/service.py b/app/realtime/service.py index 10333bf..10c9fe6 100644 --- a/app/realtime/service.py +++ b/app/realtime/service.py @@ -26,6 +26,9 @@ class RealtimeGateway: self._distributed_enabled = False self._connections: dict[int, dict[str, ConnectionContext]] = defaultdict(dict) self._chat_subscribers: dict[int, set[int]] = defaultdict(set) + self._typing_chats_by_user: dict[int, set[int]] = defaultdict(set) + self._recording_voice_chats_by_user: dict[int, set[int]] = defaultdict(set) + self._recording_video_chats_by_user: dict[int, set[int]] = defaultdict(set) async def start(self) -> None: try: @@ -71,6 +74,7 @@ class RealtimeGateway: user_connections = self._connections.get(user_id, {}) user_connections.pop(connection_id, None) if not user_connections: + await self._flush_user_activity(user_id) self._connections.pop(user_id, None) for chat_id in user_chat_ids: subscribers = self._chat_subscribers.get(chat_id) @@ -110,6 +114,7 @@ class RealtimeGateway: async def handle_typing_event(self, db: AsyncSession, user_id: int, payload: ChatEventPayload, event: str) -> None: await ensure_chat_membership(db, chat_id=payload.chat_id, user_id=user_id) + self._update_user_activity(user_id=user_id, chat_id=payload.chat_id, event=event) await self._publish_chat_event( payload.chat_id, event=event, @@ -205,6 +210,49 @@ class RealtimeGateway: return await self._handle_redis_event(f"chat:{chat_id}", event_payload) + def _update_user_activity(self, *, user_id: int, chat_id: int, event: str) -> None: + if event == "typing_start": + self._typing_chats_by_user[user_id].add(chat_id) + return + if event == "typing_stop": + self._typing_chats_by_user[user_id].discard(chat_id) + return + if event == "recording_voice_start": + self._recording_voice_chats_by_user[user_id].add(chat_id) + return + if event == "recording_voice_stop": + self._recording_voice_chats_by_user[user_id].discard(chat_id) + return + if event == "recording_video_start": + self._recording_video_chats_by_user[user_id].add(chat_id) + return + if event == "recording_video_stop": + self._recording_video_chats_by_user[user_id].discard(chat_id) + + async def _flush_user_activity(self, user_id: int) -> None: + typing_chats = list(self._typing_chats_by_user.pop(user_id, set())) + voice_chats = list(self._recording_voice_chats_by_user.pop(user_id, set())) + video_chats = list(self._recording_video_chats_by_user.pop(user_id, set())) + + for chat_id in typing_chats: + await self._publish_chat_event( + chat_id, + event="typing_stop", + payload={"chat_id": chat_id, "user_id": user_id}, + ) + for chat_id in voice_chats: + await self._publish_chat_event( + chat_id, + event="recording_voice_stop", + payload={"chat_id": chat_id, "user_id": user_id}, + ) + for chat_id in video_chats: + await self._publish_chat_event( + chat_id, + event="recording_video_stop", + payload={"chat_id": chat_id, "user_id": user_id}, + ) + async def publish_message_created( self, *, diff --git a/docs/api-reference.md b/docs/api-reference.md index 1d50776..2f45c4b 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -81,6 +81,8 @@ For `/health/ready` failure: - `recording_video_start` - `recording_video_stop` +Server behavior: when a user disconnects, active typing/recording indicators are auto-cleared with corresponding `*_stop` events. + ## 3. Models (request/response) ## 3.1 Auth