feat(p0): complete account security privacy and sync hardening
Some checks failed
CI / test (push) Failing after 2m10s
Some checks failed
CI / test (push) Failing after 2m10s
This commit is contained in:
@@ -371,3 +371,100 @@ async def test_password_reset_flow_replaces_password_and_invalidates_old_passwor
|
||||
json={"email": payload["email"], "password": new_password},
|
||||
)
|
||||
assert new_login.status_code == 200
|
||||
|
||||
|
||||
async def test_check_email_status_reflects_verification_and_twofa_state(client, db_session):
|
||||
payload = {
|
||||
"email": "status_flow@example.com",
|
||||
"name": "Status Flow",
|
||||
"username": "status_flow",
|
||||
"password": "strongpass123",
|
||||
}
|
||||
register_response = await client.post("/api/v1/auth/register", json=payload)
|
||||
assert register_response.status_code == 201
|
||||
|
||||
status_before_verify = await client.get("/api/v1/auth/check-email", params={"email": payload["email"]})
|
||||
assert status_before_verify.status_code == 200
|
||||
body_before = status_before_verify.json()
|
||||
assert body_before["registered"] is True
|
||||
assert body_before["email_verified"] is False
|
||||
assert body_before["twofa_enabled"] is False
|
||||
|
||||
token_row = await db_session.execute(select(EmailVerificationToken).order_by(EmailVerificationToken.id.desc()))
|
||||
verify_token = token_row.scalar_one().token
|
||||
verify_response = await client.post("/api/v1/auth/verify-email", json={"token": verify_token})
|
||||
assert verify_response.status_code == 200
|
||||
|
||||
login_response = await client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"email": payload["email"], "password": payload["password"]},
|
||||
)
|
||||
assert login_response.status_code == 200
|
||||
access_token = login_response.json()["access_token"]
|
||||
headers = {"Authorization": f"Bearer {access_token}"}
|
||||
|
||||
setup_response = await client.post("/api/v1/auth/2fa/setup", headers=headers)
|
||||
assert setup_response.status_code == 200
|
||||
secret = setup_response.json()["secret"]
|
||||
enable_response = await client.post("/api/v1/auth/2fa/enable", headers=headers, json={"code": _totp_code(secret)})
|
||||
assert enable_response.status_code == 200
|
||||
|
||||
status_after_enable = await client.get("/api/v1/auth/check-email", params={"email": payload["email"]})
|
||||
assert status_after_enable.status_code == 200
|
||||
body_after = status_after_enable.json()
|
||||
assert body_after["registered"] is True
|
||||
assert body_after["email_verified"] is True
|
||||
assert body_after["twofa_enabled"] is True
|
||||
|
||||
|
||||
async def test_disable_twofa_clears_recovery_codes_and_allows_password_login_without_otp(client, db_session):
|
||||
payload = {
|
||||
"email": "disable_twofa@example.com",
|
||||
"name": "Disable Twofa",
|
||||
"username": "disable_twofa",
|
||||
"password": "strongpass123",
|
||||
}
|
||||
await client.post("/api/v1/auth/register", json=payload)
|
||||
token_row = await db_session.execute(select(EmailVerificationToken).order_by(EmailVerificationToken.id.desc()))
|
||||
verify_token = token_row.scalar_one().token
|
||||
await client.post("/api/v1/auth/verify-email", json={"token": verify_token})
|
||||
|
||||
login_response = await client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"email": payload["email"], "password": payload["password"]},
|
||||
)
|
||||
assert login_response.status_code == 200
|
||||
access_token = login_response.json()["access_token"]
|
||||
headers = {"Authorization": f"Bearer {access_token}"}
|
||||
|
||||
setup_response = await client.post("/api/v1/auth/2fa/setup", headers=headers)
|
||||
assert setup_response.status_code == 200
|
||||
secret = setup_response.json()["secret"]
|
||||
|
||||
enable_response = await client.post("/api/v1/auth/2fa/enable", headers=headers, json={"code": _totp_code(secret)})
|
||||
assert enable_response.status_code == 200
|
||||
|
||||
regen_response = await client.post(
|
||||
"/api/v1/auth/2fa/recovery-codes/regenerate",
|
||||
headers=headers,
|
||||
json={"code": _totp_code(secret)},
|
||||
)
|
||||
assert regen_response.status_code == 200
|
||||
assert len(regen_response.json()["codes"]) >= 1
|
||||
|
||||
disable_response = await client.post(
|
||||
"/api/v1/auth/2fa/disable",
|
||||
headers=headers,
|
||||
json={"code": _totp_code(secret)},
|
||||
)
|
||||
assert disable_response.status_code == 200
|
||||
|
||||
status_after_disable = await client.get("/api/v1/auth/2fa/recovery-codes/status", headers=headers)
|
||||
assert status_after_disable.status_code == 200
|
||||
assert status_after_disable.json()["remaining_codes"] == 0
|
||||
|
||||
plain_login_after_disable = await client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"email": payload["email"], "password": payload["password"]},
|
||||
)
|
||||
assert plain_login_after_disable.status_code == 200
|
||||
|
||||
Reference in New Issue
Block a user