18 Commits

Author SHA1 Message Date
90b3b4fc9d chore(release): bump version to 1.6.2
All checks were successful
Desktop CI / tests (push) Successful in 12s
Desktop Release / release (push) Successful in 1m57s
2026-02-15 18:46:21 +03:00
190e67c931 feat(update): stage 2 sha256 verification for auto-update
All checks were successful
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Successful in 16s
2026-02-15 18:42:37 +03:00
2eb4c52b81 ci(release): set release title to Anabasis Manager <version>
All checks were successful
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Successful in 2m16s
2026-02-15 17:36:02 +03:00
3d73a504d2 ci(release): use v-prefixed semantic tags 2026-02-15 17:35:37 +03:00
1524271be7 ci(release): write outputs/env as utf8 no bom for powershell
All checks were successful
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Successful in 2m5s
2026-02-15 17:31:37 +03:00
561cf43e09 ci(release): handle missing tag exit code in powershell step
All checks were successful
Desktop CI / tests (push) Successful in 14s
Desktop Release / release (push) Successful in 24s
2026-02-15 17:30:22 +03:00
e8930f7550 ci(release): switch windows release steps from bash to powershell
Some checks failed
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Failing after 1m14s
2026-02-15 17:28:29 +03:00
c8da0f9191 ci(release): use preinstalled Python on self-hosted windows runner
Some checks failed
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Failing after 14s
2026-02-15 17:20:42 +03:00
37ce500fd2 ci(release): remove node bootstrap step, require system node
Some checks failed
Desktop CI / tests (push) Successful in 12s
Desktop Release / release (push) Failing after 25s
2026-02-15 17:19:24 +03:00
098a84e5bd ci(release): restore powershell node bootstrap
Some checks failed
Desktop CI / tests (push) Successful in 12s
Desktop Release / release (push) Failing after 2m42s
2026-02-15 16:42:08 +03:00
5aa17c1a84 ci(release): avoid powershell policy by using cmd node bootstrap
All checks were successful
Desktop CI / tests (push) Successful in 13s
2026-02-15 16:38:40 +03:00
dde14f3714 ci(release): bootstrap node before js-based actions
Some checks failed
Desktop CI / tests (push) Successful in 12s
Desktop Release / release (push) Failing after 5s
2026-02-15 16:37:42 +03:00
fa5d4c6993 ci(release): use windows runner label
Some checks failed
Desktop CI / tests (push) Successful in 20s
Desktop Release / release (push) Failing after 1m21s
2026-02-15 16:35:21 +03:00
f9e0225243 ci: add Gitea CI and desktop release workflow
Some checks failed
Desktop CI / tests (push) Successful in 1m29s
Desktop Release / release (push) Has been cancelled
2026-02-15 15:27:21 +03:00
c42b23bea5 chore(release): bump version to 1.6.1 2026-02-15 15:25:21 +03:00
b52cdea425 feat(update): stage 1 auto-update (one-click) 2026-02-15 15:24:45 +03:00
b7fad78a71 refactor(release): bump to 1.6.0 and unify version source 2026-02-15 15:17:30 +03:00
e590a6cde0 release: 1.5.1 fixes, relogin and updater 2026-02-15 15:13:13 +03:00
7 changed files with 923 additions and 84 deletions

35
.gitea/workflows/ci.yml Normal file
View File

@@ -0,0 +1,35 @@
name: Desktop CI
on:
push:
branches:
- master
pull_request:
jobs:
tests:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: https://git.daemonlord.ru/actions/checkout@v4
with:
fetch-depth: 0
- name: Set up Python
uses: https://git.daemonlord.ru/actions/setup-python@v5
with:
python-version: "3.13"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Validate syntax
run: |
python -m py_compile app_version.py main.py build.py tests/test_auth_relogin_smoke.py
- name: Run tests
run: |
python -m unittest tests/test_auth_relogin_smoke.py

View File

