312 lines
10 KiB
Python
312 lines
10 KiB
Python
from database.db import get_connection
|
||
import re
|
||
|
||
|
||
_STOPWORDS = {
|
||
"и", "в", "во", "на", "с", "со", "к", "ко", "от", "до", "по",
|
||
"за", "из", "у", "о", "об", "про", "для", "при", "без", "не",
|
||
"нет", "ли", "же", "а", "но", "или", "то", "это", "все", "всё",
|
||
}
|
||
|
||
|
||
def _normalize_query(query: str):
|
||
text = re.sub(r"[^\w\s]+", " ", query.lower(), flags=re.UNICODE)
|
||
parts = [p for p in text.split() if p and p not in _STOPWORDS]
|
||
# Keep tokens of length >= 3 to reduce noise
|
||
return [p for p in parts if len(p) >= 3]
|
||
|
||
|
||
def _normalize_title(text: str) -> str:
|
||
text = (text or "").strip().lower()
|
||
text = re.sub(r"\s+", " ", text)
|
||
return text
|
||
|
||
|
||
def _levenshtein(a: str, b: str) -> int:
|
||
if a == b:
|
||
return 0
|
||
if not a:
|
||
return len(b)
|
||
if not b:
|
||
return len(a)
|
||
|
||
if len(a) < len(b):
|
||
a, b = b, a
|
||
|
||
prev = list(range(len(b) + 1))
|
||
for i, ca in enumerate(a, start=1):
|
||
cur = [i]
|
||
for j, cb in enumerate(b, start=1):
|
||
ins = cur[j - 1] + 1
|
||
delete = prev[j] + 1
|
||
sub = prev[j - 1] + (0 if ca == cb else 1)
|
||
cur.append(min(ins, delete, sub))
|
||
prev = cur
|
||
return prev[-1]
|
||
|
||
|
||
def _fuzzy_match(token: str, word: str) -> bool:
|
||
if token in word:
|
||
return True
|
||
dist = _levenshtein(token, word)
|
||
if len(token) <= 5:
|
||
return dist <= 1
|
||
if len(token) <= 8:
|
||
return dist <= 2
|
||
return dist <= 3
|
||
|
||
def find_instructions(query: str):
|
||
query = query.lower()
|
||
tokens = _normalize_query(query)
|
||
|
||
with get_connection() as conn:
|
||
cur = conn.cursor()
|
||
|
||
# 1️⃣ точное совпадение по коду
|
||
cur.execute("""
|
||
SELECT DISTINCT i.id, i.title
|
||
FROM terminal_instruction_keys k
|
||
JOIN terminal_instructions i ON i.id = k.instruction_id
|
||
WHERE k.key_type = 'code' AND k.key = ?
|
||
""", (query,))
|
||
rows = cur.fetchall()
|
||
if rows:
|
||
return rows
|
||
|
||
# 2️⃣ частичное совпадение по тексту
|
||
if tokens:
|
||
like_parts = " OR ".join(["k.key LIKE ?"] * len(tokens))
|
||
params = [f"%{t}%" for t in tokens]
|
||
cur.execute(
|
||
f"""
|
||
SELECT i.id, i.title, k.key
|
||
FROM terminal_instruction_keys k
|
||
JOIN terminal_instructions i ON i.id = k.instruction_id
|
||
WHERE k.key_type = 'text' AND ({like_parts})
|
||
""",
|
||
params,
|
||
)
|
||
rows = cur.fetchall()
|
||
if rows:
|
||
scores = {}
|
||
titles = {}
|
||
for iid, title, key in rows:
|
||
key_l = (key or "").lower()
|
||
title_l = (title or "").lower()
|
||
matched = set()
|
||
for t in tokens:
|
||
if t in key_l or t in title_l:
|
||
matched.add(t)
|
||
if matched:
|
||
scores[iid] = max(scores.get(iid, 0), len(matched))
|
||
titles[iid] = title
|
||
if scores:
|
||
ordered = sorted(scores.items(), key=lambda x: (-x[1], x[0]))
|
||
return [(iid, titles[iid]) for iid, _ in ordered]
|
||
|
||
cur.execute("""
|
||
SELECT DISTINCT i.id, i.title
|
||
FROM terminal_instruction_keys k
|
||
JOIN terminal_instructions i ON i.id = k.instruction_id
|
||
WHERE k.key_type = 'text' AND k.key LIKE ?
|
||
""", (f"%{query}%",))
|
||
|
||
rows = cur.fetchall()
|
||
if rows:
|
||
return rows
|
||
|
||
# 3️⃣ fuzzy поиск по ключевым словам (опечатки)
|
||
if tokens:
|
||
cur.execute("""
|
||
SELECT i.id, i.title, k.key
|
||
FROM terminal_instruction_keys k
|
||
JOIN terminal_instructions i ON i.id = k.instruction_id
|
||
WHERE k.key_type = 'text'
|
||
""")
|
||
all_rows = cur.fetchall()
|
||
scores = {}
|
||
titles = {}
|
||
for iid, title, key in all_rows:
|
||
key_words = [w for w in re.split(r"\s+", (key or "").lower()) if w]
|
||
title_words = [w for w in re.split(r"\s+", (title or "").lower()) if w]
|
||
matched = set()
|
||
for t in tokens:
|
||
if any(_fuzzy_match(t, w) for w in key_words) or any(_fuzzy_match(t, w) for w in title_words):
|
||
matched.add(t)
|
||
if matched:
|
||
scores[iid] = max(scores.get(iid, 0), len(matched))
|
||
titles[iid] = title
|
||
if scores:
|
||
ordered = sorted(scores.items(), key=lambda x: (-x[1], x[0]))
|
||
return [(iid, titles[iid]) for iid, _ in ordered]
|
||
|
||
return []
|
||
|
||
def get_terminal_steps(instruction_id: int):
|
||
with get_connection() as conn:
|
||
cur = conn.cursor()
|
||
cur.execute("""
|
||
SELECT type, content
|
||
FROM terminal_instruction_steps
|
||
WHERE instruction_id = ?
|
||
ORDER BY step_order
|
||
""", (instruction_id,))
|
||
return cur.fetchall()
|
||
|
||
|
||
def get_instruction_id_by_title(title: str):
|
||
normalized = _normalize_title(title)
|
||
if not normalized:
|
||
return None
|
||
|
||
with get_connection() as conn:
|
||
cur = conn.cursor()
|
||
cur.execute("""
|
||
SELECT id, title
|
||
FROM terminal_instructions
|
||
""")
|
||
for iid, db_title in cur.fetchall():
|
||
if _normalize_title(db_title) == normalized:
|
||
return iid
|
||
return None
|
||
|
||
|
||
def find_tech_problems(query: str):
|
||
tokens = _normalize_query(query)
|
||
if not tokens:
|
||
return []
|
||
|
||
with get_connection() as conn:
|
||
cur = conn.cursor()
|
||
cur.execute("""
|
||
SELECT id, task_type, keywords
|
||
FROM tech_problems
|
||
""")
|
||
rows = cur.fetchall()
|
||
|
||
scores = {}
|
||
types = {}
|
||
for pid, task_type, keywords in rows:
|
||
key_words = [w for w in re.split(r"[,\s]+", (keywords or "").lower()) if w]
|
||
matched = set()
|
||
for t in tokens:
|
||
if any(_fuzzy_match(t, w) for w in key_words):
|
||
matched.add(t)
|
||
if matched:
|
||
scores[pid] = max(scores.get(pid, 0), len(matched))
|
||
types[pid] = task_type
|
||
|
||
if not scores:
|
||
return []
|
||
|
||
ordered = sorted(scores.items(), key=lambda x: (-x[1], x[0]))
|
||
return [(pid, types[pid]) for pid, _ in ordered]
|
||
|
||
|
||
def get_tech_solution(problem_id: str):
|
||
with get_connection() as conn:
|
||
cur = conn.cursor()
|
||
cur.execute("""
|
||
SELECT problem_id, problem_name, task_type, can_fix_self,
|
||
need_result_feedback, solution_steps, tools_needed,
|
||
when_stop_and_report
|
||
FROM tech_problem_solutions
|
||
WHERE problem_id = ?
|
||
""", (problem_id,))
|
||
return cur.fetchone()
|
||
|
||
|
||
def get_tech_problem_by_name(problem_name: str):
|
||
normalized = _normalize_title(problem_name)
|
||
if not normalized:
|
||
return None
|
||
|
||
with get_connection() as conn:
|
||
cur = conn.cursor()
|
||
cur.execute("""
|
||
SELECT problem_id, problem_name, task_type
|
||
FROM tech_problem_solutions
|
||
""")
|
||
for pid, name, task_type in cur.fetchall():
|
||
if _normalize_title(name) == normalized:
|
||
return pid, name, task_type
|
||
return None
|
||
|
||
|
||
def find_tech_solutions_by_name_contains(fragment: str):
|
||
frag = _normalize_title(fragment)
|
||
if not frag:
|
||
return []
|
||
with get_connection() as conn:
|
||
cur = conn.cursor()
|
||
cur.execute(
|
||
"""
|
||
SELECT problem_id, problem_name, task_type
|
||
FROM tech_problem_solutions
|
||
WHERE lower(problem_name) LIKE ?
|
||
""",
|
||
(f"%{frag}%",),
|
||
)
|
||
return cur.fetchall()
|
||
|
||
|
||
def set_tech_problem_progress(user_id: int, problem_id: str, task_type: str, need_result_feedback: str):
|
||
with get_connection() as conn:
|
||
cur = conn.cursor()
|
||
cur.execute("""
|
||
INSERT INTO tech_problem_progress (user_id, problem_id, task_type, need_result_feedback)
|
||
VALUES (?, ?, ?, ?)
|
||
ON CONFLICT(user_id) DO UPDATE SET
|
||
problem_id = excluded.problem_id,
|
||
task_type = excluded.task_type,
|
||
need_result_feedback = excluded.need_result_feedback
|
||
""", (user_id, problem_id, task_type, need_result_feedback))
|
||
conn.commit()
|
||
|
||
|
||
def get_tech_problem_progress(user_id: int):
|
||
with get_connection() as conn:
|
||
cur = conn.cursor()
|
||
cur.execute("""
|
||
SELECT problem_id, task_type, need_result_feedback
|
||
FROM tech_problem_progress
|
||
WHERE user_id = ?
|
||
""", (user_id,))
|
||
return cur.fetchone()
|
||
|
||
|
||
def clear_tech_problem_progress(user_id: int):
|
||
with get_connection() as conn:
|
||
cur = conn.cursor()
|
||
cur.execute("DELETE FROM tech_problem_progress WHERE user_id = ?", (user_id,))
|
||
conn.commit()
|
||
|
||
def set_instruction_progress(user_id: int, instruction_id: int, next_step: int, pause_at_end: bool):
|
||
with get_connection() as conn:
|
||
cur = conn.cursor()
|
||
cur.execute("""
|
||
INSERT INTO instruction_progress (user_id, instruction_id, next_step, pause_at_end)
|
||
VALUES (?, ?, ?, ?)
|
||
ON CONFLICT(user_id) DO UPDATE SET
|
||
instruction_id = excluded.instruction_id,
|
||
next_step = excluded.next_step,
|
||
pause_at_end = excluded.pause_at_end
|
||
""", (user_id, instruction_id, next_step, 1 if pause_at_end else 0))
|
||
conn.commit()
|
||
|
||
def get_instruction_progress(user_id: int):
|
||
with get_connection() as conn:
|
||
cur = conn.cursor()
|
||
cur.execute("""
|
||
SELECT instruction_id, next_step, pause_at_end
|
||
FROM instruction_progress
|
||
WHERE user_id = ?
|
||
""", (user_id,))
|
||
return cur.fetchone()
|
||
|
||
def clear_instruction_progress(user_id: int):
|
||
with get_connection() as conn:
|
||
cur = conn.cursor()
|
||
cur.execute("DELETE FROM instruction_progress WHERE user_id = ?", (user_id,))
|
||
conn.commit()
|