32 Commits

Author SHA1 Message Date
Денисов Александр Андреевич
ac2013bcca Add logging for chat load and bump version to 2.2.4
Some checks failed
Desktop CI / tests (push) Successful in 17s
Desktop Release / release (push) Failing after 3m10s
2026-02-17 17:58:17 +03:00
c77ca4652b fix(ci): update syntax validation test file list
All checks were successful
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Successful in 3m54s
2026-02-16 00:37:26 +03:00
4ec24c6d0f chore(version): bump to 2.2.3
Some checks failed
Desktop CI / tests (push) Failing after 12s
Desktop Release / release (push) Has been cancelled
2026-02-16 00:35:38 +03:00
72edfffd9e fix(update): harden reentry state and add runtime regression test 2026-02-16 00:35:03 +03:00
db5d901435 test(main): replace brittle smoke checks with AST contracts 2026-02-16 00:31:38 +03:00
cd5e6e1f6b fix(update): prevent repeated check crash and bump to 2.2.2
All checks were successful
Desktop CI / tests (push) Successful in 15s
Desktop Release / release (push) Successful in 3m30s
2026-02-15 23:59:57 +03:00
4b3347a069 fix(update,security): add release notes in updater and bump to 2.2.1
All checks were successful
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Successful in 3m30s
2026-02-15 23:51:44 +03:00
e0628b1792 chore(version): bump to 2.2.0
Some checks are pending
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Has started running
2026-02-15 23:49:04 +03:00
201184700f docs(readme): update install steps and feature list 2026-02-15 23:48:15 +03:00
5253c942e8 fix(core,security): safe update extraction and async bulk vk actions 2026-02-15 23:42:51 +03:00
c645d964bf fix(auth,ui): add auth webview cli entrypoint and bulk action progress bar
All checks were successful
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Successful in 17s
2026-02-15 23:31:01 +03:00
13890fbbfc fix(release): create annotated tags on current commit
All checks were successful
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Successful in 3m30s
- create release tag as annotated (-a) with explicit gitea.sha target

- pass target_commitish to Gitea release action for stable ordering
2026-02-15 23:18:49 +03:00
d7494c1092 feat(update): add setup fallback action and bump 2.1.2
All checks were successful
Desktop CI / tests (push) Successful in 14s
Desktop Release / release (push) Successful in 3m29s
- add installer asset detection in update service

- add 'Download and install (setup)' action in update dialog

- bump app version to 2.1.2 and extend update service test
2026-02-15 23:11:15 +03:00
67f6910435 feat(installer): restore Russian UI and setup icon
All checks were successful
Desktop CI / tests (push) Successful in 17s
Desktop Release / release (push) Successful in 3m31s
- re-enable Russian language entry and Russian labels in Inno script

- restore SetupIconFile via MyIconFile define

- pass MyIconFile from build.py with explicit file existence check
2026-02-15 23:03:57 +03:00
2c502fe3bf fix(installer): remove Russian language dependency in ISCC
All checks were successful
Desktop CI / tests (push) Successful in 14s
Desktop Release / release (push) Successful in 3m43s
- keep English-only MessagesFile to avoid compiler language path issues on runner

- switch post-install run description to English ASCII
2026-02-15 22:48:57 +03:00
02078282bc fix(ci-installer): remove dynamic path defines for ISCC
Some checks failed
Desktop CI / tests (push) Successful in 12s
Desktop Release / release (push) Failing after 2m35s
- use static relative paths in .iss for source/output

- pass only version and /O override to ISCC

- add explicit source/script path diagnostics in build.py
2026-02-15 22:42:39 +03:00
b1ed97a826 fix(ci): write build log to runner temp and enforce repo cwd
Some checks failed
Desktop CI / tests (push) Successful in 15s
Desktop Release / release (push) Failing after 2m34s
- run build step from git toplevel directory

- store build.log under %RUNNER_TEMP% to avoid missing dist path

- guard log dump when file is absent
2026-02-15 22:38:27 +03:00
5be8ab9af7 fix(ci): improve release build diagnostics and encoding
Some checks failed
Desktop CI / tests (push) Successful in 17s
Desktop Release / release (push) Failing after 1m27s
- run build.py with UTF-8 env in release workflow

- capture full build output to dist/build.log and print it on failure

- extend ISCC output decoding in build.py with UTF-16 fallbacks
2026-02-15 22:34:03 +03:00
0f07fe250c fix(ci-installer): remove setup icon dependency for ISCC
Some checks failed
Desktop CI / tests (push) Successful in 12s
Desktop Release / release (push) Failing after 2m32s
- build Inno installer without SetupIconFile to avoid code 2 failures in runner

- drop MyIconFile define and pass only essential defines

- make task description ASCII-only to avoid encoding issues
2026-02-15 22:22:32 +03:00
fc0c98ee49 fix(installer): stabilize ISCC script and output decoding
Some checks failed
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Failing after 2m35s
- remove fragile preprocessor icon block in .iss

- decode ISCC stdout/stderr with cp1251/cp866 fallbacks for readable diagnostics
2026-02-15 22:16:13 +03:00
e22eac6de3 fix(ci): force PySide6 stubs in updater test
Some checks are pending
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Has started running
- replace any preloaded PySide6 modules with local stubs before importing updater_gui

- prevent headless linux failures from libGL dependency during unit tests
2026-02-15 22:14:48 +03:00
bf7e5e599e fix(installer): improve ISCC diagnostics and icon handling
Some checks failed
Desktop CI / tests (push) Failing after 13s
Desktop Release / release (push) Failing after 2m34s
- make SetupIconFile optional in Inno script

- print full ISCC stdout/stderr on build failure

- copy icon.ico into dist artifacts explicitly
2026-02-15 22:11:24 +03:00
d1714a86c7 fix(ci): avoid non-zero exit from iscc help probe
Some checks failed
Desktop CI / tests (push) Failing after 12s
Desktop Release / release (push) Failing after 2m35s
- stop calling iscc /? in Ensure Inno Setup step

- resolve compiler path via Get-Command/Test-Path and finish with exit 0
2026-02-15 22:04:54 +03:00
781bf679ff chore(version): bump to 2.1.1
Some checks failed
Desktop CI / tests (push) Failing after 12s
Desktop Release / release (push) Failing after 12s
- set APP_VERSION to 2.1.1
2026-02-15 22:03:46 +03:00
813dafd6b8 fix(updater,ci): headless tests and immediate app shutdown
Some checks failed
Desktop CI / tests (push) Failing after 14s
Desktop Release / release (push) Failing after 15s
- stub PySide6 in test_updater_gui to run on linux runner without libGL

- close main app immediately after launching updater, without blocking OK dialog
2026-02-15 22:01:51 +03:00
965d09d47c feat(installer): add Inno Setup packaging to release
- add installer/AnabasisManager.iss for per-user install without admin rights

- extend build.py to produce setup.exe via ISCC

- publish setup.exe and checksums in release workflow
2026-02-15 22:01:47 +03:00
1b4760167f chore(version): bump to 2.1.0
Some checks failed
Desktop CI / tests (push) Failing after 12s
Desktop Release / release (push) Successful in 2m45s
- set APP_VERSION to 2.1.0 after merging dev into master
2026-02-15 21:56:40 +03:00
039c1fa38a merge: dev into master
- merge updater improvements, update channels, and ci/release workflow updates
2026-02-15 21:56:28 +03:00
df3a4c49c5 fix(build): use absolute icon path for updater pyinstaller
All checks were successful
Desktop Dev Pre-release / prerelease (push) Successful in 2m44s
- pass absolute icon path to PyInstaller to avoid missing icon in updater spec dir

- remove stale updater spec and use --clean before updater build
2026-02-15 21:51:36 +03:00
8d4bc10cb7 feat(updater): stage3 resilient gui update flow
Some checks failed
Desktop Dev Pre-release / prerelease (push) Failing after 2m18s
- added retry-based file copy, rollback restart, and version marker validation in updater GUI

- added build step to write dist/version.txt for post-update validation

- added unit tests for updater helpers
2026-02-15 21:46:36 +03:00
a6cee33cf6 feat: improve updater flow and release channels
Some checks failed
Desktop Dev Pre-release / prerelease (push) Failing after 2m18s
- added dedicated GUI updater executable and integrated launch path from main app

- added stable/beta update channel selection with persisted settings and checker support

- expanded CI/release validation to include updater and full test discovery
2026-02-15 21:41:18 +03:00
b30437faef ci(dev): add automated prerelease workflow
All checks were successful
Desktop Dev Pre-release / prerelease (push) Successful in 1m52s
- publish dev prereleases on each push to dev

- use tag format vX.Y.Z-<short_commit>

- upload versioned zip and checksum assets
2026-02-15 21:34:28 +03:00
18 changed files with 1791 additions and 304 deletions

View File

@@ -28,7 +28,7 @@ jobs:
- name: Validate syntax - name: Validate syntax
run: | run: |
python -m py_compile app_version.py main.py build.py tests/test_auth_relogin_smoke.py tests/test_auto_update_service.py tests/test_chat_actions.py tests/test_token_store.py python -m py_compile app_version.py main.py build.py updater_gui.py tests/test_auto_update_service.py tests/test_chat_actions.py tests/test_main_contracts.py tests/test_token_store.py tests/test_update_reentry_runtime.py tests/test_update_service.py tests/test_updater_gui.py
- name: Run tests - name: Run tests
run: | run: |

View File

@@ -0,0 +1,148 @@
name: Desktop Dev Pre-release
on:
push:
branches:
- dev
workflow_dispatch:
jobs:
prerelease:
runs-on: windows
steps:
- name: Checkout
uses: https://git.daemonlord.ru/actions/checkout@v4
with:
fetch-depth: 0
tags: true
- name: Ensure Python 3.13
shell: powershell
run: |
if (Get-Command python -ErrorAction SilentlyContinue) {
python --version
} elseif (Get-Command py -ErrorAction SilentlyContinue) {
$pyExe = py -3.13 -c "import sys; print(sys.executable)"
if (-not $pyExe) {
throw "Python 3.13 launcher is available, but interpreter was not found."
}
Split-Path $pyExe | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append
python --version
} else {
throw "Python is not installed on runner. Install Python 3.13 and restart runner service."
}
- name: Install dependencies
shell: powershell
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt pyinstaller
- name: Extract prerelease metadata
id: meta
shell: powershell
run: |
$version = (python -c "from app_version import APP_VERSION; print(APP_VERSION)").Trim()
$commit = (git rev-parse --short HEAD).Trim()
$tag = "v$version-$commit"
$archive = "AnabasisManager-$version-$commit"
$utf8NoBom = New-Object System.Text.UTF8Encoding($false)
[System.IO.File]::AppendAllText($env:GITHUB_OUTPUT, "version=$version`n", $utf8NoBom)
[System.IO.File]::AppendAllText($env:GITHUB_OUTPUT, "commit=$commit`n", $utf8NoBom)
[System.IO.File]::AppendAllText($env:GITHUB_OUTPUT, "tag=$tag`n", $utf8NoBom)
[System.IO.File]::AppendAllText($env:GITHUB_OUTPUT, "archive=$archive`n", $utf8NoBom)
[System.IO.File]::AppendAllText($env:GITHUB_ENV, "CONTINUE=true`n", $utf8NoBom)
Write-Host "Detected tag: $tag"
- name: Stop if prerelease already exists
if: env.CONTINUE == 'true'
shell: powershell
run: |
$tag = "${{ steps.meta.outputs.tag }}"
$apiUrl = "https://git.daemonlord.ru/api/v1/repos/${{ gitea.repository }}/releases?page=1&limit=100"
$headers = @{ Authorization = "token ${{ secrets.API_TOKEN }}" }
$utf8NoBom = New-Object System.Text.UTF8Encoding($false)
try {
$response = Invoke-RestMethod -Uri $apiUrl -Headers $headers -Method Get
$found = $false
foreach ($release in $response) {
if ($release.tag_name -eq $tag) {
$found = $true
break
}
}
if ($found) {
Write-Host "Pre-release $tag already exists, stopping job."
[System.IO.File]::AppendAllText($env:GITHUB_ENV, "CONTINUE=false`n", $utf8NoBom)
} else {
Write-Host "Pre-release $tag not found, continuing workflow..."
}
} catch {
Write-Host "Failed to query releases list, continuing workflow..."
}
- name: Run tests
if: env.CONTINUE == 'true'
shell: powershell
run: |
python -m py_compile app_version.py main.py build.py updater_gui.py tests/test_auth_relogin_smoke.py tests/test_auto_update_service.py tests/test_chat_actions.py tests/test_token_store.py
python -m unittest discover -s tests -p "test_*.py" -v
- name: Build release zip
if: env.CONTINUE == 'true'
shell: powershell
run: |
python build.py
- name: Prepare prerelease artifacts
if: env.CONTINUE == 'true'
shell: powershell
run: |
$version = "${{ steps.meta.outputs.version }}"
$archiveBase = "${{ steps.meta.outputs.archive }}"
$srcZip = "dist/AnabasisManager-$version.zip"
$dstZip = "dist/$archiveBase.zip"
if (-not (Test-Path $srcZip)) {
throw "Archive not found: $srcZip"
}
Copy-Item -Path $srcZip -Destination $dstZip -Force
$hash = (Get-FileHash -Path $dstZip -Algorithm SHA256).Hash.ToLower()
"$hash $archiveBase.zip" | Set-Content -Path "dist/$archiveBase.zip.sha256" -Encoding UTF8
- name: Configure git identity
if: env.CONTINUE == 'true'
shell: powershell
run: |
git config user.name "gitea-actions"
git config user.email "gitea-actions@daemonlord.ru"
- name: Create git tag
if: env.CONTINUE == 'true'
shell: powershell
run: |
$tag = "${{ steps.meta.outputs.tag }}"
$tagLine = (git ls-remote --tags origin "refs/tags/$tag" | Select-Object -First 1)
if ([string]::IsNullOrWhiteSpace($tagLine)) {
git tag "$tag"
git push origin "$tag"
} else {
Write-Host "Tag $tag already exists on origin, skipping tag push."
}
- name: Create Gitea Pre-release
if: env.CONTINUE == 'true'
uses: https://git.daemonlord.ru/actions/gitea-release-action@v1
with:
server_url: https://git.daemonlord.ru
repository: ${{ gitea.repository }}
token: ${{ secrets.API_TOKEN }}
tag_name: ${{ steps.meta.outputs.tag }}
name: Anabasis Manager ${{ steps.meta.outputs.version }} (dev ${{ steps.meta.outputs.commit }})
prerelease: true
body: |
Development pre-release for commit ${{ steps.meta.outputs.commit }}
Version base: ${{ steps.meta.outputs.version }}
files: |
dist/${{ steps.meta.outputs.archive }}.zip
dist/${{ steps.meta.outputs.archive }}.zip.sha256

View File

