import asyncio import json import logging from typing import Any import firebase_admin from firebase_admin import credentials, messaging from app.celery_app import celery_app from app.config.settings import settings from app.database.session import AsyncSessionLocal from app.notifications.repository import delete_push_device_token, list_push_tokens_for_user logger = logging.getLogger(__name__) _firebase_app: firebase_admin.App | None = None def _get_firebase_app() -> firebase_admin.App | None: global _firebase_app if _firebase_app is not None: return _firebase_app if not settings.firebase_enabled: return None cert_payload: dict[str, Any] | None = None if settings.firebase_credentials_json: try: cert_payload = json.loads(settings.firebase_credentials_json) except json.JSONDecodeError: logger.warning("FCM disabled: invalid FIREBASE_CREDENTIALS_JSON") return None elif settings.firebase_credentials_path: cert_payload = settings.firebase_credentials_path else: logger.warning("FCM disabled: credentials are not configured") return None try: cred = credentials.Certificate(cert_payload) # type: ignore[arg-type] _firebase_app = firebase_admin.initialize_app(cred) return _firebase_app except Exception: logger.exception("Failed to initialize Firebase app") return None async def _load_tokens(user_id: int) -> list[tuple[str, str]]: async with AsyncSessionLocal() as db: records = await list_push_tokens_for_user(db, user_id=user_id) return [(record.platform, record.token) for record in records] async def _delete_invalid_token(user_id: int, platform: str, token: str) -> None: async with AsyncSessionLocal() as db: await delete_push_device_token(db, user_id=user_id, platform=platform, token=token) await db.commit() def _send_fcm_to_user(user_id: int, title: str, body: str, data: dict[str, Any]) -> None: app = _get_firebase_app() if app is None: logger.info("Skipping FCM send for user=%s: Firebase disabled", user_id) return tokens = asyncio.run(_load_tokens(user_id)) if not tokens: return string_data = {str(key): str(value) for key, value in data.items()} for platform, token in tokens: webpush = None if platform == "web": webpush = messaging.WebpushConfig( fcm_options=messaging.WebpushFCMOptions(link=settings.firebase_webpush_link) ) message = messaging.Message( token=token, notification=messaging.Notification(title=title, body=body), data=string_data, webpush=webpush, ) try: messaging.send(message, app=app) except messaging.UnregisteredError: asyncio.run(_delete_invalid_token(user_id=user_id, platform=platform, token=token)) except messaging.SenderIdMismatchError: asyncio.run(_delete_invalid_token(user_id=user_id, platform=platform, token=token)) except Exception: logger.exception("FCM send failed for user=%s platform=%s", user_id, platform) @celery_app.task(name="notifications.send_push") def send_push_notification_task(user_id: int, title: str, body: str, data: dict) -> None: _send_fcm_to_user(user_id=user_id, title=title, body=body, data=data) @celery_app.task(name="notifications.send_mention") def send_mention_notification_task(user_id: int, title: str, body: str, data: dict) -> None: _send_fcm_to_user(user_id=user_id, title=title, body=body, data=data)