"""Media service: presigned S3 upload URLs and attachment metadata storage."""

import re
from urllib.parse import quote, unquote
from uuid import uuid4

import boto3
from botocore.client import Config
from botocore.exceptions import BotoCoreError, ClientError
from fastapi import HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession

from app.config.settings import settings
from app.media import repository
from app.media.schemas import (
    AttachmentCreateRequest,
    AttachmentRead,
    UploadUrlRequest,
    UploadUrlResponse,
)
from app.messages.repository import get_message_by_id

# MIME types accepted by both the upload-URL and the attachment endpoints.
ALLOWED_MIME_TYPES = {
    "image/jpeg",
    "image/png",
    "image/webp",
    "video/mp4",
    "video/webm",
    "audio/mpeg",
    "audio/ogg",
    "audio/wav",
    "application/pdf",
    "application/zip",
    "text/plain",
}

# Matches every run of characters that is not allowed in a stored file name.
_SAFE_NAME_RE = re.compile(r"[^a-zA-Z0-9._-]+")

# Used to isolate the path part of a URL tail and to split it into segments.
# Backslashes are treated as separators too, because URL parsers for http(s)
# URLs commonly normalise them to forward slashes.
_QUERY_OR_FRAGMENT_RE = re.compile(r"[?#]")
_PATH_SEPARATOR_RE = re.compile(r"[/\\]")
_DOT_SEGMENTS = frozenset({".", ".."})


def _sanitize_filename(file_name: str) -> str:
    """Return a storage-safe version of *file_name*.

    Every run of characters outside ``[a-zA-Z0-9._-]`` becomes a single
    underscore, leading/trailing dots and underscores are stripped, and the
    result is capped at 120 characters. ``"file.bin"`` is used when nothing
    is left after sanitising.
    """
    sanitized = _SAFE_NAME_RE.sub("_", file_name).strip("._")
    if not sanitized:
        sanitized = "file.bin"
    return sanitized[:120]


def _build_file_url(bucket: str, object_key: str) -> str:
    """Build the URL of *object_key* in *bucket*.

    The public endpoint is preferred when configured; otherwise the regular
    S3 endpoint is used. The key is percent-encoded (slashes are kept).
    """
    base = (settings.s3_public_endpoint_url or settings.s3_endpoint_url).rstrip("/")
    encoded_key = quote(object_key)
    return f"{base}/{bucket}/{encoded_key}"


def _allowed_file_url_prefixes() -> tuple[str, ...]:
    """Return the ``<endpoint>/<bucket>/`` prefixes a file URL may start with.

    Covers the regular S3 endpoint and, when configured, the public one.
    """
    endpoints = [settings.s3_endpoint_url]
    if settings.s3_public_endpoint_url:
        endpoints.append(settings.s3_public_endpoint_url)
    return tuple(f"{endpoint.rstrip('/')}/{settings.s3_bucket_name}/" for endpoint in endpoints)


def _is_allowed_file_url(file_url: str) -> bool:
    """Tell whether *file_url* points inside the configured bucket.

    The URL must start with one of the allowed prefixes, and the path that
    follows must not contain ``.`` or ``..`` segments (literal or
    percent-encoded). A plain prefix check is not enough: a URL such as
    ``<prefix>../other/key`` starts with the prefix but resolves outside the
    bucket once dot segments are normalised.
    """
    for prefix in _allowed_file_url_prefixes():
        if not file_url.startswith(prefix):
            continue
        remainder = file_url[len(prefix):]
        # Only the path matters for traversal; drop any query or fragment
        # before decoding so an encoded "?" or "#" is not misread.
        path = _QUERY_OR_FRAGMENT_RE.split(remainder, maxsplit=1)[0]
        segments = _PATH_SEPARATOR_RE.split(unquote(path))
        if not any(segment in _DOT_SEGMENTS for segment in segments):
            return True
    return False


