refactor: вынес сервисы и ui-компоненты

- вынес token/chat/update логику в services

- вынес диалог и текст инструкции в ui

- добавил и обновил тесты для нового слоя
This commit is contained in:
2026-02-15 20:32:36 +03:00
parent 4d84d2ebe5
commit e1e2f8f0e8
11 changed files with 715 additions and 14 deletions

172
main.py
View File

@@ -14,7 +14,18 @@ import tempfile
import urllib.request import urllib.request
import zipfile import zipfile
from app_version import APP_VERSION from app_version import APP_VERSION
from services import UpdateChecker, VkService, detect_update_repository_url from services import (
AutoUpdateService,
UpdateChecker,
VkService,
detect_update_repository_url,
load_chat_conversations,
load_token as token_store_load_token,
resolve_user_ids as chat_resolve_user_ids,
save_token as token_store_save_token,
)
from ui.dialogs import MultiLinkDialog as UIMultiLinkDialog
from ui.main_window import instructions_text
from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit, from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit,
QPushButton, QVBoxLayout, QWidget, QMessageBox, QPushButton, QVBoxLayout, QWidget, QMessageBox,
QTextBrowser, QScrollArea, QCheckBox, QHBoxLayout, QTextBrowser, QScrollArea, QCheckBox, QHBoxLayout,
@@ -22,7 +33,7 @@ from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit,
QProgressBar) QProgressBar)
from PySide6.QtCore import Qt, QUrl, QDateTime, QTimer from PySide6.QtCore import Qt, QUrl, QDateTime, QTimer
from PySide6.QtGui import QIcon, QAction, QDesktopServices from PySide6.QtGui import QIcon, QAction, QDesktopServices
from urllib.parse import urlparse, parse_qs, unquote from urllib.parse import parse_qs, unquote
from vk_api.exceptions import VkApiError from vk_api.exceptions import VkApiError
from PySide6.QtCore import QStandardPaths from PySide6.QtCore import QStandardPaths
from PySide6.QtCore import QProcess from PySide6.QtCore import QProcess
@@ -272,6 +283,7 @@ class VkChatManager(QMainWindow):
"5. Нажмите 'ИСКЛЮЧИТЬ' или 'ПРИГЛАСИТЬ'." "5. Нажмите 'ИСКЛЮЧИТЬ' или 'ПРИГЛАСИТЬ'."
) )
self.instructions.setFixedHeight(120) self.instructions.setFixedHeight(120)
self.instructions.setPlainText(instructions_text())
layout.addWidget(self.instructions) layout.addWidget(self.instructions)
layout.addWidget(QLabel("Access Token VK:")) layout.addWidget(QLabel("Access Token VK:"))
@@ -359,7 +371,7 @@ class VkChatManager(QMainWindow):
self.resolve_timer.start() self.resolve_timer.start()
def open_multi_link_dialog(self): def open_multi_link_dialog(self):
dialog = MultiLinkDialog(self) dialog = UIMultiLinkDialog(self)
if dialog.exec(): if dialog.exec():
links = dialog.get_links() links = dialog.get_links()
if links: if links:
@@ -931,7 +943,7 @@ class VkChatManager(QMainWindow):
raise last_error raise last_error
def load_saved_token_on_startup(self): def load_saved_token_on_startup(self):
loaded_token, expiration_time = load_token() loaded_token, expiration_time = token_store_load_token(TOKEN_FILE)
if loaded_token: if loaded_token:
self.handle_auth_token_on_load(loaded_token, expiration_time) self.handle_auth_token_on_load(loaded_token, expiration_time)
else: else:
@@ -1057,7 +1069,12 @@ class VkChatManager(QMainWindow):
self.token = token self.token = token
# Сохраняем и получаем корректный expiration_time (0 или будущее время) # Сохраняем и получаем корректный expiration_time (0 или будущее время)
self.token_expiration_time = save_token(self.token, expires_in) self.token_expiration_time = token_store_save_token(
self.token,
TOKEN_FILE,
APP_DATA_DIR,
expires_in=expires_in,
)
self.token_input.setText(self.token[:50] + "...") self.token_input.setText(self.token[:50] + "...")
self.status_label.setText("Статус: авторизован") self.status_label.setText("Статус: авторизован")
@@ -1432,6 +1449,151 @@ class VkChatManager(QMainWindow):
self.user_ids_to_process.clear() self.user_ids_to_process.clear()
self.set_ui_state(self.token is not None) self.set_ui_state(self.token is not None)
# Refactor overrides: keep logic in service modules and thin UI orchestration here.
def _process_links_list(self, links_list):
if not self.vk:
QMessageBox.warning(self, "Ошибка", "Сначала авторизуйтесь.")
return
self.user_ids_to_process.clear()
resolved_ids = []
failed_links = []
self._set_busy(True, "Статус: Определяю ID...")
try:
resolved_ids, failed_items = chat_resolve_user_ids(
self._vk_call_with_retry,
self.vk,
links_list,
)
for failed_link, failed_exc in failed_items:
if isinstance(failed_exc, VkApiError):
if self._handle_vk_api_error("resolveScreenName", failed_exc, action_name="получения ID пользователей"):
return
failed_links.append(f"{failed_link} ({self._format_vk_error(failed_exc)})")
else:
failed_links.append(failed_link)
finally:
self._set_busy(False)
self.user_ids_to_process = resolved_ids
status_message = f"Статус: Готово к работе с {len(resolved_ids)} пользователем(ем/ями)."
if len(links_list) > 1:
self._set_vk_url_input_text(f"Загружено {len(resolved_ids)}/{len(links_list)} из списка")
if failed_links:
QMessageBox.warning(
self,
"Ошибка получения ID",
"Не удалось получить ID для следующих ссылок:\n" + "\n".join(failed_links),
)
self.status_label.setText(status_message)
self.set_ui_state(self.token is not None)
def load_chats(self):
self._clear_chat_tabs()
layouts = [
self.office_tab.findChild(QWidget).findChild(QVBoxLayout),
self.retail_tab.findChild(QWidget).findChild(QVBoxLayout),
self.warehouse_tab.findChild(QWidget).findChild(QVBoxLayout),
self.coffee_tab.findChild(QWidget).findChild(QVBoxLayout),
self.other_tab.findChild(QWidget).findChild(QVBoxLayout),
]
try:
self._set_busy(True, "Статус: загрузка чатов...")
conversations = load_chat_conversations(self._vk_call_with_retry, self.vk)
for conv in conversations:
if conv["conversation"]["peer"]["type"] != "chat":
continue
chat_id = conv["conversation"]["peer"]["local_id"]
title = conv["conversation"]["chat_settings"]["title"]
self.chats.append({"id": chat_id, "title": title})
checkbox = QCheckBox(f"{title} (id: {chat_id})")
checkbox.setProperty("chat_id", chat_id)
if "AG офис" in title:
layouts[0].insertWidget(layouts[0].count() - 1, checkbox)
self.office_chat_checkboxes.append(checkbox)
elif "AG розница" in title:
layouts[1].insertWidget(layouts[1].count() - 1, checkbox)
self.retail_chat_checkboxes.append(checkbox)
elif "AG склад" in title:
layouts[2].insertWidget(layouts[2].count() - 1, checkbox)
self.warehouse_chat_checkboxes.append(checkbox)
elif "AG кофейни" in title:
layouts[3].insertWidget(layouts[3].count() - 1, checkbox)
self.coffee_chat_checkboxes.append(checkbox)
else:
layouts[4].insertWidget(layouts[4].count() - 1, checkbox)
self.other_chat_checkboxes.append(checkbox)
self.chat_tabs.setTabText(0, f"AG Офис ({len(self.office_chat_checkboxes)})")
self.chat_tabs.setTabText(1, f"AG Розница ({len(self.retail_chat_checkboxes)})")
self.chat_tabs.setTabText(2, f"AG Склад ({len(self.warehouse_chat_checkboxes)})")
self.chat_tabs.setTabText(3, f"AG Кофейни ({len(self.coffee_chat_checkboxes)})")
self.chat_tabs.setTabText(4, f"Прочие ({len(self.other_chat_checkboxes)})")
except VkApiError as e:
if self._handle_vk_api_error("load_chats", e, action_name="загрузки чатов"):
return
QMessageBox.critical(self, "Ошибка", f"Не удалось загрузить чаты: {self._format_vk_error(e)}")
self.set_ui_state(False)
finally:
self._set_busy(False)
def _start_auto_update(self, download_url, latest_version, checksum_url="", download_name=""):
if os.name != "nt":
QMessageBox.information(
self,
"Автообновление",
"Автообновление пока поддерживается только в Windows-сборке.",
)
return False
if not getattr(sys, "frozen", False):
QMessageBox.information(
self,
"Автообновление",
"Автообновление доступно в собранной версии приложения (.exe).",
)
return False
if not download_url:
QMessageBox.warning(self, "Автообновление", "В релизе нет ссылки на файл для обновления.")
return False
self.status_label.setText(f"Статус: загрузка обновления {latest_version}...")
self._set_busy(True)
try:
work_dir, source_dir = AutoUpdateService.prepare_update(
download_url=download_url,
checksum_url=checksum_url,
download_name=download_name,
)
app_exe = sys.executable
script_path = AutoUpdateService.build_update_script(
app_dir=os.path.dirname(app_exe),
source_dir=source_dir,
exe_name=os.path.basename(app_exe),
target_pid=os.getpid(),
)
AutoUpdateService.launch_update_script(script_path, work_dir)
self._log_event("auto_update", f"Update {latest_version} started from {download_url}")
QMessageBox.information(
self,
"Обновление запущено",
"Обновление скачано. Приложение будет перезапущено.",
)
QApplication.instance().quit()
return True
except Exception as e:
self._log_event("auto_update_failed", str(e), level="ERROR")
QMessageBox.warning(self, "Автообновление", f"Не удалось выполнить автообновление: {e}")
return False
finally:
self._set_busy(False)
if __name__ == "__main__": if __name__ == "__main__":
if "--auth" in sys.argv: if "--auth" in sys.argv:

