fix(realtime): clear typing and recording indicators on disconnect
All checks were successful
CI / test (push) Successful in 46s
All checks were successful
CI / test (push) Successful in 46s
This commit is contained in:
@@ -26,6 +26,9 @@ class RealtimeGateway:
|
|||||||
self._distributed_enabled = False
|
self._distributed_enabled = False
|
||||||
self._connections: dict[int, dict[str, ConnectionContext]] = defaultdict(dict)
|
self._connections: dict[int, dict[str, ConnectionContext]] = defaultdict(dict)
|
||||||
self._chat_subscribers: dict[int, set[int]] = defaultdict(set)
|
self._chat_subscribers: dict[int, set[int]] = defaultdict(set)
|
||||||
|
self._typing_chats_by_user: dict[int, set[int]] = defaultdict(set)
|
||||||
|
self._recording_voice_chats_by_user: dict[int, set[int]] = defaultdict(set)
|
||||||
|
self._recording_video_chats_by_user: dict[int, set[int]] = defaultdict(set)
|
||||||
|
|
||||||
async def start(self) -> None:
|
async def start(self) -> None:
|
||||||
try:
|
try:
|
||||||
@@ -71,6 +74,7 @@ class RealtimeGateway:
|
|||||||
user_connections = self._connections.get(user_id, {})
|
user_connections = self._connections.get(user_id, {})
|
||||||
user_connections.pop(connection_id, None)
|
user_connections.pop(connection_id, None)
|
||||||
if not user_connections:
|
if not user_connections:
|
||||||
|
await self._flush_user_activity(user_id)
|
||||||
self._connections.pop(user_id, None)
|
self._connections.pop(user_id, None)
|
||||||
for chat_id in user_chat_ids:
|
for chat_id in user_chat_ids:
|
||||||
subscribers = self._chat_subscribers.get(chat_id)
|
subscribers = self._chat_subscribers.get(chat_id)
|
||||||
@@ -110,6 +114,7 @@ class RealtimeGateway:
|
|||||||
|
|
||||||
async def handle_typing_event(self, db: AsyncSession, user_id: int, payload: ChatEventPayload, event: str) -> None:
|
async def handle_typing_event(self, db: AsyncSession, user_id: int, payload: ChatEventPayload, event: str) -> None:
|
||||||
await ensure_chat_membership(db, chat_id=payload.chat_id, user_id=user_id)
|
await ensure_chat_membership(db, chat_id=payload.chat_id, user_id=user_id)
|
||||||
|
self._update_user_activity(user_id=user_id, chat_id=payload.chat_id, event=event)
|
||||||
await self._publish_chat_event(
|
await self._publish_chat_event(
|
||||||
payload.chat_id,
|
payload.chat_id,
|
||||||
event=event,
|
event=event,
|
||||||
@@ -205,6 +210,49 @@ class RealtimeGateway:
|
|||||||
return
|
return
|
||||||
await self._handle_redis_event(f"chat:{chat_id}", event_payload)
|
await self._handle_redis_event(f"chat:{chat_id}", event_payload)
|
||||||
|
|
||||||
|
def _update_user_activity(self, *, user_id: int, chat_id: int, event: str) -> None:
|
||||||
|
if event == "typing_start":
|
||||||
|
self._typing_chats_by_user[user_id].add(chat_id)
|
||||||
|
return
|
||||||
|
if event == "typing_stop":
|
||||||
|
self._typing_chats_by_user[user_id].discard(chat_id)
|
||||||
|
return
|
||||||
|
if event == "recording_voice_start":
|
||||||
|
self._recording_voice_chats_by_user[user_id].add(chat_id)
|
||||||
|
return
|
||||||
|
if event == "recording_voice_stop":
|
||||||
|
self._recording_voice_chats_by_user[user_id].discard(chat_id)
|
||||||
|
return
|
||||||
|
if event == "recording_video_start":
|
||||||
|
self._recording_video_chats_by_user[user_id].add(chat_id)
|
||||||
|
return
|
||||||
|
if event == "recording_video_stop":
|
||||||
|
self._recording_video_chats_by_user[user_id].discard(chat_id)
|
||||||
|
|
||||||
|
async def _flush_user_activity(self, user_id: int) -> None:
|
||||||
|
typing_chats = list(self._typing_chats_by_user.pop(user_id, set()))
|
||||||
|
voice_chats = list(self._recording_voice_chats_by_user.pop(user_id, set()))
|
||||||
|
video_chats = list(self._recording_video_chats_by_user.pop(user_id, set()))
|
||||||
|
|
||||||
|
for chat_id in typing_chats:
|
||||||
|
await self._publish_chat_event(
|
||||||
|
chat_id,
|
||||||
|
event="typing_stop",
|
||||||
|
payload={"chat_id": chat_id, "user_id": user_id},
|
||||||
|
)
|
||||||
|
for chat_id in voice_chats:
|
||||||
|
await self._publish_chat_event(
|
||||||
|
chat_id,
|
||||||
|
event="recording_voice_stop",
|
||||||
|
payload={"chat_id": chat_id, "user_id": user_id},
|
||||||
|
)
|
||||||
|
for chat_id in video_chats:
|
||||||
|
await self._publish_chat_event(
|
||||||
|
chat_id,
|
||||||
|
event="recording_video_stop",
|
||||||
|
payload={"chat_id": chat_id, "user_id": user_id},
|
||||||
|
)
|
||||||
|
|
||||||
async def publish_message_created(
|
async def publish_message_created(
|
||||||
self,
|
self,
|
||||||
*,
|
*,
|
||||||
|
|||||||
@@ -81,6 +81,8 @@ For `/health/ready` failure:
|
|||||||
- `recording_video_start`
|
- `recording_video_start`
|
||||||
- `recording_video_stop`
|
- `recording_video_stop`
|
||||||
|
|
||||||
|
Server behavior: when a user disconnects, active typing/recording indicators are auto-cleared with corresponding `*_stop` events.
|
||||||
|
|
||||||
## 3. Models (request/response)
|
## 3. Models (request/response)
|
||||||
|
|
||||||
## 3.1 Auth
|
## 3.1 Auth
|
||||||
|
|||||||
Reference in New Issue
Block a user