import json import re from sqlalchemy.ext.asyncio import AsyncSession from app.chats.repository import is_chat_muted_for_user, list_chat_members from app.messages.models import Message from app.notifications.repository import create_notification_log, list_user_notifications from app.notifications.schemas import NotificationRead, NotificationRequest from app.notifications.tasks import send_mention_notification_task, send_push_notification_task from app.realtime.presence import is_user_online from app.users.repository import list_users_by_ids _MENTION_RE = re.compile(r"@([A-Za-z0-9_]{3,50})") def _extract_mentions(text: str | None) -> set[str]: if not text: return set() return {match.group(1).lower() for match in _MENTION_RE.finditer(text)} async def enqueue_notification(db: AsyncSession, payload: NotificationRequest) -> None: await create_notification_log( db, user_id=payload.user_id, event_type=payload.event_type, payload=json.dumps(payload.payload, ensure_ascii=True), ) async def dispatch_message_notifications(db: AsyncSession, message: Message) -> None: members = await list_chat_members(db, chat_id=message.chat_id) recipient_ids = [m.user_id for m in members if m.user_id != message.sender_id] if not recipient_ids: return users = await list_users_by_ids(db, recipient_ids) user_by_username = {user.username.lower(): user for user in users} mentioned_usernames = _extract_mentions(message.text) mentioned_user_ids = {user_by_username[name].id for name in mentioned_usernames if name in user_by_username} sender_users = await list_users_by_ids(db, [message.sender_id]) sender_name = sender_users[0].username if sender_users else "Someone" for recipient in users: if await is_chat_muted_for_user(db, chat_id=message.chat_id, user_id=recipient.id): continue base_payload = { "chat_id": message.chat_id, "message_id": message.id, "sender_id": message.sender_id, } if recipient.id in mentioned_user_ids: payload = { **base_payload, "type": "mention", "text_preview": (message.text or "")[:120], } await create_notification_log( db, user_id=recipient.id, event_type="mention", payload=json.dumps(payload, ensure_ascii=True), ) send_mention_notification_task.delay( recipient.id, f"{sender_name} mentioned you", (message.text or "")[:120], payload, ) continue if not await is_user_online(recipient.id): payload = { **base_payload, "type": "offline_message", "text_preview": (message.text or "")[:120], } await create_notification_log( db, user_id=recipient.id, event_type="offline_message", payload=json.dumps(payload, ensure_ascii=True), ) send_push_notification_task.delay( recipient.id, f"New message from {sender_name}", (message.text or "")[:120], payload, ) await db.commit() async def get_notifications_for_user(db: AsyncSession, *, user_id: int, limit: int = 50) -> list[NotificationRead]: safe_limit = max(1, min(limit, 100)) rows = await list_user_notifications(db, user_id=user_id, limit=safe_limit) return [NotificationRead.model_validate(item) for item in rows]