11 Commits

Author SHA1 Message Date
df3a4c49c5 fix(build): use absolute icon path for updater pyinstaller
All checks were successful
Desktop Dev Pre-release / prerelease (push) Successful in 2m44s
- pass absolute icon path to PyInstaller to avoid missing icon in updater spec dir

- remove stale updater spec and use --clean before updater build
2026-02-15 21:51:36 +03:00
8d4bc10cb7 feat(updater): stage3 resilient gui update flow
Some checks failed
Desktop Dev Pre-release / prerelease (push) Failing after 2m18s
- added retry-based file copy, rollback restart, and version marker validation in updater GUI

- added build step to write dist/version.txt for post-update validation

- added unit tests for updater helpers
2026-02-15 21:46:36 +03:00
a6cee33cf6 feat: improve updater flow and release channels
Some checks failed
Desktop Dev Pre-release / prerelease (push) Failing after 2m18s
- added dedicated GUI updater executable and integrated launch path from main app

- added stable/beta update channel selection with persisted settings and checker support

- expanded CI/release validation to include updater and full test discovery
2026-02-15 21:41:18 +03:00
b30437faef ci(dev): add automated prerelease workflow
All checks were successful
Desktop Dev Pre-release / prerelease (push) Successful in 1m52s
- publish dev prereleases on each push to dev

- use tag format vX.Y.Z-<short_commit>

- upload versioned zip and checksum assets
2026-02-15 21:34:28 +03:00
44deba1382 chore(release): bump version to 2.0.0 2026-02-15 21:17:22 +03:00
eda8d43b9c refactor(ui): simplify instructions and stabilize About dialog
- remove duplicated inline instruction text

- switch About dialog to explicit QDialog with clickable link
2026-02-15 21:17:21 +03:00
4e6502bab7 Merge branch 'master' into dev
All checks were successful
Desktop CI / tests (pull_request) Successful in 11s
2026-02-15 20:51:22 +03:00
89237590c7 chore(release): bump version to 1.7.1
All checks were successful
Desktop CI / tests (pull_request) Successful in 12s
2026-02-15 20:50:36 +03:00
aca2bdfa85 fix(imports): restore shutil in main module
- fix unresolved reference for cache cleanup path
2026-02-15 20:48:39 +03:00
9d40f0017e refactor(ui): cleanup legacy code and add About dialog
- remove duplicate legacy implementations from main window

- add Help -> About with clickable repository link
2026-02-15 20:47:41 +03:00
798eacbf9a merge: release 1.7.0
Some checks failed
Desktop CI / tests (push) Successful in 12s
Desktop Release / release (push) Failing after 1m49s
2026-02-15 20:38:32 +03:00
12 changed files with 879 additions and 582 deletions

View File

@@ -28,8 +28,8 @@ jobs:
- name: Validate syntax - name: Validate syntax
run: | run: |
python -m py_compile app_version.py main.py build.py tests/test_auth_relogin_smoke.py python -m py_compile app_version.py main.py build.py updater_gui.py tests/test_auth_relogin_smoke.py tests/test_auto_update_service.py tests/test_chat_actions.py tests/test_token_store.py
- name: Run tests - name: Run tests
run: | run: |
python -m unittest tests/test_auth_relogin_smoke.py python -m unittest discover -s tests -p "test_*.py" -v

View File

@@ -0,0 +1,148 @@
name: Desktop Dev Pre-release
on:
push:
branches:
- dev
workflow_dispatch:
jobs:
prerelease:
runs-on: windows
steps:
- name: Checkout
uses: https://git.daemonlord.ru/actions/checkout@v4
with:
fetch-depth: 0
tags: true
- name: Ensure Python 3.13
shell: powershell
run: |
if (Get-Command python -ErrorAction SilentlyContinue) {
python --version
} elseif (Get-Command py -ErrorAction SilentlyContinue) {
$pyExe = py -3.13 -c "import sys; print(sys.executable)"
if (-not $pyExe) {
throw "Python 3.13 launcher is available, but interpreter was not found."
}
Split-Path $pyExe | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append
python --version
} else {
throw "Python is not installed on runner. Install Python 3.13 and restart runner service."
}
- name: Install dependencies
shell: powershell
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt pyinstaller
- name: Extract prerelease metadata
id: meta
shell: powershell
run: |
$version = (python -c "from app_version import APP_VERSION; print(APP_VERSION)").Trim()
$commit = (git rev-parse --short HEAD).Trim()
$tag = "v$version-$commit"
$archive = "AnabasisManager-$version-$commit"
$utf8NoBom = New-Object System.Text.UTF8Encoding($false)
[System.IO.File]::AppendAllText($env:GITHUB_OUTPUT, "version=$version`n", $utf8NoBom)
[System.IO.File]::AppendAllText($env:GITHUB_OUTPUT, "commit=$commit`n", $utf8NoBom)
[System.IO.File]::AppendAllText($env:GITHUB_OUTPUT, "tag=$tag`n", $utf8NoBom)
[System.IO.File]::AppendAllText($env:GITHUB_OUTPUT, "archive=$archive`n", $utf8NoBom)
[System.IO.File]::AppendAllText($env:GITHUB_ENV, "CONTINUE=true`n", $utf8NoBom)
Write-Host "Detected tag: $tag"
- name: Stop if prerelease already exists
if: env.CONTINUE == 'true'
shell: powershell
run: |
$tag = "${{ steps.meta.outputs.tag }}"
$apiUrl = "https://git.daemonlord.ru/api/v1/repos/${{ gitea.repository }}/releases?page=1&limit=100"
$headers = @{ Authorization = "token ${{ secrets.API_TOKEN }}" }
$utf8NoBom = New-Object System.Text.UTF8Encoding($false)
try {
$response = Invoke-RestMethod -Uri $apiUrl -Headers $headers -Method Get
$found = $false
foreach ($release in $response) {
if ($release.tag_name -eq $tag) {
$found = $true
break
}
}
if ($found) {
Write-Host "Pre-release $tag already exists, stopping job."
[System.IO.File]::AppendAllText($env:GITHUB_ENV, "CONTINUE=false`n", $utf8NoBom)
} else {
Write-Host "Pre-release $tag not found, continuing workflow..."
}
} catch {
Write-Host "Failed to query releases list, continuing workflow..."
}
- name: Run tests
if: env.CONTINUE == 'true'
shell: powershell
run: |
python -m py_compile app_version.py main.py build.py updater_gui.py tests/test_auth_relogin_smoke.py tests/test_auto_update_service.py tests/test_chat_actions.py tests/test_token_store.py
python -m unittest discover -s tests -p "test_*.py" -v
- name: Build release zip
if: env.CONTINUE == 'true'
shell: powershell
run: |
python build.py
- name: Prepare prerelease artifacts
if: env.CONTINUE == 'true'
shell: powershell
run: |
$version = "${{ steps.meta.outputs.version }}"
$archiveBase = "${{ steps.meta.outputs.archive }}"
$srcZip = "dist/AnabasisManager-$version.zip"
$dstZip = "dist/$archiveBase.zip"
if (-not (Test-Path $srcZip)) {
throw "Archive not found: $srcZip"
}
Copy-Item -Path $srcZip -Destination $dstZip -Force
$hash = (Get-FileHash -Path $dstZip -Algorithm SHA256).Hash.ToLower()
"$hash $archiveBase.zip" | Set-Content -Path "dist/$archiveBase.zip.sha256" -Encoding UTF8
- name: Configure git identity
if: env.CONTINUE == 'true'
shell: powershell
run: |
git config user.name "gitea-actions"
git config user.email "gitea-actions@daemonlord.ru"
- name: Create git tag
if: env.CONTINUE == 'true'
shell: powershell
run: |
$tag = "${{ steps.meta.outputs.tag }}"
$tagLine = (git ls-remote --tags origin "refs/tags/$tag" | Select-Object -First 1)
if ([string]::IsNullOrWhiteSpace($tagLine)) {
git tag "$tag"
git push origin "$tag"
} else {
Write-Host "Tag $tag already exists on origin, skipping tag push."
}
- name: Create Gitea Pre-release
if: env.CONTINUE == 'true'
uses: https://git.daemonlord.ru/actions/gitea-release-action@v1
with:
server_url: https://git.daemonlord.ru
repository: ${{ gitea.repository }}
token: ${{ secrets.API_TOKEN }}
tag_name: ${{ steps.meta.outputs.tag }}
name: Anabasis Manager ${{ steps.meta.outputs.version }} (dev ${{ steps.meta.outputs.commit }})
prerelease: true
body: |
Development pre-release for commit ${{ steps.meta.outputs.commit }}
Version base: ${{ steps.meta.outputs.version }}
files: |
dist/${{ steps.meta.outputs.archive }}.zip
dist/${{ steps.meta.outputs.archive }}.zip.sha256

View File

@@ -70,8 +70,8 @@ jobs:
if: env.CONTINUE == 'true' if: env.CONTINUE == 'true'
shell: powershell shell: powershell
run: | run: |
python -m py_compile app_version.py main.py build.py tests/test_auth_relogin_smoke.py python -m py_compile app_version.py main.py build.py updater_gui.py tests/test_auth_relogin_smoke.py tests/test_auto_update_service.py tests/test_chat_actions.py tests/test_token_store.py
python -m unittest tests/test_auth_relogin_smoke.py python -m unittest discover -s tests -p "test_*.py" -v
- name: Build release zip - name: Build release zip
if: env.CONTINUE == 'true' if: env.CONTINUE == 'true'

View File

@@ -1 +1 @@
APP_VERSION = "1.7.0" APP_VERSION = "2.0.0"

View File

