From 4d84d2ebe5202d5ec23f215694486610ab8565db Mon Sep 17 00:00:00 2001 From: benya Date: Sun, 15 Feb 2026 20:04:21 +0300 Subject: [PATCH] refactor(core): split vk and update services into modules --- main.py | 217 +------------------------------ services/__init__.py | 3 + services/update_service.py | 163 +++++++++++++++++++++++ services/vk_service.py | 59 +++++++++ tests/test_auth_relogin_smoke.py | 69 +++++----- 5 files changed, 266 insertions(+), 245 deletions(-) create mode 100644 services/__init__.py create mode 100644 services/update_service.py create mode 100644 services/vk_service.py diff --git a/main.py b/main.py index cc25c8b..a1ac4df 100644 --- a/main.py +++ b/main.py @@ -2,7 +2,6 @@ import sys import base64 import ctypes import shutil -from vk_api import VkApi import json import time import auth_webview @@ -12,16 +11,16 @@ import hashlib import subprocess import threading import tempfile -import urllib.error import urllib.request import zipfile from app_version import APP_VERSION +from services import UpdateChecker, VkService, detect_update_repository_url from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit, QPushButton, QVBoxLayout, QWidget, QMessageBox, QTextBrowser, QScrollArea, QCheckBox, QHBoxLayout, QSizePolicy, QDialog, QTextEdit, QTabWidget, QDialogButtonBox, QProgressBar) -from PySide6.QtCore import Qt, QUrl, QDateTime, Signal, QTimer, QObject +from PySide6.QtCore import Qt, QUrl, QDateTime, QTimer from PySide6.QtGui import QIcon, QAction, QDesktopServices from urllib.parse import urlparse, parse_qs, unquote from vk_api.exceptions import VkApiError @@ -49,69 +48,6 @@ class _DataBlob(ctypes.Structure): _fields_ = [("cbData", wintypes.DWORD), ("pbData", ctypes.POINTER(ctypes.c_byte))] -def _version_key(version_text): - parts = [int(x) for x in re.findall(r"\d+", str(version_text))] - if not parts: - return (0, 0, 0) - while len(parts) < 3: - parts.append(0) - return tuple(parts[:3]) - - -def _is_newer_version(latest_version, current_version): - latest_key = _version_key(latest_version) - current_key = _version_key(current_version) - return latest_key > current_key - - -def _sanitize_repo_url(value): - value = (value or "").strip() - if not value: - return "" - if "://" not in value and value.count("/") == 1: - return f"https://github.com/{value}" - parsed = urlparse(value) - if not parsed.scheme or not parsed.netloc: - return "" - clean_path = parsed.path.rstrip("/") - if clean_path.endswith(".git"): - clean_path = clean_path[:-4] - return f"{parsed.scheme}://{parsed.netloc}{clean_path}" - - -def _detect_update_repository_url(): - env_url = _sanitize_repo_url(os.getenv("ANABASIS_UPDATE_URL", "")) - if env_url: - return env_url - env_repo = _sanitize_repo_url(os.getenv("ANABASIS_UPDATE_REPOSITORY", "")) - if env_repo: - return env_repo - configured_url = _sanitize_repo_url(UPDATE_REPOSITORY_URL) - if configured_url: - return configured_url - configured_repo = _sanitize_repo_url(UPDATE_REPOSITORY) - if configured_repo: - return configured_repo - git_config_path = os.path.join(os.path.abspath("."), ".git", "config") - if not os.path.exists(git_config_path): - return "" - try: - with open(git_config_path, "r", encoding="utf-8") as f: - content = f.read() - match = re.search(r"url\s*=\s*((?:https?://|git@)[^\s]+)", content) - if match: - remote = match.group(1).strip() - if remote.startswith("git@"): - # git@host:owner/repo(.git) - ssh_match = re.match(r"git@([^:]+):(.+?)(?:\.git)?$", remote) - if ssh_match: - return _sanitize_repo_url(f"https://{ssh_match.group(1)}/{ssh_match.group(2)}") - return _sanitize_repo_url(remote) - except Exception: - return "" - return "" - - _crypt32 = None _kernel32 = None if os.name == "nt": @@ -255,149 +191,6 @@ def load_token(): print(f"Ошибка загрузки: {e}") return None, None -class VkService: - def __init__(self): - self.session = None - self.api = None - - def set_token(self, token): - self.session = VkApi(token=token) - self.api = self.session.get_api() - - def clear(self): - self.session = None - self.api = None - - @staticmethod - def build_auth_command(auth_url, output_path): - if getattr(sys, "frozen", False): - return sys.executable, ["--auth", auth_url, output_path] - return sys.executable, [os.path.abspath(__file__), "--auth", auth_url, output_path] - - @staticmethod - def vk_error_code(exc): - error = getattr(exc, "error", None) - if isinstance(error, dict): - return error.get("error_code") - return getattr(exc, "code", None) - - @classmethod - def is_auth_error(cls, exc, formatted_message=None): - code = cls.vk_error_code(exc) - if code == 5: - return True - message = (formatted_message or str(exc)).lower() - return "invalid_access_token" in message or "user authorization failed" in message - - @classmethod - def is_retryable_error(cls, exc): - return cls.vk_error_code(exc) in (6, 9, 10) - - def call_with_retry(self, func, *args, **kwargs): - max_attempts = 5 - for attempt in range(1, max_attempts + 1): - try: - return func(*args, **kwargs) - except VkApiError as e: - if not self.is_retryable_error(e) or attempt == max_attempts: - raise - delay = min(2.0, 0.35 * (2 ** (attempt - 1))) - if self.vk_error_code(e) == 9: - delay = max(delay, 1.0) - time.sleep(delay) - - -class UpdateChecker(QObject): - check_finished = Signal(dict) - check_failed = Signal(str) - - def __init__(self, repository_url, current_version): - super().__init__() - self.repository_url = repository_url - self.current_version = current_version - - def run(self): - if not self.repository_url: - self.check_failed.emit("Не задан URL репозитория обновлений.") - return - - parsed = urlparse(self.repository_url) - base_url = f"{parsed.scheme}://{parsed.netloc}" - repo_path = parsed.path.strip("/") - if not repo_path or repo_path.count("/") < 1: - self.check_failed.emit("Некорректный URL репозитория обновлений.") - return - - if parsed.netloc.lower().endswith("github.com"): - api_url = f"https://api.github.com/repos/{repo_path}/releases/latest" - else: - api_url = f"{base_url}/api/v1/repos/{repo_path}/releases/latest" - releases_url = f"{base_url}/{repo_path}/releases" - request = urllib.request.Request( - api_url, - headers={ - "Accept": "application/vnd.github+json", - "User-Agent": "AnabasisManager-Updater", - }, - ) - try: - with urllib.request.urlopen(request, timeout=UPDATE_REQUEST_TIMEOUT) as response: - release_data = json.loads(response.read().decode("utf-8")) - except urllib.error.HTTPError as e: - self.check_failed.emit(f"Ошибка HTTP при проверке обновлений: {e.code}") - return - except urllib.error.URLError as e: - self.check_failed.emit(f"Сетевая ошибка при проверке обновлений: {e}") - return - except Exception as e: - self.check_failed.emit(f"Не удалось проверить обновления: {e}") - return - - latest_tag = release_data.get("tag_name") or release_data.get("name") or "" - latest_version = latest_tag.lstrip("vV").strip() - html_url = release_data.get("html_url") or releases_url - assets = release_data.get("assets") or [] - download_url = "" - download_name = "" - checksum_url = "" - for asset in assets: - url = asset.get("browser_download_url", "") - if url.lower().endswith(".zip"): - download_url = url - download_name = asset.get("name", "") - break - if not download_url and assets: - download_url = assets[0].get("browser_download_url", "") - download_name = assets[0].get("name", "") - - for asset in assets: - name = asset.get("name", "").lower() - if not name: - continue - is_checksum_asset = name.endswith(".sha256") or name.endswith(".sha256.txt") or name in ("checksums.txt", "sha256sums.txt") - if not is_checksum_asset: - continue - if download_name and (download_name.lower() in name or name in (f"{download_name.lower()}.sha256", f"{download_name.lower()}.sha256.txt")): - checksum_url = asset.get("browser_download_url", "") - break - if not checksum_url: - checksum_url = asset.get("browser_download_url", "") - - self.check_finished.emit( - { - "repository_url": self.repository_url, - "latest_version": latest_version, - "current_version": self.current_version, - "latest_tag": latest_tag, - "release_url": html_url, - "download_url": download_url, - "download_name": download_name, - "checksum_url": checksum_url, - "has_update": _is_newer_version(latest_version, self.current_version), - } - ) - - class MultiLinkDialog(QDialog): def __init__(self, parent=None): super().__init__(parent) @@ -445,7 +238,7 @@ class VkChatManager(QMainWindow): self._auth_ui_busy = False self._auth_relogin_in_progress = False self._last_auth_relogin_ts = 0.0 - self.update_repository_url = _detect_update_repository_url() + self.update_repository_url = detect_update_repository_url(UPDATE_REPOSITORY_URL, UPDATE_REPOSITORY) self.update_checker = None self.update_thread = None self._update_check_silent = False @@ -695,7 +488,7 @@ class VkChatManager(QMainWindow): self._set_update_action_state(True) self.status_label.setText("Статус: проверка обновлений...") - self.update_checker = UpdateChecker(self.update_repository_url, APP_VERSION) + self.update_checker = UpdateChecker(self.update_repository_url, APP_VERSION, request_timeout=UPDATE_REQUEST_TIMEOUT) self.update_checker.check_finished.connect(self._on_update_check_finished) self.update_checker.check_failed.connect(self._on_update_check_failed) self.update_thread = threading.Thread(target=self.update_checker.run, daemon=True) @@ -1152,7 +945,7 @@ class VkChatManager(QMainWindow): checkbox.setChecked(checked) def _build_auth_command(self, auth_url, output_path): - return self.vk_service.build_auth_command(auth_url, output_path) + return self.vk_service.build_auth_command(auth_url, output_path, entry_script_path=os.path.abspath(__file__)) def _on_auth_process_error(self, process_error): self._auth_process_error_text = f"Ошибка запуска авторизации: {process_error}" diff --git a/services/__init__.py b/services/__init__.py new file mode 100644 index 0000000..4ee06d9 --- /dev/null +++ b/services/__init__.py @@ -0,0 +1,3 @@ +from .update_service import UpdateChecker, detect_update_repository_url +from .vk_service import VkService + diff --git a/services/update_service.py b/services/update_service.py new file mode 100644 index 0000000..bc81f67 --- /dev/null +++ b/services/update_service.py @@ -0,0 +1,163 @@ +import json +import os +import re +import urllib.error +import urllib.request +from urllib.parse import urlparse + +from PySide6.QtCore import QObject, Signal + + +def _version_key(version_text): + parts = [int(x) for x in re.findall(r"\d+", str(version_text))] + if not parts: + return (0, 0, 0) + while len(parts) < 3: + parts.append(0) + return tuple(parts[:3]) + + +def _is_newer_version(latest_version, current_version): + latest_key = _version_key(latest_version) + current_key = _version_key(current_version) + return latest_key > current_key + + +def _sanitize_repo_url(value): + value = (value or "").strip() + if not value: + return "" + if "://" not in value and value.count("/") == 1: + return f"https://github.com/{value}" + parsed = urlparse(value) + if not parsed.scheme or not parsed.netloc: + return "" + clean_path = parsed.path.rstrip("/") + if clean_path.endswith(".git"): + clean_path = clean_path[:-4] + return f"{parsed.scheme}://{parsed.netloc}{clean_path}" + + +def detect_update_repository_url(configured_url="", configured_repo=""): + env_url = _sanitize_repo_url(os.getenv("ANABASIS_UPDATE_URL", "")) + if env_url: + return env_url + env_repo = _sanitize_repo_url(os.getenv("ANABASIS_UPDATE_REPOSITORY", "")) + if env_repo: + return env_repo + cfg_url = _sanitize_repo_url(configured_url) + if cfg_url: + return cfg_url + cfg_repo = _sanitize_repo_url(configured_repo) + if cfg_repo: + return cfg_repo + git_config_path = os.path.join(os.path.abspath("."), ".git", "config") + if not os.path.exists(git_config_path): + return "" + try: + with open(git_config_path, "r", encoding="utf-8") as f: + content = f.read() + match = re.search(r"url\s*=\s*((?:https?://|git@)[^\s]+)", content) + if not match: + return "" + remote = match.group(1).strip() + if remote.startswith("git@"): + ssh_match = re.match(r"git@([^:]+):(.+?)(?:\.git)?$", remote) + if ssh_match: + return _sanitize_repo_url(f"https://{ssh_match.group(1)}/{ssh_match.group(2)}") + return _sanitize_repo_url(remote) + except Exception: + return "" + + +class UpdateChecker(QObject): + check_finished = Signal(dict) + check_failed = Signal(str) + + def __init__(self, repository_url, current_version, request_timeout=8): + super().__init__() + self.repository_url = repository_url + self.current_version = current_version + self.request_timeout = request_timeout + + def run(self): + if not self.repository_url: + self.check_failed.emit("Не задан URL репозитория обновлений.") + return + + parsed = urlparse(self.repository_url) + base_url = f"{parsed.scheme}://{parsed.netloc}" + repo_path = parsed.path.strip("/") + if not repo_path or repo_path.count("/") < 1: + self.check_failed.emit("Некорректный URL репозитория обновлений.") + return + + if parsed.netloc.lower().endswith("github.com"): + api_url = f"https://api.github.com/repos/{repo_path}/releases/latest" + else: + api_url = f"{base_url}/api/v1/repos/{repo_path}/releases/latest" + releases_url = f"{base_url}/{repo_path}/releases" + request = urllib.request.Request( + api_url, + headers={ + "Accept": "application/vnd.github+json", + "User-Agent": "AnabasisManager-Updater", + }, + ) + try: + with urllib.request.urlopen(request, timeout=self.request_timeout) as response: + release_data = json.loads(response.read().decode("utf-8")) + except urllib.error.HTTPError as e: + self.check_failed.emit(f"Ошибка HTTP при проверке обновлений: {e.code}") + return + except urllib.error.URLError as e: + self.check_failed.emit(f"Сетевая ошибка при проверке обновлений: {e}") + return + except Exception as e: + self.check_failed.emit(f"Не удалось проверить обновления: {e}") + return + + latest_tag = release_data.get("tag_name") or release_data.get("name") or "" + latest_version = latest_tag.lstrip("vV").strip() + html_url = release_data.get("html_url") or releases_url + assets = release_data.get("assets") or [] + download_url = "" + download_name = "" + checksum_url = "" + for asset in assets: + url = asset.get("browser_download_url", "") + if url.lower().endswith(".zip"): + download_url = url + download_name = asset.get("name", "") + break + if not download_url and assets: + download_url = assets[0].get("browser_download_url", "") + download_name = assets[0].get("name", "") + + for asset in assets: + name = asset.get("name", "").lower() + if not name: + continue + is_checksum_asset = name.endswith(".sha256") or name.endswith(".sha256.txt") or name in ("checksums.txt", "sha256sums.txt") + if not is_checksum_asset: + continue + if download_name and (download_name.lower() in name or name in (f"{download_name.lower()}.sha256", f"{download_name.lower()}.sha256.txt")): + checksum_url = asset.get("browser_download_url", "") + break + if not checksum_url: + checksum_url = asset.get("browser_download_url", "") + + self.check_finished.emit( + { + "repository_url": self.repository_url, + "latest_version": latest_version, + "current_version": self.current_version, + "latest_tag": latest_tag, + "release_url": html_url, + "download_url": download_url, + "download_name": download_name, + "checksum_url": checksum_url, + "has_update": _is_newer_version(latest_version, self.current_version), + } + ) + diff --git a/services/vk_service.py b/services/vk_service.py new file mode 100644 index 0000000..a1b2f10 --- /dev/null +++ b/services/vk_service.py @@ -0,0 +1,59 @@ +import os +import sys +import time + +from vk_api import VkApi +from vk_api.exceptions import VkApiError + + +class VkService: + def __init__(self): + self.session = None + self.api = None + + def set_token(self, token): + self.session = VkApi(token=token) + self.api = self.session.get_api() + + def clear(self): + self.session = None + self.api = None + + @staticmethod + def build_auth_command(auth_url, output_path, entry_script_path=None): + if getattr(sys, "frozen", False): + return sys.executable, ["--auth", auth_url, output_path] + script_path = entry_script_path or os.path.abspath(__file__) + return sys.executable, [script_path, "--auth", auth_url, output_path] + + @staticmethod + def vk_error_code(exc): + error = getattr(exc, "error", None) + if isinstance(error, dict): + return error.get("error_code") + return getattr(exc, "code", None) + + @classmethod + def is_auth_error(cls, exc, formatted_message=None): + code = cls.vk_error_code(exc) + if code == 5: + return True + message = (formatted_message or str(exc)).lower() + return "invalid_access_token" in message or "user authorization failed" in message + + @classmethod + def is_retryable_error(cls, exc): + return cls.vk_error_code(exc) in (6, 9, 10) + + def call_with_retry(self, func, *args, **kwargs): + max_attempts = 5 + for attempt in range(1, max_attempts + 1): + try: + return func(*args, **kwargs) + except VkApiError as e: + if not self.is_retryable_error(e) or attempt == max_attempts: + raise + delay = min(2.0, 0.35 * (2 ** (attempt - 1))) + if self.vk_error_code(e) == 9: + delay = max(delay, 1.0) + time.sleep(delay) diff --git a/tests/test_auth_relogin_smoke.py b/tests/test_auth_relogin_smoke.py index 577fd2a..6ebe384 100644 --- a/tests/test_auth_relogin_smoke.py +++ b/tests/test_auth_relogin_smoke.py @@ -5,51 +5,54 @@ from pathlib import Path class AuthReloginSmokeTests(unittest.TestCase): @classmethod def setUpClass(cls): - cls.source = Path("main.py").read_text(encoding="utf-8") + cls.main_source = Path("main.py").read_text(encoding="utf-8") + cls.vk_source = Path("services/vk_service.py").read_text(encoding="utf-8") + cls.update_source = Path("services/update_service.py").read_text(encoding="utf-8") def test_auth_command_builder_handles_frozen_and_source(self): - self.assertIn("def _build_auth_command(self, auth_url, output_path):", self.source) - self.assertIn('return sys.executable, ["--auth", auth_url, output_path]', self.source) - self.assertIn('return sys.executable, [os.path.abspath(__file__), "--auth", auth_url, output_path]', self.source) + self.assertIn("def _build_auth_command(self, auth_url, output_path):", self.main_source) + self.assertIn("entry_script_path=os.path.abspath(__file__)", self.main_source) + self.assertIn('return sys.executable, ["--auth", auth_url, output_path]', self.vk_source) + self.assertIn("script_path = entry_script_path or os.path.abspath(__file__)", self.vk_source) def test_auth_runs_via_qprocess(self): - self.assertIn("process = QProcess(self)", self.source) - self.assertIn("process.start(program, args)", self.source) - self.assertIn("def _on_auth_process_finished(self, exit_code, _exit_status):", self.source) - self.assertIn("if self.auth_process and self.auth_process.state() == QProcess.NotRunning:", self.source) + self.assertIn("process = QProcess(self)", self.main_source) + self.assertIn("process.start(program, args)", self.main_source) + self.assertIn("def _on_auth_process_finished(self, exit_code, _exit_status):", self.main_source) + self.assertIn("if self.auth_process and self.auth_process.state() == QProcess.NotRunning:", self.main_source) def test_force_relogin_has_backoff_and_event_log(self): - self.assertIn("AUTH_RELOGIN_BACKOFF_SECONDS = 5.0", self.source) - self.assertIn("if self._auth_relogin_in_progress:", self.source) - self.assertIn("force_relogin_backoff", self.source) - self.assertIn("force_relogin", self.source) + self.assertIn("AUTH_RELOGIN_BACKOFF_SECONDS = 5.0", self.main_source) + self.assertIn("if self._auth_relogin_in_progress:", self.main_source) + self.assertIn("force_relogin_backoff", self.main_source) + self.assertIn("force_relogin", self.main_source) def test_auth_error_paths_trigger_force_relogin(self): - self.assertIn("def _handle_vk_api_error(self, context, exc, action_name=None, ui_message_prefix=None, disable_ui=False):", self.source) - self.assertIn("self._force_relogin(exc, action_name or context)", self.source) - self.assertIn('"load_chats",', self.source) - self.assertIn('"execute_user_action",', self.source) - self.assertIn('"set_user_admin",', self.source) + self.assertIn("def _handle_vk_api_error(self, context, exc, action_name=None, ui_message_prefix=None, disable_ui=False):", self.main_source) + self.assertIn("self._force_relogin(exc, action_name or context)", self.main_source) + self.assertIn('"load_chats",', self.main_source) + self.assertIn('"execute_user_action",', self.main_source) + self.assertIn('"set_user_admin",', self.main_source) def test_tab_checkbox_lists_use_existing_attributes(self): - self.assertIn("self.warehouse_chat_checkboxes", self.source) - self.assertIn("self.coffee_chat_checkboxes", self.source) - self.assertNotIn("self.retail_warehouse_checkboxes", self.source) - self.assertNotIn("self.retail_coffee_checkboxes", self.source) + self.assertIn("self.warehouse_chat_checkboxes", self.main_source) + self.assertIn("self.coffee_chat_checkboxes", self.main_source) + self.assertNotIn("self.retail_warehouse_checkboxes", self.main_source) + self.assertNotIn("self.retail_coffee_checkboxes", self.main_source) def test_update_check_actions_exist(self): - self.assertIn("from app_version import APP_VERSION", self.source) - self.assertIn("UPDATE_REPOSITORY = ", self.source) - self.assertIn('QAction("Проверить обновления", self)', self.source) - self.assertIn("def check_for_updates(self, silent_no_updates=False):", self.source) - self.assertIn("class UpdateChecker(QObject):", self.source) - self.assertIn('message_box.addButton("Обновить сейчас", QMessageBox.AcceptRole)', self.source) - self.assertIn("def _start_auto_update(self, download_url, latest_version, checksum_url=\"\", download_name=\"\"):", self.source) - self.assertIn("def _verify_update_checksum(self, zip_path, checksum_url, download_name):", self.source) - self.assertIn("def _build_update_script(self, app_dir, source_dir, exe_name, target_pid):", self.source) - self.assertIn("set TARGET_PID=", self.source) - self.assertIn("set BACKUP_DIR=", self.source) - self.assertIn(":rollback", self.source) + self.assertIn("from app_version import APP_VERSION", self.main_source) + self.assertIn("from services import UpdateChecker, VkService, detect_update_repository_url", self.main_source) + self.assertIn('QAction("Проверить обновления", self)', self.main_source) + self.assertIn("def check_for_updates(self, silent_no_updates=False):", self.main_source) + self.assertIn("class UpdateChecker(QObject):", self.update_source) + self.assertIn('message_box.addButton("Обновить сейчас", QMessageBox.AcceptRole)', self.main_source) + self.assertIn("def _start_auto_update(self, download_url, latest_version, checksum_url=\"\", download_name=\"\"):", self.main_source) + self.assertIn("def _verify_update_checksum(self, zip_path, checksum_url, download_name):", self.main_source) + self.assertIn("def _build_update_script(self, app_dir, source_dir, exe_name, target_pid):", self.main_source) + self.assertIn("set TARGET_PID=", self.main_source) + self.assertIn("set BACKUP_DIR=", self.main_source) + self.assertIn(":rollback", self.main_source) if __name__ == "__main__":