View File

@@ -1,3 +1,5 @@
from .auto_update_service import AutoUpdateService
from .chat_actions import load_chat_conversations, resolve_user_ids
from .token_store import load_token, save_token
from .update_service import UpdateChecker, detect_update_repository_url from .update_service import UpdateChecker, detect_update_repository_url
from .vk_service import VkService from .vk_service import VkService

View File

@@ -0,0 +1,152 @@
import hashlib
import os
import re
import shutil
import subprocess
import tempfile
import urllib.request
import zipfile
class AutoUpdateService:
@staticmethod
def download_update_archive(download_url, destination_path):
request = urllib.request.Request(
download_url,
headers={"User-Agent": "AnabasisManager-Updater"},
)
with urllib.request.urlopen(request, timeout=60) as response:
with open(destination_path, "wb") as f:
shutil.copyfileobj(response, f)
@staticmethod
def download_update_text(url):
request = urllib.request.Request(
url,
headers={"User-Agent": "AnabasisManager-Updater"},
)
with urllib.request.urlopen(request, timeout=30) as response:
return response.read().decode("utf-8", errors="replace")
@staticmethod
def sha256_file(path):
digest = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(1024 * 1024), b""):
digest.update(chunk)
return digest.hexdigest().lower()
@staticmethod
def extract_sha256_from_text(checksum_text, target_file_name):
target = (target_file_name or "").strip().lower()
for raw_line in checksum_text.splitlines():
line = raw_line.strip()
if not line:
continue
match = re.search(r"\b([A-Fa-f0-9]{64})\b", line)
if not match:
continue
checksum = match.group(1).lower()
if not target:
return checksum
line_lower = line.lower()
if target in line_lower:
return checksum
if os.path.basename(target) in line_lower:
return checksum
return ""
@classmethod
def verify_update_checksum(cls, zip_path, checksum_url, download_name):
if not checksum_url:
raise RuntimeError("В релизе нет файла SHA256. Автообновление остановлено.")
checksum_text = cls.download_update_text(checksum_url)
expected_hash = cls.extract_sha256_from_text(checksum_text, download_name or os.path.basename(zip_path))
if not expected_hash:
raise RuntimeError("Не удалось найти SHA256 для архива обновления.")
actual_hash = cls.sha256_file(zip_path)
if actual_hash != expected_hash:
raise RuntimeError("SHA256 не совпадает, обновление отменено.")
@staticmethod
def locate_extracted_root(extracted_dir):
entries = []
for name in os.listdir(extracted_dir):
full_path = os.path.join(extracted_dir, name)
if os.path.isdir(full_path):
entries.append(full_path)
if len(entries) == 1:
candidate = entries[0]
if os.path.exists(os.path.join(candidate, "AnabasisManager.exe")):
return candidate
return extracted_dir
@staticmethod
def build_update_script(app_dir, source_dir, exe_name, target_pid):
script_path = os.path.join(tempfile.gettempdir(), "anabasis_apply_update.cmd")
script_lines = [
"@echo off",
"setlocal",
f"set APP_DIR={app_dir}",
f"set SRC_DIR={source_dir}",
f"set EXE_NAME={exe_name}",
f"set TARGET_PID={target_pid}",
"set BACKUP_DIR=%TEMP%\\anabasis_backup_%RANDOM%%RANDOM%",
":wait_for_exit",
"tasklist /FI \"PID eq %TARGET_PID%\" | find \"%TARGET_PID%\" >nul",
"if %ERRORLEVEL% EQU 0 (",
" timeout /t 1 /nobreak >nul",
" goto :wait_for_exit",
")",
"timeout /t 1 /nobreak >nul",
"mkdir \"%BACKUP_DIR%\" >nul 2>&1",
"robocopy \"%APP_DIR%\" \"%BACKUP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:6 /W:2 >nul",
"set RC=%ERRORLEVEL%",
"if %RC% GEQ 8 goto :backup_error",
"robocopy \"%SRC_DIR%\" \"%APP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:12 /W:2 >nul",
"set RC=%ERRORLEVEL%",
"if %RC% GEQ 8 goto :rollback",
"start \"\" \"%APP_DIR%\\%EXE_NAME%\"",
"timeout /t 2 /nobreak >nul",
"tasklist /FI \"IMAGENAME eq %EXE_NAME%\" | find /I \"%EXE_NAME%\" >nul",
"if %ERRORLEVEL% NEQ 0 goto :rollback",
"rmdir /S /Q \"%BACKUP_DIR%\" >nul 2>&1",
"exit /b 0",
":rollback",
"robocopy \"%BACKUP_DIR%\" \"%APP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:6 /W:2 >nul",
"start \"\" \"%APP_DIR%\\%EXE_NAME%\"",
"echo Auto-update failed. Rollback executed. > \"%APP_DIR%\\update_error.log\"",
"exit /b 2",
":backup_error",
"echo Auto-update failed during backup. Code %RC% > \"%APP_DIR%\\update_error.log\"",
"exit /b %RC%",
]
with open(script_path, "w", encoding="utf-8", newline="\r\n") as f:
f.write("\r\n".join(script_lines) + "\r\n")
return script_path
@staticmethod
def launch_update_script(script_path, work_dir):
creation_flags = 0
if hasattr(subprocess, "CREATE_NEW_PROCESS_GROUP"):
creation_flags |= subprocess.CREATE_NEW_PROCESS_GROUP
if hasattr(subprocess, "DETACHED_PROCESS"):
creation_flags |= subprocess.DETACHED_PROCESS
subprocess.Popen(
["cmd.exe", "/c", script_path],
cwd=work_dir,
creationflags=creation_flags,
)
@classmethod
def prepare_update(cls, download_url, checksum_url, download_name):
work_dir = tempfile.mkdtemp(prefix="anabasis_update_")
zip_path = os.path.join(work_dir, "update.zip")
unpack_dir = os.path.join(work_dir, "extracted")
cls.download_update_archive(download_url, zip_path)
cls.verify_update_checksum(zip_path, checksum_url, download_name)
os.makedirs(unpack_dir, exist_ok=True)
with zipfile.ZipFile(zip_path, "r") as archive:
archive.extractall(unpack_dir)
source_dir = cls.locate_extracted_root(unpack_dir)
return work_dir, source_dir

