Initial commit

This commit is contained in:
2026-04-30 18:38:38 +03:00
commit 8d71819caf
20 changed files with 2134 additions and 0 deletions

311
database/repository.py Normal file
View File

@@ -0,0 +1,311 @@
from database.db import get_connection
import re
_STOPWORDS = {
"и", "в", "во", "на", "с", "со", "к", "ко", "от", "до", "по",
"за", "из", "у", "о", "об", "про", "для", "при", "без", "не",
"нет", "ли", "же", "а", "но", "или", "то", "это", "все", "всё",
}
def _normalize_query(query: str):
text = re.sub(r"[^\w\s]+", " ", query.lower(), flags=re.UNICODE)
parts = [p for p in text.split() if p and p not in _STOPWORDS]
# Keep tokens of length >= 3 to reduce noise
return [p for p in parts if len(p) >= 3]
def _normalize_title(text: str) -> str:
text = (text or "").strip().lower()
text = re.sub(r"\s+", " ", text)
return text
def _levenshtein(a: str, b: str) -> int:
if a == b:
return 0
if not a:
return len(b)
if not b:
return len(a)
if len(a) < len(b):
a, b = b, a
prev = list(range(len(b) + 1))
for i, ca in enumerate(a, start=1):
cur = [i]
for j, cb in enumerate(b, start=1):
ins = cur[j - 1] + 1
delete = prev[j] + 1
sub = prev[j - 1] + (0 if ca == cb else 1)
cur.append(min(ins, delete, sub))
prev = cur
return prev[-1]
def _fuzzy_match(token: str, word: str) -> bool:
if token in word:
return True
dist = _levenshtein(token, word)
if len(token) <= 5:
return dist <= 1
if len(token) <= 8:
return dist <= 2
return dist <= 3
def _escape_like(value: str) -> str:
    """Escape the LIKE wildcards (percent, underscore) and the escape character.

    The result is meant for a pattern used together with a backslash
    ESCAPE clause, so that *value* is matched literally.
    """
    return value.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


def _rank_instruction_rows(rows, tokens, *, fuzzy):
    """Rank instructions by how many distinct tokens hit their key or title.

    Args:
        rows: Iterable of ``(instruction_id, title, key)`` rows.
        tokens: Normalised search tokens.
        fuzzy: When true, compare tokens word by word with ``_fuzzy_match``;
            otherwise use plain substring containment.

    Returns:
        ``(instruction_id, title)`` pairs ordered by descending hit count and
        then ascending id.  Instructions without any hit are omitted.
    """
    unique_tokens = set(tokens)
    best_hits = {}
    titles = {}
    for iid, title, key in rows:
        key_l = (key or "").lower()
        title_l = (title or "").lower()
        if fuzzy:
            words = key_l.split() + title_l.split()
            hits = sum(
                1 for t in unique_tokens
                if any(_fuzzy_match(t, w) for w in words)
            )
        else:
            hits = sum(1 for t in unique_tokens if t in key_l or t in title_l)
        if hits:
            # An instruction can own several keys; keep its best row.
            best_hits[iid] = max(best_hits.get(iid, 0), hits)
            titles[iid] = title
    ordered = sorted(best_hits.items(), key=lambda item: (-item[1], item[0]))
    return [(iid, titles[iid]) for iid, _ in ordered]


def find_instructions(query: str):
    """Find terminal instructions that match a user query.

    The lookup tries progressively looser strategies and returns the result
    of the first one that finds anything:

    1. exact match of the whole query against ``code`` keys;
    2. substring match of individual tokens against ``text`` keys, ranked by
       the number of matched tokens;
    3. substring match of the whole query against ``text`` keys;
    4. typo-tolerant match of tokens against the words of every ``text`` key
       and title, ranked the same way as step 2.

    Args:
        query: Raw user text.  It is trimmed and lowercased before use.

    Returns:
        A list of ``(id, title)`` rows or tuples; empty when nothing matches
        or when the query is missing or blank.
    """
    query = (query or "").strip().lower()
    if not query:
        # A blank query would otherwise become LIKE '%%' below and return
        # every instruction.
        return []
    tokens = _normalize_query(query)

    with get_connection() as conn:
        cur = conn.cursor()

        # 1) Exact match on an instruction code.
        cur.execute("""
            SELECT DISTINCT i.id, i.title
            FROM terminal_instruction_keys k
            JOIN terminal_instructions i ON i.id = k.instruction_id
            WHERE k.key_type = 'code' AND k.key = ?
        """, (query,))
        rows = cur.fetchall()
        if rows:
            return rows

        # 2) Partial match of individual tokens against text keys.
        # NOTE(review): if the backend is SQLite, LIKE folds case only for
        # ASCII, so non-ASCII keys must already be stored lowercase - confirm.
        if tokens:
            like_parts = " OR ".join(["k.key LIKE ? ESCAPE '\\'"] * len(tokens))
            params = [f"%{_escape_like(t)}%" for t in tokens]
            cur.execute(
                f"""
                SELECT i.id, i.title, k.key
                FROM terminal_instruction_keys k
                JOIN terminal_instructions i ON i.id = k.instruction_id
                WHERE k.key_type = 'text' AND ({like_parts})
                """,
                params,
            )
            ranked = _rank_instruction_rows(cur.fetchall(), tokens, fuzzy=False)
            if ranked:
                return ranked

        # Fallback: the whole query as one literal substring.  Wildcards typed
        # by the user are escaped so that e.g. "%" does not match everything.
        cur.execute("""
            SELECT DISTINCT i.id, i.title
            FROM terminal_instruction_keys k
            JOIN terminal_instructions i ON i.id = k.instruction_id
            WHERE k.key_type = 'text' AND k.key LIKE ? ESCAPE '\\'
        """, (f"%{_escape_like(query)}%",))
        rows = cur.fetchall()
        if rows:
            return rows

        # 3) Fuzzy search over key and title words (tolerates typos).
        if tokens:
            cur.execute("""
                SELECT i.id, i.title, k.key
                FROM terminal_instruction_keys k
                JOIN terminal_instructions i ON i.id = k.instruction_id
                WHERE k.key_type = 'text'
            """)
            ranked = _rank_instruction_rows(cur.fetchall(), tokens, fuzzy=True)
            if ranked:
                return ranked

    return []
