Files
Messenger/app/notifications/tasks.py
benya d2e0969fd5
Some checks failed
Android CI / android (push) Failing after 11m0s
Android Release / release (push) Has started running
CI / test (push) Has been cancelled
feat: improve media viewer and push delivery stability
- add unified Android media viewer with swipe navigation, pinch-to-zoom and swipe-to-dismiss\n- move circle videos out of media gallery and surface them in voice/chat info flows\n- align web chat info handling for circle videos and media viewer exclusions\n- stabilize realtime and tablet chat shell updates already staged in this batch\n- fix Celery push delivery loop handling so FCM jobs can read tokens reliably in worker processes
2026-04-05 14:06:36 +03:00

114 lines
3.9 KiB
Python

import asyncio
import json
import logging
from typing import Any
import firebase_admin
from firebase_admin import credentials, messaging
from app.celery_app import celery_app
from app.config.settings import settings
from app.database.session import AsyncSessionLocal
from app.notifications.repository import delete_push_device_token, list_push_tokens_for_user
logger = logging.getLogger(__name__)
_firebase_app: firebase_admin.App | None = None
_worker_loop: asyncio.AbstractEventLoop | None = None
def _get_worker_loop() -> asyncio.AbstractEventLoop:
global _worker_loop
if _worker_loop is None or _worker_loop.is_closed():
_worker_loop = asyncio.new_event_loop()
return _worker_loop
def _run_async(coro):
loop = _get_worker_loop()
return loop.run_until_complete(coro)
def _get_firebase_app() -> firebase_admin.App | None:
global _firebase_app
if _firebase_app is not None:
return _firebase_app
if not settings.firebase_enabled:
return None
cert_payload: dict[str, Any] | None = None
if settings.firebase_credentials_json:
try:
cert_payload = json.loads(settings.firebase_credentials_json)
except json.JSONDecodeError:
logger.warning("FCM disabled: invalid FIREBASE_CREDENTIALS_JSON")
return None
elif settings.firebase_credentials_path:
cert_payload = settings.firebase_credentials_path
else:
logger.warning("FCM disabled: credentials are not configured")
return None
try:
cred = credentials.Certificate(cert_payload) # type: ignore[arg-type]
_firebase_app = firebase_admin.initialize_app(cred)
return _firebase_app
except Exception:
logger.exception("Failed to initialize Firebase app")
return None
async def _load_tokens(user_id: int) -> list[tuple[str, str]]:
async with AsyncSessionLocal() as db:
records = await list_push_tokens_for_user(db, user_id=user_id)
return [(record.platform, record.token) for record in records]
async def _delete_invalid_token(user_id: int, platform: str, token: str) -> None:
async with AsyncSessionLocal() as db:
await delete_push_device_token(db, user_id=user_id, platform=platform, token=token)
await db.commit()
def _send_fcm_to_user(user_id: int, title: str, body: str, data: dict[str, Any]) -> None:
app = _get_firebase_app()
if app is None:
logger.info("Skipping FCM send for user=%s: Firebase disabled", user_id)
return
tokens = _run_async(_load_tokens(user_id))
if not tokens:
return
string_data = {str(key): str(value) for key, value in data.items()}
for platform, token in tokens:
webpush = None
if platform == "web":
webpush = messaging.WebpushConfig(
fcm_options=messaging.WebpushFCMOptions(link=settings.firebase_webpush_link)
)
message = messaging.Message(
token=token,
notification=messaging.Notification(title=title, body=body),
data=string_data,
webpush=webpush,
)
try:
messaging.send(message, app=app)
except messaging.UnregisteredError:
_run_async(_delete_invalid_token(user_id=user_id, platform=platform, token=token))
except messaging.SenderIdMismatchError:
_run_async(_delete_invalid_token(user_id=user_id, platform=platform, token=token))
except Exception:
logger.exception("FCM send failed for user=%s platform=%s", user_id, platform)
@celery_app.task(name="notifications.send_push")
def send_push_notification_task(user_id: int, title: str, body: str, data: dict) -> None:
_send_fcm_to_user(user_id=user_id, title=title, body=body, data=data)
@celery_app.task(name="notifications.send_mention")
def send_mention_notification_task(user_id: int, title: str, body: str, data: dict) -> None:
_send_fcm_to_user(user_id=user_id, title=title, body=body, data=data)