@@ -38,6 +38,28 @@ jobs:
python -m pip install --upgrade pip python -m pip install --upgrade pip
pip install -r requirements.txt pyinstaller pip install -r requirements.txt pyinstaller
- name: Ensure Inno Setup 6
shell: powershell
run: |
$isccPath = ""
$inPath = Get-Command iscc.exe -ErrorAction SilentlyContinue
if ($inPath) {
$isccPath = $inPath.Source
Write-Host "Inno Setup compiler found in PATH."
} elseif (Test-Path "C:\Program Files (x86)\Inno Setup 6\ISCC.exe") {
"C:\Program Files (x86)\Inno Setup 6" | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append
$isccPath = "C:\Program Files (x86)\Inno Setup 6\ISCC.exe"
Write-Host "Inno Setup compiler found in Program Files (x86)."
} elseif (Test-Path "C:\Program Files\Inno Setup 6\ISCC.exe") {
"C:\Program Files\Inno Setup 6" | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append
$isccPath = "C:\Program Files\Inno Setup 6\ISCC.exe"
Write-Host "Inno Setup compiler found in Program Files."
} else {
throw "Inno Setup 6 is not installed on runner. Install Inno Setup and restart runner service."
}
Write-Host "Using ISCC: $isccPath"
exit 0
- name: Extract app version - name: Extract app version
id: extract_version id: extract_version
shell: powershell shell: powershell
@@ -86,24 +108,49 @@ jobs:
if: env.CONTINUE == 'true' if: env.CONTINUE == 'true'
shell: powershell shell: powershell
run: | run: |
python -m py_compile app_version.py main.py build.py tests/test_auth_relogin_smoke.py tests/test_auto_update_service.py tests/test_chat_actions.py tests/test_token_store.py python -m py_compile app_version.py main.py build.py updater_gui.py tests/test_auth_relogin_smoke.py tests/test_auto_update_service.py tests/test_chat_actions.py tests/test_token_store.py
python -m unittest discover -s tests -p "test_*.py" -v python -m unittest discover -s tests -p "test_*.py" -v
- name: Build release zip - name: Build release zip
if: env.CONTINUE == 'true' if: env.CONTINUE == 'true'
shell: powershell shell: powershell
env:
PYTHONUTF8: "1"
PYTHONIOENCODING: "utf-8"
run: | run: |
python build.py $ErrorActionPreference = "Continue"
$repoRoot = (git rev-parse --show-toplevel).Trim()
if (-not [string]::IsNullOrWhiteSpace($repoRoot)) {
Set-Location $repoRoot
}
$logDir = Join-Path $env:RUNNER_TEMP "anabasis-build"
New-Item -ItemType Directory -Force -Path $logDir | Out-Null
$buildLog = Join-Path $logDir "build.log"
python build.py *>&1 | Tee-Object -FilePath $buildLog
$code = $LASTEXITCODE
if ($code -ne 0) {
Write-Host "Build failed with exit code $code. Dumping build log:"
if (Test-Path $buildLog) {
Get-Content -Path $buildLog -Raw
} else {
Write-Host "Build log was not created: $buildLog"
}
exit $code
}
- name: Ensure archive exists - name: Ensure artifacts exist
if: env.CONTINUE == 'true' if: env.CONTINUE == 'true'
shell: powershell shell: powershell
run: | run: |
$version = "${{ steps.extract_version.outputs.version }}" $version = "${{ steps.extract_version.outputs.version }}"
$archivePath = "dist/AnabasisManager-$version.zip" $archivePath = "dist/AnabasisManager-$version.zip"
$installerPath = "dist/AnabasisManager-setup-$version.exe"
if (-not (Test-Path $archivePath)) { if (-not (Test-Path $archivePath)) {
throw "Archive not found: $archivePath" throw "Archive not found: $archivePath"
} }
if (-not (Test-Path $installerPath)) {
throw "Installer not found: $installerPath"
}
- name: Generate SHA256 checksum - name: Generate SHA256 checksum
if: env.CONTINUE == 'true' if: env.CONTINUE == 'true'
@@ -111,11 +158,14 @@ jobs:
run: | run: |
$version = "${{ steps.extract_version.outputs.version }}" $version = "${{ steps.extract_version.outputs.version }}"
$archiveName = "AnabasisManager-$version.zip" $archiveName = "AnabasisManager-$version.zip"
$archivePath = "dist/$archiveName" $installerName = "AnabasisManager-setup-$version.exe"
$checksumPath = "dist/$archiveName.sha256" foreach ($name in @($archiveName, $installerName)) {
$hash = (Get-FileHash -Path $archivePath -Algorithm SHA256).Hash.ToLower() $path = "dist/$name"
"$hash $archiveName" | Set-Content -Path $checksumPath -Encoding UTF8 $checksumPath = "dist/$name.sha256"
Write-Host "Checksum created: $checksumPath" $hash = (Get-FileHash -Path $path -Algorithm SHA256).Hash.ToLower()
"$hash $name" | Set-Content -Path $checksumPath -Encoding UTF8
Write-Host "Checksum created: $checksumPath"
}
- name: Configure git identity - name: Configure git identity
if: env.CONTINUE == 'true' if: env.CONTINUE == 'true'
@@ -130,9 +180,10 @@ jobs:
run: | run: |
$version = "${{ steps.extract_version.outputs.version }}" $version = "${{ steps.extract_version.outputs.version }}"
$tag = "v$version" $tag = "v$version"
$sha = "${{ gitea.sha }}"
$tagLine = (git ls-remote --tags origin "refs/tags/$tag" | Select-Object -First 1) $tagLine = (git ls-remote --tags origin "refs/tags/$tag" | Select-Object -First 1)
if ([string]::IsNullOrWhiteSpace($tagLine)) { if ([string]::IsNullOrWhiteSpace($tagLine)) {
git tag "$tag" git tag -a "$tag" -m "Release $tag" "$sha"
git push origin "$tag" git push origin "$tag"
} else { } else {
Write-Host "Tag $tag already exists on origin, skipping tag push." Write-Host "Tag $tag already exists on origin, skipping tag push."
@@ -146,9 +197,12 @@ jobs:
repository: ${{ gitea.repository }} repository: ${{ gitea.repository }}
token: ${{ secrets.API_TOKEN }} token: ${{ secrets.API_TOKEN }}
tag_name: v${{ steps.extract_version.outputs.version }} tag_name: v${{ steps.extract_version.outputs.version }}
target_commitish: ${{ gitea.sha }}
name: Anabasis Manager ${{ steps.extract_version.outputs.version }} name: Anabasis Manager ${{ steps.extract_version.outputs.version }}
body: | body: |
Desktop release v${{ steps.extract_version.outputs.version }} Desktop release v${{ steps.extract_version.outputs.version }}
files: | files: |
dist/AnabasisManager-${{ steps.extract_version.outputs.version }}.zip dist/AnabasisManager-${{ steps.extract_version.outputs.version }}.zip
dist/AnabasisManager-${{ steps.extract_version.outputs.version }}.zip.sha256 dist/AnabasisManager-${{ steps.extract_version.outputs.version }}.zip.sha256
dist/AnabasisManager-setup-${{ steps.extract_version.outputs.version }}.exe
dist/AnabasisManager-setup-${{ steps.extract_version.outputs.version }}.exe.sha256

View File

@@ -13,9 +13,12 @@
* Моментальная загрузка всех доступных чатов пользователя. * Моментальная загрузка всех доступных чатов пользователя.
* Групповой выбор чатов («Выбрать все» / «Снять выбор»). * Групповой выбор чатов («Выбрать все» / «Снять выбор»).
* Быстрое обновление списка бесед. * Быстрое обновление списка бесед.
* Выполнение массовых действий в фоновом потоке без подвисания интерфейса.
* Визуальный прогресс-бар по ходу операции.
* **👤 Интеллектуальный поиск ID:** Автоматическое распознавание ID пользователя из ссылок любого формата (например, `vk.com/id123`, `vk.com/durov` или просто `durov`). * **👤 Интеллектуальный поиск ID:** Автоматическое распознавание ID пользователя из ссылок любого формата (например, `vk.com/id123`, `vk.com/durov` или просто `durov`).
* **🛠 Управление в один клик:** Кнопки для мгновенного исключения или приглашения пользователя во все выбранные чаты одновременно. * **🛠 Управление в один клик:** Кнопки для мгновенного исключения или приглашения пользователя во все выбранные чаты одновременно.
* **🛡 Стабильность:** Улучшенная обработка ошибок VK API и автоматическая реакция на смену IP-адреса. * **🔄 Безопасные обновления:** Проверка SHA256 и защищенная распаковка архива обновления.
* **🛡 Стабильность и безопасность:** Улучшенная обработка ошибок VK API, автоматическая реакция на смену IP-адреса и безопасное хранение токена с шифрованием DPAPI в Windows.
--- ---
@@ -47,7 +50,7 @@
3. **Установите зависимости:** 3. **Установите зависимости:**
```bash ```bash
pip install PySide6 vk_api pip install -r requirements.txt
``` ```
4. **Запустите приложение:** 4. **Запустите приложение:**
@@ -68,6 +71,12 @@
## 📂 Техническая информация ## 📂 Техническая информация
### Последние обновления
- Массовые операции VK (`remove/add/admin`) выполняются в фоновом потоке, чтобы интерфейс не зависал; добавлен визуальный прогресс-бар.
- Распаковка архива автообновления теперь валидирует пути перед извлечением для защиты от path traversal.
- Проверка обновлений переведена на `QThread` (модель потоков Qt) вместо Python `threading.Thread`.
- В Windows сохранение токена требует успешного шифрования через DPAPI; при ошибке шифрования сессия продолжается, но токен не сохраняется на диск.
### Сборка проекта (для разработчиков) ### Сборка проекта (для разработчиков)
Проект использует кастомный скрипт автоматизации `build.py`, который оптимизирует зависимости `PySide6` и корректно упаковывает `QtWebEngineCore`. Проект использует кастомный скрипт автоматизации `build.py`, который оптимизирует зависимости `PySide6` и корректно упаковывает `QtWebEngineCore`.
@@ -101,4 +110,4 @@ python build.py
--- ---
Проект распространяется под лицензией MIT. Проект распространяется под лицензией MIT.
Сэкономьте часы ручного труда с Anabasis VK Chat Manager. Сэкономьте часы ручного труда с Anabasis VK Chat Manager.

View File

@@ -1 +1 @@
APP_VERSION = "2.0.0" APP_VERSION = "2.2.4"

View File

@@ -74,5 +74,21 @@ def main_auth(auth_url, output_path):
webview.start(private_mode=False, storage_path=storage_path) webview.start(private_mode=False, storage_path=storage_path)
def main():
# Supports both: `python auth_webview.py <auth_url> <output_path>`
# and: `python auth_webview.py --auth <auth_url> <output_path>`
args = sys.argv[1:]
if len(args) == 3 and args[0] == "--auth":
auth_url, output_path = args[1], args[2]
elif len(args) == 2:
auth_url, output_path = args[0], args[1]
else:
print("Usage: auth_webview.py [--auth] <auth_url> <output_path>")
return 1
main_auth(auth_url, output_path)
return 0
if __name__ == "__main__": if __name__ == "__main__":
main() sys.exit(main())

198
build.py
View File

@@ -4,14 +4,18 @@ import subprocess
import sys import sys
from app_version import APP_VERSION from app_version import APP_VERSION
# --- Конфигурация --- # --- Configuration ---
APP_NAME = "AnabasisManager" APP_NAME = "AnabasisManager"
UPDATER_NAME = "AnabasisUpdater"
VERSION = APP_VERSION # Единая версия приложения VERSION = APP_VERSION # Единая версия приложения
MAIN_SCRIPT = "main.py" MAIN_SCRIPT = "main.py"
UPDATER_SCRIPT = "updater_gui.py"
ICON_PATH = "icon.ico" ICON_PATH = "icon.ico"
INSTALLER_SCRIPT = os.path.join("installer", "AnabasisManager.iss")
DIST_DIR = os.path.join("dist", APP_NAME) DIST_DIR = os.path.join("dist", APP_NAME)
ARCHIVE_NAME = f"{APP_NAME}-{VERSION}" # Формат Название-Версия ARCHIVE_NAME = f"{APP_NAME}-{VERSION}" # Формат Название-Версия
SAFE_CLEAN_ROOT_FILES = {"main.py", "requirements.txt", "build.py"} INSTALLER_NAME = f"{APP_NAME}-setup-{VERSION}.exe"
SAFE_CLEAN_ROOT_FILES = {"main.py", "updater_gui.py", "requirements.txt", "build.py"}
REMOVE_LIST = [ REMOVE_LIST = [
"Qt6Pdf.dll", "Qt6PdfQuick.dll", "Qt6PdfWidgets.dll", "Qt6Pdf.dll", "Qt6PdfQuick.dll", "Qt6PdfWidgets.dll",
"Qt6VirtualKeyboard.dll", "Qt6Positioning.dll", "Qt6VirtualKeyboard.dll", "Qt6Positioning.dll",
@@ -22,16 +26,46 @@ REMOVE_LIST = [
] ]
def write_version_marker():
marker_path = os.path.join(DIST_DIR, "version.txt")
try:
os.makedirs(DIST_DIR, exist_ok=True)
with open(marker_path, "w", encoding="utf-8") as f:
f.write(str(VERSION).strip() + "\n")
print(f"[OK] Version marker written: {marker_path}")
except Exception as e:
print(f"[ERROR] Failed to write version.txt: {e}")
sys.exit(1)
def copy_icon_to_dist():
icon_abs_path = os.path.abspath(ICON_PATH)
if not os.path.exists(icon_abs_path):
print("[WARN] icon.ico not found, skipping icon copy into dist.")
return
try:
os.makedirs("dist", exist_ok=True)
os.makedirs(DIST_DIR, exist_ok=True)
shutil.copy2(icon_abs_path, os.path.join("dist", "icon.ico"))
shutil.copy2(icon_abs_path, os.path.join(DIST_DIR, "icon.ico"))
print("[OK] Icon copied to dist/icon.ico and dist/AnabasisManager/icon.ico")
except Exception as e:
print(f"[ERROR] Failed to copy icon.ico into dist: {e}")
sys.exit(1)
def ensure_project_root(): def ensure_project_root():
missing = [name for name in SAFE_CLEAN_ROOT_FILES if not os.path.exists(name)] missing = [name for name in SAFE_CLEAN_ROOT_FILES if not os.path.exists(name)]
if missing: if missing:
print("[ERROR] Скрипт нужно запускать из корня проекта.") print("[ERROR] Run this script from the project root.")
print(f"[ERROR] Не найдены: {', '.join(missing)}") print(f"[ERROR] Missing files: {', '.join(missing)}")
sys.exit(1) sys.exit(1)
def run_build(): def run_build():
print(f"--- 1. Запуск PyInstaller для {APP_NAME} v{VERSION} ---") print(f"--- 1. Running PyInstaller for {APP_NAME} v{VERSION} ---")
icon_abs_path = os.path.abspath(ICON_PATH)
has_icon = os.path.exists(icon_abs_path)
command = [ command = [
"pyinstaller", "pyinstaller",
@@ -42,8 +76,8 @@ def run_build():
"--exclude-module", "PySide6.QtWebEngineWidgets", "--exclude-module", "PySide6.QtWebEngineWidgets",
"--exclude-module", "PySide6.QtWebEngineQuick", "--exclude-module", "PySide6.QtWebEngineQuick",
f"--name={APP_NAME}", f"--name={APP_NAME}",
f"--icon={ICON_PATH}" if os.path.exists(ICON_PATH) else "", f"--icon={icon_abs_path}" if has_icon else "",
f"--add-data={ICON_PATH}{os.pathsep}." if os.path.exists(ICON_PATH) else "", f"--add-data={icon_abs_path}{os.pathsep}." if has_icon else "",
f"--add-data=auth_webview.py{os.pathsep}.", f"--add-data=auth_webview.py{os.pathsep}.",
MAIN_SCRIPT MAIN_SCRIPT
] ]
@@ -52,14 +86,44 @@ def run_build():
try: try:
subprocess.check_call(command) subprocess.check_call(command)
print("\n[OK] Сборка PyInstaller завершена.") print("\n[OK] PyInstaller build completed.")
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
print(f"\n[ERROR] Ошибка при сборке: {e}") print(f"\n[ERROR] Build failed: {e}")
sys.exit(1)
def run_updater_build():
print(f"\n--- 1.2 Building {UPDATER_NAME} ---")
icon_abs_path = os.path.abspath(ICON_PATH)
has_icon = os.path.exists(icon_abs_path)
updater_spec_dir = os.path.join("build", "updater_spec")
updater_spec_path = os.path.join(updater_spec_dir, f"{UPDATER_NAME}.spec")
if os.path.exists(updater_spec_path):
os.remove(updater_spec_path)
command = [
"pyinstaller",
"--noconfirm",
"--clean",
"--onefile",
"--windowed",
f"--name={UPDATER_NAME}",
"--distpath", DIST_DIR,
"--workpath", os.path.join("build", "updater"),
"--specpath", updater_spec_dir,
f"--icon={icon_abs_path}" if has_icon else "",
UPDATER_SCRIPT,
]
command = [arg for arg in command if arg]
try:
subprocess.check_call(command)
print(f"[OK] {UPDATER_NAME} built.")
except subprocess.CalledProcessError as e:
print(f"[ERROR] Failed to build {UPDATER_NAME}: {e}")
sys.exit(1) sys.exit(1)
def run_cleanup(): def run_cleanup():
print(f"\n--- 2. Оптимизация папки {APP_NAME} ---") print(f"\n--- 2. Optimizing {APP_NAME} folder ---")
# Пытаемся найти папку PySide6 внутри сборки # Пытаемся найти папку PySide6 внутри сборки
pyside_path = os.path.join(DIST_DIR, "PySide6") pyside_path = os.path.join(DIST_DIR, "PySide6")
@@ -74,21 +138,116 @@ def run_cleanup():
shutil.rmtree(path) shutil.rmtree(path)
else: else:
os.remove(path) os.remove(path)
print(f"Удалено: {item}") print(f"Removed: {item}")
except Exception as e: except Exception as e:
print(f"Пропуск {item}: {e}") print(f"Skipped {item}: {e}")
def create_archive(): def create_archive():
print(f"\n--- 3. Создание архива {ARCHIVE_NAME}.zip ---") print(f"\n--- 3. Creating archive {ARCHIVE_NAME}.zip ---")
try: try:
# Создаем zip-архив из папки DIST_DIR # Создаем zip-архив из папки DIST_DIR
# base_name - имя файла без расширения, format - 'zip', root_dir - что упаковываем # base_name - имя файла без расширения, format - 'zip', root_dir - что упаковываем
shutil.make_archive(os.path.join("dist", ARCHIVE_NAME), 'zip', DIST_DIR) shutil.make_archive(os.path.join("dist", ARCHIVE_NAME), 'zip', DIST_DIR)
print(f"[OK] Архив создан: dist/{ARCHIVE_NAME}.zip") print(f"[OK] Archive created: dist/{ARCHIVE_NAME}.zip")
except Exception as e: except Exception as e:
print(f"[ERROR] Не удалось создать архив: {e}") print(f"[ERROR] Failed to create archive: {e}")
def _find_iscc():
candidates = []
iscc_env = os.getenv("ISCC_PATH", "").strip()
if iscc_env:
candidates.append(iscc_env)
candidates.append(shutil.which("iscc"))
candidates.append(shutil.which("ISCC.exe"))
candidates.append(r"C:\Program Files (x86)\Inno Setup 6\ISCC.exe")
candidates.append(r"C:\Program Files\Inno Setup 6\ISCC.exe")
for candidate in candidates:
if candidate and os.path.exists(candidate):
return candidate
return ""
def _decode_process_output(raw_bytes):
if raw_bytes is None:
return ""
if isinstance(raw_bytes, str):
return raw_bytes
for enc in ("utf-8-sig", "utf-16", "utf-16-le", "cp1251", "cp866", "latin-1"):
try:
return raw_bytes.decode(enc)
except Exception:
continue
return raw_bytes.decode("utf-8", errors="replace")
def build_installer():
print(f"\n--- 4. Building installer {INSTALLER_NAME} ---")
if os.name != "nt":
print("[INFO] Inno Setup installer is built only on Windows. Step skipped.")
return
if not os.path.exists(INSTALLER_SCRIPT):
print(f"[ERROR] Installer script not found: {INSTALLER_SCRIPT}")
sys.exit(1)
if not os.path.exists(DIST_DIR):
print(f"[ERROR] Build output folder not found: {DIST_DIR}")
sys.exit(1)
iscc_path = _find_iscc()
if not iscc_path:
print("[ERROR] Inno Setup Compiler (ISCC.exe) not found.")
print("[ERROR] Install Inno Setup 6 or set ISCC_PATH environment variable.")
sys.exit(1)
project_root = os.path.abspath(".")
source_dir = os.path.abspath(DIST_DIR)
output_dir = os.path.abspath("dist")
iss_path = os.path.abspath(INSTALLER_SCRIPT)
icon_path = os.path.abspath(ICON_PATH)
print(f"[INFO] ISCC source dir: {source_dir}")
print(f"[INFO] ISCC output dir: {output_dir}")
print(f"[INFO] ISCC script: {iss_path}")
print(f"[INFO] ISCC icon path: {icon_path}")
if not os.path.exists(source_dir):
print(f"[ERROR] Source dir does not exist: {source_dir}")
sys.exit(1)
if not os.path.exists(iss_path):
print(f"[ERROR] Installer script does not exist: {iss_path}")
sys.exit(1)
if not os.path.exists(icon_path):
print(f"[ERROR] Icon file does not exist: {icon_path}")
sys.exit(1)
command = [
iscc_path,
f"/DMyAppVersion={VERSION}",
f"/DMyIconFile={icon_path}",
f"/O{output_dir}",
iss_path,
]
try:
completed = subprocess.run(
command,
capture_output=True,
cwd=project_root,
check=False,
)
stdout_text = _decode_process_output(completed.stdout)
stderr_text = _decode_process_output(completed.stderr)
if stdout_text:
print(stdout_text.rstrip())
if stderr_text:
print(stderr_text.rstrip())
if completed.returncode != 0:
raise RuntimeError(f"ISCC exited with code {completed.returncode}")
installer_path = os.path.join("dist", INSTALLER_NAME)
if not os.path.exists(installer_path):
print(f"[ERROR] Installer was not created: {installer_path}")
sys.exit(1)
print(f"[OK] Installer created: {installer_path}")
except Exception as e:
print(f"[ERROR] Failed to build installer: {e}")
sys.exit(1)
if __name__ == "__main__": if __name__ == "__main__":
@@ -99,10 +258,15 @@ if __name__ == "__main__":
shutil.rmtree(folder) shutil.rmtree(folder)
run_build() run_build()
run_updater_build()
run_cleanup() run_cleanup()
copy_icon_to_dist()
write_version_marker()
create_archive() create_archive()
build_installer()
print("\n" + "=" * 30) print("\n" + "=" * 30)
print("ПРОЦЕСС ЗАВЕРШЕН") print("BUILD COMPLETED")
print(f"Файл для отправки: dist/{ARCHIVE_NAME}.zip") print(f"Release archive: dist/{ARCHIVE_NAME}.zip")
print(f"Installer: dist/{INSTALLER_NAME}")
print("=" * 30) print("=" * 30)

View File

@@ -0,0 +1,42 @@
#define MyAppName "Anabasis Manager"
#ifndef MyAppVersion
#define MyAppVersion "0.0.0"
#endif
#ifndef MyIconFile
#define MyIconFile "..\icon.ico"
#endif
[Setup]
AppId={{6CD9D6F2-4B95-4E9C-A8D8-2A9C8F6AA741}
AppName={#MyAppName}
AppVersion={#MyAppVersion}
AppPublisher=Benya
DefaultDirName={localappdata}\Programs\Anabasis Manager
DefaultGroupName=Anabasis Manager
DisableProgramGroupPage=yes
PrivilegesRequired=lowest
OutputDir=..\dist
OutputBaseFilename=AnabasisManager-setup-{#MyAppVersion}
Compression=lzma2
SolidCompression=yes
WizardStyle=modern
ArchitecturesInstallIn64BitMode=x64compatible
UninstallDisplayIcon={app}\AnabasisManager.exe
SetupIconFile={#MyIconFile}
[Languages]
Name: "russian"; MessagesFile: "compiler:Languages\Russian.isl"
Name: "english"; MessagesFile: "compiler:Default.isl"
[Tasks]
Name: "desktopicon"; Description: "Создать ярлык на рабочем столе"; GroupDescription: "Дополнительные задачи:"
[Files]
Source: "..\dist\AnabasisManager\*"; DestDir: "{app}"; Flags: ignoreversion recursesubdirs createallsubdirs
[Icons]
Name: "{group}\Anabasis Manager"; Filename: "{app}\AnabasisManager.exe"
Name: "{autodesktop}\Anabasis Manager"; Filename: "{app}\AnabasisManager.exe"; Tasks: desktopicon
[Run]
Filename: "{app}\AnabasisManager.exe"; Description: "Запустить Anabasis Manager"; Flags: nowait postinstall skipifsilent

618
main.py
View File

@@ -1,10 +1,21 @@
import sys import json
import json
import time
import shutil
import auth_webview
import os import os
import threading import shutil
import sys
import time
from PySide6.QtCore import QProcess
from PySide6.QtCore import QStandardPaths
from PySide6.QtCore import QObject, QThread, Signal, Qt, QUrl, QDateTime, QTimer
from PySide6.QtGui import QIcon, QAction, QActionGroup, QDesktopServices
from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit,
QPushButton, QVBoxLayout, QWidget, QMessageBox,
QTextBrowser, QScrollArea, QCheckBox, QHBoxLayout,
QTabWidget, QDialog, QDialogButtonBox,
QProgressBar)
from vk_api.exceptions import VkApiError
import auth_webview
from app_version import APP_VERSION from app_version import APP_VERSION
from services import ( from services import (
AutoUpdateService, AutoUpdateService,
@@ -18,20 +29,14 @@ from services import (
) )
from ui.dialogs import MultiLinkDialog as UIMultiLinkDialog from ui.dialogs import MultiLinkDialog as UIMultiLinkDialog
from ui.main_window import instructions_text from ui.main_window import instructions_text
from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit,
QPushButton, QVBoxLayout, QWidget, QMessageBox,
QTextBrowser, QScrollArea, QCheckBox, QHBoxLayout,
QSizePolicy, QTabWidget, QDialog, QDialogButtonBox,
QProgressBar)
from PySide6.QtCore import Qt, QUrl, QDateTime, QTimer
from PySide6.QtGui import QIcon, QAction, QDesktopServices
from vk_api.exceptions import VkApiError
from PySide6.QtCore import QStandardPaths
from PySide6.QtCore import QProcess
# --- Управление токенами и настройками --- # --- Управление токенами и настройками ---
APP_DATA_DIR = os.path.join(QStandardPaths.writableLocation(QStandardPaths.AppDataLocation), "AnabasisVKChatManager") APP_DATA_DIR = os.path.join(
QStandardPaths.writableLocation(QStandardPaths.StandardLocation.AppDataLocation),
"AnabasisVKChatManager",
)
TOKEN_FILE = os.path.join(APP_DATA_DIR, "token.json") TOKEN_FILE = os.path.join(APP_DATA_DIR, "token.json")
SETTINGS_FILE = os.path.join(APP_DATA_DIR, "settings.json")
WEB_ENGINE_CACHE_DIR = os.path.join(APP_DATA_DIR, "web_engine_cache") WEB_ENGINE_CACHE_DIR = os.path.join(APP_DATA_DIR, "web_engine_cache")
CACHE_CLEANUP_MARKER = os.path.join(APP_DATA_DIR, "pending_cache_cleanup") CACHE_CLEANUP_MARKER = os.path.join(APP_DATA_DIR, "pending_cache_cleanup")
LOG_FILE = os.path.join(APP_DATA_DIR, "app.log") LOG_FILE = os.path.join(APP_DATA_DIR, "app.log")
@@ -42,7 +47,9 @@ AUTH_RELOGIN_BACKOFF_SECONDS = 5.0
UPDATE_REPOSITORY = "" UPDATE_REPOSITORY = ""
# Full repository URL is preferred (supports GitHub/Gitea). # Full repository URL is preferred (supports GitHub/Gitea).
UPDATE_REPOSITORY_URL = "https://git.daemonlord.ru/benya/AnabasisChatRemove" UPDATE_REPOSITORY_URL = "https://git.daemonlord.ru/benya/AnabasisChatRemove"
UPDATE_CHANNEL_DEFAULT = "stable"
UPDATE_REQUEST_TIMEOUT = 8 UPDATE_REQUEST_TIMEOUT = 8
AUTH_ERROR_CONTEXTS = ("load_chats", "execute_user_action", "set_user_admin")
def get_resource_path(relative_path): def get_resource_path(relative_path):
@@ -52,6 +59,86 @@ def get_resource_path(relative_path):
# Для cx_Freeze и обычного запуска # Для cx_Freeze и обычного запуска
return os.path.join(os.path.abspath("."), relative_path) return os.path.join(os.path.abspath("."), relative_path)
class BulkActionWorker(QObject):
progress = Signal(int, int, str)
finished = Signal(dict)
auth_error = Signal(str, object, str)
failed = Signal(str)
done = Signal()
def __init__(self, vk_call_with_retry, vk_api, action_type, selected_chats, user_infos, visible_messages):
super().__init__()
self.vk_call_with_retry = vk_call_with_retry
self.vk = vk_api
self.action_type = action_type
self.selected_chats = selected_chats
self.user_infos = user_infos
self.visible_messages = visible_messages
self.total = len(self.selected_chats) * len(self.user_infos)
@staticmethod
def _is_auth_error(exc):
return VkService.is_auth_error(exc, str(exc).lower())
def _emit_progress(self, processed):
label = "admin" if self.action_type == "admin" else ("remove" if self.action_type == "remove" else "add")
self.progress.emit(processed, self.total, label)
def run(self):
results = []
processed = 0
try:
for chat in self.selected_chats:
peer_id = None
if self.action_type == "admin":
try:
peer_id = 2000000000 + int(chat["id"])
except (ValueError, TypeError):
for _user_id, user_info in self.user_infos.items():
results.append(f"✗ Ошибка ID чата: {chat['id']} ({user_info})")
processed += 1
self._emit_progress(processed)
continue
for user_id, user_info in self.user_infos.items():
try:
if self.action_type == "remove":
self.vk_call_with_retry(self.vk.messages.removeChatUser, chat_id=chat["id"], member_id=user_id)
results.append(f"'{user_info}' исключен из '{chat['title']}'.")
elif self.action_type == "add":
params = {"chat_id": chat["id"], "user_id": user_id}
if self.visible_messages:
params["visible_messages_count"] = 250
self.vk_call_with_retry(self.vk.messages.addChatUser, **params)
results.append(f"'{user_info}' приглашен в '{chat['title']}'.")
elif self.action_type == "admin":
self.vk_call_with_retry(
self.vk.messages.setMemberRole,
peer_id=peer_id,
member_id=user_id,
role="admin",
)
results.append(f"'{user_info}' назначен админом в '{chat['title']}'.")
else:
raise RuntimeError(f"Unknown action: {self.action_type}")
except VkApiError as exc:
if self._is_auth_error(exc):
context = "set_user_admin" if self.action_type == "admin" else "execute_user_action"
action_name = "назначения администраторов" if self.action_type == "admin" else "выполнения операций с пользователями"
self.auth_error.emit(context, exc, action_name)
return
results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {exc}")
finally:
processed += 1
self._emit_progress(processed)
self.finished.emit({"results": results, "processed": processed, "total": self.total})
except Exception as exc:
self.failed.emit(str(exc))
finally:
self.done.emit()
class VkChatManager(QMainWindow): class VkChatManager(QMainWindow):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
@@ -78,10 +165,19 @@ class VkChatManager(QMainWindow):
self._auth_ui_busy = False self._auth_ui_busy = False
self._auth_relogin_in_progress = False self._auth_relogin_in_progress = False
self._last_auth_relogin_ts = 0.0 self._last_auth_relogin_ts = 0.0
self._active_action_button = None
self._active_action_button_text = ""
self.update_repository_url = detect_update_repository_url(UPDATE_REPOSITORY_URL, UPDATE_REPOSITORY) self.update_repository_url = detect_update_repository_url(UPDATE_REPOSITORY_URL, UPDATE_REPOSITORY)
self.update_channel = UPDATE_CHANNEL_DEFAULT
self.update_checker = None self.update_checker = None
self.update_thread = None self.update_thread = None
self._update_in_progress = False
self._update_check_silent = False self._update_check_silent = False
self._bulk_worker_thread = None
self._bulk_worker = None
self._bulk_action_context = None
self._bulk_action_success_message_title = ""
self._bulk_clear_inputs_on_success = True
self.resolve_timer = QTimer(self) self.resolve_timer = QTimer(self)
self.resolve_timer.setSingleShot(True) self.resolve_timer.setSingleShot(True)
@@ -90,6 +186,7 @@ class VkChatManager(QMainWindow):
self._cleanup_cache_if_needed() self._cleanup_cache_if_needed()
self._ensure_log_dir() self._ensure_log_dir()
self._load_settings()
self.init_ui() self.init_ui()
self.load_saved_token_on_startup() self.load_saved_token_on_startup()
self.setup_token_timer() self.setup_token_timer()
@@ -114,7 +211,7 @@ class VkChatManager(QMainWindow):
layout.addWidget(self.token_input) layout.addWidget(self.token_input)
self.token_timer_label = QLabel("Срок действия токена: Н") self.token_timer_label = QLabel("Срок действия токена: Н")
self.token_timer_label.setAlignment(Qt.AlignRight) self.token_timer_label.setAlignment(Qt.AlignmentFlag.AlignRight)
layout.addWidget(self.token_timer_label) layout.addWidget(self.token_timer_label)
self.auth_btn = QPushButton("Авторизоваться через VK") self.auth_btn = QPushButton("Авторизоваться через VK")
@@ -175,8 +272,14 @@ class VkChatManager(QMainWindow):
self.add_user_btn.setMinimumHeight(50) self.add_user_btn.setMinimumHeight(50)
self.add_user_btn.clicked.connect(self.add_user_to_chat) self.add_user_btn.clicked.connect(self.add_user_to_chat)
layout.addWidget(self.add_user_btn) layout.addWidget(self.add_user_btn)
self.operation_progress = QProgressBar()
self.operation_progress.setRange(0, 100)
self.operation_progress.setValue(0)
self.operation_progress.setTextVisible(True)
self.operation_progress.hide()
layout.addWidget(self.operation_progress)
self.status_label = QLabel("Статус: не авторизован") self.status_label = QLabel("Статус: не авторизован")
self.status_label.setAlignment(Qt.AlignCenter) self.status_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
layout.addWidget(self.status_label) layout.addWidget(self.status_label)
layout.addStretch(1) layout.addStretch(1)
@@ -230,6 +333,26 @@ class VkChatManager(QMainWindow):
tools_menu.addAction(check_updates_action) tools_menu.addAction(check_updates_action)
self.check_updates_action = check_updates_action self.check_updates_action = check_updates_action
channel_menu = tools_menu.addMenu("Канал обновлений")
self.update_channel_group = QActionGroup(self)
self.update_channel_group.setExclusive(True)
stable_channel_action = QAction("Релизы (stable)", self)
stable_channel_action.setCheckable(True)
stable_channel_action.setChecked(self.update_channel == "stable")
stable_channel_action.triggered.connect(lambda checked: checked and self.set_update_channel("stable"))
channel_menu.addAction(stable_channel_action)
self.update_channel_group.addAction(stable_channel_action)
self.update_channel_stable_action = stable_channel_action
beta_channel_action = QAction("Бета (pre-release)", self)
beta_channel_action.setCheckable(True)
beta_channel_action.setChecked(self.update_channel == "beta")
beta_channel_action.triggered.connect(lambda checked: checked and self.set_update_channel("beta"))
channel_menu.addAction(beta_channel_action)
self.update_channel_group.addAction(beta_channel_action)
self.update_channel_beta_action = beta_channel_action
logout_action = QAction("Выйти и очистить", self) logout_action = QAction("Выйти и очистить", self)
logout_action.setStatusTip("Выйти, удалить токен и кэш") logout_action.setStatusTip("Выйти, удалить токен и кэш")
logout_action.triggered.connect(self.logout_and_clear) logout_action.triggered.connect(self.logout_and_clear)
@@ -259,12 +382,12 @@ class VkChatManager(QMainWindow):
"Поддерживается проверка обновлений и автообновление Windows-сборки.<br><br>" "Поддерживается проверка обновлений и автообновление Windows-сборки.<br><br>"
f"Репозиторий: {repo_html}" f"Репозиторий: {repo_html}"
) )
content.setTextFormat(Qt.RichText) content.setTextFormat(Qt.TextFormat.RichText)
content.setTextInteractionFlags(Qt.TextBrowserInteraction) content.setTextInteractionFlags(Qt.TextInteractionFlag.TextBrowserInteraction)
content.setOpenExternalLinks(True) content.setOpenExternalLinks(True)
content.setWordWrap(True) content.setWordWrap(True)
button_box = QDialogButtonBox(QDialogButtonBox.Ok, parent=dialog) button_box = QDialogButtonBox(QDialogButtonBox.StandardButton.Ok, parent=dialog)
button_box.accepted.connect(dialog.accept) button_box.accepted.connect(dialog.accept)
layout = QVBoxLayout(dialog) layout = QVBoxLayout(dialog)
@@ -281,8 +404,8 @@ class VkChatManager(QMainWindow):
scroll_area = QScrollArea() scroll_area = QScrollArea()
scroll_area.setWidgetResizable(True) scroll_area.setWidgetResizable(True)
scroll_area.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOff) scroll_area.setHorizontalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAlwaysOff)
scroll_area.setVerticalScrollBarPolicy(Qt.ScrollBarAsNeeded) scroll_area.setVerticalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAsNeeded)
tab_layout.addWidget(scroll_area) tab_layout.addWidget(scroll_area)
@@ -300,41 +423,104 @@ class VkChatManager(QMainWindow):
if hasattr(self, "check_updates_action"): if hasattr(self, "check_updates_action"):
self.check_updates_action.setEnabled(not in_progress) self.check_updates_action.setEnabled(not in_progress)
def _normalize_update_channel(self, value):
channel = (value or "").strip().lower()
if channel in ("beta", "betas", "pre", "prerelease", "pre-release"):
return "beta"
return "stable"
def _load_settings(self):
self.update_channel = UPDATE_CHANNEL_DEFAULT
if not os.path.exists(SETTINGS_FILE):
return
try:
with open(SETTINGS_FILE, "r", encoding="utf-8") as f:
settings = json.load(f)
self.update_channel = self._normalize_update_channel(settings.get("update_channel"))
except Exception as e:
self._log_event("settings_load", f"Ошибка загрузки настроек: {e}", level="WARN")
def _save_settings(self):
try:
os.makedirs(APP_DATA_DIR, exist_ok=True)
settings = {
"update_channel": self.update_channel,
}
with open(SETTINGS_FILE, "w", encoding="utf-8") as f:
json.dump(settings, f, ensure_ascii=False, indent=2)
except Exception as e:
self._log_event("settings_save", f"Ошибка сохранения настроек: {e}", level="WARN")
def set_update_channel(self, channel):
normalized = self._normalize_update_channel(channel)
if normalized == self.update_channel:
return
self.update_channel = normalized
self._save_settings()
self.status_label.setText(
f"Статус: канал обновлений переключен на {'бета' if normalized == 'beta' else 'релизы'}."
)
self._log_event("update_channel", f"update_channel={self.update_channel}")
def check_for_updates(self, silent_no_updates=False): def check_for_updates(self, silent_no_updates=False):
if self.update_thread and self.update_thread.is_alive(): if self._update_in_progress:
self.status_label.setText("Статус: проверка обновлений уже выполняется...")
return return
self._update_check_silent = silent_no_updates self._update_check_silent = silent_no_updates
self._update_in_progress = True
self._set_update_action_state(True) self._set_update_action_state(True)
self.status_label.setText("Статус: проверка обновлений...") channel_label = "бета" if self.update_channel == "beta" else "релизы"
self.status_label.setText(f"Статус: проверка обновлений ({channel_label})...")
self.update_checker = UpdateChecker(self.update_repository_url, APP_VERSION, request_timeout=UPDATE_REQUEST_TIMEOUT) self.update_checker = UpdateChecker(
self.update_repository_url,
APP_VERSION,
request_timeout=UPDATE_REQUEST_TIMEOUT,
channel=self.update_channel,
)
self.update_thread = QThread(self)
self.update_checker.moveToThread(self.update_thread)
self.update_thread.started.connect(self.update_checker.run)
self.update_checker.check_finished.connect(self._on_update_check_finished) self.update_checker.check_finished.connect(self._on_update_check_finished)
self.update_checker.check_failed.connect(self._on_update_check_failed) self.update_checker.check_failed.connect(self._on_update_check_failed)
self.update_thread = threading.Thread(target=self.update_checker.run, daemon=True) self.update_checker.check_finished.connect(self.update_thread.quit)
self.update_checker.check_failed.connect(self.update_thread.quit)
self.update_checker.check_finished.connect(self.update_checker.deleteLater)
self.update_checker.check_failed.connect(self.update_checker.deleteLater)
self.update_thread.finished.connect(self._on_update_thread_finished)
self.update_thread.finished.connect(self.update_thread.deleteLater)
self.update_thread.start() self.update_thread.start()
def _on_update_check_finished(self, result): def _on_update_check_finished(self, result):
self._set_update_action_state(False)
self.update_checker = None
self.update_thread = None
if result.get("has_update"): if result.get("has_update"):
latest_version = result.get("latest_version") or result.get("latest_tag") or "unknown" latest_version = result.get("latest_version") or result.get("latest_tag") or "unknown"
self.status_label.setText(f"Статус: доступно обновление {latest_version}") self.status_label.setText(f"Статус: доступно обновление {latest_version}")
message_box = QMessageBox(self) message_box = QMessageBox(self)
message_box.setIcon(QMessageBox.Information) message_box.setIcon(QMessageBox.Icon.Information)
message_box.setWindowTitle("Доступно обновление") message_box.setWindowTitle("Доступно обновление")
message_box.setText( message_box.setText(
f"Текущая версия: {result.get('current_version')}\n" f"Текущая версия: {result.get('current_version')}\n"
f"Доступная версия: {latest_version}\n\n" f"Доступная версия: {latest_version}\n\n"
"Открыть страницу загрузки?" "Открыть страницу загрузки?"
) )
update_now_button = message_box.addButton("Обновить сейчас", QMessageBox.AcceptRole) release_notes = (result.get("release_notes") or "").strip()
download_button = message_box.addButton("Скачать", QMessageBox.AcceptRole) if release_notes:
releases_button = message_box.addButton("Релизы", QMessageBox.ActionRole) preview_lines = [line.strip() for line in release_notes.splitlines() if line.strip()]
cancel_button = message_box.addButton("Позже", QMessageBox.RejectRole) preview_text = "\n".join(preview_lines[:8])
if len(preview_lines) > 8:
preview_text += "\n..."
message_box.setInformativeText(f"Что нового:\n{preview_text}")
message_box.setDetailedText(release_notes)
update_now_button = message_box.addButton("Обновить сейчас", QMessageBox.ButtonRole.AcceptRole)
download_button = message_box.addButton("Скачать", QMessageBox.ButtonRole.AcceptRole)
setup_button = None
installer_url = result.get("installer_url")
if installer_url:
setup_button = message_box.addButton("Скачать и установить (setup)", QMessageBox.ButtonRole.AcceptRole)
releases_button = message_box.addButton("Релизы", QMessageBox.ButtonRole.ActionRole)
cancel_button = message_box.addButton("Позже", QMessageBox.ButtonRole.RejectRole)
message_box.setDefaultButton(update_now_button) message_box.setDefaultButton(update_now_button)
message_box.exec() message_box.exec()
@@ -348,6 +534,9 @@ class VkChatManager(QMainWindow):
if release_url: if release_url:
QDesktopServices.openUrl(QUrl(release_url)) QDesktopServices.openUrl(QUrl(release_url))
return return
if setup_button is not None and clicked is setup_button and installer_url:
QDesktopServices.openUrl(QUrl(installer_url))
return
if clicked is download_button and download_url: if clicked is download_button and download_url:
QDesktopServices.openUrl(QUrl(download_url)) QDesktopServices.openUrl(QUrl(download_url))
elif clicked in (download_button, releases_button) and release_url: elif clicked in (download_button, releases_button) and release_url:
@@ -356,14 +545,12 @@ class VkChatManager(QMainWindow):
self._log_event("update_check", "Диалог обновления закрыт без действия.", level="WARN") self._log_event("update_check", "Диалог обновления закрыт без действия.", level="WARN")
return return
self.status_label.setText(f"Статус: используется актуальная версия ({APP_VERSION}).") channel_label = "бета" if self.update_channel == "beta" else "релизы"
self.status_label.setText(f"Статус: используется актуальная версия ({APP_VERSION}, канал: {channel_label}).")
if not self._update_check_silent: if not self._update_check_silent:
QMessageBox.information(self, "Обновления", "Установлена актуальная версия.") QMessageBox.information(self, "Обновления", f"Установлена актуальная версия в канале {channel_label}.")
def _on_update_check_failed(self, error_text): def _on_update_check_failed(self, error_text):
self._set_update_action_state(False)
self.update_checker = None
self.update_thread = None
self._log_event("update_check_failed", error_text, level="WARN") self._log_event("update_check_failed", error_text, level="WARN")
if not self.update_repository_url: if not self.update_repository_url:
self.status_label.setText("Статус: обновления не настроены (URL репозитория не задан).") self.status_label.setText("Статус: обновления не настроены (URL репозитория не задан).")
@@ -380,6 +567,12 @@ class VkChatManager(QMainWindow):
if not self._update_check_silent: if not self._update_check_silent:
QMessageBox.warning(self, "Проверка обновлений", error_text) QMessageBox.warning(self, "Проверка обновлений", error_text)
def _on_update_thread_finished(self):
self._set_update_action_state(False)
self._update_in_progress = False
self.update_checker = None
self.update_thread = None
def setup_token_timer(self): def setup_token_timer(self):
self.token_countdown_timer = QTimer(self) self.token_countdown_timer = QTimer(self)
self.token_countdown_timer.timeout.connect(self.update_token_timer_display) self.token_countdown_timer.timeout.connect(self.update_token_timer_display)
@@ -453,7 +646,7 @@ class VkChatManager(QMainWindow):
self.status_label.setText(status_text) self.status_label.setText(status_text)
if busy: if busy:
self._busy = True self._busy = True
QApplication.setOverrideCursor(Qt.WaitCursor) QApplication.setOverrideCursor(Qt.CursorShape.WaitCursor)
for widget in [ for widget in [
self.refresh_chats_btn, self.select_all_btn, self.deselect_all_btn, self.refresh_chats_btn, self.select_all_btn, self.deselect_all_btn,
self.vk_url_input, self.multi_link_btn, self.visible_messages_checkbox, self.vk_url_input, self.multi_link_btn, self.visible_messages_checkbox,
@@ -468,6 +661,110 @@ class VkChatManager(QMainWindow):
else: else:
self.set_ui_state(True) self.set_ui_state(True)
def _start_operation_progress(self, total, label_text, action_button=None):
total = max(1, int(total))
self.operation_progress.setRange(0, total)
self.operation_progress.setValue(0)
self.operation_progress.setFormat(f"{label_text}: %v/%m")
self.operation_progress.show()
self._active_action_button = action_button
self._active_action_button_text = action_button.text() if action_button else ""
if action_button is not None:
action_button.setText(f"{label_text} (0/{total})")
action_button.setEnabled(False)
def _update_operation_progress(self, processed, total, label_text):
total = max(1, int(total))
processed = max(0, min(int(processed), total))
self.operation_progress.setRange(0, total)
self.operation_progress.setValue(processed)
self.operation_progress.setFormat(f"{label_text}: {processed}/{total}")
if self._active_action_button is not None:
self._active_action_button.setText(f"{label_text} ({processed}/{total})")
def _finish_operation_progress(self):
self.operation_progress.hide()
self.operation_progress.setValue(0)
self.operation_progress.setFormat("%p%")
if self._active_action_button is not None:
self._active_action_button.setText(self._active_action_button_text or self._active_action_button.text())
self._active_action_button = None
self._active_action_button_text = ""
def _start_bulk_action_worker(
self,
action_type,
selected_chats,
user_infos,
action_label,
action_button=None,
success_message_title="Результаты",
clear_inputs_on_success=True,
):
total = max(1, len(selected_chats) * len(user_infos))
self._bulk_action_context = "set_user_admin" if action_type == "admin" else "execute_user_action"
self._bulk_action_success_message_title = success_message_title
self._bulk_clear_inputs_on_success = clear_inputs_on_success
self._log_event(
"bulk_action",
f"start action={action_type} chats={len(selected_chats)} users={len(user_infos)}",
)
self._set_busy(True, f"Статус: выполняется {action_label} (0/{total})...")
self._start_operation_progress(total, action_label, action_button=action_button)
worker = BulkActionWorker(
vk_call_with_retry=self._vk_call_with_retry,
vk_api=self.vk,
action_type=action_type,
selected_chats=selected_chats,
user_infos=user_infos,
visible_messages=self.visible_messages_checkbox.isChecked(),
)
thread = QThread(self)
worker.moveToThread(thread)
thread.started.connect(worker.run)
worker.progress.connect(self._on_bulk_action_progress)
worker.finished.connect(self._on_bulk_action_finished)
worker.auth_error.connect(self._on_bulk_action_auth_error)
worker.failed.connect(self._on_bulk_action_failed)
worker.done.connect(self._on_bulk_action_done)
worker.done.connect(thread.quit)
worker.done.connect(worker.deleteLater)
thread.finished.connect(thread.deleteLater)
self._bulk_worker = worker
self._bulk_worker_thread = thread
thread.start()
def _on_bulk_action_progress(self, processed, total, label):
label_text = {"remove": "исключение", "add": "приглашение", "admin": "назначение админов"}.get(label, label)
self._update_operation_progress(processed, total, label_text)
self.status_label.setText(f"Статус: выполняется {label_text} ({processed}/{max(1, total)})...")
def _on_bulk_action_finished(self, payload):
results = payload.get("results", []) if isinstance(payload, dict) else []
processed = payload.get("processed") if isinstance(payload, dict) else None
total = payload.get("total") if isinstance(payload, dict) else None
if processed is not None and total is not None:
self._log_event("bulk_action", f"done processed={processed} total={total}")
QMessageBox.information(self, self._bulk_action_success_message_title, "\n".join(results))
if self._bulk_clear_inputs_on_success:
self.vk_url_input.clear()
self.user_ids_to_process.clear()
self.set_ui_state(self.token is not None)
def _on_bulk_action_auth_error(self, context, exc, action_name):
self._handle_vk_api_error(context or self._bulk_action_context or "bulk_action", exc, action_name=action_name)
def _on_bulk_action_failed(self, error_text):
QMessageBox.warning(self, "Ошибка", f"Не удалось выполнить операцию: {error_text}")
def _on_bulk_action_done(self):
self._finish_operation_progress()
self._set_busy(False)
self._bulk_worker = None
self._bulk_worker_thread = None
def _ensure_log_dir(self): def _ensure_log_dir(self):
os.makedirs(APP_DATA_DIR, exist_ok=True) os.makedirs(APP_DATA_DIR, exist_ok=True)
@@ -478,8 +775,8 @@ class VkChatManager(QMainWindow):
timestamp = QDateTime.currentDateTime().toString("yyyy-MM-dd HH:mm:ss") timestamp = QDateTime.currentDateTime().toString("yyyy-MM-dd HH:mm:ss")
with open(LOG_FILE, "a", encoding="utf-8") as f: with open(LOG_FILE, "a", encoding="utf-8") as f:
f.write(f"[{timestamp}] [{level}] {context}: {message}\n") f.write(f"[{timestamp}] [{level}] {context}: {message}\n")
except Exception: except Exception as exc:
pass sys.stderr.write(f"[WARN] log_write_failed: {exc}\n")
def _log_error(self, context, exc): def _log_error(self, context, exc):
self._log("ERROR", context, self._format_vk_error(exc)) self._log("ERROR", context, self._format_vk_error(exc))
@@ -496,8 +793,8 @@ class VkChatManager(QMainWindow):
if os.path.exists(LOG_BACKUP_FILE): if os.path.exists(LOG_BACKUP_FILE):
os.remove(LOG_BACKUP_FILE) os.remove(LOG_BACKUP_FILE)
os.replace(LOG_FILE, LOG_BACKUP_FILE) os.replace(LOG_FILE, LOG_BACKUP_FILE)
except Exception: except Exception as exc:
pass sys.stderr.write(f"[WARN] log_rotate_failed: {exc}\n")
def _format_vk_error(self, exc): def _format_vk_error(self, exc):
error = getattr(exc, "error", None) error = getattr(exc, "error", None)
@@ -537,9 +834,9 @@ class VkChatManager(QMainWindow):
self, self,
"Подтверждение выхода", "Подтверждение выхода",
"Вы уверены, что хотите выйти и удалить сохраненный токен и кэш?", "Вы уверены, что хотите выйти и удалить сохраненный токен и кэш?",
QMessageBox.Yes | QMessageBox.No QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No
) )
if confirm != QMessageBox.Yes: if confirm != QMessageBox.StandardButton.Yes:
return return
self._clear_auth_state(stop_timer=True, remove_token_file=True) self._clear_auth_state(stop_timer=True, remove_token_file=True)
@@ -596,7 +893,7 @@ class VkChatManager(QMainWindow):
def _on_auth_process_error(self, process_error): def _on_auth_process_error(self, process_error):
self._auth_process_error_text = f"Ошибка запуска авторизации: {process_error}" self._auth_process_error_text = f"Ошибка запуска авторизации: {process_error}"
# For failed starts Qt may not emit finished(), so release UI here. # For failed starts Qt may not emit finished(), so release UI here.
if self.auth_process and self.auth_process.state() == QProcess.NotRunning: if self.auth_process and self.auth_process.state() == QProcess.ProcessState.NotRunning:
output_path = self.auth_output_path output_path = self.auth_output_path
self.auth_output_path = None self.auth_output_path = None
self.auth_process = None self.auth_process = None
@@ -609,8 +906,8 @@ class VkChatManager(QMainWindow):
try: try:
if output_path and os.path.exists(output_path): if output_path and os.path.exists(output_path):
os.remove(output_path) os.remove(output_path)
except Exception: except Exception as exc:
pass self._log_event("auth_result_cleanup", f"Не удалось удалить файл результата авторизации: {exc}", level="WARN")
def _on_auth_process_finished(self, exit_code, _exit_status): def _on_auth_process_finished(self, exit_code, _exit_status):
output_path = self.auth_output_path output_path = self.auth_output_path
@@ -647,8 +944,8 @@ class VkChatManager(QMainWindow):
try: try:
if os.path.exists(output_path): if os.path.exists(output_path):
os.remove(output_path) os.remove(output_path)
except Exception: except Exception as exc:
pass self._log_event("auth_result_cleanup", f"Не удалось удалить файл результата авторизации: {exc}", level="WARN")
else: else:
self._log_event("auth_result", "Файл результата авторизации не найден.", level="WARN") self._log_event("auth_result", "Файл результата авторизации не найден.", level="WARN")
@@ -656,10 +953,11 @@ class VkChatManager(QMainWindow):
self.handle_new_auth_token(token, expires_in) self.handle_new_auth_token(token, expires_in)
def start_auth(self, keep_status_text=False): def start_auth(self, keep_status_text=False):
if self.auth_process and self.auth_process.state() != QProcess.NotRunning: if self.auth_process and self.auth_process.state() != QProcess.ProcessState.NotRunning:
self._log_event("auth_process", "Авторизация уже запущена, повторный запуск пропущен.") self._log_event("auth_process", "Авторизация уже запущена, повторный запуск пропущен.")
return return
self._log_event("auth_process", "start")
if keep_status_text and hasattr(self, "_relogin_status_text"): if keep_status_text and hasattr(self, "_relogin_status_text"):
status_text = self._relogin_status_text status_text = self._relogin_status_text
self._relogin_status_text = None self._relogin_status_text = None
@@ -680,8 +978,8 @@ class VkChatManager(QMainWindow):
try: try:
if os.path.exists(output_path): if os.path.exists(output_path):
os.remove(output_path) os.remove(output_path)
except Exception: except Exception as exc:
pass self._log_event("auth_result_cleanup", f"Не удалось удалить старый файл результата авторизации: {exc}", level="WARN")
program, args = self._build_auth_command(auth_url, output_path) program, args = self._build_auth_command(auth_url, output_path)
self.auth_output_path = output_path self.auth_output_path = output_path
@@ -701,14 +999,29 @@ class VkChatManager(QMainWindow):
self._auth_relogin_in_progress = False self._auth_relogin_in_progress = False
return return
self._log_event("auth_process", f"success expires_in={expires_in}")
self.token = token self.token = token
# Сохраняем и получаем корректный expiration_time (0 или будущее время) # Сохраняем и получаем корректный expiration_time (0 или будущее время)
self.token_expiration_time = token_store_save_token( try:
self.token, self.token_expiration_time = token_store_save_token(
TOKEN_FILE, self.token,
APP_DATA_DIR, TOKEN_FILE,
expires_in=expires_in, APP_DATA_DIR,
) expires_in=expires_in,
)
except Exception as e:
try:
expires_value = int(expires_in)
except (TypeError, ValueError):
expires_value = 0
self.token_expiration_time = (time.time() + expires_value) if expires_value > 0 else 0
self._log_event("token_store_save", str(e), level="WARN")
QMessageBox.warning(
self,
"Предупреждение",
"Не удалось безопасно сохранить токен на диске. "
"Текущая сессия активна, но после перезапуска потребуется повторная авторизация.",
)
self.token_input.setText(self.token[:50] + "...") self.token_input.setText(self.token[:50] + "...")
self.status_label.setText("Статус: авторизован") self.status_label.setText("Статус: авторизован")
@@ -820,7 +1133,8 @@ class VkChatManager(QMainWindow):
try: try:
user = self.vk.users.get(user_ids=user_id)[0] user = self.vk.users.get(user_ids=user_id)[0]
return f"{user.get('first_name', '')} {user.get('last_name', '')}" return f"{user.get('first_name', '')} {user.get('last_name', '')}"
except Exception: except Exception as exc:
self._log_event("get_user_info", f"Не удалось получить имя пользователя {user_id}: {exc}", level="WARN")
return f"Пользователь {user_id}" return f"Пользователь {user_id}"
def _get_selected_chats(self): def _get_selected_chats(self):
@@ -832,7 +1146,7 @@ class VkChatManager(QMainWindow):
selected.append({'id': chat_id, 'title': title}) selected.append({'id': chat_id, 'title': title})
return selected return selected
def _execute_user_action(self, action_type): def _execute_user_action(self, action_type, action_button=None):
if not self.user_ids_to_process: if not self.user_ids_to_process:
QMessageBox.warning(self, "Ошибка", f"Нет ID пользователей для операции.") QMessageBox.warning(self, "Ошибка", f"Нет ID пользователей для операции.")
return return
@@ -880,9 +1194,9 @@ class VkChatManager(QMainWindow):
confirm_dialog = QMessageBox(self) confirm_dialog = QMessageBox(self)
confirm_dialog.setWindowTitle("Подтверждение действия") confirm_dialog.setWindowTitle("Подтверждение действия")
confirm_dialog.setText(msg) confirm_dialog.setText(msg)
confirm_dialog.setIcon(QMessageBox.Question) confirm_dialog.setIcon(QMessageBox.Icon.Question)
yes_button = confirm_dialog.addButton("Да", QMessageBox.YesRole) yes_button = confirm_dialog.addButton("Да", QMessageBox.ButtonRole.YesRole)
no_button = confirm_dialog.addButton("Нет", QMessageBox.NoRole) no_button = confirm_dialog.addButton("Нет", QMessageBox.ButtonRole.NoRole)
confirm_dialog.setDefaultButton(no_button) # "Нет" - кнопка по умолчанию confirm_dialog.setDefaultButton(no_button) # "Нет" - кнопка по умолчанию
confirm_dialog.exec() confirm_dialog.exec()
@@ -890,44 +1204,23 @@ class VkChatManager(QMainWindow):
if confirm_dialog.clickedButton() != yes_button: if confirm_dialog.clickedButton() != yes_button:
return return
results = [] action_label = "исключение" if action_type == "remove" else "приглашение"
total = len(selected_chats) * len(user_infos) self._start_bulk_action_worker(
processed = 0 action_type=action_type,
try: selected_chats=selected_chats,
action_label = "исключение" if action_type == "remove" else "приглашение" user_infos=user_infos,
self._set_busy(True, f"Статус: выполняется {action_label} (0/{total})...") action_label=action_label,
for chat in selected_chats: action_button=action_button,
for user_id, user_info in user_infos.items(): success_message_title="Результаты",
try: clear_inputs_on_success=True,
if action_type == "remove": )
self._vk_call_with_retry(self.vk.messages.removeChatUser, chat_id=chat['id'], member_id=user_id) return
results.append(f"'{user_info}' исключен из '{chat['title']}'.")
else:
params = {'chat_id': chat['id'], 'user_id': user_id}
if self.visible_messages_checkbox.isChecked():
params['visible_messages_count'] = 250
self._vk_call_with_retry(self.vk.messages.addChatUser, **params)
results.append(f"'{user_info}' приглашен в '{chat['title']}'.")
except VkApiError as e:
if self._handle_vk_api_error("execute_user_action", e, action_name="выполнения операций с пользователями"):
return
results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {self._format_vk_error(e)}")
finally:
processed += 1
self.status_label.setText(f"Статус: выполняется {action_label} ({processed}/{total})...")
finally:
self._set_busy(False)
QMessageBox.information(self, "Результаты", "\n".join(results))
self.vk_url_input.clear()
self.user_ids_to_process.clear()
self.set_ui_state(self.token is not None)
def remove_user(self): def remove_user(self):
self._execute_user_action("remove") self._execute_user_action("remove", action_button=self.remove_user_btn)
def add_user_to_chat(self): def add_user_to_chat(self):
self._execute_user_action("add") self._execute_user_action("add", action_button=self.add_user_btn)
def set_user_admin(self): def set_user_admin(self):
"""Назначает пользователя администратором чата.""" """Назначает пользователя администратором чата."""
@@ -955,9 +1248,9 @@ class VkChatManager(QMainWindow):
confirm_dialog = QMessageBox(self) confirm_dialog = QMessageBox(self)
confirm_dialog.setWindowTitle("Подтверждение прав") confirm_dialog.setWindowTitle("Подтверждение прав")
confirm_dialog.setText(msg) confirm_dialog.setText(msg)
confirm_dialog.setIcon(QMessageBox.Question) confirm_dialog.setIcon(QMessageBox.Icon.Question)
yes_button = confirm_dialog.addButton("Да", QMessageBox.YesRole) yes_button = confirm_dialog.addButton("Да", QMessageBox.ButtonRole.YesRole)
no_button = confirm_dialog.addButton("Нет", QMessageBox.NoRole) no_button = confirm_dialog.addButton("Нет", QMessageBox.ButtonRole.NoRole)
confirm_dialog.setDefaultButton(no_button) confirm_dialog.setDefaultButton(no_button)
confirm_dialog.exec() confirm_dialog.exec()
@@ -965,46 +1258,16 @@ class VkChatManager(QMainWindow):
if confirm_dialog.clickedButton() != yes_button: if confirm_dialog.clickedButton() != yes_button:
return return
# 4. Выполнение API запросов self._start_bulk_action_worker(
results = [] action_type="admin",
total = len(selected_chats) * len(user_infos) selected_chats=selected_chats,
processed = 0 user_infos=user_infos,
try: action_label="назначение админов",
self._set_busy(True, f"Статус: назначение админов (0/{total})...") action_button=None,
for chat in selected_chats: success_message_title="Результаты назначения",
# VK API требует peer_id. Для чатов это 2000000000 + local_id clear_inputs_on_success=True,
try: )
peer_id = 2000000000 + int(chat['id']) return
except ValueError:
results.append(f"✗ Ошибка ID чата: {chat['id']}")
continue
for user_id, user_info in user_infos.items():
try:
self._vk_call_with_retry(
self.vk.messages.setMemberRole,
peer_id=peer_id,
member_id=user_id,
role="admin"
)
results.append(f"'{user_info}' назначен админом в '{chat['title']}'.")
except VkApiError as e:
if self._handle_vk_api_error("set_user_admin", e, action_name="назначения администраторов"):
return
results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {self._format_vk_error(e)}")
finally:
processed += 1
self.status_label.setText(f"Статус: назначение админов ({processed}/{total})...")
finally:
self._set_busy(False)
# 5. Вывод результата
QMessageBox.information(self, "Результаты назначения", "\n".join(results))
# Очистка полей (по желанию, можно убрать эти две строки, если хотите оставить ввод)
self.vk_url_input.clear()
self.user_ids_to_process.clear()
self.set_ui_state(self.token is not None)
# Refactor overrides: keep logic in service modules and thin UI orchestration here. # Refactor overrides: keep logic in service modules and thin UI orchestration here.
def _process_links_list(self, links_list): def _process_links_list(self, links_list):
@@ -1012,6 +1275,7 @@ class VkChatManager(QMainWindow):
QMessageBox.warning(self, "Ошибка", "Сначала авторизуйтесь.") QMessageBox.warning(self, "Ошибка", "Сначала авторизуйтесь.")
return return
self._log_event("resolve_ids", f"start count={len(links_list)}")
self.user_ids_to_process.clear() self.user_ids_to_process.clear()
resolved_ids = [] resolved_ids = []
failed_links = [] failed_links = []
@@ -1035,6 +1299,10 @@ class VkChatManager(QMainWindow):
self.user_ids_to_process = resolved_ids self.user_ids_to_process = resolved_ids
status_message = f"Статус: Готово к работе с {len(resolved_ids)} пользователем(ем/ями)." status_message = f"Статус: Готово к работе с {len(resolved_ids)} пользователем(ем/ями)."
self._log_event(
"resolve_ids",
f"done resolved={len(resolved_ids)} failed={len(failed_links)}",
)
if len(links_list) > 1: if len(links_list) > 1:
self._set_vk_url_input_text(f"Загружено {len(resolved_ids)}/{len(links_list)} из списка") self._set_vk_url_input_text(f"Загружено {len(resolved_ids)}/{len(links_list)} из списка")
@@ -1061,17 +1329,40 @@ class VkChatManager(QMainWindow):
try: try:
self._set_busy(True, "Статус: загрузка чатов...") self._set_busy(True, "Статус: загрузка чатов...")
self._log_event("load_chats", "start")
conversations = load_chat_conversations(self._vk_call_with_retry, self.vk) conversations = load_chat_conversations(self._vk_call_with_retry, self.vk)
type_counts = {}
non_chat_samples = []
missing_title_count = 0
for conv in conversations: for conv in conversations:
if conv["conversation"]["peer"]["type"] != "chat": conv_info = conv.get("conversation", {})
peer = conv_info.get("peer", {})
peer_type = peer.get("type", "unknown")
type_counts[peer_type] = type_counts.get(peer_type, 0) + 1
if peer_type != "chat":
if len(non_chat_samples) < 30:
non_chat_samples.append(
{
"type": peer_type,
"peer_id": peer.get("id"),
"local_id": peer.get("local_id"),
"title": (conv_info.get("chat_settings") or {}).get("title", ""),
}
)
continue continue
chat_id = conv["conversation"]["peer"]["local_id"] chat_id = peer.get("local_id")
title = conv["conversation"]["chat_settings"]["title"] chat_settings = conv_info.get("chat_settings") or {}
title = chat_settings.get("title", "")
if not title:
missing_title_count += 1
self.chats.append({"id": chat_id, "title": title}) self.chats.append({"id": chat_id, "title": title})
checkbox = QCheckBox(f"{title} (id: {chat_id})") checkbox = QCheckBox(f"{title} (id: {chat_id})")
checkbox.setProperty("chat_id", chat_id) checkbox.setProperty("chat_id", chat_id)
if "группа магазинов" in title.casefold():
self._log_event("load_chats", f"chat_match title='{title}' id={chat_id}")
if "AG офис" in title: if "AG офис" in title:
layouts[0].insertWidget(layouts[0].count() - 1, checkbox) layouts[0].insertWidget(layouts[0].count() - 1, checkbox)
self.office_chat_checkboxes.append(checkbox) self.office_chat_checkboxes.append(checkbox)
@@ -1093,6 +1384,17 @@ class VkChatManager(QMainWindow):
self.chat_tabs.setTabText(2, f"AG Склад ({len(self.warehouse_chat_checkboxes)})") self.chat_tabs.setTabText(2, f"AG Склад ({len(self.warehouse_chat_checkboxes)})")
self.chat_tabs.setTabText(3, f"AG Кофейни ({len(self.coffee_chat_checkboxes)})") self.chat_tabs.setTabText(3, f"AG Кофейни ({len(self.coffee_chat_checkboxes)})")
self.chat_tabs.setTabText(4, f"Прочие ({len(self.other_chat_checkboxes)})") self.chat_tabs.setTabText(4, f"Прочие ({len(self.other_chat_checkboxes)})")
self._log_event(
"load_chats",
(
f"done total={len(conversations)} "
f"chats={len(self.chats)} "
f"type_counts={type_counts} "
f"missing_titles={missing_title_count}"
),
)
if non_chat_samples:
self._log_event("load_chats", f"non_chat_samples={non_chat_samples}")
except VkApiError as e: except VkApiError as e:
if self._handle_vk_api_error("load_chats", e, action_name="загрузки чатов"): if self._handle_vk_api_error("load_chats", e, action_name="загрузки чатов"):
return return
@@ -1129,20 +1431,17 @@ class VkChatManager(QMainWindow):
download_name=download_name, download_name=download_name,
) )
app_exe = sys.executable app_exe = sys.executable
script_path = AutoUpdateService.build_update_script( AutoUpdateService.launch_gui_updater(
app_dir=os.path.dirname(app_exe), app_exe=app_exe,
source_dir=source_dir, source_dir=source_dir,
exe_name=os.path.basename(app_exe), work_dir=work_dir,
target_pid=os.getpid(), target_pid=os.getpid(),
version=latest_version,
) )
AutoUpdateService.launch_update_script(script_path, work_dir)
self._log_event("auto_update", f"Update {latest_version} started from {download_url}") self._log_event("auto_update", f"Update {latest_version} started from {download_url}")
QMessageBox.information( self.status_label.setText("Статус: обновление запущено, закрываю приложение...")
self, self.close()
"Обновление запущено", QTimer.singleShot(0, QApplication.instance().quit)
"Обновление скачано. Приложение будет перезапущено.",
)
QApplication.instance().quit()
return True return True
except Exception as e: except Exception as e:
self._log_event("auto_update_failed", str(e), level="ERROR") self._log_event("auto_update_failed", str(e), level="ERROR")
@@ -1158,7 +1457,8 @@ if __name__ == "__main__":
idx = sys.argv.index("--auth") idx = sys.argv.index("--auth")
auth_url = sys.argv[idx + 1] auth_url = sys.argv[idx + 1]
output_path = sys.argv[idx + 2] output_path = sys.argv[idx + 2]
except Exception: except Exception as exc:
sys.stderr.write(f"[ERROR] auth_cli_args_invalid: {exc}\n")
sys.exit(1) sys.exit(1)
auth_webview.main_auth(auth_url, output_path) auth_webview.main_auth(auth_url, output_path)
sys.exit(0) sys.exit(0)

