Files
VK_bot/database/repository.py
2026-04-30 18:38:38 +03:00

312 lines
10 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from database.db import get_connection
import re
_STOPWORDS = {
"и", "в", "во", "на", "с", "со", "к", "ко", "от", "до", "по",
"за", "из", "у", "о", "об", "про", "для", "при", "без", "не",
"нет", "ли", "же", "а", "но", "или", "то", "это", "все", "всё",
}
def _normalize_query(query: str):
text = re.sub(r"[^\w\s]+", " ", query.lower(), flags=re.UNICODE)
parts = [p for p in text.split() if p and p not in _STOPWORDS]
# Keep tokens of length >= 3 to reduce noise
return [p for p in parts if len(p) >= 3]
def _normalize_title(text: str) -> str:
text = (text or "").strip().lower()
text = re.sub(r"\s+", " ", text)
return text
def _levenshtein(a: str, b: str) -> int:
if a == b:
return 0
if not a:
return len(b)
if not b:
return len(a)
if len(a) < len(b):
a, b = b, a
prev = list(range(len(b) + 1))
for i, ca in enumerate(a, start=1):
cur = [i]
for j, cb in enumerate(b, start=1):
ins = cur[j - 1] + 1
delete = prev[j] + 1
sub = prev[j - 1] + (0 if ca == cb else 1)
cur.append(min(ins, delete, sub))
prev = cur
return prev[-1]
def _fuzzy_match(token: str, word: str) -> bool:
if token in word:
return True
dist = _levenshtein(token, word)
if len(token) <= 5:
return dist <= 1
if len(token) <= 8:
return dist <= 2
return dist <= 3
def find_instructions(query: str):
query = query.lower()
tokens = _normalize_query(query)
with get_connection() as conn:
cur = conn.cursor()
# 1⃣ точное совпадение по коду
cur.execute("""
SELECT DISTINCT i.id, i.title
FROM terminal_instruction_keys k
JOIN terminal_instructions i ON i.id = k.instruction_id
WHERE k.key_type = 'code' AND k.key = ?
""", (query,))
rows = cur.fetchall()
if rows:
return rows
# 2⃣ частичное совпадение по тексту
if tokens:
like_parts = " OR ".join(["k.key LIKE ?"] * len(tokens))
params = [f"%{t}%" for t in tokens]
cur.execute(
f"""
SELECT i.id, i.title, k.key
FROM terminal_instruction_keys k
JOIN terminal_instructions i ON i.id = k.instruction_id
WHERE k.key_type = 'text' AND ({like_parts})
""",
params,
)
rows = cur.fetchall()
if rows:
scores = {}
titles = {}
for iid, title, key in rows:
key_l = (key or "").lower()
title_l = (title or "").lower()
matched = set()
for t in tokens:
if t in key_l or t in title_l:
matched.add(t)
if matched:
scores[iid] = max(scores.get(iid, 0), len(matched))
titles[iid] = title
if scores:
ordered = sorted(scores.items(), key=lambda x: (-x[1], x[0]))
return [(iid, titles[iid]) for iid, _ in ordered]
cur.execute("""
SELECT DISTINCT i.id, i.title
FROM terminal_instruction_keys k
JOIN terminal_instructions i ON i.id = k.instruction_id
WHERE k.key_type = 'text' AND k.key LIKE ?
""", (f"%{query}%",))
rows = cur.fetchall()
if rows:
return rows
# 3⃣ fuzzy поиск по ключевым словам (опечатки)
if tokens:
cur.execute("""
SELECT i.id, i.title, k.key
FROM terminal_instruction_keys k
JOIN terminal_instructions i ON i.id = k.instruction_id
WHERE k.key_type = 'text'
""")
all_rows = cur.fetchall()
scores = {}
titles = {}
for iid, title, key in all_rows:
key_words = [w for w in re.split(r"\s+", (key or "").lower()) if w]
title_words = [w for w in re.split(r"\s+", (title or "").lower()) if w]
matched = set()
for t in tokens:
if any(_fuzzy_match(t, w) for w in key_words) or any(_fuzzy_match(t, w) for w in title_words):
matched.add(t)
if matched:
scores[iid] = max(scores.get(iid, 0), len(matched))
titles[iid] = title
if scores:
ordered = sorted(scores.items(), key=lambda x: (-x[1], x[0]))
return [(iid, titles[iid]) for iid, _ in ordered]
return []
def get_terminal_steps(instruction_id: int):
    """Return the ``(type, content)`` rows of an instruction's steps.

    Rows come from ``terminal_instruction_steps`` for the given
    ``instruction_id`` and are ordered by ``step_order``.
    """
    sql = """
        SELECT type, content
        FROM terminal_instruction_steps
        WHERE instruction_id = ?
        ORDER BY step_order
    """
    with get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(sql, (instruction_id,))
        steps = cursor.fetchall()
    return steps