def get_terminal_steps(instruction_id: int):
    """Return the ``(type, content)`` rows of one instruction's steps.

    Rows come back ordered by ``step_order``.
    """
    sql = """
        SELECT type, content
        FROM terminal_instruction_steps
        WHERE instruction_id = ?
        ORDER BY step_order
    """
    with get_connection() as connection:
        cursor = connection.cursor()
        cursor.execute(sql, (instruction_id,))
        steps = cursor.fetchall()
        return steps
def get_instruction_id_by_title(title: str):
    """Return the id of the instruction whose title equals *title*.

    Both sides are compared after ``_normalize_title``.  Returns ``None`` when
    the normalised title is empty or no instruction matches.
    """
    wanted = _normalize_title(title)
    if not wanted:
        return None

    with get_connection() as connection:
        cursor = connection.cursor()
        cursor.execute("""
            SELECT id, title
            FROM terminal_instructions
        """)
        candidates = cursor.fetchall()
        # The first row whose normalised title matches wins.
        return next(
            (iid for iid, db_title in candidates
             if _normalize_title(db_title) == wanted),
            None,
        )
def find_tech_problems(query: str):
    """Find technical problems whose keywords fuzzily match the query.

    Keywords are split on commas and whitespace and each query token is
    compared to them with ``_fuzzy_match``.

    Returns:
        ``(problem_id, task_type)`` pairs ordered by descending number of
        matched tokens and then ascending id; empty when the query has no
        usable tokens or nothing matches.
    """
    tokens = _normalize_query(query)
    if not tokens:
        return []

    with get_connection() as connection:
        cursor = connection.cursor()
        cursor.execute("""
            SELECT id, task_type, keywords
            FROM tech_problems
        """)
        problems = cursor.fetchall()

    best_hits = {}
    task_types = {}
    for pid, task_type, keywords in problems:
        words = [w for w in re.split(r"[,\s]+", (keywords or "").lower()) if w]
        hits = {t for t in tokens if any(_fuzzy_match(t, w) for w in words)}
        if not hits:
            continue
        best_hits[pid] = max(best_hits.get(pid, 0), len(hits))
        task_types[pid] = task_type

    # With no hits the ranking is empty and so is the returned list.
    ranking = sorted(best_hits, key=lambda pid: (-best_hits[pid], pid))
    return [(pid, task_types[pid]) for pid in ranking]
def get_tech_solution(problem_id: str):
    """Fetch the solution row stored for *problem_id*.

    Returns the first matching row of ``tech_problem_solutions`` as produced
    by ``fetchone()``.
    """
    sql = """
        SELECT problem_id, problem_name, task_type, can_fix_self,
               need_result_feedback, solution_steps, tools_needed,
               when_stop_and_report
        FROM tech_problem_solutions
        WHERE problem_id = ?
    """
    with get_connection() as connection:
        cursor = connection.cursor()
        cursor.execute(sql, (problem_id,))
        solution = cursor.fetchone()
        return solution
def get_tech_problem_by_name(problem_name: str):
    """Look up a solution entry by its name.

    Names are compared after ``_normalize_title``.

    Returns:
        ``(problem_id, problem_name, task_type)`` for the first match, or
        ``None`` when the normalised name is empty or nothing matches.
    """
    wanted = _normalize_title(problem_name)
    if not wanted:
        return None

    with get_connection() as connection:
        cursor = connection.cursor()
        cursor.execute("""
            SELECT problem_id, problem_name, task_type
            FROM tech_problem_solutions
        """)
        entries = cursor.fetchall()
        return next(
            ((pid, name, task_type) for pid, name, task_type in entries
             if _normalize_title(name) == wanted),
            None,
        )
