from sqlalchemy import select from app.auth.models import EmailVerificationToken async def test_register_verify_login_and_me(client, db_session): register_payload = { "email": "alice@example.com", "name": "Alice", "username": "alice", "password": "strongpass123", } register_response = await client.post("/api/v1/auth/register", json=register_payload) assert register_response.status_code == 201 login_response_before_verify = await client.post( "/api/v1/auth/login", json={"email": register_payload["email"], "password": register_payload["password"]}, ) assert login_response_before_verify.status_code == 403 token_row = await db_session.execute(select(EmailVerificationToken).order_by(EmailVerificationToken.id.desc())) verify_token = token_row.scalar_one().token verify_response = await client.post("/api/v1/auth/verify-email", json={"token": verify_token}) assert verify_response.status_code == 200 login_response = await client.post( "/api/v1/auth/login", json={"email": register_payload["email"], "password": register_payload["password"]}, ) assert login_response.status_code == 200 token_data = login_response.json() assert "access_token" in token_data assert "refresh_token" in token_data me_response = await client.get( "/api/v1/auth/me", headers={"Authorization": f"Bearer {token_data['access_token']}"}, ) assert me_response.status_code == 200 me_data = me_response.json() assert me_data["email"] == "alice@example.com" assert me_data["email_verified"] is True sessions_response = await client.get( "/api/v1/auth/sessions", headers={"Authorization": f"Bearer {token_data['access_token']}"}, ) assert sessions_response.status_code == 200 sessions = sessions_response.json() assert len(sessions) >= 1 assert any(item.get("token_type") == "access" for item in sessions) async def test_refresh_token_rotation(client, db_session): payload = { "email": "bob@example.com", "name": "Bob", "username": "bob", "password": "strongpass123", } await client.post("/api/v1/auth/register", json=payload) token_row = await db_session.execute(select(EmailVerificationToken).order_by(EmailVerificationToken.id.desc())) verify_token = token_row.scalar_one().token await client.post("/api/v1/auth/verify-email", json={"token": verify_token}) login_response = await client.post( "/api/v1/auth/login", json={"email": payload["email"], "password": payload["password"]}, ) refresh_token = login_response.json()["refresh_token"] refresh_response = await client.post("/api/v1/auth/refresh", json={"refresh_token": refresh_token}) assert refresh_response.status_code == 200 rotated_refresh_token = refresh_response.json()["refresh_token"] assert rotated_refresh_token != refresh_token old_refresh_reuse = await client.post("/api/v1/auth/refresh", json={"refresh_token": refresh_token}) assert old_refresh_reuse.status_code == 401 async def test_revoke_all_sessions_invalidates_access_and_refresh(client, db_session): payload = { "email": "carol@example.com", "name": "Carol", "username": "carol", "password": "strongpass123", } await client.post("/api/v1/auth/register", json=payload) token_row = await db_session.execute(select(EmailVerificationToken).order_by(EmailVerificationToken.id.desc())) verify_token = token_row.scalar_one().token await client.post("/api/v1/auth/verify-email", json={"token": verify_token}) login_response = await client.post( "/api/v1/auth/login", json={"email": payload["email"], "password": payload["password"]}, ) assert login_response.status_code == 200 tokens = login_response.json() access_token = tokens["access_token"] refresh_token = tokens["refresh_token"] revoke_all_response = await client.delete( "/api/v1/auth/sessions", headers={"Authorization": f"Bearer {access_token}"}, ) assert revoke_all_response.status_code == 204 me_response = await client.get( "/api/v1/auth/me", headers={"Authorization": f"Bearer {access_token}"}, ) assert me_response.status_code == 401 refresh_response = await client.post("/api/v1/auth/refresh", json={"refresh_token": refresh_token}) assert refresh_response.status_code == 401