feat(auth): add TOTP 2FA setup and login verification
Some checks failed
CI / test (push) Failing after 21s
Some checks failed
CI / test (push) Failing after 21s
- add user twofa fields and migration - add 2FA setup/enable/disable endpoints - enforce OTP on login when 2FA enabled - add web login OTP field and settings UI
This commit is contained in:
@@ -14,9 +14,13 @@ from app.auth.schemas import (
|
||||
ResetPasswordRequest,
|
||||
TokenResponse,
|
||||
SessionRead,
|
||||
TwoFactorCodeRequest,
|
||||
TwoFactorSetupRead,
|
||||
VerifyEmailRequest,
|
||||
)
|
||||
from app.auth.service import (
|
||||
disable_twofa,
|
||||
enable_twofa,
|
||||
get_current_user,
|
||||
get_email_sender,
|
||||
get_request_metadata,
|
||||
@@ -29,6 +33,7 @@ from app.auth.service import (
|
||||
request_password_reset,
|
||||
resend_verification_email,
|
||||
reset_password,
|
||||
setup_twofa,
|
||||
verify_email,
|
||||
)
|
||||
from app.database.session import get_db
|
||||
@@ -155,3 +160,32 @@ async def revoke_session(jti: str, current_user: User = Depends(get_current_user
|
||||
@router.delete("/sessions", status_code=status.HTTP_204_NO_CONTENT)
|
||||
async def revoke_all_sessions(current_user: User = Depends(get_current_user)) -> None:
|
||||
await revoke_all_user_sessions(user_id=current_user.id)
|
||||
|
||||
|
||||
@router.post("/2fa/setup", response_model=TwoFactorSetupRead)
|
||||
async def setup_2fa(
|
||||
db: AsyncSession = Depends(get_db),
|
||||
current_user: User = Depends(get_current_user),
|
||||
) -> TwoFactorSetupRead:
|
||||
secret, otpauth_url = await setup_twofa(db, current_user)
|
||||
return TwoFactorSetupRead(secret=secret, otpauth_url=otpauth_url)
|
||||
|
||||
|
||||
@router.post("/2fa/enable", response_model=MessageResponse)
|
||||
async def enable_2fa(
|
||||
payload: TwoFactorCodeRequest,
|
||||
db: AsyncSession = Depends(get_db),
|
||||
current_user: User = Depends(get_current_user),
|
||||
) -> MessageResponse:
|
||||
await enable_twofa(db, current_user, code=payload.code)
|
||||
return MessageResponse(message="2FA enabled")
|
||||
|
||||
|
||||
@router.post("/2fa/disable", response_model=MessageResponse)
|
||||
async def disable_2fa(
|
||||
payload: TwoFactorCodeRequest,
|
||||
db: AsyncSession = Depends(get_db),
|
||||
current_user: User = Depends(get_current_user),
|
||||
) -> MessageResponse:
|
||||
await disable_twofa(db, current_user, code=payload.code)
|
||||
return MessageResponse(message="2FA disabled")
|
||||
|
||||
@@ -13,6 +13,7 @@ class RegisterRequest(BaseModel):
|
||||
class LoginRequest(BaseModel):
|
||||
email: EmailStr
|
||||
password: str = Field(min_length=8, max_length=128)
|
||||
otp_code: str | None = Field(default=None, min_length=6, max_length=8)
|
||||
|
||||
|
||||
class RefreshTokenRequest(BaseModel):
|
||||
@@ -56,6 +57,7 @@ class AuthUserResponse(BaseModel):
|
||||
bio: str | None = None
|
||||
avatar_url: str | None = None
|
||||
email_verified: bool
|
||||
twofa_enabled: bool
|
||||
created_at: datetime
|
||||
updated_at: datetime
|
||||
|
||||
@@ -65,3 +67,12 @@ class SessionRead(BaseModel):
|
||||
created_at: datetime
|
||||
ip_address: str | None = None
|
||||
user_agent: str | None = None
|
||||
|
||||
|
||||
class TwoFactorSetupRead(BaseModel):
|
||||
secret: str
|
||||
otpauth_url: str
|
||||
|
||||
|
||||
class TwoFactorCodeRequest(BaseModel):
|
||||
code: str = Field(min_length=6, max_length=8)
|
||||
|
||||
@@ -30,6 +30,7 @@ from app.database.session import get_db
|
||||
from app.email.service import EmailDeliveryError, EmailService, get_email_service
|
||||
from app.users.models import User
|
||||
from app.users.repository import create_user, get_user_by_email, get_user_by_id, get_user_by_username
|
||||
from app.utils.totp import build_otpauth_uri, generate_totp_secret, verify_totp_code
|
||||
from app.utils.security import (
|
||||
create_access_token,
|
||||
create_refresh_token,
|
||||
@@ -132,6 +133,11 @@ async def login_user(
|
||||
|
||||
if not user.email_verified:
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Email not verified")
|
||||
if user.twofa_enabled:
|
||||
if not payload.otp_code:
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="2FA code required")
|
||||
if not user.twofa_secret or not verify_totp_code(user.twofa_secret, payload.otp_code):
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid 2FA code")
|
||||
|
||||
refresh_jti = str(uuid4())
|
||||
refresh_token = create_refresh_token(str(user.id), jti=refresh_jti)
|
||||
@@ -215,6 +221,43 @@ def get_request_metadata(request: Request) -> tuple[str | None, str | None]:
|
||||
return ip_address, user_agent
|
||||
|
||||
|
||||
async def setup_twofa(db: AsyncSession, user: User) -> tuple[str, str]:
|
||||
if user.twofa_enabled and user.twofa_secret:
|
||||
secret = user.twofa_secret
|
||||
else:
|
||||
secret = generate_totp_secret()
|
||||
user.twofa_secret = secret
|
||||
await db.commit()
|
||||
await db.refresh(user)
|
||||
otpauth_url = build_otpauth_uri(secret=secret, account_name=user.email, issuer=settings.app_name)
|
||||
return secret, otpauth_url
|
||||
|
||||
|
||||
async def enable_twofa(db: AsyncSession, user: User, *, code: str) -> None:
|
||||
if not user.twofa_secret:
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="2FA is not initialized")
|
||||
if not verify_totp_code(user.twofa_secret, code):
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid 2FA code")
|
||||
user.twofa_enabled = True
|
||||
await db.commit()
|
||||
await db.refresh(user)
|
||||
|
||||
|
||||
async def disable_twofa(db: AsyncSession, user: User, *, code: str) -> None:
|
||||
if not user.twofa_enabled or not user.twofa_secret:
|
||||
user.twofa_enabled = False
|
||||
user.twofa_secret = None
|
||||
await db.commit()
|
||||
await db.refresh(user)
|
||||
return
|
||||
if not verify_totp_code(user.twofa_secret, code):
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid 2FA code")
|
||||
user.twofa_enabled = False
|
||||
user.twofa_secret = None
|
||||
await db.commit()
|
||||
await db.refresh(user)
|
||||
|
||||
|
||||
async def request_password_reset(
|
||||
db: AsyncSession,
|
||||
payload: RequestPasswordResetRequest,
|
||||
|
||||
@@ -24,6 +24,8 @@ class User(Base):
|
||||
bio: Mapped[str | None] = mapped_column(String(500), nullable=True)
|
||||
email_verified: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False, index=True)
|
||||
allow_private_messages: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False, server_default="true")
|
||||
twofa_enabled: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False, server_default="false")
|
||||
twofa_secret: Mapped[str | None] = mapped_column(String(64), nullable=True)
|
||||
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), nullable=False)
|
||||
updated_at: Mapped[datetime] = mapped_column(
|
||||
DateTime(timezone=True),
|
||||
|
||||
@@ -21,6 +21,7 @@ class UserRead(UserBase):
|
||||
bio: str | None = None
|
||||
email_verified: bool
|
||||
allow_private_messages: bool
|
||||
twofa_enabled: bool = False
|
||||
created_at: datetime
|
||||
updated_at: datetime
|
||||
|
||||
|
||||
44
app/utils/totp.py
Normal file
44
app/utils/totp.py
Normal file
@@ -0,0 +1,44 @@
|
||||
import base64
|
||||
import hashlib
|
||||
import hmac
|
||||
import secrets
|
||||
import struct
|
||||
import time
|
||||
import urllib.parse
|
||||
|
||||
|
||||
def generate_totp_secret(length: int = 20) -> str:
|
||||
raw = secrets.token_bytes(length)
|
||||
return base64.b32encode(raw).decode("ascii").rstrip("=")
|
||||
|
||||
|
||||
def build_otpauth_uri(*, secret: str, account_name: str, issuer: str) -> str:
|
||||
label = urllib.parse.quote(f"{issuer}:{account_name}")
|
||||
issuer_q = urllib.parse.quote(issuer)
|
||||
secret_q = urllib.parse.quote(secret)
|
||||
return f"otpauth://totp/{label}?secret={secret_q}&issuer={issuer_q}&algorithm=SHA1&digits=6&period=30"
|
||||
|
||||
|
||||
def _totp_code(secret: str, for_time: int | None = None, step: int = 30, digits: int = 6) -> str:
|
||||
now = int(time.time()) if for_time is None else int(for_time)
|
||||
counter = now // step
|
||||
padded = secret + "=" * ((8 - len(secret) % 8) % 8)
|
||||
key = base64.b32decode(padded, casefold=True)
|
||||
msg = struct.pack(">Q", counter)
|
||||
digest = hmac.new(key, msg, hashlib.sha1).digest()
|
||||
offset = digest[-1] & 0x0F
|
||||
binary = struct.unpack(">I", digest[offset : offset + 4])[0] & 0x7FFFFFFF
|
||||
return str(binary % (10**digits)).zfill(digits)
|
||||
|
||||
|
||||
def verify_totp_code(secret: str, code: str, *, window: int = 1, step: int = 30, digits: int = 6) -> bool:
|
||||
prepared = "".join(ch for ch in code if ch.isdigit())
|
||||
if len(prepared) != digits:
|
||||
return False
|
||||
now = int(time.time())
|
||||
for delta in range(-window, window + 1):
|
||||
expected = _totp_code(secret, for_time=now + delta * step, step=step, digits=digits)
|
||||
if secrets.compare_digest(expected, prepared):
|
||||
return True
|
||||
return False
|
||||
|
||||
Reference in New Issue
Block a user