Files
Messenger/tests/conftest.py
benya 85631b566a
All checks were successful
CI / test (push) Successful in 9m2s
Implement security hardening, notification pipeline, and CI test suite
Security hardening:

- Added IP/user rate limiting with Redis-backed counters and fail-open behavior.

- Added message anti-spam controls (per-chat rate + duplicate cooldown).

- Implemented refresh token rotation with JTI tracking and revoke support.

Notification pipeline:

- Added Celery app and async notification tasks for mention/offline delivery.

- Added Redis-based presence tracking and integrated it into realtime connect/disconnect.

- Added notification dispatch from message flow and notifications listing endpoint.

Quality gates and CI:

- Added pytest async integration tests for auth and chat/message lifecycle.

- Added pytest config, test fixtures, and GitHub Actions CI workflow.

- Fixed bcrypt/passlib compatibility by pinning bcrypt version.

- Documented worker and quality-gate commands in README.
2026-03-07 21:46:30 +03:00

41 lines
1.1 KiB
Python

import os
import sys
from pathlib import Path
# Set test env before importing app modules.
os.environ["POSTGRES_DSN"] = "sqlite+aiosqlite:///./test.db"
os.environ["AUTO_CREATE_TABLES"] = "false"
os.environ["REDIS_URL"] = "redis://localhost:6399/15"
os.environ["SECRET_KEY"] = "test-secret-key-1234567890"
os.environ["CELERY_TASK_ALWAYS_EAGER"] = "true"
PROJECT_ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(PROJECT_ROOT))
import pytest
from httpx import ASGITransport, AsyncClient
from app.database.base import Base
from app.database.session import AsyncSessionLocal, engine
from app.main import app
@pytest.fixture(autouse=True)
async def reset_db() -> None:
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await conn.run_sync(Base.metadata.create_all)
yield
@pytest.fixture
async def client() -> AsyncClient:
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://testserver") as ac:
yield ac
@pytest.fixture
async def db_session():
async with AsyncSessionLocal() as session:
yield session