diff --git a/.env.example b/.env.example index 0f49aa1..2081a44 100644 --- a/.env.example +++ b/.env.example @@ -34,6 +34,10 @@ SMTP_USE_TLS=false SMTP_USE_SSL=false SMTP_TIMEOUT_SECONDS=10 SMTP_FROM_EMAIL=no-reply@benyamessenger.local +FIREBASE_ENABLED=false +FIREBASE_CREDENTIALS_PATH= +FIREBASE_CREDENTIALS_JSON= +FIREBASE_WEBPUSH_LINK=https://chat.daemonlord.ru/ LOGIN_RATE_LIMIT_PER_MINUTE=10 REGISTER_RATE_LIMIT_PER_MINUTE=5 diff --git a/alembic/versions/0027_push_device_tokens.py b/alembic/versions/0027_push_device_tokens.py new file mode 100644 index 0000000..4eadb37 --- /dev/null +++ b/alembic/versions/0027_push_device_tokens.py @@ -0,0 +1,44 @@ +"""add push device tokens table + +Revision ID: 0027_push_device_tokens +Revises: 0026_deduplicate_saved_chats +Create Date: 2026-03-10 02:10:00.000000 +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +revision: str = "0027_push_device_tokens" +down_revision: Union[str, Sequence[str], None] = "0026_deduplicate_saved_chats" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "push_device_tokens", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("user_id", sa.Integer(), nullable=False), + sa.Column("platform", sa.String(length=16), nullable=False), + sa.Column("token", sa.String(length=512), nullable=False), + sa.Column("device_id", sa.String(length=128), nullable=True), + sa.Column("app_version", sa.String(length=64), nullable=True), + sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False), + sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False), + sa.ForeignKeyConstraint(["user_id"], ["users.id"], ondelete="CASCADE"), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("user_id", "platform", "token", name="uq_push_device_tokens_user_platform_token"), + ) + op.create_index(op.f("ix_push_device_tokens_id"), "push_device_tokens", ["id"], unique=False) + op.create_index(op.f("ix_push_device_tokens_platform"), "push_device_tokens", ["platform"], unique=False) + op.create_index(op.f("ix_push_device_tokens_user_id"), "push_device_tokens", ["user_id"], unique=False) + + +def downgrade() -> None: + op.drop_index(op.f("ix_push_device_tokens_user_id"), table_name="push_device_tokens") + op.drop_index(op.f("ix_push_device_tokens_platform"), table_name="push_device_tokens") + op.drop_index(op.f("ix_push_device_tokens_id"), table_name="push_device_tokens") + op.drop_table("push_device_tokens") diff --git a/app/config/settings.py b/app/config/settings.py index 8119da9..5c94316 100644 --- a/app/config/settings.py +++ b/app/config/settings.py @@ -39,6 +39,10 @@ class Settings(BaseSettings): smtp_use_ssl: bool = False smtp_timeout_seconds: float = 10.0 smtp_from_email: str = "no-reply@benyamessenger.local" + firebase_enabled: bool = False + firebase_credentials_path: str | None = None + firebase_credentials_json: str | None = None + firebase_webpush_link: str = "https://chat.daemonlord.ru/" login_rate_limit_per_minute: int = 10 register_rate_limit_per_minute: int = 5 diff --git a/app/database/models.py b/app/database/models.py index 93dc1f4..5364d08 100644 --- a/app/database/models.py +++ b/app/database/models.py @@ -3,7 +3,7 @@ from app.chats.models import Chat, ChatInviteLink, ChatMember, ChatUserSetting from app.email.models import EmailLog from app.media.models import Attachment from app.messages.models import Message, MessageHidden, MessageIdempotencyKey, MessageReaction, MessageReceipt -from app.notifications.models import NotificationLog +from app.notifications.models import NotificationLog, PushDeviceToken from app.users.models import User, UserContact __all__ = [ @@ -19,6 +19,7 @@ __all__ = [ "MessageReaction", "MessageReceipt", "NotificationLog", + "PushDeviceToken", "PasswordResetToken", "User", "UserContact", diff --git a/app/notifications/models.py b/app/notifications/models.py index 5a38cb0..83c6020 100644 --- a/app/notifications/models.py +++ b/app/notifications/models.py @@ -1,6 +1,6 @@ from datetime import datetime -from sqlalchemy import DateTime, String, func +from sqlalchemy import DateTime, ForeignKey, String, UniqueConstraint, func from sqlalchemy.orm import Mapped, mapped_column from app.database.base import Base @@ -14,3 +14,22 @@ class NotificationLog(Base): event_type: Mapped[str] = mapped_column(String(64), index=True) payload: Mapped[str] = mapped_column(String(1024)) created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), nullable=False) + + +class PushDeviceToken(Base): + __tablename__ = "push_device_tokens" + __table_args__ = (UniqueConstraint("user_id", "platform", "token", name="uq_push_device_tokens_user_platform_token"),) + + id: Mapped[int] = mapped_column(primary_key=True, index=True) + user_id: Mapped[int] = mapped_column(ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True) + platform: Mapped[str] = mapped_column(String(16), nullable=False, index=True) + token: Mapped[str] = mapped_column(String(512), nullable=False) + device_id: Mapped[str | None] = mapped_column(String(128), nullable=True) + app_version: Mapped[str | None] = mapped_column(String(64), nullable=True) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), nullable=False) + updated_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), + server_default=func.now(), + onupdate=func.now(), + nullable=False, + ) diff --git a/app/notifications/repository.py b/app/notifications/repository.py index 5fbcad7..d4cc109 100644 --- a/app/notifications/repository.py +++ b/app/notifications/repository.py @@ -1,6 +1,9 @@ +from datetime import datetime, timezone + +from sqlalchemy import delete, select from sqlalchemy.ext.asyncio import AsyncSession -from app.notifications.models import NotificationLog +from app.notifications.models import NotificationLog, PushDeviceToken async def create_notification_log(db: AsyncSession, *, user_id: int, event_type: str, payload: str) -> None: @@ -8,8 +11,6 @@ async def create_notification_log(db: AsyncSession, *, user_id: int, event_type: async def list_user_notifications(db: AsyncSession, *, user_id: int, limit: int = 50) -> list[NotificationLog]: - from sqlalchemy import select - result = await db.execute( select(NotificationLog) .where(NotificationLog.user_id == user_id) @@ -17,3 +18,63 @@ async def list_user_notifications(db: AsyncSession, *, user_id: int, limit: int .limit(limit) ) return list(result.scalars().all()) + + +async def upsert_push_device_token( + db: AsyncSession, + *, + user_id: int, + platform: str, + token: str, + device_id: str | None, + app_version: str | None, +) -> PushDeviceToken: + result = await db.execute( + select(PushDeviceToken).where( + PushDeviceToken.user_id == user_id, + PushDeviceToken.platform == platform, + PushDeviceToken.token == token, + ) + ) + existing = result.scalar_one_or_none() + if existing is not None: + existing.device_id = device_id + existing.app_version = app_version + existing.updated_at = datetime.now(timezone.utc) + await db.flush() + return existing + + record = PushDeviceToken( + user_id=user_id, + platform=platform, + token=token, + device_id=device_id, + app_version=app_version, + ) + db.add(record) + await db.flush() + return record + + +async def delete_push_device_token( + db: AsyncSession, + *, + user_id: int, + platform: str, + token: str, +) -> int: + result = await db.execute( + delete(PushDeviceToken).where( + PushDeviceToken.user_id == user_id, + PushDeviceToken.platform == platform, + PushDeviceToken.token == token, + ) + ) + return int(result.rowcount or 0) + + +async def list_push_tokens_for_user(db: AsyncSession, *, user_id: int) -> list[PushDeviceToken]: + result = await db.execute( + select(PushDeviceToken).where(PushDeviceToken.user_id == user_id).order_by(PushDeviceToken.id.asc()) + ) + return list(result.scalars().all()) diff --git a/app/notifications/router.py b/app/notifications/router.py index 03cbcc3..7a429c4 100644 --- a/app/notifications/router.py +++ b/app/notifications/router.py @@ -1,10 +1,10 @@ -from fastapi import APIRouter, Depends +from fastapi import APIRouter, Body, Depends, Response, status from sqlalchemy.ext.asyncio import AsyncSession from app.auth.service import get_current_user from app.database.session import get_db -from app.notifications.schemas import NotificationRead -from app.notifications.service import get_notifications_for_user +from app.notifications.schemas import NotificationRead, PushTokenDeleteRequest, PushTokenUpsertRequest +from app.notifications.service import get_notifications_for_user, register_push_token, unregister_push_token from app.users.models import User router = APIRouter(prefix="/notifications", tags=["notifications"]) @@ -17,3 +17,23 @@ async def list_my_notifications( current_user: User = Depends(get_current_user), ) -> list[NotificationRead]: return await get_notifications_for_user(db, user_id=current_user.id, limit=limit) + + +@router.post("/push-token", status_code=status.HTTP_204_NO_CONTENT) +async def upsert_my_push_token( + payload: PushTokenUpsertRequest, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +) -> Response: + await register_push_token(db, user_id=current_user.id, payload=payload) + return Response(status_code=status.HTTP_204_NO_CONTENT) + + +@router.api_route("/push-token", methods=["DELETE"], status_code=status.HTTP_204_NO_CONTENT) +async def delete_my_push_token( + payload: PushTokenDeleteRequest = Body(...), + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +) -> Response: + await unregister_push_token(db, user_id=current_user.id, payload=payload) + return Response(status_code=status.HTTP_204_NO_CONTENT) diff --git a/app/notifications/schemas.py b/app/notifications/schemas.py index f5f0318..57621af 100644 --- a/app/notifications/schemas.py +++ b/app/notifications/schemas.py @@ -1,7 +1,7 @@ from datetime import datetime from typing import Any -from pydantic import BaseModel, ConfigDict +from pydantic import BaseModel, ConfigDict, Field class NotificationRequest(BaseModel): @@ -25,3 +25,15 @@ class PushTaskPayload(BaseModel): title: str body: str data: dict[str, Any] + + +class PushTokenUpsertRequest(BaseModel): + platform: str = Field(min_length=2, max_length=16) + token: str = Field(min_length=8, max_length=512) + device_id: str | None = Field(default=None, max_length=128) + app_version: str | None = Field(default=None, max_length=64) + + +class PushTokenDeleteRequest(BaseModel): + platform: str = Field(min_length=2, max_length=16) + token: str = Field(min_length=8, max_length=512) diff --git a/app/notifications/service.py b/app/notifications/service.py index 8f08995..7b7c44d 100644 --- a/app/notifications/service.py +++ b/app/notifications/service.py @@ -5,8 +5,18 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.chats.repository import is_chat_muted_for_user, list_chat_members from app.messages.models import Message -from app.notifications.repository import create_notification_log, list_user_notifications -from app.notifications.schemas import NotificationRead, NotificationRequest +from app.notifications.repository import ( + create_notification_log, + delete_push_device_token, + list_user_notifications, + upsert_push_device_token, +) +from app.notifications.schemas import ( + NotificationRead, + NotificationRequest, + PushTokenDeleteRequest, + PushTokenUpsertRequest, +) from app.notifications.tasks import send_mention_notification_task, send_push_notification_task from app.realtime.presence import is_user_online from app.users.repository import list_users_by_ids @@ -98,3 +108,25 @@ async def get_notifications_for_user(db: AsyncSession, *, user_id: int, limit: i safe_limit = max(1, min(limit, 100)) rows = await list_user_notifications(db, user_id=user_id, limit=safe_limit) return [NotificationRead.model_validate(item) for item in rows] + + +async def register_push_token(db: AsyncSession, *, user_id: int, payload: PushTokenUpsertRequest) -> None: + await upsert_push_device_token( + db, + user_id=user_id, + platform=payload.platform.strip().lower(), + token=payload.token.strip(), + device_id=payload.device_id.strip() if payload.device_id else None, + app_version=payload.app_version.strip() if payload.app_version else None, + ) + await db.commit() + + +async def unregister_push_token(db: AsyncSession, *, user_id: int, payload: PushTokenDeleteRequest) -> None: + await delete_push_device_token( + db, + user_id=user_id, + platform=payload.platform.strip().lower(), + token=payload.token.strip(), + ) + await db.commit() diff --git a/app/notifications/tasks.py b/app/notifications/tasks.py index 81b6b8a..62347b1 100644 --- a/app/notifications/tasks.py +++ b/app/notifications/tasks.py @@ -1,15 +1,100 @@ +import asyncio +import json import logging +from typing import Any + +import firebase_admin +from firebase_admin import credentials, messaging from app.celery_app import celery_app +from app.config.settings import settings +from app.database.session import AsyncSessionLocal +from app.notifications.repository import delete_push_device_token, list_push_tokens_for_user logger = logging.getLogger(__name__) +_firebase_app: firebase_admin.App | None = None + + +def _get_firebase_app() -> firebase_admin.App | None: + global _firebase_app + if _firebase_app is not None: + return _firebase_app + if not settings.firebase_enabled: + return None + cert_payload: dict[str, Any] | None = None + if settings.firebase_credentials_json: + try: + cert_payload = json.loads(settings.firebase_credentials_json) + except json.JSONDecodeError: + logger.warning("FCM disabled: invalid FIREBASE_CREDENTIALS_JSON") + return None + elif settings.firebase_credentials_path: + cert_payload = settings.firebase_credentials_path + else: + logger.warning("FCM disabled: credentials are not configured") + return None + + try: + cred = credentials.Certificate(cert_payload) # type: ignore[arg-type] + _firebase_app = firebase_admin.initialize_app(cred) + return _firebase_app + except Exception: + logger.exception("Failed to initialize Firebase app") + return None + + +async def _load_tokens(user_id: int) -> list[tuple[str, str]]: + async with AsyncSessionLocal() as db: + records = await list_push_tokens_for_user(db, user_id=user_id) + return [(record.platform, record.token) for record in records] + + +async def _delete_invalid_token(user_id: int, platform: str, token: str) -> None: + async with AsyncSessionLocal() as db: + await delete_push_device_token(db, user_id=user_id, platform=platform, token=token) + await db.commit() + + +def _send_fcm_to_user(user_id: int, title: str, body: str, data: dict[str, Any]) -> None: + app = _get_firebase_app() + if app is None: + logger.info("Skipping FCM send for user=%s: Firebase disabled", user_id) + return + + tokens = asyncio.run(_load_tokens(user_id)) + if not tokens: + return + + string_data = {str(key): str(value) for key, value in data.items()} + for platform, token in tokens: + webpush = None + if platform == "web": + webpush = messaging.WebpushConfig( + fcm_options=messaging.WebpushFCMOptions(link=settings.firebase_webpush_link) + ) + message = messaging.Message( + token=token, + notification=messaging.Notification(title=title, body=body), + data=string_data, + webpush=webpush, + ) + try: + messaging.send(message, app=app) + except messaging.UnregisteredError: + asyncio.run(_delete_invalid_token(user_id=user_id, platform=platform, token=token)) + except messaging.SenderIdMismatchError: + asyncio.run(_delete_invalid_token(user_id=user_id, platform=platform, token=token)) + except Exception: + logger.exception("FCM send failed for user=%s platform=%s", user_id, platform) + + @celery_app.task(name="notifications.send_push") def send_push_notification_task(user_id: int, title: str, body: str, data: dict) -> None: - logger.info("PUSH user=%s title=%s body=%s data=%s", user_id, title, body, data) + _send_fcm_to_user(user_id=user_id, title=title, body=body, data=data) @celery_app.task(name="notifications.send_mention") def send_mention_notification_task(user_id: int, title: str, body: str, data: dict) -> None: - logger.info("MENTION user=%s title=%s body=%s data=%s", user_id, title, body, data) + _send_fcm_to_user(user_id=user_id, title=title, body=body, data=data) diff --git a/requirements.txt b/requirements.txt index cbdfd53..ff92984 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,6 +13,7 @@ redis==6.4.0 celery==5.5.3 boto3==1.40.31 aiosmtplib==4.0.2 +firebase-admin==6.9.0 alembic==1.16.5 pytest==8.4.2 pytest-asyncio==1.2.0