35 Commits
v1.5.1 ... dev

Author SHA1 Message Date
df3a4c49c5 fix(build): use absolute icon path for updater pyinstaller
All checks were successful
Desktop Dev Pre-release / prerelease (push) Successful in 2m44s
- pass absolute icon path to PyInstaller to avoid missing icon in updater spec dir

- remove stale updater spec and use --clean before updater build
2026-02-15 21:51:36 +03:00
8d4bc10cb7 feat(updater): stage3 resilient gui update flow
Some checks failed
Desktop Dev Pre-release / prerelease (push) Failing after 2m18s
- added retry-based file copy, rollback restart, and version marker validation in updater GUI

- added build step to write dist/version.txt for post-update validation

- added unit tests for updater helpers
2026-02-15 21:46:36 +03:00
a6cee33cf6 feat: improve updater flow and release channels
Some checks failed
Desktop Dev Pre-release / prerelease (push) Failing after 2m18s
- added dedicated GUI updater executable and integrated launch path from main app

- added stable/beta update channel selection with persisted settings and checker support

- expanded CI/release validation to include updater and full test discovery
2026-02-15 21:41:18 +03:00
b30437faef ci(dev): add automated prerelease workflow
All checks were successful
Desktop Dev Pre-release / prerelease (push) Successful in 1m52s
- publish dev prereleases on each push to dev

- use tag format vX.Y.Z-<short_commit>

- upload versioned zip and checksum assets
2026-02-15 21:34:28 +03:00
44deba1382 chore(release): bump version to 2.0.0 2026-02-15 21:17:22 +03:00
eda8d43b9c refactor(ui): simplify instructions and stabilize About dialog
- remove duplicated inline instruction text

- switch About dialog to explicit QDialog with clickable link
2026-02-15 21:17:21 +03:00
4e6502bab7 Merge branch 'master' into dev
All checks were successful
Desktop CI / tests (pull_request) Successful in 11s
2026-02-15 20:51:22 +03:00
89237590c7 chore(release): bump version to 1.7.1
All checks were successful
Desktop CI / tests (pull_request) Successful in 12s
2026-02-15 20:50:36 +03:00
aca2bdfa85 fix(imports): restore shutil in main module
- fix unresolved reference for cache cleanup path
2026-02-15 20:48:39 +03:00
9d40f0017e refactor(ui): cleanup legacy code and add About dialog
- remove duplicate legacy implementations from main window

- add Help -> About with clickable repository link
2026-02-15 20:47:41 +03:00
798eacbf9a merge: release 1.7.0
Some checks failed
Desktop CI / tests (push) Successful in 12s
Desktop Release / release (push) Failing after 1m49s
2026-02-15 20:38:32 +03:00
a9a394cf7d fix(update): improve updater script reliability for 1.7.0
- quote env vars in cmd script for paths with spaces

- add update_error.log diagnostics and timeout while waiting for app exit

- bump APP_VERSION to 1.7.0 and update updater tests
2026-02-15 20:38:24 +03:00
e1e2f8f0e8 refactor: вынес сервисы и ui-компоненты
- вынес token/chat/update логику в services

- вынес диалог и текст инструкции в ui

- добавил и обновил тесты для нового слоя
2026-02-15 20:32:36 +03:00
4d84d2ebe5 refactor(core): split vk and update services into modules 2026-02-15 20:04:21 +03:00
02350cfca1 chore(release): bump version to 1.6.4
All checks were successful
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Successful in 1m51s
2026-02-15 19:44:05 +03:00
68fa841857 feat(update): stage 3 rollback on failed apply/start 2026-02-15 19:43:15 +03:00
bca9007463 chore(release): bump version to 1.6.3
All checks were successful
Desktop CI / tests (push) Successful in 12s
Desktop Release / release (push) Successful in 1m54s
2026-02-15 18:52:06 +03:00
52b1301982 fix(update): wait for app exit before file replacement
All checks were successful
Desktop CI / tests (push) Successful in 12s
Desktop Release / release (push) Successful in 13s
2026-02-15 18:51:00 +03:00
90b3b4fc9d chore(release): bump version to 1.6.2
All checks were successful
Desktop CI / tests (push) Successful in 12s
Desktop Release / release (push) Successful in 1m57s
2026-02-15 18:46:21 +03:00
190e67c931 feat(update): stage 2 sha256 verification for auto-update
All checks were successful
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Successful in 16s
2026-02-15 18:42:37 +03:00
2eb4c52b81 ci(release): set release title to Anabasis Manager <version>
All checks were successful
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Successful in 2m16s
2026-02-15 17:36:02 +03:00
3d73a504d2 ci(release): use v-prefixed semantic tags 2026-02-15 17:35:37 +03:00
1524271be7 ci(release): write outputs/env as utf8 no bom for powershell
All checks were successful
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Successful in 2m5s
2026-02-15 17:31:37 +03:00
561cf43e09 ci(release): handle missing tag exit code in powershell step
All checks were successful
Desktop CI / tests (push) Successful in 14s
Desktop Release / release (push) Successful in 24s
2026-02-15 17:30:22 +03:00
e8930f7550 ci(release): switch windows release steps from bash to powershell
Some checks failed
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Failing after 1m14s
2026-02-15 17:28:29 +03:00
c8da0f9191 ci(release): use preinstalled Python on self-hosted windows runner
Some checks failed
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Failing after 14s
2026-02-15 17:20:42 +03:00
37ce500fd2 ci(release): remove node bootstrap step, require system node
Some checks failed
Desktop CI / tests (push) Successful in 12s
Desktop Release / release (push) Failing after 25s
2026-02-15 17:19:24 +03:00
098a84e5bd ci(release): restore powershell node bootstrap
Some checks failed
Desktop CI / tests (push) Successful in 12s
Desktop Release / release (push) Failing after 2m42s
2026-02-15 16:42:08 +03:00
5aa17c1a84 ci(release): avoid powershell policy by using cmd node bootstrap
All checks were successful
Desktop CI / tests (push) Successful in 13s
2026-02-15 16:38:40 +03:00
dde14f3714 ci(release): bootstrap node before js-based actions
Some checks failed
Desktop CI / tests (push) Successful in 12s
Desktop Release / release (push) Failing after 5s
2026-02-15 16:37:42 +03:00
fa5d4c6993 ci(release): use windows runner label
Some checks failed
Desktop CI / tests (push) Successful in 20s
Desktop Release / release (push) Failing after 1m21s
2026-02-15 16:35:21 +03:00
f9e0225243 ci: add Gitea CI and desktop release workflow
Some checks failed
Desktop CI / tests (push) Successful in 1m29s
Desktop Release / release (push) Has been cancelled
2026-02-15 15:27:21 +03:00
c42b23bea5 chore(release): bump version to 1.6.1 2026-02-15 15:25:21 +03:00
b52cdea425 feat(update): stage 1 auto-update (one-click) 2026-02-15 15:24:45 +03:00
b7fad78a71 refactor(release): bump to 1.6.0 and unify version source 2026-02-15 15:17:30 +03:00
22 changed files with 1977 additions and 560 deletions

35
.gitea/workflows/ci.yml Normal file
View File

@@ -0,0 +1,35 @@
name: Desktop CI
on:
push:
branches:
- master
pull_request:
jobs:
tests:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: https://git.daemonlord.ru/actions/checkout@v4
with:
fetch-depth: 0
- name: Set up Python
uses: https://git.daemonlord.ru/actions/setup-python@v5
with:
python-version: "3.13"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Validate syntax
run: |
python -m py_compile app_version.py main.py build.py updater_gui.py tests/test_auth_relogin_smoke.py tests/test_auto_update_service.py tests/test_chat_actions.py tests/test_token_store.py
- name: Run tests
run: |
python -m unittest discover -s tests -p "test_*.py" -v

View File

