feat(realtime): live online/offline events and unified search
Some checks failed
CI / test (push) Failing after 18s
Some checks failed
CI / test (push) Failing after 18s
- add websocket events user_online/user_offline - broadcast presence changes on first connect and final disconnect only - apply live presence updates in web chat store and realtime hook - move public discover into unified left search (users + groups/channels) - remove separate Discover Chats dialog/menu entry
This commit is contained in:
@@ -3,26 +3,30 @@ from redis.exceptions import RedisError
|
||||
from app.utils.redis_client import get_redis_client
|
||||
|
||||
|
||||
async def mark_user_online(user_id: int) -> None:
|
||||
async def mark_user_online(user_id: int) -> bool:
|
||||
try:
|
||||
redis = get_redis_client()
|
||||
key = f"presence:user:{user_id}"
|
||||
count = await redis.incr(key)
|
||||
if count == 1:
|
||||
await redis.expire(key, 3600)
|
||||
return True
|
||||
return False
|
||||
except RedisError:
|
||||
return
|
||||
return False
|
||||
|
||||
|
||||
async def mark_user_offline(user_id: int) -> None:
|
||||
async def mark_user_offline(user_id: int) -> bool:
|
||||
try:
|
||||
redis = get_redis_client()
|
||||
key = f"presence:user:{user_id}"
|
||||
value = await redis.decr(key)
|
||||
if value <= 0:
|
||||
await redis.delete(key)
|
||||
return True
|
||||
return False
|
||||
except RedisError:
|
||||
return
|
||||
return False
|
||||
|
||||
|
||||
async def is_user_online(user_id: int) -> bool:
|
||||
|
||||
@@ -15,6 +15,8 @@ RealtimeEventName = Literal[
|
||||
"typing_stop",
|
||||
"message_read",
|
||||
"message_delivered",
|
||||
"user_online",
|
||||
"user_offline",
|
||||
"error",
|
||||
]
|
||||
|
||||
|
||||
@@ -54,7 +54,9 @@ class RealtimeGateway:
|
||||
)
|
||||
for chat_id in user_chat_ids:
|
||||
self._chat_subscribers[chat_id].add(user_id)
|
||||
await mark_user_online(user_id)
|
||||
became_online = await mark_user_online(user_id)
|
||||
if became_online:
|
||||
await self._broadcast_presence(user_chat_ids, user_id=user_id, is_online=True, last_seen_at=None)
|
||||
await self._send_user_event(
|
||||
user_id,
|
||||
OutgoingRealtimeEvent(
|
||||
@@ -77,8 +79,15 @@ class RealtimeGateway:
|
||||
subscribers.discard(user_id)
|
||||
if not subscribers:
|
||||
self._chat_subscribers.pop(chat_id, None)
|
||||
await mark_user_offline(user_id)
|
||||
became_offline = await mark_user_offline(user_id)
|
||||
await self._persist_last_seen(user_id)
|
||||
if became_offline:
|
||||
await self._broadcast_presence(
|
||||
user_chat_ids,
|
||||
user_id=user_id,
|
||||
is_online=False,
|
||||
last_seen_at=datetime.now(timezone.utc),
|
||||
)
|
||||
|
||||
async def handle_send_message(self, db: AsyncSession, user_id: int, payload: SendMessagePayload) -> None:
|
||||
message = await create_chat_message(
|
||||
@@ -195,12 +204,22 @@ class RealtimeGateway:
|
||||
user_connections.pop(connection_id, None)
|
||||
if not user_connections:
|
||||
self._connections.pop(user_id, None)
|
||||
affected_chat_ids: list[int] = []
|
||||
for chat_id, subscribers in list(self._chat_subscribers.items()):
|
||||
if user_id in subscribers:
|
||||
affected_chat_ids.append(chat_id)
|
||||
subscribers.discard(user_id)
|
||||
if not subscribers:
|
||||
self._chat_subscribers.pop(chat_id, None)
|
||||
await mark_user_offline(user_id)
|
||||
became_offline = await mark_user_offline(user_id)
|
||||
await self._persist_last_seen(user_id)
|
||||
if became_offline:
|
||||
await self._broadcast_presence(
|
||||
affected_chat_ids,
|
||||
user_id=user_id,
|
||||
is_online=False,
|
||||
last_seen_at=datetime.now(timezone.utc),
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _extract_chat_id(channel: str) -> int | None:
|
||||
@@ -219,5 +238,24 @@ class RealtimeGateway:
|
||||
except Exception:
|
||||
return
|
||||
|
||||
async def _broadcast_presence(
|
||||
self,
|
||||
chat_ids: list[int],
|
||||
*,
|
||||
user_id: int,
|
||||
is_online: bool,
|
||||
last_seen_at: datetime | None,
|
||||
) -> None:
|
||||
event_name = "user_online" if is_online else "user_offline"
|
||||
for chat_id in chat_ids:
|
||||
payload = {
|
||||
"chat_id": chat_id,
|
||||
"user_id": user_id,
|
||||
"is_online": is_online,
|
||||
}
|
||||
if last_seen_at is not None:
|
||||
payload["last_seen_at"] = last_seen_at.isoformat()
|
||||
await self._publish_chat_event(chat_id, event=event_name, payload=payload)
|
||||
|
||||
|
||||
realtime_gateway = RealtimeGateway()
|
||||
|
||||
Reference in New Issue
Block a user