11 Commits

Author SHA1 Message Date
e97d346682 feat: добавлена функция назначения администраторов чатов
- Добавлено верхнее меню "Инструменты".
- Реализован метод set_user_admin с вызовом API messages.setMemberRole.
- Добавлена конвертация локального chat_id в peer_id (2000000000+id) для корректной работы метода.
- Добавлены диалоги подтверждения и отчет о результатах выполнения.
2026-01-22 12:55:49 +03:00
3ed5bba9af feat: автоматизация сборки и поддержка бессрочных токенов
- Исправлена ошибка ImportError: QtWebEngineCore путем перехода на PyInstaller.
- Добавлен скрипт build.py для автоматической сборки, очистки DLL и создания ZIP-архива.
- Реализована поддержка бессрочного доступа (offline_access) для VK Access Token.
- Обновлен README.md: добавлены разделы для разработчиков и описание структуры данных.
- Оптимизирован размер билда за счет удаления неиспользуемых библиотек Qt и папок локализации.
2026-01-13 02:38:26 +03:00
1eab8651f2 Исправление логина в чатах, добавлены новые списки чатов для Склада и Кофеен
Signed-off-by: benya <benya@daemonlord.ru>
2025-12-25 15:59:03 +03:00
Alex
ea188ffc13 edit: changed .gitignore 2025-07-24 23:46:13 +03:00
Alex
6aa50b03da feat(build): Добавлена кросс-платформенная поддержка в setup.py
Модифицирован скрипт сборки на основе cx_Freeze для обеспечения совместимости с основными операционными системами (Windows, macOS, Linux). Ранее скрипт был настроен преимущественно для Windows.

Ключевые изменения:
- **Динамическое имя файла:** Исполняемый файл получает расширение `.exe` только при сборке на Windows.
- **Разделение сборок:** Для каждой целевой ОС создается своя папка (например, `build_linux`), что позволяет хранить сборки для разных систем одновременно.
- **Платформо-зависимые опции:** Учтены особенности сборки для каждой ОС, включая `base="Win32GUI"` для Windows и `base=None` для Linux.
2025-07-24 23:43:27 +03:00
Alex
86aa2ddc1a Merge remote-tracking branch 'origin/master' 2025-07-24 23:04:51 +03:00
Alex
32e30f5484 feat: Добавлены массовые операции и разделение чатов на вкладки
- Добавлена кнопка "Список" для одновременной обработки нескольких пользователей.
- Реализовано разделение чатов на вкладки "Офис", "Розница" и "Прочие".

fix: Улучшен интерфейс и исправлены ошибки

- Область списка чатов теперь корректно увеличивается после авторизации.
- В окне подтверждения отображаются имена всех пользователей.
- Исправлены грамматика и склонения в диалоговых окнах.
- Кнопки в диалогах заменены на русскоязычные ("Да"/"Нет", "ОК"/"Отмена").
2025-07-24 23:01:59 +03:00
benya
194d696430 Обновить README.md 2025-06-27 23:49:33 +00:00
Alex
0dbd71c036 Добавлены README и LICENSE 2025-06-28 02:48:28 +03:00
Alex
134aa88f22 Добавлены параметры для сборки программы в исполняемый файл 2025-06-28 02:15:41 +03:00
Alex
0c270a6cb1 - Улучшена верстка UI, устранено перекрытие элементов.
- UI был упрощён
- Добавлено автоматическое получение ID пользователя из VK-ссылки.
- Улучшена обработка ошибок VK API при смене IP-адреса.
- Добавлено отображение имени пользователя в диалогах подтверждения.
- Удалены неиспользуемые импорты и обновлены вызовы методов Qt.
- Добавлено сохранение данных браузера (включая куки) через QWebEngineProfile для поддержания сессии в WebEngine.
2025-06-28 02:03:50 +03:00
27 changed files with 414 additions and 3461 deletions

View File

@@ -1,35 +0,0 @@
name: Desktop CI
on:
push:
branches:
- master
pull_request:
jobs:
tests:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: https://git.daemonlord.ru/actions/checkout@v4
with:
fetch-depth: 0
- name: Set up Python
uses: https://git.daemonlord.ru/actions/setup-python@v5
with:
python-version: "3.13"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Validate syntax
run: |
python -m py_compile app_version.py main.py build.py updater_gui.py tests/test_auto_update_service.py tests/test_chat_actions.py tests/test_main_contracts.py tests/test_token_store.py tests/test_update_reentry_runtime.py tests/test_update_service.py tests/test_updater_gui.py
- name: Run tests
run: |
python -m unittest discover -s tests -p "test_*.py" -v

View File

@@ -1,148 +0,0 @@
name: Desktop Dev Pre-release
on:
push:
branches:
- dev
workflow_dispatch:
jobs:
prerelease:
runs-on: windows
steps:
- name: Checkout
uses: https://git.daemonlord.ru/actions/checkout@v4
with:
fetch-depth: 0
tags: true
- name: Ensure Python 3.13
shell: powershell
run: |
if (Get-Command python -ErrorAction SilentlyContinue) {
python --version
} elseif (Get-Command py -ErrorAction SilentlyContinue) {
$pyExe = py -3.13 -c "import sys; print(sys.executable)"
if (-not $pyExe) {
throw "Python 3.13 launcher is available, but interpreter was not found."
}
Split-Path $pyExe | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append
python --version
} else {
throw "Python is not installed on runner. Install Python 3.13 and restart runner service."
}
- name: Install dependencies
shell: powershell
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt pyinstaller
- name: Extract prerelease metadata
id: meta
shell: powershell
run: |
$version = (python -c "from app_version import APP_VERSION; print(APP_VERSION)").Trim()
$commit = (git rev-parse --short HEAD).Trim()
$tag = "v$version-$commit"
$archive = "AnabasisManager-$version-$commit"
$utf8NoBom = New-Object System.Text.UTF8Encoding($false)
[System.IO.File]::AppendAllText($env:GITHUB_OUTPUT, "version=$version`n", $utf8NoBom)
[System.IO.File]::AppendAllText($env:GITHUB_OUTPUT, "commit=$commit`n", $utf8NoBom)
[System.IO.File]::AppendAllText($env:GITHUB_OUTPUT, "tag=$tag`n", $utf8NoBom)
[System.IO.File]::AppendAllText($env:GITHUB_OUTPUT, "archive=$archive`n", $utf8NoBom)
[System.IO.File]::AppendAllText($env:GITHUB_ENV, "CONTINUE=true`n", $utf8NoBom)
Write-Host "Detected tag: $tag"
- name: Stop if prerelease already exists
if: env.CONTINUE == 'true'
shell: powershell
run: |
$tag = "${{ steps.meta.outputs.tag }}"
$apiUrl = "https://git.daemonlord.ru/api/v1/repos/${{ gitea.repository }}/releases?page=1&limit=100"
$headers = @{ Authorization = "token ${{ secrets.API_TOKEN }}" }
$utf8NoBom = New-Object System.Text.UTF8Encoding($false)
try {
$response = Invoke-RestMethod -Uri $apiUrl -Headers $headers -Method Get
$found = $false
foreach ($release in $response) {
if ($release.tag_name -eq $tag) {
$found = $true
break
}
}
if ($found) {
Write-Host "Pre-release $tag already exists, stopping job."
[System.IO.File]::AppendAllText($env:GITHUB_ENV, "CONTINUE=false`n", $utf8NoBom)
} else {
Write-Host "Pre-release $tag not found, continuing workflow..."
}
} catch {
Write-Host "Failed to query releases list, continuing workflow..."
}
- name: Run tests
if: env.CONTINUE == 'true'
shell: powershell
run: |
python -m py_compile app_version.py main.py build.py updater_gui.py tests/test_auth_relogin_smoke.py tests/test_auto_update_service.py tests/test_chat_actions.py tests/test_token_store.py
python -m unittest discover -s tests -p "test_*.py" -v
- name: Build release zip
if: env.CONTINUE == 'true'
shell: powershell
run: |
python build.py
- name: Prepare prerelease artifacts
if: env.CONTINUE == 'true'
shell: powershell
run: |
$version = "${{ steps.meta.outputs.version }}"
$archiveBase = "${{ steps.meta.outputs.archive }}"
$srcZip = "dist/AnabasisManager-$version.zip"
$dstZip = "dist/$archiveBase.zip"
if (-not (Test-Path $srcZip)) {
throw "Archive not found: $srcZip"
}
Copy-Item -Path $srcZip -Destination $dstZip -Force
$hash = (Get-FileHash -Path $dstZip -Algorithm SHA256).Hash.ToLower()
"$hash $archiveBase.zip" | Set-Content -Path "dist/$archiveBase.zip.sha256" -Encoding UTF8
- name: Configure git identity
if: env.CONTINUE == 'true'
shell: powershell
run: |
git config user.name "gitea-actions"
git config user.email "gitea-actions@daemonlord.ru"
- name: Create git tag
if: env.CONTINUE == 'true'
shell: powershell
run: |
$tag = "${{ steps.meta.outputs.tag }}"
$tagLine = (git ls-remote --tags origin "refs/tags/$tag" | Select-Object -First 1)
if ([string]::IsNullOrWhiteSpace($tagLine)) {
git tag "$tag"
git push origin "$tag"
} else {
Write-Host "Tag $tag already exists on origin, skipping tag push."
}
- name: Create Gitea Pre-release
if: env.CONTINUE == 'true'
uses: https://git.daemonlord.ru/actions/gitea-release-action@v1
with:
server_url: https://git.daemonlord.ru
repository: ${{ gitea.repository }}
token: ${{ secrets.API_TOKEN }}
tag_name: ${{ steps.meta.outputs.tag }}
name: Anabasis Manager ${{ steps.meta.outputs.version }} (dev ${{ steps.meta.outputs.commit }})
prerelease: true
body: |
Development pre-release for commit ${{ steps.meta.outputs.commit }}
Version base: ${{ steps.meta.outputs.version }}
files: |
dist/${{ steps.meta.outputs.archive }}.zip
dist/${{ steps.meta.outputs.archive }}.zip.sha256

View File