@@ -0,0 +1,148 @@
name: Desktop Dev Pre-release
on:
push:
branches:
- dev
workflow_dispatch:
jobs:
prerelease:
runs-on: windows
steps:
- name: Checkout
uses: https://git.daemonlord.ru/actions/checkout@v4
with:
fetch-depth: 0
tags: true
- name: Ensure Python 3.13
shell: powershell
run: |
if (Get-Command python -ErrorAction SilentlyContinue) {
python --version
} elseif (Get-Command py -ErrorAction SilentlyContinue) {
$pyExe = py -3.13 -c "import sys; print(sys.executable)"
if (-not $pyExe) {
throw "Python 3.13 launcher is available, but interpreter was not found."
}
Split-Path $pyExe | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append
python --version
} else {
throw "Python is not installed on runner. Install Python 3.13 and restart runner service."
}
- name: Install dependencies
shell: powershell
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt pyinstaller
- name: Extract prerelease metadata
id: meta
shell: powershell
run: |
$version = (python -c "from app_version import APP_VERSION; print(APP_VERSION)").Trim()
$commit = (git rev-parse --short HEAD).Trim()
$tag = "v$version-$commit"
$archive = "AnabasisManager-$version-$commit"
$utf8NoBom = New-Object System.Text.UTF8Encoding($false)
[System.IO.File]::AppendAllText($env:GITHUB_OUTPUT, "version=$version`n", $utf8NoBom)
[System.IO.File]::AppendAllText($env:GITHUB_OUTPUT, "commit=$commit`n", $utf8NoBom)
[System.IO.File]::AppendAllText($env:GITHUB_OUTPUT, "tag=$tag`n", $utf8NoBom)
[System.IO.File]::AppendAllText($env:GITHUB_OUTPUT, "archive=$archive`n", $utf8NoBom)
[System.IO.File]::AppendAllText($env:GITHUB_ENV, "CONTINUE=true`n", $utf8NoBom)
Write-Host "Detected tag: $tag"
- name: Stop if prerelease already exists
if: env.CONTINUE == 'true'
shell: powershell
run: |
$tag = "${{ steps.meta.outputs.tag }}"
$apiUrl = "https://git.daemonlord.ru/api/v1/repos/${{ gitea.repository }}/releases?page=1&limit=100"
$headers = @{ Authorization = "token ${{ secrets.API_TOKEN }}" }
$utf8NoBom = New-Object System.Text.UTF8Encoding($false)
try {
$response = Invoke-RestMethod -Uri $apiUrl -Headers $headers -Method Get
$found = $false
foreach ($release in $response) {
if ($release.tag_name -eq $tag) {
$found = $true
break
}
}
if ($found) {
Write-Host "Pre-release $tag already exists, stopping job."
[System.IO.File]::AppendAllText($env:GITHUB_ENV, "CONTINUE=false`n", $utf8NoBom)
} else {
Write-Host "Pre-release $tag not found, continuing workflow..."
}
} catch {
Write-Host "Failed to query releases list, continuing workflow..."
}
- name: Run tests
if: env.CONTINUE == 'true'
shell: powershell
run: |
python -m py_compile app_version.py main.py build.py updater_gui.py tests/test_auth_relogin_smoke.py tests/test_auto_update_service.py tests/test_chat_actions.py tests/test_token_store.py
python -m unittest discover -s tests -p "test_*.py" -v
- name: Build release zip
if: env.CONTINUE == 'true'
shell: powershell
run: |
python build.py
- name: Prepare prerelease artifacts
if: env.CONTINUE == 'true'
shell: powershell
run: |
$version = "${{ steps.meta.outputs.version }}"
$archiveBase = "${{ steps.meta.outputs.archive }}"
$srcZip = "dist/AnabasisManager-$version.zip"
$dstZip = "dist/$archiveBase.zip"
if (-not (Test-Path $srcZip)) {
throw "Archive not found: $srcZip"
}
Copy-Item -Path $srcZip -Destination $dstZip -Force
$hash = (Get-FileHash -Path $dstZip -Algorithm SHA256).Hash.ToLower()
"$hash $archiveBase.zip" | Set-Content -Path "dist/$archiveBase.zip.sha256" -Encoding UTF8
- name: Configure git identity
if: env.CONTINUE == 'true'
shell: powershell
run: |
git config user.name "gitea-actions"
git config user.email "gitea-actions@daemonlord.ru"
- name: Create git tag
if: env.CONTINUE == 'true'
shell: powershell
run: |
$tag = "${{ steps.meta.outputs.tag }}"
$tagLine = (git ls-remote --tags origin "refs/tags/$tag" | Select-Object -First 1)
if ([string]::IsNullOrWhiteSpace($tagLine)) {
git tag "$tag"
git push origin "$tag"
} else {
Write-Host "Tag $tag already exists on origin, skipping tag push."
}
- name: Create Gitea Pre-release
if: env.CONTINUE == 'true'
uses: https://git.daemonlord.ru/actions/gitea-release-action@v1
with:
server_url: https://git.daemonlord.ru
repository: ${{ gitea.repository }}
token: ${{ secrets.API_TOKEN }}
tag_name: ${{ steps.meta.outputs.tag }}
name: Anabasis Manager ${{ steps.meta.outputs.version }} (dev ${{ steps.meta.outputs.commit }})
prerelease: true
body: |
Development pre-release for commit ${{ steps.meta.outputs.commit }}
Version base: ${{ steps.meta.outputs.version }}
files: |
dist/${{ steps.meta.outputs.archive }}.zip
dist/${{ steps.meta.outputs.archive }}.zip.sha256

View File

@@ -0,0 +1,133 @@
name: Desktop Release
on:
push:
branches:
- master
jobs:
release:
runs-on: windows
steps:
- name: Checkout
uses: https://git.daemonlord.ru/actions/checkout@v4
with:
fetch-depth: 0
tags: true
- name: Ensure Python 3.13
shell: powershell
run: |
if (Get-Command python -ErrorAction SilentlyContinue) {
python --version
} elseif (Get-Command py -ErrorAction SilentlyContinue) {
$pyExe = py -3.13 -c "import sys; print(sys.executable)"
if (-not $pyExe) {
throw "Python 3.13 launcher is available, but interpreter was not found."
}
Split-Path $pyExe | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append
python --version
} else {
throw "Python is not installed on runner. Install Python 3.13 and restart runner service."
}
- name: Install dependencies
shell: powershell
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt pyinstaller
- name: Extract app version
id: extract_version
shell: powershell
run: |
$version = (python -c "from app_version import APP_VERSION; print(APP_VERSION)").Trim()
$utf8NoBom = New-Object System.Text.UTF8Encoding($false)
[System.IO.File]::AppendAllText($env:GITHUB_OUTPUT, "version=$version`n", $utf8NoBom)
Write-Host "Detected version: $version"
- name: Stop if version already released
id: stop
shell: powershell
run: |
$version = "${{ steps.extract_version.outputs.version }}"
$tag = "v$version"
git show-ref --tags --quiet --verify "refs/tags/$tag"
$tagExists = ($LASTEXITCODE -eq 0)
$global:LASTEXITCODE = 0
$utf8NoBom = New-Object System.Text.UTF8Encoding($false)
if ($tagExists) {
Write-Host "Version $tag already released, stopping job."
[System.IO.File]::AppendAllText($env:GITHUB_ENV, "CONTINUE=false`n", $utf8NoBom)
} else {
Write-Host "Version $tag not released yet, continuing workflow..."
[System.IO.File]::AppendAllText($env:GITHUB_ENV, "CONTINUE=true`n", $utf8NoBom)
}
exit 0
- name: Run tests
if: env.CONTINUE == 'true'
shell: powershell
run: |
python -m py_compile app_version.py main.py build.py updater_gui.py tests/test_auth_relogin_smoke.py tests/test_auto_update_service.py tests/test_chat_actions.py tests/test_token_store.py
python -m unittest discover -s tests -p "test_*.py" -v
- name: Build release zip
if: env.CONTINUE == 'true'
shell: powershell
run: |
python build.py
- name: Ensure archive exists
if: env.CONTINUE == 'true'
shell: powershell
run: |
$version = "${{ steps.extract_version.outputs.version }}"
$archivePath = "dist/AnabasisManager-$version.zip"
if (-not (Test-Path $archivePath)) {
throw "Archive not found: $archivePath"
}
- name: Generate SHA256 checksum
if: env.CONTINUE == 'true'
shell: powershell
run: |
$version = "${{ steps.extract_version.outputs.version }}"
$archiveName = "AnabasisManager-$version.zip"
$archivePath = "dist/$archiveName"
$checksumPath = "dist/$archiveName.sha256"
$hash = (Get-FileHash -Path $archivePath -Algorithm SHA256).Hash.ToLower()
"$hash $archiveName" | Set-Content -Path $checksumPath -Encoding UTF8
Write-Host "Checksum created: $checksumPath"
- name: Configure git identity
if: env.CONTINUE == 'true'
shell: powershell
run: |
git config user.name "gitea-actions"
git config user.email "gitea-actions@daemonlord.ru"
- name: Create git tag
if: env.CONTINUE == 'true'
shell: powershell
run: |
$version = "${{ steps.extract_version.outputs.version }}"
$tag = "v$version"
git tag "$tag"
git push origin "$tag"
- name: Create Gitea Release
if: env.CONTINUE == 'true'
uses: https://git.daemonlord.ru/actions/gitea-release-action@v1
with:
server_url: https://git.daemonlord.ru
repository: ${{ gitea.repository }}
token: ${{ secrets.API_TOKEN }}
tag_name: v${{ steps.extract_version.outputs.version }}
name: Anabasis Manager ${{ steps.extract_version.outputs.version }}
body: |
Desktop release v${{ steps.extract_version.outputs.version }}
files: |
dist/AnabasisManager-${{ steps.extract_version.outputs.version }}.zip
dist/AnabasisManager-${{ steps.extract_version.outputs.version }}.zip.sha256

1
.gitignore vendored
View File

@@ -10,3 +10,4 @@ __pycache__/
tests/__pycache__/ tests/__pycache__/
build/ build/
dist/ dist/
AnabasisManager.spec

1
app_version.py Normal file
View File

@@ -0,0 +1 @@
APP_VERSION = "2.0.0"

