"""Service-layer operations on chat messages.

Covers creating, listing, searching, editing, deleting, receipt tracking and
forwarding. Each public coroutine checks chat membership through
``ensure_chat_membership`` before touching message data.
"""

import logging

from fastapi import HTTPException, status
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession

from app.chats import repository as chats_repository
from app.chats.models import ChatMemberRole, ChatType
from app.chats.service import ensure_chat_membership
from app.messages import repository
from app.messages.models import Message
from app.messages.spam_guard import enforce_message_spam_policy
from app.messages.schemas import (
    MessageCreateRequest,
    MessageForwardRequest,
    MessageStatusUpdateRequest,
    MessageUpdateRequest,
)
from app.notifications.service import dispatch_message_notifications

logger = logging.getLogger(__name__)

# Upper bound applied to caller-supplied page sizes.
_MAX_PAGE_SIZE = 100


def _clamp_limit(limit: int) -> int:
    """Clamp *limit* into the inclusive range ``1.._MAX_PAGE_SIZE``."""
    return max(1, min(limit, _MAX_PAGE_SIZE))


async def _load_message_for_member(db: AsyncSession, *, message_id: int, user_id: int):
    """Load a message together with its chat and the caller's membership row.

    Returns:
        A ``(message, chat, membership)`` tuple.

    Raises:
        HTTPException: 404 when the message or its chat is missing, 403 when
            the user has no membership row in the chat. ``ensure_chat_membership``
            may raise as well (its behaviour is defined outside this module).
    """
    message = await repository.get_message_by_id(db, message_id)
    if not message:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Message not found")

    await ensure_chat_membership(db, chat_id=message.chat_id, user_id=user_id)

    chat = await chats_repository.get_chat_by_id(db, message.chat_id)
    if not chat:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Chat not found")

    membership = await chats_repository.get_chat_member(db, chat_id=message.chat_id, user_id=user_id)
    if not membership:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="You are not a member of this chat")

    return message, chat, membership


async def create_chat_message(db: AsyncSession, *, sender_id: int, payload: MessageCreateRequest) -> Message:
    """Create a message in ``payload.chat_id`` on behalf of ``sender_id``.

    When ``payload.client_message_id`` is set the call is idempotent: a message
    already stored under the same (chat, sender, client id) is returned instead
    of creating a new one, both on the initial lookup and after a concurrent
    insert triggers an ``IntegrityError``.

    Raises:
        HTTPException: 422 when the reply target is missing or belongs to
            another chat, or when a text message has empty/whitespace-only text.
        IntegrityError: re-raised when the insert fails and no previously
            stored message matches the client id.
    """
    await ensure_chat_membership(db, chat_id=payload.chat_id, user_id=sender_id)

    if payload.reply_to_message_id is not None:
        reply_to = await repository.get_message_by_id(db, payload.reply_to_message_id)
        if not reply_to or reply_to.chat_id != payload.chat_id:
            raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Invalid reply target")

    if payload.client_message_id:
        existing = await repository.get_message_by_client_message_id(
            db,
            chat_id=payload.chat_id,
            sender_id=sender_id,
            client_message_id=payload.client_message_id,
        )
        if existing:
            return existing

    if payload.type.value == "text" and not (payload.text and payload.text.strip()):
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Text message cannot be empty")

    await enforce_message_spam_policy(user_id=sender_id, chat_id=payload.chat_id, text=payload.text)

    try:
        message = await repository.create_message(
            db,
            chat_id=payload.chat_id,
            sender_id=sender_id,
            reply_to_message_id=payload.reply_to_message_id,
            forwarded_from_message_id=None,
            message_type=payload.type,
            text=payload.text,
        )
        if payload.client_message_id:
            await repository.create_message_idempotency_key(
                db,
                chat_id=payload.chat_id,
                sender_id=sender_id,
                client_message_id=payload.client_message_id,
                message_id=message.id,
            )
        await db.commit()
        await db.refresh(message)
    except IntegrityError:
        await db.rollback()
        # A concurrent request with the same client id may have won the race;
        # hand back its message rather than failing.
        if payload.client_message_id:
            existing = await repository.get_message_by_client_message_id(
                db,
                chat_id=payload.chat_id,
                sender_id=sender_id,
                client_message_id=payload.client_message_id,
            )
            if existing:
                return existing
        raise

    # Read the id while the freshly refreshed row is loaded, so the log call
    # below never has to touch ORM state after a failed dispatch.
    message_id = message.id
    try:
        await dispatch_message_notifications(db, message)
    except Exception:
        # Notifications should not block message delivery, so the failure is
        # deliberately not propagated - but it is recorded instead of vanishing.
        logger.exception("Failed to dispatch notifications for message %s", message_id)

    return message