def find_tech_solutions_by_name_contains(fragment: str):
    """Find solution entries whose name contains *fragment*.

    Both the fragment and each stored name go through ``_normalize_title``
    and are compared in Python, the same way ``get_tech_problem_by_name``
    compares names.  Filtering in SQL with ``lower(...) LIKE`` is avoided for
    two reasons:

    * assuming the backend is SQLite (TODO confirm), its built-in ``lower()``
      and ``LIKE`` fold case only for ASCII, so non-ASCII names containing
      capital letters would never match a lowercased fragment;
    * ``%`` and ``_`` inside the fragment would act as wildcards.

    Args:
        fragment: Text to look for inside problem names.

    Returns:
        A list of ``(problem_id, problem_name, task_type)`` rows in the order
        the database returns them; empty when the normalised fragment is empty
        or nothing matches.
    """
    frag = _normalize_title(fragment)
    if not frag:
        return []
    with get_connection() as conn:
        cur = conn.cursor()
        cur.execute(
            """
            SELECT problem_id, problem_name, task_type
            FROM tech_problem_solutions
            """
        )
        # Index 1 is problem_name; the original row objects are returned.
        return [row for row in cur.fetchall() if frag in _normalize_title(row[1])]
def set_tech_problem_progress(user_id: int, problem_id: str, task_type: str, need_result_feedback: str):
    """Store the tech problem a user is currently working on.

    Inserts a row into ``tech_problem_progress`` or, when a row for the same
    ``user_id`` already exists, overwrites its other columns.  The change is
    committed before returning.
    """
    upsert_sql = """
        INSERT INTO tech_problem_progress (user_id, problem_id, task_type, need_result_feedback)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(user_id) DO UPDATE SET
            problem_id = excluded.problem_id,
            task_type = excluded.task_type,
            need_result_feedback = excluded.need_result_feedback
    """
    values = (user_id, problem_id, task_type, need_result_feedback)
    with get_connection() as connection:
        cursor = connection.cursor()
        cursor.execute(upsert_sql, values)
        connection.commit()
def get_tech_problem_progress(user_id: int):
    """Return the stored tech-problem progress of *user_id*.

    The result is the ``(problem_id, task_type, need_result_feedback)`` row
    produced by ``fetchone()``.
    """
    sql = """
        SELECT problem_id, task_type, need_result_feedback
        FROM tech_problem_progress
        WHERE user_id = ?
    """
    with get_connection() as connection:
        cursor = connection.cursor()
        cursor.execute(sql, (user_id,))
        progress = cursor.fetchone()
        return progress
def clear_tech_problem_progress(user_id: int):
    """Delete the tech-problem progress rows of *user_id* and commit."""
    delete_sql = "DELETE FROM tech_problem_progress WHERE user_id = ?"
    with get_connection() as connection:
        cursor = connection.cursor()
        cursor.execute(delete_sql, (user_id,))
        connection.commit()
def set_instruction_progress(user_id: int, instruction_id: int, next_step: int, pause_at_end: bool):
    """Store how far a user has got in an instruction.

    Inserts a row into ``instruction_progress`` or, when a row for the same
    ``user_id`` already exists, overwrites its other columns.
    ``pause_at_end`` is stored as ``1`` or ``0`` according to its truthiness.
    The change is committed before returning.
    """
    upsert_sql = """
        INSERT INTO instruction_progress (user_id, instruction_id, next_step, pause_at_end)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(user_id) DO UPDATE SET
            instruction_id = excluded.instruction_id,
            next_step = excluded.next_step,
            pause_at_end = excluded.pause_at_end
    """
    pause_flag = int(bool(pause_at_end))
    with get_connection() as connection:
        cursor = connection.cursor()
        cursor.execute(upsert_sql, (user_id, instruction_id, next_step, pause_flag))
        connection.commit()
def get_instruction_progress(user_id: int):
    """Return the stored instruction progress of *user_id*.

    The result is the ``(instruction_id, next_step, pause_at_end)`` row
    produced by ``fetchone()``.
    """
    sql = """
        SELECT instruction_id, next_step, pause_at_end
        FROM instruction_progress
        WHERE user_id = ?
    """
    with get_connection() as connection:
        cursor = connection.cursor()
        cursor.execute(sql, (user_id,))
        progress = cursor.fetchone()
        return progress
def clear_instruction_progress(user_id: int):
    """Delete the instruction progress rows of *user_id* and commit."""
    delete_sql = "DELETE FROM instruction_progress WHERE user_id = ?"
    with get_connection() as connection:
        cursor = connection.cursor()
        cursor.execute(delete_sql, (user_id,))
        connection.commit()