import argparse import os import shutil import subprocess import sys import tempfile import time from PySide6.QtCore import QObject, Qt, QThread, Signal, QTimer, QUrl from PySide6.QtGui import QDesktopServices from PySide6.QtWidgets import QApplication, QLabel, QProgressBar, QVBoxLayout, QWidget, QPushButton, QHBoxLayout def _write_log(log_path, message): try: os.makedirs(os.path.dirname(log_path), exist_ok=True) with open(log_path, "a", encoding="utf-8") as f: ts = time.strftime("%Y-%m-%d %H:%M:%S") f.write(f"[{ts}] {message.rstrip()}\n") except Exception: pass def _is_pid_running(pid): if pid <= 0: return False try: completed = subprocess.run( ["tasklist", "/FI", f"PID eq {pid}"], capture_output=True, text=True, timeout=5, check=False, ) return str(pid) in (completed.stdout or "") except Exception: return False def _copy_file_with_retries(source_file, target_file, retries=20, delay=0.5): last_error = None for _ in range(max(1, retries)): try: os.makedirs(os.path.dirname(target_file), exist_ok=True) shutil.copy2(source_file, target_file) return except Exception as exc: last_error = exc time.sleep(delay) raise last_error if last_error else RuntimeError(f"Не удалось скопировать файл: {source_file}") def _mirror_tree(src_dir, dst_dir, skip_names=None, retries=20, delay=0.5): skip_set = {name.lower() for name in (skip_names or [])} os.makedirs(dst_dir, exist_ok=True) for root, dirs, files in os.walk(src_dir): rel = os.path.relpath(root, src_dir) target_root = dst_dir if rel == "." else os.path.join(dst_dir, rel) os.makedirs(target_root, exist_ok=True) for file_name in files: if file_name.lower() in skip_set: continue source_file = os.path.join(root, file_name) target_file = os.path.join(target_root, file_name) _copy_file_with_retries(source_file, target_file, retries=retries, delay=delay) def _read_version_marker(base_dir): marker_path = os.path.join(base_dir, "version.txt") if not os.path.exists(marker_path): return "" try: with open(marker_path, "r", encoding="utf-8") as f: return f.read().strip() except Exception: return "" class UpdateWorker(QObject): status = Signal(int, str) failed = Signal(str) done = Signal() def __init__(self, app_dir, source_dir, exe_name, target_pid, version, work_dir=""): super().__init__() self.app_dir = app_dir self.source_dir = source_dir self.exe_name = exe_name self.target_pid = int(target_pid or 0) self.version = version or "" self.work_dir = work_dir or "" self.log_path = os.path.join(app_dir, "update_error.log") def _start_app(self): app_exe = os.path.join(self.app_dir, self.exe_name) if not os.path.exists(app_exe): raise RuntimeError(f"Не найден файл приложения: {app_exe}") creation_flags = 0 if hasattr(subprocess, "DETACHED_PROCESS"): creation_flags |= subprocess.DETACHED_PROCESS if hasattr(subprocess, "CREATE_NEW_PROCESS_GROUP"): creation_flags |= subprocess.CREATE_NEW_PROCESS_GROUP subprocess.Popen([app_exe], cwd=self.app_dir, creationflags=creation_flags) def run(self): backup_dir = os.path.join(tempfile.gettempdir(), f"anabasis_backup_{int(time.time())}") skip_names = {"anabasisupdater.exe"} prev_version = _read_version_marker(self.app_dir) source_version = _read_version_marker(self.source_dir) expected_version = (self.version or "").strip() try: self.status.emit(1, "Ожидание завершения приложения...") wait_loops = 0 while _is_pid_running(self.target_pid): time.sleep(1) wait_loops += 1 if wait_loops >= 180: self.status.emit(1, "Принудительное завершение зависшего процесса...") subprocess.run( ["taskkill", "/PID", str(self.target_pid), "/T", "/F"], capture_output=True, text=True, timeout=10, check=False, ) time.sleep(2) if _is_pid_running(self.target_pid): raise RuntimeError(f"Процесс {self.target_pid} не завершился.") break self.status.emit(2, "Проверка содержимого обновления...") source_app_exe = os.path.join(self.source_dir, self.exe_name) if not os.path.exists(source_app_exe): raise RuntimeError(f"В обновлении отсутствует {self.exe_name}") if expected_version and source_version and source_version != expected_version: raise RuntimeError( f"Версия пакета ({source_version}) не совпадает с ожидаемой ({expected_version})." ) self.status.emit(3, "Создание резервной копии...") _mirror_tree(self.app_dir, backup_dir, skip_names=skip_names) self.status.emit(4, "Применение обновления...") _mirror_tree(self.source_dir, self.app_dir, skip_names=skip_names, retries=30, delay=0.6) self.status.emit(5, "Проверка установленной версии...") installed_version = _read_version_marker(self.app_dir) if expected_version and installed_version and installed_version != expected_version: raise RuntimeError( f"После обновления версия {installed_version}, ожидалась {expected_version}." ) if expected_version and prev_version and prev_version == expected_version: _write_log(self.log_path, f"Предупреждение: версия до обновления уже была {expected_version}.") self.status.emit(6, "Запуск обновленного приложения...") self._start_app() _write_log(self.log_path, f"Update success to version {expected_version or source_version or 'unknown'}") self.status.emit(7, "Очистка временных файлов...") try: shutil.rmtree(backup_dir, ignore_errors=True) if self.work_dir and os.path.isdir(self.work_dir): shutil.rmtree(self.work_dir, ignore_errors=True) except Exception: pass self.done.emit() except Exception as exc: _write_log(self.log_path, f"Update failed: {exc}") try: self.status.emit(6, "Восстановление из резервной копии...") if os.path.isdir(backup_dir): _mirror_tree(backup_dir, self.app_dir, skip_names=skip_names, retries=20, delay=0.5) _write_log(self.log_path, "Rollback completed.") try: self._start_app() _write_log(self.log_path, "Restored app started after rollback.") except Exception as start_exc: _write_log(self.log_path, f"Failed to start app after rollback: {start_exc}") except Exception as rollback_exc: _write_log(self.log_path, f"Rollback failed: {rollback_exc}") self.failed.emit(str(exc)) class UpdaterWindow(QWidget): def __init__(self, app_dir, source_dir, exe_name, target_pid, version, work_dir=""): super().__init__() self.setWindowTitle("Anabasis Updater") self.setMinimumWidth(480) self.log_path = os.path.join(app_dir, "update_error.log") self.label = QLabel("Подготовка обновления...") self.label.setWordWrap(True) self.progress = QProgressBar() self.progress.setRange(0, 7) self.progress.setValue(0) self.open_log_btn = QPushButton("Открыть лог") self.open_log_btn.setEnabled(False) self.open_log_btn.clicked.connect(self.open_log) self.close_btn = QPushButton("Закрыть") self.close_btn.setEnabled(False) self.close_btn.clicked.connect(self.close) layout = QVBoxLayout(self) layout.addWidget(self.label) layout.addWidget(self.progress) actions = QHBoxLayout() actions.addStretch(1) actions.addWidget(self.open_log_btn) actions.addWidget(self.close_btn) layout.addLayout(actions) self.thread = QThread(self) self.worker = UpdateWorker(app_dir, source_dir, exe_name, target_pid, version, work_dir=work_dir) self.worker.moveToThread(self.thread) self.thread.started.connect(self.worker.run) self.worker.status.connect(self.on_status) self.worker.failed.connect(self.on_failed) self.worker.done.connect(self.on_done) self.worker.done.connect(self.thread.quit) self.worker.failed.connect(self.thread.quit) self.thread.start() def on_status(self, step, text): self.label.setText(text) self.progress.setValue(max(0, min(7, int(step)))) def on_done(self): self.label.setText("Обновление успешно применено. Приложение запущено.") self.progress.setValue(7) self.open_log_btn.setEnabled(True) QTimer.singleShot(900, self.close) def on_failed(self, error_text): self.label.setText( "Не удалось применить обновление.\n" f"Причина: {error_text}\n" "Подробности сохранены в update_error.log." ) self.open_log_btn.setEnabled(True) self.close_btn.setEnabled(True) def open_log(self): if os.path.exists(self.log_path): QDesktopServices.openUrl(QUrl.fromLocalFile(self.log_path)) def parse_args(): parser = argparse.ArgumentParser() parser.add_argument("--app-dir", required=True) parser.add_argument("--source-dir", required=True) parser.add_argument("--exe-name", required=True) parser.add_argument("--target-pid", required=True) parser.add_argument("--version", default="") parser.add_argument("--work-dir", default="") return parser.parse_args() def main(): args = parse_args() app = QApplication(sys.argv) app.setStyle("Fusion") window = UpdaterWindow( app_dir=args.app_dir, source_dir=args.source_dir, exe_name=args.exe_name, target_pid=args.target_pid, version=args.version, work_dir=args.work_dir, ) window.show() return app.exec() if __name__ == "__main__": sys.exit(main())