167 lines
6.4 KiB
Python
167 lines
6.4 KiB
Python
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.users import repository
|
|
from app.users.models import User
|
|
from app.users.schemas import UserRead, UserSearchRead
|
|
|
|
|
|
async def get_user_by_id(db: AsyncSession, user_id: int) -> User | None:
    """Fetch the user for ``user_id`` by delegating to the repository layer.

    Args:
        db: Async session passed through to the repository call.
        user_id: Identifier forwarded to ``repository.get_user_by_id``.

    Returns:
        The repository's result, annotated as a ``User`` or ``None``.
    """
    found_user = await repository.get_user_by_id(db, user_id)
    return found_user
|
|
|
|
|
|
async def get_user_by_email(db: AsyncSession, email: str) -> User | None:
    """Fetch the user for ``email`` by delegating to the repository layer.

    Args:
        db: Async session passed through to the repository call.
        email: Value forwarded unchanged to ``repository.get_user_by_email``.

    Returns:
        The repository's result, annotated as a ``User`` or ``None``.
    """
    found_user = await repository.get_user_by_email(db, email)
    return found_user
|
|
|
|
|
|
async def get_user_by_username(db: AsyncSession, username: str) -> User | None:
    """Fetch the user for ``username`` by delegating to the repository layer.

    Args:
        db: Async session passed through to the repository call.
        username: Value forwarded unchanged to
            ``repository.get_user_by_username``.

    Returns:
        The repository's result, annotated as a ``User`` or ``None``.
    """
    found_user = await repository.get_user_by_username(db, username)
    return found_user
|
|
|
|
|
|
async def search_users_by_username(
    db: AsyncSession,
    *,
    query: str,
    limit: int = 20,
    exclude_user_id: int | None = None,
) -> list[User]:
    """Search users by username with a bounded page size.

    Args:
        db: Async session passed through to the repository call.
        query: Search text forwarded unchanged to the repository.
        limit: Requested maximum number of results; clamped to the range
            1..50 before it reaches the repository.
        exclude_user_id: Forwarded unchanged to the repository.

    Returns:
        The repository's result, annotated as a list of ``User`` objects.
    """
    # Clamp in two steps: cap at 50 first, then enforce the floor of 1.
    capped = min(limit, 50)
    bounded_limit = max(1, capped)

    matches = await repository.search_users_by_username(
        db,
        query=query,
        limit=bounded_limit,
        exclude_user_id=exclude_user_id,
    )
    return matches
|
|
|
|
|
|
async def update_user_profile(
    db: AsyncSession,
    user: User,
    *,
    name: str | None = None,
    username: str | None = None,
    bio: str | None = None,
    avatar_url: str | None = None,
    allow_private_messages: bool | None = None,
    privacy_private_messages: str | None = None,
    privacy_last_seen: str | None = None,
    privacy_avatar: str | None = None,
    privacy_group_invites: str | None = None,
) -> User:
    """Apply the supplied profile and privacy changes to ``user`` and persist them.

    Every keyword argument left as ``None`` is skipped, so this function
    cannot be used to reset a field to ``None``.

    The two private-message settings are kept in step with each other:

    * only ``allow_private_messages`` given: the flag is stored and the
      policy becomes ``"everyone"`` (truthy flag) or ``"nobody"``;
    * ``privacy_private_messages`` given: the policy is stored and the flag
      becomes ``policy != "nobody"``, overriding any flag passed alongside.

    Args:
        db: Async session used for ``commit`` and ``refresh``.
        user: Object whose attributes are mutated in place.

    Returns:
        The same ``user`` object, after ``db.commit()`` and
        ``db.refresh(user)``.
    """
    # Plain fields: copied straight onto the model when provided.
    leading_fields = (
        ("name", name),
        ("username", username),
        ("bio", bio),
        ("avatar_url", avatar_url),
    )
    for attr_name, new_value in leading_fields:
        if new_value is not None:
            setattr(user, attr_name, new_value)

    policy_given = privacy_private_messages is not None

    if allow_private_messages is not None:
        user.allow_private_messages = allow_private_messages
        if not policy_given:
            # Only the boolean was sent: derive the policy string from it.
            if allow_private_messages:
                user.privacy_private_messages = "everyone"
            else:
                user.privacy_private_messages = "nobody"

    if policy_given:
        # An explicit policy wins and rewrites the boolean flag to match.
        user.privacy_private_messages = privacy_private_messages
        user.allow_private_messages = privacy_private_messages != "nobody"

    trailing_fields = (
        ("privacy_last_seen", privacy_last_seen),
        ("privacy_avatar", privacy_avatar),
        ("privacy_group_invites", privacy_group_invites),
    )
    for attr_name, new_value in trailing_fields:
        if new_value is not None:
            setattr(user, attr_name, new_value)

    await db.commit()
    await db.refresh(user)
    return user
