246 lines
9.4 KiB
Python
246 lines
9.4 KiB
Python
from sqlalchemy import select
|
|
|
|
from app.auth.models import EmailVerificationToken
|
|
from app.utils.totp import _totp_code
|
|
|
|
|
|
async def test_register_verify_login_and_me(client, db_session):
    """Walk the register -> verify-email -> login -> /me -> /sessions flow.

    Checks that login is rejected with 403 before the e-mail is verified,
    that login succeeds afterwards and returns both token kinds, that
    ``/auth/me`` reports the verified account, and that ``/auth/sessions``
    lists at least one session of type ``"access"``.
    """
    register_payload = {
        "email": "alice@example.com",
        "name": "Alice",
        "username": "alice",
        "password": "strongpass123",
    }
    credentials = {
        "email": register_payload["email"],
        "password": register_payload["password"],
    }

    register_response = await client.post("/api/v1/auth/register", json=register_payload)
    assert register_response.status_code == 201

    # An unverified account must not be able to log in.
    login_response_before_verify = await client.post("/api/v1/auth/login", json=credentials)
    assert login_response_before_verify.status_code == 403

    # Fetch the newest verification token. LIMIT 1 keeps scalar_one() from
    # raising MultipleResultsFound if older token rows are still present.
    token_row = await db_session.execute(
        select(EmailVerificationToken)
        .order_by(EmailVerificationToken.id.desc())
        .limit(1)
    )
    verify_token = token_row.scalar_one().token

    verify_response = await client.post("/api/v1/auth/verify-email", json={"token": verify_token})
    assert verify_response.status_code == 200

    login_response = await client.post("/api/v1/auth/login", json=credentials)
    assert login_response.status_code == 200
    token_data = login_response.json()
    assert "access_token" in token_data
    assert "refresh_token" in token_data

    auth_headers = {"Authorization": f"Bearer {token_data['access_token']}"}

    me_response = await client.get("/api/v1/auth/me", headers=auth_headers)
    assert me_response.status_code == 200
    me_data = me_response.json()
    assert me_data["email"] == "alice@example.com"
    assert me_data["email_verified"] is True

    sessions_response = await client.get("/api/v1/auth/sessions", headers=auth_headers)
    assert sessions_response.status_code == 200
    sessions = sessions_response.json()
    assert len(sessions) >= 1
    assert any(item.get("token_type") == "access" for item in sessions)
|
|
|
|
|
|
async def test_refresh_token_rotation(client, db_session):
    """Check that refreshing rotates the refresh token and retires the old one.

    After a successful refresh the response must carry a different refresh
    token, and presenting the original refresh token again must yield 401.
    """
    payload = {
        "email": "bob@example.com",
        "name": "Bob",
        "username": "bob",
        "password": "strongpass123",
    }

    # Setup: register and verify. Status codes are asserted so a broken setup
    # step fails here instead of as a confusing error further down.
    register_response = await client.post("/api/v1/auth/register", json=payload)
    assert register_response.status_code == 201

    # Newest token only; LIMIT 1 keeps scalar_one() from raising
    # MultipleResultsFound if older token rows are still present.
    token_row = await db_session.execute(
        select(EmailVerificationToken)
        .order_by(EmailVerificationToken.id.desc())
        .limit(1)
    )
    verify_token = token_row.scalar_one().token
    verify_response = await client.post("/api/v1/auth/verify-email", json={"token": verify_token})
    assert verify_response.status_code == 200

    login_response = await client.post(
        "/api/v1/auth/login",
        json={"email": payload["email"], "password": payload["password"]},
    )
    assert login_response.status_code == 200
    refresh_token = login_response.json()["refresh_token"]

    refresh_response = await client.post("/api/v1/auth/refresh", json={"refresh_token": refresh_token})
    assert refresh_response.status_code == 200
    rotated_refresh_token = refresh_response.json()["refresh_token"]
    assert rotated_refresh_token != refresh_token

    # The pre-rotation token must no longer be accepted.
    old_refresh_reuse = await client.post("/api/v1/auth/refresh", json={"refresh_token": refresh_token})
    assert old_refresh_reuse.status_code == 401
