Files
Messenger/app/users/service.py
benya 76cc5e0f12
All checks were successful
CI / test (push) Successful in 24s
privacy/security: add PM privacy levels and improve session visibility
2026-03-08 14:26:19 +03:00

167 lines
6.4 KiB
Python

from sqlalchemy.ext.asyncio import AsyncSession
from app.users import repository
from app.users.models import User
from app.users.schemas import UserRead, UserSearchRead
async def get_user_by_id(db: AsyncSession, user_id: int) -> User | None:
    """Fetch a user by id through the repository layer.

    Args:
        db: Async database session handed to the repository.
        user_id: Identifier forwarded to ``repository.get_user_by_id``.

    Returns:
        The repository's result, annotated as a ``User`` or ``None``.
    """
    found = await repository.get_user_by_id(db, user_id)
    return found
async def get_user_by_email(db: AsyncSession, email: str) -> User | None:
    """Fetch a user by email through the repository layer.

    Args:
        db: Async database session handed to the repository.
        email: Value forwarded to ``repository.get_user_by_email``.

    Returns:
        The repository's result, annotated as a ``User`` or ``None``.
    """
    found = await repository.get_user_by_email(db, email)
    return found
async def get_user_by_username(db: AsyncSession, username: str) -> User | None:
    """Fetch a user by username through the repository layer.

    Args:
        db: Async database session handed to the repository.
        username: Value forwarded to ``repository.get_user_by_username``.

    Returns:
        The repository's result, annotated as a ``User`` or ``None``.
    """
    found = await repository.get_user_by_username(db, username)
    return found
async def search_users_by_username(
    db: AsyncSession,
    *,
    query: str,
    limit: int = 20,
    exclude_user_id: int | None = None,
) -> list[User]:
    """Search users by username with a bounded page size.

    The requested ``limit`` is clamped into the inclusive range 1..50 before
    it is forwarded, so values below 1 become 1 and values above 50 become 50.

    Args:
        db: Async database session handed to the repository.
        query: Search text forwarded unchanged to the repository.
        limit: Requested maximum number of results (clamped to 1..50).
        exclude_user_id: Optional id forwarded unchanged to the repository.

    Returns:
        The list returned by ``repository.search_users_by_username``.
    """
    if limit < 1:
        bounded_limit = 1
    elif limit > 50:
        bounded_limit = 50
    else:
        bounded_limit = limit

    matches = await repository.search_users_by_username(
        db,
        query=query,
        limit=bounded_limit,
        exclude_user_id=exclude_user_id,
    )
    return matches
async def update_user_profile(
    db: AsyncSession,
    user: User,
    *,
    name: str | None = None,
    username: str | None = None,
    bio: str | None = None,
    avatar_url: str | None = None,
    allow_private_messages: bool | None = None,
    privacy_private_messages: str | None = None,
    privacy_last_seen: str | None = None,
    privacy_avatar: str | None = None,
    privacy_group_invites: str | None = None,
) -> User:
    """Apply a partial profile update to ``user`` and persist it.

    Every keyword argument left as ``None`` is treated as "not supplied" and
    the corresponding attribute is left untouched.

    The boolean ``allow_private_messages`` and the string
    ``privacy_private_messages`` are kept in sync:

    * If only the boolean is supplied, the policy becomes ``"everyone"``
      (truthy) or ``"nobody"`` (falsy).
    * If the policy string is supplied, it wins, and the boolean is set to
      whether the policy differs from ``"nobody"``.

    Args:
        db: Async database session; committed and used to refresh ``user``.
        user: Model instance mutated in place.
        name, username, bio, avatar_url: Plain profile fields.
        allow_private_messages: Boolean private-message switch.
        privacy_private_messages: Private-message policy string.
        privacy_last_seen, privacy_avatar, privacy_group_invites: Other
            privacy settings, stored as given.

    Returns:
        The same ``user`` object after ``db.commit()`` and ``db.refresh()``.
    """
    # Plain profile fields: copy over whichever values were supplied.
    for attr_name, new_value in (
        ("name", name),
        ("username", username),
        ("bio", bio),
        ("avatar_url", avatar_url),
    ):
        if new_value is not None:
            setattr(user, attr_name, new_value)

    flag_supplied = allow_private_messages is not None
    policy_supplied = privacy_private_messages is not None

    if flag_supplied:
        user.allow_private_messages = allow_private_messages
        # Without an explicit policy, derive one from the boolean switch.
        if not policy_supplied:
            if allow_private_messages:
                user.privacy_private_messages = "everyone"
            else:
                user.privacy_private_messages = "nobody"

    if policy_supplied:
        # An explicit policy takes precedence over the boolean switch.
        user.privacy_private_messages = privacy_private_messages
        user.allow_private_messages = privacy_private_messages != "nobody"

    # Remaining privacy settings are stored verbatim when supplied.
    for attr_name, new_value in (
        ("privacy_last_seen", privacy_last_seen),
        ("privacy_avatar", privacy_avatar),
        ("privacy_group_invites", privacy_group_invites),
    ):
        if new_value is not None:
            setattr(user, attr_name, new_value)

    await db.commit()
    await db.refresh(user)
    return user
