261 lines
9.8 KiB
Python
261 lines
9.8 KiB
Python
from database.db import get_connection
|
||
from database.repository import (
|
||
get_terminal_steps,
|
||
set_instruction_progress,
|
||
get_instruction_progress,
|
||
clear_instruction_progress,
|
||
find_instructions,
|
||
)
|
||
from keyboards.factory import pause_keyboard, back_to_main, build_keyboard
|
||
from services.log_export import log_event
|
||
|
||
PAUSE_PROMPT = "Помогла ли эта часть?"
|
||
PAUSE_YES = {"да, помогло", "да помогло", "да"}
|
||
PAUSE_NO = {"нет, дальше", "нет дальше", "нет"}
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# PINPAD — простой ответ
|
||
# ─────────────────────────────────────────────
|
||
|
||
async def send_pinpad_error(message, code: str, keyboard=None):
|
||
"""
|
||
Отправляет простой ответ для ошибок пинпада:
|
||
причина + что делать
|
||
"""
|
||
with get_connection() as conn:
|
||
cur = conn.cursor()
|
||
cur.execute(
|
||
"SELECT reason, action FROM pinpad_errors WHERE code = ?",
|
||
(code,)
|
||
)
|
||
row = cur.fetchone()
|
||
|
||
if not row:
|
||
return False # сигнал, что ошибки нет
|
||
|
||
reason, action = row
|
||
log_event(message.from_id, "pinpad_error", str(code))
|
||
action_text = (action or "").strip()
|
||
if action_text.lower().startswith("terminal:"):
|
||
query = action_text.split(":", 1)[1].strip()
|
||
if query:
|
||
found = find_instructions(query.lower())
|
||
if len(found) == 1:
|
||
instruction_id, _ = found[0]
|
||
await send_terminal_instruction(
|
||
message,
|
||
instruction_id,
|
||
keyboard=keyboard or back_to_main(),
|
||
)
|
||
return True
|
||
if len(found) > 1:
|
||
buttons = [{"title": title} for _, title in found]
|
||
kb = build_keyboard(buttons, back=True)
|
||
await message.answer(
|
||
"🔎 Найдено несколько вариантов. Выберите нужный:",
|
||
keyboard=kb,
|
||
)
|
||
return True
|
||
|
||
await message.answer(
|
||
f"❗ Ошибка пинпада {code}\n\n"
|
||
f"📌 Причина:\n{reason}\n\n"
|
||
f"❌ Не удалось найти инструкцию по ключу: {query or 'не задан'}",
|
||
keyboard=keyboard or back_to_main(),
|
||
)
|
||
return True
|
||
|
||
await message.answer(
|
||
f"❗ Ошибка пинпада {code}\n\n"
|
||
f"📌 Причина:\n{reason}\n\n"
|
||
f"✅ Что делать:\n{action}",
|
||
keyboard=keyboard
|
||
)
|
||
return True
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# TERMINAL — подробная инструкция
|
||
# ─────────────────────────────────────────────
|
||
|
||
async def _send_text(target, text: str, keyboard=None):
|
||
if hasattr(target, "answer"):
|
||
await target.answer(text, keyboard=keyboard)
|
||
else:
|
||
await target.send_message(text, keyboard=keyboard)
|
||
|
||
|
||
async def _send_attachment(target, attachment: str):
|
||
if hasattr(target, "answer"):
|
||
await target.answer(attachment=attachment)
|
||
else:
|
||
await target.send_message(attachment=attachment)
|
||
|
||
|
||
def _parse_goto(value: str):
|
||
parts = (value or "").strip().split(":", 1)
|
||
if not parts[0].isdigit():
|
||
return None
|
||
instr_id = int(parts[0])
|
||
step_index = 0
|
||
if len(parts) > 1:
|
||
if not parts[1].isdigit():
|
||
return None
|
||
step_num = int(parts[1])
|
||
if step_num > 0:
|
||
step_index = step_num - 1
|
||
return instr_id, step_index
|
||
|
||
|
||
async def _send_terminal_from_index(target, instruction_id: int, start_index: int, keyboard=None, max_hops: int = 5):
|
||
"""
|
||
Отправляет пошаговую инструкцию терминала:
|
||
несколько сообщений подряд (text / image)
|
||
"""
|
||
steps = get_terminal_steps(instruction_id)
|
||
|
||
if not steps:
|
||
await _send_text(target, "❌ Для этой инструкции нет шагов.", keyboard=keyboard)
|
||
return
|
||
|
||
for idx, (step_type, content) in enumerate(steps[start_index:], start=start_index):
|
||
if step_type == "text":
|
||
await _send_text(target, content, keyboard=keyboard)
|
||
elif step_type == "image":
|
||
await _send_attachment(target, content)
|
||
elif step_type == "pause":
|
||
await _send_text(target, PAUSE_PROMPT, keyboard=pause_keyboard())
|
||
pause_at_end = (idx == len(steps) - 1)
|
||
set_instruction_progress(
|
||
target.from_id if hasattr(target, "from_id") else target.user_id,
|
||
instruction_id,
|
||
idx + 1,
|
||
pause_at_end
|
||
)
|
||
return
|
||
elif step_type == "goto":
|
||
if max_hops <= 0:
|
||
await _send_text(target, "❌ Превышено число переходов между инструкциями.", keyboard=keyboard)
|
||
return
|
||
parsed = _parse_goto(content)
|
||
if not parsed:
|
||
await _send_text(target, f"❌ Некорректный goto: {content}", keyboard=keyboard)
|
||
return
|
||
next_id, next_index = parsed
|
||
user_id = target.from_id if hasattr(target, "from_id") else target.user_id
|
||
log_event(user_id, "terminal_instruction", str(next_id))
|
||
clear_instruction_progress(target.from_id if hasattr(target, "from_id") else target.user_id)
|
||
await _send_terminal_from_index(
|
||
target,
|
||
next_id,
|
||
next_index,
|
||
keyboard=keyboard,
|
||
max_hops=max_hops - 1,
|
||
)
|
||
return
|
||
|
||
clear_instruction_progress(target.from_id if hasattr(target, "from_id") else target.user_id)
|
||
|
||
|
||
async def send_terminal_instruction(message, instruction_id: int, keyboard=None):
|
||
log_event(message.from_id, "terminal_instruction", str(instruction_id))
|
||
await _send_terminal_from_index(message, instruction_id, 0, keyboard)
|
||
|
||
|
||
# ─────────────────────────────────────────────
|
||
# УНИВЕРСАЛЬНЫЙ ХЕЛПЕР (по желанию)
|
||
# ─────────────────────────────────────────────
|
||
|
||
async def send_instruction(message, instruction_id: int, keyboard=None):
|
||
"""
|
||
Алиас, чтобы использовать одно имя в коде
|
||
"""
|
||
await send_terminal_instruction(message, instruction_id, keyboard)
|
||
|
||
|
||
async def handle_pause_response(message) -> bool:
|
||
if not message.text:
|
||
return False
|
||
|
||
progress = get_instruction_progress(message.from_id)
|
||
if not progress:
|
||
return False
|
||
|
||
text = message.text.strip().lower()
|
||
instruction_id, next_step, pause_at_end = progress
|
||
|
||
if text in PAUSE_YES:
|
||
clear_instruction_progress(message.from_id)
|
||
await message.answer("Отлично, остановил инструкцию.", keyboard=back_to_main())
|
||
return True
|
||
|
||
if text in PAUSE_NO:
|
||
if pause_at_end:
|
||
clear_instruction_progress(message.from_id)
|
||
await message.answer(
|
||
"Похоже, инструкция не помогла. Заполните, пожалуйста, форму:\n"
|
||
"https://example.com/google-form",
|
||
keyboard=back_to_main()
|
||
)
|
||
return True
|
||
|
||
steps = get_terminal_steps(instruction_id)
|
||
if next_step >= len(steps):
|
||
clear_instruction_progress(message.from_id)
|
||
await message.answer(
|
||
"Похоже, инструкция не помогла. Заполните, пожалуйста, форму:\n"
|
||
"https://example.com/google-form",
|
||
keyboard=back_to_main()
|
||
)
|
||
return True
|
||
|
||
await _send_terminal_from_index(message, instruction_id, next_step, keyboard=back_to_main())
|
||
return True
|
||
|
||
return False
|
||
|
||
|
||
async def handle_pause_event(event) -> bool:
|
||
payload = event.get_payload_json() if hasattr(event, "get_payload_json") else None
|
||
if not payload or payload.get("pause") not in ("yes", "no"):
|
||
return False
|
||
|
||
progress = get_instruction_progress(event.user_id)
|
||
if not progress:
|
||
return False
|
||
|
||
instruction_id, next_step, pause_at_end = progress
|
||
|
||
if payload["pause"] == "yes":
|
||
clear_instruction_progress(event.user_id)
|
||
await event.send_message("Отлично, всё работает.", keyboard=back_to_main())
|
||
return True
|
||
|
||
if pause_at_end:
|
||
clear_instruction_progress(event.user_id)
|
||
await event.send_message(
|
||
"Похоже, инструкция не помогла. Заполните, пожалуйста, форму:\n"
|
||
"https://example.com/google-form",
|
||
keyboard=back_to_main()
|
||
)
|
||
return True
|
||
|
||
steps = get_terminal_steps(instruction_id)
|
||
if next_step >= len(steps):
|
||
clear_instruction_progress(event.user_id)
|
||
await event.send_message(
|
||
"Похоже, инструкция не помогла. Заполните, пожалуйста, форму:\n"
|
||
"https://example.com/google-form",
|
||
keyboard=back_to_main()
|
||
)
|
||
return True
|
||
|
||
await _send_terminal_from_index(
|
||
event,
|
||
instruction_id,
|
||
next_step,
|
||
keyboard=back_to_main()
|
||
)
|
||
return True
|