202 lines
6.7 KiB
Python
202 lines
6.7 KiB
Python
from datetime import datetime, timezone
|
|
|
|
from fastapi import APIRouter, Depends, Request, status
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.auth.schemas import (
|
|
AuthUserResponse,
|
|
EmailStatusResponse,
|
|
LoginRequest,
|
|
MessageResponse,
|
|
RefreshTokenRequest,
|
|
RegisterRequest,
|
|
RequestPasswordResetRequest,
|
|
ResendVerificationRequest,
|
|
ResetPasswordRequest,
|
|
TokenResponse,
|
|
SessionRead,
|
|
TwoFactorCodeRequest,
|
|
TwoFactorSetupRead,
|
|
VerifyEmailRequest,
|
|
)
|
|
from app.auth.service import (
|
|
disable_twofa,
|
|
enable_twofa,
|
|
get_current_user,
|
|
get_email_sender,
|
|
get_request_metadata,
|
|
login_user,
|
|
list_user_sessions,
|
|
revoke_all_user_sessions,
|
|
revoke_user_session,
|
|
refresh_tokens,
|
|
register_user,
|
|
get_email_status,
|
|
request_password_reset,
|
|
resend_verification_email,
|
|
reset_password,
|
|
setup_twofa,
|
|
verify_email,
|
|
)
|
|
from app.database.session import get_db
|
|
from app.email.service import EmailService
|
|
from app.config.settings import settings
|
|
from app.utils.rate_limit import enforce_ip_rate_limit
|
|
from app.users.models import User
|
|
|
|
router = APIRouter(prefix="/auth", tags=["auth"])
|
|
|
|
|
|
@router.get("/check-email", response_model=EmailStatusResponse)
|
|
async def check_email_status(
|
|
email: str,
|
|
db: AsyncSession = Depends(get_db),
|
|
) -> EmailStatusResponse:
|
|
return await get_email_status(db, email=email)
|
|
|
|
|
|
@router.post("/register", response_model=MessageResponse, status_code=status.HTTP_201_CREATED)
|
|
async def register(
|
|
payload: RegisterRequest,
|
|
request: Request,
|
|
db: AsyncSession = Depends(get_db),
|
|
email_service: EmailService = Depends(get_email_sender),
|
|
) -> MessageResponse:
|
|
await enforce_ip_rate_limit(
|
|
request,
|
|
scope="auth_register",
|
|
limit=settings.register_rate_limit_per_minute,
|
|
)
|
|
await register_user(db, payload, email_service)
|
|
return MessageResponse(message="Registration successful. Verification email sent.")
|
|
|
|
|
|
@router.post("/login", response_model=TokenResponse)
|
|
async def login(payload: LoginRequest, request: Request, db: AsyncSession = Depends(get_db)) -> TokenResponse:
|
|
await enforce_ip_rate_limit(
|
|
request,
|
|
scope="auth_login",
|
|
limit=settings.login_rate_limit_per_minute,
|
|
)
|
|
ip_address, user_agent = get_request_metadata(request)
|
|
return await login_user(db, payload, ip_address=ip_address, user_agent=user_agent)
|
|
|
|
|
|
@router.post("/refresh", response_model=TokenResponse)
|
|
async def refresh(
|
|
payload: RefreshTokenRequest,
|
|
request: Request,
|
|
db: AsyncSession = Depends(get_db),
|
|
) -> TokenResponse:
|
|
await enforce_ip_rate_limit(
|
|
request,
|
|
scope="auth_refresh",
|
|
limit=settings.refresh_rate_limit_per_minute,
|
|
)
|
|
ip_address, user_agent = get_request_metadata(request)
|
|
return await refresh_tokens(db, payload, ip_address=ip_address, user_agent=user_agent)
|
|
|
|
|
|
@router.post("/verify-email", response_model=MessageResponse)
|
|
async def verify_email_endpoint(payload: VerifyEmailRequest, db: AsyncSession = Depends(get_db)) -> MessageResponse:
|
|
await verify_email(db, payload)
|
|
return MessageResponse(message="Email verified successfully.")
|
|
|
|
|
|
@router.post("/resend-verification", response_model=MessageResponse)
|
|
async def resend_verification(
|
|
payload: ResendVerificationRequest,
|
|
request: Request,
|
|
db: AsyncSession = Depends(get_db),
|
|
email_service: EmailService = Depends(get_email_sender),
|
|
) -> MessageResponse:
|
|
await enforce_ip_rate_limit(
|
|
request,
|
|
scope="auth_resend_verification",
|
|
limit=settings.reset_rate_limit_per_minute,
|
|
)
|
|
await resend_verification_email(db, payload, email_service)
|
|
return MessageResponse(message="If the account exists, a verification email was sent.")
|
|
|
|
|
|
@router.post("/request-password-reset", response_model=MessageResponse)
|
|
async def request_password_reset_endpoint(
|
|
payload: RequestPasswordResetRequest,
|
|
request: Request,
|
|
db: AsyncSession = Depends(get_db),
|
|
email_service: EmailService = Depends(get_email_sender),
|
|
) -> MessageResponse:
|
|
await enforce_ip_rate_limit(
|
|
request,
|
|
scope="auth_request_reset",
|
|
limit=settings.reset_rate_limit_per_minute,
|
|
)
|
|
await request_password_reset(db, payload, email_service)
|
|
return MessageResponse(message="If the account exists, a reset email was sent.")
|
|
|
|
|
|
@router.post("/reset-password", response_model=MessageResponse)
|
|
async def reset_password_endpoint(payload: ResetPasswordRequest, db: AsyncSession = Depends(get_db)) -> MessageResponse:
|
|
await reset_password(db, payload)
|
|
return MessageResponse(message="Password reset successfully.")
|
|
|
|
|
|
@router.get("/me", response_model=AuthUserResponse)
|
|
async def me(current_user: User = Depends(get_current_user)) -> AuthUserResponse:
|
|
return current_user
|
|
|
|
|
|
@router.get("/sessions", response_model=list[SessionRead])
|
|
async def list_sessions(current_user: User = Depends(get_current_user)) -> list[SessionRead]:
|
|
sessions = await list_user_sessions(current_user.id)
|
|
out: list[SessionRead] = []
|
|
for item in sessions:
|
|
out.append(
|
|
SessionRead(
|
|
jti=item.jti,
|
|
created_at=datetime.fromtimestamp(item.created_at, tz=timezone.utc),
|
|
ip_address=item.ip_address,
|
|
user_agent=item.user_agent,
|
|
)
|
|
)
|
|
return out
|
|
|
|
|
|
@router.delete("/sessions/{jti}", status_code=status.HTTP_204_NO_CONTENT)
|
|
async def revoke_session(jti: str, current_user: User = Depends(get_current_user)) -> None:
|
|
await revoke_user_session(user_id=current_user.id, jti=jti)
|
|
|
|
|
|
@router.delete("/sessions", status_code=status.HTTP_204_NO_CONTENT)
|
|
async def revoke_all_sessions(current_user: User = Depends(get_current_user)) -> None:
|
|
await revoke_all_user_sessions(user_id=current_user.id)
|
|
|
|
|
|
@router.post("/2fa/setup", response_model=TwoFactorSetupRead)
|
|
async def setup_2fa(
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
) -> TwoFactorSetupRead:
|
|
secret, otpauth_url = await setup_twofa(db, current_user)
|
|
return TwoFactorSetupRead(secret=secret, otpauth_url=otpauth_url)
|
|
|
|
|
|
@router.post("/2fa/enable", response_model=MessageResponse)
|
|
async def enable_2fa(
|
|
payload: TwoFactorCodeRequest,
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
) -> MessageResponse:
|
|
await enable_twofa(db, current_user, code=payload.code)
|
|
return MessageResponse(message="2FA enabled")
|
|
|
|
|
|
@router.post("/2fa/disable", response_model=MessageResponse)
|
|
async def disable_2fa(
|
|
payload: TwoFactorCodeRequest,
|
|
db: AsyncSession = Depends(get_db),
|
|
current_user: User = Depends(get_current_user),
|
|
) -> MessageResponse:
|
|
await disable_twofa(db, current_user, code=payload.code)
|
|
return MessageResponse(message="2FA disabled")
|