|
|
|
|
|
|
async def block_user(db: AsyncSession, *, user_id: int, blocked_user_id: int) -> None:
    """Call ``repository.block_user`` for the given pair, then commit.

    Args:
        db: Async session used for the repository call and the commit.
        user_id: Forwarded to the repository as ``user_id``.
        blocked_user_id: Forwarded to the repository as ``blocked_user_id``.
    """
    await repository.block_user(
        db,
        user_id=user_id,
        blocked_user_id=blocked_user_id,
    )
    # The commit happens here, after the repository call has returned.
    await db.commit()
|
|
|
|
|
|
async def unblock_user(db: AsyncSession, *, user_id: int, blocked_user_id: int) -> None:
    """Call ``repository.unblock_user`` for the given pair, then commit.

    Args:
        db: Async session used for the repository call and the commit.
        user_id: Forwarded to the repository as ``user_id``.
        blocked_user_id: Forwarded to the repository as ``blocked_user_id``.
    """
    await repository.unblock_user(
        db,
        user_id=user_id,
        blocked_user_id=blocked_user_id,
    )
    # The commit happens here, after the repository call has returned.
    await db.commit()
|
|
|
|
|
|
async def has_block_relation_between_users(db: AsyncSession, *, user_a_id: int, user_b_id: int) -> bool:
    """Ask the repository whether a block relation exists between two users.

    Args:
        db: Async session passed through to the repository call.
        user_a_id: Forwarded to the repository as ``user_a_id``.
        user_b_id: Forwarded to the repository as ``user_b_id``.

    Returns:
        The repository's answer, annotated as ``bool``.
    """
    relation_exists = await repository.has_block_relation_between_users(
        db,
        user_a_id=user_a_id,
        user_b_id=user_b_id,
    )
    return relation_exists
|
|
|
|
|
|
async def list_blocked_users(db: AsyncSession, *, user_id: int) -> list[User]:
    """Return the result of ``repository.list_blocked_users`` for ``user_id``.

    Args:
        db: Async session passed through to the repository call.
        user_id: Forwarded to the repository as ``user_id``.

    Returns:
        The repository's result, annotated as a list of ``User`` objects.
    """
    blocked = await repository.list_blocked_users(db, user_id=user_id)
    return blocked
|
|
|
|
|
|
async def add_contact(db: AsyncSession, *, user_id: int, contact_user_id: int) -> None:
    """Call ``repository.add_contact`` for the given pair, then commit.

    Args:
        db: Async session used for the repository call and the commit.
        user_id: Forwarded to the repository as ``user_id``.
        contact_user_id: Forwarded to the repository as ``contact_user_id``.
    """
    await repository.add_contact(
        db,
        user_id=user_id,
        contact_user_id=contact_user_id,
    )
    # The commit happens here, after the repository call has returned.
    await db.commit()
|
|
|
|
|
|
async def remove_contact(db: AsyncSession, *, user_id: int, contact_user_id: int) -> None:
    """Call ``repository.remove_contact`` for the given pair, then commit.

    Args:
        db: Async session used for the repository call and the commit.
        user_id: Forwarded to the repository as ``user_id``.
        contact_user_id: Forwarded to the repository as ``contact_user_id``.
    """
    await repository.remove_contact(
        db,
        user_id=user_id,
        contact_user_id=contact_user_id,
    )
    # The commit happens here, after the repository call has returned.
    await db.commit()