@@ -6,12 +6,14 @@ from app_version import APP_VERSION
# --- Конфигурация --- # --- Конфигурация ---
APP_NAME = "AnabasisManager" APP_NAME = "AnabasisManager"
UPDATER_NAME = "AnabasisUpdater"
VERSION = APP_VERSION # Единая версия приложения VERSION = APP_VERSION # Единая версия приложения
MAIN_SCRIPT = "main.py" MAIN_SCRIPT = "main.py"
UPDATER_SCRIPT = "updater_gui.py"
ICON_PATH = "icon.ico" ICON_PATH = "icon.ico"
DIST_DIR = os.path.join("dist", APP_NAME) DIST_DIR = os.path.join("dist", APP_NAME)
ARCHIVE_NAME = f"{APP_NAME}-{VERSION}" # Формат Название-Версия ARCHIVE_NAME = f"{APP_NAME}-{VERSION}" # Формат Название-Версия
SAFE_CLEAN_ROOT_FILES = {"main.py", "requirements.txt", "build.py"} SAFE_CLEAN_ROOT_FILES = {"main.py", "updater_gui.py", "requirements.txt", "build.py"}
REMOVE_LIST = [ REMOVE_LIST = [
"Qt6Pdf.dll", "Qt6PdfQuick.dll", "Qt6PdfWidgets.dll", "Qt6Pdf.dll", "Qt6PdfQuick.dll", "Qt6PdfWidgets.dll",
"Qt6VirtualKeyboard.dll", "Qt6Positioning.dll", "Qt6VirtualKeyboard.dll", "Qt6Positioning.dll",
@@ -22,6 +24,18 @@ REMOVE_LIST = [
] ]
def write_version_marker():
marker_path = os.path.join(DIST_DIR, "version.txt")
try:
os.makedirs(DIST_DIR, exist_ok=True)
with open(marker_path, "w", encoding="utf-8") as f:
f.write(str(VERSION).strip() + "\n")
print(f"[OK] Обновлен маркер версии: {marker_path}")
except Exception as e:
print(f"[ERROR] Не удалось записать version.txt: {e}")
sys.exit(1)
def ensure_project_root(): def ensure_project_root():
missing = [name for name in SAFE_CLEAN_ROOT_FILES if not os.path.exists(name)] missing = [name for name in SAFE_CLEAN_ROOT_FILES if not os.path.exists(name)]
if missing: if missing:
@@ -32,6 +46,8 @@ def ensure_project_root():
def run_build(): def run_build():
print(f"--- 1. Запуск PyInstaller для {APP_NAME} v{VERSION} ---") print(f"--- 1. Запуск PyInstaller для {APP_NAME} v{VERSION} ---")
icon_abs_path = os.path.abspath(ICON_PATH)
has_icon = os.path.exists(icon_abs_path)
command = [ command = [
"pyinstaller", "pyinstaller",
@@ -42,8 +58,8 @@ def run_build():
"--exclude-module", "PySide6.QtWebEngineWidgets", "--exclude-module", "PySide6.QtWebEngineWidgets",
"--exclude-module", "PySide6.QtWebEngineQuick", "--exclude-module", "PySide6.QtWebEngineQuick",
f"--name={APP_NAME}", f"--name={APP_NAME}",
f"--icon={ICON_PATH}" if os.path.exists(ICON_PATH) else "", f"--icon={icon_abs_path}" if has_icon else "",
f"--add-data={ICON_PATH}{os.pathsep}." if os.path.exists(ICON_PATH) else "", f"--add-data={icon_abs_path}{os.pathsep}." if has_icon else "",
f"--add-data=auth_webview.py{os.pathsep}.", f"--add-data=auth_webview.py{os.pathsep}.",
MAIN_SCRIPT MAIN_SCRIPT
] ]
@@ -58,6 +74,36 @@ def run_build():
sys.exit(1) sys.exit(1)
def run_updater_build():
print(f"\n--- 1.2 Сборка {UPDATER_NAME} ---")
icon_abs_path = os.path.abspath(ICON_PATH)
has_icon = os.path.exists(icon_abs_path)
updater_spec_dir = os.path.join("build", "updater_spec")
updater_spec_path = os.path.join(updater_spec_dir, f"{UPDATER_NAME}.spec")
if os.path.exists(updater_spec_path):
os.remove(updater_spec_path)
command = [
"pyinstaller",
"--noconfirm",
"--clean",
"--onefile",
"--windowed",
f"--name={UPDATER_NAME}",
"--distpath", DIST_DIR,
"--workpath", os.path.join("build", "updater"),
"--specpath", updater_spec_dir,
f"--icon={icon_abs_path}" if has_icon else "",
UPDATER_SCRIPT,
]
command = [arg for arg in command if arg]
try:
subprocess.check_call(command)
print(f"[OK] {UPDATER_NAME} собран.")
except subprocess.CalledProcessError as e:
print(f"[ERROR] Ошибка при сборке {UPDATER_NAME}: {e}")
sys.exit(1)
def run_cleanup(): def run_cleanup():
print(f"\n--- 2. Оптимизация папки {APP_NAME} ---") print(f"\n--- 2. Оптимизация папки {APP_NAME} ---")
@@ -99,7 +145,9 @@ if __name__ == "__main__":
shutil.rmtree(folder) shutil.rmtree(folder)
run_build() run_build()
run_updater_build()
run_cleanup() run_cleanup()
write_version_marker()
create_archive() create_archive()
print("\n" + "=" * 30) print("\n" + "=" * 30)

678
main.py
View File

@@ -1,18 +1,22 @@
import sys
import base64
import ctypes
import shutil
import json import json
import time
import auth_webview
import os import os
import re import shutil
import hashlib import sys
import subprocess
import threading import threading
import tempfile import time
import urllib.request
import zipfile from PySide6.QtCore import QProcess
from PySide6.QtCore import QStandardPaths
from PySide6.QtCore import Qt, QUrl, QDateTime, QTimer
from PySide6.QtGui import QIcon, QAction, QActionGroup, QDesktopServices
from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit,
QPushButton, QVBoxLayout, QWidget, QMessageBox,
QTextBrowser, QScrollArea, QCheckBox, QHBoxLayout,
QTabWidget, QDialog, QDialogButtonBox,
QProgressBar)
from vk_api.exceptions import VkApiError
import auth_webview
from app_version import APP_VERSION from app_version import APP_VERSION
from services import ( from services import (
AutoUpdateService, AutoUpdateService,
@@ -26,22 +30,14 @@ from services import (
) )
from ui.dialogs import MultiLinkDialog as UIMultiLinkDialog from ui.dialogs import MultiLinkDialog as UIMultiLinkDialog
from ui.main_window import instructions_text from ui.main_window import instructions_text
from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit,
QPushButton, QVBoxLayout, QWidget, QMessageBox,
QTextBrowser, QScrollArea, QCheckBox, QHBoxLayout,
QSizePolicy, QDialog, QTextEdit, QTabWidget, QDialogButtonBox,
QProgressBar)
from PySide6.QtCore import Qt, QUrl, QDateTime, QTimer
from PySide6.QtGui import QIcon, QAction, QDesktopServices
from urllib.parse import parse_qs, unquote
from vk_api.exceptions import VkApiError
from PySide6.QtCore import QStandardPaths
from PySide6.QtCore import QProcess
from ctypes import wintypes
# --- Управление токенами и настройками --- # --- Управление токенами и настройками ---
APP_DATA_DIR = os.path.join(QStandardPaths.writableLocation(QStandardPaths.AppDataLocation), "AnabasisVKChatManager") APP_DATA_DIR = os.path.join(
QStandardPaths.writableLocation(QStandardPaths.StandardLocation.AppDataLocation),
"AnabasisVKChatManager",
)
TOKEN_FILE = os.path.join(APP_DATA_DIR, "token.json") TOKEN_FILE = os.path.join(APP_DATA_DIR, "token.json")
SETTINGS_FILE = os.path.join(APP_DATA_DIR, "settings.json")
WEB_ENGINE_CACHE_DIR = os.path.join(APP_DATA_DIR, "web_engine_cache") WEB_ENGINE_CACHE_DIR = os.path.join(APP_DATA_DIR, "web_engine_cache")
CACHE_CLEANUP_MARKER = os.path.join(APP_DATA_DIR, "pending_cache_cleanup") CACHE_CLEANUP_MARKER = os.path.join(APP_DATA_DIR, "pending_cache_cleanup")
LOG_FILE = os.path.join(APP_DATA_DIR, "app.log") LOG_FILE = os.path.join(APP_DATA_DIR, "app.log")
@@ -52,78 +48,10 @@ AUTH_RELOGIN_BACKOFF_SECONDS = 5.0
UPDATE_REPOSITORY = "" UPDATE_REPOSITORY = ""
# Full repository URL is preferred (supports GitHub/Gitea). # Full repository URL is preferred (supports GitHub/Gitea).
UPDATE_REPOSITORY_URL = "https://git.daemonlord.ru/benya/AnabasisChatRemove" UPDATE_REPOSITORY_URL = "https://git.daemonlord.ru/benya/AnabasisChatRemove"
UPDATE_CHANNEL_DEFAULT = "stable"
UPDATE_REQUEST_TIMEOUT = 8 UPDATE_REQUEST_TIMEOUT = 8
class _DataBlob(ctypes.Structure):
_fields_ = [("cbData", wintypes.DWORD), ("pbData", ctypes.POINTER(ctypes.c_byte))]
_crypt32 = None
_kernel32 = None
if os.name == "nt":
_crypt32 = ctypes.WinDLL("crypt32", use_last_error=True)
_kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
_crypt32.CryptProtectData.argtypes = [
ctypes.POINTER(_DataBlob),
wintypes.LPCWSTR,
ctypes.POINTER(_DataBlob),
ctypes.c_void_p,
ctypes.c_void_p,
wintypes.DWORD,
ctypes.POINTER(_DataBlob),
]
_crypt32.CryptProtectData.restype = wintypes.BOOL
_crypt32.CryptUnprotectData.argtypes = [
ctypes.POINTER(_DataBlob),
ctypes.POINTER(wintypes.LPWSTR),
ctypes.POINTER(_DataBlob),
ctypes.c_void_p,
ctypes.c_void_p,
wintypes.DWORD,
ctypes.POINTER(_DataBlob),
]
_crypt32.CryptUnprotectData.restype = wintypes.BOOL
def _crypt_protect_data(data, description=""):
buffer = ctypes.create_string_buffer(data)
data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte)))
data_out = _DataBlob()
if not _crypt32.CryptProtectData(ctypes.byref(data_in), description, None, None, None, 0, ctypes.byref(data_out)):
raise ctypes.WinError(ctypes.get_last_error())
try:
return ctypes.string_at(data_out.pbData, data_out.cbData)
finally:
_kernel32.LocalFree(data_out.pbData)
def _crypt_unprotect_data(data):
buffer = ctypes.create_string_buffer(data)
data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte)))
data_out = _DataBlob()
if not _crypt32.CryptUnprotectData(ctypes.byref(data_in), None, None, None, None, 0, ctypes.byref(data_out)):
raise ctypes.WinError(ctypes.get_last_error())
try:
return ctypes.string_at(data_out.pbData, data_out.cbData)
finally:
_kernel32.LocalFree(data_out.pbData)
def _encrypt_token(token):
if os.name != "nt":
raise RuntimeError("DPAPI is available only on Windows.")
encrypted_bytes = _crypt_protect_data(token.encode("utf-8"))
return base64.b64encode(encrypted_bytes).decode("ascii")
def _decrypt_token(token_data):
if os.name != "nt":
raise RuntimeError("DPAPI is available only on Windows.")
encrypted_bytes = base64.b64decode(token_data.encode("ascii"))
decrypted_bytes = _crypt_unprotect_data(encrypted_bytes)
return decrypted_bytes.decode("utf-8")
def get_resource_path(relative_path): def get_resource_path(relative_path):
""" Получает абсолютный путь к ресурсу, работает для скрипта и для .exe """ """ Получает абсолютный путь к ресурсу, работает для скрипта и для .exe """
if hasattr(sys, '_MEIPASS'): # Для PyInstaller (на всякий случай) if hasattr(sys, '_MEIPASS'): # Для PyInstaller (на всякий случай)
@@ -131,98 +59,6 @@ def get_resource_path(relative_path):
# Для cx_Freeze и обычного запуска # Для cx_Freeze и обычного запуска
return os.path.join(os.path.abspath("."), relative_path) return os.path.join(os.path.abspath("."), relative_path)
def save_token(token, expires_in=0):
"""Сохраняет токен. Если expires_in=0, токен считается бессрочным."""
try:
expires_in = int(expires_in)
except (ValueError, TypeError):
expires_in = 0
os.makedirs(APP_DATA_DIR, exist_ok=True)
# ИСПРАВЛЕНИЕ: если expires_in равно 0, то и время истечения ставим 0
expiration_time = (time.time() + expires_in) if expires_in > 0 else 0
stored_token = token
encrypted = False
if os.name == "nt":
try:
stored_token = _encrypt_token(token)
encrypted = True
except Exception as e:
print(f"Ошибка шифрования токена: {e}")
data = {
"token": stored_token,
"expiration_time": expiration_time,
"encrypted": encrypted
}
try:
with open(TOKEN_FILE, "w") as f:
json.dump(data, f)
status = "Бессрочно" if expiration_time == 0 else QDateTime.fromSecsSinceEpoch(int(expiration_time)).toString()
print(f"Токен сохранен. Срок действия: {status}")
return expiration_time
except IOError as e:
print(f"Ошибка сохранения токена: {e}")
return None
def load_token():
"""Загружает токен и проверяет его валидность."""
try:
if not os.path.exists(TOKEN_FILE):
return None, None
with open(TOKEN_FILE, "r") as f:
data = json.load(f)
token = data.get("token")
encrypted = data.get("encrypted", False)
if token and encrypted:
try:
token = _decrypt_token(token)
except Exception as e:
print(f"Ошибка расшифровки токена: {e}")
if os.path.exists(TOKEN_FILE):
os.remove(TOKEN_FILE)
return None, None
expiration_time = data.get("expiration_time")
# ИСПРАВЛЕНИЕ: токен валиден, если время истечения 0 ИЛИ оно больше текущего
if token and (expiration_time == 0 or expiration_time > time.time()):
return token, expiration_time
else:
if os.path.exists(TOKEN_FILE):
os.remove(TOKEN_FILE)
return None, None
except Exception as e:
print(f"Ошибка загрузки: {e}")
return None, None
class MultiLinkDialog(QDialog):
def __init__(self, parent=None):
super().__init__(parent)
self.setWindowTitle("Ввод нескольких ссылок")
self.setMinimumSize(400, 300)
layout = QVBoxLayout(self)
label = QLabel("Вставьте ссылки на страницы VK, каждая с новой строки:")
layout.addWidget(label)
self.links_text_edit = QTextEdit()
layout.addWidget(self.links_text_edit)
button_box = QDialogButtonBox()
button_box.addButton("ОК", QDialogButtonBox.AcceptRole)
button_box.addButton("Отмена", QDialogButtonBox.RejectRole)
button_box.accepted.connect(self.accept)
button_box.rejected.connect(self.reject)
layout.addWidget(button_box)
def get_links(self):
return [line.strip() for line in self.links_text_edit.toPlainText().strip().split('\n') if line.strip()]
class VkChatManager(QMainWindow): class VkChatManager(QMainWindow):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
@@ -250,6 +86,7 @@ class VkChatManager(QMainWindow):
self._auth_relogin_in_progress = False self._auth_relogin_in_progress = False
self._last_auth_relogin_ts = 0.0 self._last_auth_relogin_ts = 0.0
self.update_repository_url = detect_update_repository_url(UPDATE_REPOSITORY_URL, UPDATE_REPOSITORY) self.update_repository_url = detect_update_repository_url(UPDATE_REPOSITORY_URL, UPDATE_REPOSITORY)
self.update_channel = UPDATE_CHANNEL_DEFAULT
self.update_checker = None self.update_checker = None
self.update_thread = None self.update_thread = None
self._update_check_silent = False self._update_check_silent = False
@@ -261,6 +98,7 @@ class VkChatManager(QMainWindow):
self._cleanup_cache_if_needed() self._cleanup_cache_if_needed()
self._ensure_log_dir() self._ensure_log_dir()
self._load_settings()
self.init_ui() self.init_ui()
self.load_saved_token_on_startup() self.load_saved_token_on_startup()
self.setup_token_timer() self.setup_token_timer()
@@ -274,14 +112,6 @@ class VkChatManager(QMainWindow):
layout.setSpacing(5) layout.setSpacing(5)
self.instructions = QTextBrowser() self.instructions = QTextBrowser()
self.instructions.setPlainText(
"Инструкция:\n"
"1. Авторизуйтесь через VK.\n"
"2. Выберите чаты.\n"
"3. Вставьте ссылку на пользователя в поле ниже. ID определится автоматически.\n"
"4. Для массовых операций, нажмите кнопку 'Список' и вставьте ссылки в окне.\n"
"5. Нажмите 'ИСКЛЮЧИТЬ' или 'ПРИГЛАСИТЬ'."
)
self.instructions.setFixedHeight(120) self.instructions.setFixedHeight(120)
self.instructions.setPlainText(instructions_text()) self.instructions.setPlainText(instructions_text())
layout.addWidget(self.instructions) layout.addWidget(self.instructions)
@@ -293,7 +123,7 @@ class VkChatManager(QMainWindow):
layout.addWidget(self.token_input) layout.addWidget(self.token_input)
self.token_timer_label = QLabel("Срок действия токена: Н") self.token_timer_label = QLabel("Срок действия токена: Н")
self.token_timer_label.setAlignment(Qt.AlignRight) self.token_timer_label.setAlignment(Qt.AlignmentFlag.AlignRight)
layout.addWidget(self.token_timer_label) layout.addWidget(self.token_timer_label)
self.auth_btn = QPushButton("Авторизоваться через VK") self.auth_btn = QPushButton("Авторизоваться через VK")
@@ -355,7 +185,7 @@ class VkChatManager(QMainWindow):
self.add_user_btn.clicked.connect(self.add_user_to_chat) self.add_user_btn.clicked.connect(self.add_user_to_chat)
layout.addWidget(self.add_user_btn) layout.addWidget(self.add_user_btn)
self.status_label = QLabel("Статус: не авторизован") self.status_label = QLabel("Статус: не авторизован")
self.status_label.setAlignment(Qt.AlignCenter) self.status_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
layout.addWidget(self.status_label) layout.addWidget(self.status_label)
layout.addStretch(1) layout.addStretch(1)
@@ -389,55 +219,6 @@ class VkChatManager(QMainWindow):
return return
self._process_links_list([url]) self._process_links_list([url])
def _process_links_list(self, links_list):
if not self.vk:
QMessageBox.warning(self, "Ошибка", "Сначала авторизуйтесь.")
return
self.user_ids_to_process.clear()
resolved_ids = []
failed_links = []
self._set_busy(True, "Статус: Определяю ID...")
try:
for link in links_list:
try:
path = urlparse(link).path
screen_name = path.split('/')[-1] if path else ''
if not screen_name and len(path.split('/')) > 1:
screen_name = path.split('/')[-2]
if not screen_name:
failed_links.append(link)
continue
resolved_object = self._vk_call_with_retry(self.vk.utils.resolveScreenName, screen_name=screen_name)
if resolved_object and resolved_object.get('type') == 'user':
resolved_ids.append(resolved_object['object_id'])
else:
failed_links.append(link)
except VkApiError as e:
if self._handle_vk_api_error("resolveScreenName", e, action_name="получения ID пользователей"):
return
failed_links.append(f"{link} ({self._format_vk_error(e)})")
except Exception:
failed_links.append(link)
finally:
self._set_busy(False)
self.user_ids_to_process = resolved_ids
status_message = f"Статус: Готово к работе с {len(resolved_ids)} пользовател(ем/ями)."
if len(links_list) > 1:
self._set_vk_url_input_text(f"Загружено {len(resolved_ids)}/{len(links_list)} из списка")
if failed_links:
QMessageBox.warning(self, "Ошибка получения ID",
f"Не удалось получить ID для следующих ссылок:\n" + "\n".join(failed_links))
self.status_label.setText(status_message)
self.set_ui_state(self.token is not None)
def create_menu(self): def create_menu(self):
"""Создает верхнее меню.""" """Создает верхнее меню."""
menu_bar = self.menuBar() menu_bar = self.menuBar()
@@ -458,12 +239,68 @@ class VkChatManager(QMainWindow):
tools_menu.addAction(check_updates_action) tools_menu.addAction(check_updates_action)
self.check_updates_action = check_updates_action self.check_updates_action = check_updates_action
channel_menu = tools_menu.addMenu("Канал обновлений")
self.update_channel_group = QActionGroup(self)
self.update_channel_group.setExclusive(True)
stable_channel_action = QAction("Релизы (stable)", self)
stable_channel_action.setCheckable(True)
stable_channel_action.setChecked(self.update_channel == "stable")
stable_channel_action.triggered.connect(lambda checked: checked and self.set_update_channel("stable"))
channel_menu.addAction(stable_channel_action)
self.update_channel_group.addAction(stable_channel_action)
self.update_channel_stable_action = stable_channel_action
beta_channel_action = QAction("Бета (pre-release)", self)
beta_channel_action.setCheckable(True)
beta_channel_action.setChecked(self.update_channel == "beta")
beta_channel_action.triggered.connect(lambda checked: checked and self.set_update_channel("beta"))
channel_menu.addAction(beta_channel_action)
self.update_channel_group.addAction(beta_channel_action)
self.update_channel_beta_action = beta_channel_action
logout_action = QAction("Выйти и очистить", self) logout_action = QAction("Выйти и очистить", self)
logout_action.setStatusTip("Выйти, удалить токен и кэш") logout_action.setStatusTip("Выйти, удалить токен и кэш")
logout_action.triggered.connect(self.logout_and_clear) logout_action.triggered.connect(self.logout_and_clear)
tools_menu.addAction(logout_action) tools_menu.addAction(logout_action)
self.logout_action = logout_action self.logout_action = logout_action
help_menu = menu_bar.addMenu("Справка")
about_action = QAction("О приложении", self)
about_action.setStatusTip("Показать информацию о приложении")
about_action.triggered.connect(self.show_about_dialog)
help_menu.addAction(about_action)
self.about_action = about_action
def show_about_dialog(self):
dialog = QDialog(self)
dialog.setWindowTitle("О приложении")
dialog.setMinimumWidth(460)
repo_url = self.update_repository_url
if repo_url:
repo_html = f'<a href="{repo_url}">{repo_url}</a>'
else:
repo_html = "не указан"
content = QLabel(
f"<b>Anabasis Chat Manager</b><br>"
f"Версия: {APP_VERSION}<br><br>"
"Инструмент для массового управления пользователями в чатах VK.<br>"
"Поддерживается проверка обновлений и автообновление Windows-сборки.<br><br>"
f"Репозиторий: {repo_html}"
)
content.setTextFormat(Qt.TextFormat.RichText)
content.setTextInteractionFlags(Qt.TextInteractionFlag.TextBrowserInteraction)
content.setOpenExternalLinks(True)
content.setWordWrap(True)
button_box = QDialogButtonBox(QDialogButtonBox.StandardButton.Ok, parent=dialog)
button_box.accepted.connect(dialog.accept)
layout = QVBoxLayout(dialog)
layout.addWidget(content)
layout.addWidget(button_box)
dialog.exec()
def create_chat_tab(self): def create_chat_tab(self):
# This implementation correctly creates a scrollable area for chat lists. # This implementation correctly creates a scrollable area for chat lists.
tab_content_widget = QWidget() tab_content_widget = QWidget()
@@ -473,8 +310,8 @@ class VkChatManager(QMainWindow):
scroll_area = QScrollArea() scroll_area = QScrollArea()
scroll_area.setWidgetResizable(True) scroll_area.setWidgetResizable(True)
scroll_area.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOff) scroll_area.setHorizontalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAlwaysOff)
scroll_area.setVerticalScrollBarPolicy(Qt.ScrollBarAsNeeded) scroll_area.setVerticalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAsNeeded)
tab_layout.addWidget(scroll_area) tab_layout.addWidget(scroll_area)
@@ -492,15 +329,60 @@ class VkChatManager(QMainWindow):
if hasattr(self, "check_updates_action"): if hasattr(self, "check_updates_action"):
self.check_updates_action.setEnabled(not in_progress) self.check_updates_action.setEnabled(not in_progress)
def _normalize_update_channel(self, value):
channel = (value or "").strip().lower()
if channel in ("beta", "betas", "pre", "prerelease", "pre-release"):
return "beta"
return "stable"
def _load_settings(self):
self.update_channel = UPDATE_CHANNEL_DEFAULT
if not os.path.exists(SETTINGS_FILE):
return
try:
with open(SETTINGS_FILE, "r", encoding="utf-8") as f:
settings = json.load(f)
self.update_channel = self._normalize_update_channel(settings.get("update_channel"))
except Exception as e:
self._log_event("settings_load", f"Ошибка загрузки настроек: {e}", level="WARN")
def _save_settings(self):
try:
os.makedirs(APP_DATA_DIR, exist_ok=True)
settings = {
"update_channel": self.update_channel,
}
with open(SETTINGS_FILE, "w", encoding="utf-8") as f:
json.dump(settings, f, ensure_ascii=False, indent=2)
except Exception as e:
self._log_event("settings_save", f"Ошибка сохранения настроек: {e}", level="WARN")
def set_update_channel(self, channel):
normalized = self._normalize_update_channel(channel)
if normalized == self.update_channel:
return
self.update_channel = normalized
self._save_settings()
self.status_label.setText(
f"Статус: канал обновлений переключен на {'бета' if normalized == 'beta' else 'релизы'}."
)
self._log_event("update_channel", f"update_channel={self.update_channel}")
def check_for_updates(self, silent_no_updates=False): def check_for_updates(self, silent_no_updates=False):
if self.update_thread and self.update_thread.is_alive(): if self.update_thread and self.update_thread.is_alive():
return return
self._update_check_silent = silent_no_updates self._update_check_silent = silent_no_updates
self._set_update_action_state(True) self._set_update_action_state(True)
self.status_label.setText("Статус: проверка обновлений...") channel_label = "бета" if self.update_channel == "beta" else "релизы"
self.status_label.setText(f"Статус: проверка обновлений ({channel_label})...")
self.update_checker = UpdateChecker(self.update_repository_url, APP_VERSION, request_timeout=UPDATE_REQUEST_TIMEOUT) self.update_checker = UpdateChecker(
self.update_repository_url,
APP_VERSION,
request_timeout=UPDATE_REQUEST_TIMEOUT,
channel=self.update_channel,
)
self.update_checker.check_finished.connect(self._on_update_check_finished) self.update_checker.check_finished.connect(self._on_update_check_finished)
self.update_checker.check_failed.connect(self._on_update_check_failed) self.update_checker.check_failed.connect(self._on_update_check_failed)
self.update_thread = threading.Thread(target=self.update_checker.run, daemon=True) self.update_thread = threading.Thread(target=self.update_checker.run, daemon=True)
@@ -516,17 +398,17 @@ class VkChatManager(QMainWindow):
self.status_label.setText(f"Статус: доступно обновление {latest_version}") self.status_label.setText(f"Статус: доступно обновление {latest_version}")
message_box = QMessageBox(self) message_box = QMessageBox(self)
message_box.setIcon(QMessageBox.Information) message_box.setIcon(QMessageBox.Icon.Information)
message_box.setWindowTitle("Доступно обновление") message_box.setWindowTitle("Доступно обновление")
message_box.setText( message_box.setText(
f"Текущая версия: {result.get('current_version')}\n" f"Текущая версия: {result.get('current_version')}\n"
f"Доступная версия: {latest_version}\n\n" f"Доступная версия: {latest_version}\n\n"
"Открыть страницу загрузки?" "Открыть страницу загрузки?"
) )
update_now_button = message_box.addButton("Обновить сейчас", QMessageBox.AcceptRole) update_now_button = message_box.addButton("Обновить сейчас", QMessageBox.ButtonRole.AcceptRole)
download_button = message_box.addButton("Скачать", QMessageBox.AcceptRole) download_button = message_box.addButton("Скачать", QMessageBox.ButtonRole.AcceptRole)
releases_button = message_box.addButton("Релизы", QMessageBox.ActionRole) releases_button = message_box.addButton("Релизы", QMessageBox.ButtonRole.ActionRole)
cancel_button = message_box.addButton("Позже", QMessageBox.RejectRole) cancel_button = message_box.addButton("Позже", QMessageBox.ButtonRole.RejectRole)
message_box.setDefaultButton(update_now_button) message_box.setDefaultButton(update_now_button)
message_box.exec() message_box.exec()
@@ -548,9 +430,10 @@ class VkChatManager(QMainWindow):
self._log_event("update_check", "Диалог обновления закрыт без действия.", level="WARN") self._log_event("update_check", "Диалог обновления закрыт без действия.", level="WARN")
return return
self.status_label.setText(f"Статус: используется актуальная версия ({APP_VERSION}).") channel_label = "бета" if self.update_channel == "beta" else "релизы"
self.status_label.setText(f"Статус: используется актуальная версия ({APP_VERSION}, канал: {channel_label}).")
if not self._update_check_silent: if not self._update_check_silent:
QMessageBox.information(self, "Обновления", "Установлена актуальная версия.") QMessageBox.information(self, "Обновления", f"Установлена актуальная версия в канале {channel_label}.")
def _on_update_check_failed(self, error_text): def _on_update_check_failed(self, error_text):
self._set_update_action_state(False) self._set_update_action_state(False)
@@ -572,180 +455,6 @@ class VkChatManager(QMainWindow):
if not self._update_check_silent: if not self._update_check_silent:
QMessageBox.warning(self, "Проверка обновлений", error_text) QMessageBox.warning(self, "Проверка обновлений", error_text)
def _download_update_archive(self, download_url, destination_path):
request = urllib.request.Request(
download_url,
headers={"User-Agent": "AnabasisManager-Updater"},
)
with urllib.request.urlopen(request, timeout=60) as response:
with open(destination_path, "wb") as f:
shutil.copyfileobj(response, f)
def _download_update_text(self, url):
request = urllib.request.Request(
url,
headers={"User-Agent": "AnabasisManager-Updater"},
)
with urllib.request.urlopen(request, timeout=30) as response:
return response.read().decode("utf-8", errors="replace")
@staticmethod
def _sha256_file(path):
digest = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(1024 * 1024), b""):
digest.update(chunk)
return digest.hexdigest().lower()
@staticmethod
def _extract_sha256_from_text(checksum_text, target_file_name):
target = (target_file_name or "").strip().lower()
for raw_line in checksum_text.splitlines():
line = raw_line.strip()
if not line:
continue
match = re.search(r"\b([A-Fa-f0-9]{64})\b", line)
if not match:
continue
checksum = match.group(1).lower()
if not target:
return checksum
line_lower = line.lower()
if target in line_lower:
return checksum
if os.path.basename(target) in line_lower:
return checksum
return ""
def _verify_update_checksum(self, zip_path, checksum_url, download_name):
if not checksum_url:
raise RuntimeError("В релизе нет файла SHA256. Автообновление остановлено.")
checksum_text = self._download_update_text(checksum_url)
expected_hash = self._extract_sha256_from_text(checksum_text, download_name or os.path.basename(zip_path))
if not expected_hash:
raise RuntimeError("Не удалось найти SHA256 для архива обновления.")
actual_hash = self._sha256_file(zip_path)
if actual_hash != expected_hash:
raise RuntimeError("SHA256 не совпадает, обновление отменено.")
def _locate_extracted_root(self, extracted_dir):
entries = []
for name in os.listdir(extracted_dir):
full_path = os.path.join(extracted_dir, name)
if os.path.isdir(full_path):
entries.append(full_path)
if len(entries) == 1:
candidate = entries[0]
if os.path.exists(os.path.join(candidate, "AnabasisManager.exe")):
return candidate
return extracted_dir
def _build_update_script(self, app_dir, source_dir, exe_name, target_pid):
script_path = os.path.join(tempfile.gettempdir(), "anabasis_apply_update.cmd")
script_lines = [
"@echo off",
"setlocal",
f"set APP_DIR={app_dir}",
f"set SRC_DIR={source_dir}",
f"set EXE_NAME={exe_name}",
f"set TARGET_PID={target_pid}",
"set BACKUP_DIR=%TEMP%\\anabasis_backup_%RANDOM%%RANDOM%",
":wait_for_exit",
"tasklist /FI \"PID eq %TARGET_PID%\" | find \"%TARGET_PID%\" >nul",
"if %ERRORLEVEL% EQU 0 (",
" timeout /t 1 /nobreak >nul",
" goto :wait_for_exit",
")",
"timeout /t 1 /nobreak >nul",
"mkdir \"%BACKUP_DIR%\" >nul 2>&1",
"robocopy \"%APP_DIR%\" \"%BACKUP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:6 /W:2 >nul",
"set RC=%ERRORLEVEL%",
"if %RC% GEQ 8 goto :backup_error",
"robocopy \"%SRC_DIR%\" \"%APP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:12 /W:2 >nul",
"set RC=%ERRORLEVEL%",
"if %RC% GEQ 8 goto :rollback",
"start \"\" \"%APP_DIR%\\%EXE_NAME%\"",
"timeout /t 2 /nobreak >nul",
"tasklist /FI \"IMAGENAME eq %EXE_NAME%\" | find /I \"%EXE_NAME%\" >nul",
"if %ERRORLEVEL% NEQ 0 goto :rollback",
"rmdir /S /Q \"%BACKUP_DIR%\" >nul 2>&1",
"exit /b 0",
":rollback",
"robocopy \"%BACKUP_DIR%\" \"%APP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:6 /W:2 >nul",
"start \"\" \"%APP_DIR%\\%EXE_NAME%\"",
"echo Auto-update failed. Rollback executed. > \"%APP_DIR%\\update_error.log\"",
"exit /b 2",
":backup_error",
"echo Auto-update failed during backup. Code %RC% > \"%APP_DIR%\\update_error.log\"",
"exit /b %RC%",
]
with open(script_path, "w", encoding="utf-8", newline="\r\n") as f:
f.write("\r\n".join(script_lines) + "\r\n")
return script_path
def _start_auto_update(self, download_url, latest_version, checksum_url="", download_name=""):
if os.name != "nt":
QMessageBox.information(
self,
"Автообновление",
"Автообновление пока поддерживается только в Windows-сборке.",
)
return False
if not getattr(sys, "frozen", False):
QMessageBox.information(
self,
"Автообновление",
"Автообновление доступно в собранной версии приложения (.exe).",
)
return False
if not download_url:
QMessageBox.warning(self, "Автообновление", "В релизе нет ссылки на файл для обновления.")
return False
self.status_label.setText(f"Статус: загрузка обновления {latest_version}...")
self._set_busy(True)
work_dir = tempfile.mkdtemp(prefix="anabasis_update_")
zip_path = os.path.join(work_dir, "update.zip")
unpack_dir = os.path.join(work_dir, "extracted")
try:
self._download_update_archive(download_url, zip_path)
self._verify_update_checksum(zip_path, checksum_url, download_name)
os.makedirs(unpack_dir, exist_ok=True)
with zipfile.ZipFile(zip_path, "r") as archive:
archive.extractall(unpack_dir)
source_dir = self._locate_extracted_root(unpack_dir)
app_exe = sys.executable
app_dir = os.path.dirname(app_exe)
exe_name = os.path.basename(app_exe)
script_path = self._build_update_script(app_dir, source_dir, exe_name, os.getpid())
creation_flags = 0
if hasattr(subprocess, "CREATE_NEW_PROCESS_GROUP"):
creation_flags |= subprocess.CREATE_NEW_PROCESS_GROUP
if hasattr(subprocess, "DETACHED_PROCESS"):
creation_flags |= subprocess.DETACHED_PROCESS
subprocess.Popen(
["cmd.exe", "/c", script_path],
cwd=work_dir,
creationflags=creation_flags,
)
self._log_event("auto_update", f"Update {latest_version} started from {download_url}")
QMessageBox.information(
self,
"Обновление запущено",
"Обновление скачано. Приложение будет перезапущено.",
)
QApplication.instance().quit()
return True
except Exception as e:
self._log_event("auto_update_failed", str(e), level="ERROR")
QMessageBox.warning(self, "Автообновление", f"Не удалось выполнить автообновление: {e}")
return False
finally:
self._set_busy(False)
def setup_token_timer(self): def setup_token_timer(self):
self.token_countdown_timer = QTimer(self) self.token_countdown_timer = QTimer(self)
self.token_countdown_timer.timeout.connect(self.update_token_timer_display) self.token_countdown_timer.timeout.connect(self.update_token_timer_display)
@@ -819,7 +528,7 @@ class VkChatManager(QMainWindow):
self.status_label.setText(status_text) self.status_label.setText(status_text)
if busy: if busy:
self._busy = True self._busy = True
QApplication.setOverrideCursor(Qt.WaitCursor) QApplication.setOverrideCursor(Qt.CursorShape.WaitCursor)
for widget in [ for widget in [
self.refresh_chats_btn, self.select_all_btn, self.deselect_all_btn, self.refresh_chats_btn, self.select_all_btn, self.deselect_all_btn,
self.vk_url_input, self.multi_link_btn, self.visible_messages_checkbox, self.vk_url_input, self.multi_link_btn, self.visible_messages_checkbox,
@@ -903,9 +612,9 @@ class VkChatManager(QMainWindow):
self, self,
"Подтверждение выхода", "Подтверждение выхода",
"Вы уверены, что хотите выйти и удалить сохраненный токен и кэш?", "Вы уверены, что хотите выйти и удалить сохраненный токен и кэш?",
QMessageBox.Yes | QMessageBox.No QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No
) )
if confirm != QMessageBox.Yes: if confirm != QMessageBox.StandardButton.Yes:
return return
self._clear_auth_state(stop_timer=True, remove_token_file=True) self._clear_auth_state(stop_timer=True, remove_token_file=True)
@@ -962,7 +671,7 @@ class VkChatManager(QMainWindow):
def _on_auth_process_error(self, process_error): def _on_auth_process_error(self, process_error):
self._auth_process_error_text = f"Ошибка запуска авторизации: {process_error}" self._auth_process_error_text = f"Ошибка запуска авторизации: {process_error}"
# For failed starts Qt may not emit finished(), so release UI here. # For failed starts Qt may not emit finished(), so release UI here.
if self.auth_process and self.auth_process.state() == QProcess.NotRunning: if self.auth_process and self.auth_process.state() == QProcess.ProcessState.NotRunning:
output_path = self.auth_output_path output_path = self.auth_output_path
self.auth_output_path = None self.auth_output_path = None
self.auth_process = None self.auth_process = None
@@ -1022,7 +731,7 @@ class VkChatManager(QMainWindow):
self.handle_new_auth_token(token, expires_in) self.handle_new_auth_token(token, expires_in)
def start_auth(self, keep_status_text=False): def start_auth(self, keep_status_text=False):
if self.auth_process and self.auth_process.state() != QProcess.NotRunning: if self.auth_process and self.auth_process.state() != QProcess.ProcessState.NotRunning:
self._log_event("auth_process", "Авторизация уже запущена, повторный запуск пропущен.") self._log_event("auth_process", "Авторизация уже запущена, повторный запуск пропущен.")
return return
@@ -1182,83 +891,6 @@ class VkChatManager(QMainWindow):
def _vk_call_with_retry(self, func, *args, **kwargs): def _vk_call_with_retry(self, func, *args, **kwargs):
return self.vk_service.call_with_retry(func, *args, **kwargs) return self.vk_service.call_with_retry(func, *args, **kwargs)
def load_chats(self):
self._clear_chat_tabs()
# Get the checkbox layouts from each tab
layouts = [
self.office_tab.findChild(QWidget).findChild(QVBoxLayout),
self.retail_tab.findChild(QWidget).findChild(QVBoxLayout),
self.warehouse_tab.findChild(QWidget).findChild(QVBoxLayout),
self.coffee_tab.findChild(QWidget).findChild(QVBoxLayout),
self.other_tab.findChild(QWidget).findChild(QVBoxLayout)
]
try:
self._set_busy(True, "Статус: загрузка чатов...")
conversations = []
start_from = None
seen_start_tokens = set()
while True:
params = {"count": 200, "filter": "all"}
if start_from:
if start_from in seen_start_tokens:
break
params["start_from"] = start_from
seen_start_tokens.add(start_from)
response = self._vk_call_with_retry(self.vk.messages.getConversations, **params)
page_items = response.get("items", [])
if not page_items:
break
conversations.extend(page_items)
start_from = response.get("next_from")
if not start_from:
break
for conv in conversations:
if conv['conversation']['peer']['type'] == 'chat':
chat_id = conv['conversation']['peer']['local_id']
title = conv['conversation']['chat_settings']['title']
self.chats.append({'id': chat_id, 'title': title})
checkbox = QCheckBox(f"{title} (id: {chat_id})")
checkbox.setProperty("chat_id", chat_id)
# Insert checkbox at the top of the layout (before the stretch)
if "AG офис" in title:
layouts[0].insertWidget(layouts[0].count() - 1, checkbox)
self.office_chat_checkboxes.append(checkbox)
elif "AG розница" in title:
layouts[1].insertWidget(layouts[1].count() - 1, checkbox)
self.retail_chat_checkboxes.append(checkbox)
elif "AG склад" in title:
layouts[2].insertWidget(layouts[2].count() - 1, checkbox)
self.warehouse_chat_checkboxes.append(checkbox)
elif "AG кофейни" in title:
layouts[3].insertWidget(layouts[3].count() - 1, checkbox)
self.coffee_chat_checkboxes.append(checkbox)
else:
layouts[4].insertWidget(layouts[4].count() - 1, checkbox)
self.other_chat_checkboxes.append(checkbox)
self.chat_tabs.setTabText(0, f"AG Офис ({len(self.office_chat_checkboxes)})")
self.chat_tabs.setTabText(1, f"AG Розница ({len(self.retail_chat_checkboxes)})")
self.chat_tabs.setTabText(2, f"AG Склад ({len(self.warehouse_chat_checkboxes)})")
self.chat_tabs.setTabText(3, f"AG Кофейни ({len(self.coffee_chat_checkboxes)})")
self.chat_tabs.setTabText(4, f"Прочие ({len(self.other_chat_checkboxes)})")
except VkApiError as e:
if self._handle_vk_api_error(
"load_chats",
e,
action_name="загрузки чатов",
):
return
QMessageBox.critical(self, "Ошибка", f"Не удалось загрузить чаты: {self._format_vk_error(e)}")
self.set_ui_state(False)
finally:
self._set_busy(False)
def get_user_info_by_id(self, user_id): def get_user_info_by_id(self, user_id):
try: try:
user = self.vk.users.get(user_ids=user_id)[0] user = self.vk.users.get(user_ids=user_id)[0]
@@ -1323,9 +955,9 @@ class VkChatManager(QMainWindow):
confirm_dialog = QMessageBox(self) confirm_dialog = QMessageBox(self)
confirm_dialog.setWindowTitle("Подтверждение действия") confirm_dialog.setWindowTitle("Подтверждение действия")
confirm_dialog.setText(msg) confirm_dialog.setText(msg)
confirm_dialog.setIcon(QMessageBox.Question) confirm_dialog.setIcon(QMessageBox.Icon.Question)
yes_button = confirm_dialog.addButton("Да", QMessageBox.YesRole) yes_button = confirm_dialog.addButton("Да", QMessageBox.ButtonRole.YesRole)
no_button = confirm_dialog.addButton("Нет", QMessageBox.NoRole) no_button = confirm_dialog.addButton("Нет", QMessageBox.ButtonRole.NoRole)
confirm_dialog.setDefaultButton(no_button) # "Нет" - кнопка по умолчанию confirm_dialog.setDefaultButton(no_button) # "Нет" - кнопка по умолчанию
confirm_dialog.exec() confirm_dialog.exec()
@@ -1398,9 +1030,9 @@ class VkChatManager(QMainWindow):
confirm_dialog = QMessageBox(self) confirm_dialog = QMessageBox(self)
confirm_dialog.setWindowTitle("Подтверждение прав") confirm_dialog.setWindowTitle("Подтверждение прав")
confirm_dialog.setText(msg) confirm_dialog.setText(msg)
confirm_dialog.setIcon(QMessageBox.Question) confirm_dialog.setIcon(QMessageBox.Icon.Question)
yes_button = confirm_dialog.addButton("Да", QMessageBox.YesRole) yes_button = confirm_dialog.addButton("Да", QMessageBox.ButtonRole.YesRole)
no_button = confirm_dialog.addButton("Нет", QMessageBox.NoRole) no_button = confirm_dialog.addButton("Нет", QMessageBox.ButtonRole.NoRole)
confirm_dialog.setDefaultButton(no_button) confirm_dialog.setDefaultButton(no_button)
confirm_dialog.exec() confirm_dialog.exec()
@@ -1572,18 +1204,18 @@ class VkChatManager(QMainWindow):
download_name=download_name, download_name=download_name,
) )
app_exe = sys.executable app_exe = sys.executable
script_path = AutoUpdateService.build_update_script( AutoUpdateService.launch_gui_updater(
app_dir=os.path.dirname(app_exe), app_exe=app_exe,
source_dir=source_dir, source_dir=source_dir,
exe_name=os.path.basename(app_exe), work_dir=work_dir,
target_pid=os.getpid(), target_pid=os.getpid(),
version=latest_version,
) )
AutoUpdateService.launch_update_script(script_path, work_dir)
self._log_event("auto_update", f"Update {latest_version} started from {download_url}") self._log_event("auto_update", f"Update {latest_version} started from {download_url}")
QMessageBox.information( QMessageBox.information(
self, self,
"Обновление запущено", "Обновление запущено",
"Обновление скачано. Приложение будет перезапущено.", "Обновление скачано. Открылось окно обновления.",
) )
QApplication.instance().quit() QApplication.instance().quit()
return True return True

