All checks were successful
CI / test (push) Successful in 9m2s
Security hardening: - Added IP/user rate limiting with Redis-backed counters and fail-open behavior. - Added message anti-spam controls (per-chat rate + duplicate cooldown). - Implemented refresh token rotation with JTI tracking and revoke support. Notification pipeline: - Added Celery app and async notification tasks for mention/offline delivery. - Added Redis-based presence tracking and integrated it into realtime connect/disconnect. - Added notification dispatch from message flow and notifications listing endpoint. Quality gates and CI: - Added pytest async integration tests for auth and chat/message lifecycle. - Added pytest config, test fixtures, and GitHub Actions CI workflow. - Fixed bcrypt/passlib compatibility by pinning bcrypt version. - Documented worker and quality-gate commands in README.
16 lines
553 B
Python
16 lines
553 B
Python
import logging
|
|
|
|
from app.celery_app import celery_app
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@celery_app.task(name="notifications.send_push")
|
|
def send_push_notification_task(user_id: int, title: str, body: str, data: dict) -> None:
|
|
logger.info("PUSH user=%s title=%s body=%s data=%s", user_id, title, body, data)
|
|
|
|
|
|
@celery_app.task(name="notifications.send_mention")
|
|
def send_mention_notification_task(user_id: int, title: str, body: str, data: dict) -> None:
|
|
logger.info("MENTION user=%s title=%s body=%s data=%s", user_id, title, body, data)
|