View File

@@ -2,15 +2,18 @@ import os
import shutil import shutil
import subprocess import subprocess
import sys import sys
from app_version import APP_VERSION
# --- Конфигурация --- # --- Конфигурация ---
APP_NAME = "AnabasisManager" APP_NAME = "AnabasisManager"
VERSION = "1.5.1" # Ваша версия UPDATER_NAME = "AnabasisUpdater"
VERSION = APP_VERSION # Единая версия приложения
MAIN_SCRIPT = "main.py" MAIN_SCRIPT = "main.py"
UPDATER_SCRIPT = "updater_gui.py"
ICON_PATH = "icon.ico" ICON_PATH = "icon.ico"
DIST_DIR = os.path.join("dist", APP_NAME) DIST_DIR = os.path.join("dist", APP_NAME)
ARCHIVE_NAME = f"{APP_NAME}-{VERSION}" # Формат Название-Версия ARCHIVE_NAME = f"{APP_NAME}-{VERSION}" # Формат Название-Версия
SAFE_CLEAN_ROOT_FILES = {"main.py", "requirements.txt", "build.py"} SAFE_CLEAN_ROOT_FILES = {"main.py", "updater_gui.py", "requirements.txt", "build.py"}
REMOVE_LIST = [ REMOVE_LIST = [
"Qt6Pdf.dll", "Qt6PdfQuick.dll", "Qt6PdfWidgets.dll", "Qt6Pdf.dll", "Qt6PdfQuick.dll", "Qt6PdfWidgets.dll",
"Qt6VirtualKeyboard.dll", "Qt6Positioning.dll", "Qt6VirtualKeyboard.dll", "Qt6Positioning.dll",
@@ -21,6 +24,18 @@ REMOVE_LIST = [
] ]
def write_version_marker():
marker_path = os.path.join(DIST_DIR, "version.txt")
try:
os.makedirs(DIST_DIR, exist_ok=True)
with open(marker_path, "w", encoding="utf-8") as f:
f.write(str(VERSION).strip() + "\n")
print(f"[OK] Обновлен маркер версии: {marker_path}")
except Exception as e:
print(f"[ERROR] Не удалось записать version.txt: {e}")
sys.exit(1)
def ensure_project_root(): def ensure_project_root():
missing = [name for name in SAFE_CLEAN_ROOT_FILES if not os.path.exists(name)] missing = [name for name in SAFE_CLEAN_ROOT_FILES if not os.path.exists(name)]
if missing: if missing:
@@ -31,6 +46,8 @@ def ensure_project_root():
def run_build(): def run_build():
print(f"--- 1. Запуск PyInstaller для {APP_NAME} v{VERSION} ---") print(f"--- 1. Запуск PyInstaller для {APP_NAME} v{VERSION} ---")
icon_abs_path = os.path.abspath(ICON_PATH)
has_icon = os.path.exists(icon_abs_path)
command = [ command = [
"pyinstaller", "pyinstaller",
@@ -41,8 +58,8 @@ def run_build():
"--exclude-module", "PySide6.QtWebEngineWidgets", "--exclude-module", "PySide6.QtWebEngineWidgets",
"--exclude-module", "PySide6.QtWebEngineQuick", "--exclude-module", "PySide6.QtWebEngineQuick",
f"--name={APP_NAME}", f"--name={APP_NAME}",
f"--icon={ICON_PATH}" if os.path.exists(ICON_PATH) else "", f"--icon={icon_abs_path}" if has_icon else "",
f"--add-data={ICON_PATH}{os.pathsep}." if os.path.exists(ICON_PATH) else "", f"--add-data={icon_abs_path}{os.pathsep}." if has_icon else "",
f"--add-data=auth_webview.py{os.pathsep}.", f"--add-data=auth_webview.py{os.pathsep}.",
MAIN_SCRIPT MAIN_SCRIPT
] ]
@@ -57,6 +74,36 @@ def run_build():
sys.exit(1) sys.exit(1)
def run_updater_build():
print(f"\n--- 1.2 Сборка {UPDATER_NAME} ---")
icon_abs_path = os.path.abspath(ICON_PATH)
has_icon = os.path.exists(icon_abs_path)
updater_spec_dir = os.path.join("build", "updater_spec")
updater_spec_path = os.path.join(updater_spec_dir, f"{UPDATER_NAME}.spec")
if os.path.exists(updater_spec_path):
os.remove(updater_spec_path)
command = [
"pyinstaller",
"--noconfirm",
"--clean",
"--onefile",
"--windowed",
f"--name={UPDATER_NAME}",
"--distpath", DIST_DIR,
"--workpath", os.path.join("build", "updater"),
"--specpath", updater_spec_dir,
f"--icon={icon_abs_path}" if has_icon else "",
UPDATER_SCRIPT,
]
command = [arg for arg in command if arg]
try:
subprocess.check_call(command)
print(f"[OK] {UPDATER_NAME} собран.")
except subprocess.CalledProcessError as e:
print(f"[ERROR] Ошибка при сборке {UPDATER_NAME}: {e}")
sys.exit(1)
def run_cleanup(): def run_cleanup():
print(f"\n--- 2. Оптимизация папки {APP_NAME} ---") print(f"\n--- 2. Оптимизация папки {APP_NAME} ---")
@@ -98,7 +145,9 @@ if __name__ == "__main__":
shutil.rmtree(folder) shutil.rmtree(folder)
run_build() run_build()
run_updater_build()
run_cleanup() run_cleanup()
write_version_marker()
create_archive() create_archive()
print("\n" + "=" * 30) print("\n" + "=" * 30)

852
main.py

File diff suppressed because it is too large Load Diff

5
services/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
from .auto_update_service import AutoUpdateService
from .chat_actions import load_chat_conversations, resolve_user_ids
from .token_store import load_token, save_token
from .update_service import UpdateChecker, detect_update_repository_url
from .vk_service import VkService

View File

@@ -0,0 +1,220 @@
import hashlib
import os
import re
import shutil
import subprocess
import tempfile
import urllib.request
import zipfile
class AutoUpdateService:
@staticmethod
def download_update_archive(download_url, destination_path):
request = urllib.request.Request(
download_url,
headers={"User-Agent": "AnabasisManager-Updater"},
)
with urllib.request.urlopen(request, timeout=60) as response:
with open(destination_path, "wb") as f:
shutil.copyfileobj(response, f)
@staticmethod
def download_update_text(url):
request = urllib.request.Request(
url,
headers={"User-Agent": "AnabasisManager-Updater"},
)
with urllib.request.urlopen(request, timeout=30) as response:
return response.read().decode("utf-8", errors="replace")
@staticmethod
def sha256_file(path):
digest = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(1024 * 1024), b""):
digest.update(chunk)
return digest.hexdigest().lower()
@staticmethod
def extract_sha256_from_text(checksum_text, target_file_name):
target = (target_file_name or "").strip().lower()
for raw_line in checksum_text.splitlines():
line = raw_line.strip()
if not line:
continue
match = re.search(r"\b([A-Fa-f0-9]{64})\b", line)
if not match:
continue
checksum = match.group(1).lower()
if not target:
return checksum
line_lower = line.lower()
if target in line_lower:
return checksum
if os.path.basename(target) in line_lower:
return checksum
return ""
@classmethod
def verify_update_checksum(cls, zip_path, checksum_url, download_name):
if not checksum_url:
raise RuntimeError("В релизе нет файла SHA256. Автообновление остановлено.")
checksum_text = cls.download_update_text(checksum_url)
expected_hash = cls.extract_sha256_from_text(checksum_text, download_name or os.path.basename(zip_path))
if not expected_hash:
raise RuntimeError("Не удалось найти SHA256 для архива обновления.")
actual_hash = cls.sha256_file(zip_path)
if actual_hash != expected_hash:
raise RuntimeError("SHA256 не совпадает, обновление отменено.")
@staticmethod
def locate_extracted_root(extracted_dir):
entries = []
for name in os.listdir(extracted_dir):
full_path = os.path.join(extracted_dir, name)
if os.path.isdir(full_path):
entries.append(full_path)
if len(entries) == 1:
candidate = entries[0]
if os.path.exists(os.path.join(candidate, "AnabasisManager.exe")):
return candidate
return extracted_dir
@staticmethod
def build_update_script(app_dir, source_dir, exe_name, target_pid):
script_path = os.path.join(tempfile.gettempdir(), "anabasis_apply_update.cmd")
script_lines = [
"@echo off",
"setlocal EnableExtensions",
f"set \"APP_DIR={app_dir}\"",
f"set \"SRC_DIR={source_dir}\"",
f"set \"EXE_NAME={exe_name}\"",
f"set \"TARGET_PID={target_pid}\"",
"set \"BACKUP_DIR=%TEMP%\\anabasis_backup_%RANDOM%%RANDOM%\"",
"set \"UPDATE_LOG=%APP_DIR%\\update_error.log\"",
"echo [%DATE% %TIME%] Update start > \"%UPDATE_LOG%\"",
"if not exist \"%SRC_DIR%\\%EXE_NAME%\" (",
" echo Source executable not found: \"%SRC_DIR%\\%EXE_NAME%\" >> \"%UPDATE_LOG%\"",
" exit /b 3",
")",
"set /a WAIT_LOOPS=0",
":wait_for_exit",
"tasklist /FI \"PID eq %TARGET_PID%\" | find \"%TARGET_PID%\" >nul",
"if %ERRORLEVEL% EQU 0 (",
" set /a WAIT_LOOPS+=1",
" if %WAIT_LOOPS% GEQ 180 (",
" echo Timeout waiting for process %TARGET_PID%, attempting force stop >> \"%UPDATE_LOG%\"",
" taskkill /PID %TARGET_PID% /T /F >nul 2>&1",
" timeout /t 2 /nobreak >nul",
" tasklist /FI \"PID eq %TARGET_PID%\" | find \"%TARGET_PID%\" >nul",
" if %ERRORLEVEL% EQU 0 goto :pid_still_running",
" goto :wait_image_unlock",
" )",
" timeout /t 1 /nobreak >nul",
" goto :wait_for_exit",
")",
":wait_image_unlock",
"set /a IMG_LOOPS=0",
":check_image",
"tasklist /FI \"IMAGENAME eq %EXE_NAME%\" | find /I \"%EXE_NAME%\" >nul",
"if %ERRORLEVEL% EQU 0 (",
" set /a IMG_LOOPS+=1",
" if %IMG_LOOPS% GEQ 60 goto :image_still_running",
" timeout /t 1 /nobreak >nul",
" goto :check_image",
")",
":backup",
"timeout /t 1 /nobreak >nul",
"mkdir \"%BACKUP_DIR%\" >nul 2>&1",
"robocopy \"%APP_DIR%\" \"%BACKUP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:6 /W:2 >nul",
"set \"RC=%ERRORLEVEL%\"",
"if %RC% GEQ 8 goto :backup_error",
"robocopy \"%SRC_DIR%\" \"%APP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:12 /W:2 >nul",
"set \"RC=%ERRORLEVEL%\"",
"if %RC% GEQ 8 goto :rollback",
"start \"\" \"%APP_DIR%\\%EXE_NAME%\"",
"timeout /t 2 /nobreak >nul",
"tasklist /FI \"IMAGENAME eq %EXE_NAME%\" | find /I \"%EXE_NAME%\" >nul",
"if %ERRORLEVEL% NEQ 0 goto :rollback",
"echo Update success >> \"%UPDATE_LOG%\"",
"rmdir /S /Q \"%BACKUP_DIR%\" >nul 2>&1",
"exit /b 0",
":rollback",
"robocopy \"%BACKUP_DIR%\" \"%APP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:6 /W:2 >nul",
"start \"\" \"%APP_DIR%\\%EXE_NAME%\"",
"echo Auto-update failed. Rollback executed. >> \"%UPDATE_LOG%\"",
"exit /b 2",
":backup_error",
"echo Auto-update failed during backup. Code %RC% >> \"%UPDATE_LOG%\"",
"exit /b %RC%",
":pid_still_running",
"echo Auto-update aborted: process %TARGET_PID% is still running after force stop. >> \"%UPDATE_LOG%\"",
"exit /b 4",
":image_still_running",
"echo Auto-update aborted: %EXE_NAME% still running and file lock may remain. >> \"%UPDATE_LOG%\"",
"exit /b 5",
]
with open(script_path, "w", encoding="utf-8", newline="\r\n") as f:
f.write("\r\n".join(script_lines) + "\r\n")
return script_path
@staticmethod
def launch_update_script(script_path, work_dir):
creation_flags = 0
if hasattr(subprocess, "CREATE_NEW_PROCESS_GROUP"):
creation_flags |= subprocess.CREATE_NEW_PROCESS_GROUP
if hasattr(subprocess, "DETACHED_PROCESS"):
creation_flags |= subprocess.DETACHED_PROCESS
subprocess.Popen(
["cmd.exe", "/c", script_path],
cwd=work_dir,
creationflags=creation_flags,
)
@staticmethod
def launch_gui_updater(app_exe, source_dir, work_dir, target_pid, version=""):
app_dir = os.path.dirname(app_exe)
exe_name = os.path.basename(app_exe)
updater_exe = os.path.join(app_dir, "AnabasisUpdater.exe")
if not os.path.exists(updater_exe):
raise RuntimeError("Файл AnabasisUpdater.exe не найден в папке приложения.")
creation_flags = 0
if hasattr(subprocess, "CREATE_NEW_PROCESS_GROUP"):
creation_flags |= subprocess.CREATE_NEW_PROCESS_GROUP
if hasattr(subprocess, "DETACHED_PROCESS"):
creation_flags |= subprocess.DETACHED_PROCESS
subprocess.Popen(
[
updater_exe,
"--app-dir",
app_dir,
"--source-dir",
source_dir,
"--exe-name",
exe_name,
"--target-pid",
str(target_pid),
"--version",
str(version or ""),
"--work-dir",
str(work_dir or ""),
],
cwd=work_dir,
creationflags=creation_flags,
)
@classmethod
def prepare_update(cls, download_url, checksum_url, download_name):
work_dir = tempfile.mkdtemp(prefix="anabasis_update_")
zip_path = os.path.join(work_dir, "update.zip")
unpack_dir = os.path.join(work_dir, "extracted")
cls.download_update_archive(download_url, zip_path)
cls.verify_update_checksum(zip_path, checksum_url, download_name)
os.makedirs(unpack_dir, exist_ok=True)
with zipfile.ZipFile(zip_path, "r") as archive:
archive.extractall(unpack_dir)
source_dir = cls.locate_extracted_root(unpack_dir)
return work_dir, source_dir

