diff --git a/app/realtime/router.py b/app/realtime/router.py index f8adce4..34e0533 100644 --- a/app/realtime/router.py +++ b/app/realtime/router.py @@ -40,6 +40,15 @@ async def websocket_gateway(websocket: WebSocket) -> None: raw_data = await websocket.receive_json() try: event = IncomingRealtimeEvent.model_validate(raw_data) + if event.event == "ping": + await websocket.send_json( + OutgoingRealtimeEvent( + event="pong", + payload={}, + timestamp=datetime.now(timezone.utc), + ).model_dump(mode="json") + ) + continue await _dispatch_event(db, user.id, event) except ValidationError: await websocket.send_json( diff --git a/app/realtime/schemas.py b/app/realtime/schemas.py index 4ffb6b2..82de8bb 100644 --- a/app/realtime/schemas.py +++ b/app/realtime/schemas.py @@ -17,6 +17,7 @@ RealtimeEventName = Literal[ "message_delivered", "user_online", "user_offline", + "pong", "error", ] @@ -40,7 +41,7 @@ class MessageStatusPayload(BaseModel): class IncomingRealtimeEvent(BaseModel): - event: Literal["send_message", "typing_start", "typing_stop", "message_read", "message_delivered"] + event: Literal["send_message", "typing_start", "typing_stop", "message_read", "message_delivered", "ping"] payload: dict[str, Any] diff --git a/web/src/hooks/useRealtime.ts b/web/src/hooks/useRealtime.ts index 444da73..56fac91 100644 --- a/web/src/hooks/useRealtime.ts +++ b/web/src/hooks/useRealtime.ts @@ -16,6 +16,9 @@ export function useRealtime() { const typingByChat = useRef>>({}); const wsRef = useRef(null); const reconnectTimeoutRef = useRef(null); + const heartbeatIntervalRef = useRef(null); + const watchdogIntervalRef = useRef(null); + const lastPongAtRef = useRef(Date.now()); const reconnectAttemptsRef = useRef(0); const manualCloseRef = useRef(false); @@ -42,6 +45,23 @@ export function useRealtime() { ws.onopen = () => { reconnectAttemptsRef.current = 0; + lastPongAtRef.current = Date.now(); + if (heartbeatIntervalRef.current !== null) { + window.clearInterval(heartbeatIntervalRef.current); + } + if (watchdogIntervalRef.current !== null) { + window.clearInterval(watchdogIntervalRef.current); + } + heartbeatIntervalRef.current = window.setInterval(() => { + if (ws.readyState === WebSocket.OPEN) { + ws.send(JSON.stringify({ event: "ping", payload: {} })); + } + }, 20000); + watchdogIntervalRef.current = window.setInterval(() => { + if (Date.now() - lastPongAtRef.current > 65000 && ws.readyState === WebSocket.OPEN) { + ws.close(); + } + }, 15000); void useChatStore.getState().loadChats(); }; @@ -80,6 +100,9 @@ export function useRealtime() { void chatStore.loadChats(); } } + if (event.event === "pong") { + lastPongAtRef.current = Date.now(); + } if (event.event === "typing_start") { const chatId = Number(event.payload.chat_id); const userId = Number(event.payload.user_id); @@ -143,6 +166,14 @@ export function useRealtime() { }; ws.onclose = () => { + if (heartbeatIntervalRef.current !== null) { + window.clearInterval(heartbeatIntervalRef.current); + heartbeatIntervalRef.current = null; + } + if (watchdogIntervalRef.current !== null) { + window.clearInterval(watchdogIntervalRef.current); + watchdogIntervalRef.current = null; + } if (manualCloseRef.current) { return; } @@ -160,6 +191,14 @@ export function useRealtime() { return () => { manualCloseRef.current = true; + if (heartbeatIntervalRef.current !== null) { + window.clearInterval(heartbeatIntervalRef.current); + heartbeatIntervalRef.current = null; + } + if (watchdogIntervalRef.current !== null) { + window.clearInterval(watchdogIntervalRef.current); + watchdogIntervalRef.current = null; + } if (reconnectTimeoutRef.current !== null) { window.clearTimeout(reconnectTimeoutRef.current); reconnectTimeoutRef.current = null;