View File

@@ -104,12 +104,26 @@ class AutoUpdateService:
"if %ERRORLEVEL% EQU 0 (", "if %ERRORLEVEL% EQU 0 (",
" set /a WAIT_LOOPS+=1", " set /a WAIT_LOOPS+=1",
" if %WAIT_LOOPS% GEQ 180 (", " if %WAIT_LOOPS% GEQ 180 (",
" echo Timeout waiting for process %TARGET_PID% to exit >> \"%UPDATE_LOG%\"", " echo Timeout waiting for process %TARGET_PID%, attempting force stop >> \"%UPDATE_LOG%\"",
" goto :backup", " taskkill /PID %TARGET_PID% /T /F >nul 2>&1",
" timeout /t 2 /nobreak >nul",
" tasklist /FI \"PID eq %TARGET_PID%\" | find \"%TARGET_PID%\" >nul",
" if %ERRORLEVEL% EQU 0 goto :pid_still_running",
" goto :wait_image_unlock",
" )", " )",
" timeout /t 1 /nobreak >nul", " timeout /t 1 /nobreak >nul",
" goto :wait_for_exit", " goto :wait_for_exit",
")", ")",
":wait_image_unlock",
"set /a IMG_LOOPS=0",
":check_image",
"tasklist /FI \"IMAGENAME eq %EXE_NAME%\" | find /I \"%EXE_NAME%\" >nul",
"if %ERRORLEVEL% EQU 0 (",
" set /a IMG_LOOPS+=1",
" if %IMG_LOOPS% GEQ 60 goto :image_still_running",
" timeout /t 1 /nobreak >nul",
" goto :check_image",
")",
":backup", ":backup",
"timeout /t 1 /nobreak >nul", "timeout /t 1 /nobreak >nul",
"mkdir \"%BACKUP_DIR%\" >nul 2>&1", "mkdir \"%BACKUP_DIR%\" >nul 2>&1",
@@ -134,6 +148,12 @@ class AutoUpdateService:
":backup_error", ":backup_error",
"echo Auto-update failed during backup. Code %RC% >> \"%UPDATE_LOG%\"", "echo Auto-update failed during backup. Code %RC% >> \"%UPDATE_LOG%\"",
"exit /b %RC%", "exit /b %RC%",
":pid_still_running",
"echo Auto-update aborted: process %TARGET_PID% is still running after force stop. >> \"%UPDATE_LOG%\"",
"exit /b 4",
":image_still_running",
"echo Auto-update aborted: %EXE_NAME% still running and file lock may remain. >> \"%UPDATE_LOG%\"",
"exit /b 5",
] ]
with open(script_path, "w", encoding="utf-8", newline="\r\n") as f: with open(script_path, "w", encoding="utf-8", newline="\r\n") as f:
f.write("\r\n".join(script_lines) + "\r\n") f.write("\r\n".join(script_lines) + "\r\n")
@@ -152,6 +172,40 @@ class AutoUpdateService:
creationflags=creation_flags, creationflags=creation_flags,
) )
@staticmethod
def launch_gui_updater(app_exe, source_dir, work_dir, target_pid, version=""):
app_dir = os.path.dirname(app_exe)
exe_name = os.path.basename(app_exe)
updater_exe = os.path.join(app_dir, "AnabasisUpdater.exe")
if not os.path.exists(updater_exe):
raise RuntimeError("Файл AnabasisUpdater.exe не найден в папке приложения.")
creation_flags = 0
if hasattr(subprocess, "CREATE_NEW_PROCESS_GROUP"):
creation_flags |= subprocess.CREATE_NEW_PROCESS_GROUP
if hasattr(subprocess, "DETACHED_PROCESS"):
creation_flags |= subprocess.DETACHED_PROCESS
subprocess.Popen(
[
updater_exe,
"--app-dir",
app_dir,
"--source-dir",
source_dir,
"--exe-name",
exe_name,
"--target-pid",
str(target_pid),
"--version",
str(version or ""),
"--work-dir",
str(work_dir or ""),
],
cwd=work_dir,
creationflags=creation_flags,
)
@classmethod @classmethod
def prepare_update(cls, download_url, checksum_url, download_name): def prepare_update(cls, download_url, checksum_url, download_name):
work_dir = tempfile.mkdtemp(prefix="anabasis_update_") work_dir = tempfile.mkdtemp(prefix="anabasis_update_")

