from sqlalchemy import select from app.auth.models import EmailVerificationToken, PasswordResetToken from app.utils.totp import _totp_code async def test_register_verify_login_and_me(client, db_session): register_payload = { "email": "alice@example.com", "name": "Alice", "username": "alice", "password": "strongpass123", } register_response = await client.post("/api/v1/auth/register", json=register_payload) assert register_response.status_code == 201 login_response_before_verify = await client.post( "/api/v1/auth/login", json={"email": register_payload["email"], "password": register_payload["password"]}, ) assert login_response_before_verify.status_code == 403 token_row = await db_session.execute(select(EmailVerificationToken).order_by(EmailVerificationToken.id.desc())) verify_token = token_row.scalar_one().token verify_response = await client.post("/api/v1/auth/verify-email", json={"token": verify_token}) assert verify_response.status_code == 200 login_response = await client.post( "/api/v1/auth/login", json={"email": register_payload["email"], "password": register_payload["password"]}, ) assert login_response.status_code == 200 token_data = login_response.json() assert "access_token" in token_data assert "refresh_token" in token_data me_response = await client.get( "/api/v1/auth/me", headers={"Authorization": f"Bearer {token_data['access_token']}"}, ) assert me_response.status_code == 200 me_data = me_response.json() assert me_data["email"] == "alice@example.com" assert me_data["email_verified"] is True sessions_response = await client.get( "/api/v1/auth/sessions", headers={"Authorization": f"Bearer {token_data['access_token']}"}, ) assert sessions_response.status_code == 200 sessions = sessions_response.json() assert len(sessions) >= 1 assert any(item.get("token_type") == "access" for item in sessions) async def test_refresh_token_rotation(client, db_session): payload = { "email": "bob@example.com", "name": "Bob", "username": "bob", "password": "strongpass123", } await client.post("/api/v1/auth/register", json=payload) token_row = await db_session.execute(select(EmailVerificationToken).order_by(EmailVerificationToken.id.desc())) verify_token = token_row.scalar_one().token await client.post("/api/v1/auth/verify-email", json={"token": verify_token}) login_response = await client.post( "/api/v1/auth/login", json={"email": payload["email"], "password": payload["password"]}, ) refresh_token = login_response.json()["refresh_token"] refresh_response = await client.post("/api/v1/auth/refresh", json={"refresh_token": refresh_token}) assert refresh_response.status_code == 200 rotated_refresh_token = refresh_response.json()["refresh_token"] assert rotated_refresh_token != refresh_token old_refresh_reuse = await client.post("/api/v1/auth/refresh", json={"refresh_token": refresh_token}) assert old_refresh_reuse.status_code == 401 async def test_revoke_all_sessions_invalidates_access_and_refresh(client, db_session): payload = { "email": "carol@example.com", "name": "Carol", "username": "carol", "password": "strongpass123", } await client.post("/api/v1/auth/register", json=payload) token_row = await db_session.execute(select(EmailVerificationToken).order_by(EmailVerificationToken.id.desc())) verify_token = token_row.scalar_one().token await client.post("/api/v1/auth/verify-email", json={"token": verify_token}) login_response = await client.post( "/api/v1/auth/login", json={"email": payload["email"], "password": payload["password"]}, ) assert login_response.status_code == 200 tokens = login_response.json() access_token = tokens["access_token"] refresh_token = tokens["refresh_token"] revoke_all_response = await client.delete( "/api/v1/auth/sessions", headers={"Authorization": f"Bearer {access_token}"}, ) assert revoke_all_response.status_code == 204 me_response = await client.get( "/api/v1/auth/me", headers={"Authorization": f"Bearer {access_token}"}, ) assert me_response.status_code == 401 refresh_response = await client.post("/api/v1/auth/refresh", json={"refresh_token": refresh_token}) assert refresh_response.status_code == 401 async def test_revoke_single_session_invalidates_only_target_refresh_token(client, db_session): payload = { "email": "frank@example.com", "name": "Frank", "username": "frank", "password": "strongpass123", } await client.post("/api/v1/auth/register", json=payload) token_row = await db_session.execute(select(EmailVerificationToken).order_by(EmailVerificationToken.id.desc())) verify_token = token_row.scalar_one().token await client.post("/api/v1/auth/verify-email", json={"token": verify_token}) login_response = await client.post( "/api/v1/auth/login", json={"email": payload["email"], "password": payload["password"]}, ) assert login_response.status_code == 200 tokens = login_response.json() access_token = tokens["access_token"] refresh_token = tokens["refresh_token"] sessions_response = await client.get( "/api/v1/auth/sessions", headers={"Authorization": f"Bearer {access_token}"}, ) assert sessions_response.status_code == 200 refresh_sessions = [item for item in sessions_response.json() if item.get("token_type") == "refresh"] assert len(refresh_sessions) >= 1 target_jti = refresh_sessions[0]["jti"] revoke_response = await client.delete( f"/api/v1/auth/sessions/{target_jti}", headers={"Authorization": f"Bearer {access_token}"}, ) assert revoke_response.status_code == 204 refresh_after_revoke = await client.post("/api/v1/auth/refresh", json={"refresh_token": refresh_token}) assert refresh_after_revoke.status_code == 401 me_after_revoke = await client.get( "/api/v1/auth/me", headers={"Authorization": f"Bearer {access_token}"}, ) assert me_after_revoke.status_code == 200 async def test_twofa_setup_is_blocked_when_already_enabled(client, db_session): payload = { "email": "dana@example.com", "name": "Dana", "username": "dana", "password": "strongpass123", } await client.post("/api/v1/auth/register", json=payload) token_row = await db_session.execute(select(EmailVerificationToken).order_by(EmailVerificationToken.id.desc())) verify_token = token_row.scalar_one().token await client.post("/api/v1/auth/verify-email", json={"token": verify_token}) login_response = await client.post( "/api/v1/auth/login", json={"email": payload["email"], "password": payload["password"]}, ) assert login_response.status_code == 200 access_token = login_response.json()["access_token"] headers = {"Authorization": f"Bearer {access_token}"} setup_response = await client.post("/api/v1/auth/2fa/setup", headers=headers) assert setup_response.status_code == 200 secret = setup_response.json()["secret"] code = _totp_code(secret) enable_response = await client.post("/api/v1/auth/2fa/enable", headers=headers, json={"code": code}) assert enable_response.status_code == 200 setup_again_response = await client.post("/api/v1/auth/2fa/setup", headers=headers) assert setup_again_response.status_code == 400 async def test_twofa_recovery_codes_can_login_once(client, db_session): payload = { "email": "erin@example.com", "name": "Erin", "username": "erin", "password": "strongpass123", } await client.post("/api/v1/auth/register", json=payload) token_row = await db_session.execute(select(EmailVerificationToken).order_by(EmailVerificationToken.id.desc())) verify_token = token_row.scalar_one().token await client.post("/api/v1/auth/verify-email", json={"token": verify_token}) login_response = await client.post( "/api/v1/auth/login", json={"email": payload["email"], "password": payload["password"]}, ) assert login_response.status_code == 200 access_token = login_response.json()["access_token"] headers = {"Authorization": f"Bearer {access_token}"} setup_response = await client.post("/api/v1/auth/2fa/setup", headers=headers) assert setup_response.status_code == 200 secret = setup_response.json()["secret"] enable_response = await client.post("/api/v1/auth/2fa/enable", headers=headers, json={"code": _totp_code(secret)}) assert enable_response.status_code == 200 regen_response = await client.post( "/api/v1/auth/2fa/recovery-codes/regenerate", headers=headers, json={"code": _totp_code(secret)}, ) assert regen_response.status_code == 200 codes = regen_response.json()["codes"] assert len(codes) >= 1 first_code = codes[0] login_with_recovery = await client.post( "/api/v1/auth/login", json={"email": payload["email"], "password": payload["password"], "recovery_code": first_code}, ) assert login_with_recovery.status_code == 200 reuse_recovery_code = await client.post( "/api/v1/auth/login", json={"email": payload["email"], "password": payload["password"], "recovery_code": first_code}, ) assert reuse_recovery_code.status_code == 401 async def test_twofa_recovery_codes_are_normalized_and_decrement_status(client, db_session): payload = { "email": "erin2@example.com", "name": "Erin Two", "username": "erin_two", "password": "strongpass123", } await client.post("/api/v1/auth/register", json=payload) token_row = await db_session.execute(select(EmailVerificationToken).order_by(EmailVerificationToken.id.desc())) verify_token = token_row.scalar_one().token await client.post("/api/v1/auth/verify-email", json={"token": verify_token}) login_response = await client.post( "/api/v1/auth/login", json={"email": payload["email"], "password": payload["password"]}, ) assert login_response.status_code == 200 access_token = login_response.json()["access_token"] headers = {"Authorization": f"Bearer {access_token}"} setup_response = await client.post("/api/v1/auth/2fa/setup", headers=headers) assert setup_response.status_code == 200 secret = setup_response.json()["secret"] enable_response = await client.post("/api/v1/auth/2fa/enable", headers=headers, json={"code": _totp_code(secret)}) assert enable_response.status_code == 200 regen_response = await client.post( "/api/v1/auth/2fa/recovery-codes/regenerate", headers=headers, json={"code": _totp_code(secret)}, ) assert regen_response.status_code == 200 codes = regen_response.json()["codes"] assert len(codes) >= 2 initial_count = len(codes) first_code = codes[0] status_before = await client.get("/api/v1/auth/2fa/recovery-codes/status", headers=headers) assert status_before.status_code == 200 assert status_before.json()["remaining_codes"] == initial_count normalized_variant = first_code.lower().replace("-", "") login_with_normalized_recovery = await client.post( "/api/v1/auth/login", json={"email": payload["email"], "password": payload["password"], "recovery_code": normalized_variant}, ) assert login_with_normalized_recovery.status_code == 200 status_after = await client.get("/api/v1/auth/2fa/recovery-codes/status", headers=headers) assert status_after.status_code == 200 assert status_after.json()["remaining_codes"] == initial_count - 1 async def test_resend_verification_replaces_old_token_and_verifies_with_new_one(client, db_session): payload = { "email": "resend_flow@example.com", "name": "Resend Flow", "username": "resend_flow", "password": "strongpass123", } register_response = await client.post("/api/v1/auth/register", json=payload) assert register_response.status_code == 201 first_token_row = await db_session.execute(select(EmailVerificationToken).order_by(EmailVerificationToken.id.desc())) first_token = first_token_row.scalar_one().token resend_response = await client.post("/api/v1/auth/resend-verification", json={"email": payload["email"]}) assert resend_response.status_code == 200 second_token_row = await db_session.execute(select(EmailVerificationToken).order_by(EmailVerificationToken.id.desc())) second_token = second_token_row.scalar_one().token assert second_token != first_token verify_old = await client.post("/api/v1/auth/verify-email", json={"token": first_token}) assert verify_old.status_code == 400 verify_new = await client.post("/api/v1/auth/verify-email", json={"token": second_token}) assert verify_new.status_code == 200 login_response = await client.post( "/api/v1/auth/login", json={"email": payload["email"], "password": payload["password"]}, ) assert login_response.status_code == 200 async def test_password_reset_flow_replaces_password_and_invalidates_old_password(client, db_session): payload = { "email": "reset_flow@example.com", "name": "Reset Flow", "username": "reset_flow", "password": "strongpass123", } register_response = await client.post("/api/v1/auth/register", json=payload) assert register_response.status_code == 201 token_row = await db_session.execute(select(EmailVerificationToken).order_by(EmailVerificationToken.id.desc())) verify_token = token_row.scalar_one().token verify_response = await client.post("/api/v1/auth/verify-email", json={"token": verify_token}) assert verify_response.status_code == 200 request_reset = await client.post("/api/v1/auth/request-password-reset", json={"email": payload["email"]}) assert request_reset.status_code == 200 reset_token_row = await db_session.execute(select(PasswordResetToken).order_by(PasswordResetToken.id.desc())) reset_token = reset_token_row.scalar_one().token new_password = "newstrongpass456" reset_password_response = await client.post( "/api/v1/auth/reset-password", json={"token": reset_token, "new_password": new_password}, ) assert reset_password_response.status_code == 200 old_login = await client.post( "/api/v1/auth/login", json={"email": payload["email"], "password": payload["password"]}, ) assert old_login.status_code == 401 new_login = await client.post( "/api/v1/auth/login", json={"email": payload["email"], "password": new_password}, ) assert new_login.status_code == 200 async def test_check_email_status_reflects_verification_and_twofa_state(client, db_session): payload = { "email": "status_flow@example.com", "name": "Status Flow", "username": "status_flow", "password": "strongpass123", } register_response = await client.post("/api/v1/auth/register", json=payload) assert register_response.status_code == 201 status_before_verify = await client.get("/api/v1/auth/check-email", params={"email": payload["email"]}) assert status_before_verify.status_code == 200 body_before = status_before_verify.json() assert body_before["registered"] is True assert body_before["email_verified"] is False assert body_before["twofa_enabled"] is False token_row = await db_session.execute(select(EmailVerificationToken).order_by(EmailVerificationToken.id.desc())) verify_token = token_row.scalar_one().token verify_response = await client.post("/api/v1/auth/verify-email", json={"token": verify_token}) assert verify_response.status_code == 200 login_response = await client.post( "/api/v1/auth/login", json={"email": payload["email"], "password": payload["password"]}, ) assert login_response.status_code == 200 access_token = login_response.json()["access_token"] headers = {"Authorization": f"Bearer {access_token}"} setup_response = await client.post("/api/v1/auth/2fa/setup", headers=headers) assert setup_response.status_code == 200 secret = setup_response.json()["secret"] enable_response = await client.post("/api/v1/auth/2fa/enable", headers=headers, json={"code": _totp_code(secret)}) assert enable_response.status_code == 200 status_after_enable = await client.get("/api/v1/auth/check-email", params={"email": payload["email"]}) assert status_after_enable.status_code == 200 body_after = status_after_enable.json() assert body_after["registered"] is True assert body_after["email_verified"] is True assert body_after["twofa_enabled"] is True async def test_disable_twofa_clears_recovery_codes_and_allows_password_login_without_otp(client, db_session): payload = { "email": "disable_twofa@example.com", "name": "Disable Twofa", "username": "disable_twofa", "password": "strongpass123", } await client.post("/api/v1/auth/register", json=payload) token_row = await db_session.execute(select(EmailVerificationToken).order_by(EmailVerificationToken.id.desc())) verify_token = token_row.scalar_one().token await client.post("/api/v1/auth/verify-email", json={"token": verify_token}) login_response = await client.post( "/api/v1/auth/login", json={"email": payload["email"], "password": payload["password"]}, ) assert login_response.status_code == 200 access_token = login_response.json()["access_token"] headers = {"Authorization": f"Bearer {access_token}"} setup_response = await client.post("/api/v1/auth/2fa/setup", headers=headers) assert setup_response.status_code == 200 secret = setup_response.json()["secret"] enable_response = await client.post("/api/v1/auth/2fa/enable", headers=headers, json={"code": _totp_code(secret)}) assert enable_response.status_code == 200 regen_response = await client.post( "/api/v1/auth/2fa/recovery-codes/regenerate", headers=headers, json={"code": _totp_code(secret)}, ) assert regen_response.status_code == 200 assert len(regen_response.json()["codes"]) >= 1 disable_response = await client.post( "/api/v1/auth/2fa/disable", headers=headers, json={"code": _totp_code(secret)}, ) assert disable_response.status_code == 200 status_after_disable = await client.get("/api/v1/auth/2fa/recovery-codes/status", headers=headers) assert status_after_disable.status_code == 200 assert status_after_disable.json()["remaining_codes"] == 0 plain_login_after_disable = await client.post( "/api/v1/auth/login", json={"email": payload["email"], "password": payload["password"]}, ) assert plain_login_after_disable.status_code == 200