|
|
|
|
|
|
async def list_contacts(db: AsyncSession, *, user_id: int) -> list[User]:
    """Return the result of ``repository.list_contacts`` for ``user_id``.

    Args:
        db: Async session passed through to the repository call.
        user_id: Forwarded to the repository as ``user_id``.

    Returns:
        The repository's result, annotated as a list of ``User`` objects.
    """
    contacts = await repository.list_contacts(db, user_id=user_id)
    return contacts
|
|
|
|
|
|
async def can_view_user_avatar(db: AsyncSession, *, target_user: User, viewer_user_id: int) -> bool:
    """Decide whether the viewer may see ``target_user``'s avatar.

    Rules, evaluated in order:

    1. A user viewing themselves is always allowed.
    2. ``privacy_avatar == "everyone"`` allows any viewer.
    3. ``privacy_avatar == "nobody"`` denies any other viewer.
    4. Any other value defers to ``repository.is_user_in_contacts`` with the
       target as owner and the viewer as candidate.

    Args:
        db: Async session, used only when the contacts lookup is needed.
        target_user: User whose avatar visibility is being checked.
        viewer_user_id: Id of the user requesting the avatar.
    """
    # Short-circuit keeps the policy attribute unread when the viewer is the owner.
    if target_user.id == viewer_user_id or target_user.privacy_avatar == "everyone":
        return True

    if target_user.privacy_avatar == "nobody":
        return False

    viewer_is_contact = await repository.is_user_in_contacts(
        db,
        owner_user_id=target_user.id,
        candidate_user_id=viewer_user_id,
    )
    return viewer_is_contact
|
|
|
|
|
|
async def can_view_user_last_seen(db: AsyncSession, *, target_user: User, viewer_user_id: int) -> bool:
    """Decide whether the viewer may see ``target_user``'s last-seen value.

    Rules, evaluated in order:

    1. A user viewing themselves is always allowed.
    2. ``privacy_last_seen == "everyone"`` allows any viewer.
    3. ``privacy_last_seen == "nobody"`` denies any other viewer.
    4. Any other value defers to ``repository.is_user_in_contacts`` with the
       target as owner and the viewer as candidate.

    Args:
        db: Async session, used only when the contacts lookup is needed.
        target_user: User whose last-seen visibility is being checked.
        viewer_user_id: Id of the user requesting the information.
    """
    # Short-circuit keeps the policy attribute unread when the viewer is the owner.
    if target_user.id == viewer_user_id or target_user.privacy_last_seen == "everyone":
        return True

    if target_user.privacy_last_seen == "nobody":
        return False

    viewer_is_contact = await repository.is_user_in_contacts(
        db,
        owner_user_id=target_user.id,
        candidate_user_id=viewer_user_id,
    )
    return viewer_is_contact
|
|
|
|
|
|
async def can_invite_user_to_groups(db: AsyncSession, *, target_user: User, actor_user_id: int) -> bool:
    """Decide whether the actor may invite ``target_user`` to groups.

    Rules, evaluated in order:

    1. A user can never invite themselves.
    2. ``privacy_group_invites == "everyone"`` allows any other actor.
    3. ``privacy_group_invites == "nobody"`` denies every actor.
    4. Any other value defers to ``repository.is_user_in_contacts`` with the
       target as owner and the actor as candidate.

    Args:
        db: Async session, used only when the contacts lookup is needed.
        target_user: User who would receive the invitation.
        actor_user_id: Id of the user attempting to send it.
    """
    if target_user.id == actor_user_id:
        return False

    policy = target_user.privacy_group_invites
    if policy == "everyone":
        return True
    if policy == "nobody":
        # Honour "nobody" explicitly, as the avatar, last-seen and
        # private-message checks do, instead of treating it as contacts-only.
        return False

    return await repository.is_user_in_contacts(
        db,
        owner_user_id=target_user.id,
        candidate_user_id=actor_user_id,
    )