View File

@@ -38,6 +38,75 @@ def _sanitize_repo_url(value):
return f"{parsed.scheme}://{parsed.netloc}{clean_path}" return f"{parsed.scheme}://{parsed.netloc}{clean_path}"
def _normalize_update_channel(value):
channel = (value or "").strip().lower()
if channel in ("beta", "betas", "pre", "prerelease", "pre-release"):
return "beta"
return "stable"
def _select_release_from_list(releases):
for item in releases:
if not isinstance(item, dict):
continue
if item.get("draft"):
continue
tag_name = (item.get("tag_name") or item.get("name") or "").strip()
if not tag_name:
continue
return item
return None
def _extract_release_payload(release_data, repository_url, current_version):
parsed = urlparse(repository_url)
base_url = f"{parsed.scheme}://{parsed.netloc}"
repo_path = parsed.path.strip("/")
releases_url = f"{base_url}/{repo_path}/releases"
latest_tag = release_data.get("tag_name") or release_data.get("name") or ""
latest_version = latest_tag.lstrip("vV").strip()
html_url = release_data.get("html_url") or releases_url
assets = release_data.get("assets") or []
download_url = ""
download_name = ""
checksum_url = ""
for asset in assets:
url = asset.get("browser_download_url", "")
if url.lower().endswith(".zip"):
download_url = url
download_name = asset.get("name", "")
break
if not download_url and assets:
download_url = assets[0].get("browser_download_url", "")
download_name = assets[0].get("name", "")
for asset in assets:
name = asset.get("name", "").lower()
if not name:
continue
is_checksum_asset = name.endswith(".sha256") or name.endswith(".sha256.txt") or name in ("checksums.txt", "sha256sums.txt")
if not is_checksum_asset:
continue
if download_name and (download_name.lower() in name or name in (f"{download_name.lower()}.sha256", f"{download_name.lower()}.sha256.txt")):
checksum_url = asset.get("browser_download_url", "")
break
if not checksum_url:
checksum_url = asset.get("browser_download_url", "")
return {
"repository_url": repository_url,
"latest_version": latest_version,
"current_version": current_version,
"latest_tag": latest_tag,
"release_url": html_url,
"download_url": download_url,
"download_name": download_name,
"checksum_url": checksum_url,
"has_update": _is_newer_version(latest_version, current_version),
}
def detect_update_repository_url(configured_url="", configured_repo=""): def detect_update_repository_url(configured_url="", configured_repo=""):
env_url = _sanitize_repo_url(os.getenv("ANABASIS_UPDATE_URL", "")) env_url = _sanitize_repo_url(os.getenv("ANABASIS_UPDATE_URL", ""))
if env_url: if env_url:
@@ -74,11 +143,12 @@ class UpdateChecker(QObject):
check_finished = Signal(dict) check_finished = Signal(dict)
check_failed = Signal(str) check_failed = Signal(str)
def __init__(self, repository_url, current_version, request_timeout=8): def __init__(self, repository_url, current_version, request_timeout=8, channel="stable"):
super().__init__() super().__init__()
self.repository_url = repository_url self.repository_url = repository_url
self.current_version = current_version self.current_version = current_version
self.request_timeout = request_timeout self.request_timeout = request_timeout
self.channel = _normalize_update_channel(channel)
def run(self): def run(self):
if not self.repository_url: if not self.repository_url:
@@ -92,8 +162,15 @@ class UpdateChecker(QObject):
self.check_failed.emit("Некорректный URL репозитория обновлений.") self.check_failed.emit("Некорректный URL репозитория обновлений.")
return return
use_beta_channel = self.channel == "beta"
if parsed.netloc.lower().endswith("github.com"): if parsed.netloc.lower().endswith("github.com"):
if use_beta_channel:
api_url = f"https://api.github.com/repos/{repo_path}/releases"
else:
api_url = f"https://api.github.com/repos/{repo_path}/releases/latest" api_url = f"https://api.github.com/repos/{repo_path}/releases/latest"
else:
if use_beta_channel:
api_url = f"{base_url}/api/v1/repos/{repo_path}/releases"
else: else:
api_url = f"{base_url}/api/v1/repos/{repo_path}/releases/latest" api_url = f"{base_url}/api/v1/repos/{repo_path}/releases/latest"
releases_url = f"{base_url}/{repo_path}/releases" releases_url = f"{base_url}/{repo_path}/releases"
@@ -106,7 +183,7 @@ class UpdateChecker(QObject):
) )
try: try:
with urllib.request.urlopen(request, timeout=self.request_timeout) as response: with urllib.request.urlopen(request, timeout=self.request_timeout) as response:
release_data = json.loads(response.read().decode("utf-8")) response_data = json.loads(response.read().decode("utf-8"))
except urllib.error.HTTPError as e: except urllib.error.HTTPError as e:
self.check_failed.emit(f"Ошибка HTTP при проверке обновлений: {e.code}") self.check_failed.emit(f"Ошибка HTTP при проверке обновлений: {e.code}")
return return
@@ -117,47 +194,20 @@ class UpdateChecker(QObject):
self.check_failed.emit(f"Не удалось проверить обновления: {e}") self.check_failed.emit(f"Не удалось проверить обновления: {e}")
return return
latest_tag = release_data.get("tag_name") or release_data.get("name") or "" release_data = response_data
latest_version = latest_tag.lstrip("vV").strip() if use_beta_channel:
html_url = release_data.get("html_url") or releases_url if not isinstance(response_data, list):
assets = release_data.get("assets") or [] self.check_failed.emit("Сервер вернул некорректный ответ списка релизов.")
download_url = "" return
download_name = "" release_data = _select_release_from_list(response_data)
checksum_url = "" if not release_data:
for asset in assets: self.check_failed.emit("В канале beta не найдено доступных релизов.")
url = asset.get("browser_download_url", "") return
if url.lower().endswith(".zip"): elif not isinstance(response_data, dict):
download_url = url self.check_failed.emit("Сервер вернул некорректный ответ релиза.")
download_name = asset.get("name", "") return
break
if not download_url and assets:
download_url = assets[0].get("browser_download_url", "")
download_name = assets[0].get("name", "")
for asset in assets:
name = asset.get("name", "").lower()
if not name:
continue
is_checksum_asset = name.endswith(".sha256") or name.endswith(".sha256.txt") or name in ("checksums.txt", "sha256sums.txt")
if not is_checksum_asset:
continue
if download_name and (download_name.lower() in name or name in (f"{download_name.lower()}.sha256", f"{download_name.lower()}.sha256.txt")):
checksum_url = asset.get("browser_download_url", "")
break
if not checksum_url:
checksum_url = asset.get("browser_download_url", "")
self.check_finished.emit(
{
"repository_url": self.repository_url,
"latest_version": latest_version,
"current_version": self.current_version,
"latest_tag": latest_tag,
"release_url": html_url,
"download_url": download_url,
"download_name": download_name,
"checksum_url": checksum_url,
"has_update": _is_newer_version(latest_version, self.current_version),
}
)
payload = _extract_release_payload(release_data, self.repository_url, self.current_version)
payload["release_channel"] = self.channel
payload["releases_url"] = releases_url
self.check_finished.emit(payload)

