refactor(core): split vk and update services into modules

This commit is contained in:
2026-02-15 20:04:21 +03:00
parent 02350cfca1
commit 4d84d2ebe5
5 changed files with 266 additions and 245 deletions

217
main.py
View File

@@ -2,7 +2,6 @@ import sys
import base64
import ctypes
import shutil
from vk_api import VkApi
import json
import time
import auth_webview
@@ -12,16 +11,16 @@ import hashlib
import subprocess
import threading
import tempfile
import urllib.error
import urllib.request
import zipfile
from app_version import APP_VERSION
from services import UpdateChecker, VkService, detect_update_repository_url
from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit,
QPushButton, QVBoxLayout, QWidget, QMessageBox,
QTextBrowser, QScrollArea, QCheckBox, QHBoxLayout,
QSizePolicy, QDialog, QTextEdit, QTabWidget, QDialogButtonBox,
QProgressBar)
from PySide6.QtCore import Qt, QUrl, QDateTime, Signal, QTimer, QObject
from PySide6.QtCore import Qt, QUrl, QDateTime, QTimer
from PySide6.QtGui import QIcon, QAction, QDesktopServices
from urllib.parse import urlparse, parse_qs, unquote
from vk_api.exceptions import VkApiError
@@ -49,69 +48,6 @@ class _DataBlob(ctypes.Structure):
_fields_ = [("cbData", wintypes.DWORD), ("pbData", ctypes.POINTER(ctypes.c_byte))]
def _version_key(version_text):
parts = [int(x) for x in re.findall(r"\d+", str(version_text))]
if not parts:
return (0, 0, 0)
while len(parts) < 3:
parts.append(0)
return tuple(parts[:3])
def _is_newer_version(latest_version, current_version):
latest_key = _version_key(latest_version)
current_key = _version_key(current_version)
return latest_key > current_key
def _sanitize_repo_url(value):
value = (value or "").strip()
if not value:
return ""
if "://" not in value and value.count("/") == 1:
return f"https://github.com/{value}"
parsed = urlparse(value)
if not parsed.scheme or not parsed.netloc:
return ""
clean_path = parsed.path.rstrip("/")
if clean_path.endswith(".git"):
clean_path = clean_path[:-4]
return f"{parsed.scheme}://{parsed.netloc}{clean_path}"
def _detect_update_repository_url():
env_url = _sanitize_repo_url(os.getenv("ANABASIS_UPDATE_URL", ""))
if env_url:
return env_url
env_repo = _sanitize_repo_url(os.getenv("ANABASIS_UPDATE_REPOSITORY", ""))
if env_repo:
return env_repo
configured_url = _sanitize_repo_url(UPDATE_REPOSITORY_URL)
if configured_url:
return configured_url
configured_repo = _sanitize_repo_url(UPDATE_REPOSITORY)
if configured_repo:
return configured_repo
git_config_path = os.path.join(os.path.abspath("."), ".git", "config")
if not os.path.exists(git_config_path):
return ""
try:
with open(git_config_path, "r", encoding="utf-8") as f:
content = f.read()
match = re.search(r"url\s*=\s*((?:https?://|git@)[^\s]+)", content)
if match:
remote = match.group(1).strip()
if remote.startswith("git@"):
# git@host:owner/repo(.git)
ssh_match = re.match(r"git@([^:]+):(.+?)(?:\.git)?$", remote)
if ssh_match:
return _sanitize_repo_url(f"https://{ssh_match.group(1)}/{ssh_match.group(2)}")
return _sanitize_repo_url(remote)
except Exception:
return ""
return ""
_crypt32 = None
_kernel32 = None
if os.name == "nt":
@@ -255,149 +191,6 @@ def load_token():
print(f"Ошибка загрузки: {e}")
return None, None
class VkService:
def __init__(self):
self.session = None
self.api = None
def set_token(self, token):
self.session = VkApi(token=token)
self.api = self.session.get_api()
def clear(self):
self.session = None
self.api = None
@staticmethod
def build_auth_command(auth_url, output_path):
if getattr(sys, "frozen", False):
return sys.executable, ["--auth", auth_url, output_path]
return sys.executable, [os.path.abspath(__file__), "--auth", auth_url, output_path]
@staticmethod
def vk_error_code(exc):
error = getattr(exc, "error", None)
if isinstance(error, dict):
return error.get("error_code")
return getattr(exc, "code", None)
@classmethod
def is_auth_error(cls, exc, formatted_message=None):
code = cls.vk_error_code(exc)
if code == 5:
return True
message = (formatted_message or str(exc)).lower()
return "invalid_access_token" in message or "user authorization failed" in message
@classmethod
def is_retryable_error(cls, exc):
return cls.vk_error_code(exc) in (6, 9, 10)
def call_with_retry(self, func, *args, **kwargs):
max_attempts = 5
for attempt in range(1, max_attempts + 1):
try:
return func(*args, **kwargs)
except VkApiError as e:
if not self.is_retryable_error(e) or attempt == max_attempts:
raise
delay = min(2.0, 0.35 * (2 ** (attempt - 1)))
if self.vk_error_code(e) == 9:
delay = max(delay, 1.0)
time.sleep(delay)
class UpdateChecker(QObject):
check_finished = Signal(dict)
check_failed = Signal(str)
def __init__(self, repository_url, current_version):
super().__init__()
self.repository_url = repository_url
self.current_version = current_version
def run(self):
if not self.repository_url:
self.check_failed.emit("Не задан URL репозитория обновлений.")
return
parsed = urlparse(self.repository_url)
base_url = f"{parsed.scheme}://{parsed.netloc}"
repo_path = parsed.path.strip("/")
if not repo_path or repo_path.count("/") < 1:
self.check_failed.emit("Некорректный URL репозитория обновлений.")
return
if parsed.netloc.lower().endswith("github.com"):
api_url = f"https://api.github.com/repos/{repo_path}/releases/latest"
else:
api_url = f"{base_url}/api/v1/repos/{repo_path}/releases/latest"
releases_url = f"{base_url}/{repo_path}/releases"
request = urllib.request.Request(
api_url,
headers={
"Accept": "application/vnd.github+json",
"User-Agent": "AnabasisManager-Updater",
},
)
try:
with urllib.request.urlopen(request, timeout=UPDATE_REQUEST_TIMEOUT) as response:
release_data = json.loads(response.read().decode("utf-8"))
except urllib.error.HTTPError as e:
self.check_failed.emit(f"Ошибка HTTP при проверке обновлений: {e.code}")
return
except urllib.error.URLError as e:
self.check_failed.emit(f"Сетевая ошибка при проверке обновлений: {e}")
return
except Exception as e:
self.check_failed.emit(f"Не удалось проверить обновления: {e}")
return
latest_tag = release_data.get("tag_name") or release_data.get("name") or ""
latest_version = latest_tag.lstrip("vV").strip()
html_url = release_data.get("html_url") or releases_url
assets = release_data.get("assets") or []
download_url = ""
download_name = ""
checksum_url = ""
for asset in assets:
url = asset.get("browser_download_url", "")
if url.lower().endswith(".zip"):
download_url = url
download_name = asset.get("name", "")
break
if not download_url and assets:
download_url = assets[0].get("browser_download_url", "")
download_name = assets[0].get("name", "")
for asset in assets:
name = asset.get("name", "").lower()
if not name:
continue
is_checksum_asset = name.endswith(".sha256") or name.endswith(".sha256.txt") or name in ("checksums.txt", "sha256sums.txt")
if not is_checksum_asset:
continue
if download_name and (download_name.lower() in name or name in (f"{download_name.lower()}.sha256", f"{download_name.lower()}.sha256.txt")):
checksum_url = asset.get("browser_download_url", "")
break
if not checksum_url:
checksum_url = asset.get("browser_download_url", "")
self.check_finished.emit(
{
"repository_url": self.repository_url,
"latest_version": latest_version,
"current_version": self.current_version,
"latest_tag": latest_tag,
"release_url": html_url,
"download_url": download_url,
"download_name": download_name,
"checksum_url": checksum_url,
"has_update": _is_newer_version(latest_version, self.current_version),
}
)
class MultiLinkDialog(QDialog):
def __init__(self, parent=None):
super().__init__(parent)
@@ -445,7 +238,7 @@ class VkChatManager(QMainWindow):
self._auth_ui_busy = False
self._auth_relogin_in_progress = False
self._last_auth_relogin_ts = 0.0
self.update_repository_url = _detect_update_repository_url()
self.update_repository_url = detect_update_repository_url(UPDATE_REPOSITORY_URL, UPDATE_REPOSITORY)
self.update_checker = None
self.update_thread = None
self._update_check_silent = False
@@ -695,7 +488,7 @@ class VkChatManager(QMainWindow):
self._set_update_action_state(True)
self.status_label.setText("Статус: проверка обновлений...")
self.update_checker = UpdateChecker(self.update_repository_url, APP_VERSION)
self.update_checker = UpdateChecker(self.update_repository_url, APP_VERSION, request_timeout=UPDATE_REQUEST_TIMEOUT)
self.update_checker.check_finished.connect(self._on_update_check_finished)
self.update_checker.check_failed.connect(self._on_update_check_failed)
self.update_thread = threading.Thread(target=self.update_checker.run, daemon=True)
@@ -1152,7 +945,7 @@ class VkChatManager(QMainWindow):
checkbox.setChecked(checked)
def _build_auth_command(self, auth_url, output_path):
return self.vk_service.build_auth_command(auth_url, output_path)
return self.vk_service.build_auth_command(auth_url, output_path, entry_script_path=os.path.abspath(__file__))
def _on_auth_process_error(self, process_error):
self._auth_process_error_text = f"Ошибка запуска авторизации: {process_error}"