@@ -1,208 +0,0 @@
name: Desktop Release
on:
push:
branches:
- master
jobs:
release:
runs-on: windows
steps:
- name: Checkout
uses: https://git.daemonlord.ru/actions/checkout@v4
with:
fetch-depth: 0
tags: true
- name: Ensure Python 3.13
shell: powershell
run: |
if (Get-Command python -ErrorAction SilentlyContinue) {
python --version
} elseif (Get-Command py -ErrorAction SilentlyContinue) {
$pyExe = py -3.13 -c "import sys; print(sys.executable)"
if (-not $pyExe) {
throw "Python 3.13 launcher is available, but interpreter was not found."
}
Split-Path $pyExe | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append
python --version
} else {
throw "Python is not installed on runner. Install Python 3.13 and restart runner service."
}
- name: Install dependencies
shell: powershell
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt pyinstaller
- name: Ensure Inno Setup 6
shell: powershell
run: |
$isccPath = ""
$inPath = Get-Command iscc.exe -ErrorAction SilentlyContinue
if ($inPath) {
$isccPath = $inPath.Source
Write-Host "Inno Setup compiler found in PATH."
} elseif (Test-Path "C:\Program Files (x86)\Inno Setup 6\ISCC.exe") {
"C:\Program Files (x86)\Inno Setup 6" | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append
$isccPath = "C:\Program Files (x86)\Inno Setup 6\ISCC.exe"
Write-Host "Inno Setup compiler found in Program Files (x86)."
} elseif (Test-Path "C:\Program Files\Inno Setup 6\ISCC.exe") {
"C:\Program Files\Inno Setup 6" | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append
$isccPath = "C:\Program Files\Inno Setup 6\ISCC.exe"
Write-Host "Inno Setup compiler found in Program Files."
} else {
throw "Inno Setup 6 is not installed on runner. Install Inno Setup and restart runner service."
}
Write-Host "Using ISCC: $isccPath"
exit 0
- name: Extract app version
id: extract_version
shell: powershell
run: |
$version = (python -c "from app_version import APP_VERSION; print(APP_VERSION)").Trim()
$utf8NoBom = New-Object System.Text.UTF8Encoding($false)
[System.IO.File]::AppendAllText($env:GITHUB_OUTPUT, "version=$version`n", $utf8NoBom)
Write-Host "Detected version: $version"
- name: Initialize release flow
id: flow_init
shell: powershell
run: |
$utf8NoBom = New-Object System.Text.UTF8Encoding($false)
[System.IO.File]::AppendAllText($env:GITHUB_ENV, "CONTINUE=true`n", $utf8NoBom)
exit 0
- name: Stop if release already exists
shell: powershell
run: |
$version = "${{ steps.extract_version.outputs.version }}"
$tag = "v$version"
$apiUrl = "https://git.daemonlord.ru/api/v1/repos/${{ gitea.repository }}/releases?page=1&limit=100"
$headers = @{ Authorization = "token ${{ secrets.API_TOKEN }}" }
$utf8NoBom = New-Object System.Text.UTF8Encoding($false)
try {
$response = Invoke-RestMethod -Uri $apiUrl -Headers $headers -Method Get
$found = $false
foreach ($release in $response) {
if ($release.tag_name -eq $tag) {
$found = $true
break
}
}
if ($found) {
Write-Host "Release $tag already exists, stopping job."
[System.IO.File]::AppendAllText($env:GITHUB_ENV, "CONTINUE=false`n", $utf8NoBom)
} else {
Write-Host "Release $tag not found, continuing workflow..."
}
} catch {
Write-Host "Failed to query releases list, continuing workflow..."
}
- name: Run tests
if: env.CONTINUE == 'true'
shell: powershell
run: |
python -m py_compile app_version.py main.py build.py updater_gui.py tests/test_auth_relogin_smoke.py tests/test_auto_update_service.py tests/test_chat_actions.py tests/test_token_store.py
python -m unittest discover -s tests -p "test_*.py" -v
- name: Build release zip
if: env.CONTINUE == 'true'
shell: powershell
env:
PYTHONUTF8: "1"
PYTHONIOENCODING: "utf-8"
run: |
$ErrorActionPreference = "Continue"
$repoRoot = (git rev-parse --show-toplevel).Trim()
if (-not [string]::IsNullOrWhiteSpace($repoRoot)) {
Set-Location $repoRoot
}
$logDir = Join-Path $env:RUNNER_TEMP "anabasis-build"
New-Item -ItemType Directory -Force -Path $logDir | Out-Null
$buildLog = Join-Path $logDir "build.log"
python build.py *>&1 | Tee-Object -FilePath $buildLog
$code = $LASTEXITCODE
if ($code -ne 0) {
Write-Host "Build failed with exit code $code. Dumping build log:"
if (Test-Path $buildLog) {
Get-Content -Path $buildLog -Raw
} else {
Write-Host "Build log was not created: $buildLog"
}
exit $code
}
- name: Ensure artifacts exist
if: env.CONTINUE == 'true'
shell: powershell
run: |
$version = "${{ steps.extract_version.outputs.version }}"
$archivePath = "dist/AnabasisManager-$version.zip"
$installerPath = "dist/AnabasisManager-setup-$version.exe"
if (-not (Test-Path $archivePath)) {
throw "Archive not found: $archivePath"
}
if (-not (Test-Path $installerPath)) {
throw "Installer not found: $installerPath"
}
- name: Generate SHA256 checksum
if: env.CONTINUE == 'true'
shell: powershell
run: |
$version = "${{ steps.extract_version.outputs.version }}"
$archiveName = "AnabasisManager-$version.zip"
$installerName = "AnabasisManager-setup-$version.exe"
foreach ($name in @($archiveName, $installerName)) {
$path = "dist/$name"
$checksumPath = "dist/$name.sha256"
$hash = (Get-FileHash -Path $path -Algorithm SHA256).Hash.ToLower()
"$hash $name" | Set-Content -Path $checksumPath -Encoding UTF8
Write-Host "Checksum created: $checksumPath"
}
- name: Configure git identity
if: env.CONTINUE == 'true'
shell: powershell
run: |
git config user.name "gitea-actions"
git config user.email "gitea-actions@daemonlord.ru"
- name: Create git tag
if: env.CONTINUE == 'true'
shell: powershell
run: |
$version = "${{ steps.extract_version.outputs.version }}"
$tag = "v$version"
$sha = "${{ gitea.sha }}"
$tagLine = (git ls-remote --tags origin "refs/tags/$tag" | Select-Object -First 1)
if ([string]::IsNullOrWhiteSpace($tagLine)) {
git tag -a "$tag" -m "Release $tag" "$sha"
git push origin "$tag"
} else {
Write-Host "Tag $tag already exists on origin, skipping tag push."
}
- name: Create Gitea Release
if: env.CONTINUE == 'true'
uses: https://git.daemonlord.ru/actions/gitea-release-action@v1
with:
server_url: https://git.daemonlord.ru
repository: ${{ gitea.repository }}
token: ${{ secrets.API_TOKEN }}
tag_name: v${{ steps.extract_version.outputs.version }}
target_commitish: ${{ gitea.sha }}
name: Anabasis Manager ${{ steps.extract_version.outputs.version }}
body: |
Desktop release v${{ steps.extract_version.outputs.version }}
files: |
dist/AnabasisManager-${{ steps.extract_version.outputs.version }}.zip
dist/AnabasisManager-${{ steps.extract_version.outputs.version }}.zip.sha256
dist/AnabasisManager-setup-${{ steps.extract_version.outputs.version }}.exe
dist/AnabasisManager-setup-${{ steps.extract_version.outputs.version }}.exe.sha256

7
.gitignore vendored
View File

@@ -4,10 +4,3 @@
/build_linux/
/build_win32/
/build_darwin/
.idea/
__pycache__/
*.py[cod]
tests/__pycache__/
build/
dist/
AnabasisManager.spec

View File

@@ -13,12 +13,9 @@
* Моментальная загрузка всех доступных чатов пользователя.
* Групповой выбор чатов («Выбрать все» / «Снять выбор»).
* Быстрое обновление списка бесед.
* Выполнение массовых действий в фоновом потоке без подвисания интерфейса.
* Визуальный прогресс-бар по ходу операции.
* **👤 Интеллектуальный поиск ID:** Автоматическое распознавание ID пользователя из ссылок любого формата (например, `vk.com/id123`, `vk.com/durov` или просто `durov`).
* **🛠 Управление в один клик:** Кнопки для мгновенного исключения или приглашения пользователя во все выбранные чаты одновременно.
* **🔄 Безопасные обновления:** Проверка SHA256 и защищенная распаковка архива обновления.
* **🛡 Стабильность и безопасность:** Улучшенная обработка ошибок VK API, автоматическая реакция на смену IP-адреса и безопасное хранение токена с шифрованием DPAPI в Windows.
* **🛡 Стабильность:** Улучшенная обработка ошибок VK API и автоматическая реакция на смену IP-адреса.
---
@@ -50,7 +47,7 @@
3. **Установите зависимости:**
```bash
pip install -r requirements.txt
pip install PySide6 vk_api
```
4. **Запустите приложение:**
@@ -71,12 +68,6 @@
## 📂 Техническая информация
### Последние обновления
- Массовые операции VK (`remove/add/admin`) выполняются в фоновом потоке, чтобы интерфейс не зависал; добавлен визуальный прогресс-бар.
- Распаковка архива автообновления теперь валидирует пути перед извлечением для защиты от path traversal.
- Проверка обновлений переведена на `QThread` (модель потоков Qt) вместо Python `threading.Thread`.
- В Windows сохранение токена требует успешного шифрования через DPAPI; при ошибке шифрования сессия продолжается, но токен не сохраняется на диск.
### Сборка проекта (для разработчиков)
Проект использует кастомный скрипт автоматизации `build.py`, который оптимизирует зависимости `PySide6` и корректно упаковывает `QtWebEngineCore`.

View File

@@ -1 +0,0 @@
APP_VERSION = "2.2.5"

View File

@@ -1,94 +0,0 @@
import os
import sys
import time
import json
import threading
from urllib.parse import urlparse, parse_qs, unquote
import webview
def extract_token(url_string):
token = None
expires_in = 3600
parsed = urlparse(url_string)
if parsed.fragment:
params = parse_qs(parsed.fragment)
else:
params = parse_qs(parsed.query)
if 'access_token' in params:
token = params['access_token'][0]
if 'expires_in' in params:
try:
expires_in = int(params['expires_in'][0])
except ValueError:
pass
if not token:
start_marker = "access_token%253D"
end_marker = "%25"
start_index = url_string.find(start_marker)
if start_index != -1:
token_start_index = start_index + len(start_marker)
remaining_url = url_string[token_start_index:]
end_index = remaining_url.find(end_marker)
if end_index != -1:
raw_token = remaining_url[:end_index]
else:
amp_index = remaining_url.find('&')
if amp_index != -1:
raw_token = remaining_url[:amp_index]
else:
raw_token = remaining_url
token = unquote(raw_token)
return token, expires_in
def main_auth(auth_url, output_path):
def poll_url():
try:
url = window.get_current_url()
except Exception:
url = None
if url:
token, expires_in = extract_token(url)
if token:
data = {"token": token, "expires_in": expires_in}
with open(output_path, "w", encoding="utf-8") as f:
json.dump(data, f)
window.destroy()
return
threading.Timer(0.5, poll_url).start()
def on_loaded():
threading.Timer(0.5, poll_url).start()
window = webview.create_window("VK Авторизация", auth_url)
window.events.loaded += on_loaded
storage_path = os.path.join(os.path.dirname(output_path), "webview_profile")
webview.start(private_mode=False, storage_path=storage_path)
def main():
# Supports both: `python auth_webview.py <auth_url> <output_path>`
# and: `python auth_webview.py --auth <auth_url> <output_path>`
args = sys.argv[1:]
if len(args) == 3 and args[0] == "--auth":
auth_url, output_path = args[1], args[2]
elif len(args) == 2:
auth_url, output_path = args[0], args[1]
else:
print("Usage: auth_webview.py [--auth] <auth_url> <output_path>")
return 1
main_auth(auth_url, output_path)
return 0
if __name__ == "__main__":
sys.exit(main())

270
build.py
View File