46
services/chat_actions.py Normal file
View File

@@ -0,0 +1,46 @@
from urllib.parse import urlparse
def resolve_user_ids(vk_call_with_retry, vk_api, links):
resolved_ids = []
failed_links = []
for link in links:
try:
path = urlparse(link).path
screen_name = path.split("/")[-1] if path else ""
if not screen_name and len(path.split("/")) > 1:
screen_name = path.split("/")[-2]
if not screen_name:
failed_links.append((link, None))
continue
resolved_object = vk_call_with_retry(vk_api.utils.resolveScreenName, screen_name=screen_name)
if resolved_object and resolved_object.get("type") == "user":
resolved_ids.append(resolved_object["object_id"])
else:
failed_links.append((link, None))
except Exception as e:
failed_links.append((link, e))
return resolved_ids, failed_links
def load_chat_conversations(vk_call_with_retry, vk_api):
conversations = []
start_from = None
seen_start_tokens = set()
while True:
params = {"count": 200, "filter": "all"}
if start_from:
if start_from in seen_start_tokens:
break
params["start_from"] = start_from
seen_start_tokens.add(start_from)
response = vk_call_with_retry(vk_api.messages.getConversations, **params)
page_items = response.get("items", [])
if not page_items:
break
conversations.extend(page_items)
start_from = response.get("next_from")
if not start_from:
break
return conversations

