Security hardening:
- Added IP/user rate limiting with Redis-backed counters and fail-open behavior.
- Added message anti-spam controls (per-chat rate + duplicate cooldown).
- Implemented refresh token rotation with JTI tracking and revoke support.

Notification pipeline:
- Added Celery app and async notification tasks for mention/offline delivery.
- Added Redis-based presence tracking and integrated it into realtime connect/disconnect.
- Added notification dispatch from message flow and notifications listing endpoint.

Quality gates and CI:
- Added pytest async integration tests for auth and chat/message lifecycle.
- Added pytest config, test fixtures, and GitHub Actions CI workflow.
- Fixed bcrypt/passlib compatibility by pinning bcrypt version.
- Documented worker and quality-gate commands in README.
28 lines
480 B
Python
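from datetime import datetime
from typing import Any

from pydantic import BaseModel, ConfigDict


class NotificationRequest(BaseModel):
    """Input schema for a notification addressed to one user."""

    user_id: int
    event_type: str
    payload: dict


class NotificationRead(BaseModel):
    """Output schema for a stored notification."""

    # from_attributes lets pydantic build this model from objects that expose
    # the fields as attributes (for example ORM rows), not only from dicts.
    model_config = ConfigDict(from_attributes=True)

    id: int
    user_id: int
    event_type: str
    # Note: a string here, whereas NotificationRequest.payload is a dict.
    payload: str
    created_at: datetime


class PushTaskPayload(BaseModel):
    """Schema for the data handed to a push task."""

    user_id: int
    title: str
    body: str
    data: dict[str, Any]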
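The sketch below illustrates how `NotificationRead` can be populated from an attribute-bearing object thanks to `from_attributes=True`. It is only an illustration under assumptions: the `SimpleNamespace` row is a hypothetical stand-in for whatever ORM model the project uses, and treating `payload` as a JSON-encoded string is a guess based on the `str` type, not something this file confirms. The schema is repeated so the example runs on its own.

```python
import json
from datetime import datetime, timezone
from types import SimpleNamespace

from pydantic import BaseModel, ConfigDict


class NotificationRead(BaseModel):
    # Same fields as the schema in this file, repeated for a standalone run.
    model_config = ConfigDict(from_attributes=True)

    id: int
    user_id: int
    event_type: str
    payload: str
    created_at: datetime


# Hypothetical stand-in for a database row; the real model is not shown here.
row = SimpleNamespace(
    id=1,
    user_id=42,
    event_type="mention",
    # Assumption: the payload is stored as a JSON string.
    payload=json.dumps({"chat_id": 7, "message_id": 99}),
    created_at=datetime(2024, 1, 1, tzinfo=timezone.utc),
)

# model_validate reads the fields via attribute access because
# from_attributes=True is set in model_config.
notification = NotificationRead.model_validate(row)

# Decode the string payload back into a dict for use by the caller.
decoded = json.loads(notification.payload)
```

If the stored payload is not JSON, the final `json.loads` step would not apply; only the `model_validate` call depends on the schema itself.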