Files
Messenger/tests/test_auth_flow.py
2026-03-08 21:07:17 +03:00

374 lines
15 KiB
Python

from sqlalchemy import select
from app.auth.models import EmailVerificationToken, PasswordResetToken
from app.utils.totp import _totp_code
async def test_register_verify_login_and_me(client, db_session):
    """Exercise the path from registration to an authenticated session.

    Steps: register, confirm that login answers 403 before the e-mail is
    verified, verify with the token stored in the database, log in, then
    read the profile and the session list with the issued access token.
    """
    new_user = {
        "email": "alice@example.com",
        "name": "Alice",
        "username": "alice",
        "password": "strongpass123",
    }
    credentials = {"email": new_user["email"], "password": new_user["password"]}

    created = await client.post("/api/v1/auth/register", json=new_user)
    assert created.status_code == 201

    # Login is expected to be refused until the e-mail address is verified.
    early_login = await client.post("/api/v1/auth/login", json=credentials)
    assert early_login.status_code == 403

    # Read the verification token directly from the database.
    token_query = select(EmailVerificationToken).order_by(EmailVerificationToken.id.desc())
    token_result = await db_session.execute(token_query)
    email_token = token_result.scalar_one().token
    verified = await client.post("/api/v1/auth/verify-email", json={"token": email_token})
    assert verified.status_code == 200

    logged_in = await client.post("/api/v1/auth/login", json=credentials)
    assert logged_in.status_code == 200
    issued = logged_in.json()
    assert "access_token" in issued
    assert "refresh_token" in issued

    profile_response = await client.get(
        "/api/v1/auth/me",
        headers={"Authorization": f"Bearer {issued['access_token']}"},
    )
    assert profile_response.status_code == 200
    profile = profile_response.json()
    assert profile["email"] == "alice@example.com"
    assert profile["email_verified"] is True

    session_list_response = await client.get(
        "/api/v1/auth/sessions",
        headers={"Authorization": f"Bearer {issued['access_token']}"},
    )
    assert session_list_response.status_code == 200
    active_sessions = session_list_response.json()
    assert len(active_sessions) >= 1
    assert any(entry.get("token_type") == "access" for entry in active_sessions)
async def test_refresh_token_rotation(client, db_session):
    """Check that refreshing rotates the refresh token and retires the old one.

    After a verified user logs in, ``/auth/refresh`` must answer 200 with a
    different refresh token, and presenting the original token again must
    answer 401. Each setup step (register, verify, login) is asserted so a
    broken step fails with its status code rather than a later ``KeyError``.
    """
    payload = {
        "email": "bob@example.com",
        "name": "Bob",
        "username": "bob",
        "password": "strongpass123",
    }
    register_response = await client.post("/api/v1/auth/register", json=payload)
    assert register_response.status_code == 201

    # Fetch the verification token directly from the database.
    token_row = await db_session.execute(
        select(EmailVerificationToken).order_by(EmailVerificationToken.id.desc())
    )
    verify_token = token_row.scalar_one().token
    verify_response = await client.post("/api/v1/auth/verify-email", json={"token": verify_token})
    assert verify_response.status_code == 200

    login_response = await client.post(
        "/api/v1/auth/login",
        json={"email": payload["email"], "password": payload["password"]},
    )
    assert login_response.status_code == 200
    refresh_token = login_response.json()["refresh_token"]

    refresh_response = await client.post(
        "/api/v1/auth/refresh",
        json={"refresh_token": refresh_token},
    )
    assert refresh_response.status_code == 200
    rotated_refresh_token = refresh_response.json()["refresh_token"]
    assert rotated_refresh_token != refresh_token

    # The pre-rotation token must no longer be accepted.
    old_refresh_reuse = await client.post(
        "/api/v1/auth/refresh",
        json={"refresh_token": refresh_token},
    )
    assert old_refresh_reuse.status_code == 401
