- avoid reserved FCM key names like message_type - send push metadata in camelCase keys already supported by Android
142 lines
5.0 KiB
Python
142 lines
5.0 KiB
Python
import asyncio
|
|
import json
|
|
import logging
|
|
from typing import Any
|
|
|
|
import firebase_admin
|
|
from firebase_admin import credentials, messaging
|
|
|
|
from app.celery_app import celery_app
|
|
from app.config.settings import settings
|
|
from app.database.session import AsyncSessionLocal
|
|
from app.notifications.repository import delete_push_device_token, list_push_tokens_for_user
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# Cached Firebase app; assigned by _get_firebase_app() only after a successful
# firebase_admin.initialize_app() call, and returned as-is on later calls.
_firebase_app: firebase_admin.App | None = None
|
|
# Event loop created lazily by _get_worker_loop() and handed to _run_async();
# replaced with a new loop if the cached one has been closed.
_worker_loop: asyncio.AbstractEventLoop | None = None
|
|
|
|
|
|
def _get_worker_loop() -> asyncio.AbstractEventLoop:
    """Return the module's shared event loop, creating one when necessary.

    The loop stored in ``_worker_loop`` is reused as long as it exists and is
    still open. On first use, or after the cached loop has been closed, a new
    loop is created with ``asyncio.new_event_loop()`` and cached.
    """
    global _worker_loop
    cached = _worker_loop
    if cached is not None and not cached.is_closed():
        return cached
    _worker_loop = asyncio.new_event_loop()
    return _worker_loop
|
|
|
|
|
|
def _run_async(coro):
    """Run ``coro`` to completion on the shared worker loop and return its result.

    Any exception raised by the coroutine propagates to the caller.
    """
    return _get_worker_loop().run_until_complete(coro)
|
|
|
|
|
|
def _get_firebase_app() -> firebase_admin.App | None:
    """Return the cached Firebase app, initializing it on first successful use.

    Returns:
        The Firebase app, or ``None`` when Firebase is disabled in settings,
        no credentials are configured, the inline credentials JSON cannot be
        parsed, or initialization raises. Failures are not cached, so the
        next call attempts initialization again.
    """
    global _firebase_app
    if _firebase_app is not None:
        return _firebase_app
    if not settings.firebase_enabled:
        return None

    # Inline JSON credentials take precedence over a credentials file path.
    inline_json = settings.firebase_credentials_json
    if inline_json:
        try:
            credential_source = json.loads(inline_json)
        except json.JSONDecodeError:
            logger.warning("FCM disabled: invalid FIREBASE_CREDENTIALS_JSON")
            return None
    else:
        credential_source = settings.firebase_credentials_path
        if not credential_source:
            logger.warning("FCM disabled: credentials are not configured")
            return None

    try:
        certificate = credentials.Certificate(credential_source)
        _firebase_app = firebase_admin.initialize_app(certificate)
    except Exception:
        # Best effort: a broken Firebase setup must not crash the caller.
        logger.exception("Failed to initialize Firebase app")
        return None
    return _firebase_app
|
|
|
|
|
|
async def _load_tokens(user_id: int) -> list[tuple[str, str]]:
    """Fetch the push tokens registered for ``user_id``.

    Returns:
        A list of ``(platform, token)`` pairs, one per record returned by
        ``list_push_tokens_for_user``.
    """
    async with AsyncSessionLocal() as session:
        device_rows = await list_push_tokens_for_user(session, user_id=user_id)
        # Copy the fields into plain tuples before the session is closed.
        pairs = [(row.platform, row.token) for row in device_rows]
    return pairs
|
|
|
|
|
|
async def _delete_invalid_token(user_id: int, platform: str, token: str) -> None:
    """Delete one stored push token for a user and commit the change.

    Args:
        user_id: Owner of the token.
        platform: Platform the token was registered for.
        token: The device token to remove.
    """
    async with AsyncSessionLocal() as session:
        await delete_push_device_token(
            session,
            user_id=user_id,
            platform=platform,
            token=token,
        )
        await session.commit()