|
|
|
|
|
|
async def test_revoke_all_sessions_invalidates_access_and_refresh(client, db_session):
    """Check that DELETE /auth/sessions invalidates both issued tokens.

    After revoking all sessions (204), the access token must be rejected by
    ``/auth/me`` and the refresh token by ``/auth/refresh``, both with 401.
    """
    payload = {
        "email": "carol@example.com",
        "name": "Carol",
        "username": "carol",
        "password": "strongpass123",
    }

    # Setup: register and verify. Status codes are asserted so a broken setup
    # step fails here instead of as a confusing error further down.
    register_response = await client.post("/api/v1/auth/register", json=payload)
    assert register_response.status_code == 201

    # Newest token only; LIMIT 1 keeps scalar_one() from raising
    # MultipleResultsFound if older token rows are still present.
    token_row = await db_session.execute(
        select(EmailVerificationToken)
        .order_by(EmailVerificationToken.id.desc())
        .limit(1)
    )
    verify_token = token_row.scalar_one().token
    verify_response = await client.post("/api/v1/auth/verify-email", json={"token": verify_token})
    assert verify_response.status_code == 200

    login_response = await client.post(
        "/api/v1/auth/login",
        json={"email": payload["email"], "password": payload["password"]},
    )
    assert login_response.status_code == 200
    tokens = login_response.json()
    access_token = tokens["access_token"]
    refresh_token = tokens["refresh_token"]
    auth_headers = {"Authorization": f"Bearer {access_token}"}

    revoke_all_response = await client.delete("/api/v1/auth/sessions", headers=auth_headers)
    assert revoke_all_response.status_code == 204

    # The access token used for the revocation must itself stop working.
    me_response = await client.get("/api/v1/auth/me", headers=auth_headers)
    assert me_response.status_code == 401

    refresh_response = await client.post("/api/v1/auth/refresh", json={"refresh_token": refresh_token})
    assert refresh_response.status_code == 401
|
|
|
|
|
|
async def test_revoke_single_session_invalidates_only_target_refresh_token(client, db_session):
    """Check that revoking one refresh session leaves the access token usable.

    The first session of type ``"refresh"`` listed by ``/auth/sessions`` is
    revoked by its ``jti``. Afterwards the refresh token from login must be
    rejected with 401 while ``/auth/me`` still answers 200 for the access
    token.
    """
    payload = {
        "email": "frank@example.com",
        "name": "Frank",
        "username": "frank",
        "password": "strongpass123",
    }

    # Setup: register and verify. Status codes are asserted so a broken setup
    # step fails here instead of as a confusing error further down.
    register_response = await client.post("/api/v1/auth/register", json=payload)
    assert register_response.status_code == 201

    # Newest token only; LIMIT 1 keeps scalar_one() from raising
    # MultipleResultsFound if older token rows are still present.
    token_row = await db_session.execute(
        select(EmailVerificationToken)
        .order_by(EmailVerificationToken.id.desc())
        .limit(1)
    )
    verify_token = token_row.scalar_one().token
    verify_response = await client.post("/api/v1/auth/verify-email", json={"token": verify_token})
    assert verify_response.status_code == 200

    login_response = await client.post(
        "/api/v1/auth/login",
        json={"email": payload["email"], "password": payload["password"]},
    )
    assert login_response.status_code == 200
    tokens = login_response.json()
    access_token = tokens["access_token"]
    refresh_token = tokens["refresh_token"]
    auth_headers = {"Authorization": f"Bearer {access_token}"}

    sessions_response = await client.get("/api/v1/auth/sessions", headers=auth_headers)
    assert sessions_response.status_code == 200
    refresh_sessions = [
        item for item in sessions_response.json() if item.get("token_type") == "refresh"
    ]
    assert len(refresh_sessions) >= 1
    # NOTE(review): assumes the first listed refresh session is the one issued
    # by the login above; with a single login there should be only one.
    target_jti = refresh_sessions[0]["jti"]

    revoke_response = await client.delete(
        f"/api/v1/auth/sessions/{target_jti}",
        headers=auth_headers,
    )
    assert revoke_response.status_code == 204

    refresh_after_revoke = await client.post("/api/v1/auth/refresh", json={"refresh_token": refresh_token})
    assert refresh_after_revoke.status_code == 401

    # Only the refresh session was revoked; the access token must still work.
    me_after_revoke = await client.get("/api/v1/auth/me", headers=auth_headers)
    assert me_after_revoke.status_code == 200