async def get_messages(
    db: AsyncSession,
    *,
    chat_id: int,
    user_id: int,
    limit: int = 50,
    before_id: int | None = None,
) -> list[Message]:
    """Return messages of ``chat_id`` as listed by the repository for ``user_id``.

    ``limit`` is clamped to 1..100; ``before_id`` is passed through to the
    repository unchanged.
    """
    await ensure_chat_membership(db, chat_id=chat_id, user_id=user_id)
    return await repository.list_chat_messages(
        db,
        chat_id,
        user_id=user_id,
        limit=_clamp_limit(limit),
        before_id=before_id,
    )


async def search_messages(
    db: AsyncSession,
    *,
    user_id: int,
    query: str,
    chat_id: int | None = None,
    limit: int = 50,
) -> list[Message]:
    """Search messages for ``user_id``, optionally restricted to one chat.

    The query is stripped of surrounding whitespace; queries shorter than two
    characters return an empty list without hitting the repository. ``limit``
    is clamped to 1..100. Membership is checked only when ``chat_id`` is given.
    """
    normalized = query.strip()
    if len(normalized) < 2:
        return []

    safe_limit = _clamp_limit(limit)
    if chat_id is not None:
        await ensure_chat_membership(db, chat_id=chat_id, user_id=user_id)

    return await repository.search_messages(
        db,
        user_id=user_id,
        query=normalized,
        chat_id=chat_id,
        limit=safe_limit,
    )


async def update_message(
    db: AsyncSession,
    *,
    message_id: int,
    user_id: int,
    payload: MessageUpdateRequest,
) -> Message:
    """Replace the text of a message authored by ``user_id``.

    Raises:
        HTTPException: 404 when the message does not exist, 403 when the
            caller is not the sender.
    """
    message = await repository.get_message_by_id(db, message_id)
    if not message:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Message not found")

    await ensure_chat_membership(db, chat_id=message.chat_id, user_id=user_id)

    if message.sender_id != user_id:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="You can edit only your own messages")

    message.text = payload.text
    await db.commit()
    await db.refresh(message)
    return message


async def delete_message(db: AsyncSession, *, message_id: int, user_id: int) -> None:
    """Hide a message for ``user_id`` only; the message itself is not removed.

    Raises:
        HTTPException: 404 when the message or chat is missing, 403 when the
            user has no membership row in the chat.
    """
    message, _chat, _membership = await _load_message_for_member(db, message_id=message_id, user_id=user_id)

    # Telegram-like default: delete only for current user.
    hidden = await repository.get_hidden_message(db, message_id=message.id, user_id=user_id)
    if not hidden:
        try:
            await repository.hide_message_for_user(db, message_id=message.id, user_id=user_id)
        except IntegrityError:
            # Treated as "already hidden" (e.g. by a concurrent request):
            # roll back and finish without committing.
            await db.rollback()
            return
    await db.commit()