136
services/token_store.py Normal file
View File

@@ -0,0 +1,136 @@
import base64
import ctypes
import json
import os
import time
from ctypes import wintypes
class _DataBlob(ctypes.Structure):
_fields_ = [("cbData", wintypes.DWORD), ("pbData", ctypes.POINTER(ctypes.c_byte))]
_crypt32 = None
_kernel32 = None
if os.name == "nt":
_crypt32 = ctypes.WinDLL("crypt32", use_last_error=True)
_kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
_crypt32.CryptProtectData.argtypes = [
ctypes.POINTER(_DataBlob),
wintypes.LPCWSTR,
ctypes.POINTER(_DataBlob),
ctypes.c_void_p,
ctypes.c_void_p,
wintypes.DWORD,
ctypes.POINTER(_DataBlob),
]
_crypt32.CryptProtectData.restype = wintypes.BOOL
_crypt32.CryptUnprotectData.argtypes = [
ctypes.POINTER(_DataBlob),
ctypes.POINTER(wintypes.LPWSTR),
ctypes.POINTER(_DataBlob),
ctypes.c_void_p,
ctypes.c_void_p,
wintypes.DWORD,
ctypes.POINTER(_DataBlob),
]
_crypt32.CryptUnprotectData.restype = wintypes.BOOL
def _crypt_protect_data(data, description=""):
buffer = ctypes.create_string_buffer(data)
data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte)))
data_out = _DataBlob()
if not _crypt32.CryptProtectData(ctypes.byref(data_in), description, None, None, None, 0, ctypes.byref(data_out)):
raise ctypes.WinError(ctypes.get_last_error())
try:
return ctypes.string_at(data_out.pbData, data_out.cbData)
finally:
_kernel32.LocalFree(data_out.pbData)
def _crypt_unprotect_data(data):
buffer = ctypes.create_string_buffer(data)
data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte)))
data_out = _DataBlob()
if not _crypt32.CryptUnprotectData(ctypes.byref(data_in), None, None, None, None, 0, ctypes.byref(data_out)):
raise ctypes.WinError(ctypes.get_last_error())
try:
return ctypes.string_at(data_out.pbData, data_out.cbData)
finally:
_kernel32.LocalFree(data_out.pbData)
def _encrypt_token(token):
if os.name != "nt":
raise RuntimeError("DPAPI is available only on Windows.")
encrypted_bytes = _crypt_protect_data(token.encode("utf-8"))
return base64.b64encode(encrypted_bytes).decode("ascii")
def _decrypt_token(token_data):
if os.name != "nt":
raise RuntimeError("DPAPI is available only on Windows.")
encrypted_bytes = base64.b64decode(token_data.encode("ascii"))
decrypted_bytes = _crypt_unprotect_data(encrypted_bytes)
return decrypted_bytes.decode("utf-8")
def save_token(token, token_file, app_data_dir, expires_in=0):
try:
expires_in = int(expires_in)
except (ValueError, TypeError):
expires_in = 0
os.makedirs(app_data_dir, exist_ok=True)
expiration_time = (time.time() + expires_in) if expires_in > 0 else 0
stored_token = token
encrypted = False
if os.name == "nt":
try:
stored_token = _encrypt_token(token)
encrypted = True
except Exception:
pass
data = {
"token": stored_token,
"expiration_time": expiration_time,
"encrypted": encrypted,
}
with open(token_file, "w", encoding="utf-8") as f:
json.dump(data, f)
return expiration_time
def load_token(token_file):
if not os.path.exists(token_file):
return None, None
with open(token_file, "r", encoding="utf-8") as f:
data = json.load(f)
token = data.get("token")
encrypted = data.get("encrypted", False)
if token and encrypted:
try:
token = _decrypt_token(token)
except Exception:
try:
os.remove(token_file)
except Exception:
pass
return None, None
expiration_time = data.get("expiration_time")
if token and (expiration_time == 0 or expiration_time > time.time()):
return token, expiration_time
try:
os.remove(token_file)
except Exception:
pass
return None, None