View File

@@ -9,6 +9,18 @@ import zipfile
class AutoUpdateService: class AutoUpdateService:
@staticmethod
def _safe_extract_zip(archive, destination_dir):
destination_real = os.path.realpath(destination_dir)
for member in archive.infolist():
member_name = member.filename or ""
if not member_name:
continue
target_path = os.path.realpath(os.path.join(destination_dir, member_name))
if target_path != destination_real and not target_path.startswith(destination_real + os.sep):
raise RuntimeError(f"Unsafe path in update archive: {member_name}")
archive.extractall(destination_dir)
@staticmethod @staticmethod
def download_update_archive(download_url, destination_path): def download_update_archive(download_url, destination_path):
request = urllib.request.Request( request = urllib.request.Request(
@@ -104,12 +116,26 @@ class AutoUpdateService:
"if %ERRORLEVEL% EQU 0 (", "if %ERRORLEVEL% EQU 0 (",
" set /a WAIT_LOOPS+=1", " set /a WAIT_LOOPS+=1",
" if %WAIT_LOOPS% GEQ 180 (", " if %WAIT_LOOPS% GEQ 180 (",
" echo Timeout waiting for process %TARGET_PID% to exit >> \"%UPDATE_LOG%\"", " echo Timeout waiting for process %TARGET_PID%, attempting force stop >> \"%UPDATE_LOG%\"",
" goto :backup", " taskkill /PID %TARGET_PID% /T /F >nul 2>&1",
" timeout /t 2 /nobreak >nul",
" tasklist /FI \"PID eq %TARGET_PID%\" | find \"%TARGET_PID%\" >nul",
" if %ERRORLEVEL% EQU 0 goto :pid_still_running",
" goto :wait_image_unlock",
" )", " )",
" timeout /t 1 /nobreak >nul", " timeout /t 1 /nobreak >nul",
" goto :wait_for_exit", " goto :wait_for_exit",
")", ")",
":wait_image_unlock",
"set /a IMG_LOOPS=0",
":check_image",
"tasklist /FI \"IMAGENAME eq %EXE_NAME%\" | find /I \"%EXE_NAME%\" >nul",
"if %ERRORLEVEL% EQU 0 (",
" set /a IMG_LOOPS+=1",
" if %IMG_LOOPS% GEQ 60 goto :image_still_running",
" timeout /t 1 /nobreak >nul",
" goto :check_image",
")",
":backup", ":backup",
"timeout /t 1 /nobreak >nul", "timeout /t 1 /nobreak >nul",
"mkdir \"%BACKUP_DIR%\" >nul 2>&1", "mkdir \"%BACKUP_DIR%\" >nul 2>&1",
@@ -134,6 +160,12 @@ class AutoUpdateService:
":backup_error", ":backup_error",
"echo Auto-update failed during backup. Code %RC% >> \"%UPDATE_LOG%\"", "echo Auto-update failed during backup. Code %RC% >> \"%UPDATE_LOG%\"",
"exit /b %RC%", "exit /b %RC%",
":pid_still_running",
"echo Auto-update aborted: process %TARGET_PID% is still running after force stop. >> \"%UPDATE_LOG%\"",
"exit /b 4",
":image_still_running",
"echo Auto-update aborted: %EXE_NAME% still running and file lock may remain. >> \"%UPDATE_LOG%\"",
"exit /b 5",
] ]
with open(script_path, "w", encoding="utf-8", newline="\r\n") as f: with open(script_path, "w", encoding="utf-8", newline="\r\n") as f:
f.write("\r\n".join(script_lines) + "\r\n") f.write("\r\n".join(script_lines) + "\r\n")
@@ -152,6 +184,40 @@ class AutoUpdateService:
creationflags=creation_flags, creationflags=creation_flags,
) )
@staticmethod
def launch_gui_updater(app_exe, source_dir, work_dir, target_pid, version=""):
app_dir = os.path.dirname(app_exe)
exe_name = os.path.basename(app_exe)
updater_exe = os.path.join(app_dir, "AnabasisUpdater.exe")
if not os.path.exists(updater_exe):
raise RuntimeError("Файл AnabasisUpdater.exe не найден в папке приложения.")
creation_flags = 0
if hasattr(subprocess, "CREATE_NEW_PROCESS_GROUP"):
creation_flags |= subprocess.CREATE_NEW_PROCESS_GROUP
if hasattr(subprocess, "DETACHED_PROCESS"):
creation_flags |= subprocess.DETACHED_PROCESS
subprocess.Popen(
[
updater_exe,
"--app-dir",
app_dir,
"--source-dir",
source_dir,
"--exe-name",
exe_name,
"--target-pid",
str(target_pid),
"--version",
str(version or ""),
"--work-dir",
str(work_dir or ""),
],
cwd=work_dir,
creationflags=creation_flags,
)
@classmethod @classmethod
def prepare_update(cls, download_url, checksum_url, download_name): def prepare_update(cls, download_url, checksum_url, download_name):
work_dir = tempfile.mkdtemp(prefix="anabasis_update_") work_dir = tempfile.mkdtemp(prefix="anabasis_update_")
@@ -161,6 +227,6 @@ class AutoUpdateService:
cls.verify_update_checksum(zip_path, checksum_url, download_name) cls.verify_update_checksum(zip_path, checksum_url, download_name)
os.makedirs(unpack_dir, exist_ok=True) os.makedirs(unpack_dir, exist_ok=True)
with zipfile.ZipFile(zip_path, "r") as archive: with zipfile.ZipFile(zip_path, "r") as archive:
archive.extractall(unpack_dir) cls._safe_extract_zip(archive, unpack_dir)
source_dir = cls.locate_extracted_root(unpack_dir) source_dir = cls.locate_extracted_root(unpack_dir)
return work_dir, source_dir return work_dir, source_dir