@@ -0,0 +1,133 @@
name: Desktop Release
on:
push:
branches:
- master
jobs:
release:
runs-on: windows
steps:
- name: Checkout
uses: https://git.daemonlord.ru/actions/checkout@v4
with:
fetch-depth: 0
tags: true
- name: Ensure Python 3.13
shell: powershell
run: |
if (Get-Command python -ErrorAction SilentlyContinue) {
python --version
} elseif (Get-Command py -ErrorAction SilentlyContinue) {
$pyExe = py -3.13 -c "import sys; print(sys.executable)"
if (-not $pyExe) {
throw "Python 3.13 launcher is available, but interpreter was not found."
}
Split-Path $pyExe | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append
python --version
} else {
throw "Python is not installed on runner. Install Python 3.13 and restart runner service."
}
- name: Install dependencies
shell: powershell
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt pyinstaller
- name: Extract app version
id: extract_version
shell: powershell
run: |
$version = (python -c "from app_version import APP_VERSION; print(APP_VERSION)").Trim()
$utf8NoBom = New-Object System.Text.UTF8Encoding($false)
[System.IO.File]::AppendAllText($env:GITHUB_OUTPUT, "version=$version`n", $utf8NoBom)
Write-Host "Detected version: $version"
- name: Stop if version already released
id: stop
shell: powershell
run: |
$version = "${{ steps.extract_version.outputs.version }}"
$tag = "v$version"
git show-ref --tags --quiet --verify "refs/tags/$tag"
$tagExists = ($LASTEXITCODE -eq 0)
$global:LASTEXITCODE = 0
$utf8NoBom = New-Object System.Text.UTF8Encoding($false)
if ($tagExists) {
Write-Host "Version $tag already released, stopping job."
[System.IO.File]::AppendAllText($env:GITHUB_ENV, "CONTINUE=false`n", $utf8NoBom)
} else {
Write-Host "Version $tag not released yet, continuing workflow..."
[System.IO.File]::AppendAllText($env:GITHUB_ENV, "CONTINUE=true`n", $utf8NoBom)
}
exit 0
- name: Run tests
if: env.CONTINUE == 'true'
shell: powershell
run: |
python -m py_compile app_version.py main.py build.py tests/test_auth_relogin_smoke.py
python -m unittest tests/test_auth_relogin_smoke.py
- name: Build release zip
if: env.CONTINUE == 'true'
shell: powershell
run: |
python build.py
- name: Ensure archive exists
if: env.CONTINUE == 'true'
shell: powershell
run: |
$version = "${{ steps.extract_version.outputs.version }}"
$archivePath = "dist/AnabasisManager-$version.zip"
if (-not (Test-Path $archivePath)) {
throw "Archive not found: $archivePath"
}
- name: Generate SHA256 checksum
if: env.CONTINUE == 'true'
shell: powershell
run: |
$version = "${{ steps.extract_version.outputs.version }}"
$archiveName = "AnabasisManager-$version.zip"
$archivePath = "dist/$archiveName"
$checksumPath = "dist/$archiveName.sha256"
$hash = (Get-FileHash -Path $archivePath -Algorithm SHA256).Hash.ToLower()
"$hash $archiveName" | Set-Content -Path $checksumPath -Encoding UTF8
Write-Host "Checksum created: $checksumPath"
- name: Configure git identity
if: env.CONTINUE == 'true'
shell: powershell
run: |
git config user.name "gitea-actions"
git config user.email "gitea-actions@daemonlord.ru"
- name: Create git tag
if: env.CONTINUE == 'true'
shell: powershell
run: |
$version = "${{ steps.extract_version.outputs.version }}"
$tag = "v$version"
git tag "$tag"
git push origin "$tag"
- name: Create Gitea Release
if: env.CONTINUE == 'true'
uses: https://git.daemonlord.ru/actions/gitea-release-action@v1
with:
server_url: https://git.daemonlord.ru
repository: ${{ gitea.repository }}
token: ${{ secrets.API_TOKEN }}
tag_name: v${{ steps.extract_version.outputs.version }}
name: Anabasis Manager ${{ steps.extract_version.outputs.version }}
body: |
Desktop release v${{ steps.extract_version.outputs.version }}
files: |
dist/AnabasisManager-${{ steps.extract_version.outputs.version }}.zip
dist/AnabasisManager-${{ steps.extract_version.outputs.version }}.zip.sha256

9
.gitignore vendored
View File

@@ -3,4 +3,11 @@
/build_cx/
/build_linux/
/build_win32/
/build_darwin/
/build_darwin/
.idea/
__pycache__/
*.py[cod]
tests/__pycache__/
build/
dist/
AnabasisManager.spec

1
app_version.py Normal file
View File

@@ -0,0 +1 @@
APP_VERSION = "1.6.2"

View File

@@ -2,10 +2,11 @@ import os
import shutil
import subprocess
import sys
from app_version import APP_VERSION
# --- Конфигурация ---
APP_NAME = "AnabasisManager"
VERSION = "1.5" # Ваша версия
VERSION = APP_VERSION # Единая версия приложения
MAIN_SCRIPT = "main.py"
ICON_PATH = "icon.ico"
DIST_DIR = os.path.join("dist", APP_NAME)

774
main.py
View File