View File

@@ -28,7 +28,10 @@ class AuthReloginSmokeTests(unittest.TestCase):
self.assertIn("force_relogin", self.main_source) self.assertIn("force_relogin", self.main_source)
def test_auth_error_paths_trigger_force_relogin(self): def test_auth_error_paths_trigger_force_relogin(self):
self.assertIn("def _handle_vk_api_error(self, context, exc, action_name=None, ui_message_prefix=None, disable_ui=False):", self.main_source) self.assertIn(
"def _handle_vk_api_error(self, context, exc, action_name=None, ui_message_prefix=None, disable_ui=False):",
self.main_source,
)
self.assertIn("self._force_relogin(exc, action_name or context)", self.main_source) self.assertIn("self._force_relogin(exc, action_name or context)", self.main_source)
self.assertIn('"load_chats",', self.main_source) self.assertIn('"load_chats",', self.main_source)
self.assertIn('"execute_user_action",', self.main_source) self.assertIn('"execute_user_action",', self.main_source)
@@ -42,17 +45,15 @@ class AuthReloginSmokeTests(unittest.TestCase):
def test_update_check_actions_exist(self): def test_update_check_actions_exist(self):
self.assertIn("from app_version import APP_VERSION", self.main_source) self.assertIn("from app_version import APP_VERSION", self.main_source)
self.assertIn("from services import UpdateChecker, VkService, detect_update_repository_url", self.main_source) self.assertIn("from services import (", self.main_source)
self.assertIn("UpdateChecker", self.main_source)
self.assertIn("detect_update_repository_url", self.main_source)
self.assertIn('QAction("Проверить обновления", self)', self.main_source) self.assertIn('QAction("Проверить обновления", self)', self.main_source)
self.assertIn("def check_for_updates(self, silent_no_updates=False):", self.main_source) self.assertIn("def check_for_updates(self, silent_no_updates=False):", self.main_source)
self.assertIn("class UpdateChecker(QObject):", self.update_source) self.assertIn("class UpdateChecker(QObject):", self.update_source)
self.assertIn('message_box.addButton("Обновить сейчас", QMessageBox.AcceptRole)', self.main_source)
self.assertIn("def _start_auto_update(self, download_url, latest_version, checksum_url=\"\", download_name=\"\"):", self.main_source) self.assertIn("def _start_auto_update(self, download_url, latest_version, checksum_url=\"\", download_name=\"\"):", self.main_source)
self.assertIn("def _verify_update_checksum(self, zip_path, checksum_url, download_name):", self.main_source) self.assertIn("AutoUpdateService.prepare_update", self.main_source)
self.assertIn("def _build_update_script(self, app_dir, source_dir, exe_name, target_pid):", self.main_source) self.assertIn("AutoUpdateService.build_update_script", self.main_source)
self.assertIn("set TARGET_PID=", self.main_source)
self.assertIn("set BACKUP_DIR=", self.main_source)
self.assertIn(":rollback", self.main_source)
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -0,0 +1,50 @@
import hashlib
import importlib.util
import tempfile
import unittest
from pathlib import Path
_SPEC = importlib.util.spec_from_file_location(
"auto_update_service",
Path("services/auto_update_service.py"),
)
_MODULE = importlib.util.module_from_spec(_SPEC)
_SPEC.loader.exec_module(_MODULE)
AutoUpdateService = _MODULE.AutoUpdateService
class AutoUpdateServiceTests(unittest.TestCase):
def test_extract_sha256_from_text(self):
digest = "a" * 64
text = f"{digest} AnabasisManager-1.0.0-win.zip\n"
extracted = AutoUpdateService.extract_sha256_from_text(
text,
"AnabasisManager-1.0.0-win.zip",
)
self.assertEqual(extracted, digest)
def test_sha256_file(self):
with tempfile.TemporaryDirectory() as td:
path = Path(td) / "payload.bin"
payload = b"anabasis"
path.write_bytes(payload)
expected = hashlib.sha256(payload).hexdigest()
self.assertEqual(AutoUpdateService.sha256_file(str(path)), expected)
def test_build_update_script_contains_core_vars(self):
script = AutoUpdateService.build_update_script(
app_dir=r"C:\Apps\AnabasisManager",
source_dir=r"C:\Temp\Extracted",
exe_name="AnabasisManager.exe",
target_pid=1234,
)
script_text = Path(script).read_text(encoding="utf-8")
self.assertIn("set APP_DIR=", script_text)
self.assertIn("set SRC_DIR=", script_text)
self.assertIn("set EXE_NAME=", script_text)
self.assertIn("set TARGET_PID=", script_text)
self.assertIn(":rollback", script_text)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,65 @@
import unittest
import importlib.util
from types import SimpleNamespace
from pathlib import Path
_SPEC = importlib.util.spec_from_file_location(
"chat_actions",
Path("services/chat_actions.py"),
)
_MODULE = importlib.util.module_from_spec(_SPEC)
_SPEC.loader.exec_module(_MODULE)
load_chat_conversations = _MODULE.load_chat_conversations
resolve_user_ids = _MODULE.resolve_user_ids
class ChatActionsTests(unittest.TestCase):
def test_resolve_user_ids_mixed_results(self):
mapping = {
"id1": {"type": "user", "object_id": 1},
"id2": {"type": "group", "object_id": 2},
}
def call_with_retry(func, **kwargs):
return func(**kwargs)
def resolve_screen_name(screen_name):
if screen_name == "boom":
raise RuntimeError("boom")
return mapping.get(screen_name)
vk_api = SimpleNamespace(utils=SimpleNamespace(resolveScreenName=resolve_screen_name))
links = [
"https://vk.com/id1",
"https://vk.com/id2",
"https://vk.com/boom",
"https://vk.com/",
]
resolved, failed = resolve_user_ids(call_with_retry, vk_api, links)
self.assertEqual(resolved, [1])
self.assertEqual(len(failed), 3)
self.assertEqual(failed[0][0], "https://vk.com/id2")
self.assertIsNone(failed[0][1])
def test_load_chat_conversations_paginated(self):
pages = [
{"items": [{"id": 1}], "next_from": "page-2"},
{"items": [{"id": 2}]},
]
def get_conversations(**kwargs):
if kwargs.get("start_from") == "page-2":
return pages[1]
return pages[0]
def call_with_retry(func, **kwargs):
return func(**kwargs)
vk_api = SimpleNamespace(messages=SimpleNamespace(getConversations=get_conversations))
items = load_chat_conversations(call_with_retry, vk_api)
self.assertEqual(items, [{"id": 1}, {"id": 2}])
if __name__ == "__main__":
unittest.main()