async def block_user(db: AsyncSession, *, user_id: int, blocked_user_id: int) -> None:
    """Delegate to ``repository.block_user`` and commit the session.

    Args:
        db: Async database session; committed after the repository call.
        user_id: Forwarded to the repository as ``user_id``.
        blocked_user_id: Forwarded to the repository as ``blocked_user_id``.
    """
    await repository.block_user(
        db,
        user_id=user_id,
        blocked_user_id=blocked_user_id,
    )
    await db.commit()
async def unblock_user(db: AsyncSession, *, user_id: int, blocked_user_id: int) -> None:
    """Delegate to ``repository.unblock_user`` and commit the session.

    Args:
        db: Async database session; committed after the repository call.
        user_id: Forwarded to the repository as ``user_id``.
        blocked_user_id: Forwarded to the repository as ``blocked_user_id``.
    """
    await repository.unblock_user(
        db,
        user_id=user_id,
        blocked_user_id=blocked_user_id,
    )
    await db.commit()
async def has_block_relation_between_users(db: AsyncSession, *, user_a_id: int, user_b_id: int) -> bool:
    """Ask the repository whether a block relation exists between two users.

    Args:
        db: Async database session handed to the repository.
        user_a_id: First user id, forwarded unchanged.
        user_b_id: Second user id, forwarded unchanged.

    Returns:
        The repository's answer. NOTE(review): whether the check covers both
        directions is decided by the repository and is not visible here.
    """
    related = await repository.has_block_relation_between_users(
        db,
        user_a_id=user_a_id,
        user_b_id=user_b_id,
    )
    return related
async def list_blocked_users(db: AsyncSession, *, user_id: int) -> list[User]:
    """Return the list produced by ``repository.list_blocked_users``.

    Args:
        db: Async database session handed to the repository.
        user_id: Forwarded to the repository as ``user_id``.
    """
    blocked = await repository.list_blocked_users(db, user_id=user_id)
    return blocked
async def add_contact(db: AsyncSession, *, user_id: int, contact_user_id: int) -> None:
    """Delegate to ``repository.add_contact`` and commit the session.

    Args:
        db: Async database session; committed after the repository call.
        user_id: Forwarded to the repository as ``user_id``.
        contact_user_id: Forwarded to the repository as ``contact_user_id``.
    """
    await repository.add_contact(
        db,
        user_id=user_id,
        contact_user_id=contact_user_id,
    )
    await db.commit()