async def test_revoke_all_sessions_invalidates_access_and_refresh(client, db_session):
    """Check that ``DELETE /auth/sessions`` kills both token types.

    After the bulk revoke answers 204, the access token must be rejected by
    ``/auth/me`` (401) and the refresh token by ``/auth/refresh`` (401).
    Registration and e-mail verification are asserted so that a failed setup
    step is reported where it happens.
    """
    payload = {
        "email": "carol@example.com",
        "name": "Carol",
        "username": "carol",
        "password": "strongpass123",
    }
    register_response = await client.post("/api/v1/auth/register", json=payload)
    assert register_response.status_code == 201

    # Fetch the verification token directly from the database.
    token_row = await db_session.execute(
        select(EmailVerificationToken).order_by(EmailVerificationToken.id.desc())
    )
    verify_token = token_row.scalar_one().token
    verify_response = await client.post("/api/v1/auth/verify-email", json={"token": verify_token})
    assert verify_response.status_code == 200

    login_response = await client.post(
        "/api/v1/auth/login",
        json={"email": payload["email"], "password": payload["password"]},
    )
    assert login_response.status_code == 200
    tokens = login_response.json()
    access_token = tokens["access_token"]
    refresh_token = tokens["refresh_token"]
    auth_headers = {"Authorization": f"Bearer {access_token}"}

    revoke_all_response = await client.delete("/api/v1/auth/sessions", headers=auth_headers)
    assert revoke_all_response.status_code == 204

    # Both tokens issued before the revoke must now be refused.
    me_response = await client.get("/api/v1/auth/me", headers=auth_headers)
    assert me_response.status_code == 401
    refresh_response = await client.post(
        "/api/v1/auth/refresh",
        json={"refresh_token": refresh_token},
    )
    assert refresh_response.status_code == 401
async def test_revoke_single_session_invalidates_only_target_refresh_token(client, db_session):
    """Check that revoking one refresh session leaves the access token usable.

    The test lists sessions, picks the first entry whose ``token_type`` is
    ``"refresh"``, deletes it by ``jti`` (204), and then expects the refresh
    token to be rejected (401) while ``/auth/me`` still answers 200 for the
    access token. Registration and e-mail verification are asserted so that
    a failed setup step is reported where it happens.
    """
    payload = {
        "email": "frank@example.com",
        "name": "Frank",
        "username": "frank",
        "password": "strongpass123",
    }
    register_response = await client.post("/api/v1/auth/register", json=payload)
    assert register_response.status_code == 201

    # Fetch the verification token directly from the database.
    token_row = await db_session.execute(
        select(EmailVerificationToken).order_by(EmailVerificationToken.id.desc())
    )
    verify_token = token_row.scalar_one().token
    verify_response = await client.post("/api/v1/auth/verify-email", json={"token": verify_token})
    assert verify_response.status_code == 200

    login_response = await client.post(
        "/api/v1/auth/login",
        json={"email": payload["email"], "password": payload["password"]},
    )
    assert login_response.status_code == 200
    tokens = login_response.json()
    access_token = tokens["access_token"]
    refresh_token = tokens["refresh_token"]
    auth_headers = {"Authorization": f"Bearer {access_token}"}

    sessions_response = await client.get("/api/v1/auth/sessions", headers=auth_headers)
    assert sessions_response.status_code == 200
    refresh_sessions = [
        item for item in sessions_response.json() if item.get("token_type") == "refresh"
    ]
    assert len(refresh_sessions) >= 1
    target_jti = refresh_sessions[0]["jti"]

    revoke_response = await client.delete(
        f"/api/v1/auth/sessions/{target_jti}",
        headers=auth_headers,
    )
    assert revoke_response.status_code == 204

    # The revoked refresh token is refused; the access token keeps working.
    refresh_after_revoke = await client.post(
        "/api/v1/auth/refresh",
        json={"refresh_token": refresh_token},
    )
    assert refresh_after_revoke.status_code == 401
    me_after_revoke = await client.get("/api/v1/auth/me", headers=auth_headers)
    assert me_after_revoke.status_code == 200
async def test_twofa_setup_is_blocked_when_already_enabled(client, db_session):
    """Check that ``/auth/2fa/setup`` answers 400 once 2FA is enabled.

    The user sets up 2FA, enables it with a code derived from the returned
    secret, and then calls setup a second time. Registration and e-mail
    verification are asserted so that a failed setup step is reported where
    it happens.
    """
    payload = {
        "email": "dana@example.com",
        "name": "Dana",
        "username": "dana",
        "password": "strongpass123",
    }
    register_response = await client.post("/api/v1/auth/register", json=payload)
    assert register_response.status_code == 201

    # Fetch the verification token directly from the database.
    token_row = await db_session.execute(
        select(EmailVerificationToken).order_by(EmailVerificationToken.id.desc())
    )
    verify_token = token_row.scalar_one().token
    verify_response = await client.post("/api/v1/auth/verify-email", json={"token": verify_token})
    assert verify_response.status_code == 200

    login_response = await client.post(
        "/api/v1/auth/login",
        json={"email": payload["email"], "password": payload["password"]},
    )
    assert login_response.status_code == 200
    access_token = login_response.json()["access_token"]
    headers = {"Authorization": f"Bearer {access_token}"}

    setup_response = await client.post("/api/v1/auth/2fa/setup", headers=headers)
    assert setup_response.status_code == 200
    secret = setup_response.json()["secret"]

    code = _totp_code(secret)
    enable_response = await client.post(
        "/api/v1/auth/2fa/enable",
        headers=headers,
        json={"code": code},
    )
    assert enable_response.status_code == 200

    # A second setup attempt on an account with 2FA enabled must be refused.
    setup_again_response = await client.post("/api/v1/auth/2fa/setup", headers=headers)
    assert setup_again_response.status_code == 400
