Add runtime state, auto-mute schedules, and backup retries
This commit is contained in:
52
services/runtime_state.py
Normal file
52
services/runtime_state.py
Normal file
@@ -0,0 +1,52 @@
|
||||
import json
|
||||
import os
|
||||
from typing import Any, Dict
|
||||
|
||||
_PATH = "/var/server-bot/runtime.json"
|
||||
_STATE: Dict[str, Any] = {}
|
||||
|
||||
|
||||
def configure(path: str | None):
|
||||
global _PATH
|
||||
if path:
|
||||
_PATH = path
|
||||
|
||||
|
||||
def _load_from_disk():
|
||||
global _STATE
|
||||
if not os.path.exists(_PATH):
|
||||
_STATE = {}
|
||||
return
|
||||
try:
|
||||
with open(_PATH, "r", encoding="utf-8") as f:
|
||||
_STATE = json.load(f)
|
||||
except Exception:
|
||||
_STATE = {}
|
||||
|
||||
|
||||
def _save():
|
||||
os.makedirs(os.path.dirname(_PATH), exist_ok=True)
|
||||
try:
|
||||
with open(_PATH, "w", encoding="utf-8") as f:
|
||||
json.dump(_STATE, f)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def get_state() -> Dict[str, Any]:
|
||||
if not _STATE:
|
||||
_load_from_disk()
|
||||
return _STATE
|
||||
|
||||
|
||||
def set_state(key: str, value: Any):
|
||||
if not _STATE:
|
||||
_load_from_disk()
|
||||
_STATE[key] = value
|
||||
_save()
|
||||
|
||||
|
||||
def get(key: str, default: Any = None) -> Any:
|
||||
if not _STATE:
|
||||
_load_from_disk()
|
||||
return _STATE.get(key, default)
|
||||
Reference in New Issue
Block a user