46
services/chat_actions.py Normal file
View File

@@ -0,0 +1,46 @@
from urllib.parse import urlparse
def resolve_user_ids(vk_call_with_retry, vk_api, links):
resolved_ids = []
failed_links = []
for link in links:
try:
path = urlparse(link).path
screen_name = path.split("/")[-1] if path else ""
if not screen_name and len(path.split("/")) > 1:
screen_name = path.split("/")[-2]
if not screen_name:
failed_links.append((link, None))
continue
resolved_object = vk_call_with_retry(vk_api.utils.resolveScreenName, screen_name=screen_name)
if resolved_object and resolved_object.get("type") == "user":
resolved_ids.append(resolved_object["object_id"])
else:
failed_links.append((link, None))
except Exception as e:
failed_links.append((link, e))
return resolved_ids, failed_links
def load_chat_conversations(vk_call_with_retry, vk_api):
conversations = []
start_from = None
seen_start_tokens = set()
while True:
params = {"count": 200, "filter": "all"}
if start_from:
if start_from in seen_start_tokens:
break
params["start_from"] = start_from
seen_start_tokens.add(start_from)
response = vk_call_with_retry(vk_api.messages.getConversations, **params)
page_items = response.get("items", [])
if not page_items:
break
conversations.extend(page_items)
start_from = response.get("next_from")
if not start_from:
break
return conversations

136
services/token_store.py Normal file
View File