3
services/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
from .update_service import UpdateChecker, detect_update_repository_url
from .vk_service import VkService

163
services/update_service.py Normal file
View File

@@ -0,0 +1,163 @@
import json
import os
import re
import urllib.error
import urllib.request
from urllib.parse import urlparse
from PySide6.QtCore import QObject, Signal
def _version_key(version_text):
parts = [int(x) for x in re.findall(r"\d+", str(version_text))]
if not parts:
return (0, 0, 0)
while len(parts) < 3:
parts.append(0)
return tuple(parts[:3])
def _is_newer_version(latest_version, current_version):
latest_key = _version_key(latest_version)
current_key = _version_key(current_version)
return latest_key > current_key
def _sanitize_repo_url(value):
value = (value or "").strip()
if not value:
return ""
if "://" not in value and value.count("/") == 1:
return f"https://github.com/{value}"
parsed = urlparse(value)
if not parsed.scheme or not parsed.netloc:
return ""
clean_path = parsed.path.rstrip("/")
if clean_path.endswith(".git"):
clean_path = clean_path[:-4]
return f"{parsed.scheme}://{parsed.netloc}{clean_path}"
def detect_update_repository_url(configured_url="", configured_repo=""):
env_url = _sanitize_repo_url(os.getenv("ANABASIS_UPDATE_URL", ""))
if env_url:
return env_url
env_repo = _sanitize_repo_url(os.getenv("ANABASIS_UPDATE_REPOSITORY", ""))
if env_repo:
return env_repo
cfg_url = _sanitize_repo_url(configured_url)
if cfg_url:
return cfg_url
cfg_repo = _sanitize_repo_url(configured_repo)
if cfg_repo:
return cfg_repo
git_config_path = os.path.join(os.path.abspath("."), ".git", "config")
if not os.path.exists(git_config_path):
return ""
try:
with open(git_config_path, "r", encoding="utf-8") as f:
content = f.read()
match = re.search(r"url\s*=\s*((?:https?://|git@)[^\s]+)", content)
if not match:
return ""
remote = match.group(1).strip()
if remote.startswith("git@"):
ssh_match = re.match(r"git@([^:]+):(.+?)(?:\.git)?$", remote)
if ssh_match:
return _sanitize_repo_url(f"https://{ssh_match.group(1)}/{ssh_match.group(2)}")
return _sanitize_repo_url(remote)
except Exception:
return ""
class UpdateChecker(QObject):
check_finished = Signal(dict)
check_failed = Signal(str)
def __init__(self, repository_url, current_version, request_timeout=8):
super().__init__()
self.repository_url = repository_url
self.current_version = current_version
self.request_timeout = request_timeout
def run(self):
if not self.repository_url:
self.check_failed.emit("Не задан URL репозитория обновлений.")
return
parsed = urlparse(self.repository_url)
base_url = f"{parsed.scheme}://{parsed.netloc}"
repo_path = parsed.path.strip("/")
if not repo_path or repo_path.count("/") < 1:
self.check_failed.emit("Некорректный URL репозитория обновлений.")
return
if parsed.netloc.lower().endswith("github.com"):
api_url = f"https://api.github.com/repos/{repo_path}/releases/latest"
else:
api_url = f"{base_url}/api/v1/repos/{repo_path}/releases/latest"
releases_url = f"{base_url}/{repo_path}/releases"
request = urllib.request.Request(
api_url,
headers={
"Accept": "application/vnd.github+json",
"User-Agent": "AnabasisManager-Updater",
},
)
try:
with urllib.request.urlopen(request, timeout=self.request_timeout) as response:
release_data = json.loads(response.read().decode("utf-8"))
except urllib.error.HTTPError as e:
self.check_failed.emit(f"Ошибка HTTP при проверке обновлений: {e.code}")
return
except urllib.error.URLError as e:
self.check_failed.emit(f"Сетевая ошибка при проверке обновлений: {e}")
return
except Exception as e:
self.check_failed.emit(f"Не удалось проверить обновления: {e}")
return
latest_tag = release_data.get("tag_name") or release_data.get("name") or ""
latest_version = latest_tag.lstrip("vV").strip()
html_url = release_data.get("html_url") or releases_url
assets = release_data.get("assets") or []
download_url = ""
download_name = ""
checksum_url = ""
for asset in assets:
url = asset.get("browser_download_url", "")
if url.lower().endswith(".zip"):
download_url = url
download_name = asset.get("name", "")
break
if not download_url and assets:
download_url = assets[0].get("browser_download_url", "")
download_name = assets[0].get("name", "")
for asset in assets:
name = asset.get("name", "").lower()
if not name:
continue
is_checksum_asset = name.endswith(".sha256") or name.endswith(".sha256.txt") or name in ("checksums.txt", "sha256sums.txt")
if not is_checksum_asset:
continue
if download_name and (download_name.lower() in name or name in (f"{download_name.lower()}.sha256", f"{download_name.lower()}.sha256.txt")):
checksum_url = asset.get("browser_download_url", "")
break
if not checksum_url:
checksum_url = asset.get("browser_download_url", "")
self.check_finished.emit(
{
"repository_url": self.repository_url,
"latest_version": latest_version,
"current_version": self.current_version,
"latest_tag": latest_tag,
"release_url": html_url,
"download_url": download_url,
"download_name": download_name,
"checksum_url": checksum_url,
"has_update": _is_newer_version(latest_version, self.current_version),
}
)

