privacy/security: add PM privacy levels and improve session visibility
All checks were successful
CI / test (push) Successful in 24s
All checks were successful
CI / test (push) Successful in 24s
This commit is contained in:
@@ -23,6 +23,7 @@ from app.auth.service import (
|
||||
disable_twofa,
|
||||
enable_twofa,
|
||||
get_current_user,
|
||||
get_access_session_info,
|
||||
get_email_sender,
|
||||
get_request_metadata,
|
||||
login_user,
|
||||
@@ -37,6 +38,7 @@ from app.auth.service import (
|
||||
reset_password,
|
||||
setup_twofa,
|
||||
verify_email,
|
||||
oauth2_scheme,
|
||||
)
|
||||
from app.database.session import get_db
|
||||
from app.email.service import EmailService
|
||||
@@ -147,7 +149,10 @@ async def me(current_user: User = Depends(get_current_user)) -> AuthUserResponse
|
||||
|
||||
|
||||
@router.get("/sessions", response_model=list[SessionRead])
|
||||
async def list_sessions(current_user: User = Depends(get_current_user)) -> list[SessionRead]:
|
||||
async def list_sessions(
|
||||
current_user: User = Depends(get_current_user),
|
||||
access_token: str = Depends(oauth2_scheme),
|
||||
) -> list[SessionRead]:
|
||||
sessions = await list_user_sessions(current_user.id)
|
||||
out: list[SessionRead] = []
|
||||
for item in sessions:
|
||||
@@ -157,8 +162,23 @@ async def list_sessions(current_user: User = Depends(get_current_user)) -> list[
|
||||
created_at=datetime.fromtimestamp(item.created_at, tz=timezone.utc),
|
||||
ip_address=item.ip_address,
|
||||
user_agent=item.user_agent,
|
||||
current=False,
|
||||
token_type="refresh",
|
||||
)
|
||||
)
|
||||
access_session = get_access_session_info(access_token)
|
||||
if access_session and all(item.jti != access_session[0] for item in out):
|
||||
out.insert(
|
||||
0,
|
||||
SessionRead(
|
||||
jti=access_session[0],
|
||||
created_at=access_session[1],
|
||||
ip_address=None,
|
||||
user_agent="Current access token",
|
||||
current=True,
|
||||
token_type="access",
|
||||
),
|
||||
)
|
||||
return out
|
||||
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
from datetime import datetime
|
||||
|
||||
from pydantic import BaseModel, ConfigDict, EmailStr, Field
|
||||
from app.users.schemas import GroupInvitePrivacyLevel, PrivacyLevel
|
||||
from app.users.schemas import GroupInvitePrivacyLevel, PrivacyLevel, PrivateMessagesPrivacyLevel
|
||||
|
||||
|
||||
class RegisterRequest(BaseModel):
|
||||
@@ -60,6 +60,7 @@ class AuthUserResponse(BaseModel):
|
||||
email_verified: bool
|
||||
twofa_enabled: bool
|
||||
allow_private_messages: bool = True
|
||||
privacy_private_messages: PrivateMessagesPrivacyLevel = "everyone"
|
||||
privacy_last_seen: PrivacyLevel = "everyone"
|
||||
privacy_avatar: PrivacyLevel = "everyone"
|
||||
privacy_group_invites: GroupInvitePrivacyLevel = "everyone"
|
||||
@@ -72,6 +73,8 @@ class SessionRead(BaseModel):
|
||||
created_at: datetime
|
||||
ip_address: str | None = None
|
||||
user_agent: str | None = None
|
||||
current: bool = False
|
||||
token_type: str = "refresh"
|
||||
|
||||
|
||||
class TwoFactorSetupRead(BaseModel):
|
||||
|
||||
@@ -244,6 +244,20 @@ def get_request_metadata(request: Request) -> tuple[str | None, str | None]:
|
||||
return ip_address, user_agent
|
||||
|
||||
|
||||
def get_access_session_info(token: str) -> tuple[str, datetime] | None:
|
||||
try:
|
||||
payload = decode_token(token)
|
||||
except ValueError:
|
||||
return None
|
||||
if payload.get("type") != "access":
|
||||
return None
|
||||
jti = payload.get("jti")
|
||||
if not isinstance(jti, str) or not jti:
|
||||
return None
|
||||
issued_at = _token_issued_at(payload) or datetime.now(timezone.utc)
|
||||
return jti, issued_at
|
||||
|
||||
|
||||
async def setup_twofa(db: AsyncSession, user: User) -> tuple[str, str]:
|
||||
if user.twofa_enabled and user.twofa_secret:
|
||||
secret = user.twofa_secret
|
||||
|
||||
Reference in New Issue
Block a user