All checks were successful
CI / test (push) Successful in 9m2s
Security hardening:
- Added IP/user rate limiting with Redis-backed counters and fail-open behavior.
- Added message anti-spam controls (per-chat rate + duplicate cooldown).
- Implemented refresh token rotation with JTI tracking and revoke support.

Notification pipeline:
- Added Celery app and async notification tasks for mention/offline delivery.
- Added Redis-based presence tracking and integrated it into realtime connect/disconnect.
- Added notification dispatch from message flow and notifications listing endpoint.

Quality gates and CI:
- Added pytest async integration tests for auth and chat/message lifecycle.
- Added pytest config, test fixtures, and GitHub Actions CI workflow.
- Fixed bcrypt/passlib compatibility by pinning bcrypt version.
- Documented worker and quality-gate commands in README.
70 lines
2.7 KiB
Python
70 lines
2.7 KiB
Python
from sqlalchemy import select
|
|
|
|
from app.auth.models import EmailVerificationToken
|
|
|
|
|
|
async def test_register_verify_login_and_me(client, db_session):
    """Walk the register -> verify-email -> login -> /me flow end to end.

    Steps checked, in order:
      1. Registering a new account returns 201.
      2. Logging in before the e-mail is verified is rejected with 403.
      3. Posting the stored verification token to /verify-email returns 200.
      4. Logging in afterwards returns 200 with both an access token and a
         refresh token.
      5. /me, called with the access token as a Bearer credential, reports
         the registered e-mail and ``email_verified`` set to True.

    Args:
        client: async HTTP client fixture bound to the application.
        db_session: async database session fixture, used to read the
            verification token straight from the database.
    """
    register_payload = {
        "email": "alice@example.com",
        "username": "alice",
        "password": "strongpass123",
    }
    register_response = await client.post("/api/v1/auth/register", json=register_payload)
    assert register_response.status_code == 201

    login_response_before_verify = await client.post(
        "/api/v1/auth/login",
        json={"email": register_payload["email"], "password": register_payload["password"]},
    )
    assert login_response_before_verify.status_code == 403

    # LIMIT 1 makes the "newest first" ordering meaningful: without it,
    # scalar_one() raises MultipleResultsFound as soon as the table holds more
    # than one token row.
    token_row = await db_session.execute(
        select(EmailVerificationToken)
        .order_by(EmailVerificationToken.id.desc())
        .limit(1)
    )
    verify_token = token_row.scalar_one().token

    verify_response = await client.post("/api/v1/auth/verify-email", json={"token": verify_token})
    assert verify_response.status_code == 200

    login_response = await client.post(
        "/api/v1/auth/login",
        json={"email": register_payload["email"], "password": register_payload["password"]},
    )
    assert login_response.status_code == 200
    token_data = login_response.json()
    assert "access_token" in token_data
    assert "refresh_token" in token_data

    me_response = await client.get(
        "/api/v1/auth/me",
        headers={"Authorization": f"Bearer {token_data['access_token']}"},
    )
    assert me_response.status_code == 200
    me_data = me_response.json()
    assert me_data["email"] == "alice@example.com"
    assert me_data["email_verified"] is True
|
|
|
|
|
|
async def test_refresh_token_rotation(client, db_session):
    """Check that refreshing issues a new refresh token and rejects the old one.

    A user is registered, verified and logged in as setup; each setup response
    is asserted so a failure there is reported at its source instead of as a
    KeyError or a misleading 401 further down. The test then verifies that:
      * POST /refresh with the login refresh token returns 200 and a refresh
        token different from the one submitted;
      * replaying the original refresh token afterwards returns 401.

    Args:
        client: async HTTP client fixture bound to the application.
        db_session: async database session fixture, used to read the
            verification token straight from the database.
    """
    payload = {
        "email": "bob@example.com",
        "username": "bob",
        "password": "strongpass123",
    }
    register_response = await client.post("/api/v1/auth/register", json=payload)
    assert register_response.status_code == 201

    # LIMIT 1 makes the "newest first" ordering meaningful: without it,
    # scalar_one() raises MultipleResultsFound as soon as the table holds more
    # than one token row (e.g. one left behind by another test).
    token_row = await db_session.execute(
        select(EmailVerificationToken)
        .order_by(EmailVerificationToken.id.desc())
        .limit(1)
    )
    verify_token = token_row.scalar_one().token
    verify_response = await client.post("/api/v1/auth/verify-email", json={"token": verify_token})
    assert verify_response.status_code == 200

    login_response = await client.post(
        "/api/v1/auth/login",
        json={"email": payload["email"], "password": payload["password"]},
    )
    assert login_response.status_code == 200
    refresh_token = login_response.json()["refresh_token"]

    refresh_response = await client.post("/api/v1/auth/refresh", json={"refresh_token": refresh_token})
    assert refresh_response.status_code == 200
    rotated_refresh_token = refresh_response.json()["refresh_token"]
    assert rotated_refresh_token != refresh_token

    # The pre-rotation token must no longer be accepted.
    old_refresh_reuse = await client.post("/api/v1/auth/refresh", json={"refresh_token": refresh_token})
    assert old_refresh_reuse.status_code == 401
|