59
services/vk_service.py Normal file
View File

@@ -0,0 +1,59 @@
import os
import sys
import time
from vk_api import VkApi
from vk_api.exceptions import VkApiError
class VkService:
def __init__(self):
self.session = None
self.api = None
def set_token(self, token):
self.session = VkApi(token=token)
self.api = self.session.get_api()
def clear(self):
self.session = None
self.api = None
@staticmethod
def build_auth_command(auth_url, output_path, entry_script_path=None):
if getattr(sys, "frozen", False):
return sys.executable, ["--auth", auth_url, output_path]
script_path = entry_script_path or os.path.abspath(__file__)
return sys.executable, [script_path, "--auth", auth_url, output_path]
@staticmethod
def vk_error_code(exc):
error = getattr(exc, "error", None)
if isinstance(error, dict):
return error.get("error_code")
return getattr(exc, "code", None)
@classmethod
def is_auth_error(cls, exc, formatted_message=None):
code = cls.vk_error_code(exc)
if code == 5:
return True
message = (formatted_message or str(exc)).lower()
return "invalid_access_token" in message or "user authorization failed" in message
@classmethod
def is_retryable_error(cls, exc):
return cls.vk_error_code(exc) in (6, 9, 10)
def call_with_retry(self, func, *args, **kwargs):
max_attempts = 5
for attempt in range(1, max_attempts + 1):
try:
return func(*args, **kwargs)
except VkApiError as e:
if not self.is_retryable_error(e) or attempt == max_attempts:
raise
delay = min(2.0, 0.35 * (2 ** (attempt - 1)))
if self.vk_error_code(e) == 9:
delay = max(delay, 1.0)
time.sleep(delay)