@@ -2,21 +2,51 @@ import os
import shutil
import subprocess
import sys
from app_version import APP_VERSION
# --- Configuration ---
# --- Конфигурация ---
APP_NAME = "AnabasisManager"
UPDATER_NAME = "AnabasisUpdater"
VERSION = APP_VERSION # Единая версия приложения
VERSION = "1.3" # Ваша версия
MAIN_SCRIPT = "main.py"
UPDATER_SCRIPT = "updater_gui.py"
ICON_PATH = "icon.ico"
INSTALLER_SCRIPT = os.path.join("installer", "AnabasisManager.iss")
DIST_DIR = os.path.join("dist", APP_NAME)
ARCHIVE_NAME = f"{APP_NAME}-{VERSION}" # Формат Название-Версия
INSTALLER_NAME = f"{APP_NAME}-setup-{VERSION}.exe"
SAFE_CLEAN_ROOT_FILES = {"main.py", "updater_gui.py", "requirements.txt", "build.py"}
REMOVE_LIST = [
def run_build():
print(f"--- 1. Запуск PyInstaller для {APP_NAME} v{VERSION} ---")
command = [
"pyinstaller",
"--noconfirm",
"--onedir",
"--windowed",
f"--name={APP_NAME}",
f"--icon={ICON_PATH}" if os.path.exists(ICON_PATH) else "",
f"--add-data={ICON_PATH}{os.pathsep}." if os.path.exists(ICON_PATH) else "",
"--collect-all", "PySide6.QtWebEngineCore",
"--collect-all", "PySide6.QtWebEngineWidgets",
MAIN_SCRIPT
]
command = [arg for arg in command if arg]
try:
subprocess.check_call(command)
print("\n[OK] Сборка PyInstaller завершена.")
except subprocess.CalledProcessError as e:
print(f"\n[ERROR] Ошибка при сборке: {e}")
sys.exit(1)
def run_cleanup():
print(f"\n--- 2. Оптимизация папки {APP_NAME} ---")
# Пытаемся найти папку PySide6 внутри сборки
pyside_path = os.path.join(DIST_DIR, "PySide6")
if not os.path.exists(pyside_path):
pyside_path = DIST_DIR
to_remove = [
"Qt6Pdf.dll", "Qt6PdfQuick.dll", "Qt6PdfWidgets.dll",
"Qt6VirtualKeyboard.dll", "Qt6Positioning.dll",
"Qt6PrintSupport.dll", "Qt6Svg.dll", "Qt6Sql.dll",
@@ -25,112 +55,7 @@ REMOVE_LIST = [
"Qt6QuickTemplates2.dll"
]
def write_version_marker():
marker_path = os.path.join(DIST_DIR, "version.txt")
try:
os.makedirs(DIST_DIR, exist_ok=True)
with open(marker_path, "w", encoding="utf-8") as f:
f.write(str(VERSION).strip() + "\n")
print(f"[OK] Version marker written: {marker_path}")
except Exception as e:
print(f"[ERROR] Failed to write version.txt: {e}")
sys.exit(1)
def copy_icon_to_dist():
icon_abs_path = os.path.abspath(ICON_PATH)
if not os.path.exists(icon_abs_path):
print("[WARN] icon.ico not found, skipping icon copy into dist.")
return
try:
os.makedirs("dist", exist_ok=True)
os.makedirs(DIST_DIR, exist_ok=True)
shutil.copy2(icon_abs_path, os.path.join("dist", "icon.ico"))
shutil.copy2(icon_abs_path, os.path.join(DIST_DIR, "icon.ico"))
print("[OK] Icon copied to dist/icon.ico and dist/AnabasisManager/icon.ico")
except Exception as e:
print(f"[ERROR] Failed to copy icon.ico into dist: {e}")
sys.exit(1)
def ensure_project_root():
missing = [name for name in SAFE_CLEAN_ROOT_FILES if not os.path.exists(name)]
if missing:
print("[ERROR] Run this script from the project root.")
print(f"[ERROR] Missing files: {', '.join(missing)}")
sys.exit(1)
def run_build():
print(f"--- 1. Running PyInstaller for {APP_NAME} v{VERSION} ---")
icon_abs_path = os.path.abspath(ICON_PATH)
has_icon = os.path.exists(icon_abs_path)
command = [
"pyinstaller",
"--noconfirm",
"--onedir",
"--windowed",
"--exclude-module", "PySide6.QtWebEngineCore",
"--exclude-module", "PySide6.QtWebEngineWidgets",
"--exclude-module", "PySide6.QtWebEngineQuick",
f"--name={APP_NAME}",
f"--icon={icon_abs_path}" if has_icon else "",
f"--add-data={icon_abs_path}{os.pathsep}." if has_icon else "",
f"--add-data=auth_webview.py{os.pathsep}.",
MAIN_SCRIPT
]
command = [arg for arg in command if arg]
try:
subprocess.check_call(command)
print("\n[OK] PyInstaller build completed.")
except subprocess.CalledProcessError as e:
print(f"\n[ERROR] Build failed: {e}")
sys.exit(1)
def run_updater_build():
print(f"\n--- 1.2 Building {UPDATER_NAME} ---")
icon_abs_path = os.path.abspath(ICON_PATH)
has_icon = os.path.exists(icon_abs_path)
updater_spec_dir = os.path.join("build", "updater_spec")
updater_spec_path = os.path.join(updater_spec_dir, f"{UPDATER_NAME}.spec")
if os.path.exists(updater_spec_path):
os.remove(updater_spec_path)
command = [
"pyinstaller",
"--noconfirm",
"--clean",
"--onefile",
"--windowed",
f"--name={UPDATER_NAME}",
"--distpath", DIST_DIR,
"--workpath", os.path.join("build", "updater"),
"--specpath", updater_spec_dir,
f"--icon={icon_abs_path}" if has_icon else "",
UPDATER_SCRIPT,
]
command = [arg for arg in command if arg]
try:
subprocess.check_call(command)
print(f"[OK] {UPDATER_NAME} built.")
except subprocess.CalledProcessError as e:
print(f"[ERROR] Failed to build {UPDATER_NAME}: {e}")
sys.exit(1)
def run_cleanup():
print(f"\n--- 2. Optimizing {APP_NAME} folder ---")
# Пытаемся найти папку PySide6 внутри сборки
pyside_path = os.path.join(DIST_DIR, "PySide6")
if not os.path.exists(pyside_path):
pyside_path = DIST_DIR
for item in REMOVE_LIST:
for item in to_remove:
path = os.path.join(pyside_path, item)
if os.path.exists(path):
try:
@@ -138,135 +63,34 @@ def run_cleanup():
shutil.rmtree(path)
else:
os.remove(path)
print(f"Removed: {item}")
print(f"Удалено: {item}")
except Exception as e:
print(f"Skipped {item}: {e}")
print(f"Пропуск {item}: {e}")
def create_archive():
print(f"\n--- 3. Creating archive {ARCHIVE_NAME}.zip ---")
print(f"\n--- 3. Создание архива {ARCHIVE_NAME}.zip ---")
try:
# Создаем zip-архив из папки DIST_DIR
# base_name - имя файла без расширения, format - 'zip', root_dir - что упаковываем
shutil.make_archive(os.path.join("dist", ARCHIVE_NAME), 'zip', DIST_DIR)
print(f"[OK] Archive created: dist/{ARCHIVE_NAME}.zip")
print(f"[OK] Архив создан: dist/{ARCHIVE_NAME}.zip")
except Exception as e:
print(f"[ERROR] Failed to create archive: {e}")
def _find_iscc():
candidates = []
iscc_env = os.getenv("ISCC_PATH", "").strip()
if iscc_env:
candidates.append(iscc_env)
candidates.append(shutil.which("iscc"))
candidates.append(shutil.which("ISCC.exe"))
candidates.append(r"C:\Program Files (x86)\Inno Setup 6\ISCC.exe")
candidates.append(r"C:\Program Files\Inno Setup 6\ISCC.exe")
for candidate in candidates:
if candidate and os.path.exists(candidate):
return candidate
return ""
def _decode_process_output(raw_bytes):
if raw_bytes is None:
return ""
if isinstance(raw_bytes, str):
return raw_bytes
for enc in ("utf-8-sig", "utf-16", "utf-16-le", "cp1251", "cp866", "latin-1"):
try:
return raw_bytes.decode(enc)
except Exception:
continue
return raw_bytes.decode("utf-8", errors="replace")
def build_installer():
print(f"\n--- 4. Building installer {INSTALLER_NAME} ---")
if os.name != "nt":
print("[INFO] Inno Setup installer is built only on Windows. Step skipped.")
return
if not os.path.exists(INSTALLER_SCRIPT):
print(f"[ERROR] Installer script not found: {INSTALLER_SCRIPT}")
sys.exit(1)
if not os.path.exists(DIST_DIR):
print(f"[ERROR] Build output folder not found: {DIST_DIR}")
sys.exit(1)
iscc_path = _find_iscc()
if not iscc_path:
print("[ERROR] Inno Setup Compiler (ISCC.exe) not found.")
print("[ERROR] Install Inno Setup 6 or set ISCC_PATH environment variable.")
sys.exit(1)
project_root = os.path.abspath(".")
source_dir = os.path.abspath(DIST_DIR)
output_dir = os.path.abspath("dist")
iss_path = os.path.abspath(INSTALLER_SCRIPT)
icon_path = os.path.abspath(ICON_PATH)
print(f"[INFO] ISCC source dir: {source_dir}")
print(f"[INFO] ISCC output dir: {output_dir}")
print(f"[INFO] ISCC script: {iss_path}")
print(f"[INFO] ISCC icon path: {icon_path}")
if not os.path.exists(source_dir):
print(f"[ERROR] Source dir does not exist: {source_dir}")
sys.exit(1)
if not os.path.exists(iss_path):
print(f"[ERROR] Installer script does not exist: {iss_path}")
sys.exit(1)
if not os.path.exists(icon_path):
print(f"[ERROR] Icon file does not exist: {icon_path}")
sys.exit(1)
command = [
iscc_path,
f"/DMyAppVersion={VERSION}",
f"/DMyIconFile={icon_path}",
f"/O{output_dir}",
iss_path,
]
try:
completed = subprocess.run(
command,
capture_output=True,
cwd=project_root,
check=False,
)
stdout_text = _decode_process_output(completed.stdout)
stderr_text = _decode_process_output(completed.stderr)
if stdout_text:
print(stdout_text.rstrip())
if stderr_text:
print(stderr_text.rstrip())
if completed.returncode != 0:
raise RuntimeError(f"ISCC exited with code {completed.returncode}")
installer_path = os.path.join("dist", INSTALLER_NAME)
if not os.path.exists(installer_path):
print(f"[ERROR] Installer was not created: {installer_path}")
sys.exit(1)
print(f"[OK] Installer created: {installer_path}")
except Exception as e:
print(f"[ERROR] Failed to build installer: {e}")
sys.exit(1)
print(f"[ERROR] Не удалось создать архив: {e}")
if __name__ == "__main__":
ensure_project_root()
# Предварительная очистка
for folder in ["build", "dist"]:
if os.path.exists(folder):
shutil.rmtree(folder)
run_build()
run_updater_build()
run_cleanup()
copy_icon_to_dist()
write_version_marker()
create_archive()
build_installer()
print("\n" + "=" * 30)
print("BUILD COMPLETED")
print(f"Release archive: dist/{ARCHIVE_NAME}.zip")
print(f"Installer: dist/{INSTALLER_NAME}")
print(f"ПРОЦЕСС ЗАВЕРШЕН")
print(f"Файл для отправки: dist/{ARCHIVE_NAME}.zip")
print("=" * 30)

View File

@@ -1,42 +0,0 @@
#define MyAppName "Anabasis Manager"
#ifndef MyAppVersion
#define MyAppVersion "0.0.0"
#endif
#ifndef MyIconFile
#define MyIconFile "..\icon.ico"
#endif
[Setup]
AppId={{6CD9D6F2-4B95-4E9C-A8D8-2A9C8F6AA741}
AppName={#MyAppName}
AppVersion={#MyAppVersion}
AppPublisher=Benya
DefaultDirName={localappdata}\Programs\Anabasis Manager
DefaultGroupName=Anabasis Manager
DisableProgramGroupPage=yes
PrivilegesRequired=lowest
OutputDir=..\dist
OutputBaseFilename=AnabasisManager-setup-{#MyAppVersion}
Compression=lzma2
SolidCompression=yes
WizardStyle=modern
ArchitecturesInstallIn64BitMode=x64compatible
UninstallDisplayIcon={app}\AnabasisManager.exe
SetupIconFile={#MyIconFile}
[Languages]
Name: "russian"; MessagesFile: "compiler:Languages\Russian.isl"
Name: "english"; MessagesFile: "compiler:Default.isl"
[Tasks]
Name: "desktopicon"; Description: "Создать ярлык на рабочем столе"; GroupDescription: "Дополнительные задачи:"
[Files]
Source: "..\dist\AnabasisManager\*"; DestDir: "{app}"; Flags: ignoreversion recursesubdirs createallsubdirs
[Icons]
Name: "{group}\Anabasis Manager"; Filename: "{app}\AnabasisManager.exe"
Name: "{autodesktop}\Anabasis Manager"; Filename: "{app}\AnabasisManager.exe"; Tasks: desktopicon
[Run]
Filename: "{app}\AnabasisManager.exe"; Description: "Запустить Anabasis Manager"; Flags: nowait postinstall skipifsilent

1435
main.py

File diff suppressed because it is too large Load Diff

View File

@@ -1,3 +1,2 @@
PySide6~=6.10.2
PySide6~=6.9.1
vk-api~=11.9.9
pywebview

View File

@@ -1,5 +0,0 @@
from .auto_update_service import AutoUpdateService
from .chat_actions import load_chat_conversations, resolve_user_ids
from .token_store import load_token, save_token
from .update_service import UpdateChecker, detect_update_repository_url
from .vk_service import VkService

View File

@@ -1,232 +0,0 @@
import hashlib
import os
import re
import shutil
import subprocess
import tempfile
import urllib.request
import zipfile
class AutoUpdateService:
@staticmethod
def _safe_extract_zip(archive, destination_dir):
destination_real = os.path.realpath(destination_dir)
for member in archive.infolist():
member_name = member.filename or ""
if not member_name:
continue
target_path = os.path.realpath(os.path.join(destination_dir, member_name))
if target_path != destination_real and not target_path.startswith(destination_real + os.sep):
raise RuntimeError(f"Unsafe path in update archive: {member_name}")
archive.extractall(destination_dir)
@staticmethod
def download_update_archive(download_url, destination_path):
request = urllib.request.Request(
download_url,
headers={"User-Agent": "AnabasisManager-Updater"},
)
with urllib.request.urlopen(request, timeout=60) as response:
with open(destination_path, "wb") as f:
shutil.copyfileobj(response, f)
@staticmethod
def download_update_text(url):
request = urllib.request.Request(
url,
headers={"User-Agent": "AnabasisManager-Updater"},
)
with urllib.request.urlopen(request, timeout=30) as response:
return response.read().decode("utf-8", errors="replace")
@staticmethod
def sha256_file(path):
digest = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(1024 * 1024), b""):
digest.update(chunk)
return digest.hexdigest().lower()
@staticmethod
def extract_sha256_from_text(checksum_text, target_file_name):
target = (target_file_name or "").strip().lower()
for raw_line in checksum_text.splitlines():
line = raw_line.strip()
if not line:
continue
match = re.search(r"\b([A-Fa-f0-9]{64})\b", line)
if not match:
continue
checksum = match.group(1).lower()
if not target:
return checksum
line_lower = line.lower()
if target in line_lower:
return checksum
if os.path.basename(target) in line_lower:
return checksum
return ""
@classmethod
def verify_update_checksum(cls, zip_path, checksum_url, download_name):
if not checksum_url:
raise RuntimeError("В релизе нет файла SHA256. Автообновление остановлено.")
checksum_text = cls.download_update_text(checksum_url)
expected_hash = cls.extract_sha256_from_text(checksum_text, download_name or os.path.basename(zip_path))
if not expected_hash:
raise RuntimeError("Не удалось найти SHA256 для архива обновления.")
actual_hash = cls.sha256_file(zip_path)
if actual_hash != expected_hash:
raise RuntimeError("SHA256 не совпадает, обновление отменено.")
@staticmethod
def locate_extracted_root(extracted_dir):
entries = []
for name in os.listdir(extracted_dir):
full_path = os.path.join(extracted_dir, name)
if os.path.isdir(full_path):
entries.append(full_path)
if len(entries) == 1:
candidate = entries[0]
if os.path.exists(os.path.join(candidate, "AnabasisManager.exe")):
return candidate
return extracted_dir
@staticmethod
def build_update_script(app_dir, source_dir, exe_name, target_pid):
script_path = os.path.join(tempfile.gettempdir(), "anabasis_apply_update.cmd")
script_lines = [
"@echo off",
"setlocal EnableExtensions",
f"set \"APP_DIR={app_dir}\"",
f"set \"SRC_DIR={source_dir}\"",
f"set \"EXE_NAME={exe_name}\"",
f"set \"TARGET_PID={target_pid}\"",
"set \"BACKUP_DIR=%TEMP%\\anabasis_backup_%RANDOM%%RANDOM%\"",
"set \"UPDATE_LOG=%APP_DIR%\\update_error.log\"",
"echo [%DATE% %TIME%] Update start > \"%UPDATE_LOG%\"",
"if not exist \"%SRC_DIR%\\%EXE_NAME%\" (",
" echo Source executable not found: \"%SRC_DIR%\\%EXE_NAME%\" >> \"%UPDATE_LOG%\"",
" exit /b 3",
")",
"set /a WAIT_LOOPS=0",
":wait_for_exit",
"tasklist /FI \"PID eq %TARGET_PID%\" | find \"%TARGET_PID%\" >nul",
"if %ERRORLEVEL% EQU 0 (",
" set /a WAIT_LOOPS+=1",
" if %WAIT_LOOPS% GEQ 180 (",
" echo Timeout waiting for process %TARGET_PID%, attempting force stop >> \"%UPDATE_LOG%\"",
" taskkill /PID %TARGET_PID% /T /F >nul 2>&1",
" timeout /t 2 /nobreak >nul",
" tasklist /FI \"PID eq %TARGET_PID%\" | find \"%TARGET_PID%\" >nul",
" if %ERRORLEVEL% EQU 0 goto :pid_still_running",
" goto :wait_image_unlock",
" )",
" timeout /t 1 /nobreak >nul",
" goto :wait_for_exit",
")",
":wait_image_unlock",
"set /a IMG_LOOPS=0",
":check_image",
"tasklist /FI \"IMAGENAME eq %EXE_NAME%\" | find /I \"%EXE_NAME%\" >nul",
"if %ERRORLEVEL% EQU 0 (",
" set /a IMG_LOOPS+=1",
" if %IMG_LOOPS% GEQ 60 goto :image_still_running",
" timeout /t 1 /nobreak >nul",
" goto :check_image",
")",
":backup",
"timeout /t 1 /nobreak >nul",
"mkdir \"%BACKUP_DIR%\" >nul 2>&1",
"robocopy \"%APP_DIR%\" \"%BACKUP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:6 /W:2 >nul",
"set \"RC=%ERRORLEVEL%\"",
"if %RC% GEQ 8 goto :backup_error",
"robocopy \"%SRC_DIR%\" \"%APP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:12 /W:2 >nul",
"set \"RC=%ERRORLEVEL%\"",
"if %RC% GEQ 8 goto :rollback",
"start \"\" \"%APP_DIR%\\%EXE_NAME%\"",
"timeout /t 2 /nobreak >nul",
"tasklist /FI \"IMAGENAME eq %EXE_NAME%\" | find /I \"%EXE_NAME%\" >nul",
"if %ERRORLEVEL% NEQ 0 goto :rollback",
"echo Update success >> \"%UPDATE_LOG%\"",
"rmdir /S /Q \"%BACKUP_DIR%\" >nul 2>&1",
"exit /b 0",
":rollback",
"robocopy \"%BACKUP_DIR%\" \"%APP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:6 /W:2 >nul",
"start \"\" \"%APP_DIR%\\%EXE_NAME%\"",
"echo Auto-update failed. Rollback executed. >> \"%UPDATE_LOG%\"",
"exit /b 2",
":backup_error",
"echo Auto-update failed during backup. Code %RC% >> \"%UPDATE_LOG%\"",
"exit /b %RC%",
":pid_still_running",
"echo Auto-update aborted: process %TARGET_PID% is still running after force stop. >> \"%UPDATE_LOG%\"",
"exit /b 4",
":image_still_running",
"echo Auto-update aborted: %EXE_NAME% still running and file lock may remain. >> \"%UPDATE_LOG%\"",
"exit /b 5",
]
with open(script_path, "w", encoding="utf-8", newline="\r\n") as f:
f.write("\r\n".join(script_lines) + "\r\n")
return script_path
@staticmethod
def launch_update_script(script_path, work_dir):
creation_flags = 0
if hasattr(subprocess, "CREATE_NEW_PROCESS_GROUP"):
creation_flags |= subprocess.CREATE_NEW_PROCESS_GROUP
if hasattr(subprocess, "DETACHED_PROCESS"):
creation_flags |= subprocess.DETACHED_PROCESS
subprocess.Popen(
["cmd.exe", "/c", script_path],
cwd=work_dir,
creationflags=creation_flags,
)
@staticmethod
def launch_gui_updater(app_exe, source_dir, work_dir, target_pid, version=""):
app_dir = os.path.dirname(app_exe)
exe_name = os.path.basename(app_exe)
updater_exe = os.path.join(app_dir, "AnabasisUpdater.exe")
if not os.path.exists(updater_exe):
raise RuntimeError("Файл AnabasisUpdater.exe не найден в папке приложения.")
creation_flags = 0
if hasattr(subprocess, "CREATE_NEW_PROCESS_GROUP"):
creation_flags |= subprocess.CREATE_NEW_PROCESS_GROUP
if hasattr(subprocess, "DETACHED_PROCESS"):
creation_flags |= subprocess.DETACHED_PROCESS
subprocess.Popen(
[
updater_exe,
"--app-dir",
app_dir,
"--source-dir",
source_dir,
"--exe-name",
exe_name,
"--target-pid",
str(target_pid),
"--version",
str(version or ""),
"--work-dir",
str(work_dir or ""),
],
cwd=work_dir,
creationflags=creation_flags,
)
@classmethod
def prepare_update(cls, download_url, checksum_url, download_name):
work_dir = tempfile.mkdtemp(prefix="anabasis_update_")
zip_path = os.path.join(work_dir, "update.zip")
unpack_dir = os.path.join(work_dir, "extracted")
cls.download_update_archive(download_url, zip_path)
cls.verify_update_checksum(zip_path, checksum_url, download_name)
os.makedirs(unpack_dir, exist_ok=True)
with zipfile.ZipFile(zip_path, "r") as archive:
cls._safe_extract_zip(archive, unpack_dir)
source_dir = cls.locate_extracted_root(unpack_dir)
return work_dir, source_dir

View File

@@ -1,105 +0,0 @@
from urllib.parse import urlparse
def _safe_log(log_func, context, message):
if not log_func:
return
try:
log_func(context, message)
except TypeError:
log_func(f"{context}: {message}")
def resolve_user_ids(vk_call_with_retry, vk_api, links):
resolved_ids = []
failed_links = []
for link in links:
try:
path = urlparse(link).path
screen_name = path.split("/")[-1] if path else ""
if not screen_name and len(path.split("/")) > 1:
screen_name = path.split("/")[-2]
if not screen_name:
failed_links.append((link, None))
continue
resolved_object = vk_call_with_retry(vk_api.utils.resolveScreenName, screen_name=screen_name)
if resolved_object and resolved_object.get("type") == "user":
resolved_ids.append(resolved_object["object_id"])
else:
failed_links.append((link, None))
except Exception as e:
failed_links.append((link, e))
return resolved_ids, failed_links
def load_chat_conversations(vk_call_with_retry, vk_api, log_func=None):
conversations = []
start_from = None
seen_start_tokens = set()
total_count = None
page_num = 0
while True:
params = {"count": 200, "filter": "all"}
if start_from:
if start_from in seen_start_tokens:
_safe_log(log_func, "load_chats_page", f"stop duplicate next_from={start_from}")
break
params["start_from"] = start_from
seen_start_tokens.add(start_from)
response = vk_call_with_retry(vk_api.messages.getConversations, **params)
page_num += 1
if total_count is None:
total_count = response.get("count")
page_items = response.get("items", [])
_safe_log(
log_func,
"load_chats_page",
f"page={page_num} items={len(page_items)} next_from={response.get('next_from')} total={total_count}",
)
if not page_items:
break
conversations.extend(page_items)
start_from = response.get("next_from")
if not start_from:
break
if total_count is not None and total_count > len(conversations):
_safe_log(
log_func,
"load_chats_fallback",
f"start offset pagination total={total_count} current={len(conversations)}",
)
seen_keys = set()
for conv in conversations:
peer = (conv.get("conversation") or {}).get("peer", {})
key = (peer.get("type"), peer.get("id") or peer.get("local_id"))
seen_keys.add(key)
offset = len(conversations)
safety_pages = 0
while offset < total_count:
params = {"count": 200, "filter": "all", "offset": offset}
response = vk_call_with_retry(vk_api.messages.getConversations, **params)
page_items = response.get("items", [])
_safe_log(
log_func,
"load_chats_fallback",
f"offset={offset} items={len(page_items)} total={response.get('count')}",
)
if not page_items:
break
for item in page_items:
peer = (item.get("conversation") or {}).get("peer", {})
key = (peer.get("type"), peer.get("id") or peer.get("local_id"))
if key in seen_keys:
continue
seen_keys.add(key)
conversations.append(item)
offset += len(page_items)
safety_pages += 1
if safety_pages > 50:
_safe_log(log_func, "load_chats_fallback", "stop safety_pages>50")
break
return conversations

View File

@@ -1,136 +0,0 @@
import base64
import ctypes
import json
import os
import time
from ctypes import wintypes
class _DataBlob(ctypes.Structure):
_fields_ = [("cbData", wintypes.DWORD), ("pbData", ctypes.POINTER(ctypes.c_byte))]
_crypt32 = None
_kernel32 = None
if os.name == "nt":
_crypt32 = ctypes.WinDLL("crypt32", use_last_error=True)
_kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
_crypt32.CryptProtectData.argtypes = [
ctypes.POINTER(_DataBlob),
wintypes.LPCWSTR,
ctypes.POINTER(_DataBlob),
ctypes.c_void_p,
ctypes.c_void_p,
wintypes.DWORD,
ctypes.POINTER(_DataBlob),
]
_crypt32.CryptProtectData.restype = wintypes.BOOL
_crypt32.CryptUnprotectData.argtypes = [
ctypes.POINTER(_DataBlob),
ctypes.POINTER(wintypes.LPWSTR),
ctypes.POINTER(_DataBlob),
ctypes.c_void_p,
ctypes.c_void_p,
wintypes.DWORD,
ctypes.POINTER(_DataBlob),
]
_crypt32.CryptUnprotectData.restype = wintypes.BOOL
def _crypt_protect_data(data, description=""):
buffer = ctypes.create_string_buffer(data)
data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte)))
data_out = _DataBlob()
if not _crypt32.CryptProtectData(ctypes.byref(data_in), description, None, None, None, 0, ctypes.byref(data_out)):
raise ctypes.WinError(ctypes.get_last_error())
try:
return ctypes.string_at(data_out.pbData, data_out.cbData)
finally:
_kernel32.LocalFree(data_out.pbData)
def _crypt_unprotect_data(data):
buffer = ctypes.create_string_buffer(data)
data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte)))
data_out = _DataBlob()
if not _crypt32.CryptUnprotectData(ctypes.byref(data_in), None, None, None, None, 0, ctypes.byref(data_out)):
raise ctypes.WinError(ctypes.get_last_error())
try:
return ctypes.string_at(data_out.pbData, data_out.cbData)
finally:
_kernel32.LocalFree(data_out.pbData)
def _encrypt_token(token):
if os.name != "nt":
raise RuntimeError("DPAPI is available only on Windows.")
encrypted_bytes = _crypt_protect_data(token.encode("utf-8"))
return base64.b64encode(encrypted_bytes).decode("ascii")
def _decrypt_token(token_data):
if os.name != "nt":
raise RuntimeError("DPAPI is available only on Windows.")
encrypted_bytes = base64.b64decode(token_data.encode("ascii"))
decrypted_bytes = _crypt_unprotect_data(encrypted_bytes)
return decrypted_bytes.decode("utf-8")
def save_token(token, token_file, app_data_dir, expires_in=0):
try:
expires_in = int(expires_in)
except (ValueError, TypeError):
expires_in = 0
os.makedirs(app_data_dir, exist_ok=True)
expiration_time = (time.time() + expires_in) if expires_in > 0 else 0
stored_token = token
encrypted = False
if os.name == "nt":
try:
stored_token = _encrypt_token(token)
encrypted = True
except Exception as exc:
raise RuntimeError("Failed to securely store token with DPAPI.") from exc
data = {
"token": stored_token,
"expiration_time": expiration_time,
"encrypted": encrypted,
}
with open(token_file, "w", encoding="utf-8") as f:
json.dump(data, f)
return expiration_time
def load_token(token_file):
if not os.path.exists(token_file):
return None, None
with open(token_file, "r", encoding="utf-8") as f:
data = json.load(f)
token = data.get("token")
encrypted = data.get("encrypted", False)
if token and encrypted:
try:
token = _decrypt_token(token)
except Exception:
try:
os.remove(token_file)
except Exception:
pass
return None, None
expiration_time = data.get("expiration_time")
if token and (expiration_time == 0 or expiration_time > time.time()):
return token, expiration_time
try:
os.remove(token_file)
except Exception:
pass
return None, None