@@ -0,0 +1,136 @@
import base64
import ctypes
import json
import os
import time
from ctypes import wintypes
class _DataBlob(ctypes.Structure):
_fields_ = [("cbData", wintypes.DWORD), ("pbData", ctypes.POINTER(ctypes.c_byte))]
_crypt32 = None
_kernel32 = None
if os.name == "nt":
_crypt32 = ctypes.WinDLL("crypt32", use_last_error=True)
_kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
_crypt32.CryptProtectData.argtypes = [
ctypes.POINTER(_DataBlob),
wintypes.LPCWSTR,
ctypes.POINTER(_DataBlob),
ctypes.c_void_p,
ctypes.c_void_p,
wintypes.DWORD,
ctypes.POINTER(_DataBlob),
]
_crypt32.CryptProtectData.restype = wintypes.BOOL
_crypt32.CryptUnprotectData.argtypes = [
ctypes.POINTER(_DataBlob),
ctypes.POINTER(wintypes.LPWSTR),
ctypes.POINTER(_DataBlob),
ctypes.c_void_p,
ctypes.c_void_p,
wintypes.DWORD,
ctypes.POINTER(_DataBlob),
]
_crypt32.CryptUnprotectData.restype = wintypes.BOOL
def _crypt_protect_data(data, description=""):
buffer = ctypes.create_string_buffer(data)
data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte)))
data_out = _DataBlob()
if not _crypt32.CryptProtectData(ctypes.byref(data_in), description, None, None, None, 0, ctypes.byref(data_out)):
raise ctypes.WinError(ctypes.get_last_error())
try:
return ctypes.string_at(data_out.pbData, data_out.cbData)
finally:
_kernel32.LocalFree(data_out.pbData)
def _crypt_unprotect_data(data):
buffer = ctypes.create_string_buffer(data)
data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte)))
data_out = _DataBlob()
if not _crypt32.CryptUnprotectData(ctypes.byref(data_in), None, None, None, None, 0, ctypes.byref(data_out)):
raise ctypes.WinError(ctypes.get_last_error())
try:
return ctypes.string_at(data_out.pbData, data_out.cbData)
finally:
_kernel32.LocalFree(data_out.pbData)
def _encrypt_token(token):
if os.name != "nt":
raise RuntimeError("DPAPI is available only on Windows.")
encrypted_bytes = _crypt_protect_data(token.encode("utf-8"))
return base64.b64encode(encrypted_bytes).decode("ascii")
def _decrypt_token(token_data):
if os.name != "nt":
raise RuntimeError("DPAPI is available only on Windows.")
encrypted_bytes = base64.b64decode(token_data.encode("ascii"))
decrypted_bytes = _crypt_unprotect_data(encrypted_bytes)
return decrypted_bytes.decode("utf-8")
def save_token(token, token_file, app_data_dir, expires_in=0):
try:
expires_in = int(expires_in)
except (ValueError, TypeError):
expires_in = 0
os.makedirs(app_data_dir, exist_ok=True)
expiration_time = (time.time() + expires_in) if expires_in > 0 else 0
stored_token = token
encrypted = False
if os.name == "nt":
try:
stored_token = _encrypt_token(token)
encrypted = True
except Exception:
pass
data = {
"token": stored_token,
"expiration_time": expiration_time,
"encrypted": encrypted,
}
with open(token_file, "w", encoding="utf-8") as f:
json.dump(data, f)
return expiration_time
def load_token(token_file):
if not os.path.exists(token_file):
return None, None
with open(token_file, "r", encoding="utf-8") as f:
data = json.load(f)
token = data.get("token")
encrypted = data.get("encrypted", False)
if token and encrypted:
try:
token = _decrypt_token(token)
except Exception:
try:
os.remove(token_file)
except Exception:
pass
return None, None
expiration_time = data.get("expiration_time")
if token and (expiration_time == 0 or expiration_time > time.time()):
return token, expiration_time
try:
os.remove(token_file)
except Exception:
pass
return None, None

213
services/update_service.py Normal file
View File

@@ -0,0 +1,213 @@
import json
import os
import re
import urllib.error
import urllib.request
from urllib.parse import urlparse
from PySide6.QtCore import QObject, Signal
def _version_key(version_text):
parts = [int(x) for x in re.findall(r"\d+", str(version_text))]
if not parts:
return (0, 0, 0)
while len(parts) < 3:
parts.append(0)
return tuple(parts[:3])
def _is_newer_version(latest_version, current_version):
latest_key = _version_key(latest_version)
current_key = _version_key(current_version)
return latest_key > current_key
def _sanitize_repo_url(value):
value = (value or "").strip()
if not value:
return ""
if "://" not in value and value.count("/") == 1:
return f"https://github.com/{value}"
parsed = urlparse(value)
if not parsed.scheme or not parsed.netloc:
return ""
clean_path = parsed.path.rstrip("/")
if clean_path.endswith(".git"):
clean_path = clean_path[:-4]
return f"{parsed.scheme}://{parsed.netloc}{clean_path}"
def _normalize_update_channel(value):
channel = (value or "").strip().lower()
if channel in ("beta", "betas", "pre", "prerelease", "pre-release"):
return "beta"
return "stable"
def _select_release_from_list(releases):
for item in releases:
if not isinstance(item, dict):
continue
if item.get("draft"):
continue
tag_name = (item.get("tag_name") or item.get("name") or "").strip()
if not tag_name:
continue
return item
return None
def _extract_release_payload(release_data, repository_url, current_version):
parsed = urlparse(repository_url)
base_url = f"{parsed.scheme}://{parsed.netloc}"
repo_path = parsed.path.strip("/")
releases_url = f"{base_url}/{repo_path}/releases"
latest_tag = release_data.get("tag_name") or release_data.get("name") or ""
latest_version = latest_tag.lstrip("vV").strip()
html_url = release_data.get("html_url") or releases_url
assets = release_data.get("assets") or []
download_url = ""
download_name = ""
checksum_url = ""
for asset in assets:
url = asset.get("browser_download_url", "")
if url.lower().endswith(".zip"):
download_url = url
download_name = asset.get("name", "")
break
if not download_url and assets:
download_url = assets[0].get("browser_download_url", "")
download_name = assets[0].get("name", "")
for asset in assets:
name = asset.get("name", "").lower()
if not name:
continue
is_checksum_asset = name.endswith(".sha256") or name.endswith(".sha256.txt") or name in ("checksums.txt", "sha256sums.txt")
if not is_checksum_asset:
continue
if download_name and (download_name.lower() in name or name in (f"{download_name.lower()}.sha256", f"{download_name.lower()}.sha256.txt")):
checksum_url = asset.get("browser_download_url", "")
break
if not checksum_url:
checksum_url = asset.get("browser_download_url", "")
return {
"repository_url": repository_url,
"latest_version": latest_version,
"current_version": current_version,
"latest_tag": latest_tag,
"release_url": html_url,
"download_url": download_url,
"download_name": download_name,
"checksum_url": checksum_url,
"has_update": _is_newer_version(latest_version, current_version),
}
def detect_update_repository_url(configured_url="", configured_repo=""):
env_url = _sanitize_repo_url(os.getenv("ANABASIS_UPDATE_URL", ""))
if env_url:
return env_url
env_repo = _sanitize_repo_url(os.getenv("ANABASIS_UPDATE_REPOSITORY", ""))
if env_repo:
return env_repo
cfg_url = _sanitize_repo_url(configured_url)
if cfg_url:
return cfg_url
cfg_repo = _sanitize_repo_url(configured_repo)
if cfg_repo:
return cfg_repo
git_config_path = os.path.join(os.path.abspath("."), ".git", "config")
if not os.path.exists(git_config_path):
return ""
try:
with open(git_config_path, "r", encoding="utf-8") as f:
content = f.read()
match = re.search(r"url\s*=\s*((?:https?://|git@)[^\s]+)", content)
if not match:
return ""
remote = match.group(1).strip()
if remote.startswith("git@"):
ssh_match = re.match(r"git@([^:]+):(.+?)(?:\.git)?$", remote)
if ssh_match:
return _sanitize_repo_url(f"https://{ssh_match.group(1)}/{ssh_match.group(2)}")
return _sanitize_repo_url(remote)
except Exception:
return ""
class UpdateChecker(QObject):
check_finished = Signal(dict)
check_failed = Signal(str)
def __init__(self, repository_url, current_version, request_timeout=8, channel="stable"):
super().__init__()
self.repository_url = repository_url
self.current_version = current_version
self.request_timeout = request_timeout
self.channel = _normalize_update_channel(channel)
def run(self):
if not self.repository_url:
self.check_failed.emit("Не задан URL репозитория обновлений.")
return
parsed = urlparse(self.repository_url)
base_url = f"{parsed.scheme}://{parsed.netloc}"
repo_path = parsed.path.strip("/")
if not repo_path or repo_path.count("/") < 1:
self.check_failed.emit("Некорректный URL репозитория обновлений.")
return
use_beta_channel = self.channel == "beta"
if parsed.netloc.lower().endswith("github.com"):
if use_beta_channel:
api_url = f"https://api.github.com/repos/{repo_path}/releases"
else:
api_url = f"https://api.github.com/repos/{repo_path}/releases/latest"
else:
if use_beta_channel:
api_url = f"{base_url}/api/v1/repos/{repo_path}/releases"
else:
api_url = f"{base_url}/api/v1/repos/{repo_path}/releases/latest"
releases_url = f"{base_url}/{repo_path}/releases"
request = urllib.request.Request(
api_url,
headers={
"Accept": "application/vnd.github+json",
"User-Agent": "AnabasisManager-Updater",
},
)
try:
with urllib.request.urlopen(request, timeout=self.request_timeout) as response:
response_data = json.loads(response.read().decode("utf-8"))
except urllib.error.HTTPError as e:
self.check_failed.emit(f"Ошибка HTTP при проверке обновлений: {e.code}")
return
except urllib.error.URLError as e:
self.check_failed.emit(f"Сетевая ошибка при проверке обновлений: {e}")
return
except Exception as e:
self.check_failed.emit(f"Не удалось проверить обновления: {e}")
return
release_data = response_data
if use_beta_channel:
if not isinstance(response_data, list):
self.check_failed.emit("Сервер вернул некорректный ответ списка релизов.")
return
release_data = _select_release_from_list(response_data)
if not release_data:
self.check_failed.emit("В канале beta не найдено доступных релизов.")
return
elif not isinstance(response_data, dict):
self.check_failed.emit("Сервер вернул некорректный ответ релиза.")
return
payload = _extract_release_payload(release_data, self.repository_url, self.current_version)
payload["release_channel"] = self.channel
payload["releases_url"] = releases_url
self.check_finished.emit(payload)

59
services/vk_service.py Normal file
View File