View File

@@ -19,7 +19,7 @@ class AuthReloginSmokeTests(unittest.TestCase):
self.assertIn("process = QProcess(self)", self.main_source) self.assertIn("process = QProcess(self)", self.main_source)
self.assertIn("process.start(program, args)", self.main_source) self.assertIn("process.start(program, args)", self.main_source)
self.assertIn("def _on_auth_process_finished(self, exit_code, _exit_status):", self.main_source) self.assertIn("def _on_auth_process_finished(self, exit_code, _exit_status):", self.main_source)
self.assertIn("if self.auth_process and self.auth_process.state() == QProcess.NotRunning:", self.main_source) self.assertIn("if self.auth_process and self.auth_process.state() == QProcess.ProcessState.NotRunning:", self.main_source)
def test_force_relogin_has_backoff_and_event_log(self): def test_force_relogin_has_backoff_and_event_log(self):
self.assertIn("AUTH_RELOGIN_BACKOFF_SECONDS = 5.0", self.main_source) self.assertIn("AUTH_RELOGIN_BACKOFF_SECONDS = 5.0", self.main_source)
@@ -53,7 +53,7 @@ class AuthReloginSmokeTests(unittest.TestCase):
self.assertIn("class UpdateChecker(QObject):", self.update_source) self.assertIn("class UpdateChecker(QObject):", self.update_source)
self.assertIn("def _start_auto_update(self, download_url, latest_version, checksum_url=\"\", download_name=\"\"):", self.main_source) self.assertIn("def _start_auto_update(self, download_url, latest_version, checksum_url=\"\", download_name=\"\"):", self.main_source)
self.assertIn("AutoUpdateService.prepare_update", self.main_source) self.assertIn("AutoUpdateService.prepare_update", self.main_source)
self.assertIn("AutoUpdateService.build_update_script", self.main_source) self.assertIn("AutoUpdateService.launch_gui_updater", self.main_source)
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -0,0 +1,51 @@
import unittest
import importlib.util
from pathlib import Path
MODULE_PATH = Path("services/update_service.py")
SPEC = importlib.util.spec_from_file_location("update_service_under_test", MODULE_PATH)
update_service = importlib.util.module_from_spec(SPEC)
SPEC.loader.exec_module(update_service)
class UpdateServiceTests(unittest.TestCase):
def test_normalize_update_channel(self):
self.assertEqual(update_service._normalize_update_channel("stable"), "stable")
self.assertEqual(update_service._normalize_update_channel("beta"), "beta")
self.assertEqual(update_service._normalize_update_channel("pre-release"), "beta")
self.assertEqual(update_service._normalize_update_channel("unknown"), "stable")
self.assertEqual(update_service._normalize_update_channel(""), "stable")
def test_select_release_from_list_skips_drafts(self):
releases = [
{"tag_name": "v2.0.0", "draft": True},
{"tag_name": "", "draft": False},
{"tag_name": "v1.9.0-beta.1", "draft": False},
]
selected = update_service._select_release_from_list(releases)
self.assertIsNotNone(selected)
self.assertEqual(selected["tag_name"], "v1.9.0-beta.1")
def test_extract_release_payload_uses_zip_and_checksum(self):
release_data = {
"tag_name": "v1.7.2",
"html_url": "https://example.com/release/v1.7.2",
"assets": [
{"name": "notes.txt", "browser_download_url": "https://example.com/notes.txt"},
{"name": "AnabasisManager-win64.zip", "browser_download_url": "https://example.com/app.zip"},
{"name": "AnabasisManager-win64.zip.sha256", "browser_download_url": "https://example.com/app.zip.sha256"},
],
}
payload = update_service._extract_release_payload(
release_data=release_data,
repository_url="https://git.daemonlord.ru/benya/AnabasisChatRemove",
current_version="1.7.1",
)
self.assertEqual(payload["latest_version"], "1.7.2")
self.assertEqual(payload["download_url"], "https://example.com/app.zip")
self.assertEqual(payload["checksum_url"], "https://example.com/app.zip.sha256")
self.assertTrue(payload["has_update"])
if __name__ == "__main__":
unittest.main()

