import json from collections.abc import Awaitable, Callable from redis.asyncio import Redis from app.config.settings import settings class RedisRealtimeRepository: def __init__(self) -> None: self._redis: Redis | None = None self._pubsub = None async def connect(self) -> None: if self._redis: return self._redis = Redis.from_url(settings.redis_url, decode_responses=True) self._pubsub = self._redis.pubsub() await self._pubsub.psubscribe("chat:*") async def close(self) -> None: if self._pubsub: await self._pubsub.close() self._pubsub = None if self._redis: await self._redis.aclose() self._redis = None async def publish_event(self, channel: str, payload: dict) -> None: if not self._redis: await self.connect() assert self._redis is not None await self._redis.publish(channel, json.dumps(payload)) async def consume(self, handler: Callable[[str, dict], Awaitable[None]]) -> None: if not self._pubsub: await self.connect() assert self._pubsub is not None while True: message = await self._pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0) if not message: continue channel = message.get("channel") data = message.get("data") if not channel or not isinstance(data, str): continue payload = json.loads(data) await handler(channel, payload)