async def remove_contact(db: AsyncSession, *, user_id: int, contact_user_id: int) -> None:
    """Delegate to ``repository.remove_contact`` and commit the session.

    Args:
        db: Async database session; committed after the repository call.
        user_id: Forwarded to the repository as ``user_id``.
        contact_user_id: Forwarded to the repository as ``contact_user_id``.
    """
    await repository.remove_contact(
        db,
        user_id=user_id,
        contact_user_id=contact_user_id,
    )
    await db.commit()
async def list_contacts(db: AsyncSession, *, user_id: int) -> list[User]:
    """Return the list produced by ``repository.list_contacts``.

    Args:
        db: Async database session handed to the repository.
        user_id: Forwarded to the repository as ``user_id``.
    """
    contacts = await repository.list_contacts(db, user_id=user_id)
    return contacts
async def can_view_user_avatar(db: AsyncSession, *, target_user: User, viewer_user_id: int) -> bool:
    """Decide whether ``viewer_user_id`` may see ``target_user``'s avatar.

    Rules, in order:

    1. A user can always see their own avatar.
    2. ``privacy_avatar == "everyone"`` allows any viewer.
    3. ``privacy_avatar == "nobody"`` denies any other viewer.
    4. Any other setting defers to
       ``repository.is_user_in_contacts(owner=target, candidate=viewer)``.

    Args:
        db: Async database session, used only for the contacts lookup.
        target_user: User whose avatar is being requested.
        viewer_user_id: Id of the user requesting the avatar.

    Returns:
        ``True`` when the avatar may be shown to the viewer.
    """
    if viewer_user_id == target_user.id:
        return True

    avatar_policy = target_user.privacy_avatar
    if avatar_policy in ("everyone", "nobody"):
        # The two fixed policies need no database access.
        return avatar_policy == "everyone"

    return await repository.is_user_in_contacts(
        db,
        owner_user_id=target_user.id,
        candidate_user_id=viewer_user_id,
    )
async def can_view_user_last_seen(db: AsyncSession, *, target_user: User, viewer_user_id: int) -> bool:
if target_user.id == viewer_user_id:
return True
if target_user.privacy_last_seen == "everyone":
return True
if target_user.privacy_last_seen == "nobody":
return False
return await repository.is_user_in_contacts(db, owner_user_id=target_user.id, candidate_user_id=viewer_user_id)
async def can_invite_user_to_groups(db: AsyncSession, *, target_user: User, actor_user_id: int) -> bool:
    """Decide whether ``actor_user_id`` may invite ``target_user`` to groups.

    Rules, in order:

    1. A user cannot invite themselves.
    2. ``privacy_group_invites == "everyone"`` allows any other actor.
    3. ``privacy_group_invites == "nobody"`` denies every actor. This matches
       how the avatar, last-seen and private-message checks in this module
       treat ``"nobody"``; without it the setting would fall through to the
       contacts lookup and contacts could still invite.
    4. Any other setting defers to
       ``repository.is_user_in_contacts(owner=target, candidate=actor)``.

    Args:
        db: Async database session, used only for the contacts lookup.
        target_user: User who would receive the invite.
        actor_user_id: Id of the user sending the invite.

    Returns:
        ``True`` when the actor may invite the target user.
    """
    if target_user.id == actor_user_id:
        return False

    invite_policy = target_user.privacy_group_invites
    if invite_policy == "everyone":
        return True
    if invite_policy == "nobody":
        return False

    return await repository.is_user_in_contacts(
        db,
        owner_user_id=target_user.id,
        candidate_user_id=actor_user_id,
    )