@@ -0,0 +1,59 @@
import os
import sys
import time
from vk_api import VkApi
from vk_api.exceptions import VkApiError
class VkService:
def __init__(self):
self.session = None
self.api = None
def set_token(self, token):
self.session = VkApi(token=token)
self.api = self.session.get_api()
def clear(self):
self.session = None
self.api = None
@staticmethod
def build_auth_command(auth_url, output_path, entry_script_path=None):
if getattr(sys, "frozen", False):
return sys.executable, ["--auth", auth_url, output_path]
script_path = entry_script_path or os.path.abspath(__file__)
return sys.executable, [script_path, "--auth", auth_url, output_path]
@staticmethod
def vk_error_code(exc):
error = getattr(exc, "error", None)
if isinstance(error, dict):
return error.get("error_code")
return getattr(exc, "code", None)
@classmethod
def is_auth_error(cls, exc, formatted_message=None):
code = cls.vk_error_code(exc)
if code == 5:
return True
message = (formatted_message or str(exc)).lower()
return "invalid_access_token" in message or "user authorization failed" in message
@classmethod
def is_retryable_error(cls, exc):
return cls.vk_error_code(exc) in (6, 9, 10)
def call_with_retry(self, func, *args, **kwargs):
max_attempts = 5
for attempt in range(1, max_attempts + 1):
try:
return func(*args, **kwargs)
except VkApiError as e:
if not self.is_retryable_error(e) or attempt == max_attempts:
raise
delay = min(2.0, 0.35 * (2 ** (attempt - 1)))
if self.vk_error_code(e) == 9:
delay = max(delay, 1.0)
time.sleep(delay)

View File

@@ -5,44 +5,55 @@ from pathlib import Path
class AuthReloginSmokeTests(unittest.TestCase): class AuthReloginSmokeTests(unittest.TestCase):
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
cls.source = Path("main.py").read_text(encoding="utf-8") cls.main_source = Path("main.py").read_text(encoding="utf-8")
cls.vk_source = Path("services/vk_service.py").read_text(encoding="utf-8")
cls.update_source = Path("services/update_service.py").read_text(encoding="utf-8")
def test_auth_command_builder_handles_frozen_and_source(self): def test_auth_command_builder_handles_frozen_and_source(self):
self.assertIn("def _build_auth_command(self, auth_url, output_path):", self.source) self.assertIn("def _build_auth_command(self, auth_url, output_path):", self.main_source)
self.assertIn('return sys.executable, ["--auth", auth_url, output_path]', self.source) self.assertIn("entry_script_path=os.path.abspath(__file__)", self.main_source)
self.assertIn('return sys.executable, [os.path.abspath(__file__), "--auth", auth_url, output_path]', self.source) self.assertIn('return sys.executable, ["--auth", auth_url, output_path]', self.vk_source)
self.assertIn("script_path = entry_script_path or os.path.abspath(__file__)", self.vk_source)
def test_auth_runs_via_qprocess(self): def test_auth_runs_via_qprocess(self):
self.assertIn("process = QProcess(self)", self.source) self.assertIn("process = QProcess(self)", self.main_source)
self.assertIn("process.start(program, args)", self.source) self.assertIn("process.start(program, args)", self.main_source)
self.assertIn("def _on_auth_process_finished(self, exit_code, _exit_status):", self.source) self.assertIn("def _on_auth_process_finished(self, exit_code, _exit_status):", self.main_source)
self.assertIn("if self.auth_process and self.auth_process.state() == QProcess.NotRunning:", self.source) self.assertIn("if self.auth_process and self.auth_process.state() == QProcess.ProcessState.NotRunning:", self.main_source)
def test_force_relogin_has_backoff_and_event_log(self): def test_force_relogin_has_backoff_and_event_log(self):
self.assertIn("AUTH_RELOGIN_BACKOFF_SECONDS = 5.0", self.source) self.assertIn("AUTH_RELOGIN_BACKOFF_SECONDS = 5.0", self.main_source)
self.assertIn("if self._auth_relogin_in_progress:", self.source) self.assertIn("if self._auth_relogin_in_progress:", self.main_source)
self.assertIn("force_relogin_backoff", self.source) self.assertIn("force_relogin_backoff", self.main_source)
self.assertIn("force_relogin", self.source) self.assertIn("force_relogin", self.main_source)
def test_auth_error_paths_trigger_force_relogin(self): def test_auth_error_paths_trigger_force_relogin(self):
self.assertIn("def _handle_vk_api_error(self, context, exc, action_name=None, ui_message_prefix=None, disable_ui=False):", self.source) self.assertIn(
self.assertIn("self._force_relogin(exc, action_name or context)", self.source) "def _handle_vk_api_error(self, context, exc, action_name=None, ui_message_prefix=None, disable_ui=False):",
self.assertIn('"load_chats",', self.source) self.main_source,
self.assertIn('"execute_user_action",', self.source) )
self.assertIn('"set_user_admin",', self.source) self.assertIn("self._force_relogin(exc, action_name or context)", self.main_source)
self.assertIn('"load_chats",', self.main_source)
self.assertIn('"execute_user_action",', self.main_source)
self.assertIn('"set_user_admin",', self.main_source)
def test_tab_checkbox_lists_use_existing_attributes(self): def test_tab_checkbox_lists_use_existing_attributes(self):
self.assertIn("self.warehouse_chat_checkboxes", self.source) self.assertIn("self.warehouse_chat_checkboxes", self.main_source)
self.assertIn("self.coffee_chat_checkboxes", self.source) self.assertIn("self.coffee_chat_checkboxes", self.main_source)
self.assertNotIn("self.retail_warehouse_checkboxes", self.source) self.assertNotIn("self.retail_warehouse_checkboxes", self.main_source)
self.assertNotIn("self.retail_coffee_checkboxes", self.source) self.assertNotIn("self.retail_coffee_checkboxes", self.main_source)
def test_update_check_actions_exist(self): def test_update_check_actions_exist(self):
self.assertIn("APP_VERSION = ", self.source) self.assertIn("from app_version import APP_VERSION", self.main_source)
self.assertIn("UPDATE_REPOSITORY = ", self.source) self.assertIn("from services import (", self.main_source)
self.assertIn('QAction("Проверить обновления", self)', self.source) self.assertIn("UpdateChecker", self.main_source)
self.assertIn("def check_for_updates(self, silent_no_updates=False):", self.source) self.assertIn("detect_update_repository_url", self.main_source)
self.assertIn("class UpdateChecker(QObject):", self.source) self.assertIn('QAction("Проверить обновления", self)', self.main_source)
self.assertIn("def check_for_updates(self, silent_no_updates=False):", self.main_source)
self.assertIn("class UpdateChecker(QObject):", self.update_source)
self.assertIn("def _start_auto_update(self, download_url, latest_version, checksum_url=\"\", download_name=\"\"):", self.main_source)
self.assertIn("AutoUpdateService.prepare_update", self.main_source)
self.assertIn("AutoUpdateService.launch_gui_updater", self.main_source)
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -0,0 +1,51 @@
import hashlib
import importlib.util
import tempfile
import unittest
from pathlib import Path
_SPEC = importlib.util.spec_from_file_location(
"auto_update_service",
Path("services/auto_update_service.py"),
)
_MODULE = importlib.util.module_from_spec(_SPEC)
_SPEC.loader.exec_module(_MODULE)
AutoUpdateService = _MODULE.AutoUpdateService
class AutoUpdateServiceTests(unittest.TestCase):
def test_extract_sha256_from_text(self):
digest = "a" * 64
text = f"{digest} AnabasisManager-1.0.0-win.zip\n"
extracted = AutoUpdateService.extract_sha256_from_text(
text,
"AnabasisManager-1.0.0-win.zip",
)
self.assertEqual(extracted, digest)
def test_sha256_file(self):
with tempfile.TemporaryDirectory() as td:
path = Path(td) / "payload.bin"
payload = b"anabasis"
path.write_bytes(payload)
expected = hashlib.sha256(payload).hexdigest()
self.assertEqual(AutoUpdateService.sha256_file(str(path)), expected)
def test_build_update_script_contains_core_vars(self):
script = AutoUpdateService.build_update_script(
app_dir=r"C:\Apps\AnabasisManager",
source_dir=r"C:\Temp\Extracted",
exe_name="AnabasisManager.exe",
target_pid=1234,
)
script_text = Path(script).read_text(encoding="utf-8")
self.assertIn("set \"APP_DIR=", script_text)
self.assertIn("set \"SRC_DIR=", script_text)
self.assertIn("set \"EXE_NAME=", script_text)
self.assertIn("set \"TARGET_PID=", script_text)
self.assertIn(":rollback", script_text)
self.assertIn("if not exist \"%SRC_DIR%\\%EXE_NAME%\"", script_text)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,65 @@
import unittest
import importlib.util
from types import SimpleNamespace
from pathlib import Path
_SPEC = importlib.util.spec_from_file_location(
"chat_actions",
Path("services/chat_actions.py"),
)
_MODULE = importlib.util.module_from_spec(_SPEC)
_SPEC.loader.exec_module(_MODULE)
load_chat_conversations = _MODULE.load_chat_conversations
resolve_user_ids = _MODULE.resolve_user_ids
class ChatActionsTests(unittest.TestCase):
def test_resolve_user_ids_mixed_results(self):
mapping = {
"id1": {"type": "user", "object_id": 1},
"id2": {"type": "group", "object_id": 2},
}
def call_with_retry(func, **kwargs):
return func(**kwargs)
def resolve_screen_name(screen_name):
if screen_name == "boom":
raise RuntimeError("boom")
return mapping.get(screen_name)
vk_api = SimpleNamespace(utils=SimpleNamespace(resolveScreenName=resolve_screen_name))
links = [
"https://vk.com/id1",
"https://vk.com/id2",
"https://vk.com/boom",
"https://vk.com/",
]
resolved, failed = resolve_user_ids(call_with_retry, vk_api, links)
self.assertEqual(resolved, [1])
self.assertEqual(len(failed), 3)
self.assertEqual(failed[0][0], "https://vk.com/id2")
self.assertIsNone(failed[0][1])
def test_load_chat_conversations_paginated(self):
pages = [
{"items": [{"id": 1}], "next_from": "page-2"},
{"items": [{"id": 2}]},
]
def get_conversations(**kwargs):
if kwargs.get("start_from") == "page-2":
return pages[1]
return pages[0]
def call_with_retry(func, **kwargs):
return func(**kwargs)
vk_api = SimpleNamespace(messages=SimpleNamespace(getConversations=get_conversations))
items = load_chat_conversations(call_with_retry, vk_api)
self.assertEqual(items, [{"id": 1}, {"id": 2}])
if __name__ == "__main__":
unittest.main()

