"""Thread-safe, JSON-backed key/value store for runtime state.

The state is loaded lazily from ``_PATH`` on first access and written back
atomically (temp file + ``os.replace``) after every ``set_state`` call.
"""
import json
import logging
import os
import tempfile
import threading
from typing import Any, Dict, Optional

_LOG = logging.getLogger(__name__)

_PATH = "/var/server-bot/runtime.json"
_STATE: Dict[str, Any] = {}
_LOCK = threading.RLock()
_LOADED = False


def configure(path: Optional[str]) -> None:
    """Point the store at a different backing file.

    A falsy *path* (``None`` or ``""``) leaves the current path untouched.
    When the path actually changes, the in-memory cache is invalidated so the
    next access reloads from the new file instead of serving (and later
    persisting) data that belongs to the previous one.

    Args:
        path: New location of the JSON state file, or a falsy value to keep
            the current one.
    """
    global _PATH, _LOADED
    if not path:
        return
    with _LOCK:
        if path != _PATH:
            _PATH = path
            _LOADED = False


def _load_from_disk() -> None:
    """Replace ``_STATE`` with the contents of ``_PATH``.

    A missing file, an unreadable file, invalid JSON, or a JSON document whose
    top level is not an object all result in an empty state. Every case except
    a missing file is logged. Must be called with ``_LOCK`` held.
    """
    global _STATE, _LOADED
    loaded: Any = {}
    try:
        with open(_PATH, "r", encoding="utf-8") as f:
            loaded = json.load(f)
    except FileNotFoundError:
        # First run: nothing has been persisted yet.
        pass
    except Exception:
        # Best-effort load: a corrupt or unreadable file must not stop callers.
        _LOG.warning(
            "Could not read runtime state from %s; starting empty",
            _PATH,
            exc_info=True,
        )
    if not isinstance(loaded, dict):
        # Valid JSON that is not an object (e.g. a list) would break the
        # dict operations used by get()/set_state().
        _LOG.warning(
            "Runtime state in %s is not a JSON object; starting empty", _PATH
        )
        loaded = {}
    _STATE = loaded
    _LOADED = True


def _ensure_loaded() -> None:
    """Load the state from disk once; the caller must hold ``_LOCK``."""
    if not _LOADED:
        _load_from_disk()


def _save() -> None:
    """Atomically write ``_STATE`` to ``_PATH``.

    The data is written to a temporary file in the destination directory,
    flushed and fsynced, then moved over the target with ``os.replace``.
    Failures while writing are logged and otherwise ignored (best effort).

    Raises:
        OSError: If the destination directory cannot be created.
    """
    directory = os.path.dirname(_PATH) or "."
    os.makedirs(directory, exist_ok=True)
    tmp_path = None
    try:
        fd, tmp_path = tempfile.mkstemp(
            prefix=".runtime.", suffix=".json", dir=directory
        )
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(_STATE, f, ensure_ascii=False)
            f.flush()
            os.fsync(f.fileno())
        # Replace only after the temp file is closed; renaming a file that is
        # still open fails on Windows.
        os.replace(tmp_path, _PATH)
        tmp_path = None  # The temp file has been moved; nothing to clean up.
    except Exception:
        _LOG.warning(
            "Could not persist runtime state to %s", _PATH, exc_info=True
        )
    finally:
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass


def get_state() -> Dict[str, Any]:
    """Return the live state dictionary, loading it from disk if needed.

    The returned object is the module's own dict, not a copy: mutations are
    visible to other callers but are not persisted until ``set_state`` runs.
    """
    with _LOCK:
        _ensure_loaded()
        return _STATE


def set_state(key: str, value: Any) -> None:
    """Store *value* under *key* and persist the whole state to disk.

    Args:
        key: Name of the entry to set.
        value: Value to store; it is written with ``json.dump``, and a value
            that cannot be serialized is kept in memory only (the failed save
            is logged).
    """
    with _LOCK:
        _ensure_loaded()
        _STATE[key] = value
        _save()


def get(key: str, default: Any = None) -> Any:
    """Return the value stored under *key*, or *default* when absent."""
    with _LOCK:
        _ensure_loaded()
        return _STATE.get(key, default)