@@ -2,21 +2,31 @@ import sys
import base64
import ctypes
import shutil
import subprocess
from vk_api import VkApi
import json
import time
import auth_webview
import os
import re
import hashlib
import subprocess
import threading
import tempfile
import urllib.error
import urllib.request
import zipfile
from app_version import APP_VERSION
from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit,
QPushButton, QVBoxLayout, QWidget, QMessageBox,
QTextBrowser, QScrollArea, QCheckBox, QHBoxLayout,
QSizePolicy, QDialog, QTextEdit, QTabWidget, QDialogButtonBox)
from PySide6.QtCore import Qt, QUrl, QDateTime, Signal, QTimer
from PySide6.QtGui import QIcon, QAction
QSizePolicy, QDialog, QTextEdit, QTabWidget, QDialogButtonBox,
QProgressBar)
from PySide6.QtCore import Qt, QUrl, QDateTime, Signal, QTimer, QObject
from PySide6.QtGui import QIcon, QAction, QDesktopServices
from urllib.parse import urlparse, parse_qs, unquote
from vk_api.exceptions import VkApiError
from PySide6.QtCore import QStandardPaths
from PySide6.QtCore import QProcess
from ctypes import wintypes
# --- Управление токенами и настройками ---
@@ -27,12 +37,81 @@ CACHE_CLEANUP_MARKER = os.path.join(APP_DATA_DIR, "pending_cache_cleanup")
LOG_FILE = os.path.join(APP_DATA_DIR, "app.log")
LOG_MAX_BYTES = 1024 * 1024 # 1 MB
LOG_BACKUP_FILE = os.path.join(APP_DATA_DIR, "app.log.1")
AUTH_RELOGIN_BACKOFF_SECONDS = 5.0
# Legacy owner/repo format for GitHub-only fallback.
UPDATE_REPOSITORY = ""
# Full repository URL is preferred (supports GitHub/Gitea).
UPDATE_REPOSITORY_URL = "https://git.daemonlord.ru/benya/AnabasisChatRemove"
UPDATE_REQUEST_TIMEOUT = 8
class _DataBlob(ctypes.Structure):
_fields_ = [("cbData", wintypes.DWORD), ("pbData", ctypes.POINTER(ctypes.c_byte))]
def _version_key(version_text):
parts = [int(x) for x in re.findall(r"\d+", str(version_text))]
if not parts:
return (0, 0, 0)
while len(parts) < 3:
parts.append(0)
return tuple(parts[:3])
def _is_newer_version(latest_version, current_version):
latest_key = _version_key(latest_version)
current_key = _version_key(current_version)
return latest_key > current_key
def _sanitize_repo_url(value):
value = (value or "").strip()
if not value:
return ""
if "://" not in value and value.count("/") == 1:
return f"https://github.com/{value}"
parsed = urlparse(value)
if not parsed.scheme or not parsed.netloc:
return ""
clean_path = parsed.path.rstrip("/")
if clean_path.endswith(".git"):
clean_path = clean_path[:-4]
return f"{parsed.scheme}://{parsed.netloc}{clean_path}"
def _detect_update_repository_url():
env_url = _sanitize_repo_url(os.getenv("ANABASIS_UPDATE_URL", ""))
if env_url:
return env_url
env_repo = _sanitize_repo_url(os.getenv("ANABASIS_UPDATE_REPOSITORY", ""))
if env_repo:
return env_repo
configured_url = _sanitize_repo_url(UPDATE_REPOSITORY_URL)
if configured_url:
return configured_url
configured_repo = _sanitize_repo_url(UPDATE_REPOSITORY)
if configured_repo:
return configured_repo
git_config_path = os.path.join(os.path.abspath("."), ".git", "config")
if not os.path.exists(git_config_path):
return ""
try:
with open(git_config_path, "r", encoding="utf-8") as f:
content = f.read()
match = re.search(r"url\s*=\s*((?:https?://|git@)[^\s]+)", content)
if match:
remote = match.group(1).strip()
if remote.startswith("git@"):
# git@host:owner/repo(.git)
ssh_match = re.match(r"git@([^:]+):(.+?)(?:\.git)?$", remote)
if ssh_match:
return _sanitize_repo_url(f"https://{ssh_match.group(1)}/{ssh_match.group(2)}")
return _sanitize_repo_url(remote)
except Exception:
return ""
return ""
_crypt32 = None
_kernel32 = None
if os.name == "nt":
@@ -176,6 +255,149 @@ def load_token():
print(f"Ошибка загрузки: {e}")
return None, None
class VkService:
def __init__(self):
self.session = None
self.api = None
def set_token(self, token):
self.session = VkApi(token=token)
self.api = self.session.get_api()
def clear(self):
self.session = None
self.api = None
@staticmethod
def build_auth_command(auth_url, output_path):
if getattr(sys, "frozen", False):
return sys.executable, ["--auth", auth_url, output_path]
return sys.executable, [os.path.abspath(__file__), "--auth", auth_url, output_path]
@staticmethod
def vk_error_code(exc):
error = getattr(exc, "error", None)
if isinstance(error, dict):
return error.get("error_code")
return getattr(exc, "code", None)
@classmethod
def is_auth_error(cls, exc, formatted_message=None):
code = cls.vk_error_code(exc)
if code == 5:
return True
message = (formatted_message or str(exc)).lower()
return "invalid_access_token" in message or "user authorization failed" in message
@classmethod
def is_retryable_error(cls, exc):
return cls.vk_error_code(exc) in (6, 9, 10)
def call_with_retry(self, func, *args, **kwargs):
max_attempts = 5
for attempt in range(1, max_attempts + 1):
try:
return func(*args, **kwargs)
except VkApiError as e:
if not self.is_retryable_error(e) or attempt == max_attempts:
raise
delay = min(2.0, 0.35 * (2 ** (attempt - 1)))
if self.vk_error_code(e) == 9:
delay = max(delay, 1.0)
time.sleep(delay)
class UpdateChecker(QObject):
check_finished = Signal(dict)
check_failed = Signal(str)
def __init__(self, repository_url, current_version):
super().__init__()
self.repository_url = repository_url
self.current_version = current_version
def run(self):
if not self.repository_url:
self.check_failed.emit("Не задан URL репозитория обновлений.")
return
parsed = urlparse(self.repository_url)
base_url = f"{parsed.scheme}://{parsed.netloc}"
repo_path = parsed.path.strip("/")
if not repo_path or repo_path.count("/") < 1:
self.check_failed.emit("Некорректный URL репозитория обновлений.")
return
if parsed.netloc.lower().endswith("github.com"):
api_url = f"https://api.github.com/repos/{repo_path}/releases/latest"
else:
api_url = f"{base_url}/api/v1/repos/{repo_path}/releases/latest"
releases_url = f"{base_url}/{repo_path}/releases"
request = urllib.request.Request(
api_url,
headers={
"Accept": "application/vnd.github+json",
"User-Agent": "AnabasisManager-Updater",
},
)
try:
with urllib.request.urlopen(request, timeout=UPDATE_REQUEST_TIMEOUT) as response:
release_data = json.loads(response.read().decode("utf-8"))
except urllib.error.HTTPError as e:
self.check_failed.emit(f"Ошибка HTTP при проверке обновлений: {e.code}")
return
except urllib.error.URLError as e:
self.check_failed.emit(f"Сетевая ошибка при проверке обновлений: {e}")
return
except Exception as e:
self.check_failed.emit(f"Не удалось проверить обновления: {e}")
return
latest_tag = release_data.get("tag_name") or release_data.get("name") or ""
latest_version = latest_tag.lstrip("vV").strip()
html_url = release_data.get("html_url") or releases_url
assets = release_data.get("assets") or []
download_url = ""
download_name = ""
checksum_url = ""
for asset in assets:
url = asset.get("browser_download_url", "")
if url.lower().endswith(".zip"):
download_url = url
download_name = asset.get("name", "")
break
if not download_url and assets:
download_url = assets[0].get("browser_download_url", "")
download_name = assets[0].get("name", "")
for asset in assets:
name = asset.get("name", "").lower()
if not name:
continue
is_checksum_asset = name.endswith(".sha256") or name.endswith(".sha256.txt") or name in ("checksums.txt", "sha256sums.txt")
if not is_checksum_asset:
continue
if download_name and (download_name.lower() in name or name in (f"{download_name.lower()}.sha256", f"{download_name.lower()}.sha256.txt")):
checksum_url = asset.get("browser_download_url", "")
break
if not checksum_url:
checksum_url = asset.get("browser_download_url", "")
self.check_finished.emit(
{
"repository_url": self.repository_url,
"latest_version": latest_version,
"current_version": self.current_version,
"latest_tag": latest_tag,
"release_url": html_url,
"download_url": download_url,
"download_name": download_name,
"checksum_url": checksum_url,
"has_update": _is_newer_version(latest_version, self.current_version),
}
)
class MultiLinkDialog(QDialog):
def __init__(self, parent=None):
super().__init__(parent)
@@ -211,11 +433,22 @@ class VkChatManager(QMainWindow):
self.warehouse_chat_checkboxes = []
self.coffee_chat_checkboxes = []
self.other_chat_checkboxes = []
self.vk_service = VkService()
self.vk_session = None
self.vk = None
self.user_ids_to_process = []
self._busy = False
self.suppress_resolve = False
self.auth_process = None
self.auth_output_path = None
self._auth_process_error_text = None
self._auth_ui_busy = False
self._auth_relogin_in_progress = False
self._last_auth_relogin_ts = 0.0
self.update_repository_url = _detect_update_repository_url()
self.update_checker = None
self.update_thread = None
self._update_check_silent = False
self.resolve_timer = QTimer(self)
self.resolve_timer.setSingleShot(True)
@@ -227,6 +460,7 @@ class VkChatManager(QMainWindow):
self.init_ui()
self.load_saved_token_on_startup()
self.setup_token_timer()
QTimer.singleShot(1800, lambda: self.check_for_updates(silent_no_updates=True))
def init_ui(self):
central_widget = QWidget()
@@ -260,6 +494,11 @@ class VkChatManager(QMainWindow):
self.auth_btn = QPushButton("Авторизоваться через VK")
self.auth_btn.clicked.connect(self.start_auth)
layout.addWidget(self.auth_btn)
self.auth_progress = QProgressBar()
self.auth_progress.setRange(0, 0)
self.auth_progress.setTextVisible(False)
self.auth_progress.hide()
layout.addWidget(self.auth_progress)
self.chat_tabs = QTabWidget()
self.chat_tabs.hide()
@@ -373,7 +612,8 @@ class VkChatManager(QMainWindow):
else:
failed_links.append(link)
except VkApiError as e:
self._log_error("resolveScreenName", e)
if self._handle_vk_api_error("resolveScreenName", e, action_name="получения ID пользователей"):
return
failed_links.append(f"{link} ({self._format_vk_error(e)})")
except Exception:
failed_links.append(link)
@@ -405,11 +645,19 @@ class VkChatManager(QMainWindow):
make_admin_action.setStatusTip("Назначить выбранных пользователей администраторами в выбранных чатах")
make_admin_action.triggered.connect(self.set_user_admin)
tools_menu.addAction(make_admin_action)
self.make_admin_action = make_admin_action
check_updates_action = QAction("Проверить обновления", self)
check_updates_action.setStatusTip("Проверить наличие новой версии приложения")
check_updates_action.triggered.connect(self.check_for_updates)
tools_menu.addAction(check_updates_action)
self.check_updates_action = check_updates_action
logout_action = QAction("Выйти и очистить", self)
logout_action.setStatusTip("Выйти, удалить токен и кэш")
logout_action.triggered.connect(self.logout_and_clear)
tools_menu.addAction(logout_action)
self.logout_action = logout_action
def create_chat_tab(self):
# This implementation correctly creates a scrollable area for chat lists.
@@ -435,6 +683,243 @@ class VkChatManager(QMainWindow):
return tab_content_widget
def _set_update_action_state(self, in_progress):
if hasattr(self, "check_updates_action"):
self.check_updates_action.setEnabled(not in_progress)
def check_for_updates(self, silent_no_updates=False):
if self.update_thread and self.update_thread.is_alive():
return
self._update_check_silent = silent_no_updates
self._set_update_action_state(True)
self.status_label.setText("Статус: проверка обновлений...")
self.update_checker = UpdateChecker(self.update_repository_url, APP_VERSION)
self.update_checker.check_finished.connect(self._on_update_check_finished)
self.update_checker.check_failed.connect(self._on_update_check_failed)
self.update_thread = threading.Thread(target=self.update_checker.run, daemon=True)
self.update_thread.start()
def _on_update_check_finished(self, result):
self._set_update_action_state(False)
self.update_checker = None
self.update_thread = None
if result.get("has_update"):
latest_version = result.get("latest_version") or result.get("latest_tag") or "unknown"
self.status_label.setText(f"Статус: доступно обновление {latest_version}")
message_box = QMessageBox(self)
message_box.setIcon(QMessageBox.Information)
message_box.setWindowTitle("Доступно обновление")
message_box.setText(
f"Текущая версия: {result.get('current_version')}\n"
f"Доступная версия: {latest_version}\n\n"
"Открыть страницу загрузки?"
)
update_now_button = message_box.addButton("Обновить сейчас", QMessageBox.AcceptRole)
download_button = message_box.addButton("Скачать", QMessageBox.AcceptRole)
releases_button = message_box.addButton("Релизы", QMessageBox.ActionRole)
cancel_button = message_box.addButton("Позже", QMessageBox.RejectRole)
message_box.setDefaultButton(update_now_button)
message_box.exec()
clicked = message_box.clickedButton()
download_url = result.get("download_url")
checksum_url = result.get("checksum_url")
download_name = result.get("download_name")
release_url = result.get("release_url")
if clicked is update_now_button and download_url:
if not self._start_auto_update(download_url, latest_version, checksum_url, download_name):
if release_url:
QDesktopServices.openUrl(QUrl(release_url))
return
if clicked is download_button and download_url:
QDesktopServices.openUrl(QUrl(download_url))
elif clicked in (download_button, releases_button) and release_url:
QDesktopServices.openUrl(QUrl(release_url))
elif clicked not in (cancel_button,):
self._log_event("update_check", "Диалог обновления закрыт без действия.", level="WARN")
return
self.status_label.setText(f"Статус: используется актуальная версия ({APP_VERSION}).")
if not self._update_check_silent:
QMessageBox.information(self, "Обновления", "Установлена актуальная версия.")
def _on_update_check_failed(self, error_text):
self._set_update_action_state(False)
self.update_checker = None
self.update_thread = None
self._log_event("update_check_failed", error_text, level="WARN")
if not self.update_repository_url:
self.status_label.setText("Статус: обновления не настроены (URL репозитория не задан).")
if not self._update_check_silent:
QMessageBox.warning(
self,
"Обновления не настроены",
"Не задан URL репозитория для обновлений.\n"
"Укажите UPDATE_REPOSITORY_URL в main.py или переменную окружения "
"ANABASIS_UPDATE_URL (например: https://git.daemonlord.ru/owner/repo).",
)
return
self.status_label.setText("Статус: не удалось проверить обновления.")
if not self._update_check_silent:
QMessageBox.warning(self, "Проверка обновлений", error_text)
def _download_update_archive(self, download_url, destination_path):
request = urllib.request.Request(
download_url,
headers={"User-Agent": "AnabasisManager-Updater"},
)
with urllib.request.urlopen(request, timeout=60) as response:
with open(destination_path, "wb") as f:
shutil.copyfileobj(response, f)
def _download_update_text(self, url):
request = urllib.request.Request(
url,
headers={"User-Agent": "AnabasisManager-Updater"},
)
with urllib.request.urlopen(request, timeout=30) as response:
return response.read().decode("utf-8", errors="replace")
@staticmethod
def _sha256_file(path):
digest = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(1024 * 1024), b""):
digest.update(chunk)
return digest.hexdigest().lower()
@staticmethod
def _extract_sha256_from_text(checksum_text, target_file_name):
target = (target_file_name or "").strip().lower()
for raw_line in checksum_text.splitlines():
line = raw_line.strip()
if not line:
continue
match = re.search(r"\b([A-Fa-f0-9]{64})\b", line)
if not match:
continue
checksum = match.group(1).lower()
if not target:
return checksum
line_lower = line.lower()
if target in line_lower:
return checksum
if os.path.basename(target) in line_lower:
return checksum
return ""
def _verify_update_checksum(self, zip_path, checksum_url, download_name):
if not checksum_url:
raise RuntimeError("В релизе нет файла SHA256. Автообновление остановлено.")
checksum_text = self._download_update_text(checksum_url)
expected_hash = self._extract_sha256_from_text(checksum_text, download_name or os.path.basename(zip_path))
if not expected_hash:
raise RuntimeError("Не удалось найти SHA256 для архива обновления.")
actual_hash = self._sha256_file(zip_path)
if actual_hash != expected_hash:
raise RuntimeError("SHA256 не совпадает, обновление отменено.")
def _locate_extracted_root(self, extracted_dir):
entries = []
for name in os.listdir(extracted_dir):
full_path = os.path.join(extracted_dir, name)
if os.path.isdir(full_path):
entries.append(full_path)
if len(entries) == 1:
candidate = entries[0]
if os.path.exists(os.path.join(candidate, "AnabasisManager.exe")):
return candidate
return extracted_dir
def _build_update_script(self, app_dir, source_dir, exe_name):
script_path = os.path.join(tempfile.gettempdir(), "anabasis_apply_update.cmd")
script_lines = [
"@echo off",
"setlocal",
f"set APP_DIR={app_dir}",
f"set SRC_DIR={source_dir}",
f"set EXE_NAME={exe_name}",
"timeout /t 2 /nobreak >nul",
"robocopy \"%SRC_DIR%\" \"%APP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:3 /W:1 >nul",
"set RC=%ERRORLEVEL%",
"if %RC% GEQ 8 goto :copy_error",
"start \"\" \"%APP_DIR%\\%EXE_NAME%\"",
"exit /b 0",
":copy_error",
"echo Auto-update failed with code %RC% > \"%APP_DIR%\\update_error.log\"",
"exit /b %RC%",
]
with open(script_path, "w", encoding="utf-8", newline="\r\n") as f:
f.write("\r\n".join(script_lines) + "\r\n")
return script_path
def _start_auto_update(self, download_url, latest_version, checksum_url="", download_name=""):
if os.name != "nt":
QMessageBox.information(
self,
"Автообновление",
"Автообновление пока поддерживается только в Windows-сборке.",
)
return False
if not getattr(sys, "frozen", False):
QMessageBox.information(
self,
"Автообновление",
"Автообновление доступно в собранной версии приложения (.exe).",
)
return False
if not download_url:
QMessageBox.warning(self, "Автообновление", "В релизе нет ссылки на файл для обновления.")
return False
self.status_label.setText(f"Статус: загрузка обновления {latest_version}...")
self._set_busy(True)
work_dir = tempfile.mkdtemp(prefix="anabasis_update_")
zip_path = os.path.join(work_dir, "update.zip")
unpack_dir = os.path.join(work_dir, "extracted")
try:
self._download_update_archive(download_url, zip_path)
self._verify_update_checksum(zip_path, checksum_url, download_name)
os.makedirs(unpack_dir, exist_ok=True)
with zipfile.ZipFile(zip_path, "r") as archive:
archive.extractall(unpack_dir)
source_dir = self._locate_extracted_root(unpack_dir)
app_exe = sys.executable
app_dir = os.path.dirname(app_exe)
exe_name = os.path.basename(app_exe)
script_path = self._build_update_script(app_dir, source_dir, exe_name)
creation_flags = 0
if hasattr(subprocess, "CREATE_NEW_PROCESS_GROUP"):
creation_flags |= subprocess.CREATE_NEW_PROCESS_GROUP
if hasattr(subprocess, "DETACHED_PROCESS"):
creation_flags |= subprocess.DETACHED_PROCESS
subprocess.Popen(
["cmd.exe", "/c", script_path],
cwd=work_dir,
creationflags=creation_flags,
)
self._log_event("auto_update", f"Update {latest_version} started from {download_url}")
QMessageBox.information(
self,
"Обновление запущено",
"Обновление скачано. Приложение будет перезапущено.",
)
QTimer.singleShot(150, QApplication.instance().quit)
return True
except Exception as e:
self._log_event("auto_update_failed", str(e), level="ERROR")
QMessageBox.warning(self, "Автообновление", f"Не удалось выполнить автообновление: {e}")
return False
finally:
self._set_busy(False)
def setup_token_timer(self):
self.token_countdown_timer = QTimer(self)
self.token_countdown_timer.timeout.connect(self.update_token_timer_display)
@@ -467,7 +952,7 @@ class VkChatManager(QMainWindow):
self.token_timer_label.setText(f"Срок: {hours:02d}ч {minutes:02d}м {seconds:02d}с")
def set_ui_state(self, authorized):
self.auth_btn.setEnabled(not authorized)
self.auth_btn.setEnabled((not authorized) and (not self._auth_ui_busy))
for btn in [self.select_all_btn, self.deselect_all_btn, self.refresh_chats_btn,
self.vk_url_input, self.multi_link_btn,
self.visible_messages_checkbox]:
@@ -489,6 +974,19 @@ class VkChatManager(QMainWindow):
self.user_ids_to_process.clear()
self._set_vk_url_input_text("")
self._clear_chat_tabs()
if hasattr(self, "make_admin_action"):
self.make_admin_action.setEnabled(authorized and (not self._auth_ui_busy))
if hasattr(self, "logout_action"):
self.logout_action.setEnabled(not self._auth_ui_busy)
def _set_auth_ui_state(self, in_progress):
self._auth_ui_busy = in_progress
self.auth_progress.setVisible(in_progress)
if hasattr(self, "logout_action"):
self.logout_action.setEnabled(not in_progress)
if hasattr(self, "make_admin_action"):
self.make_admin_action.setEnabled(not in_progress and self.token is not None)
self.auth_btn.setEnabled((self.token is None) and (not in_progress))
def _set_busy(self, busy, status_text=None):
if status_text:
@@ -513,17 +1011,22 @@ class VkChatManager(QMainWindow):
def _ensure_log_dir(self):
os.makedirs(APP_DATA_DIR, exist_ok=True)
def _log_error(self, context, exc):
def _log(self, level, context, message):
try:
os.makedirs(APP_DATA_DIR, exist_ok=True)
self._rotate_log_if_needed()
timestamp = QDateTime.currentDateTime().toString("yyyy-MM-dd HH:mm:ss")
message = self._format_vk_error(exc)
with open(LOG_FILE, "a", encoding="utf-8") as f:
f.write(f"[{timestamp}] {context}: {message}\n")
f.write(f"[{timestamp}] [{level}] {context}: {message}\n")
except Exception:
pass
def _log_error(self, context, exc):
self._log("ERROR", context, self._format_vk_error(exc))
def _log_event(self, context, message, level="INFO"):
self._log(level, context, message)
def _rotate_log_if_needed(self):
try:
if not os.path.exists(LOG_FILE):
@@ -579,30 +1082,11 @@ class VkChatManager(QMainWindow):
if confirm != QMessageBox.Yes:
return
self.token = None
self.token_expiration_time = None
self.vk_session = None
self.vk = None
self.user_ids_to_process.clear()
self._set_vk_url_input_text("")
self.token_input.clear()
self.token_timer_label.setText("Срок действия токена: Н")
self.status_label.setText("Статус: не авторизован")
if self.token_countdown_timer.isActive():
self.token_countdown_timer.stop()
self._clear_chat_tabs()
self.set_ui_state(False)
try:
if os.path.exists(TOKEN_FILE):
os.remove(TOKEN_FILE)
except Exception as e:
print(f"Ошибка удаления токена: {e}")
self._clear_auth_state(stop_timer=True, remove_token_file=True)
try:
self._try_remove_web_cache()
except Exception as e:
print(f"Ошибка удаления кэша: {e}")
self._log_event("logout_and_clear", f"Cache cleanup failed: {e}", level="WARN")
def _cleanup_cache_if_needed(self):
if os.path.exists(CACHE_CLEANUP_MARKER):
@@ -641,13 +1125,88 @@ class VkChatManager(QMainWindow):
def set_all_checkboxes_on_current_tab(self, checked):
current_index = self.chat_tabs.currentIndex()
checkbox_lists = [self.office_chat_checkboxes, self.retail_chat_checkboxes, self.retail_warehouse_checkboxes, self.retail_coffee_checkboxes, self.other_chat_checkboxes]
checkbox_lists = [self.office_chat_checkboxes, self.retail_chat_checkboxes, self.warehouse_chat_checkboxes, self.coffee_chat_checkboxes, self.other_chat_checkboxes]
if 0 <= current_index < len(checkbox_lists):
for checkbox in checkbox_lists[current_index]:
checkbox.setChecked(checked)
def start_auth(self):
self.status_label.setText("Статус: ожидание авторизации...")
def _build_auth_command(self, auth_url, output_path):
return self.vk_service.build_auth_command(auth_url, output_path)
def _on_auth_process_error(self, process_error):
self._auth_process_error_text = f"Ошибка запуска авторизации: {process_error}"
# For failed starts Qt may not emit finished(), so release UI here.
if self.auth_process and self.auth_process.state() == QProcess.NotRunning:
output_path = self.auth_output_path
self.auth_output_path = None
self.auth_process = None
self._set_auth_ui_state(False)
self.status_label.setText(f"Статус: {self._auth_process_error_text}")
self._log_event("auth_process_error", self._auth_process_error_text, level="ERROR")
self._auth_process_error_text = None
self._auth_relogin_in_progress = False
self.set_ui_state(self.token is not None)
try:
if output_path and os.path.exists(output_path):
os.remove(output_path)
except Exception:
pass
def _on_auth_process_finished(self, exit_code, _exit_status):
output_path = self.auth_output_path
self.auth_output_path = None
self.auth_process = None
self._set_auth_ui_state(False)
if self._auth_process_error_text:
self.status_label.setText(f"Статус: {self._auth_process_error_text}")
self._log_event("auth_process_error", self._auth_process_error_text, level="ERROR")
self._auth_process_error_text = None
self._auth_relogin_in_progress = False
self.set_ui_state(self.token is not None)
return
if exit_code != 0:
self.status_label.setText(f"Статус: авторизация не удалась (код {exit_code}).")
self._log_event("auth_process_finished", f"exit_code={exit_code}", level="WARN")
self._auth_relogin_in_progress = False
self.set_ui_state(self.token is not None)
return
token = None
expires_in = 0
if output_path and os.path.exists(output_path):
try:
with open(output_path, "r", encoding="utf-8") as f:
data = json.load(f)
token = data.get("token")
expires_in = data.get("expires_in", 0)
except Exception as e:
self._log_event("auth_result_parse", f"Ошибка чтения результата авторизации: {e}", level="ERROR")
finally:
try:
if os.path.exists(output_path):
os.remove(output_path)
except Exception:
pass
else:
self._log_event("auth_result", "Файл результата авторизации не найден.", level="WARN")
self._auth_relogin_in_progress = False
self.handle_new_auth_token(token, expires_in)
def start_auth(self, keep_status_text=False):
if self.auth_process and self.auth_process.state() != QProcess.NotRunning:
self._log_event("auth_process", "Авторизация уже запущена, повторный запуск пропущен.")
return
if keep_status_text and hasattr(self, "_relogin_status_text"):
status_text = self._relogin_status_text
self._relogin_status_text = None
else:
status_text = "Статус: ожидание авторизации..."
self.status_label.setText(status_text)
auth_url = (
"https://oauth.vk.com/authorize?"
"client_id=2685278&"
@@ -664,38 +1223,22 @@ class VkChatManager(QMainWindow):
except Exception:
pass
cmd = [sys.executable, "--auth", auth_url, output_path]
try:
subprocess.check_call(cmd)
except Exception as e:
self.status_label.setText(f"Статус: ошибка запуска авторизации: {e}")
return
program, args = self._build_auth_command(auth_url, output_path)
self.auth_output_path = output_path
self._auth_process_error_text = None
if not os.path.exists(output_path):
self.status_label.setText("Статус: авторизация не удалась")
return
try:
with open(output_path, "r", encoding="utf-8") as f:
data = json.load(f)
token = data.get("token")
expires_in = data.get("expires_in", 0)
except Exception:
token = None
expires_in = 0
try:
if os.path.exists(output_path):
os.remove(output_path)
except Exception:
pass
self.handle_new_auth_token(token, expires_in)
process = QProcess(self)
process.finished.connect(self._on_auth_process_finished)
process.errorOccurred.connect(self._on_auth_process_error)
self.auth_process = process
self._set_auth_ui_state(True)
process.start(program, args)
def handle_new_auth_token(self, token, expires_in):
if not token:
self.status_label.setText("Статус: Авторизация не удалась")
self.set_ui_state(False)
self._auth_relogin_in_progress = False
return
self.token = token
@@ -704,9 +1247,11 @@ class VkChatManager(QMainWindow):
self.token_input.setText(self.token[:50] + "...")
self.status_label.setText("Статус: авторизован")
self.vk_session = VkApi(token=self.token)
self.vk = self.vk_session.get_api()
self.vk_service.set_token(self.token)
self.vk_session = self.vk_service.session
self.vk = self.vk_service.api
self.set_ui_state(True)
self._auth_relogin_in_progress = False
self.load_chats()
def handle_auth_token_on_load(self, token, expiration_time):
@@ -715,9 +1260,11 @@ class VkChatManager(QMainWindow):
self.token_input.setText(self.token[:50] + "...")
self.status_label.setText("Статус: авторизован (токен загружен)")
self.vk_session = VkApi(token=self.token)
self.vk = self.vk_session.get_api()
self.vk_service.set_token(self.token)
self.vk_session = self.vk_service.session
self.vk = self.vk_service.api
self.set_ui_state(True)
self._auth_relogin_in_progress = False
self.load_chats()
def _clear_chat_tabs(self):
@@ -729,24 +1276,80 @@ class VkChatManager(QMainWindow):
chk_list.clear()
def _vk_error_code(self, exc):
error = getattr(exc, "error", None)
if isinstance(error, dict):
return error.get("error_code")
return getattr(exc, "code", None)
return self.vk_service.vk_error_code(exc)
def _is_auth_token_error(self, exc):
message = self._format_vk_error(exc).lower()
return self.vk_service.is_auth_error(exc, message)
def _clear_auth_state(self, stop_timer=False, remove_token_file=True):
self.token = None
self.token_expiration_time = None
self.vk_service.clear()
self.vk_session = None
self.vk = None
self.user_ids_to_process.clear()
self._set_vk_url_input_text("")
self.token_input.clear()
if stop_timer and self.token_countdown_timer.isActive():
self.token_countdown_timer.stop()
self.token_timer_label.setText("Срок действия токена: Н")
self.status_label.setText("Статус: не авторизован")
self._clear_chat_tabs()
self.set_ui_state(False)
if remove_token_file:
try:
if os.path.exists(TOKEN_FILE):
os.remove(TOKEN_FILE)
except Exception:
pass
def _force_relogin(self, exc, action_name):
now = time.monotonic()
if self._auth_relogin_in_progress:
self._log_event("force_relogin_skip", f"already_in_progress action={action_name}", level="WARN")
return
elapsed = now - self._last_auth_relogin_ts
if elapsed < AUTH_RELOGIN_BACKOFF_SECONDS:
wait_seconds = int(AUTH_RELOGIN_BACKOFF_SECONDS - elapsed) + 1
self.status_label.setText(f"Статус: повторная авторизация через {wait_seconds} сек.")
self._log_event("force_relogin_backoff", f"action={action_name}; wait={wait_seconds}s", level="WARN")
return
self._auth_relogin_in_progress = True
self._last_auth_relogin_ts = now
error_code = self._vk_error_code(exc)
self._log_event(
"force_relogin",
f"action={action_name}; code={error_code}; message={self._format_vk_error(exc)}",
level="WARN",
)
self._clear_auth_state()
self._relogin_status_text = "Статус: Токен отозван VK, выполните повторный вход."
QMessageBox.warning(
self,
"Требуется повторная авторизация",
f"Во время {action_name} получена ошибка авторизации:\n"
f"{self._format_vk_error(exc)}\n\n"
"Сейчас откроется окно авторизации VK."
)
self.start_auth(keep_status_text=True)
def _handle_vk_api_error(self, context, exc, action_name=None, ui_message_prefix=None, disable_ui=False):
self._log_error(context, exc)
if self._is_auth_token_error(exc):
self._force_relogin(exc, action_name or context)
return True
if ui_message_prefix:
QMessageBox.critical(self, "Ошибка", f"{ui_message_prefix}: {self._format_vk_error(exc)}")
if disable_ui:
self.set_ui_state(False)
return False
def _vk_call_with_retry(self, func, *args, **kwargs):
max_attempts = 5
for attempt in range(1, max_attempts + 1):
try:
return func(*args, **kwargs)
except VkApiError as e:
code = self._vk_error_code(e)
if code not in (6, 9, 10) or attempt == max_attempts:
raise
delay = min(2.0, 0.35 * (2 ** (attempt - 1)))
if code == 9:
delay = max(delay, 1.0)
time.sleep(delay)
return self.vk_service.call_with_retry(func, *args, **kwargs)
def load_chats(self):
self._clear_chat_tabs()
@@ -814,7 +1417,12 @@ class VkChatManager(QMainWindow):
self.chat_tabs.setTabText(3, f"AG Кофейни ({len(self.coffee_chat_checkboxes)})")
self.chat_tabs.setTabText(4, f"Прочие ({len(self.other_chat_checkboxes)})")
except VkApiError as e:
self._log_error("load_chats", e)
if self._handle_vk_api_error(
"load_chats",
e,
action_name="загрузки чатов",
):
return
QMessageBox.critical(self, "Ошибка", f"Не удалось загрузить чаты: {self._format_vk_error(e)}")
self.set_ui_state(False)
finally:
@@ -913,7 +1521,8 @@ class VkChatManager(QMainWindow):
self._vk_call_with_retry(self.vk.messages.addChatUser, **params)
results.append(f"'{user_info}' приглашен в '{chat['title']}'.")
except VkApiError as e:
self._log_error("execute_user_action", e)
if self._handle_vk_api_error("execute_user_action", e, action_name="выполнения операций с пользователями"):
return
results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {self._format_vk_error(e)}")
finally:
processed += 1
@@ -992,7 +1601,8 @@ class VkChatManager(QMainWindow):
)
results.append(f"'{user_info}' назначен админом в '{chat['title']}'.")
except VkApiError as e:
self._log_error("set_user_admin", e)
if self._handle_vk_api_error("set_user_admin", e, action_name="назначения администраторов"):
return
results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {self._format_vk_error(e)}")
finally:
processed += 1

