from __future__ import annotations from datetime import datetime, timedelta from typing import Dict, List, Tuple from config import LOG_SPREADSHEET_ID from database.db import get_connection from services.google import get_google_service _IGNORE_MESSAGES = {"начать", "главное меню", "назад"} def _now_iso() -> str: return datetime.now().strftime("%Y-%m-%d %H:%M:%S") def _format_ts(value: str) -> str: if not value: return "" try: dt = datetime.strptime(value, "%Y-%m-%d %H:%M:%S") return dt.strftime("%d.%m.%Y %H:%M:%S") except ValueError: return value def log_message(user_id: int, text: str) -> None: if not text: print("LOG message: skipped empty text") return normalized = text.strip().lower() if normalized in _IGNORE_MESSAGES: print(f"LOG message: ignored user_id={user_id} text={normalized!r}") return with get_connection() as conn: cur = conn.cursor() created_at = _now_iso() cur.execute( """ INSERT INTO message_log (user_id, created_at, message) VALUES (?, ?, ?) """, (user_id, created_at, text.strip()), ) conn.commit() print(f"LOG message: stored user_id={user_id} at={created_at}") def log_event(user_id: int, event_type: str, event_value: str) -> None: if not event_type or not event_value: print("LOG event: skipped empty event_type or event_value") return with get_connection() as conn: cur = conn.cursor() created_at = _now_iso() cur.execute( """ INSERT INTO event_log (user_id, created_at, event_type, event_value) VALUES (?, ?, ?, ?) """, (user_id, created_at, event_type, event_value), ) conn.commit() print(f"LOG event: stored user_id={user_id} type={event_type} value={event_value}") def _fetch_chatlog_rows() -> List[List[str]]: print("LOG export: building ChatLog rows from SQLite") with get_connection() as conn: cur = conn.cursor() cur.execute("SELECT DISTINCT user_id FROM message_log ORDER BY user_id") users = [row[0] for row in cur.fetchall()] user_msgs: Dict[int, List[Tuple[str, str]]] = {} for user_id in users: cur.execute( """ SELECT created_at, message FROM message_log WHERE user_id = ? ORDER BY id """, (user_id,), ) user_msgs[user_id] = cur.fetchall() if not users: print("LOG export: no users in message_log") return [["Время сообщения", "vk.com/id0"]] header: List[str] = [] for i, user_id in enumerate(users): header.extend(["Время сообщения", f"vk.com/id{user_id}"]) if i != len(users) - 1: header.append("") max_len = max(len(user_msgs[u]) for u in users) rows = [header] for idx in range(max_len): row: List[str] = [] for i, user_id in enumerate(users): if idx < len(user_msgs[user_id]): ts, msg = user_msgs[user_id][idx] row.extend([_format_ts(ts), msg]) else: row.extend(["", ""]) if i != len(users) - 1: row.append("") rows.append(row) return rows def _fetch_freq_rows(days: int = 30, limit: int = 50) -> List[List[str]]: print(f"LOG export: building FreqMsg rows for last {days} days") since = (datetime.now() - timedelta(days=days)).strftime("%Y-%m-%d %H:%M:%S") with get_connection() as conn: cur = conn.cursor() cur.execute( """ SELECT message, COUNT(*) FROM message_log WHERE created_at >= ? GROUP BY message ORDER BY COUNT(*) DESC LIMIT ? """, (since, limit), ) messages = [row[0] for row in cur.fetchall()] cur.execute( """ SELECT event_value, COUNT(*) FROM event_log WHERE event_type = 'pinpad_error' AND created_at >= ? GROUP BY event_value """, (since,), ) pinpad_counts = {row[0]: row[1] for row in cur.fetchall()} cur.execute( """ SELECT event_value, COUNT(*) FROM event_log WHERE event_type = 'terminal_instruction' AND created_at >= ? GROUP BY event_value """, (since,), ) terminal_counts = {row[0]: row[1] for row in cur.fetchall()} cur.execute("SELECT code FROM pinpad_errors ORDER BY code") pinpad_codes = [str(row[0]) for row in cur.fetchall()] cur.execute("SELECT id, title FROM terminal_instructions ORDER BY id") terminal_items = [(str(row[0]), row[1]) for row in cur.fetchall()] errors_with_counts: List[Tuple[str, int]] = [] for code in pinpad_codes: count = int(pinpad_counts.get(code, 0)) if count > 0: errors_with_counts.append((f"PinPad {code}", count)) for instr_id, title in terminal_items: count = int(terminal_counts.get(instr_id, 0)) if count > 0: label = title or f"Terminal {instr_id}" errors_with_counts.append((label, count)) errors_with_counts.sort(key=lambda x: (-x[1], x[0].lower())) errors = [label for label, _count in errors_with_counts] max_len = max(len(messages), len(errors), 1) rows = [["Частые сообщения", "Частые ошибки"]] for i in range(max_len): msg = messages[i] if i < len(messages) else "" err = errors[i] if i < len(errors) else "" rows.append([msg, err]) return rows def export_chatlog_and_freq() -> None: print("LOG export: start") service = get_google_service() chat_rows = _fetch_chatlog_rows() print(f"LOG export: writing ChatLog rows={len(chat_rows)}") service.spreadsheets().values().clear( spreadsheetId=LOG_SPREADSHEET_ID, range="ChatLog", body={}, ).execute() service.spreadsheets().values().update( spreadsheetId=LOG_SPREADSHEET_ID, range="ChatLog!A1", valueInputOption="RAW", body={"values": chat_rows}, ).execute() freq_rows = _fetch_freq_rows() print(f"LOG export: writing FreqMsg rows={len(freq_rows)}") service.spreadsheets().values().clear( spreadsheetId=LOG_SPREADSHEET_ID, range="FreqMsg", body={}, ).execute() service.spreadsheets().values().update( spreadsheetId=LOG_SPREADSHEET_ID, range="FreqMsg!A1", valueInputOption="RAW", body={"values": freq_rows}, ).execute() print("LOG export: done")