View File

@@ -91,8 +91,8 @@ def save_token(token, token_file, app_data_dir, expires_in=0):
try: try:
stored_token = _encrypt_token(token) stored_token = _encrypt_token(token)
encrypted = True encrypted = True
except Exception: except Exception as exc:
pass raise RuntimeError("Failed to securely store token with DPAPI.") from exc
data = { data = {
"token": stored_token, "token": stored_token,

View File

@@ -5,7 +5,42 @@ import urllib.error
import urllib.request import urllib.request
from urllib.parse import urlparse from urllib.parse import urlparse
from PySide6.QtCore import QObject, Signal try:
from PySide6.QtCore import QObject, Signal
except Exception:
class _FallbackBoundSignal:
def __init__(self):
self._callbacks = []
def connect(self, callback):
if callback is not None:
self._callbacks.append(callback)
def emit(self, *args, **kwargs):
for callback in list(self._callbacks):
callback(*args, **kwargs)
class _FallbackSignalDescriptor:
def __init__(self):
self._storage_name = ""
def __set_name__(self, owner, name):
self._storage_name = f"__fallback_signal_{name}"
def __get__(self, instance, owner):
if instance is None:
return self
signal = instance.__dict__.get(self._storage_name)
if signal is None:
signal = _FallbackBoundSignal()
instance.__dict__[self._storage_name] = signal
return signal
class QObject:
pass
def Signal(*_args, **_kwargs):
return _FallbackSignalDescriptor()
def _version_key(version_text): def _version_key(version_text):
@@ -38,6 +73,91 @@ def _sanitize_repo_url(value):
return f"{parsed.scheme}://{parsed.netloc}{clean_path}" return f"{parsed.scheme}://{parsed.netloc}{clean_path}"
def _normalize_update_channel(value):
channel = (value or "").strip().lower()
if channel in ("beta", "betas", "pre", "prerelease", "pre-release"):
return "beta"
return "stable"
def _select_release_from_list(releases):
for item in releases:
if not isinstance(item, dict):
continue
if item.get("draft"):
continue
tag_name = (item.get("tag_name") or item.get("name") or "").strip()
if not tag_name:
continue
return item
return None
def _extract_release_payload(release_data, repository_url, current_version):
parsed = urlparse(repository_url)
base_url = f"{parsed.scheme}://{parsed.netloc}"
repo_path = parsed.path.strip("/")
releases_url = f"{base_url}/{repo_path}/releases"
latest_tag = release_data.get("tag_name") or release_data.get("name") or ""
latest_version = latest_tag.lstrip("vV").strip()
html_url = release_data.get("html_url") or releases_url
release_notes = (release_data.get("body") or "").strip()
assets = release_data.get("assets") or []
download_url = ""
download_name = ""
checksum_url = ""
installer_url = ""
installer_name = ""
for asset in assets:
url = asset.get("browser_download_url", "")
if url.lower().endswith(".zip"):
download_url = url
download_name = asset.get("name", "")
break
if not download_url and assets:
download_url = assets[0].get("browser_download_url", "")
download_name = assets[0].get("name", "")
for asset in assets:
url = asset.get("browser_download_url", "")
name = asset.get("name", "")
name_lower = name.lower()
if installer_url:
break
if url.lower().endswith(".exe") and ("setup" in name_lower or "installer" in name_lower):
installer_url = url
installer_name = name
for asset in assets:
name = asset.get("name", "").lower()
if not name:
continue
is_checksum_asset = name.endswith(".sha256") or name.endswith(".sha256.txt") or name in ("checksums.txt", "sha256sums.txt")
if not is_checksum_asset:
continue
if download_name and (download_name.lower() in name or name in (f"{download_name.lower()}.sha256", f"{download_name.lower()}.sha256.txt")):
checksum_url = asset.get("browser_download_url", "")
break
if not checksum_url:
checksum_url = asset.get("browser_download_url", "")
return {
"repository_url": repository_url,
"latest_version": latest_version,
"current_version": current_version,
"latest_tag": latest_tag,
"release_url": html_url,
"release_notes": release_notes,
"download_url": download_url,
"download_name": download_name,
"installer_url": installer_url,
"installer_name": installer_name,
"checksum_url": checksum_url,
"has_update": _is_newer_version(latest_version, current_version),
}
def detect_update_repository_url(configured_url="", configured_repo=""): def detect_update_repository_url(configured_url="", configured_repo=""):
env_url = _sanitize_repo_url(os.getenv("ANABASIS_UPDATE_URL", "")) env_url = _sanitize_repo_url(os.getenv("ANABASIS_UPDATE_URL", ""))
if env_url: if env_url:
@@ -74,11 +194,12 @@ class UpdateChecker(QObject):
check_finished = Signal(dict) check_finished = Signal(dict)
check_failed = Signal(str) check_failed = Signal(str)
def __init__(self, repository_url, current_version, request_timeout=8): def __init__(self, repository_url, current_version, request_timeout=8, channel="stable"):
super().__init__() super().__init__()
self.repository_url = repository_url self.repository_url = repository_url
self.current_version = current_version self.current_version = current_version
self.request_timeout = request_timeout self.request_timeout = request_timeout
self.channel = _normalize_update_channel(channel)
def run(self): def run(self):
if not self.repository_url: if not self.repository_url:
@@ -92,10 +213,17 @@ class UpdateChecker(QObject):
self.check_failed.emit("Некорректный URL репозитория обновлений.") self.check_failed.emit("Некорректный URL репозитория обновлений.")
return return
use_beta_channel = self.channel == "beta"
if parsed.netloc.lower().endswith("github.com"): if parsed.netloc.lower().endswith("github.com"):
api_url = f"https://api.github.com/repos/{repo_path}/releases/latest" if use_beta_channel:
api_url = f"https://api.github.com/repos/{repo_path}/releases"
else:
api_url = f"https://api.github.com/repos/{repo_path}/releases/latest"
else: else:
api_url = f"{base_url}/api/v1/repos/{repo_path}/releases/latest" if use_beta_channel:
api_url = f"{base_url}/api/v1/repos/{repo_path}/releases"
else:
api_url = f"{base_url}/api/v1/repos/{repo_path}/releases/latest"
releases_url = f"{base_url}/{repo_path}/releases" releases_url = f"{base_url}/{repo_path}/releases"
request = urllib.request.Request( request = urllib.request.Request(
api_url, api_url,
@@ -106,7 +234,7 @@ class UpdateChecker(QObject):
) )
try: try:
with urllib.request.urlopen(request, timeout=self.request_timeout) as response: with urllib.request.urlopen(request, timeout=self.request_timeout) as response:
release_data = json.loads(response.read().decode("utf-8")) response_data = json.loads(response.read().decode("utf-8"))
except urllib.error.HTTPError as e: except urllib.error.HTTPError as e:
self.check_failed.emit(f"Ошибка HTTP при проверке обновлений: {e.code}") self.check_failed.emit(f"Ошибка HTTP при проверке обновлений: {e.code}")
return return
@@ -117,47 +245,20 @@ class UpdateChecker(QObject):
self.check_failed.emit(f"Не удалось проверить обновления: {e}") self.check_failed.emit(f"Не удалось проверить обновления: {e}")
return return
latest_tag = release_data.get("tag_name") or release_data.get("name") or "" release_data = response_data
latest_version = latest_tag.lstrip("vV").strip() if use_beta_channel:
html_url = release_data.get("html_url") or releases_url if not isinstance(response_data, list):
assets = release_data.get("assets") or [] self.check_failed.emit("Сервер вернул некорректный ответ списка релизов.")
download_url = "" return
download_name = "" release_data = _select_release_from_list(response_data)
checksum_url = "" if not release_data:
for asset in assets: self.check_failed.emit("В канале beta не найдено доступных релизов.")
url = asset.get("browser_download_url", "") return
if url.lower().endswith(".zip"): elif not isinstance(response_data, dict):
download_url = url self.check_failed.emit("Сервер вернул некорректный ответ релиза.")
download_name = asset.get("name", "") return
break
if not download_url and assets:
download_url = assets[0].get("browser_download_url", "")
download_name = assets[0].get("name", "")
for asset in assets:
name = asset.get("name", "").lower()
if not name:
continue
is_checksum_asset = name.endswith(".sha256") or name.endswith(".sha256.txt") or name in ("checksums.txt", "sha256sums.txt")
if not is_checksum_asset:
continue
if download_name and (download_name.lower() in name or name in (f"{download_name.lower()}.sha256", f"{download_name.lower()}.sha256.txt")):
checksum_url = asset.get("browser_download_url", "")
break
if not checksum_url:
checksum_url = asset.get("browser_download_url", "")
self.check_finished.emit(
{
"repository_url": self.repository_url,
"latest_version": latest_version,
"current_version": self.current_version,
"latest_tag": latest_tag,
"release_url": html_url,
"download_url": download_url,
"download_name": download_name,
"checksum_url": checksum_url,
"has_update": _is_newer_version(latest_version, self.current_version),
}
)
payload = _extract_release_payload(release_data, self.repository_url, self.current_version)
payload["release_channel"] = self.channel
payload["releases_url"] = releases_url
self.check_finished.emit(payload)

View File

@@ -1,60 +0,0 @@
import unittest
from pathlib import Path
class AuthReloginSmokeTests(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.main_source = Path("main.py").read_text(encoding="utf-8")
cls.vk_source = Path("services/vk_service.py").read_text(encoding="utf-8")
cls.update_source = Path("services/update_service.py").read_text(encoding="utf-8")
def test_auth_command_builder_handles_frozen_and_source(self):
self.assertIn("def _build_auth_command(self, auth_url, output_path):", self.main_source)
self.assertIn("entry_script_path=os.path.abspath(__file__)", self.main_source)
self.assertIn('return sys.executable, ["--auth", auth_url, output_path]', self.vk_source)
self.assertIn("script_path = entry_script_path or os.path.abspath(__file__)", self.vk_source)
def test_auth_runs_via_qprocess(self):
self.assertIn("process = QProcess(self)", self.main_source)
self.assertIn("process.start(program, args)", self.main_source)
self.assertIn("def _on_auth_process_finished(self, exit_code, _exit_status):", self.main_source)
self.assertIn("if self.auth_process and self.auth_process.state() == QProcess.NotRunning:", self.main_source)
def test_force_relogin_has_backoff_and_event_log(self):
self.assertIn("AUTH_RELOGIN_BACKOFF_SECONDS = 5.0", self.main_source)
self.assertIn("if self._auth_relogin_in_progress:", self.main_source)
self.assertIn("force_relogin_backoff", self.main_source)
self.assertIn("force_relogin", self.main_source)
def test_auth_error_paths_trigger_force_relogin(self):
self.assertIn(
"def _handle_vk_api_error(self, context, exc, action_name=None, ui_message_prefix=None, disable_ui=False):",
self.main_source,
)
self.assertIn("self._force_relogin(exc, action_name or context)", self.main_source)
self.assertIn('"load_chats",', self.main_source)
self.assertIn('"execute_user_action",', self.main_source)
self.assertIn('"set_user_admin",', self.main_source)
def test_tab_checkbox_lists_use_existing_attributes(self):
self.assertIn("self.warehouse_chat_checkboxes", self.main_source)
self.assertIn("self.coffee_chat_checkboxes", self.main_source)
self.assertNotIn("self.retail_warehouse_checkboxes", self.main_source)
self.assertNotIn("self.retail_coffee_checkboxes", self.main_source)
def test_update_check_actions_exist(self):
self.assertIn("from app_version import APP_VERSION", self.main_source)
self.assertIn("from services import (", self.main_source)
self.assertIn("UpdateChecker", self.main_source)
self.assertIn("detect_update_repository_url", self.main_source)
self.assertIn('QAction("Проверить обновления", self)', self.main_source)
self.assertIn("def check_for_updates(self, silent_no_updates=False):", self.main_source)
self.assertIn("class UpdateChecker(QObject):", self.update_source)
self.assertIn("def _start_auto_update(self, download_url, latest_version, checksum_url=\"\", download_name=\"\"):", self.main_source)
self.assertIn("AutoUpdateService.prepare_update", self.main_source)
self.assertIn("AutoUpdateService.build_update_script", self.main_source)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,114 @@
import ast
import unittest
from pathlib import Path
class MainContractsTests(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.main_source = Path("main.py").read_text(encoding="utf-8-sig")
cls.module = ast.parse(cls.main_source)
cls.vk_chat_manager = cls._find_class("VkChatManager")
@classmethod
def _find_class(cls, class_name):
for node in cls.module.body:
if isinstance(node, ast.ClassDef) and node.name == class_name:
return node
raise AssertionError(f"Class {class_name} not found")
def _find_method(self, method_name):
for node in self.vk_chat_manager.body:
if isinstance(node, ast.FunctionDef) and node.name == method_name:
return node
self.fail(f"Method {method_name} not found")
def _iter_nodes(self, node):
return ast.walk(node)
def test_auth_error_contexts_contains_only_supported_contexts(self):
expected_contexts = {"load_chats", "execute_user_action", "set_user_admin"}
for node in self.module.body:
if isinstance(node, ast.Assign):
for target in node.targets:
if isinstance(target, ast.Name) and target.id == "AUTH_ERROR_CONTEXTS":
actual = set(ast.literal_eval(node.value))
self.assertSetEqual(actual, expected_contexts)
return
self.fail("AUTH_ERROR_CONTEXTS assignment not found")
def test_check_for_updates_has_reentry_guard(self):
method = self._find_method("check_for_updates")
has_guard = False
for node in method.body:
if not isinstance(node, ast.If):
continue
test = node.test
if (
isinstance(test, ast.Attribute)
and isinstance(test.value, ast.Name)
and test.value.id == "self"
and test.attr == "_update_in_progress"
):
has_guard = any(isinstance(stmt, ast.Return) for stmt in node.body)
if has_guard:
break
self.assertTrue(has_guard, "check_for_updates must return when update is already in progress")
def test_check_for_updates_connects_thread_finish_handler(self):
method = self._find_method("check_for_updates")
for node in self._iter_nodes(method):
if not isinstance(node, ast.Call):
continue
func = node.func
if not (isinstance(func, ast.Attribute) and func.attr == "connect"):
continue
value = func.value
if not (
isinstance(value, ast.Attribute)
and value.attr == "finished"
and isinstance(value.value, ast.Attribute)
and value.value.attr == "update_thread"
and isinstance(value.value.value, ast.Name)
and value.value.value.id == "self"
):
continue
if len(node.args) != 1:
continue
arg = node.args[0]
if (
isinstance(arg, ast.Attribute)
and arg.attr == "_on_update_thread_finished"
and isinstance(arg.value, ast.Name)
and arg.value.id == "self"
):
return
self.fail("update_thread.finished must be connected to _on_update_thread_finished")
def test_on_update_thread_finished_clears_update_state(self):
method = self._find_method("_on_update_thread_finished")
assignments = {}
for node in method.body:
if not isinstance(node, ast.Assign) or len(node.targets) != 1:
continue
target = node.targets[0]
if (
isinstance(target, ast.Attribute)
and isinstance(target.value, ast.Name)
and target.value.id == "self"
):
assignments[target.attr] = node.value
self.assertIn("_update_in_progress", assignments)
self.assertIn("update_checker", assignments)
self.assertIn("update_thread", assignments)
self.assertIsInstance(assignments["_update_in_progress"], ast.Constant)
self.assertIs(assignments["_update_in_progress"].value, False)
self.assertIsInstance(assignments["update_checker"], ast.Constant)
self.assertIsNone(assignments["update_checker"].value)
self.assertIsInstance(assignments["update_thread"], ast.Constant)
self.assertIsNone(assignments["update_thread"].value)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,97 @@
import unittest
from types import SimpleNamespace
from unittest import mock
class _DummySignal:
def __init__(self):
self._callbacks = []
def connect(self, callback):
if callback is not None:
self._callbacks.append(callback)
def emit(self, *args, **kwargs):
for callback in list(self._callbacks):
callback(*args, **kwargs)
class _DummyThread:
created = 0
def __init__(self, _parent=None):
type(self).created += 1
self.started = _DummySignal()
self.finished = _DummySignal()
def start(self):
self.started.emit()
def quit(self):
self.finished.emit()
def deleteLater(self):
return None
class _DummyChecker:
created = 0
def __init__(self, *_args, **_kwargs):
type(self).created += 1
self.check_finished = _DummySignal()
self.check_failed = _DummySignal()
def moveToThread(self, _thread):
return None
def run(self):
return None
def deleteLater(self):
return None
class UpdateReentryRuntimeTests(unittest.TestCase):
@classmethod
def setUpClass(cls):
try:
import main # noqa: PLC0415
except Exception as exc:
raise unittest.SkipTest(f"main import unavailable: {exc}") from exc
cls.main = main
def test_repeated_update_check_is_ignored_until_thread_finishes(self):
_DummyChecker.created = 0
_DummyThread.created = 0
manager = self.main.VkChatManager.__new__(self.main.VkChatManager)
manager._update_in_progress = False
manager._update_check_silent = False
manager.update_channel = "stable"
manager.update_repository_url = "https://example.com/org/repo"
manager.update_checker = None
manager.update_thread = None
manager.status_label = SimpleNamespace(setText=lambda *_args, **_kwargs: None)
manager._log_event = lambda *_args, **_kwargs: None
manager._set_update_action_state = lambda *_args, **_kwargs: None
with mock.patch.object(self.main, "UpdateChecker", _DummyChecker), mock.patch.object(self.main, "QThread", _DummyThread):
self.main.VkChatManager.check_for_updates(manager, silent_no_updates=True)
self.assertTrue(manager._update_in_progress)
self.assertEqual(_DummyChecker.created, 1)
self.assertEqual(_DummyThread.created, 1)
first_thread = manager.update_thread
self.main.VkChatManager.check_for_updates(manager, silent_no_updates=True)
self.assertEqual(_DummyChecker.created, 1)
self.assertEqual(_DummyThread.created, 1)
self.assertIs(manager.update_thread, first_thread)
manager.update_checker.check_finished.emit({"has_update": False, "current_version": self.main.APP_VERSION})
self.assertFalse(manager._update_in_progress)
self.assertIsNone(manager.update_checker)
self.assertIsNone(manager.update_thread)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,53 @@
import unittest
import importlib.util
from pathlib import Path
MODULE_PATH = Path("services/update_service.py")
SPEC = importlib.util.spec_from_file_location("update_service_under_test", MODULE_PATH)
update_service = importlib.util.module_from_spec(SPEC)
SPEC.loader.exec_module(update_service)
class UpdateServiceTests(unittest.TestCase):
def test_normalize_update_channel(self):
self.assertEqual(update_service._normalize_update_channel("stable"), "stable")
self.assertEqual(update_service._normalize_update_channel("beta"), "beta")
self.assertEqual(update_service._normalize_update_channel("pre-release"), "beta")
self.assertEqual(update_service._normalize_update_channel("unknown"), "stable")
self.assertEqual(update_service._normalize_update_channel(""), "stable")
def test_select_release_from_list_skips_drafts(self):
releases = [
{"tag_name": "v2.0.0", "draft": True},
{"tag_name": "", "draft": False},
{"tag_name": "v1.9.0-beta.1", "draft": False},
]
selected = update_service._select_release_from_list(releases)
self.assertIsNotNone(selected)
self.assertEqual(selected["tag_name"], "v1.9.0-beta.1")
def test_extract_release_payload_uses_zip_and_checksum(self):
release_data = {
"tag_name": "v1.7.2",
"html_url": "https://example.com/release/v1.7.2",
"assets": [
{"name": "notes.txt", "browser_download_url": "https://example.com/notes.txt"},
{"name": "AnabasisManager-win64.zip", "browser_download_url": "https://example.com/app.zip"},
{"name": "AnabasisManager-setup-1.7.2.exe", "browser_download_url": "https://example.com/setup.exe"},
{"name": "AnabasisManager-win64.zip.sha256", "browser_download_url": "https://example.com/app.zip.sha256"},
],
}
payload = update_service._extract_release_payload(
release_data=release_data,
repository_url="https://git.daemonlord.ru/benya/AnabasisChatRemove",
current_version="1.7.1",
)
self.assertEqual(payload["latest_version"], "1.7.2")
self.assertEqual(payload["download_url"], "https://example.com/app.zip")
self.assertEqual(payload["installer_url"], "https://example.com/setup.exe")
self.assertEqual(payload["checksum_url"], "https://example.com/app.zip.sha256")
self.assertTrue(payload["has_update"])
if __name__ == "__main__":
unittest.main()

107
tests/test_updater_gui.py Normal file
View File

@@ -0,0 +1,107 @@
import importlib.util
import sys
import tempfile
import unittest
from pathlib import Path
import types
def _install_pyside6_stubs():
pyside6_module = types.ModuleType("PySide6")
pyside6_module.__path__ = [] # treat as package
qtcore_module = types.ModuleType("PySide6.QtCore")
qtgui_module = types.ModuleType("PySide6.QtGui")
qtwidgets_module = types.ModuleType("PySide6.QtWidgets")
class _Signal:
def __init__(self, *args, **kwargs):
pass
def connect(self, *args, **kwargs):
pass
class _QObject:
pass
class _QThread:
def __init__(self, *args, **kwargs):
pass
class _QTimer:
@staticmethod
def singleShot(*args, **kwargs):
pass
class _QUrl:
@staticmethod
def fromLocalFile(path):
return path
class _QDesktopServices:
@staticmethod
def openUrl(*args, **kwargs):
return True
class _Widget:
def __init__(self, *args, **kwargs):
pass
qtcore_module.QObject = _QObject
qtcore_module.Qt = type("Qt", (), {})
qtcore_module.QThread = _QThread
qtcore_module.Signal = _Signal
qtcore_module.QTimer = _QTimer
qtcore_module.QUrl = _QUrl
qtgui_module.QDesktopServices = _QDesktopServices
qtwidgets_module.QApplication = _Widget
qtwidgets_module.QLabel = _Widget
qtwidgets_module.QProgressBar = _Widget
qtwidgets_module.QVBoxLayout = _Widget
qtwidgets_module.QWidget = _Widget
qtwidgets_module.QPushButton = _Widget
qtwidgets_module.QHBoxLayout = _Widget
# Force stubs even if real PySide6 was imported earlier in the process.
for mod_name in list(sys.modules.keys()):
if mod_name == "PySide6" or mod_name.startswith("PySide6."):
del sys.modules[mod_name]
sys.modules["PySide6"] = pyside6_module
sys.modules["PySide6.QtCore"] = qtcore_module
sys.modules["PySide6.QtGui"] = qtgui_module
sys.modules["PySide6.QtWidgets"] = qtwidgets_module
MODULE_PATH = Path("updater_gui.py")
_install_pyside6_stubs()
SPEC = importlib.util.spec_from_file_location("updater_gui_under_test", MODULE_PATH)
updater_gui = importlib.util.module_from_spec(SPEC)
SPEC.loader.exec_module(updater_gui)
class UpdaterGuiTests(unittest.TestCase):
def test_read_version_marker(self):
with tempfile.TemporaryDirectory() as tmp_dir:
marker = Path(tmp_dir) / "version.txt"
marker.write_text("2.0.1\n", encoding="utf-8")
value = updater_gui._read_version_marker(tmp_dir)
self.assertEqual(value, "2.0.1")
def test_mirror_tree_skips_selected_file(self):
with tempfile.TemporaryDirectory() as src_tmp, tempfile.TemporaryDirectory() as dst_tmp:
src = Path(src_tmp)
dst = Path(dst_tmp)
(src / "keep.txt").write_text("ok", encoding="utf-8")
(src / "skip.bin").write_text("x", encoding="utf-8")
(src / "sub").mkdir()
(src / "sub" / "nested.txt").write_text("nested", encoding="utf-8")
updater_gui._mirror_tree(str(src), str(dst), skip_names={"skip.bin"})
self.assertTrue((dst / "keep.txt").exists())
self.assertTrue((dst / "sub" / "nested.txt").exists())
self.assertFalse((dst / "skip.bin").exists())
if __name__ == "__main__":
unittest.main()

276
updater_gui.py Normal file
View File

@@ -0,0 +1,276 @@
import argparse
import os
import shutil
import subprocess
import sys
import tempfile
import time
from PySide6.QtCore import QObject, Qt, QThread, Signal, QTimer, QUrl
from PySide6.QtGui import QDesktopServices
from PySide6.QtWidgets import QApplication, QLabel, QProgressBar, QVBoxLayout, QWidget, QPushButton, QHBoxLayout
def _write_log(log_path, message):
try:
os.makedirs(os.path.dirname(log_path), exist_ok=True)
with open(log_path, "a", encoding="utf-8") as f:
ts = time.strftime("%Y-%m-%d %H:%M:%S")
f.write(f"[{ts}] {message.rstrip()}\n")
except Exception:
pass
def _is_pid_running(pid):
if pid <= 0:
return False
try:
completed = subprocess.run(
["tasklist", "/FI", f"PID eq {pid}"],
capture_output=True,
text=True,
timeout=5,
check=False,
)
return str(pid) in (completed.stdout or "")
except Exception:
return False
def _copy_file_with_retries(source_file, target_file, retries=20, delay=0.5):
last_error = None
for _ in range(max(1, retries)):
try:
os.makedirs(os.path.dirname(target_file), exist_ok=True)
shutil.copy2(source_file, target_file)
return
except Exception as exc:
last_error = exc
time.sleep(delay)
raise last_error if last_error else RuntimeError(f"Не удалось скопировать файл: {source_file}")
def _mirror_tree(src_dir, dst_dir, skip_names=None, retries=20, delay=0.5):
skip_set = {name.lower() for name in (skip_names or [])}
os.makedirs(dst_dir, exist_ok=True)
for root, dirs, files in os.walk(src_dir):
rel = os.path.relpath(root, src_dir)
target_root = dst_dir if rel == "." else os.path.join(dst_dir, rel)
os.makedirs(target_root, exist_ok=True)
for file_name in files:
if file_name.lower() in skip_set:
continue
source_file = os.path.join(root, file_name)
target_file = os.path.join(target_root, file_name)
_copy_file_with_retries(source_file, target_file, retries=retries, delay=delay)
def _read_version_marker(base_dir):
marker_path = os.path.join(base_dir, "version.txt")
if not os.path.exists(marker_path):
return ""
try:
with open(marker_path, "r", encoding="utf-8") as f:
return f.read().strip()
except Exception:
return ""
class UpdateWorker(QObject):
status = Signal(int, str)
failed = Signal(str)
done = Signal()
def __init__(self, app_dir, source_dir, exe_name, target_pid, version, work_dir=""):
super().__init__()
self.app_dir = app_dir
self.source_dir = source_dir
self.exe_name = exe_name
self.target_pid = int(target_pid or 0)
self.version = version or ""
self.work_dir = work_dir or ""
self.log_path = os.path.join(app_dir, "update_error.log")
def _start_app(self):
app_exe = os.path.join(self.app_dir, self.exe_name)
if not os.path.exists(app_exe):
raise RuntimeError(f"Не найден файл приложения: {app_exe}")
creation_flags = 0
if hasattr(subprocess, "DETACHED_PROCESS"):
creation_flags |= subprocess.DETACHED_PROCESS
if hasattr(subprocess, "CREATE_NEW_PROCESS_GROUP"):
creation_flags |= subprocess.CREATE_NEW_PROCESS_GROUP
subprocess.Popen([app_exe], cwd=self.app_dir, creationflags=creation_flags)
def run(self):
backup_dir = os.path.join(tempfile.gettempdir(), f"anabasis_backup_{int(time.time())}")
skip_names = {"anabasisupdater.exe"}
prev_version = _read_version_marker(self.app_dir)
source_version = _read_version_marker(self.source_dir)
expected_version = (self.version or "").strip()
try:
self.status.emit(1, "Ожидание завершения приложения...")
wait_loops = 0
while _is_pid_running(self.target_pid):
time.sleep(1)
wait_loops += 1
if wait_loops >= 180:
self.status.emit(1, "Принудительное завершение зависшего процесса...")
subprocess.run(
["taskkill", "/PID", str(self.target_pid), "/T", "/F"],
capture_output=True,
text=True,
timeout=10,
check=False,
)
time.sleep(2)
if _is_pid_running(self.target_pid):
raise RuntimeError(f"Процесс {self.target_pid} не завершился.")
break
self.status.emit(2, "Проверка содержимого обновления...")
source_app_exe = os.path.join(self.source_dir, self.exe_name)
if not os.path.exists(source_app_exe):
raise RuntimeError(f"В обновлении отсутствует {self.exe_name}")
if expected_version and source_version and source_version != expected_version:
raise RuntimeError(
f"Версия пакета ({source_version}) не совпадает с ожидаемой ({expected_version})."
)
self.status.emit(3, "Создание резервной копии...")
_mirror_tree(self.app_dir, backup_dir, skip_names=skip_names)
self.status.emit(4, "Применение обновления...")
_mirror_tree(self.source_dir, self.app_dir, skip_names=skip_names, retries=30, delay=0.6)
self.status.emit(5, "Проверка установленной версии...")
installed_version = _read_version_marker(self.app_dir)
if expected_version and installed_version and installed_version != expected_version:
raise RuntimeError(
f"После обновления версия {installed_version}, ожидалась {expected_version}."
)
if expected_version and prev_version and prev_version == expected_version:
_write_log(self.log_path, f"Предупреждение: версия до обновления уже была {expected_version}.")
self.status.emit(6, "Запуск обновленного приложения...")
self._start_app()
_write_log(self.log_path, f"Update success to version {expected_version or source_version or 'unknown'}")
self.status.emit(7, "Очистка временных файлов...")
try:
shutil.rmtree(backup_dir, ignore_errors=True)
if self.work_dir and os.path.isdir(self.work_dir):
shutil.rmtree(self.work_dir, ignore_errors=True)
except Exception:
pass
self.done.emit()
except Exception as exc:
_write_log(self.log_path, f"Update failed: {exc}")
try:
self.status.emit(6, "Восстановление из резервной копии...")
if os.path.isdir(backup_dir):
_mirror_tree(backup_dir, self.app_dir, skip_names=skip_names, retries=20, delay=0.5)
_write_log(self.log_path, "Rollback completed.")
try:
self._start_app()
_write_log(self.log_path, "Restored app started after rollback.")
except Exception as start_exc:
_write_log(self.log_path, f"Failed to start app after rollback: {start_exc}")
except Exception as rollback_exc:
_write_log(self.log_path, f"Rollback failed: {rollback_exc}")
self.failed.emit(str(exc))
class UpdaterWindow(QWidget):
def __init__(self, app_dir, source_dir, exe_name, target_pid, version, work_dir=""):
super().__init__()
self.setWindowTitle("Anabasis Updater")
self.setMinimumWidth(480)
self.log_path = os.path.join(app_dir, "update_error.log")
self.label = QLabel("Подготовка обновления...")
self.label.setWordWrap(True)
self.progress = QProgressBar()
self.progress.setRange(0, 7)
self.progress.setValue(0)
self.open_log_btn = QPushButton("Открыть лог")
self.open_log_btn.setEnabled(False)
self.open_log_btn.clicked.connect(self.open_log)
self.close_btn = QPushButton("Закрыть")
self.close_btn.setEnabled(False)
self.close_btn.clicked.connect(self.close)
layout = QVBoxLayout(self)
layout.addWidget(self.label)
layout.addWidget(self.progress)
actions = QHBoxLayout()
actions.addStretch(1)
actions.addWidget(self.open_log_btn)
actions.addWidget(self.close_btn)
layout.addLayout(actions)
self.thread = QThread(self)
self.worker = UpdateWorker(app_dir, source_dir, exe_name, target_pid, version, work_dir=work_dir)
self.worker.moveToThread(self.thread)
self.thread.started.connect(self.worker.run)
self.worker.status.connect(self.on_status)
self.worker.failed.connect(self.on_failed)
self.worker.done.connect(self.on_done)
self.worker.done.connect(self.thread.quit)
self.worker.failed.connect(self.thread.quit)
self.thread.start()
def on_status(self, step, text):
self.label.setText(text)
self.progress.setValue(max(0, min(7, int(step))))
def on_done(self):
self.label.setText("Обновление успешно применено. Приложение запущено.")
self.progress.setValue(7)
self.open_log_btn.setEnabled(True)
QTimer.singleShot(900, self.close)
def on_failed(self, error_text):
self.label.setText(
"Не удалось применить обновление.\n"
f"Причина: {error_text}\n"
"Подробности сохранены в update_error.log."
)
self.open_log_btn.setEnabled(True)
self.close_btn.setEnabled(True)
def open_log(self):
if os.path.exists(self.log_path):
QDesktopServices.openUrl(QUrl.fromLocalFile(self.log_path))
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--app-dir", required=True)
parser.add_argument("--source-dir", required=True)
parser.add_argument("--exe-name", required=True)
parser.add_argument("--target-pid", required=True)
parser.add_argument("--version", default="")
parser.add_argument("--work-dir", default="")
return parser.parse_args()
def main():
args = parse_args()
app = QApplication(sys.argv)
app.setStyle("Fusion")
window = UpdaterWindow(
app_dir=args.app_dir,
source_dir=args.source_dir,
exe_name=args.exe_name,
target_pid=args.target_pid,
version=args.version,
work_dir=args.work_dir,
)
window.show()
return app.exec()
if __name__ == "__main__":
sys.exit(main())