async def can_user_receive_private_messages(db: AsyncSession, *, target_user: User, actor_user_id: int) -> bool:
    """Decide whether ``actor_user_id`` may send ``target_user`` a private message.

    A user may always message themselves. Otherwise the effective policy is
    ``target_user.privacy_private_messages``; when that is empty or unset it
    falls back to ``"everyone"`` or ``"nobody"`` according to the boolean
    ``allow_private_messages``. ``"everyone"`` allows, ``"nobody"`` denies,
    and any other policy defers to
    ``repository.is_user_in_contacts(owner=target, candidate=actor)``.

    Args:
        db: Async database session, used only for the contacts lookup.
        target_user: Intended recipient.
        actor_user_id: Id of the would-be sender.

    Returns:
        ``True`` when the actor may message the target user.
    """
    if actor_user_id == target_user.id:
        return True

    effective_policy = target_user.privacy_private_messages
    if not effective_policy:
        # No stored policy: fall back to the boolean switch.
        if target_user.allow_private_messages:
            effective_policy = "everyone"
        else:
            effective_policy = "nobody"

    if effective_policy == "everyone":
        return True
    if effective_policy == "nobody":
        return False

    in_contacts = await repository.is_user_in_contacts(
        db,
        owner_user_id=target_user.id,
        candidate_user_id=actor_user_id,
    )
    return in_contacts
async def serialize_user_for_viewer(db: AsyncSession, *, target_user: User, viewer_user_id: int) -> UserRead:
    """Build the ``UserRead`` view of ``target_user`` for a specific viewer.

    Steps:

    1. Dump ``target_user`` through ``UserRead`` into a plain dict.
    2. Set ``allow_private_messages`` from the effective private-message
       policy: the stored ``privacy_private_messages``, or, when that is
       empty or unset, ``"everyone"``/``"nobody"`` taken from the boolean
       ``allow_private_messages``. This is the same fallback that
       ``can_user_receive_private_messages`` applies, so the flag shown to
       the owner agrees with what is actually enforced.
    3. Blank ``avatar_url`` when ``can_view_user_avatar`` denies the viewer.
    4. For any viewer other than the user themselves, overwrite the privacy
       settings and ``twofa_enabled`` with fixed neutral values, so the real
       settings are not disclosed.

    Args:
        db: Async database session, passed to the avatar visibility check.
        target_user: User being serialized.
        viewer_user_id: Id of the user who will receive the payload.

    Returns:
        A ``UserRead`` validated from the adjusted payload.
    """
    payload = UserRead.model_validate(target_user).model_dump()

    # Use the effective policy; a raw `!= "nobody"` test would report an
    # unset policy as "allowed" even when the legacy flag is False.
    effective_pm_policy = target_user.privacy_private_messages or (
        "everyone" if target_user.allow_private_messages else "nobody"
    )
    payload["allow_private_messages"] = effective_pm_policy != "nobody"

    avatar_visible = await can_view_user_avatar(
        db,
        target_user=target_user,
        viewer_user_id=viewer_user_id,
    )
    if not avatar_visible:
        payload["avatar_url"] = None

    if target_user.id != viewer_user_id:
        # Other viewers get fixed values in place of the real settings.
        payload["allow_private_messages"] = True
        payload["privacy_private_messages"] = "everyone"
        payload["privacy_last_seen"] = "everyone"
        payload["privacy_avatar"] = "everyone"
        payload["privacy_group_invites"] = "everyone"
        payload["twofa_enabled"] = False

    return UserRead.model_validate(payload)
async def serialize_user_search_for_viewer(db: AsyncSession, *, target_user: User, viewer_user_id: int) -> UserSearchRead:
    """Build the ``UserSearchRead`` view of ``target_user`` for a viewer.

    The user is dumped through ``UserSearchRead`` into a dict, ``avatar_url``
    is blanked when ``can_view_user_avatar`` denies the viewer, and the
    result is validated back into ``UserSearchRead``.

    Args:
        db: Async database session, passed to the avatar visibility check.
        target_user: User being serialized.
        viewer_user_id: Id of the user who will receive the payload.

    Returns:
        A ``UserSearchRead`` validated from the adjusted payload.
    """
    search_payload = UserSearchRead.model_validate(target_user).model_dump()

    avatar_visible = await can_view_user_avatar(
        db,
        target_user=target_user,
        viewer_user_id=viewer_user_id,
    )
    if not avatar_visible:
        search_payload["avatar_url"] = None

    return UserSearchRead.model_validate(search_payload)