View File

@@ -1,264 +0,0 @@
import json
import os
import re
import urllib.error
import urllib.request
from urllib.parse import urlparse
try:
from PySide6.QtCore import QObject, Signal
except Exception:
class _FallbackBoundSignal:
def __init__(self):
self._callbacks = []
def connect(self, callback):
if callback is not None:
self._callbacks.append(callback)
def emit(self, *args, **kwargs):
for callback in list(self._callbacks):
callback(*args, **kwargs)
class _FallbackSignalDescriptor:
def __init__(self):
self._storage_name = ""
def __set_name__(self, owner, name):
self._storage_name = f"__fallback_signal_{name}"
def __get__(self, instance, owner):
if instance is None:
return self
signal = instance.__dict__.get(self._storage_name)
if signal is None:
signal = _FallbackBoundSignal()
instance.__dict__[self._storage_name] = signal
return signal
class QObject:
pass
def Signal(*_args, **_kwargs):
return _FallbackSignalDescriptor()
def _version_key(version_text):
parts = [int(x) for x in re.findall(r"\d+", str(version_text))]
if not parts:
return (0, 0, 0)
while len(parts) < 3:
parts.append(0)
return tuple(parts[:3])
def _is_newer_version(latest_version, current_version):
latest_key = _version_key(latest_version)
current_key = _version_key(current_version)
return latest_key > current_key
def _sanitize_repo_url(value):
value = (value or "").strip()
if not value:
return ""
if "://" not in value and value.count("/") == 1:
return f"https://github.com/{value}"
parsed = urlparse(value)
if not parsed.scheme or not parsed.netloc:
return ""
clean_path = parsed.path.rstrip("/")
if clean_path.endswith(".git"):
clean_path = clean_path[:-4]
return f"{parsed.scheme}://{parsed.netloc}{clean_path}"
def _normalize_update_channel(value):
channel = (value or "").strip().lower()
if channel in ("beta", "betas", "pre", "prerelease", "pre-release"):
return "beta"
return "stable"
def _select_release_from_list(releases):
for item in releases:
if not isinstance(item, dict):
continue
if item.get("draft"):
continue
tag_name = (item.get("tag_name") or item.get("name") or "").strip()
if not tag_name:
continue
return item
return None
def _extract_release_payload(release_data, repository_url, current_version):
parsed = urlparse(repository_url)
base_url = f"{parsed.scheme}://{parsed.netloc}"
repo_path = parsed.path.strip("/")
releases_url = f"{base_url}/{repo_path}/releases"
latest_tag = release_data.get("tag_name") or release_data.get("name") or ""
latest_version = latest_tag.lstrip("vV").strip()
html_url = release_data.get("html_url") or releases_url
release_notes = (release_data.get("body") or "").strip()
assets = release_data.get("assets") or []
download_url = ""
download_name = ""
checksum_url = ""
installer_url = ""
installer_name = ""
for asset in assets:
url = asset.get("browser_download_url", "")
if url.lower().endswith(".zip"):
download_url = url
download_name = asset.get("name", "")
break
if not download_url and assets:
download_url = assets[0].get("browser_download_url", "")
download_name = assets[0].get("name", "")
for asset in assets:
url = asset.get("browser_download_url", "")
name = asset.get("name", "")
name_lower = name.lower()
if installer_url:
break
if url.lower().endswith(".exe") and ("setup" in name_lower or "installer" in name_lower):
installer_url = url
installer_name = name
for asset in assets:
name = asset.get("name", "").lower()
if not name:
continue
is_checksum_asset = name.endswith(".sha256") or name.endswith(".sha256.txt") or name in ("checksums.txt", "sha256sums.txt")
if not is_checksum_asset:
continue
if download_name and (download_name.lower() in name or name in (f"{download_name.lower()}.sha256", f"{download_name.lower()}.sha256.txt")):
checksum_url = asset.get("browser_download_url", "")
break
if not checksum_url:
checksum_url = asset.get("browser_download_url", "")
return {
"repository_url": repository_url,
"latest_version": latest_version,
"current_version": current_version,
"latest_tag": latest_tag,
"release_url": html_url,
"release_notes": release_notes,
"download_url": download_url,
"download_name": download_name,
"installer_url": installer_url,
"installer_name": installer_name,
"checksum_url": checksum_url,
"has_update": _is_newer_version(latest_version, current_version),
}
def detect_update_repository_url(configured_url="", configured_repo=""):
env_url = _sanitize_repo_url(os.getenv("ANABASIS_UPDATE_URL", ""))
if env_url:
return env_url
env_repo = _sanitize_repo_url(os.getenv("ANABASIS_UPDATE_REPOSITORY", ""))
if env_repo:
return env_repo
cfg_url = _sanitize_repo_url(configured_url)
if cfg_url:
return cfg_url
cfg_repo = _sanitize_repo_url(configured_repo)
if cfg_repo:
return cfg_repo
git_config_path = os.path.join(os.path.abspath("."), ".git", "config")
if not os.path.exists(git_config_path):
return ""
try:
with open(git_config_path, "r", encoding="utf-8") as f:
content = f.read()
match = re.search(r"url\s*=\s*((?:https?://|git@)[^\s]+)", content)
if not match:
return ""
remote = match.group(1).strip()
if remote.startswith("git@"):
ssh_match = re.match(r"git@([^:]+):(.+?)(?:\.git)?$", remote)
if ssh_match:
return _sanitize_repo_url(f"https://{ssh_match.group(1)}/{ssh_match.group(2)}")
return _sanitize_repo_url(remote)
except Exception:
return ""
class UpdateChecker(QObject):
check_finished = Signal(dict)
check_failed = Signal(str)
def __init__(self, repository_url, current_version, request_timeout=8, channel="stable"):
super().__init__()
self.repository_url = repository_url
self.current_version = current_version
self.request_timeout = request_timeout
self.channel = _normalize_update_channel(channel)
def run(self):
if not self.repository_url:
self.check_failed.emit("Не задан URL репозитория обновлений.")
return
parsed = urlparse(self.repository_url)
base_url = f"{parsed.scheme}://{parsed.netloc}"
repo_path = parsed.path.strip("/")
if not repo_path or repo_path.count("/") < 1:
self.check_failed.emit("Некорректный URL репозитория обновлений.")
return
use_beta_channel = self.channel == "beta"
if parsed.netloc.lower().endswith("github.com"):
if use_beta_channel:
api_url = f"https://api.github.com/repos/{repo_path}/releases"
else:
api_url = f"https://api.github.com/repos/{repo_path}/releases/latest"
else:
if use_beta_channel:
api_url = f"{base_url}/api/v1/repos/{repo_path}/releases"
else:
api_url = f"{base_url}/api/v1/repos/{repo_path}/releases/latest"
releases_url = f"{base_url}/{repo_path}/releases"
request = urllib.request.Request(
api_url,
headers={
"Accept": "application/vnd.github+json",
"User-Agent": "AnabasisManager-Updater",
},
)
try:
with urllib.request.urlopen(request, timeout=self.request_timeout) as response:
response_data = json.loads(response.read().decode("utf-8"))
except urllib.error.HTTPError as e:
self.check_failed.emit(f"Ошибка HTTP при проверке обновлений: {e.code}")
return
except urllib.error.URLError as e:
self.check_failed.emit(f"Сетевая ошибка при проверке обновлений: {e}")
return
except Exception as e:
self.check_failed.emit(f"Не удалось проверить обновления: {e}")
return
release_data = response_data
if use_beta_channel:
if not isinstance(response_data, list):
self.check_failed.emit("Сервер вернул некорректный ответ списка релизов.")
return
release_data = _select_release_from_list(response_data)
if not release_data:
self.check_failed.emit("В канале beta не найдено доступных релизов.")
return
elif not isinstance(response_data, dict):
self.check_failed.emit("Сервер вернул некорректный ответ релиза.")
return
payload = _extract_release_payload(release_data, self.repository_url, self.current_version)
payload["release_channel"] = self.channel
payload["releases_url"] = releases_url
self.check_finished.emit(payload)

