import hashlib
import os
import re
import shutil
import subprocess
import tempfile
import urllib.request
import zipfile

# User-Agent header sent with every updater HTTP request.
_USER_AGENT = "AnabasisManager-Updater"

# A SHA-256 digest written as 64 hexadecimal characters.
_SHA256_RE = re.compile(r"\b([A-Fa-f0-9]{64})\b")


class AutoUpdateService:
    """Download, verify, unpack and apply an application update.

    All methods are static or class methods; the class holds no state.
    """

    @staticmethod
    def _build_request(url):
        """Return a ``urllib`` request for *url* carrying the updater User-Agent."""
        return urllib.request.Request(url, headers={"User-Agent": _USER_AGENT})

    @staticmethod
    def _detached_creation_flags():
        """Return the process creation flags used to detach a child process.

        The flags only exist in ``subprocess`` on Windows; on other
        platforms the result is ``0``.
        """
        flags = 0
        for flag_name in ("CREATE_NEW_PROCESS_GROUP", "DETACHED_PROCESS"):
            flags |= getattr(subprocess, flag_name, 0)
        return flags

    @classmethod
    def download_update_archive(cls, download_url, destination_path):
        """Stream the resource at *download_url* into *destination_path*.

        Uses a 60 second timeout. Errors from ``urllib`` or from writing
        the file propagate to the caller.
        """
        request = cls._build_request(download_url)
        with urllib.request.urlopen(request, timeout=60) as response:
            with open(destination_path, "wb") as f:
                shutil.copyfileobj(response, f)

    @classmethod
    def download_update_text(cls, url):
        """Fetch *url* (30 second timeout) and return its body as text.

        The body is decoded as UTF-8; undecodable bytes are replaced
        rather than raising.
        """
        request = cls._build_request(url)
        with urllib.request.urlopen(request, timeout=30) as response:
            return response.read().decode("utf-8", errors="replace")

    @staticmethod
    def sha256_file(path):
        """Return the lowercase hex SHA-256 digest of the file at *path*.

        The file is read in 1 MiB chunks so large archives are not loaded
        into memory at once.
        """
        digest = hashlib.sha256()
        with open(path, "rb") as f:
            for chunk in iter(lambda: f.read(1024 * 1024), b""):
                digest.update(chunk)
        return digest.hexdigest().lower()

    @staticmethod
    def extract_sha256_from_text(checksum_text, target_file_name):
        """Find the SHA-256 digest listed for *target_file_name*.

        Each line of *checksum_text* is searched for a 64-character hex
        digest. If *target_file_name* is empty, the first digest found is
        returned. Otherwise the digest of the first line that mentions the
        target name (or its basename), compared case-insensitively, is
        returned.

        Returns the digest in lowercase, or ``""`` when nothing matches or
        *checksum_text* is empty/``None``.
        """
        target = (target_file_name or "").strip().lower()
        target_base = os.path.basename(target)
        for raw_line in (checksum_text or "").splitlines():
            match = _SHA256_RE.search(raw_line)
            if not match:
                continue
            checksum = match.group(1).lower()
            if not target:
                return checksum
            line_lower = raw_line.lower()
            if target in line_lower:
                return checksum
            # An empty basename (target ends with a path separator) would
            # match every line, so it must not count as a hit.
            if target_base and target_base in line_lower:
                return checksum
        return ""

    @classmethod
    def verify_update_checksum(cls, zip_path, checksum_url, download_name):
        """Check the archive at *zip_path* against a published SHA-256.

        The checksum list is downloaded from *checksum_url* and searched
        for *download_name* (falling back to the basename of *zip_path*).

        Raises:
            RuntimeError: if *checksum_url* is empty, no digest is found
                for the archive, or the digests differ.
        """
        if not checksum_url:
            raise RuntimeError("В релизе нет файла SHA256. Автообновление остановлено.")
        checksum_text = cls.download_update_text(checksum_url)
        expected_hash = cls.extract_sha256_from_text(
            checksum_text,
            download_name or os.path.basename(zip_path),
        )
        if not expected_hash:
            raise RuntimeError("Не удалось найти SHA256 для архива обновления.")
        actual_hash = cls.sha256_file(zip_path)
        if actual_hash != expected_hash:
            raise RuntimeError("SHA256 не совпадает, обновление отменено.")

    @staticmethod
    def locate_extracted_root(extracted_dir, exe_name="AnabasisManager.exe"):
        """Return the directory that holds the unpacked application files.

        If *extracted_dir* contains exactly one sub-directory and that
        sub-directory contains *exe_name*, the sub-directory is returned
        (archive packed with a single top-level folder). Otherwise
        *extracted_dir* itself is returned.
        """
        subdirs = [
            full_path
            for full_path in (
                os.path.join(extracted_dir, name) for name in os.listdir(extracted_dir)
            )
            if os.path.isdir(full_path)
        ]
        if len(subdirs) == 1:
            candidate = subdirs[0]
            if os.path.exists(os.path.join(candidate, exe_name)):
                return candidate
        return extracted_dir

    @staticmethod
    def build_update_script(app_dir, source_dir, exe_name, target_pid):
        """Write the batch script that applies the update and return its path.

        The script is written to ``anabasis_apply_update.cmd`` in the
        system temp directory. It waits for process *target_pid* to exit
        (force-killing it after 180 polls), waits for *exe_name* to
        disappear from the task list, backs up *app_dir*, copies
        *source_dir* over it with ``robocopy``, restarts the application
        and rolls back from the backup when the copy fails or the
        application is not running afterwards.

        Script exit codes: 0 success, 2 rolled back, 3 source executable
        missing, 4 process survived the force stop, 5 image still running,
        or the robocopy code (>= 8) when the backup fails.
        """
        script_path = os.path.join(tempfile.gettempdir(), "anabasis_apply_update.cmd")
        # NOTE(review): the script is saved as UTF-8 while cmd.exe decodes
        # batch files using the active code page; non-ASCII paths may need
        # extra handling - confirm on a target machine.
        script_lines = [
            "@echo off",
            "setlocal EnableExtensions",
            f'set "APP_DIR={app_dir}"',
            f'set "SRC_DIR={source_dir}"',
            f'set "EXE_NAME={exe_name}"',
            f'set "TARGET_PID={target_pid}"',
            'set "BACKUP_DIR=%TEMP%\\anabasis_backup_%RANDOM%%RANDOM%"',
            'set "UPDATE_LOG=%APP_DIR%\\update_error.log"',
            'echo [%DATE% %TIME%] Update start > "%UPDATE_LOG%"',
            'if not exist "%SRC_DIR%\\%EXE_NAME%" (',
            ' echo Source executable not found: "%SRC_DIR%\\%EXE_NAME%" >> "%UPDATE_LOG%"',
            " exit /b 3",
            ")",
            "set /a WAIT_LOOPS=0",
            ":wait_for_exit",
            'tasklist /FI "PID eq %TARGET_PID%" | find "%TARGET_PID%" >nul',
            "if %ERRORLEVEL% EQU 0 (",
            " set /a WAIT_LOOPS+=1",
            # %WAIT_LOOPS% is expanded when the block is parsed, i.e. it
            # holds the value from before the increment above; the goto
            # below re-parses the block on every pass, so the counter
            # still advances.
            " if %WAIT_LOOPS% GEQ 180 (",
            ' echo Timeout waiting for process %TARGET_PID%, attempting force stop >> "%UPDATE_LOG%"',
            " taskkill /PID %TARGET_PID% /T /F >nul 2>&1",
            " timeout /t 2 /nobreak >nul",
            ' tasklist /FI "PID eq %TARGET_PID%" | find "%TARGET_PID%" >nul',
            # %ERRORLEVEL% here would be substituted at parse time (always
            # 0 inside this block); "if errorlevel" is evaluated at run
            # time and therefore sees the result of the tasklist call.
            " if not errorlevel 1 goto :pid_still_running",
            " goto :wait_image_unlock",
            " )",
            " timeout /t 1 /nobreak >nul",
            " goto :wait_for_exit",
            ")",
            ":wait_image_unlock",
            "set /a IMG_LOOPS=0",
            ":check_image",
            'tasklist /FI "IMAGENAME eq %EXE_NAME%" | find /I "%EXE_NAME%" >nul',
            "if %ERRORLEVEL% EQU 0 (",
            " set /a IMG_LOOPS+=1",
            " if %IMG_LOOPS% GEQ 60 goto :image_still_running",
            " timeout /t 1 /nobreak >nul",
            " goto :check_image",
            ")",
            ":backup",
            "timeout /t 1 /nobreak >nul",
            'mkdir "%BACKUP_DIR%" >nul 2>&1',
            'robocopy "%APP_DIR%" "%BACKUP_DIR%" /E /NFL /NDL /NJH /NJS /NP /R:6 /W:2 >nul',
            'set "RC=%ERRORLEVEL%"',
            "if %RC% GEQ 8 goto :backup_error",
            'robocopy "%SRC_DIR%" "%APP_DIR%" /E /NFL /NDL /NJH /NJS /NP /R:12 /W:2 >nul',
            'set "RC=%ERRORLEVEL%"',
            "if %RC% GEQ 8 goto :rollback",
            'start "" "%APP_DIR%\\%EXE_NAME%"',
            "timeout /t 2 /nobreak >nul",
            'tasklist /FI "IMAGENAME eq %EXE_NAME%" | find /I "%EXE_NAME%" >nul',
            "if %ERRORLEVEL% NEQ 0 goto :rollback",
            'echo Update success >> "%UPDATE_LOG%"',
            'rmdir /S /Q "%BACKUP_DIR%" >nul 2>&1',
            "exit /b 0",
            ":rollback",
            'robocopy "%BACKUP_DIR%" "%APP_DIR%" /E /NFL /NDL /NJH /NJS /NP /R:6 /W:2 >nul',
            'start "" "%APP_DIR%\\%EXE_NAME%"',
            'echo Auto-update failed. Rollback executed. >> "%UPDATE_LOG%"',
            "exit /b 2",
            ":backup_error",
            'echo Auto-update failed during backup. Code %RC% >> "%UPDATE_LOG%"',
            "exit /b %RC%",
            ":pid_still_running",
            'echo Auto-update aborted: process %TARGET_PID% is still running after force stop. >> "%UPDATE_LOG%"',
            "exit /b 4",
            ":image_still_running",
            'echo Auto-update aborted: %EXE_NAME% still running and file lock may remain. >> "%UPDATE_LOG%"',
            "exit /b 5",
        ]
        # newline="\r\n" already translates every "\n" to CRLF on write, so
        # the lines are joined with "\n"; joining with "\r\n" as well would
        # produce "\r\r\n" line endings.
        with open(script_path, "w", encoding="utf-8", newline="\r\n") as f:
            f.write("\n".join(script_lines) + "\n")
        return script_path

    @classmethod
    def launch_update_script(cls, script_path, work_dir):
        """Start *script_path* via ``cmd.exe /c`` as a detached process.

        The child runs with *work_dir* as its working directory; this
        method does not wait for it to finish.
        """
        subprocess.Popen(
            ["cmd.exe", "/c", script_path],
            cwd=work_dir,
            creationflags=cls._detached_creation_flags(),
        )

    @classmethod
    def launch_gui_updater(cls, app_exe, source_dir, work_dir, target_pid, version=""):
        """Start ``AnabasisUpdater.exe`` located next to *app_exe*.

        The updater is started detached and receives the application
        directory, *source_dir*, the executable name, *target_pid*,
        *version* and *work_dir* as command-line arguments.

        Raises:
            RuntimeError: if ``AnabasisUpdater.exe`` does not exist in the
                application directory.
        """
        app_dir = os.path.dirname(app_exe)
        exe_name = os.path.basename(app_exe)
        updater_exe = os.path.join(app_dir, "AnabasisUpdater.exe")
        if not os.path.exists(updater_exe):
            raise RuntimeError("Файл AnabasisUpdater.exe не найден в папке приложения.")
        subprocess.Popen(
            [
                updater_exe,
                "--app-dir",
                app_dir,
                "--source-dir",
                source_dir,
                "--exe-name",
                exe_name,
                "--target-pid",
                str(target_pid),
                "--version",
                str(version or ""),
                "--work-dir",
                str(work_dir or ""),
            ],
            cwd=work_dir,
            creationflags=cls._detached_creation_flags(),
        )

    @classmethod
    def prepare_update(cls, download_url, checksum_url, download_name):
        """Download, verify and unpack an update archive.

        A fresh temporary work directory is created; the archive is saved
        there as ``update.zip``, checked with ``verify_update_checksum``
        and extracted into an ``extracted`` sub-directory.

        Returns:
            ``(work_dir, source_dir)`` where *source_dir* is the directory
            chosen by ``locate_extracted_root``.

        Raises:
            RuntimeError: when checksum verification fails. Download and
                archive errors propagate unchanged. In every failure case
                the temporary work directory is removed first.
        """
        work_dir = tempfile.mkdtemp(prefix="anabasis_update_")
        try:
            zip_path = os.path.join(work_dir, "update.zip")
            unpack_dir = os.path.join(work_dir, "extracted")
            cls.download_update_archive(download_url, zip_path)
            cls.verify_update_checksum(zip_path, checksum_url, download_name)
            os.makedirs(unpack_dir, exist_ok=True)
            with zipfile.ZipFile(zip_path, "r") as archive:
                archive.extractall(unpack_dir)
            source_dir = cls.locate_extracted_root(unpack_dir)
        except BaseException:
            # Do not leave a partially downloaded or unpacked update behind.
            shutil.rmtree(work_dir, ignore_errors=True)
            raise
        return work_dir, source_dir