53
tests/test_token_store.py Normal file
View File

@@ -0,0 +1,53 @@
import tempfile
import unittest
import importlib.util
from pathlib import Path
from unittest.mock import patch
_SPEC = importlib.util.spec_from_file_location(
"token_store",
Path("services/token_store.py"),
)
_MODULE = importlib.util.module_from_spec(_SPEC)
_SPEC.loader.exec_module(_MODULE)
load_token = _MODULE.load_token
save_token = _MODULE.save_token
class TokenStoreTests(unittest.TestCase):
def test_save_and_load_non_expiring_token(self):
with tempfile.TemporaryDirectory() as td:
token_file = Path(td) / "token.json"
with patch.object(_MODULE.os, "name", "posix"):
expiration = save_token(
token="abc123",
token_file=str(token_file),
app_data_dir=td,
expires_in=0,
)
token, loaded_expiration = load_token(str(token_file))
self.assertEqual(expiration, 0)
self.assertEqual(token, "abc123")
self.assertEqual(loaded_expiration, 0)
def test_expired_token_is_removed(self):
with tempfile.TemporaryDirectory() as td:
token_file = Path(td) / "token.json"
with patch.object(_MODULE.os, "name", "posix"):
with patch.object(_MODULE.time, "time", return_value=1000):
save_token(
token="abc123",
token_file=str(token_file),
app_data_dir=td,
expires_in=1,
)
with patch.object(_MODULE.time, "time", return_value=2000):
token, expiration = load_token(str(token_file))
self.assertIsNone(token)
self.assertIsNone(expiration)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,51 @@
import unittest
import importlib.util
from pathlib import Path
MODULE_PATH = Path("services/update_service.py")
SPEC = importlib.util.spec_from_file_location("update_service_under_test", MODULE_PATH)
update_service = importlib.util.module_from_spec(SPEC)
SPEC.loader.exec_module(update_service)
class UpdateServiceTests(unittest.TestCase):
def test_normalize_update_channel(self):
self.assertEqual(update_service._normalize_update_channel("stable"), "stable")
self.assertEqual(update_service._normalize_update_channel("beta"), "beta")
self.assertEqual(update_service._normalize_update_channel("pre-release"), "beta")
self.assertEqual(update_service._normalize_update_channel("unknown"), "stable")
self.assertEqual(update_service._normalize_update_channel(""), "stable")
def test_select_release_from_list_skips_drafts(self):
releases = [
{"tag_name": "v2.0.0", "draft": True},
{"tag_name": "", "draft": False},
{"tag_name": "v1.9.0-beta.1", "draft": False},
]
selected = update_service._select_release_from_list(releases)
self.assertIsNotNone(selected)
self.assertEqual(selected["tag_name"], "v1.9.0-beta.1")
def test_extract_release_payload_uses_zip_and_checksum(self):
release_data = {
"tag_name": "v1.7.2",
"html_url": "https://example.com/release/v1.7.2",
"assets": [
{"name": "notes.txt", "browser_download_url": "https://example.com/notes.txt"},
{"name": "AnabasisManager-win64.zip", "browser_download_url": "https://example.com/app.zip"},
{"name": "AnabasisManager-win64.zip.sha256", "browser_download_url": "https://example.com/app.zip.sha256"},
],
}
payload = update_service._extract_release_payload(
release_data=release_data,
repository_url="https://git.daemonlord.ru/benya/AnabasisChatRemove",
current_version="1.7.1",
)
self.assertEqual(payload["latest_version"], "1.7.2")
self.assertEqual(payload["download_url"], "https://example.com/app.zip")
self.assertEqual(payload["checksum_url"], "https://example.com/app.zip.sha256")
self.assertTrue(payload["has_update"])
if __name__ == "__main__":
unittest.main()

38
tests/test_updater_gui.py Normal file
View File

@@ -0,0 +1,38 @@
import importlib.util
import tempfile
import unittest
from pathlib import Path
MODULE_PATH = Path("updater_gui.py")
SPEC = importlib.util.spec_from_file_location("updater_gui_under_test", MODULE_PATH)
updater_gui = importlib.util.module_from_spec(SPEC)
SPEC.loader.exec_module(updater_gui)
class UpdaterGuiTests(unittest.TestCase):
def test_read_version_marker(self):
with tempfile.TemporaryDirectory() as tmp_dir:
marker = Path(tmp_dir) / "version.txt"
marker.write_text("2.0.1\n", encoding="utf-8")
value = updater_gui._read_version_marker(tmp_dir)
self.assertEqual(value, "2.0.1")
def test_mirror_tree_skips_selected_file(self):
with tempfile.TemporaryDirectory() as src_tmp, tempfile.TemporaryDirectory() as dst_tmp:
src = Path(src_tmp)
dst = Path(dst_tmp)
(src / "keep.txt").write_text("ok", encoding="utf-8")
(src / "skip.bin").write_text("x", encoding="utf-8")
(src / "sub").mkdir()
(src / "sub" / "nested.txt").write_text("nested", encoding="utf-8")
updater_gui._mirror_tree(str(src), str(dst), skip_names={"skip.bin"})
self.assertTrue((dst / "keep.txt").exists())
self.assertTrue((dst / "sub" / "nested.txt").exists())
self.assertFalse((dst / "skip.bin").exists())
if __name__ == "__main__":
unittest.main()

25
ui/dialogs.py Normal file
View File

@@ -0,0 +1,25 @@
from PySide6.QtWidgets import QDialog, QDialogButtonBox, QLabel, QTextEdit, QVBoxLayout
class MultiLinkDialog(QDialog):
def __init__(self, parent=None):
super().__init__(parent)
self.setWindowTitle("Ввод нескольких ссылок")
self.setMinimumSize(400, 300)
layout = QVBoxLayout(self)
label = QLabel("Вставьте ссылки на страницы VK, каждая с новой строки:")
layout.addWidget(label)
self.links_text_edit = QTextEdit()
layout.addWidget(self.links_text_edit)
button_box = QDialogButtonBox()
button_box.addButton("ОК", QDialogButtonBox.AcceptRole)
button_box.addButton("Отмена", QDialogButtonBox.RejectRole)
button_box.accepted.connect(self.accept)
button_box.rejected.connect(self.reject)
layout.addWidget(button_box)
def get_links(self):
return [line.strip() for line in self.links_text_edit.toPlainText().strip().split("\n") if line.strip()]

9
ui/main_window.py Normal file
View File

@@ -0,0 +1,9 @@
def instructions_text():
return (
"Инструкция:\n"
"1. Авторизуйтесь через VK.\n"
"2. Выберите чаты.\n"
"3. Вставьте ссылку на пользователя в поле ниже. ID определится автоматически.\n"
"4. Для массовых операций нажмите кнопку 'Список' и вставьте ссылки в окне.\n"
"5. Нажмите 'ИСКЛЮЧИТЬ' или 'ПРИГЛАСИТЬ'."
)

276
updater_gui.py Normal file
View File