53
tests/test_token_store.py Normal file
View File

@@ -0,0 +1,53 @@
import tempfile
import unittest
import importlib.util
from pathlib import Path
from unittest.mock import patch
_SPEC = importlib.util.spec_from_file_location(
"token_store",
Path("services/token_store.py"),
)
_MODULE = importlib.util.module_from_spec(_SPEC)
_SPEC.loader.exec_module(_MODULE)
load_token = _MODULE.load_token
save_token = _MODULE.save_token
class TokenStoreTests(unittest.TestCase):
def test_save_and_load_non_expiring_token(self):
with tempfile.TemporaryDirectory() as td:
token_file = Path(td) / "token.json"
with patch.object(_MODULE.os, "name", "posix"):
expiration = save_token(
token="abc123",
token_file=str(token_file),
app_data_dir=td,
expires_in=0,
)
token, loaded_expiration = load_token(str(token_file))
self.assertEqual(expiration, 0)
self.assertEqual(token, "abc123")
self.assertEqual(loaded_expiration, 0)
def test_expired_token_is_removed(self):
with tempfile.TemporaryDirectory() as td:
token_file = Path(td) / "token.json"
with patch.object(_MODULE.os, "name", "posix"):
with patch.object(_MODULE.time, "time", return_value=1000):
save_token(
token="abc123",
token_file=str(token_file),
app_data_dir=td,
expires_in=1,
)
with patch.object(_MODULE.time, "time", return_value=2000):
token, expiration = load_token(str(token_file))
self.assertIsNone(token)
self.assertIsNone(expiration)
if __name__ == "__main__":
unittest.main()

