From b52cdea4256f452b11fac08d7ecc6f20b7ade083 Mon Sep 17 00:00:00 2001 From: benya Date: Sun, 15 Feb 2026 15:24:45 +0300 Subject: [PATCH] feat(update): stage 1 auto-update (one-click) --- main.py | 116 ++++++++++++++++++++++++++++++- tests/test_auth_relogin_smoke.py | 2 + 2 files changed, 117 insertions(+), 1 deletion(-) diff --git a/main.py b/main.py index 64f0cd0..33a6d81 100644 --- a/main.py +++ b/main.py @@ -8,9 +8,12 @@ import time import auth_webview import os import re +import subprocess import threading +import tempfile import urllib.error import urllib.request +import zipfile from app_version import APP_VERSION from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit, QPushButton, QVBoxLayout, QWidget, QMessageBox, @@ -695,15 +698,21 @@ class VkChatManager(QMainWindow): f"Доступная версия: {latest_version}\n\n" "Открыть страницу загрузки?" ) + update_now_button = message_box.addButton("Обновить сейчас", QMessageBox.AcceptRole) download_button = message_box.addButton("Скачать", QMessageBox.AcceptRole) releases_button = message_box.addButton("Релизы", QMessageBox.ActionRole) cancel_button = message_box.addButton("Позже", QMessageBox.RejectRole) - message_box.setDefaultButton(download_button) + message_box.setDefaultButton(update_now_button) message_box.exec() clicked = message_box.clickedButton() download_url = result.get("download_url") release_url = result.get("release_url") + if clicked is update_now_button and download_url: + if not self._start_auto_update(download_url, latest_version): + if release_url: + QDesktopServices.openUrl(QUrl(release_url)) + return if clicked is download_button and download_url: QDesktopServices.openUrl(QUrl(download_url)) elif clicked in (download_button, releases_button) and release_url: @@ -736,6 +745,111 @@ class VkChatManager(QMainWindow): if not self._update_check_silent: QMessageBox.warning(self, "Проверка обновлений", error_text) + def _download_update_archive(self, download_url, destination_path): + request = urllib.request.Request( + download_url, + headers={"User-Agent": "AnabasisManager-Updater"}, + ) + with urllib.request.urlopen(request, timeout=60) as response: + with open(destination_path, "wb") as f: + shutil.copyfileobj(response, f) + + def _locate_extracted_root(self, extracted_dir): + entries = [] + for name in os.listdir(extracted_dir): + full_path = os.path.join(extracted_dir, name) + if os.path.isdir(full_path): + entries.append(full_path) + if len(entries) == 1: + candidate = entries[0] + if os.path.exists(os.path.join(candidate, "AnabasisManager.exe")): + return candidate + return extracted_dir + + def _build_update_script(self, app_dir, source_dir, exe_name): + script_path = os.path.join(tempfile.gettempdir(), "anabasis_apply_update.cmd") + script_lines = [ + "@echo off", + "setlocal", + f"set APP_DIR={app_dir}", + f"set SRC_DIR={source_dir}", + f"set EXE_NAME={exe_name}", + "timeout /t 2 /nobreak >nul", + "robocopy \"%SRC_DIR%\" \"%APP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:3 /W:1 >nul", + "set RC=%ERRORLEVEL%", + "if %RC% GEQ 8 goto :copy_error", + "start \"\" \"%APP_DIR%\\%EXE_NAME%\"", + "exit /b 0", + ":copy_error", + "echo Auto-update failed with code %RC% > \"%APP_DIR%\\update_error.log\"", + "exit /b %RC%", + ] + with open(script_path, "w", encoding="utf-8", newline="\r\n") as f: + f.write("\r\n".join(script_lines) + "\r\n") + return script_path + + def _start_auto_update(self, download_url, latest_version): + if os.name != "nt": + QMessageBox.information( + self, + "Автообновление", + "Автообновление пока поддерживается только в Windows-сборке.", + ) + return False + if not getattr(sys, "frozen", False): + QMessageBox.information( + self, + "Автообновление", + "Автообновление доступно в собранной версии приложения (.exe).", + ) + return False + if not download_url: + QMessageBox.warning(self, "Автообновление", "В релизе нет ссылки на файл для обновления.") + return False + + self.status_label.setText(f"Статус: загрузка обновления {latest_version}...") + self._set_busy(True) + work_dir = tempfile.mkdtemp(prefix="anabasis_update_") + zip_path = os.path.join(work_dir, "update.zip") + unpack_dir = os.path.join(work_dir, "extracted") + try: + self._download_update_archive(download_url, zip_path) + os.makedirs(unpack_dir, exist_ok=True) + with zipfile.ZipFile(zip_path, "r") as archive: + archive.extractall(unpack_dir) + + source_dir = self._locate_extracted_root(unpack_dir) + app_exe = sys.executable + app_dir = os.path.dirname(app_exe) + exe_name = os.path.basename(app_exe) + script_path = self._build_update_script(app_dir, source_dir, exe_name) + + creation_flags = 0 + if hasattr(subprocess, "CREATE_NEW_PROCESS_GROUP"): + creation_flags |= subprocess.CREATE_NEW_PROCESS_GROUP + if hasattr(subprocess, "DETACHED_PROCESS"): + creation_flags |= subprocess.DETACHED_PROCESS + + subprocess.Popen( + ["cmd.exe", "/c", script_path], + cwd=work_dir, + creationflags=creation_flags, + ) + self._log_event("auto_update", f"Update {latest_version} started from {download_url}") + QMessageBox.information( + self, + "Обновление запущено", + "Обновление скачано. Приложение будет перезапущено.", + ) + QTimer.singleShot(150, QApplication.instance().quit) + return True + except Exception as e: + self._log_event("auto_update_failed", str(e), level="ERROR") + QMessageBox.warning(self, "Автообновление", f"Не удалось выполнить автообновление: {e}") + return False + finally: + self._set_busy(False) + def setup_token_timer(self): self.token_countdown_timer = QTimer(self) self.token_countdown_timer.timeout.connect(self.update_token_timer_display) diff --git a/tests/test_auth_relogin_smoke.py b/tests/test_auth_relogin_smoke.py index 1abd10b..f5090b8 100644 --- a/tests/test_auth_relogin_smoke.py +++ b/tests/test_auth_relogin_smoke.py @@ -43,6 +43,8 @@ class AuthReloginSmokeTests(unittest.TestCase): self.assertIn('QAction("Проверить обновления", self)', self.source) self.assertIn("def check_for_updates(self, silent_no_updates=False):", self.source) self.assertIn("class UpdateChecker(QObject):", self.source) + self.assertIn('message_box.addButton("Обновить сейчас", QMessageBox.AcceptRole)', self.source) + self.assertIn("def _start_auto_update(self, download_url, latest_version):", self.source) if __name__ == "__main__":