View File

@@ -1,59 +0,0 @@
import os
import sys
import time
from vk_api import VkApi
from vk_api.exceptions import VkApiError
class VkService:
def __init__(self):
self.session = None
self.api = None
def set_token(self, token):
self.session = VkApi(token=token)
self.api = self.session.get_api()
def clear(self):
self.session = None
self.api = None
@staticmethod
def build_auth_command(auth_url, output_path, entry_script_path=None):
if getattr(sys, "frozen", False):
return sys.executable, ["--auth", auth_url, output_path]
script_path = entry_script_path or os.path.abspath(__file__)
return sys.executable, [script_path, "--auth", auth_url, output_path]
@staticmethod
def vk_error_code(exc):
error = getattr(exc, "error", None)
if isinstance(error, dict):
return error.get("error_code")
return getattr(exc, "code", None)
@classmethod
def is_auth_error(cls, exc, formatted_message=None):
code = cls.vk_error_code(exc)
if code == 5:
return True
message = (formatted_message or str(exc)).lower()
return "invalid_access_token" in message or "user authorization failed" in message
@classmethod
def is_retryable_error(cls, exc):
return cls.vk_error_code(exc) in (6, 9, 10)
def call_with_retry(self, func, *args, **kwargs):
max_attempts = 5
for attempt in range(1, max_attempts + 1):
try:
return func(*args, **kwargs)
except VkApiError as e:
if not self.is_retryable_error(e) or attempt == max_attempts:
raise
delay = min(2.0, 0.35 * (2 ** (attempt - 1)))
if self.vk_error_code(e) == 9:
delay = max(delay, 1.0)
time.sleep(delay)