|
|
|
|
|
|
async def test_twofa_setup_is_blocked_when_already_enabled(client, db_session):
    """Check that 2FA setup is refused once 2FA has been enabled.

    The first ``/auth/2fa/setup`` call returns a secret (200); after enabling
    2FA with a code derived from that secret, a second setup call must
    return 400.
    """
    payload = {
        "email": "dana@example.com",
        "name": "Dana",
        "username": "dana",
        "password": "strongpass123",
    }

    # Setup: register and verify. Status codes are asserted so a broken setup
    # step fails here instead of as a confusing error further down.
    register_response = await client.post("/api/v1/auth/register", json=payload)
    assert register_response.status_code == 201

    # Newest token only; LIMIT 1 keeps scalar_one() from raising
    # MultipleResultsFound if older token rows are still present.
    token_row = await db_session.execute(
        select(EmailVerificationToken)
        .order_by(EmailVerificationToken.id.desc())
        .limit(1)
    )
    verify_token = token_row.scalar_one().token
    verify_response = await client.post("/api/v1/auth/verify-email", json={"token": verify_token})
    assert verify_response.status_code == 200

    login_response = await client.post(
        "/api/v1/auth/login",
        json={"email": payload["email"], "password": payload["password"]},
    )
    assert login_response.status_code == 200
    access_token = login_response.json()["access_token"]
    headers = {"Authorization": f"Bearer {access_token}"}

    setup_response = await client.post("/api/v1/auth/2fa/setup", headers=headers)
    assert setup_response.status_code == 200
    secret = setup_response.json()["secret"]
    # Presumably yields the currently valid TOTP code for the secret
    # (helper lives in app.utils.totp) - confirm against its implementation.
    code = _totp_code(secret)

    enable_response = await client.post("/api/v1/auth/2fa/enable", headers=headers, json={"code": code})
    assert enable_response.status_code == 200

    # With 2FA already active, starting setup again must be rejected.
    setup_again_response = await client.post("/api/v1/auth/2fa/setup", headers=headers)
    assert setup_again_response.status_code == 400
|
|
|
|
|
|
async def test_twofa_recovery_codes_can_login_once(client, db_session):
    """Check that a 2FA recovery code is accepted for login exactly once.

    After enabling 2FA and regenerating recovery codes, logging in with the
    first code must succeed (200) and a second login with the same code
    must be rejected (401).
    """
    payload = {
        "email": "erin@example.com",
        "name": "Erin",
        "username": "erin",
        "password": "strongpass123",
    }
    credentials = {"email": payload["email"], "password": payload["password"]}

    # Setup: register and verify. Status codes are asserted so a broken setup
    # step fails here instead of as a confusing error further down.
    register_response = await client.post("/api/v1/auth/register", json=payload)
    assert register_response.status_code == 201

    # Newest token only; LIMIT 1 keeps scalar_one() from raising
    # MultipleResultsFound if older token rows are still present.
    token_row = await db_session.execute(
        select(EmailVerificationToken)
        .order_by(EmailVerificationToken.id.desc())
        .limit(1)
    )
    verify_token = token_row.scalar_one().token
    verify_response = await client.post("/api/v1/auth/verify-email", json={"token": verify_token})
    assert verify_response.status_code == 200

    login_response = await client.post("/api/v1/auth/login", json=credentials)
    assert login_response.status_code == 200
    access_token = login_response.json()["access_token"]
    headers = {"Authorization": f"Bearer {access_token}"}

    setup_response = await client.post("/api/v1/auth/2fa/setup", headers=headers)
    assert setup_response.status_code == 200
    secret = setup_response.json()["secret"]
    enable_response = await client.post(
        "/api/v1/auth/2fa/enable",
        headers=headers,
        json={"code": _totp_code(secret)},
    )
    assert enable_response.status_code == 200

    # NOTE(review): this sends a TOTP code again right after enabling; if the
    # server ever rejects a code replayed in the same time step, this needs
    # a fresh code.
    regen_response = await client.post(
        "/api/v1/auth/2fa/recovery-codes/regenerate",
        headers=headers,
        json={"code": _totp_code(secret)},
    )
    assert regen_response.status_code == 200
    codes = regen_response.json()["codes"]
    assert len(codes) >= 1
    first_code = codes[0]

    recovery_login_body = {**credentials, "recovery_code": first_code}

    login_with_recovery = await client.post("/api/v1/auth/login", json=recovery_login_body)
    assert login_with_recovery.status_code == 200

    # The same recovery code must not be accepted a second time.
    reuse_recovery_code = await client.post("/api/v1/auth/login", json=recovery_login_body)
    assert reuse_recovery_code.status_code == 401
|