Files
tg-admin-bot/services/metrics.py
2026-02-08 00:02:34 +03:00

87 lines
2.5 KiB
Python

import asyncio
import time
from collections import deque
import psutil
class MetricsStore:
def __init__(self, maxlen: int = 720):
self.samples = deque(maxlen=maxlen)
self.interval = 5
def add(self, sample: dict):
self.samples.append(sample)
async def start_sampler(store: MetricsStore, interval: int = 5):
store.interval = interval
psutil.cpu_percent(interval=None)
last_net = psutil.net_io_counters()
while True:
now = time.time()
cpu = psutil.cpu_percent(interval=None)
load1 = psutil.getloadavg()[0]
net = psutil.net_io_counters()
rx_bytes = net.bytes_recv - last_net.bytes_recv
tx_bytes = net.bytes_sent - last_net.bytes_sent
last_net = net
rx_rate = rx_bytes / interval
tx_rate = tx_bytes / interval
store.add({
"ts": now,
"cpu": cpu,
"load1": load1,
"rx_bytes": rx_bytes,
"tx_bytes": tx_bytes,
"rx_rate": rx_rate,
"tx_rate": tx_rate,
})
await asyncio.sleep(interval)
def summarize(store: MetricsStore, minutes: int = 15) -> str:
cutoff = time.time() - minutes * 60
data = [s for s in list(store.samples) if s["ts"] >= cutoff]
if not data:
return "📈 Metrics\n\n⚠️ No data yet"
cpu_vals = [s["cpu"] for s in data]
load_vals = [s["load1"] for s in data]
rx_rates = [s["rx_rate"] for s in data]
tx_rates = [s["tx_rate"] for s in data]
total_rx = sum(s["rx_bytes"] for s in data)
total_tx = sum(s["tx_bytes"] for s in data)
def avg(vals):
return sum(vals) / len(vals) if vals else 0.0
def fmt_rate(bps):
if bps > 1024**2:
return f"{bps / (1024**2):.2f} MiB/s"
if bps > 1024:
return f"{bps / 1024:.2f} KiB/s"
return f"{bps:.0f} B/s"
def fmt_bytes(b):
if b > 1024**3:
return f"{b / (1024**3):.2f} GiB"
if b > 1024**2:
return f"{b / (1024**2):.2f} MiB"
if b > 1024:
return f"{b / 1024:.2f} KiB"
return f"{b} B"
return (
f"📈 Metrics (last {minutes}m)\n\n"
f"🧠 CPU avg: {avg(cpu_vals):.1f}% | max: {max(cpu_vals):.1f}%\n"
f"⚙️ Load avg: {avg(load_vals):.2f} | max: {max(load_vals):.2f}\n"
f"⬇️ RX avg: {fmt_rate(avg(rx_rates))} | max: {fmt_rate(max(rx_rates))} | total: {fmt_bytes(total_rx)}\n"
f"⬆️ TX avg: {fmt_rate(avg(tx_rates))} | max: {fmt_rate(max(tx_rates))} | total: {fmt_bytes(total_tx)}"
)