View File

@@ -1,51 +0,0 @@
import hashlib
import importlib.util
import tempfile
import unittest
from pathlib import Path
_SPEC = importlib.util.spec_from_file_location(
"auto_update_service",
Path("services/auto_update_service.py"),
)
_MODULE = importlib.util.module_from_spec(_SPEC)
_SPEC.loader.exec_module(_MODULE)
AutoUpdateService = _MODULE.AutoUpdateService
class AutoUpdateServiceTests(unittest.TestCase):
def test_extract_sha256_from_text(self):
digest = "a" * 64
text = f"{digest} AnabasisManager-1.0.0-win.zip\n"
extracted = AutoUpdateService.extract_sha256_from_text(
text,
"AnabasisManager-1.0.0-win.zip",
)
self.assertEqual(extracted, digest)
def test_sha256_file(self):
with tempfile.TemporaryDirectory() as td:
path = Path(td) / "payload.bin"
payload = b"anabasis"
path.write_bytes(payload)
expected = hashlib.sha256(payload).hexdigest()
self.assertEqual(AutoUpdateService.sha256_file(str(path)), expected)
def test_build_update_script_contains_core_vars(self):
script = AutoUpdateService.build_update_script(
app_dir=r"C:\Apps\AnabasisManager",
source_dir=r"C:\Temp\Extracted",
exe_name="AnabasisManager.exe",
target_pid=1234,
)
script_text = Path(script).read_text(encoding="utf-8")
self.assertIn("set \"APP_DIR=", script_text)
self.assertIn("set \"SRC_DIR=", script_text)
self.assertIn("set \"EXE_NAME=", script_text)
self.assertIn("set \"TARGET_PID=", script_text)
self.assertIn(":rollback", script_text)
self.assertIn("if not exist \"%SRC_DIR%\\%EXE_NAME%\"", script_text)
if __name__ == "__main__":
unittest.main()

View File

@@ -1,65 +0,0 @@
import unittest
import importlib.util
from types import SimpleNamespace
from pathlib import Path
_SPEC = importlib.util.spec_from_file_location(
"chat_actions",
Path("services/chat_actions.py"),
)
_MODULE = importlib.util.module_from_spec(_SPEC)
_SPEC.loader.exec_module(_MODULE)
load_chat_conversations = _MODULE.load_chat_conversations
resolve_user_ids = _MODULE.resolve_user_ids
class ChatActionsTests(unittest.TestCase):
def test_resolve_user_ids_mixed_results(self):
mapping = {
"id1": {"type": "user", "object_id": 1},
"id2": {"type": "group", "object_id": 2},
}
def call_with_retry(func, **kwargs):
return func(**kwargs)
def resolve_screen_name(screen_name):
if screen_name == "boom":
raise RuntimeError("boom")
return mapping.get(screen_name)
vk_api = SimpleNamespace(utils=SimpleNamespace(resolveScreenName=resolve_screen_name))
links = [
"https://vk.com/id1",
"https://vk.com/id2",
"https://vk.com/boom",
"https://vk.com/",
]
resolved, failed = resolve_user_ids(call_with_retry, vk_api, links)
self.assertEqual(resolved, [1])
self.assertEqual(len(failed), 3)
self.assertEqual(failed[0][0], "https://vk.com/id2")
self.assertIsNone(failed[0][1])
def test_load_chat_conversations_paginated(self):
pages = [
{"items": [{"id": 1}], "next_from": "page-2"},
{"items": [{"id": 2}]},
]
def get_conversations(**kwargs):
if kwargs.get("start_from") == "page-2":
return pages[1]
return pages[0]
def call_with_retry(func, **kwargs):
return func(**kwargs)
vk_api = SimpleNamespace(messages=SimpleNamespace(getConversations=get_conversations))
items = load_chat_conversations(call_with_retry, vk_api)
self.assertEqual(items, [{"id": 1}, {"id": 2}])
if __name__ == "__main__":
unittest.main()

View File