View File

@@ -0,0 +1,52 @@
import unittest
from pathlib import Path
class AuthReloginSmokeTests(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.source = Path("main.py").read_text(encoding="utf-8")
def test_auth_command_builder_handles_frozen_and_source(self):
self.assertIn("def _build_auth_command(self, auth_url, output_path):", self.source)
self.assertIn('return sys.executable, ["--auth", auth_url, output_path]', self.source)
self.assertIn('return sys.executable, [os.path.abspath(__file__), "--auth", auth_url, output_path]', self.source)
def test_auth_runs_via_qprocess(self):
self.assertIn("process = QProcess(self)", self.source)
self.assertIn("process.start(program, args)", self.source)
self.assertIn("def _on_auth_process_finished(self, exit_code, _exit_status):", self.source)
self.assertIn("if self.auth_process and self.auth_process.state() == QProcess.NotRunning:", self.source)
def test_force_relogin_has_backoff_and_event_log(self):
self.assertIn("AUTH_RELOGIN_BACKOFF_SECONDS = 5.0", self.source)
self.assertIn("if self._auth_relogin_in_progress:", self.source)
self.assertIn("force_relogin_backoff", self.source)
self.assertIn("force_relogin", self.source)
def test_auth_error_paths_trigger_force_relogin(self):
self.assertIn("def _handle_vk_api_error(self, context, exc, action_name=None, ui_message_prefix=None, disable_ui=False):", self.source)
self.assertIn("self._force_relogin(exc, action_name or context)", self.source)
self.assertIn('"load_chats",', self.source)
self.assertIn('"execute_user_action",', self.source)
self.assertIn('"set_user_admin",', self.source)
def test_tab_checkbox_lists_use_existing_attributes(self):
self.assertIn("self.warehouse_chat_checkboxes", self.source)
self.assertIn("self.coffee_chat_checkboxes", self.source)
self.assertNotIn("self.retail_warehouse_checkboxes", self.source)
self.assertNotIn("self.retail_coffee_checkboxes", self.source)
def test_update_check_actions_exist(self):
self.assertIn("from app_version import APP_VERSION", self.source)
self.assertIn("UPDATE_REPOSITORY = ", self.source)
self.assertIn('QAction("Проверить обновления", self)', self.source)
self.assertIn("def check_for_updates(self, silent_no_updates=False):", self.source)
self.assertIn("class UpdateChecker(QObject):", self.source)
self.assertIn('message_box.addButton("Обновить сейчас", QMessageBox.AcceptRole)', self.source)
self.assertIn("def _start_auto_update(self, download_url, latest_version, checksum_url=\"\", download_name=\"\"):", self.source)
self.assertIn("def _verify_update_checksum(self, zip_path, checksum_url, download_name):", self.source)
if __name__ == "__main__":
unittest.main()