25
ui/dialogs.py Normal file
View File

@@ -0,0 +1,25 @@
from PySide6.QtWidgets import QDialog, QDialogButtonBox, QLabel, QTextEdit, QVBoxLayout
class MultiLinkDialog(QDialog):
def __init__(self, parent=None):
super().__init__(parent)
self.setWindowTitle("Ввод нескольких ссылок")
self.setMinimumSize(400, 300)
layout = QVBoxLayout(self)
label = QLabel("Вставьте ссылки на страницы VK, каждая с новой строки:")
layout.addWidget(label)
self.links_text_edit = QTextEdit()
layout.addWidget(self.links_text_edit)
button_box = QDialogButtonBox()
button_box.addButton("ОК", QDialogButtonBox.AcceptRole)
button_box.addButton("Отмена", QDialogButtonBox.RejectRole)
button_box.accepted.connect(self.accept)
button_box.rejected.connect(self.reject)
layout.addWidget(button_box)
def get_links(self):
return [line.strip() for line in self.links_text_edit.toPlainText().strip().split("\n") if line.strip()]

9
ui/main_window.py Normal file
View File

@@ -0,0 +1,9 @@
def instructions_text():
return (
"Инструкция:\n"
"1. Авторизуйтесь через VK.\n"
"2. Выберите чаты.\n"
"3. Вставьте ссылку на пользователя в поле ниже. ID определится автоматически.\n"
"4. Для массовых операций нажмите кнопку 'Список' и вставьте ссылки в окне.\n"
"5. Нажмите 'ИСКЛЮЧИТЬ' или 'ПРИГЛАСИТЬ'."
)