38
tests/test_updater_gui.py Normal file
View File

@@ -0,0 +1,38 @@
import importlib.util
import tempfile
import unittest
from pathlib import Path
MODULE_PATH = Path("updater_gui.py")
SPEC = importlib.util.spec_from_file_location("updater_gui_under_test", MODULE_PATH)
updater_gui = importlib.util.module_from_spec(SPEC)
SPEC.loader.exec_module(updater_gui)
class UpdaterGuiTests(unittest.TestCase):
def test_read_version_marker(self):
with tempfile.TemporaryDirectory() as tmp_dir:
marker = Path(tmp_dir) / "version.txt"
marker.write_text("2.0.1\n", encoding="utf-8")
value = updater_gui._read_version_marker(tmp_dir)
self.assertEqual(value, "2.0.1")
def test_mirror_tree_skips_selected_file(self):
with tempfile.TemporaryDirectory() as src_tmp, tempfile.TemporaryDirectory() as dst_tmp:
src = Path(src_tmp)
dst = Path(dst_tmp)
(src / "keep.txt").write_text("ok", encoding="utf-8")
(src / "skip.bin").write_text("x", encoding="utf-8")
(src / "sub").mkdir()
(src / "sub" / "nested.txt").write_text("nested", encoding="utf-8")
updater_gui._mirror_tree(str(src), str(dst), skip_names={"skip.bin"})
self.assertTrue((dst / "keep.txt").exists())
self.assertTrue((dst / "sub" / "nested.txt").exists())
self.assertFalse((dst / "skip.bin").exists())
if __name__ == "__main__":
unittest.main()