@@ -1,114 +0,0 @@
import ast
import unittest
from pathlib import Path
class MainContractsTests(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.main_source = Path("main.py").read_text(encoding="utf-8-sig")
cls.module = ast.parse(cls.main_source)
cls.vk_chat_manager = cls._find_class("VkChatManager")
@classmethod
def _find_class(cls, class_name):
for node in cls.module.body:
if isinstance(node, ast.ClassDef) and node.name == class_name:
return node
raise AssertionError(f"Class {class_name} not found")
def _find_method(self, method_name):
for node in self.vk_chat_manager.body:
if isinstance(node, ast.FunctionDef) and node.name == method_name:
return node
self.fail(f"Method {method_name} not found")
def _iter_nodes(self, node):
return ast.walk(node)
def test_auth_error_contexts_contains_only_supported_contexts(self):
expected_contexts = {"load_chats", "execute_user_action", "set_user_admin"}
for node in self.module.body:
if isinstance(node, ast.Assign):
for target in node.targets:
if isinstance(target, ast.Name) and target.id == "AUTH_ERROR_CONTEXTS":
actual = set(ast.literal_eval(node.value))
self.assertSetEqual(actual, expected_contexts)
return
self.fail("AUTH_ERROR_CONTEXTS assignment not found")
def test_check_for_updates_has_reentry_guard(self):
method = self._find_method("check_for_updates")
has_guard = False
for node in method.body:
if not isinstance(node, ast.If):
continue
test = node.test
if (
isinstance(test, ast.Attribute)
and isinstance(test.value, ast.Name)
and test.value.id == "self"
and test.attr == "_update_in_progress"
):
has_guard = any(isinstance(stmt, ast.Return) for stmt in node.body)
if has_guard:
break
self.assertTrue(has_guard, "check_for_updates must return when update is already in progress")
def test_check_for_updates_connects_thread_finish_handler(self):
method = self._find_method("check_for_updates")
for node in self._iter_nodes(method):
if not isinstance(node, ast.Call):
continue
func = node.func
if not (isinstance(func, ast.Attribute) and func.attr == "connect"):
continue
value = func.value
if not (
isinstance(value, ast.Attribute)
and value.attr == "finished"
and isinstance(value.value, ast.Attribute)
and value.value.attr == "update_thread"
and isinstance(value.value.value, ast.Name)
and value.value.value.id == "self"
):
continue
if len(node.args) != 1:
continue
arg = node.args[0]
if (
isinstance(arg, ast.Attribute)
and arg.attr == "_on_update_thread_finished"
and isinstance(arg.value, ast.Name)
and arg.value.id == "self"
):
return
self.fail("update_thread.finished must be connected to _on_update_thread_finished")
def test_on_update_thread_finished_clears_update_state(self):
method = self._find_method("_on_update_thread_finished")
assignments = {}
for node in method.body:
if not isinstance(node, ast.Assign) or len(node.targets) != 1:
continue
target = node.targets[0]
if (
isinstance(target, ast.Attribute)
and isinstance(target.value, ast.Name)
and target.value.id == "self"
):
assignments[target.attr] = node.value
self.assertIn("_update_in_progress", assignments)
self.assertIn("update_checker", assignments)
self.assertIn("update_thread", assignments)
self.assertIsInstance(assignments["_update_in_progress"], ast.Constant)
self.assertIs(assignments["_update_in_progress"].value, False)
self.assertIsInstance(assignments["update_checker"], ast.Constant)
self.assertIsNone(assignments["update_checker"].value)
self.assertIsInstance(assignments["update_thread"], ast.Constant)
self.assertIsNone(assignments["update_thread"].value)
if __name__ == "__main__":
unittest.main()

View File

@@ -1,53 +0,0 @@
import tempfile
import unittest
import importlib.util
from pathlib import Path
from unittest.mock import patch
_SPEC = importlib.util.spec_from_file_location(
"token_store",
Path("services/token_store.py"),
)
_MODULE = importlib.util.module_from_spec(_SPEC)
_SPEC.loader.exec_module(_MODULE)
load_token = _MODULE.load_token
save_token = _MODULE.save_token
class TokenStoreTests(unittest.TestCase):
def test_save_and_load_non_expiring_token(self):
with tempfile.TemporaryDirectory() as td:
token_file = Path(td) / "token.json"
with patch.object(_MODULE.os, "name", "posix"):
expiration = save_token(
token="abc123",
token_file=str(token_file),
app_data_dir=td,
expires_in=0,
)
token, loaded_expiration = load_token(str(token_file))
self.assertEqual(expiration, 0)
self.assertEqual(token, "abc123")
self.assertEqual(loaded_expiration, 0)
def test_expired_token_is_removed(self):
with tempfile.TemporaryDirectory() as td:
token_file = Path(td) / "token.json"
with patch.object(_MODULE.os, "name", "posix"):
with patch.object(_MODULE.time, "time", return_value=1000):
save_token(
token="abc123",
token_file=str(token_file),
app_data_dir=td,
expires_in=1,
)
with patch.object(_MODULE.time, "time", return_value=2000):
token, expiration = load_token(str(token_file))
self.assertIsNone(token)
self.assertIsNone(expiration)
if __name__ == "__main__":
unittest.main()

View File

@@ -1,97 +0,0 @@
import unittest
from types import SimpleNamespace
from unittest import mock
class _DummySignal:
def __init__(self):
self._callbacks = []
def connect(self, callback):
if callback is not None:
self._callbacks.append(callback)
def emit(self, *args, **kwargs):
for callback in list(self._callbacks):
callback(*args, **kwargs)
class _DummyThread:
created = 0
def __init__(self, _parent=None):
type(self).created += 1
self.started = _DummySignal()
self.finished = _DummySignal()
def start(self):
self.started.emit()
def quit(self):
self.finished.emit()
def deleteLater(self):
return None
class _DummyChecker:
created = 0
def __init__(self, *_args, **_kwargs):
type(self).created += 1
self.check_finished = _DummySignal()
self.check_failed = _DummySignal()
def moveToThread(self, _thread):
return None
def run(self):
return None
def deleteLater(self):
return None
class UpdateReentryRuntimeTests(unittest.TestCase):
@classmethod
def setUpClass(cls):
try:
import main # noqa: PLC0415
except Exception as exc:
raise unittest.SkipTest(f"main import unavailable: {exc}") from exc
cls.main = main
def test_repeated_update_check_is_ignored_until_thread_finishes(self):
_DummyChecker.created = 0
_DummyThread.created = 0
manager = self.main.VkChatManager.__new__(self.main.VkChatManager)
manager._update_in_progress = False
manager._update_check_silent = False
manager.update_channel = "stable"
manager.update_repository_url = "https://example.com/org/repo"
manager.update_checker = None
manager.update_thread = None
manager.status_label = SimpleNamespace(setText=lambda *_args, **_kwargs: None)
manager._log_event = lambda *_args, **_kwargs: None
manager._set_update_action_state = lambda *_args, **_kwargs: None
with mock.patch.object(self.main, "UpdateChecker", _DummyChecker), mock.patch.object(self.main, "QThread", _DummyThread):
self.main.VkChatManager.check_for_updates(manager, silent_no_updates=True)
self.assertTrue(manager._update_in_progress)
self.assertEqual(_DummyChecker.created, 1)
self.assertEqual(_DummyThread.created, 1)
first_thread = manager.update_thread
self.main.VkChatManager.check_for_updates(manager, silent_no_updates=True)
self.assertEqual(_DummyChecker.created, 1)
self.assertEqual(_DummyThread.created, 1)
self.assertIs(manager.update_thread, first_thread)
manager.update_checker.check_finished.emit({"has_update": False, "current_version": self.main.APP_VERSION})
self.assertFalse(manager._update_in_progress)
self.assertIsNone(manager.update_checker)
self.assertIsNone(manager.update_thread)
if __name__ == "__main__":
unittest.main()

View File

@@ -1,53 +0,0 @@
import unittest
import importlib.util
from pathlib import Path
MODULE_PATH = Path("services/update_service.py")
SPEC = importlib.util.spec_from_file_location("update_service_under_test", MODULE_PATH)
update_service = importlib.util.module_from_spec(SPEC)
SPEC.loader.exec_module(update_service)
class UpdateServiceTests(unittest.TestCase):
def test_normalize_update_channel(self):
self.assertEqual(update_service._normalize_update_channel("stable"), "stable")
self.assertEqual(update_service._normalize_update_channel("beta"), "beta")
self.assertEqual(update_service._normalize_update_channel("pre-release"), "beta")
self.assertEqual(update_service._normalize_update_channel("unknown"), "stable")
self.assertEqual(update_service._normalize_update_channel(""), "stable")
def test_select_release_from_list_skips_drafts(self):
releases = [
{"tag_name": "v2.0.0", "draft": True},
{"tag_name": "", "draft": False},
{"tag_name": "v1.9.0-beta.1", "draft": False},
]
selected = update_service._select_release_from_list(releases)
self.assertIsNotNone(selected)
self.assertEqual(selected["tag_name"], "v1.9.0-beta.1")
def test_extract_release_payload_uses_zip_and_checksum(self):
release_data = {
"tag_name": "v1.7.2",
"html_url": "https://example.com/release/v1.7.2",
"assets": [
{"name": "notes.txt", "browser_download_url": "https://example.com/notes.txt"},
{"name": "AnabasisManager-win64.zip", "browser_download_url": "https://example.com/app.zip"},
{"name": "AnabasisManager-setup-1.7.2.exe", "browser_download_url": "https://example.com/setup.exe"},
{"name": "AnabasisManager-win64.zip.sha256", "browser_download_url": "https://example.com/app.zip.sha256"},
],
}
payload = update_service._extract_release_payload(
release_data=release_data,
repository_url="https://git.daemonlord.ru/benya/AnabasisChatRemove",
current_version="1.7.1",
)
self.assertEqual(payload["latest_version"], "1.7.2")
self.assertEqual(payload["download_url"], "https://example.com/app.zip")
self.assertEqual(payload["installer_url"], "https://example.com/setup.exe")
self.assertEqual(payload["checksum_url"], "https://example.com/app.zip.sha256")
self.assertTrue(payload["has_update"])
if __name__ == "__main__":
unittest.main()

View File

@@ -1,107 +0,0 @@
import importlib.util
import sys
import tempfile
import unittest
from pathlib import Path
import types
def _install_pyside6_stubs():
pyside6_module = types.ModuleType("PySide6")
pyside6_module.__path__ = [] # treat as package
qtcore_module = types.ModuleType("PySide6.QtCore")
qtgui_module = types.ModuleType("PySide6.QtGui")
qtwidgets_module = types.ModuleType("PySide6.QtWidgets")
class _Signal:
def __init__(self, *args, **kwargs):
pass
def connect(self, *args, **kwargs):
pass
class _QObject:
pass
class _QThread:
def __init__(self, *args, **kwargs):
pass
class _QTimer:
@staticmethod
def singleShot(*args, **kwargs):
pass
class _QUrl:
@staticmethod
def fromLocalFile(path):
return path
class _QDesktopServices:
@staticmethod
def openUrl(*args, **kwargs):
return True
class _Widget:
def __init__(self, *args, **kwargs):
pass
qtcore_module.QObject = _QObject
qtcore_module.Qt = type("Qt", (), {})
qtcore_module.QThread = _QThread
qtcore_module.Signal = _Signal
qtcore_module.QTimer = _QTimer
qtcore_module.QUrl = _QUrl
qtgui_module.QDesktopServices = _QDesktopServices
qtwidgets_module.QApplication = _Widget
qtwidgets_module.QLabel = _Widget
qtwidgets_module.QProgressBar = _Widget
qtwidgets_module.QVBoxLayout = _Widget
qtwidgets_module.QWidget = _Widget
qtwidgets_module.QPushButton = _Widget
qtwidgets_module.QHBoxLayout = _Widget
# Force stubs even if real PySide6 was imported earlier in the process.
for mod_name in list(sys.modules.keys()):
if mod_name == "PySide6" or mod_name.startswith("PySide6."):
del sys.modules[mod_name]
sys.modules["PySide6"] = pyside6_module
sys.modules["PySide6.QtCore"] = qtcore_module
sys.modules["PySide6.QtGui"] = qtgui_module
sys.modules["PySide6.QtWidgets"] = qtwidgets_module
MODULE_PATH = Path("updater_gui.py")
_install_pyside6_stubs()
SPEC = importlib.util.spec_from_file_location("updater_gui_under_test", MODULE_PATH)
updater_gui = importlib.util.module_from_spec(SPEC)
SPEC.loader.exec_module(updater_gui)
class UpdaterGuiTests(unittest.TestCase):
def test_read_version_marker(self):
with tempfile.TemporaryDirectory() as tmp_dir:
marker = Path(tmp_dir) / "version.txt"
marker.write_text("2.0.1\n", encoding="utf-8")
value = updater_gui._read_version_marker(tmp_dir)
self.assertEqual(value, "2.0.1")
def test_mirror_tree_skips_selected_file(self):
with tempfile.TemporaryDirectory() as src_tmp, tempfile.TemporaryDirectory() as dst_tmp:
src = Path(src_tmp)
dst = Path(dst_tmp)
(src / "keep.txt").write_text("ok", encoding="utf-8")
(src / "skip.bin").write_text("x", encoding="utf-8")
(src / "sub").mkdir()
(src / "sub" / "nested.txt").write_text("nested", encoding="utf-8")
updater_gui._mirror_tree(str(src), str(dst), skip_names={"skip.bin"})
self.assertTrue((dst / "keep.txt").exists())
self.assertTrue((dst / "sub" / "nested.txt").exists())
self.assertFalse((dst / "skip.bin").exists())
if __name__ == "__main__":
unittest.main()

View File

@@ -1,25 +0,0 @@
from PySide6.QtWidgets import QDialog, QDialogButtonBox, QLabel, QTextEdit, QVBoxLayout
class MultiLinkDialog(QDialog):
def __init__(self, parent=None):
super().__init__(parent)
self.setWindowTitle("Ввод нескольких ссылок")
self.setMinimumSize(400, 300)
layout = QVBoxLayout(self)
label = QLabel("Вставьте ссылки на страницы VK, каждая с новой строки:")
layout.addWidget(label)
self.links_text_edit = QTextEdit()
layout.addWidget(self.links_text_edit)
button_box = QDialogButtonBox()
button_box.addButton("ОК", QDialogButtonBox.AcceptRole)
button_box.addButton("Отмена", QDialogButtonBox.RejectRole)
button_box.accepted.connect(self.accept)
button_box.rejected.connect(self.reject)
layout.addWidget(button_box)
def get_links(self):
return [line.strip() for line in self.links_text_edit.toPlainText().strip().split("\n") if line.strip()]

View File

@@ -1,9 +0,0 @@
def instructions_text():
return (
"Инструкция:\n"
"1. Авторизуйтесь через VK.\n"
"2. Выберите чаты.\n"
"3. Вставьте ссылку на пользователя в поле ниже. ID определится автоматически.\n"
"4. Для массовых операций нажмите кнопку 'Список' и вставьте ссылки в окне.\n"
"5. Нажмите 'ИСКЛЮЧИТЬ' или 'ПРИГЛАСИТЬ'."
)

View File

@@ -1,276 +0,0 @@
import argparse
import os
import shutil
import subprocess
import sys
import tempfile
import time
from PySide6.QtCore import QObject, Qt, QThread, Signal, QTimer, QUrl
from PySide6.QtGui import QDesktopServices
from PySide6.QtWidgets import QApplication, QLabel, QProgressBar, QVBoxLayout, QWidget, QPushButton, QHBoxLayout
def _write_log(log_path, message):
try:
os.makedirs(os.path.dirname(log_path), exist_ok=True)
with open(log_path, "a", encoding="utf-8") as f:
ts = time.strftime("%Y-%m-%d %H:%M:%S")
f.write(f"[{ts}] {message.rstrip()}\n")
except Exception:
pass
def _is_pid_running(pid):
if pid <= 0:
return False
try:
completed = subprocess.run(
["tasklist", "/FI", f"PID eq {pid}"],
capture_output=True,
text=True,
timeout=5,
check=False,
)
return str(pid) in (completed.stdout or "")
except Exception:
return False
def _copy_file_with_retries(source_file, target_file, retries=20, delay=0.5):
last_error = None
for _ in range(max(1, retries)):
try:
os.makedirs(os.path.dirname(target_file), exist_ok=True)
shutil.copy2(source_file, target_file)
return
except Exception as exc:
last_error = exc
time.sleep(delay)
raise last_error if last_error else RuntimeError(f"Не удалось скопировать файл: {source_file}")
def _mirror_tree(src_dir, dst_dir, skip_names=None, retries=20, delay=0.5):
skip_set = {name.lower() for name in (skip_names or [])}
os.makedirs(dst_dir, exist_ok=True)
for root, dirs, files in os.walk(src_dir):
rel = os.path.relpath(root, src_dir)
target_root = dst_dir if rel == "." else os.path.join(dst_dir, rel)
os.makedirs(target_root, exist_ok=True)
for file_name in files:
if file_name.lower() in skip_set:
continue
source_file = os.path.join(root, file_name)
target_file = os.path.join(target_root, file_name)
_copy_file_with_retries(source_file, target_file, retries=retries, delay=delay)
def _read_version_marker(base_dir):
marker_path = os.path.join(base_dir, "version.txt")
if not os.path.exists(marker_path):
return ""
try:
with open(marker_path, "r", encoding="utf-8") as f:
return f.read().strip()
except Exception:
return ""
class UpdateWorker(QObject):
status = Signal(int, str)
failed = Signal(str)
done = Signal()
def __init__(self, app_dir, source_dir, exe_name, target_pid, version, work_dir=""):
super().__init__()
self.app_dir = app_dir
self.source_dir = source_dir
self.exe_name = exe_name
self.target_pid = int(target_pid or 0)
self.version = version or ""
self.work_dir = work_dir or ""
self.log_path = os.path.join(app_dir, "update_error.log")
def _start_app(self):
app_exe = os.path.join(self.app_dir, self.exe_name)
if not os.path.exists(app_exe):
raise RuntimeError(f"Не найден файл приложения: {app_exe}")
creation_flags = 0
if hasattr(subprocess, "DETACHED_PROCESS"):
creation_flags |= subprocess.DETACHED_PROCESS
if hasattr(subprocess, "CREATE_NEW_PROCESS_GROUP"):
creation_flags |= subprocess.CREATE_NEW_PROCESS_GROUP
subprocess.Popen([app_exe], cwd=self.app_dir, creationflags=creation_flags)
def run(self):
backup_dir = os.path.join(tempfile.gettempdir(), f"anabasis_backup_{int(time.time())}")
skip_names = {"anabasisupdater.exe"}
prev_version = _read_version_marker(self.app_dir)
source_version = _read_version_marker(self.source_dir)
expected_version = (self.version or "").strip()
try:
self.status.emit(1, "Ожидание завершения приложения...")
wait_loops = 0
while _is_pid_running(self.target_pid):
time.sleep(1)
wait_loops += 1
if wait_loops >= 180:
self.status.emit(1, "Принудительное завершение зависшего процесса...")
subprocess.run(
["taskkill", "/PID", str(self.target_pid), "/T", "/F"],
capture_output=True,
text=True,
timeout=10,
check=False,
)
time.sleep(2)
if _is_pid_running(self.target_pid):
raise RuntimeError(f"Процесс {self.target_pid} не завершился.")
break
self.status.emit(2, "Проверка содержимого обновления...")
source_app_exe = os.path.join(self.source_dir, self.exe_name)
if not os.path.exists(source_app_exe):
raise RuntimeError(f"В обновлении отсутствует {self.exe_name}")
if expected_version and source_version and source_version != expected_version:
raise RuntimeError(
f"Версия пакета ({source_version}) не совпадает с ожидаемой ({expected_version})."
)
self.status.emit(3, "Создание резервной копии...")
_mirror_tree(self.app_dir, backup_dir, skip_names=skip_names)
self.status.emit(4, "Применение обновления...")
_mirror_tree(self.source_dir, self.app_dir, skip_names=skip_names, retries=30, delay=0.6)
self.status.emit(5, "Проверка установленной версии...")
installed_version = _read_version_marker(self.app_dir)
if expected_version and installed_version and installed_version != expected_version:
raise RuntimeError(
f"После обновления версия {installed_version}, ожидалась {expected_version}."
)
if expected_version and prev_version and prev_version == expected_version:
_write_log(self.log_path, f"Предупреждение: версия до обновления уже была {expected_version}.")
self.status.emit(6, "Запуск обновленного приложения...")
self._start_app()
_write_log(self.log_path, f"Update success to version {expected_version or source_version or 'unknown'}")
self.status.emit(7, "Очистка временных файлов...")
try:
shutil.rmtree(backup_dir, ignore_errors=True)
if self.work_dir and os.path.isdir(self.work_dir):
shutil.rmtree(self.work_dir, ignore_errors=True)
except Exception:
pass
self.done.emit()
except Exception as exc:
_write_log(self.log_path, f"Update failed: {exc}")
try:
self.status.emit(6, "Восстановление из резервной копии...")
if os.path.isdir(backup_dir):
_mirror_tree(backup_dir, self.app_dir, skip_names=skip_names, retries=20, delay=0.5)
_write_log(self.log_path, "Rollback completed.")
try:
self._start_app()
_write_log(self.log_path, "Restored app started after rollback.")
except Exception as start_exc:
_write_log(self.log_path, f"Failed to start app after rollback: {start_exc}")
except Exception as rollback_exc:
_write_log(self.log_path, f"Rollback failed: {rollback_exc}")
self.failed.emit(str(exc))
class UpdaterWindow(QWidget):
def __init__(self, app_dir, source_dir, exe_name, target_pid, version, work_dir=""):
super().__init__()
self.setWindowTitle("Anabasis Updater")
self.setMinimumWidth(480)
self.log_path = os.path.join(app_dir, "update_error.log")
self.label = QLabel("Подготовка обновления...")
self.label.setWordWrap(True)
self.progress = QProgressBar()
self.progress.setRange(0, 7)
self.progress.setValue(0)
self.open_log_btn = QPushButton("Открыть лог")
self.open_log_btn.setEnabled(False)
self.open_log_btn.clicked.connect(self.open_log)
self.close_btn = QPushButton("Закрыть")
self.close_btn.setEnabled(False)
self.close_btn.clicked.connect(self.close)
layout = QVBoxLayout(self)
layout.addWidget(self.label)
layout.addWidget(self.progress)
actions = QHBoxLayout()
actions.addStretch(1)
actions.addWidget(self.open_log_btn)
actions.addWidget(self.close_btn)
layout.addLayout(actions)
self.thread = QThread(self)
self.worker = UpdateWorker(app_dir, source_dir, exe_name, target_pid, version, work_dir=work_dir)
self.worker.moveToThread(self.thread)
self.thread.started.connect(self.worker.run)
self.worker.status.connect(self.on_status)
self.worker.failed.connect(self.on_failed)
self.worker.done.connect(self.on_done)
self.worker.done.connect(self.thread.quit)
self.worker.failed.connect(self.thread.quit)
self.thread.start()
def on_status(self, step, text):
self.label.setText(text)
self.progress.setValue(max(0, min(7, int(step))))
def on_done(self):
self.label.setText("Обновление успешно применено. Приложение запущено.")
self.progress.setValue(7)
self.open_log_btn.setEnabled(True)
QTimer.singleShot(900, self.close)
def on_failed(self, error_text):
self.label.setText(
"Не удалось применить обновление.\n"
f"Причина: {error_text}\n"
"Подробности сохранены в update_error.log."
)
self.open_log_btn.setEnabled(True)
self.close_btn.setEnabled(True)
def open_log(self):
if os.path.exists(self.log_path):
QDesktopServices.openUrl(QUrl.fromLocalFile(self.log_path))
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--app-dir", required=True)
parser.add_argument("--source-dir", required=True)
parser.add_argument("--exe-name", required=True)
parser.add_argument("--target-pid", required=True)
parser.add_argument("--version", default="")
parser.add_argument("--work-dir", default="")
return parser.parse_args()
def main():
args = parse_args()
app = QApplication(sys.argv)
app.setStyle("Fusion")
window = UpdaterWindow(
app_dir=args.app_dir,
source_dir=args.source_dir,
exe_name=args.exe_name,
target_pid=args.target_pid,
version=args.version,
work_dir=args.work_dir,
)
window.show()
return app.exec()
if __name__ == "__main__":
sys.exit(main())