def _validate_media(file_type: str, file_size: int) -> None:
    """Validate the declared MIME type and size of a file.

    Raises:
        HTTPException: 422 when *file_type* is not in ``ALLOWED_MIME_TYPES``,
            when *file_size* is negative, or when it exceeds
            ``settings.max_upload_size_bytes``.
    """
    if file_type not in ALLOWED_MIME_TYPES:
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Unsupported file type")
    # A negative size can never be valid and would otherwise slip past the
    # upper-bound check below.
    if file_size < 0:
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Invalid file size")
    if file_size > settings.max_upload_size_bytes:
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="File size exceeds limit")


def _get_s3_client(endpoint_url: str):
    """Create a boto3 S3 client for *endpoint_url*.

    Uses the credentials and region from ``settings``, SigV4 signing and
    path-style addressing.
    """
    return boto3.client(
        "s3",
        endpoint_url=endpoint_url,
        aws_access_key_id=settings.s3_access_key,
        aws_secret_access_key=settings.s3_secret_key,
        region_name=settings.s3_region,
        config=Config(signature_version="s3v4", s3={"addressing_style": "path"}),
    )


async def generate_upload_url(payload: UploadUrlRequest) -> UploadUrlResponse:
    """Create a presigned ``PUT`` URL for uploading a single file.

    The object key is ``uploads/<uuid4>-<sanitized file name>``. The URL is
    signed against the public endpoint when one is configured, otherwise
    against the regular S3 endpoint.

    Raises:
        HTTPException: 422 when the declared type or size is rejected by
            ``_validate_media``; 503 when boto raises while creating the
            client or presigning.
    """
    _validate_media(payload.file_type, payload.file_size)

    file_name = _sanitize_filename(payload.file_name)
    object_key = f"uploads/{uuid4()}-{file_name}"
    bucket = settings.s3_bucket_name
    presign_endpoint = settings.s3_public_endpoint_url or settings.s3_endpoint_url

    try:
        s3_client = _get_s3_client(presign_endpoint)
        upload_url = s3_client.generate_presigned_url(
            "put_object",
            Params={
                "Bucket": bucket,
                "Key": object_key,
                "ContentType": payload.file_type,
            },
            ExpiresIn=settings.s3_presign_expire_seconds,
            HttpMethod="PUT",
        )
    except (BotoCoreError, ClientError) as exc:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Storage service unavailable",
        ) from exc

    return UploadUrlResponse(
        upload_url=upload_url,
        file_url=_build_file_url(bucket, object_key),
        object_key=object_key,
        expires_in=settings.s3_presign_expire_seconds,
        # The Content-Type given to the presign call is echoed back so the
        # client can send the same header with its upload.
        required_headers={"Content-Type": payload.file_type},
    )


async def store_attachment_metadata(
    db: AsyncSession,
    *,
    user_id: int,
    payload: AttachmentCreateRequest,
) -> AttachmentRead:
    """Persist attachment metadata for a message owned by *user_id*.

    Validates the declared media type and size, checks that the file URL
    points inside the configured bucket, verifies that the message exists and
    was sent by *user_id*, then creates the attachment row and commits.

    Raises:
        HTTPException: 422 for an unsupported type, an invalid size or a file
            URL outside the configured bucket; 404 when the message does not
            exist; 403 when *user_id* is not the message sender.
    """
    _validate_media(payload.file_type, payload.file_size)

    if not _is_allowed_file_url(payload.file_url):
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Invalid file URL")

    message = await get_message_by_id(db, payload.message_id)
    if not message:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Message not found")
    if message.sender_id != user_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only the message sender can attach files",
        )

    attachment = await repository.create_attachment(
        db,
        message_id=payload.message_id,
        file_url=payload.file_url,
        file_type=payload.file_type,
        file_size=payload.file_size,
    )
    await db.commit()
    await db.refresh(attachment)
    return AttachmentRead.model_validate(attachment)