|
|
|
|
|
|
# Snake_case metadata keys and the camelCase names they are sent under.
# Renaming also keeps ``message_type`` (a key name reserved by FCM) out of the
# data payload.
_PUSH_DATA_KEY_ALIASES: dict[str, str] = {
    "chat_id": "chatId",
    "message_id": "messageId",
    "sender_id": "senderId",
    "message_type": "messageType",
    "preview_image_url": "previewImageUrl",
    "sender_name": "senderName",
    "text_preview": "textPreview",
}


def _build_push_data(title: str, body: str, data: "dict[str, Any] | None") -> "dict[str, str]":
    """Build the FCM data payload: title, body, and renamed, stringified metadata."""
    payload = {"title": title, "body": body}
    for key, value in (data or {}).items():
        name = str(key)
        # FCM data values must be strings, so every value is stringified.
        payload[_PUSH_DATA_KEY_ALIASES.get(name, name)] = str(value)
    return payload


def _build_fcm_message(
    platform: str,
    token: str,
    title: str,
    body: str,
    string_data: "dict[str, str]",
) -> "messaging.Message":
    """Build the platform-specific FCM message for a single device token."""
    notification = None
    webpush = None
    android = None
    if platform == "android":
        # Android receives a data-only, high-priority message; title and body
        # travel inside ``string_data`` rather than a notification block.
        android = messaging.AndroidConfig(priority="high")
    else:
        notification = messaging.Notification(title=title, body=body)
        if platform == "web":
            webpush = messaging.WebpushConfig(
                fcm_options=messaging.WebpushFCMOptions(link=settings.firebase_webpush_link)
            )
    return messaging.Message(
        token=token,
        notification=notification,
        data=string_data,
        webpush=webpush,
        android=android,
    )


def _discard_invalid_token(user_id: int, platform: str, token: str) -> None:
    """Delete a token FCM rejected, logging (not raising) if the cleanup fails."""
    try:
        _run_async(_delete_invalid_token(user_id=user_id, platform=platform, token=token))
    except Exception:
        # A failed cleanup must not stop delivery to the user's other devices.
        logger.exception(
            "Failed to delete invalid push token for user=%s platform=%s",
            user_id,
            platform,
        )


def _send_fcm_to_user(user_id: int, title: str, body: str, data: dict[str, Any]) -> None:
    """Send a push notification to every registered device of a user.

    Does nothing when Firebase is unavailable or the user has no tokens.
    Each token is sent independently: a failure for one device is logged and
    the remaining devices are still attempted. Tokens that FCM reports as
    unregistered or belonging to another sender are deleted from storage.

    Args:
        user_id: Recipient user.
        title: Notification title; also sent in the data payload as ``title``.
        body: Notification body; also sent in the data payload as ``body``.
        data: Extra metadata. Known snake_case keys are renamed to camelCase
            and all values are converted with ``str()``. ``None`` is treated
            as an empty mapping.
    """
    app = _get_firebase_app()
    if app is None:
        logger.info("Skipping FCM send for user=%s: Firebase disabled", user_id)
        return

    tokens = _run_async(_load_tokens(user_id))
    if not tokens:
        return

    # The data payload is identical for every device, so build it once.
    string_data = _build_push_data(title, body, data)

    for platform, token in tokens:
        message = _build_fcm_message(platform, token, title, body, string_data)
        try:
            messaging.send(message, app=app)
        except (messaging.UnregisteredError, messaging.SenderIdMismatchError):
            _discard_invalid_token(user_id, platform, token)
        except Exception:
            logger.exception("FCM send failed for user=%s platform=%s", user_id, platform)
|
|
|
|
|
|
@celery_app.task(name="notifications.send_push")
def send_push_notification_task(user_id: int, title: str, body: str, data: dict) -> None:
    """Celery task ``notifications.send_push``.

    Forwards its arguments unchanged to ``_send_fcm_to_user``, which sends the
    notification to the user's registered push tokens.
    """
    _send_fcm_to_user(user_id, title, body, data)
|
|
|
|
|
|
@celery_app.task(name="notifications.send_mention")
def send_mention_notification_task(user_id: int, title: str, body: str, data: dict) -> None:
    """Celery task ``notifications.send_mention``.

    Forwards its arguments unchanged to ``_send_fcm_to_user``; the delivery
    path is the same as for ``notifications.send_push``, only the registered
    task name differs.
    """
    _send_fcm_to_user(user_id, title, body, data)
|