feat(realtime): add ping/pong heartbeat and watchdog reconnect
Some checks failed
CI / test (push) Failing after 20s
Some checks failed
CI / test (push) Failing after 20s
- support ping incoming event and pong outgoing response - add web heartbeat interval to keep ws alive - add stale-connection watchdog to force reconnect on missing pong
This commit is contained in:
@@ -40,6 +40,15 @@ async def websocket_gateway(websocket: WebSocket) -> None:
|
|||||||
raw_data = await websocket.receive_json()
|
raw_data = await websocket.receive_json()
|
||||||
try:
|
try:
|
||||||
event = IncomingRealtimeEvent.model_validate(raw_data)
|
event = IncomingRealtimeEvent.model_validate(raw_data)
|
||||||
|
if event.event == "ping":
|
||||||
|
await websocket.send_json(
|
||||||
|
OutgoingRealtimeEvent(
|
||||||
|
event="pong",
|
||||||
|
payload={},
|
||||||
|
timestamp=datetime.now(timezone.utc),
|
||||||
|
).model_dump(mode="json")
|
||||||
|
)
|
||||||
|
continue
|
||||||
await _dispatch_event(db, user.id, event)
|
await _dispatch_event(db, user.id, event)
|
||||||
except ValidationError:
|
except ValidationError:
|
||||||
await websocket.send_json(
|
await websocket.send_json(
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ RealtimeEventName = Literal[
|
|||||||
"message_delivered",
|
"message_delivered",
|
||||||
"user_online",
|
"user_online",
|
||||||
"user_offline",
|
"user_offline",
|
||||||
|
"pong",
|
||||||
"error",
|
"error",
|
||||||
]
|
]
|
||||||
|
|
||||||
@@ -40,7 +41,7 @@ class MessageStatusPayload(BaseModel):
|
|||||||
|
|
||||||
|
|
||||||
class IncomingRealtimeEvent(BaseModel):
|
class IncomingRealtimeEvent(BaseModel):
|
||||||
event: Literal["send_message", "typing_start", "typing_stop", "message_read", "message_delivered"]
|
event: Literal["send_message", "typing_start", "typing_stop", "message_read", "message_delivered", "ping"]
|
||||||
payload: dict[str, Any]
|
payload: dict[str, Any]
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -16,6 +16,9 @@ export function useRealtime() {
|
|||||||
const typingByChat = useRef<Record<number, Set<number>>>({});
|
const typingByChat = useRef<Record<number, Set<number>>>({});
|
||||||
const wsRef = useRef<WebSocket | null>(null);
|
const wsRef = useRef<WebSocket | null>(null);
|
||||||
const reconnectTimeoutRef = useRef<number | null>(null);
|
const reconnectTimeoutRef = useRef<number | null>(null);
|
||||||
|
const heartbeatIntervalRef = useRef<number | null>(null);
|
||||||
|
const watchdogIntervalRef = useRef<number | null>(null);
|
||||||
|
const lastPongAtRef = useRef<number>(Date.now());
|
||||||
const reconnectAttemptsRef = useRef(0);
|
const reconnectAttemptsRef = useRef(0);
|
||||||
const manualCloseRef = useRef(false);
|
const manualCloseRef = useRef(false);
|
||||||
|
|
||||||
@@ -42,6 +45,23 @@ export function useRealtime() {
|
|||||||
|
|
||||||
ws.onopen = () => {
|
ws.onopen = () => {
|
||||||
reconnectAttemptsRef.current = 0;
|
reconnectAttemptsRef.current = 0;
|
||||||
|
lastPongAtRef.current = Date.now();
|
||||||
|
if (heartbeatIntervalRef.current !== null) {
|
||||||
|
window.clearInterval(heartbeatIntervalRef.current);
|
||||||
|
}
|
||||||
|
if (watchdogIntervalRef.current !== null) {
|
||||||
|
window.clearInterval(watchdogIntervalRef.current);
|
||||||
|
}
|
||||||
|
heartbeatIntervalRef.current = window.setInterval(() => {
|
||||||
|
if (ws.readyState === WebSocket.OPEN) {
|
||||||
|
ws.send(JSON.stringify({ event: "ping", payload: {} }));
|
||||||
|
}
|
||||||
|
}, 20000);
|
||||||
|
watchdogIntervalRef.current = window.setInterval(() => {
|
||||||
|
if (Date.now() - lastPongAtRef.current > 65000 && ws.readyState === WebSocket.OPEN) {
|
||||||
|
ws.close();
|
||||||
|
}
|
||||||
|
}, 15000);
|
||||||
void useChatStore.getState().loadChats();
|
void useChatStore.getState().loadChats();
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -80,6 +100,9 @@ export function useRealtime() {
|
|||||||
void chatStore.loadChats();
|
void chatStore.loadChats();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (event.event === "pong") {
|
||||||
|
lastPongAtRef.current = Date.now();
|
||||||
|
}
|
||||||
if (event.event === "typing_start") {
|
if (event.event === "typing_start") {
|
||||||
const chatId = Number(event.payload.chat_id);
|
const chatId = Number(event.payload.chat_id);
|
||||||
const userId = Number(event.payload.user_id);
|
const userId = Number(event.payload.user_id);
|
||||||
@@ -143,6 +166,14 @@ export function useRealtime() {
|
|||||||
};
|
};
|
||||||
|
|
||||||
ws.onclose = () => {
|
ws.onclose = () => {
|
||||||
|
if (heartbeatIntervalRef.current !== null) {
|
||||||
|
window.clearInterval(heartbeatIntervalRef.current);
|
||||||
|
heartbeatIntervalRef.current = null;
|
||||||
|
}
|
||||||
|
if (watchdogIntervalRef.current !== null) {
|
||||||
|
window.clearInterval(watchdogIntervalRef.current);
|
||||||
|
watchdogIntervalRef.current = null;
|
||||||
|
}
|
||||||
if (manualCloseRef.current) {
|
if (manualCloseRef.current) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -160,6 +191,14 @@ export function useRealtime() {
|
|||||||
|
|
||||||
return () => {
|
return () => {
|
||||||
manualCloseRef.current = true;
|
manualCloseRef.current = true;
|
||||||
|
if (heartbeatIntervalRef.current !== null) {
|
||||||
|
window.clearInterval(heartbeatIntervalRef.current);
|
||||||
|
heartbeatIntervalRef.current = null;
|
||||||
|
}
|
||||||
|
if (watchdogIntervalRef.current !== null) {
|
||||||
|
window.clearInterval(watchdogIntervalRef.current);
|
||||||
|
watchdogIntervalRef.current = null;
|
||||||
|
}
|
||||||
if (reconnectTimeoutRef.current !== null) {
|
if (reconnectTimeoutRef.current !== null) {
|
||||||
window.clearTimeout(reconnectTimeoutRef.current);
|
window.clearTimeout(reconnectTimeoutRef.current);
|
||||||
reconnectTimeoutRef.current = null;
|
reconnectTimeoutRef.current = null;
|
||||||
|
|||||||
Reference in New Issue
Block a user