276
updater_gui.py Normal file
View File

@@ -0,0 +1,276 @@
import argparse
import os
import shutil
import subprocess
import sys
import tempfile
import time
from PySide6.QtCore import QObject, Qt, QThread, Signal, QTimer, QUrl
from PySide6.QtGui import QDesktopServices
from PySide6.QtWidgets import QApplication, QLabel, QProgressBar, QVBoxLayout, QWidget, QPushButton, QHBoxLayout
def _write_log(log_path, message):
try:
os.makedirs(os.path.dirname(log_path), exist_ok=True)
with open(log_path, "a", encoding="utf-8") as f:
ts = time.strftime("%Y-%m-%d %H:%M:%S")
f.write(f"[{ts}] {message.rstrip()}\n")
except Exception:
pass
def _is_pid_running(pid):
if pid <= 0:
return False
try:
completed = subprocess.run(
["tasklist", "/FI", f"PID eq {pid}"],
capture_output=True,
text=True,
timeout=5,
check=False,
)
return str(pid) in (completed.stdout or "")
except Exception:
return False
def _copy_file_with_retries(source_file, target_file, retries=20, delay=0.5):
last_error = None
for _ in range(max(1, retries)):
try:
os.makedirs(os.path.dirname(target_file), exist_ok=True)
shutil.copy2(source_file, target_file)
return
except Exception as exc:
last_error = exc
time.sleep(delay)
raise last_error if last_error else RuntimeError(f"Не удалось скопировать файл: {source_file}")
def _mirror_tree(src_dir, dst_dir, skip_names=None, retries=20, delay=0.5):
skip_set = {name.lower() for name in (skip_names or [])}
os.makedirs(dst_dir, exist_ok=True)
for root, dirs, files in os.walk(src_dir):
rel = os.path.relpath(root, src_dir)
target_root = dst_dir if rel == "." else os.path.join(dst_dir, rel)
os.makedirs(target_root, exist_ok=True)
for file_name in files:
if file_name.lower() in skip_set:
continue
source_file = os.path.join(root, file_name)
target_file = os.path.join(target_root, file_name)
_copy_file_with_retries(source_file, target_file, retries=retries, delay=delay)
def _read_version_marker(base_dir):
marker_path = os.path.join(base_dir, "version.txt")
if not os.path.exists(marker_path):
return ""
try:
with open(marker_path, "r", encoding="utf-8") as f:
return f.read().strip()
except Exception:
return ""
class UpdateWorker(QObject):
status = Signal(int, str)
failed = Signal(str)
done = Signal()
def __init__(self, app_dir, source_dir, exe_name, target_pid, version, work_dir=""):
super().__init__()
self.app_dir = app_dir
self.source_dir = source_dir
self.exe_name = exe_name
self.target_pid = int(target_pid or 0)
self.version = version or ""
self.work_dir = work_dir or ""
self.log_path = os.path.join(app_dir, "update_error.log")
def _start_app(self):
app_exe = os.path.join(self.app_dir, self.exe_name)
if not os.path.exists(app_exe):
raise RuntimeError(f"Не найден файл приложения: {app_exe}")
creation_flags = 0
if hasattr(subprocess, "DETACHED_PROCESS"):
creation_flags |= subprocess.DETACHED_PROCESS
if hasattr(subprocess, "CREATE_NEW_PROCESS_GROUP"):
creation_flags |= subprocess.CREATE_NEW_PROCESS_GROUP
subprocess.Popen([app_exe], cwd=self.app_dir, creationflags=creation_flags)
def run(self):
backup_dir = os.path.join(tempfile.gettempdir(), f"anabasis_backup_{int(time.time())}")
skip_names = {"anabasisupdater.exe"}
prev_version = _read_version_marker(self.app_dir)
source_version = _read_version_marker(self.source_dir)
expected_version = (self.version or "").strip()
try:
self.status.emit(1, "Ожидание завершения приложения...")
wait_loops = 0
while _is_pid_running(self.target_pid):
time.sleep(1)
wait_loops += 1
if wait_loops >= 180:
self.status.emit(1, "Принудительное завершение зависшего процесса...")
subprocess.run(
["taskkill", "/PID", str(self.target_pid), "/T", "/F"],
capture_output=True,
text=True,
timeout=10,
check=False,
)
time.sleep(2)
if _is_pid_running(self.target_pid):
raise RuntimeError(f"Процесс {self.target_pid} не завершился.")
break
self.status.emit(2, "Проверка содержимого обновления...")
source_app_exe = os.path.join(self.source_dir, self.exe_name)
if not os.path.exists(source_app_exe):
raise RuntimeError(f"В обновлении отсутствует {self.exe_name}")
if expected_version and source_version and source_version != expected_version:
raise RuntimeError(
f"Версия пакета ({source_version}) не совпадает с ожидаемой ({expected_version})."
)
self.status.emit(3, "Создание резервной копии...")
_mirror_tree(self.app_dir, backup_dir, skip_names=skip_names)
self.status.emit(4, "Применение обновления...")
_mirror_tree(self.source_dir, self.app_dir, skip_names=skip_names, retries=30, delay=0.6)
self.status.emit(5, "Проверка установленной версии...")
installed_version = _read_version_marker(self.app_dir)
if expected_version and installed_version and installed_version != expected_version:
raise RuntimeError(
f"После обновления версия {installed_version}, ожидалась {expected_version}."
)
if expected_version and prev_version and prev_version == expected_version:
_write_log(self.log_path, f"Предупреждение: версия до обновления уже была {expected_version}.")
self.status.emit(6, "Запуск обновленного приложения...")
self._start_app()
_write_log(self.log_path, f"Update success to version {expected_version or source_version or 'unknown'}")
self.status.emit(7, "Очистка временных файлов...")
try:
shutil.rmtree(backup_dir, ignore_errors=True)
if self.work_dir and os.path.isdir(self.work_dir):
shutil.rmtree(self.work_dir, ignore_errors=True)
except Exception:
pass
self.done.emit()
except Exception as exc:
_write_log(self.log_path, f"Update failed: {exc}")
try:
self.status.emit(6, "Восстановление из резервной копии...")
if os.path.isdir(backup_dir):
_mirror_tree(backup_dir, self.app_dir, skip_names=skip_names, retries=20, delay=0.5)
_write_log(self.log_path, "Rollback completed.")
try:
self._start_app()
_write_log(self.log_path, "Restored app started after rollback.")
except Exception as start_exc:
_write_log(self.log_path, f"Failed to start app after rollback: {start_exc}")
except Exception as rollback_exc:
_write_log(self.log_path, f"Rollback failed: {rollback_exc}")
self.failed.emit(str(exc))
class UpdaterWindow(QWidget):
def __init__(self, app_dir, source_dir, exe_name, target_pid, version, work_dir=""):
super().__init__()
self.setWindowTitle("Anabasis Updater")
self.setMinimumWidth(480)
self.log_path = os.path.join(app_dir, "update_error.log")
self.label = QLabel("Подготовка обновления...")
self.label.setWordWrap(True)
self.progress = QProgressBar()
self.progress.setRange(0, 7)
self.progress.setValue(0)
self.open_log_btn = QPushButton("Открыть лог")
self.open_log_btn.setEnabled(False)
self.open_log_btn.clicked.connect(self.open_log)
self.close_btn = QPushButton("Закрыть")
self.close_btn.setEnabled(False)
self.close_btn.clicked.connect(self.close)
layout = QVBoxLayout(self)
layout.addWidget(self.label)
layout.addWidget(self.progress)
actions = QHBoxLayout()
actions.addStretch(1)
actions.addWidget(self.open_log_btn)
actions.addWidget(self.close_btn)
layout.addLayout(actions)
self.thread = QThread(self)
self.worker = UpdateWorker(app_dir, source_dir, exe_name, target_pid, version, work_dir=work_dir)
self.worker.moveToThread(self.thread)
self.thread.started.connect(self.worker.run)
self.worker.status.connect(self.on_status)
self.worker.failed.connect(self.on_failed)
self.worker.done.connect(self.on_done)
self.worker.done.connect(self.thread.quit)
self.worker.failed.connect(self.thread.quit)
self.thread.start()
def on_status(self, step, text):
self.label.setText(text)
self.progress.setValue(max(0, min(7, int(step))))
def on_done(self):
self.label.setText("Обновление успешно применено. Приложение запущено.")
self.progress.setValue(7)
self.open_log_btn.setEnabled(True)
QTimer.singleShot(900, self.close)
def on_failed(self, error_text):
self.label.setText(
"Не удалось применить обновление.\n"
f"Причина: {error_text}\n"
"Подробности сохранены в update_error.log."
)
self.open_log_btn.setEnabled(True)
self.close_btn.setEnabled(True)
def open_log(self):
if os.path.exists(self.log_path):
QDesktopServices.openUrl(QUrl.fromLocalFile(self.log_path))
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--app-dir", required=True)
parser.add_argument("--source-dir", required=True)
parser.add_argument("--exe-name", required=True)
parser.add_argument("--target-pid", required=True)
parser.add_argument("--version", default="")
parser.add_argument("--work-dir", default="")
return parser.parse_args()
def main():
args = parse_args()
app = QApplication(sys.argv)
app.setStyle("Fusion")
window = UpdaterWindow(
app_dir=args.app_dir,
source_dir=args.source_dir,
exe_name=args.exe_name,
target_pid=args.target_pid,
version=args.version,
work_dir=args.work_dir,
)
window.show()
return app.exec()
if __name__ == "__main__":
sys.exit(main())