async def test_twofa_recovery_codes_can_login_once(client, db_session):
    """Check that a recovery code is accepted for login exactly once.

    With 2FA enabled, the test regenerates recovery codes, logs in with the
    first one (200), and expects a second login with the same code to answer
    401. Registration and e-mail verification are asserted so that a failed
    setup step is reported where it happens.
    """
    payload = {
        "email": "erin@example.com",
        "name": "Erin",
        "username": "erin",
        "password": "strongpass123",
    }
    register_response = await client.post("/api/v1/auth/register", json=payload)
    assert register_response.status_code == 201

    # Fetch the verification token directly from the database.
    token_row = await db_session.execute(
        select(EmailVerificationToken).order_by(EmailVerificationToken.id.desc())
    )
    verify_token = token_row.scalar_one().token
    verify_response = await client.post("/api/v1/auth/verify-email", json={"token": verify_token})
    assert verify_response.status_code == 200

    login_response = await client.post(
        "/api/v1/auth/login",
        json={"email": payload["email"], "password": payload["password"]},
    )
    assert login_response.status_code == 200
    access_token = login_response.json()["access_token"]
    headers = {"Authorization": f"Bearer {access_token}"}

    setup_response = await client.post("/api/v1/auth/2fa/setup", headers=headers)
    assert setup_response.status_code == 200
    secret = setup_response.json()["secret"]

    enable_response = await client.post(
        "/api/v1/auth/2fa/enable",
        headers=headers,
        json={"code": _totp_code(secret)},
    )
    assert enable_response.status_code == 200

    # NOTE(review): this second code is generated immediately after the one
    # used to enable 2FA, so it is presumably the same value — confirm the
    # server accepts a repeated code here.
    regen_response = await client.post(
        "/api/v1/auth/2fa/recovery-codes/regenerate",
        headers=headers,
        json={"code": _totp_code(secret)},
    )
    assert regen_response.status_code == 200
    codes = regen_response.json()["codes"]
    assert len(codes) >= 1
    first_code = codes[0]

    recovery_login_body = {
        "email": payload["email"],
        "password": payload["password"],
        "recovery_code": first_code,
    }
    login_with_recovery = await client.post("/api/v1/auth/login", json=recovery_login_body)
    assert login_with_recovery.status_code == 200

    # Replaying the same recovery code must be rejected.
    reuse_recovery_code = await client.post("/api/v1/auth/login", json=recovery_login_body)
    assert reuse_recovery_code.status_code == 401
async def test_twofa_recovery_codes_are_normalized_and_decrement_status(client, db_session):
    """Check recovery-code normalization and the remaining-codes counter.

    A recovery code submitted lower-cased and with ``-`` characters removed
    must still log the user in (200), and the ``remaining_codes`` value
    reported by the status endpoint must drop by one afterwards.
    Registration and e-mail verification are asserted so that a failed setup
    step is reported where it happens.
    """
    payload = {
        "email": "erin2@example.com",
        "name": "Erin Two",
        "username": "erin_two",
        "password": "strongpass123",
    }
    register_response = await client.post("/api/v1/auth/register", json=payload)
    assert register_response.status_code == 201

    # Fetch the verification token directly from the database.
    token_row = await db_session.execute(
        select(EmailVerificationToken).order_by(EmailVerificationToken.id.desc())
    )
    verify_token = token_row.scalar_one().token
    verify_response = await client.post("/api/v1/auth/verify-email", json={"token": verify_token})
    assert verify_response.status_code == 200

    login_response = await client.post(
        "/api/v1/auth/login",
        json={"email": payload["email"], "password": payload["password"]},
    )
    assert login_response.status_code == 200
    access_token = login_response.json()["access_token"]
    headers = {"Authorization": f"Bearer {access_token}"}

    setup_response = await client.post("/api/v1/auth/2fa/setup", headers=headers)
    assert setup_response.status_code == 200
    secret = setup_response.json()["secret"]

    enable_response = await client.post(
        "/api/v1/auth/2fa/enable",
        headers=headers,
        json={"code": _totp_code(secret)},
    )
    assert enable_response.status_code == 200

    regen_response = await client.post(
        "/api/v1/auth/2fa/recovery-codes/regenerate",
        headers=headers,
        json={"code": _totp_code(secret)},
    )
    assert regen_response.status_code == 200
    codes = regen_response.json()["codes"]
    # At least two codes are needed so the counter stays positive after one use.
    assert len(codes) >= 2
    initial_count = len(codes)
    first_code = codes[0]

    status_before = await client.get("/api/v1/auth/2fa/recovery-codes/status", headers=headers)
    assert status_before.status_code == 200
    assert status_before.json()["remaining_codes"] == initial_count

    # Submit the code in a different surface form: lower case, no dashes.
    normalized_variant = first_code.lower().replace("-", "")
    login_with_normalized_recovery = await client.post(
        "/api/v1/auth/login",
        json={
            "email": payload["email"],
            "password": payload["password"],
            "recovery_code": normalized_variant,
        },
    )
    assert login_with_normalized_recovery.status_code == 200

    status_after = await client.get("/api/v1/auth/2fa/recovery-codes/status", headers=headers)
    assert status_after.status_code == 200
    assert status_after.json()["remaining_codes"] == initial_count - 1