|
|
|
|
|
|
async def can_user_receive_private_messages(db: AsyncSession, *, target_user: User, actor_user_id: int) -> bool:
    """Decide whether the actor may send private messages to ``target_user``.

    A user may always message themselves. Otherwise the effective policy is
    ``privacy_private_messages`` when that value is truthy. When it is empty
    or ``None``, the boolean ``allow_private_messages`` is mapped to
    ``"everyone"`` or ``"nobody"``. ``"everyone"`` allows, ``"nobody"``
    denies, and any other policy defers to ``repository.is_user_in_contacts``
    with the target as owner and the actor as candidate.

    Args:
        db: Async session, used only when the contacts lookup is needed.
        target_user: Intended recipient.
        actor_user_id: Id of the user who wants to send a message.
    """
    if target_user.id == actor_user_id:
        return True

    effective_policy = target_user.privacy_private_messages
    if not effective_policy:
        # No policy string stored: fall back to the boolean flag.
        if target_user.allow_private_messages:
            effective_policy = "everyone"
        else:
            effective_policy = "nobody"

    if effective_policy == "everyone":
        return True
    if effective_policy == "nobody":
        return False

    actor_is_contact = await repository.is_user_in_contacts(
        db,
        owner_user_id=target_user.id,
        candidate_user_id=actor_user_id,
    )
    return actor_is_contact
|
|
|
|
|
|
async def serialize_user_for_viewer(db: AsyncSession, *, target_user: User, viewer_user_id: int) -> UserRead:
    """Build the ``UserRead`` view of ``target_user`` as seen by a given viewer.

    Steps:

    1. Dump ``target_user`` through ``UserRead`` into a plain dict.
    2. Recompute ``allow_private_messages`` from the effective
       private-message policy.
    3. Blank ``avatar_url`` when ``can_view_user_avatar`` denies the viewer.
    4. When the viewer is not the target, overwrite the privacy settings and
       ``twofa_enabled`` with fixed placeholder values, so another user's
       real settings are not exposed.

    Args:
        db: Async session forwarded to ``can_view_user_avatar``.
        target_user: User being serialized.
        viewer_user_id: Id of the user who will receive the payload.

    Returns:
        A ``UserRead`` validated from the adjusted payload.
    """
    payload = UserRead.model_validate(target_user).model_dump()

    # Resolve the policy the same way can_user_receive_private_messages does:
    # an empty policy falls back to the boolean flag instead of counting as
    # "allowed", so the serialized flag matches what is actually enforced.
    effective_policy = target_user.privacy_private_messages or (
        "everyone" if target_user.allow_private_messages else "nobody"
    )
    payload["allow_private_messages"] = effective_policy != "nobody"

    avatar_visible = await can_view_user_avatar(
        db,
        target_user=target_user,
        viewer_user_id=viewer_user_id,
    )
    if not avatar_visible:
        payload["avatar_url"] = None

    if target_user.id != viewer_user_id:
        # Other viewers get neutral placeholders rather than the real settings.
        payload["allow_private_messages"] = True
        payload["privacy_private_messages"] = "everyone"
        payload["privacy_last_seen"] = "everyone"
        payload["privacy_avatar"] = "everyone"
        payload["privacy_group_invites"] = "everyone"
        payload["twofa_enabled"] = False

    return UserRead.model_validate(payload)
|
|
|
|
|
|
async def serialize_user_search_for_viewer(db: AsyncSession, *, target_user: User, viewer_user_id: int) -> UserSearchRead:
    """Build the ``UserSearchRead`` view of ``target_user`` for a given viewer.

    The user is dumped through ``UserSearchRead`` into a dict. ``avatar_url``
    is then set to ``None`` when ``can_view_user_avatar`` denies the viewer,
    and the dict is validated back into a ``UserSearchRead``.

    Args:
        db: Async session forwarded to ``can_view_user_avatar``.
        target_user: User being serialized.
        viewer_user_id: Id of the user who will receive the payload.
    """
    search_payload = UserSearchRead.model_validate(target_user).model_dump()

    avatar_visible = await can_view_user_avatar(
        db,
        target_user=target_user,
        viewer_user_id=viewer_user_id,
    )
    if not avatar_visible:
        search_payload["avatar_url"] = None

    return UserSearchRead.model_validate(search_payload)
|