async def delete_message_for_all(db: AsyncSession, *, message_id: int, user_id: int) -> None:
    """Delete a message for every participant, subject to permissions.

    In a chat flagged ``is_saved`` this falls back to ``delete_message``.
    Otherwise deletion is allowed when the chat is private, when the caller is
    the sender, or when the caller is an owner/admin of a group or channel.

    Raises:
        HTTPException: 404 when the message or chat is missing, 403 when the
            user has no membership row or lacks permission.
    """
    message, chat, membership = await _load_message_for_member(db, message_id=message_id, user_id=user_id)

    if chat.is_saved:
        await delete_message(db, message_id=message_id, user_id=user_id)
        return

    can_delete_for_all = (
        chat.type == ChatType.PRIVATE
        or message.sender_id == user_id
        or (
            chat.type in {ChatType.GROUP, ChatType.CHANNEL}
            and membership.role in {ChatMemberRole.OWNER, ChatMemberRole.ADMIN}
        )
    )
    if not can_delete_for_all:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Insufficient permissions for delete-for-all",
        )

    await repository.delete_message(db, message)
    await db.commit()


async def mark_message_status(
    db: AsyncSession,
    *,
    user_id: int,
    payload: MessageStatusUpdateRequest,
) -> dict[str, int]:
    """Advance the caller's delivered/read markers for a chat.

    A ``"message_read"`` status advances both markers; ``"message_delivered"``
    advances only the delivered marker. Existing markers never move backwards
    (``max`` with the stored value).

    Returns:
        A dict with ``chat_id``, ``message_id`` and the resulting
        ``last_delivered_message_id`` / ``last_read_message_id`` (``0`` when unset).

    Raises:
        HTTPException: 404 when the message is missing or belongs to another chat.
    """
    await ensure_chat_membership(db, chat_id=payload.chat_id, user_id=user_id)

    message = await repository.get_message_by_id(db, payload.message_id)
    if not message or message.chat_id != payload.chat_id:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Message not found")

    marks_delivered = payload.status in {"message_delivered", "message_read"}
    marks_read = payload.status == "message_read"

    receipt = await repository.get_message_receipt(db, chat_id=payload.chat_id, user_id=user_id)
    if not receipt:
        receipt = await repository.create_message_receipt(
            db,
            chat_id=payload.chat_id,
            user_id=user_id,
            last_delivered_message_id=payload.message_id if marks_delivered else None,
            last_read_message_id=payload.message_id if marks_read else None,
        )
    else:
        if marks_delivered:
            receipt.last_delivered_message_id = max(receipt.last_delivered_message_id or 0, payload.message_id)
        if marks_read:
            receipt.last_read_message_id = max(receipt.last_read_message_id or 0, payload.message_id)

    await db.commit()
    await db.refresh(receipt)

    return {
        "chat_id": payload.chat_id,
        "message_id": payload.message_id,
        "last_delivered_message_id": receipt.last_delivered_message_id or 0,
        "last_read_message_id": receipt.last_read_message_id or 0,
    }


async def forward_message(
    db: AsyncSession,
    *,
    source_message_id: int,
    sender_id: int,
    payload: MessageForwardRequest,
) -> Message:
    """Copy a message into ``payload.target_chat_id`` as a forward by ``sender_id``.

    Membership is checked for both the source chat and the target chat. The new
    message copies the source's type and text and records the source id in
    ``forwarded_from_message_id``.

    Raises:
        HTTPException: 404 when the source message does not exist.
    """
    source = await repository.get_message_by_id(db, source_message_id)
    if not source:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Source message not found")

    await ensure_chat_membership(db, chat_id=source.chat_id, user_id=sender_id)
    await ensure_chat_membership(db, chat_id=payload.target_chat_id, user_id=sender_id)

    forwarded = await repository.create_message(
        db,
        chat_id=payload.target_chat_id,
        sender_id=sender_id,
        reply_to_message_id=None,
        forwarded_from_message_id=source.id,
        message_type=source.type,
        text=source.text,
    )
    await db.commit()
    await db.refresh(forwarded)
    return forwarded