View File

@@ -5,51 +5,54 @@ from pathlib import Path
class AuthReloginSmokeTests(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.source = Path("main.py").read_text(encoding="utf-8")
cls.main_source = Path("main.py").read_text(encoding="utf-8")
cls.vk_source = Path("services/vk_service.py").read_text(encoding="utf-8")
cls.update_source = Path("services/update_service.py").read_text(encoding="utf-8")
def test_auth_command_builder_handles_frozen_and_source(self):
self.assertIn("def _build_auth_command(self, auth_url, output_path):", self.source)
self.assertIn('return sys.executable, ["--auth", auth_url, output_path]', self.source)
self.assertIn('return sys.executable, [os.path.abspath(__file__), "--auth", auth_url, output_path]', self.source)
self.assertIn("def _build_auth_command(self, auth_url, output_path):", self.main_source)
self.assertIn("entry_script_path=os.path.abspath(__file__)", self.main_source)
self.assertIn('return sys.executable, ["--auth", auth_url, output_path]', self.vk_source)
self.assertIn("script_path = entry_script_path or os.path.abspath(__file__)", self.vk_source)
def test_auth_runs_via_qprocess(self):
self.assertIn("process = QProcess(self)", self.source)
self.assertIn("process.start(program, args)", self.source)
self.assertIn("def _on_auth_process_finished(self, exit_code, _exit_status):", self.source)
self.assertIn("if self.auth_process and self.auth_process.state() == QProcess.NotRunning:", self.source)
self.assertIn("process = QProcess(self)", self.main_source)
self.assertIn("process.start(program, args)", self.main_source)
self.assertIn("def _on_auth_process_finished(self, exit_code, _exit_status):", self.main_source)
self.assertIn("if self.auth_process and self.auth_process.state() == QProcess.NotRunning:", self.main_source)
def test_force_relogin_has_backoff_and_event_log(self):
self.assertIn("AUTH_RELOGIN_BACKOFF_SECONDS = 5.0", self.source)
self.assertIn("if self._auth_relogin_in_progress:", self.source)
self.assertIn("force_relogin_backoff", self.source)
self.assertIn("force_relogin", self.source)
self.assertIn("AUTH_RELOGIN_BACKOFF_SECONDS = 5.0", self.main_source)
self.assertIn("if self._auth_relogin_in_progress:", self.main_source)
self.assertIn("force_relogin_backoff", self.main_source)
self.assertIn("force_relogin", self.main_source)
def test_auth_error_paths_trigger_force_relogin(self):
self.assertIn("def _handle_vk_api_error(self, context, exc, action_name=None, ui_message_prefix=None, disable_ui=False):", self.source)
self.assertIn("self._force_relogin(exc, action_name or context)", self.source)
self.assertIn('"load_chats",', self.source)
self.assertIn('"execute_user_action",', self.source)
self.assertIn('"set_user_admin",', self.source)
self.assertIn("def _handle_vk_api_error(self, context, exc, action_name=None, ui_message_prefix=None, disable_ui=False):", self.main_source)
self.assertIn("self._force_relogin(exc, action_name or context)", self.main_source)
self.assertIn('"load_chats",', self.main_source)
self.assertIn('"execute_user_action",', self.main_source)
self.assertIn('"set_user_admin",', self.main_source)
def test_tab_checkbox_lists_use_existing_attributes(self):
self.assertIn("self.warehouse_chat_checkboxes", self.source)
self.assertIn("self.coffee_chat_checkboxes", self.source)
self.assertNotIn("self.retail_warehouse_checkboxes", self.source)
self.assertNotIn("self.retail_coffee_checkboxes", self.source)
self.assertIn("self.warehouse_chat_checkboxes", self.main_source)
self.assertIn("self.coffee_chat_checkboxes", self.main_source)
self.assertNotIn("self.retail_warehouse_checkboxes", self.main_source)
self.assertNotIn("self.retail_coffee_checkboxes", self.main_source)
def test_update_check_actions_exist(self):
self.assertIn("from app_version import APP_VERSION", self.source)
self.assertIn("UPDATE_REPOSITORY = ", self.source)
self.assertIn('QAction("Проверить обновления", self)', self.source)
self.assertIn("def check_for_updates(self, silent_no_updates=False):", self.source)
self.assertIn("class UpdateChecker(QObject):", self.source)
self.assertIn('message_box.addButton("Обновить сейчас", QMessageBox.AcceptRole)', self.source)
self.assertIn("def _start_auto_update(self, download_url, latest_version, checksum_url=\"\", download_name=\"\"):", self.source)
self.assertIn("def _verify_update_checksum(self, zip_path, checksum_url, download_name):", self.source)
self.assertIn("def _build_update_script(self, app_dir, source_dir, exe_name, target_pid):", self.source)
self.assertIn("set TARGET_PID=", self.source)
self.assertIn("set BACKUP_DIR=", self.source)
self.assertIn(":rollback", self.source)
self.assertIn("from app_version import APP_VERSION", self.main_source)
self.assertIn("from services import UpdateChecker, VkService, detect_update_repository_url", self.main_source)
self.assertIn('QAction("Проверить обновления", self)', self.main_source)
self.assertIn("def check_for_updates(self, silent_no_updates=False):", self.main_source)
self.assertIn("class UpdateChecker(QObject):", self.update_source)
self.assertIn('message_box.addButton("Обновить сейчас", QMessageBox.AcceptRole)', self.main_source)
self.assertIn("def _start_auto_update(self, download_url, latest_version, checksum_url=\"\", download_name=\"\"):", self.main_source)
self.assertIn("def _verify_update_checksum(self, zip_path, checksum_url, download_name):", self.main_source)
self.assertIn("def _build_update_script(self, app_dir, source_dir, exe_name, target_pid):", self.main_source)
self.assertIn("set TARGET_PID=", self.main_source)
self.assertIn("set BACKUP_DIR=", self.main_source)
self.assertIn(":rollback", self.main_source)
if __name__ == "__main__":