diff --git a/app/chats/repository.py b/app/chats/repository.py index f949aa9..ba83425 100644 --- a/app/chats/repository.py +++ b/app/chats/repository.py @@ -1,4 +1,5 @@ from sqlalchemy import Select, select +from sqlalchemy.orm import aliased from sqlalchemy.ext.asyncio import AsyncSession from app.chats.models import Chat, ChatMember, ChatMemberRole, ChatType @@ -60,3 +61,21 @@ async def list_user_chat_ids(db: AsyncSession, *, user_id: int) -> list[int]: select(ChatMember.chat_id).where(ChatMember.user_id == user_id).order_by(ChatMember.chat_id.asc()) ) return list(result.scalars().all()) + + +async def find_private_chat_between_users(db: AsyncSession, *, user_a_id: int, user_b_id: int) -> Chat | None: + cm_a = aliased(ChatMember) + cm_b = aliased(ChatMember) + stmt = ( + select(Chat) + .join(cm_a, cm_a.chat_id == Chat.id) + .join(cm_b, cm_b.chat_id == Chat.id) + .where( + Chat.type == ChatType.PRIVATE, + cm_a.user_id == user_a_id, + cm_b.user_id == user_b_id, + ) + .limit(1) + ) + result = await db.execute(stmt) + return result.scalar_one_or_none() diff --git a/app/chats/service.py b/app/chats/service.py index ca1ce27..7226958 100644 --- a/app/chats/service.py +++ b/app/chats/service.py @@ -16,6 +16,14 @@ async def create_chat_for_user(db: AsyncSession, *, creator_id: int, payload: Ch status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Private chat requires exactly one target user.", ) + if payload.type == ChatType.PRIVATE: + existing_chat = await repository.find_private_chat_between_users( + db, + user_a_id=creator_id, + user_b_id=member_ids[0], + ) + if existing_chat: + return existing_chat if payload.type in {ChatType.GROUP, ChatType.CHANNEL} and not payload.title: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, diff --git a/app/messages/router.py b/app/messages/router.py index 72414f3..3a4181d 100644 --- a/app/messages/router.py +++ b/app/messages/router.py @@ -5,6 +5,7 @@ from app.auth.service import get_current_user from app.database.session import get_db from app.messages.schemas import MessageCreateRequest, MessageRead, MessageUpdateRequest from app.messages.service import create_chat_message, delete_message, get_messages, update_message +from app.realtime.service import realtime_gateway from app.users.models import User router = APIRouter(prefix="/messages", tags=["messages"]) @@ -16,7 +17,9 @@ async def create_message( db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user), ) -> MessageRead: - return await create_chat_message(db, sender_id=current_user.id, payload=payload) + message = await create_chat_message(db, sender_id=current_user.id, payload=payload) + await realtime_gateway.publish_message_created(message=message, sender_id=current_user.id) + return message @router.get("/{chat_id}", response_model=list[MessageRead]) diff --git a/app/realtime/service.py b/app/realtime/service.py index 15f7890..6c041db 100644 --- a/app/realtime/service.py +++ b/app/realtime/service.py @@ -83,17 +83,7 @@ class RealtimeGateway: sender_id=user_id, payload=MessageCreateRequest(chat_id=payload.chat_id, type=payload.type, text=payload.text), ) - message_data = MessageRead.model_validate(message).model_dump(mode="json") - await self._publish_chat_event( - payload.chat_id, - event="receive_message", - payload={ - "chat_id": payload.chat_id, - "message": message_data, - "temp_id": payload.temp_id, - "sender_id": user_id, - }, - ) + await self.publish_message_created(message=message, sender_id=user_id, temp_id=payload.temp_id) async def handle_typing_event(self, db: AsyncSession, user_id: int, payload: ChatEventPayload, event: str) -> None: await ensure_chat_membership(db, chat_id=payload.chat_id, user_id=user_id) @@ -149,6 +139,19 @@ class RealtimeGateway: return await self._handle_redis_event(f"chat:{chat_id}", event_payload) + async def publish_message_created(self, *, message, sender_id: int, temp_id: str | None = None) -> None: + message_data = MessageRead.model_validate(message).model_dump(mode="json") + await self._publish_chat_event( + message.chat_id, + event="receive_message", + payload={ + "chat_id": message.chat_id, + "message": message_data, + "temp_id": temp_id, + "sender_id": sender_id, + }, + ) + async def _send_user_event(self, user_id: int, event: OutgoingRealtimeEvent) -> None: user_connections = self._connections.get(user_id, {}) if not user_connections: diff --git a/web/src/api/chats.ts b/web/src/api/chats.ts index dd430c6..2dd5f53 100644 --- a/web/src/api/chats.ts +++ b/web/src/api/chats.ts @@ -1,5 +1,6 @@ import { http } from "./http"; import type { Chat, ChatType, Message, MessageType } from "../chat/types"; +import axios from "axios"; export async function getChats(): Promise { const { data } = await http.get("/chats"); @@ -51,6 +52,23 @@ export async function requestUploadUrl(file: File): Promise { return data; } +export async function uploadToPresignedUrl( + uploadUrl: string, + requiredHeaders: Record, + file: File, + onProgress?: (percent: number) => void +): Promise { + await axios.put(uploadUrl, file, { + headers: requiredHeaders, + onUploadProgress: (progressEvent) => { + if (!onProgress || !progressEvent.total) { + return; + } + onProgress(Math.round((progressEvent.loaded * 100) / progressEvent.total)); + } + }); +} + export async function attachFile(messageId: number, fileUrl: string, fileType: string, fileSize: number): Promise { await http.post("/media/attachments", { message_id: messageId, diff --git a/web/src/components/MessageComposer.tsx b/web/src/components/MessageComposer.tsx index d9ad656..bfc1458 100644 --- a/web/src/components/MessageComposer.tsx +++ b/web/src/components/MessageComposer.tsx @@ -1,5 +1,5 @@ -import { useRef, useState } from "react"; -import { attachFile, requestUploadUrl, sendMessage } from "../api/chats"; +import { useEffect, useRef, useState } from "react"; +import { attachFile, requestUploadUrl, sendMessage, uploadToPresignedUrl } from "../api/chats"; import { useAuthStore } from "../store/authStore"; import { useChatStore } from "../store/chatStore"; import { buildWsUrl } from "../utils/ws"; @@ -12,6 +12,21 @@ export function MessageComposer() { const wsRef = useRef(null); const recorderRef = useRef(null); const chunksRef = useRef([]); + const [isUploading, setIsUploading] = useState(false); + const [uploadProgress, setUploadProgress] = useState(0); + const [uploadError, setUploadError] = useState(null); + const [selectedFile, setSelectedFile] = useState(null); + const [selectedType, setSelectedType] = useState<"file" | "image" | "video" | "audio">("file"); + const [previewUrl, setPreviewUrl] = useState(null); + const [isRecording, setIsRecording] = useState(false); + + useEffect(() => { + return () => { + if (previewUrl) { + URL.revokeObjectURL(previewUrl); + } + }; + }, [previewUrl]); function getWs(): WebSocket | null { if (!accessToken || !activeChatId) { @@ -40,34 +55,109 @@ export function MessageComposer() { if (!activeChatId) { return; } - const upload = await requestUploadUrl(file); - await fetch(upload.upload_url, { - method: "PUT", - headers: upload.required_headers, - body: file - }); - const message = await sendMessage(activeChatId, upload.file_url, messageType); - await attachFile(message.id, upload.file_url, file.type || "application/octet-stream", file.size); - prependMessage(activeChatId, message); + setIsUploading(true); + setUploadProgress(0); + setUploadError(null); + try { + const upload = await requestUploadUrl(file); + await uploadToPresignedUrl(upload.upload_url, upload.required_headers, file, setUploadProgress); + const message = await sendMessage(activeChatId, upload.file_url, messageType); + await attachFile(message.id, upload.file_url, file.type || "application/octet-stream", file.size); + prependMessage(activeChatId, message); + } catch { + setUploadError("Upload failed. Please try again."); + } finally { + setIsUploading(false); + } + } + + function inferType(file: File): "file" | "image" | "video" | "audio" { + if (file.type.startsWith("image/")) { + return "image"; + } + if (file.type.startsWith("video/")) { + return "video"; + } + if (file.type.startsWith("audio/")) { + return "audio"; + } + return "file"; } async function startRecord() { - const stream = await navigator.mediaDevices.getUserMedia({ audio: true }); - const recorder = new MediaRecorder(stream); - chunksRef.current = []; - recorder.ondataavailable = (e) => chunksRef.current.push(e.data); - recorder.onstop = async () => { - const blob = new Blob(chunksRef.current, { type: "audio/webm" }); - const file = new File([blob], `voice-${Date.now()}.webm`, { type: "audio/webm" }); - await handleUpload(file, "voice"); - }; - recorderRef.current = recorder; - recorder.start(); + try { + const stream = await navigator.mediaDevices.getUserMedia({ audio: true }); + const recorder = new MediaRecorder(stream); + chunksRef.current = []; + recorder.ondataavailable = (e) => chunksRef.current.push(e.data); + recorder.onstop = async () => { + const blob = new Blob(chunksRef.current, { type: "audio/webm" }); + const file = new File([blob], `voice-${Date.now()}.webm`, { type: "audio/webm" }); + setIsRecording(false); + await handleUpload(file, "voice"); + }; + recorderRef.current = recorder; + recorder.start(); + setIsRecording(true); + } catch { + setUploadError("Microphone access denied."); + } } function stopRecord() { recorderRef.current?.stop(); recorderRef.current = null; + setIsRecording(false); + } + + function selectFile(file: File) { + setUploadError(null); + setSelectedFile(file); + const fileType = inferType(file); + setSelectedType(fileType); + if (previewUrl) { + URL.revokeObjectURL(previewUrl); + } + if (fileType === "image" || fileType === "video") { + setPreviewUrl(URL.createObjectURL(file)); + } else { + setPreviewUrl(null); + } + } + + async function sendSelectedFile() { + if (!selectedFile) { + return; + } + await handleUpload(selectedFile, selectedType); + if (previewUrl) { + URL.revokeObjectURL(previewUrl); + } + setSelectedFile(null); + setPreviewUrl(null); + setSelectedType("file"); + setUploadProgress(0); + } + + function cancelSelectedFile() { + if (previewUrl) { + URL.revokeObjectURL(previewUrl); + } + setSelectedFile(null); + setPreviewUrl(null); + setSelectedType("file"); + setUploadProgress(0); + setUploadError(null); + } + + function formatBytes(size: number): string { + if (size < 1024) { + return `${size} B`; + } + if (size < 1024 * 1024) { + return `${(size / 1024).toFixed(1)} KB`; + } + return `${(size / (1024 * 1024)).toFixed(1)} MB`; } return ( @@ -89,24 +179,60 @@ export function MessageComposer() { Send + {selectedFile ? ( +
+
Ready to send
+
{selectedFile.name}
+
{formatBytes(selectedFile.size)}
+ {previewUrl && selectedType === "image" ? ( + {selectedFile.name} + ) : null} + {previewUrl && selectedType === "video" ? ( +