@@ -0,0 +1,276 @@
import argparse
import os
import shutil
import subprocess
import sys
import tempfile
import time
from PySide6.QtCore import QObject, Qt, QThread, Signal, QTimer, QUrl
from PySide6.QtGui import QDesktopServices
from PySide6.QtWidgets import QApplication, QLabel, QProgressBar, QVBoxLayout, QWidget, QPushButton, QHBoxLayout
def _write_log(log_path, message):
try:
os.makedirs(os.path.dirname(log_path), exist_ok=True)
with open(log_path, "a", encoding="utf-8") as f:
ts = time.strftime("%Y-%m-%d %H:%M:%S")
f.write(f"[{ts}] {message.rstrip()}\n")
except Exception:
pass
def _is_pid_running(pid):
if pid <= 0:
return False
try:
completed = subprocess.run(
["tasklist", "/FI", f"PID eq {pid}"],
capture_output=True,
text=True,
timeout=5,
check=False,
)
return str(pid) in (completed.stdout or "")
except Exception:
return False
def _copy_file_with_retries(source_file, target_file, retries=20, delay=0.5):
last_error = None
for _ in range(max(1, retries)):
try:
os.makedirs(os.path.dirname(target_file), exist_ok=True)
shutil.copy2(source_file, target_file)
return
except Exception as exc:
last_error = exc
time.sleep(delay)
raise last_error if last_error else RuntimeError(f"Не удалось скопировать файл: {source_file}")
def _mirror_tree(src_dir, dst_dir, skip_names=None, retries=20, delay=0.5):
skip_set = {name.lower() for name in (skip_names or [])}
os.makedirs(dst_dir, exist_ok=True)
for root, dirs, files in os.walk(src_dir):
rel = os.path.relpath(root, src_dir)
target_root = dst_dir if rel == "." else os.path.join(dst_dir, rel)
os.makedirs(target_root, exist_ok=True)
for file_name in files:
if file_name.lower() in skip_set:
continue
source_file = os.path.join(root, file_name)
target_file = os.path.join(target_root, file_name)
_copy_file_with_retries(source_file, target_file, retries=retries, delay=delay)
def _read_version_marker(base_dir):
marker_path = os.path.join(base_dir, "version.txt")
if not os.path.exists(marker_path):
return ""
try:
with open(marker_path, "r", encoding="utf-8") as f:
return f.read().strip()
except Exception:
return ""
class UpdateWorker(QObject):
status = Signal(int, str)
failed = Signal(str)
done = Signal()
def __init__(self, app_dir, source_dir, exe_name, target_pid, version, work_dir=""):
super().__init__()
self.app_dir = app_dir
self.source_dir = source_dir
self.exe_name = exe_name
self.target_pid = int(target_pid or 0)
self.version = version or ""
self.work_dir = work_dir or ""
self.log_path = os.path.join(app_dir, "update_error.log")
def _start_app(self):
app_exe = os.path.join(self.app_dir, self.exe_name)
if not os.path.exists(app_exe):
raise RuntimeError(f"Не найден файл приложения: {app_exe}")
creation_flags = 0
if hasattr(subprocess, "DETACHED_PROCESS"):
creation_flags |= subprocess.DETACHED_PROCESS
if hasattr(subprocess, "CREATE_NEW_PROCESS_GROUP"):
creation_flags |= subprocess.CREATE_NEW_PROCESS_GROUP
subprocess.Popen([app_exe], cwd=self.app_dir, creationflags=creation_flags)
def run(self):
backup_dir = os.path.join(tempfile.gettempdir(), f"anabasis_backup_{int(time.time())}")
skip_names = {"anabasisupdater.exe"}
prev_version = _read_version_marker(self.app_dir)
source_version = _read_version_marker(self.source_dir)
expected_version = (self.version or "").strip()
try:
self.status.emit(1, "Ожидание завершения приложения...")
wait_loops = 0
while _is_pid_running(self.target_pid):
time.sleep(1)
wait_loops += 1
if wait_loops >= 180:
self.status.emit(1, "Принудительное завершение зависшего процесса...")
subprocess.run(
["taskkill", "/PID", str(self.target_pid), "/T", "/F"],
capture_output=True,
text=True,
timeout=10,
check=False,
)
time.sleep(2)
if _is_pid_running(self.target_pid):
raise RuntimeError(f"Процесс {self.target_pid} не завершился.")
break
self.status.emit(2, "Проверка содержимого обновления...")
source_app_exe = os.path.join(self.source_dir, self.exe_name)
if not os.path.exists(source_app_exe):
raise RuntimeError(f"В обновлении отсутствует {self.exe_name}")
if expected_version and source_version and source_version != expected_version:
raise RuntimeError(
f"Версия пакета ({source_version}) не совпадает с ожидаемой ({expected_version})."
)
self.status.emit(3, "Создание резервной копии...")
_mirror_tree(self.app_dir, backup_dir, skip_names=skip_names)
self.status.emit(4, "Применение обновления...")
_mirror_tree(self.source_dir, self.app_dir, skip_names=skip_names, retries=30, delay=0.6)
self.status.emit(5, "Проверка установленной версии...")
installed_version = _read_version_marker(self.app_dir)
if expected_version and installed_version and installed_version != expected_version:
raise RuntimeError(
f"После обновления версия {installed_version}, ожидалась {expected_version}."
)
if expected_version and prev_version and prev_version == expected_version:
_write_log(self.log_path, f"Предупреждение: версия до обновления уже была {expected_version}.")
self.status.emit(6, "Запуск обновленного приложения...")
self._start_app()
_write_log(self.log_path, f"Update success to version {expected_version or source_version or 'unknown'}")
self.status.emit(7, "Очистка временных файлов...")
try:
shutil.rmtree(backup_dir, ignore_errors=True)
if self.work_dir and os.path.isdir(self.work_dir):
shutil.rmtree(self.work_dir, ignore_errors=True)
except Exception:
pass
self.done.emit()
except Exception as exc:
_write_log(self.log_path, f"Update failed: {exc}")
try:
self.status.emit(6, "Восстановление из резервной копии...")
if os.path.isdir(backup_dir):
_mirror_tree(backup_dir, self.app_dir, skip_names=skip_names, retries=20, delay=0.5)
_write_log(self.log_path, "Rollback completed.")
try:
self._start_app()
_write_log(self.log_path, "Restored app started after rollback.")
except Exception as start_exc:
_write_log(self.log_path, f"Failed to start app after rollback: {start_exc}")
except Exception as rollback_exc:
_write_log(self.log_path, f"Rollback failed: {rollback_exc}")
self.failed.emit(str(exc))
class UpdaterWindow(QWidget):
def __init__(self, app_dir, source_dir, exe_name, target_pid, version, work_dir=""):
super().__init__()
self.setWindowTitle("Anabasis Updater")
self.setMinimumWidth(480)
self.log_path = os.path.join(app_dir, "update_error.log")
self.label = QLabel("Подготовка обновления...")
self.label.setWordWrap(True)
self.progress = QProgressBar()
self.progress.setRange(0, 7)
self.progress.setValue(0)
self.open_log_btn = QPushButton("Открыть лог")
self.open_log_btn.setEnabled(False)
self.open_log_btn.clicked.connect(self.open_log)
self.close_btn = QPushButton("Закрыть")
self.close_btn.setEnabled(False)
self.close_btn.clicked.connect(self.close)
layout = QVBoxLayout(self)
layout.addWidget(self.label)
layout.addWidget(self.progress)
actions = QHBoxLayout()
actions.addStretch(1)
actions.addWidget(self.open_log_btn)
actions.addWidget(self.close_btn)
layout.addLayout(actions)
self.thread = QThread(self)
self.worker = UpdateWorker(app_dir, source_dir, exe_name, target_pid, version, work_dir=work_dir)
self.worker.moveToThread(self.thread)
self.thread.started.connect(self.worker.run)
self.worker.status.connect(self.on_status)
self.worker.failed.connect(self.on_failed)
self.worker.done.connect(self.on_done)
self.worker.done.connect(self.thread.quit)
self.worker.failed.connect(self.thread.quit)
self.thread.start()
def on_status(self, step, text):
self.label.setText(text)
self.progress.setValue(max(0, min(7, int(step))))
def on_done(self):
self.label.setText("Обновление успешно применено. Приложение запущено.")
self.progress.setValue(7)
self.open_log_btn.setEnabled(True)
QTimer.singleShot(900, self.close)
def on_failed(self, error_text):
self.label.setText(
"Не удалось применить обновление.\n"
f"Причина: {error_text}\n"
"Подробности сохранены в update_error.log."
)
self.open_log_btn.setEnabled(True)
self.close_btn.setEnabled(True)
def open_log(self):
if os.path.exists(self.log_path):
QDesktopServices.openUrl(QUrl.fromLocalFile(self.log_path))
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--app-dir", required=True)
parser.add_argument("--source-dir", required=True)
parser.add_argument("--exe-name", required=True)
parser.add_argument("--target-pid", required=True)
parser.add_argument("--version", default="")
parser.add_argument("--work-dir", default="")
return parser.parse_args()
def main():
args = parse_args()
app = QApplication(sys.argv)
app.setStyle("Fusion")
window = UpdaterWindow(
app_dir=args.app_dir,
source_dir=args.source_dir,
exe_name=args.exe_name,
target_pid=args.target_pid,
version=args.version,
work_dir=args.work_dir,
)
window.show()
return app.exec()
if __name__ == "__main__":
sys.exit(main())