from sqlalchemy import select from app.auth.models import EmailVerificationToken from app.utils.totp import _totp_code async def test_register_verify_login_and_me(client, db_session): register_payload = { "email": "alice@example.com", "name": "Alice", "username": "alice", "password": "strongpass123", } register_response = await client.post("/api/v1/auth/register", json=register_payload) assert register_response.status_code == 201 login_response_before_verify = await client.post( "/api/v1/auth/login", json={"email": register_payload["email"], "password": register_payload["password"]}, ) assert login_response_before_verify.status_code == 403 token_row = await db_session.execute(select(EmailVerificationToken).order_by(EmailVerificationToken.id.desc())) verify_token = token_row.scalar_one().token verify_response = await client.post("/api/v1/auth/verify-email", json={"token": verify_token}) assert verify_response.status_code == 200 login_response = await client.post( "/api/v1/auth/login", json={"email": register_payload["email"], "password": register_payload["password"]}, ) assert login_response.status_code == 200 token_data = login_response.json() assert "access_token" in token_data assert "refresh_token" in token_data me_response = await client.get( "/api/v1/auth/me", headers={"Authorization": f"Bearer {token_data['access_token']}"}, ) assert me_response.status_code == 200 me_data = me_response.json() assert me_data["email"] == "alice@example.com" assert me_data["email_verified"] is True sessions_response = await client.get( "/api/v1/auth/sessions", headers={"Authorization": f"Bearer {token_data['access_token']}"}, ) assert sessions_response.status_code == 200 sessions = sessions_response.json() assert len(sessions) >= 1 assert any(item.get("token_type") == "access" for item in sessions) async def test_refresh_token_rotation(client, db_session): payload = { "email": "bob@example.com", "name": "Bob", "username": "bob", "password": "strongpass123", } await client.post("/api/v1/auth/register", json=payload) token_row = await db_session.execute(select(EmailVerificationToken).order_by(EmailVerificationToken.id.desc())) verify_token = token_row.scalar_one().token await client.post("/api/v1/auth/verify-email", json={"token": verify_token}) login_response = await client.post( "/api/v1/auth/login", json={"email": payload["email"], "password": payload["password"]}, ) refresh_token = login_response.json()["refresh_token"] refresh_response = await client.post("/api/v1/auth/refresh", json={"refresh_token": refresh_token}) assert refresh_response.status_code == 200 rotated_refresh_token = refresh_response.json()["refresh_token"] assert rotated_refresh_token != refresh_token old_refresh_reuse = await client.post("/api/v1/auth/refresh", json={"refresh_token": refresh_token}) assert old_refresh_reuse.status_code == 401 async def test_revoke_all_sessions_invalidates_access_and_refresh(client, db_session): payload = { "email": "carol@example.com", "name": "Carol", "username": "carol", "password": "strongpass123", } await client.post("/api/v1/auth/register", json=payload) token_row = await db_session.execute(select(EmailVerificationToken).order_by(EmailVerificationToken.id.desc())) verify_token = token_row.scalar_one().token await client.post("/api/v1/auth/verify-email", json={"token": verify_token}) login_response = await client.post( "/api/v1/auth/login", json={"email": payload["email"], "password": payload["password"]}, ) assert login_response.status_code == 200 tokens = login_response.json() access_token = tokens["access_token"] refresh_token = tokens["refresh_token"] revoke_all_response = await client.delete( "/api/v1/auth/sessions", headers={"Authorization": f"Bearer {access_token}"}, ) assert revoke_all_response.status_code == 204 me_response = await client.get( "/api/v1/auth/me", headers={"Authorization": f"Bearer {access_token}"}, ) assert me_response.status_code == 401 refresh_response = await client.post("/api/v1/auth/refresh", json={"refresh_token": refresh_token}) assert refresh_response.status_code == 401 async def test_revoke_single_session_invalidates_only_target_refresh_token(client, db_session): payload = { "email": "frank@example.com", "name": "Frank", "username": "frank", "password": "strongpass123", } await client.post("/api/v1/auth/register", json=payload) token_row = await db_session.execute(select(EmailVerificationToken).order_by(EmailVerificationToken.id.desc())) verify_token = token_row.scalar_one().token await client.post("/api/v1/auth/verify-email", json={"token": verify_token}) login_response = await client.post( "/api/v1/auth/login", json={"email": payload["email"], "password": payload["password"]}, ) assert login_response.status_code == 200 tokens = login_response.json() access_token = tokens["access_token"] refresh_token = tokens["refresh_token"] sessions_response = await client.get( "/api/v1/auth/sessions", headers={"Authorization": f"Bearer {access_token}"}, ) assert sessions_response.status_code == 200 refresh_sessions = [item for item in sessions_response.json() if item.get("token_type") == "refresh"] assert len(refresh_sessions) >= 1 target_jti = refresh_sessions[0]["jti"] revoke_response = await client.delete( f"/api/v1/auth/sessions/{target_jti}", headers={"Authorization": f"Bearer {access_token}"}, ) assert revoke_response.status_code == 204 refresh_after_revoke = await client.post("/api/v1/auth/refresh", json={"refresh_token": refresh_token}) assert refresh_after_revoke.status_code == 401 me_after_revoke = await client.get( "/api/v1/auth/me", headers={"Authorization": f"Bearer {access_token}"}, ) assert me_after_revoke.status_code == 200 async def test_twofa_setup_is_blocked_when_already_enabled(client, db_session): payload = { "email": "dana@example.com", "name": "Dana", "username": "dana", "password": "strongpass123", } await client.post("/api/v1/auth/register", json=payload) token_row = await db_session.execute(select(EmailVerificationToken).order_by(EmailVerificationToken.id.desc())) verify_token = token_row.scalar_one().token await client.post("/api/v1/auth/verify-email", json={"token": verify_token}) login_response = await client.post( "/api/v1/auth/login", json={"email": payload["email"], "password": payload["password"]}, ) assert login_response.status_code == 200 access_token = login_response.json()["access_token"] headers = {"Authorization": f"Bearer {access_token}"} setup_response = await client.post("/api/v1/auth/2fa/setup", headers=headers) assert setup_response.status_code == 200 secret = setup_response.json()["secret"] code = _totp_code(secret) enable_response = await client.post("/api/v1/auth/2fa/enable", headers=headers, json={"code": code}) assert enable_response.status_code == 200 setup_again_response = await client.post("/api/v1/auth/2fa/setup", headers=headers) assert setup_again_response.status_code == 400 async def test_twofa_recovery_codes_can_login_once(client, db_session): payload = { "email": "erin@example.com", "name": "Erin", "username": "erin", "password": "strongpass123", } await client.post("/api/v1/auth/register", json=payload) token_row = await db_session.execute(select(EmailVerificationToken).order_by(EmailVerificationToken.id.desc())) verify_token = token_row.scalar_one().token await client.post("/api/v1/auth/verify-email", json={"token": verify_token}) login_response = await client.post( "/api/v1/auth/login", json={"email": payload["email"], "password": payload["password"]}, ) assert login_response.status_code == 200 access_token = login_response.json()["access_token"] headers = {"Authorization": f"Bearer {access_token}"} setup_response = await client.post("/api/v1/auth/2fa/setup", headers=headers) assert setup_response.status_code == 200 secret = setup_response.json()["secret"] enable_response = await client.post("/api/v1/auth/2fa/enable", headers=headers, json={"code": _totp_code(secret)}) assert enable_response.status_code == 200 regen_response = await client.post( "/api/v1/auth/2fa/recovery-codes/regenerate", headers=headers, json={"code": _totp_code(secret)}, ) assert regen_response.status_code == 200 codes = regen_response.json()["codes"] assert len(codes) >= 1 first_code = codes[0] login_with_recovery = await client.post( "/api/v1/auth/login", json={"email": payload["email"], "password": payload["password"], "recovery_code": first_code}, ) assert login_with_recovery.status_code == 200 reuse_recovery_code = await client.post( "/api/v1/auth/login", json={"email": payload["email"], "password": payload["password"], "recovery_code": first_code}, ) assert reuse_recovery_code.status_code == 401