def get_instruction_id_by_title(title: str):
    """Return the id of the instruction whose title equals *title*.

    Both sides are compared after ``_normalize_title`` (case and whitespace
    are ignored). Returns ``None`` when the normalized title is empty or no
    instruction matches; the first matching row wins.
    """
    wanted = _normalize_title(title)
    if not wanted:
        return None

    sql = """
        SELECT id, title
        FROM terminal_instructions
    """
    with get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(sql)
        candidates = cursor.fetchall()
        return next(
            (
                row_id
                for row_id, row_title in candidates
                if _normalize_title(row_title) == wanted
            ),
            None,
        )
def find_tech_problems(query: str):
    """Rank technical problems by fuzzy keyword overlap with *query*.

    The query is tokenized with ``_normalize_query``; every row of
    ``tech_problems`` has its ``keywords`` split on commas and whitespace,
    and a token counts as a hit when ``_fuzzy_match`` accepts it against any
    keyword.

    Returns:
        A list of ``(id, task_type)`` tuples ordered by descending number of
        distinct matched tokens, then ascending id; empty when the query has
        no usable tokens or nothing matches.
    """
    query_tokens = _normalize_query(query)
    if not query_tokens:
        return []

    with get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT id, task_type, keywords
            FROM tech_problems
        """)
        problems = cursor.fetchall()

        best_hits = {}
        task_types = {}
        for problem_id, task_type, keywords in problems:
            words = [
                w for w in re.split(r"[,\s]+", (keywords or "").lower()) if w
            ]
            hits = {
                token
                for token in query_tokens
                if any(_fuzzy_match(token, w) for w in words)
            }
            if not hits:
                continue
            best_hits[problem_id] = max(best_hits.get(problem_id, 0), len(hits))
            task_types[problem_id] = task_type

        # An empty mapping sorts to an empty list, so no separate guard is
        # needed for the "nothing matched" case.
        ranked = sorted(best_hits, key=lambda pid: (-best_hits[pid], pid))
        return [(pid, task_types[pid]) for pid in ranked]
def get_tech_solution(problem_id: str):
    """Fetch the solution row stored for *problem_id*.

    Returns:
        One row of ``tech_problem_solutions`` with the columns
        ``problem_id, problem_name, task_type, can_fix_self,
        need_result_feedback, solution_steps, tools_needed,
        when_stop_and_report`` - or ``None`` when no row matches.
    """
    sql = """
        SELECT problem_id, problem_name, task_type, can_fix_self,
        need_result_feedback, solution_steps, tools_needed,
        when_stop_and_report
        FROM tech_problem_solutions
        WHERE problem_id = ?
    """
    with get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(sql, (problem_id,))
        solution = cursor.fetchone()
    return solution
def get_tech_problem_by_name(problem_name: str):
    """Look up a problem solution by its name, ignoring case and spacing.

    Names are compared after ``_normalize_title``.

    Returns:
        A ``(problem_id, problem_name, task_type)`` tuple for the first
        matching row, or ``None`` when the normalized name is empty or no
        row matches.
    """
    wanted = _normalize_title(problem_name)
    if not wanted:
        return None

    sql = """
        SELECT problem_id, problem_name, task_type
        FROM tech_problem_solutions
    """
    with get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(sql)
        for found_id, found_name, found_type in cursor.fetchall():
            if _normalize_title(found_name) != wanted:
                continue
            return found_id, found_name, found_type
    return None
def find_tech_solutions_by_name_contains(fragment: str):
    """Return problem solutions whose name contains *fragment*.

    The comparison ignores case and whitespace differences: both the
    fragment and each stored name go through ``_normalize_title``.

    The filtering is done in Python rather than with
    ``lower(problem_name) LIKE ?`` because SQLite's built-in ``lower()`` and
    ``LIKE`` only fold case for ASCII, so a lowercased non-ASCII fragment
    would never match a capitalised stored name. It also means ``%`` and
    ``_`` in the fragment are matched literally instead of as wildcards.

    Args:
        fragment: Text to look for inside ``problem_name``.

    Returns:
        A list of ``(problem_id, problem_name, task_type)`` rows; empty when
        the normalized fragment is empty or nothing matches.
    """
    frag = _normalize_title(fragment)
    if not frag:
        return []
    with get_connection() as conn:
        cur = conn.cursor()
        cur.execute(
            """
            SELECT problem_id, problem_name, task_type
            FROM tech_problem_solutions
            """
        )
        # Keep the original row objects so callers see the same row type.
        return [row for row in cur.fetchall() if frag in _normalize_title(row[1])]
def set_tech_problem_progress(user_id: int, problem_id: str, task_type: str, need_result_feedback: str):
    """Insert or overwrite the technical-problem progress row of a user.

    The statement upserts on ``user_id``: an existing row has its
    ``problem_id``, ``task_type`` and ``need_result_feedback`` replaced by
    the new values. The change is committed before returning.
    """
    upsert_sql = """
        INSERT INTO tech_problem_progress (user_id, problem_id, task_type, need_result_feedback)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(user_id) DO UPDATE SET
        problem_id = excluded.problem_id,
        task_type = excluded.task_type,
        need_result_feedback = excluded.need_result_feedback
    """
    values = (user_id, problem_id, task_type, need_result_feedback)
    with get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(upsert_sql, values)
        conn.commit()
def get_tech_problem_progress(user_id: int):
    """Return the stored technical-problem progress of *user_id*.

    Returns:
        A ``(problem_id, task_type, need_result_feedback)`` row from
        ``tech_problem_progress``, or ``None`` when the user has none.
    """
    sql = """
        SELECT problem_id, task_type, need_result_feedback
        FROM tech_problem_progress
        WHERE user_id = ?
    """
    with get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(sql, (user_id,))
        progress = cursor.fetchone()
    return progress
def clear_tech_problem_progress(user_id: int):
    """Delete the technical-problem progress row of *user_id* and commit."""
    delete_sql = "DELETE FROM tech_problem_progress WHERE user_id = ?"
    with get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(delete_sql, (user_id,))
        conn.commit()
def set_instruction_progress(user_id: int, instruction_id: int, next_step: int, pause_at_end: bool):
    """Insert or overwrite the instruction progress row of a user.

    The statement upserts on ``user_id``: an existing row has its
    ``instruction_id``, ``next_step`` and ``pause_at_end`` replaced.
    ``pause_at_end`` is stored as the integer 1 or 0. The change is
    committed before returning.
    """
    upsert_sql = """
        INSERT INTO instruction_progress (user_id, instruction_id, next_step, pause_at_end)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(user_id) DO UPDATE SET
        instruction_id = excluded.instruction_id,
        next_step = excluded.next_step,
        pause_at_end = excluded.pause_at_end
    """
    if pause_at_end:
        pause_flag = 1
    else:
        pause_flag = 0
    with get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(upsert_sql, (user_id, instruction_id, next_step, pause_flag))
        conn.commit()
def get_instruction_progress(user_id: int):
    """Return the stored instruction progress of *user_id*.

    Returns:
        An ``(instruction_id, next_step, pause_at_end)`` row from
        ``instruction_progress``, or ``None`` when the user has none.
    """
    sql = """
        SELECT instruction_id, next_step, pause_at_end
        FROM instruction_progress
        WHERE user_id = ?
    """
    with get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(sql, (user_id,))
        progress = cursor.fetchone()
    return progress
def clear_instruction_progress(user_id: int):
    """Delete the instruction progress row of *user_id* and commit."""
    delete_sql = "DELETE FROM instruction_progress WHERE user_id = ?"
    with get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(delete_sql, (user_id,))
        conn.commit()