async def test_resend_verification_replaces_old_token_and_verifies_with_new_one(client, db_session):
    """Check that resending verification issues a new token and retires the old.

    After ``/auth/resend-verification`` the stored token must differ from the
    one created at registration; verifying with the old token must answer
    400, verifying with the new one 200, and login must then succeed.
    """
    account = {
        "email": "resend_flow@example.com",
        "name": "Resend Flow",
        "username": "resend_flow",
        "password": "strongpass123",
    }
    verify_url = "/api/v1/auth/verify-email"
    # The same query is run before and after the resend request.
    token_query = select(EmailVerificationToken).order_by(EmailVerificationToken.id.desc())

    created = await client.post("/api/v1/auth/register", json=account)
    assert created.status_code == 201
    original_result = await db_session.execute(token_query)
    original_token = original_result.scalar_one().token

    resent = await client.post(
        "/api/v1/auth/resend-verification",
        json={"email": account["email"]},
    )
    assert resent.status_code == 200
    replacement_result = await db_session.execute(token_query)
    replacement_token = replacement_result.scalar_one().token
    assert replacement_token != original_token

    stale_attempt = await client.post(verify_url, json={"token": original_token})
    assert stale_attempt.status_code == 400

    fresh_attempt = await client.post(verify_url, json={"token": replacement_token})
    assert fresh_attempt.status_code == 200

    logged_in = await client.post(
        "/api/v1/auth/login",
        json={"email": account["email"], "password": account["password"]},
    )
    assert logged_in.status_code == 200
async def test_password_reset_flow_replaces_password_and_invalidates_old_password(client, db_session):
    """Check that a password reset swaps the accepted password.

    A verified user requests a reset, submits the stored reset token with a
    new password (200), and afterwards login must answer 401 for the old
    password and 200 for the new one.
    """
    account = {
        "email": "reset_flow@example.com",
        "name": "Reset Flow",
        "username": "reset_flow",
        "password": "strongpass123",
    }
    login_url = "/api/v1/auth/login"

    created = await client.post("/api/v1/auth/register", json=account)
    assert created.status_code == 201

    # Verify the e-mail address with the token read from the database.
    email_token_query = select(EmailVerificationToken).order_by(
        EmailVerificationToken.id.desc()
    )
    email_token_result = await db_session.execute(email_token_query)
    email_token = email_token_result.scalar_one().token
    verified = await client.post("/api/v1/auth/verify-email", json={"token": email_token})
    assert verified.status_code == 200

    reset_requested = await client.post(
        "/api/v1/auth/request-password-reset",
        json={"email": account["email"]},
    )
    assert reset_requested.status_code == 200

    # Read the reset token from the database as well.
    reset_token_query = select(PasswordResetToken).order_by(PasswordResetToken.id.desc())
    reset_token_result = await db_session.execute(reset_token_query)
    reset_token = reset_token_result.scalar_one().token

    replacement_password = "newstrongpass456"
    reset_done = await client.post(
        "/api/v1/auth/reset-password",
        json={"token": reset_token, "new_password": replacement_password},
    )
    assert reset_done.status_code == 200

    stale_login = await client.post(
        login_url,
        json={"email": account["email"], "password": account["password"]},
    )
    assert stale_login.status_code == 401

    fresh_login = await client.post(
        login_url,
        json={"email": account["email"], "password": replacement_password},
    )
    assert fresh_login.status_code == 200