47 Commits

Author SHA1 Message Date
147988242f merge: release 2.0.0
All checks were successful
Desktop CI / tests (push) Successful in 12s
Desktop Release / release (push) Successful in 1m57s
2026-02-15 21:18:01 +03:00
44deba1382 chore(release): bump version to 2.0.0 2026-02-15 21:17:22 +03:00
eda8d43b9c refactor(ui): simplify instructions and stabilize About dialog
- remove duplicated inline instruction text

- switch About dialog to explicit QDialog with clickable link
2026-02-15 21:17:21 +03:00
cf6d6bcbd0 ci(release): gate by release existence, not tag
All checks were successful
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Successful in 1m55s
- initialize CONTINUE=true at flow start

- keep stop condition only when release with same tag already exists
2026-02-15 21:11:18 +03:00
61948a51c6 ci(release): detect existing releases via list endpoint
All checks were successful
Desktop CI / tests (push) Successful in 12s
Desktop Release / release (push) Successful in 15s
- check /releases list by tag_name instead of /releases/tags

- skip git tag push when tag already exists on origin
2026-02-15 21:10:10 +03:00
97c52c5a51 ci(tests): run full suite in Desktop CI
Some checks failed
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Failing after 1m53s
- switch unittest to discover test_*.py

- include all current test modules in py_compile
2026-02-15 21:02:35 +03:00
862c2c8899 ci(release): run full test suite in workflow
Some checks are pending
Desktop CI / tests (push) Successful in 12s
Desktop Release / release (push) Has started running
- execute unittest discovery for all test_*.py files

- include all current test modules in py_compile check
2026-02-15 21:02:04 +03:00
1013a1ce38 ci(release): fix tag existence check for remote
Some checks are pending
Desktop CI / tests (push) Successful in 12s
Desktop Release / release (push) Has started running
- detect tag by non-empty ls-remote output instead of exit code
2026-02-15 21:00:36 +03:00
f15e71996b ci(release): skip duplicate Gitea release creation
All checks were successful
Desktop CI / tests (push) Successful in 12s
Desktop Release / release (push) Successful in 14s
- check tag existence on origin via ls-remote

- stop workflow when release for tag already exists
2026-02-15 20:57:01 +03:00
34272d01c8 merge: release 1.7.1
Some checks failed
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Failing after 1m48s
Reviewed-on: #1
2026-02-15 20:51:41 +03:00
4e6502bab7 Merge branch 'master' into dev
All checks were successful
Desktop CI / tests (pull_request) Successful in 11s
2026-02-15 20:51:22 +03:00
89237590c7 chore(release): bump version to 1.7.1
All checks were successful
Desktop CI / tests (pull_request) Successful in 12s
2026-02-15 20:50:36 +03:00
aca2bdfa85 fix(imports): restore shutil in main module
- fix unresolved reference for cache cleanup path
2026-02-15 20:48:39 +03:00
9d40f0017e refactor(ui): cleanup legacy code and add About dialog
- remove duplicate legacy implementations from main window

- add Help -> About with clickable repository link
2026-02-15 20:47:41 +03:00
798eacbf9a merge: release 1.7.0
Some checks failed
Desktop CI / tests (push) Successful in 12s
Desktop Release / release (push) Failing after 1m49s
2026-02-15 20:38:32 +03:00
a9a394cf7d fix(update): improve updater script reliability for 1.7.0
- quote env vars in cmd script for paths with spaces

- add update_error.log diagnostics and timeout while waiting for app exit

- bump APP_VERSION to 1.7.0 and update updater tests
2026-02-15 20:38:24 +03:00
e1e2f8f0e8 refactor: вынес сервисы и ui-компоненты
- вынес token/chat/update логику в services

- вынес диалог и текст инструкции в ui

- добавил и обновил тесты для нового слоя
2026-02-15 20:32:36 +03:00
4d84d2ebe5 refactor(core): split vk and update services into modules 2026-02-15 20:04:21 +03:00
02350cfca1 chore(release): bump version to 1.6.4
All checks were successful
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Successful in 1m51s
2026-02-15 19:44:05 +03:00
68fa841857 feat(update): stage 3 rollback on failed apply/start 2026-02-15 19:43:15 +03:00
bca9007463 chore(release): bump version to 1.6.3
All checks were successful
Desktop CI / tests (push) Successful in 12s
Desktop Release / release (push) Successful in 1m54s
2026-02-15 18:52:06 +03:00
52b1301982 fix(update): wait for app exit before file replacement
All checks were successful
Desktop CI / tests (push) Successful in 12s
Desktop Release / release (push) Successful in 13s
2026-02-15 18:51:00 +03:00
90b3b4fc9d chore(release): bump version to 1.6.2
All checks were successful
Desktop CI / tests (push) Successful in 12s
Desktop Release / release (push) Successful in 1m57s
2026-02-15 18:46:21 +03:00
190e67c931 feat(update): stage 2 sha256 verification for auto-update
All checks were successful
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Successful in 16s
2026-02-15 18:42:37 +03:00
2eb4c52b81 ci(release): set release title to Anabasis Manager <version>
All checks were successful
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Successful in 2m16s
2026-02-15 17:36:02 +03:00
3d73a504d2 ci(release): use v-prefixed semantic tags 2026-02-15 17:35:37 +03:00
1524271be7 ci(release): write outputs/env as utf8 no bom for powershell
All checks were successful
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Successful in 2m5s
2026-02-15 17:31:37 +03:00
561cf43e09 ci(release): handle missing tag exit code in powershell step
All checks were successful
Desktop CI / tests (push) Successful in 14s
Desktop Release / release (push) Successful in 24s
2026-02-15 17:30:22 +03:00
e8930f7550 ci(release): switch windows release steps from bash to powershell
Some checks failed
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Failing after 1m14s
2026-02-15 17:28:29 +03:00
c8da0f9191 ci(release): use preinstalled Python on self-hosted windows runner
Some checks failed
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Failing after 14s
2026-02-15 17:20:42 +03:00
37ce500fd2 ci(release): remove node bootstrap step, require system node
Some checks failed
Desktop CI / tests (push) Successful in 12s
Desktop Release / release (push) Failing after 25s
2026-02-15 17:19:24 +03:00
098a84e5bd ci(release): restore powershell node bootstrap
Some checks failed
Desktop CI / tests (push) Successful in 12s
Desktop Release / release (push) Failing after 2m42s
2026-02-15 16:42:08 +03:00
5aa17c1a84 ci(release): avoid powershell policy by using cmd node bootstrap
All checks were successful
Desktop CI / tests (push) Successful in 13s
2026-02-15 16:38:40 +03:00
dde14f3714 ci(release): bootstrap node before js-based actions
Some checks failed
Desktop CI / tests (push) Successful in 12s
Desktop Release / release (push) Failing after 5s
2026-02-15 16:37:42 +03:00
fa5d4c6993 ci(release): use windows runner label
Some checks failed
Desktop CI / tests (push) Successful in 20s
Desktop Release / release (push) Failing after 1m21s
2026-02-15 16:35:21 +03:00
f9e0225243 ci: add Gitea CI and desktop release workflow
Some checks failed
Desktop CI / tests (push) Successful in 1m29s
Desktop Release / release (push) Has been cancelled
2026-02-15 15:27:21 +03:00
c42b23bea5 chore(release): bump version to 1.6.1 2026-02-15 15:25:21 +03:00
b52cdea425 feat(update): stage 1 auto-update (one-click) 2026-02-15 15:24:45 +03:00
b7fad78a71 refactor(release): bump to 1.6.0 and unify version source 2026-02-15 15:17:30 +03:00
e590a6cde0 release: 1.5.1 fixes, relogin and updater 2026-02-15 15:13:13 +03:00
aad6e8c5af feat(auth): pywebview вместо webengine
- добавлен auth_webview.py и режим --auth

- build.py обновлён, WebEngine исключён

- pywebview добавлен в requirements
2026-02-04 00:02:32 +03:00
4629037890 feat(app): улучшения UX, логирование и безопасность 2026-02-03 22:26:41 +03:00
ad24cb6fca feat(chat): добавлена функция назначения администраторов чатов
- Добавлено верхнее меню "Инструменты".
- Реализован метод set_user_admin с вызовом API messages.setMemberRole.
- Добавлена конвертация локального chat_id в peer_id (2000000000+id) для корректной работы метода.
- Добавлены диалоги подтверждения и отчет о результатах выполнения.
2026-02-01 05:33:08 +03:00
9b263dd85f feat(build): автоматизация сборки и поддержка бессрочных токенов
- Исправлена ошибка ImportError: QtWebEngineCore путем перехода на PyInstaller.
- Добавлен скрипт build.py для автоматической сборки, очистки DLL и создания ZIP-архива.
- Реализована поддержка бессрочного доступа (offline_access) для VK Access Token.
- Обновлен README.md: добавлены разделы для разработчиков и описание структуры данных.
- Оптимизирован размер билда за счет удаления неиспользуемых библиотек Qt и папок локализации.
2026-02-01 05:32:51 +03:00
d7eaec4ba4 fix(chat): исправление логина и загрузки списков чатов
Signed-off-by: benya <benya@daemonlord.ru>
2026-02-01 05:32:40 +03:00
Alex
30fc78e89b feat(build): Добавлена кросс-платформенная поддержка в setup.py
Модифицирован скрипт сборки на основе cx_Freeze для обеспечения совместимости с основными операционными системами (Windows, macOS, Linux). Ранее скрипт был настроен преимущественно для Windows.

Ключевые изменения:
- **Динамическое имя файла:** Исполняемый файл получает расширение `.exe` только при сборке на Windows.
- **Разделение сборок:** Для каждой целевой ОС создается своя папка (например, `build_linux`), что позволяет хранить сборки для разных систем одновременно.
- **Платформо-зависимые опции:** Учтены особенности сборки для каждой ОС, включая `base="Win32GUI"` для Windows и `base=None` для Linux.

edit: changed .gitignore
2026-02-01 05:32:32 +03:00
Alex
6225fb15d4 feat(ui): массовые операции, вкладки и улучшение UX 2026-02-01 05:31:56 +03:00
23 changed files with 2392 additions and 622 deletions

35
.gitea/workflows/ci.yml Normal file
View File

@@ -0,0 +1,35 @@
name: Desktop CI
on:
push:
branches:
- master
pull_request:
jobs:
tests:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: https://git.daemonlord.ru/actions/checkout@v4
with:
fetch-depth: 0
- name: Set up Python
uses: https://git.daemonlord.ru/actions/setup-python@v5
with:
python-version: "3.13"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Validate syntax
run: |
python -m py_compile app_version.py main.py build.py tests/test_auth_relogin_smoke.py tests/test_auto_update_service.py tests/test_chat_actions.py tests/test_token_store.py
- name: Run tests
run: |
python -m unittest discover -s tests -p "test_*.py" -v

View File

@@ -0,0 +1,154 @@
name: Desktop Release
on:
push:
branches:
- master
jobs:
release:
runs-on: windows
steps:
- name: Checkout
uses: https://git.daemonlord.ru/actions/checkout@v4
with:
fetch-depth: 0
tags: true
- name: Ensure Python 3.13
shell: powershell
run: |
if (Get-Command python -ErrorAction SilentlyContinue) {
python --version
} elseif (Get-Command py -ErrorAction SilentlyContinue) {
$pyExe = py -3.13 -c "import sys; print(sys.executable)"
if (-not $pyExe) {
throw "Python 3.13 launcher is available, but interpreter was not found."
}
Split-Path $pyExe | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append
python --version
} else {
throw "Python is not installed on runner. Install Python 3.13 and restart runner service."
}
- name: Install dependencies
shell: powershell
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt pyinstaller
- name: Extract app version
id: extract_version
shell: powershell
run: |
$version = (python -c "from app_version import APP_VERSION; print(APP_VERSION)").Trim()
$utf8NoBom = New-Object System.Text.UTF8Encoding($false)
[System.IO.File]::AppendAllText($env:GITHUB_OUTPUT, "version=$version`n", $utf8NoBom)
Write-Host "Detected version: $version"
- name: Initialize release flow
id: flow_init
shell: powershell
run: |
$utf8NoBom = New-Object System.Text.UTF8Encoding($false)
[System.IO.File]::AppendAllText($env:GITHUB_ENV, "CONTINUE=true`n", $utf8NoBom)
exit 0
- name: Stop if release already exists
shell: powershell
run: |
$version = "${{ steps.extract_version.outputs.version }}"
$tag = "v$version"
$apiUrl = "https://git.daemonlord.ru/api/v1/repos/${{ gitea.repository }}/releases?page=1&limit=100"
$headers = @{ Authorization = "token ${{ secrets.API_TOKEN }}" }
$utf8NoBom = New-Object System.Text.UTF8Encoding($false)
try {
$response = Invoke-RestMethod -Uri $apiUrl -Headers $headers -Method Get
$found = $false
foreach ($release in $response) {
if ($release.tag_name -eq $tag) {
$found = $true
break
}
}
if ($found) {
Write-Host "Release $tag already exists, stopping job."
[System.IO.File]::AppendAllText($env:GITHUB_ENV, "CONTINUE=false`n", $utf8NoBom)
} else {
Write-Host "Release $tag not found, continuing workflow..."
}
} catch {
Write-Host "Failed to query releases list, continuing workflow..."
}
- name: Run tests
if: env.CONTINUE == 'true'
shell: powershell
run: |
python -m py_compile app_version.py main.py build.py tests/test_auth_relogin_smoke.py tests/test_auto_update_service.py tests/test_chat_actions.py tests/test_token_store.py
python -m unittest discover -s tests -p "test_*.py" -v
- name: Build release zip
if: env.CONTINUE == 'true'
shell: powershell
run: |
python build.py
- name: Ensure archive exists
if: env.CONTINUE == 'true'
shell: powershell
run: |
$version = "${{ steps.extract_version.outputs.version }}"
$archivePath = "dist/AnabasisManager-$version.zip"
if (-not (Test-Path $archivePath)) {
throw "Archive not found: $archivePath"
}
- name: Generate SHA256 checksum
if: env.CONTINUE == 'true'
shell: powershell
run: |
$version = "${{ steps.extract_version.outputs.version }}"
$archiveName = "AnabasisManager-$version.zip"
$archivePath = "dist/$archiveName"
$checksumPath = "dist/$archiveName.sha256"
$hash = (Get-FileHash -Path $archivePath -Algorithm SHA256).Hash.ToLower()
"$hash $archiveName" | Set-Content -Path $checksumPath -Encoding UTF8
Write-Host "Checksum created: $checksumPath"
- name: Configure git identity
if: env.CONTINUE == 'true'
shell: powershell
run: |
git config user.name "gitea-actions"
git config user.email "gitea-actions@daemonlord.ru"
- name: Create git tag
if: env.CONTINUE == 'true'
shell: powershell
run: |
$version = "${{ steps.extract_version.outputs.version }}"
$tag = "v$version"
$tagLine = (git ls-remote --tags origin "refs/tags/$tag" | Select-Object -First 1)
if ([string]::IsNullOrWhiteSpace($tagLine)) {
git tag "$tag"
git push origin "$tag"
} else {
Write-Host "Tag $tag already exists on origin, skipping tag push."
}
- name: Create Gitea Release
if: env.CONTINUE == 'true'
uses: https://git.daemonlord.ru/actions/gitea-release-action@v1
with:
server_url: https://git.daemonlord.ru
repository: ${{ gitea.repository }}
token: ${{ secrets.API_TOKEN }}
tag_name: v${{ steps.extract_version.outputs.version }}
name: Anabasis Manager ${{ steps.extract_version.outputs.version }}
body: |
Desktop release v${{ steps.extract_version.outputs.version }}
files: |
dist/AnabasisManager-${{ steps.extract_version.outputs.version }}.zip
dist/AnabasisManager-${{ steps.extract_version.outputs.version }}.zip.sha256

14
.gitignore vendored
View File

@@ -1,3 +1,13 @@
/.venv/ /.venv/
/.venv1/ /setup.py
/.venv3/ /build_cx/
/build_linux/
/build_win32/
/build_darwin/
.idea/
__pycache__/
*.py[cod]
tests/__pycache__/
build/
dist/
AnabasisManager.spec

18
LICENSE Normal file
View File

@@ -0,0 +1,18 @@
MIT License
Copyright (c) 2025 Aleksandr Denisov
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
associated documentation files (the "Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the
following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial
portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO
EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
USE OR OTHER DEALINGS IN THE SOFTWARE.

104
README.md Normal file
View File

@@ -0,0 +1,104 @@
# 🚀 Anabasis VK Chat Manager
**Anabasis VK Chat Manager** — специализированное десктопное приложение для HR-менеджеров и администраторов сообществ, предназначенное для автоматизации управления участниками в чатах ВКонтакте. Избавьтесь от рутины и управляйте всеми беседами из одного удобного интерфейса.
---
## ✨ Основные возможности
* **🔐 Безопасная авторизация:** Вход через официальный VK OAuth во встроенном защищенном браузере.
* **💾 Умное сохранение сессий:** Поддержка Persistent Cookies — не нужно вводить пароль при каждом запуске.
* **⏳ Таймер токена:** Наглядное отображение времени действия сессии прямо в интерфейсе.
* **📊 Массовые операции:**
* Моментальная загрузка всех доступных чатов пользователя.
* Групповой выбор чатов («Выбрать все» / «Снять выбор»).
* Быстрое обновление списка бесед.
* **👤 Интеллектуальный поиск ID:** Автоматическое распознавание ID пользователя из ссылок любого формата (например, `vk.com/id123`, `vk.com/durov` или просто `durov`).
* **🛠 Управление в один клик:** Кнопки для мгновенного исключения или приглашения пользователя во все выбранные чаты одновременно.
* **🛡 Стабильность:** Улучшенная обработка ошибок VK API и автоматическая реакция на смену IP-адреса.
---
## 📦 Установка и запуск
### Вариант 1: Готовый билд (Windows)
1. Перейдите в раздел **Releases** на GitHub.
2. Скачайте архив формата `AnabasisManager-1.x.zip`.
3. Распакуйте архив в удобную папку.
4. Запустите файл **AnabasisManager.exe**.
### Вариант 2: Запуск из исходного кода
Вам потребуется **Python 3.10** или выше.
1. **Клонируйте репозиторий:**
```bash
git clone [https://github.com/your-username/AnabasisVKChatManager.git](https://github.com/your-username/AnabasisVKChatManager.git)
cd AnabasisVKChatManager
```
2. **Настройте виртуальное окружение:**
```bash
python -m venv venv
# Для Windows:
.\venv\Scripts\activate
# Для macOS/Linux:
source venv/bin/activate
```
3. **Установите зависимости:**
```bash
pip install PySide6 vk_api
```
4. **Запустите приложение:**
```bash
python main.py
```
---
## 🕹 Инструкция по использованию
1. **Вход:** Нажмите кнопку «Авторизоваться через VK». Введите данные в открывшемся окне браузера.
2. **Выбор целей:** Отметьте галочками чаты, в которых нужно произвести изменения.
3. **Данные пользователя:** Вставьте ссылку на профиль VK человека, которого нужно добавить или удалить.
4. **Действие:** Нажмите кнопку нужной операции. Следите за прогрессом в окне системных сообщений.
---
## 📂 Техническая информация
### Сборка проекта (для разработчиков)
Проект использует кастомный скрипт автоматизации `build.py`, который оптимизирует зависимости `PySide6` и корректно упаковывает `QtWebEngineCore`.
**Команда для сборки:**
```bash
python build.py
```
Скрипт автоматически:
Собирает .exe через PyInstaller с использованием --collect-all для модулей WebEngine.
Удаляет лишние библиотеки (PDF, Multimedia, Designer) и папки переводов, сокращая размер сборки на ~100 МБ.
Создает готовый ZIP-архив с актуальной версией в названии.
Хранение данных
Приложение использует системные папки AppData для изоляции пользовательских данных:
Windows: %APPDATA%/AnabasisVKChatManager
macOS: ~/Library/Application Support/AnabasisVKChatManager
В этих папках хранятся token.json (доступ к API) и web_engine_cache/ (сессия браузера).
---
## 📜 Лицензия
---
Проект распространяется под лицензией MIT.
Сэкономьте часы ручного труда с Anabasis VK Chat Manager.

1
app_version.py Normal file
View File

@@ -0,0 +1 @@
APP_VERSION = "2.0.0"

78
auth_webview.py Normal file
View File

@@ -0,0 +1,78 @@
import os
import sys
import time
import json
import threading
from urllib.parse import urlparse, parse_qs, unquote
import webview
def extract_token(url_string):
token = None
expires_in = 3600
parsed = urlparse(url_string)
if parsed.fragment:
params = parse_qs(parsed.fragment)
else:
params = parse_qs(parsed.query)
if 'access_token' in params:
token = params['access_token'][0]
if 'expires_in' in params:
try:
expires_in = int(params['expires_in'][0])
except ValueError:
pass
if not token:
start_marker = "access_token%253D"
end_marker = "%25"
start_index = url_string.find(start_marker)
if start_index != -1:
token_start_index = start_index + len(start_marker)
remaining_url = url_string[token_start_index:]
end_index = remaining_url.find(end_marker)
if end_index != -1:
raw_token = remaining_url[:end_index]
else:
amp_index = remaining_url.find('&')
if amp_index != -1:
raw_token = remaining_url[:amp_index]
else:
raw_token = remaining_url
token = unquote(raw_token)
return token, expires_in
def main_auth(auth_url, output_path):
def poll_url():
try:
url = window.get_current_url()
except Exception:
url = None
if url:
token, expires_in = extract_token(url)
if token:
data = {"token": token, "expires_in": expires_in}
with open(output_path, "w", encoding="utf-8") as f:
json.dump(data, f)
window.destroy()
return
threading.Timer(0.5, poll_url).start()
def on_loaded():
threading.Timer(0.5, poll_url).start()
window = webview.create_window("VK Авторизация", auth_url)
window.events.loaded += on_loaded
storage_path = os.path.join(os.path.dirname(output_path), "webview_profile")
webview.start(private_mode=False, storage_path=storage_path)
if __name__ == "__main__":
main()

108
build.py Normal file
View File

@@ -0,0 +1,108 @@
import os
import shutil
import subprocess
import sys
from app_version import APP_VERSION
# --- Конфигурация ---
APP_NAME = "AnabasisManager"
VERSION = APP_VERSION # Единая версия приложения
MAIN_SCRIPT = "main.py"
ICON_PATH = "icon.ico"
DIST_DIR = os.path.join("dist", APP_NAME)
ARCHIVE_NAME = f"{APP_NAME}-{VERSION}" # Формат Название-Версия
SAFE_CLEAN_ROOT_FILES = {"main.py", "requirements.txt", "build.py"}
REMOVE_LIST = [
"Qt6Pdf.dll", "Qt6PdfQuick.dll", "Qt6PdfWidgets.dll",
"Qt6VirtualKeyboard.dll", "Qt6Positioning.dll",
"Qt6PrintSupport.dll", "Qt6Svg.dll", "Qt6Sql.dll",
"Qt6Charts.dll", "Qt6Multimedia.dll", "Qt63DCore.dll",
"translations",
"Qt6QuickTemplates2.dll"
]
def ensure_project_root():
missing = [name for name in SAFE_CLEAN_ROOT_FILES if not os.path.exists(name)]
if missing:
print("[ERROR] Скрипт нужно запускать из корня проекта.")
print(f"[ERROR] Не найдены: {', '.join(missing)}")
sys.exit(1)
def run_build():
print(f"--- 1. Запуск PyInstaller для {APP_NAME} v{VERSION} ---")
command = [
"pyinstaller",
"--noconfirm",
"--onedir",
"--windowed",
"--exclude-module", "PySide6.QtWebEngineCore",
"--exclude-module", "PySide6.QtWebEngineWidgets",
"--exclude-module", "PySide6.QtWebEngineQuick",
f"--name={APP_NAME}",
f"--icon={ICON_PATH}" if os.path.exists(ICON_PATH) else "",
f"--add-data={ICON_PATH}{os.pathsep}." if os.path.exists(ICON_PATH) else "",
f"--add-data=auth_webview.py{os.pathsep}.",
MAIN_SCRIPT
]
command = [arg for arg in command if arg]
try:
subprocess.check_call(command)
print("\n[OK] Сборка PyInstaller завершена.")
except subprocess.CalledProcessError as e:
print(f"\n[ERROR] Ошибка при сборке: {e}")
sys.exit(1)
def run_cleanup():
print(f"\n--- 2. Оптимизация папки {APP_NAME} ---")
# Пытаемся найти папку PySide6 внутри сборки
pyside_path = os.path.join(DIST_DIR, "PySide6")
if not os.path.exists(pyside_path):
pyside_path = DIST_DIR
for item in REMOVE_LIST:
path = os.path.join(pyside_path, item)
if os.path.exists(path):
try:
if os.path.isdir(path):
shutil.rmtree(path)
else:
os.remove(path)
print(f"Удалено: {item}")
except Exception as e:
print(f"Пропуск {item}: {e}")
def create_archive():
print(f"\n--- 3. Создание архива {ARCHIVE_NAME}.zip ---")
try:
# Создаем zip-архив из папки DIST_DIR
# base_name - имя файла без расширения, format - 'zip', root_dir - что упаковываем
shutil.make_archive(os.path.join("dist", ARCHIVE_NAME), 'zip', DIST_DIR)
print(f"[OK] Архив создан: dist/{ARCHIVE_NAME}.zip")
except Exception as e:
print(f"[ERROR] Не удалось создать архив: {e}")
if __name__ == "__main__":
ensure_project_root()
# Предварительная очистка
for folder in ["build", "dist"]:
if os.path.exists(folder):
shutil.rmtree(folder)
run_build()
run_cleanup()
create_archive()
print("\n" + "=" * 30)
print("ПРОЦЕСС ЗАВЕРШЕН")
print(f"Файл для отправки: dist/{ARCHIVE_NAME}.zip")
print("=" * 30)

BIN
icon.ico Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 131 KiB

1661
main.py

File diff suppressed because it is too large Load Diff

View File

@@ -1,2 +1,3 @@
PySide6~=6.9.1 PySide6~=6.10.2
vk-api~=11.9.9 vk-api~=11.9.9
pywebview

5
services/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
from .auto_update_service import AutoUpdateService
from .chat_actions import load_chat_conversations, resolve_user_ids
from .token_store import load_token, save_token
from .update_service import UpdateChecker, detect_update_repository_url
from .vk_service import VkService

View File

@@ -0,0 +1,166 @@
import hashlib
import os
import re
import shutil
import subprocess
import tempfile
import urllib.request
import zipfile
class AutoUpdateService:
@staticmethod
def download_update_archive(download_url, destination_path):
request = urllib.request.Request(
download_url,
headers={"User-Agent": "AnabasisManager-Updater"},
)
with urllib.request.urlopen(request, timeout=60) as response:
with open(destination_path, "wb") as f:
shutil.copyfileobj(response, f)
@staticmethod
def download_update_text(url):
request = urllib.request.Request(
url,
headers={"User-Agent": "AnabasisManager-Updater"},
)
with urllib.request.urlopen(request, timeout=30) as response:
return response.read().decode("utf-8", errors="replace")
@staticmethod
def sha256_file(path):
digest = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(1024 * 1024), b""):
digest.update(chunk)
return digest.hexdigest().lower()
@staticmethod
def extract_sha256_from_text(checksum_text, target_file_name):
target = (target_file_name or "").strip().lower()
for raw_line in checksum_text.splitlines():
line = raw_line.strip()
if not line:
continue
match = re.search(r"\b([A-Fa-f0-9]{64})\b", line)
if not match:
continue
checksum = match.group(1).lower()
if not target:
return checksum
line_lower = line.lower()
if target in line_lower:
return checksum
if os.path.basename(target) in line_lower:
return checksum
return ""
@classmethod
def verify_update_checksum(cls, zip_path, checksum_url, download_name):
if not checksum_url:
raise RuntimeError("В релизе нет файла SHA256. Автообновление остановлено.")
checksum_text = cls.download_update_text(checksum_url)
expected_hash = cls.extract_sha256_from_text(checksum_text, download_name or os.path.basename(zip_path))
if not expected_hash:
raise RuntimeError("Не удалось найти SHA256 для архива обновления.")
actual_hash = cls.sha256_file(zip_path)
if actual_hash != expected_hash:
raise RuntimeError("SHA256 не совпадает, обновление отменено.")
@staticmethod
def locate_extracted_root(extracted_dir):
entries = []
for name in os.listdir(extracted_dir):
full_path = os.path.join(extracted_dir, name)
if os.path.isdir(full_path):
entries.append(full_path)
if len(entries) == 1:
candidate = entries[0]
if os.path.exists(os.path.join(candidate, "AnabasisManager.exe")):
return candidate
return extracted_dir
@staticmethod
def build_update_script(app_dir, source_dir, exe_name, target_pid):
script_path = os.path.join(tempfile.gettempdir(), "anabasis_apply_update.cmd")
script_lines = [
"@echo off",
"setlocal EnableExtensions",
f"set \"APP_DIR={app_dir}\"",
f"set \"SRC_DIR={source_dir}\"",
f"set \"EXE_NAME={exe_name}\"",
f"set \"TARGET_PID={target_pid}\"",
"set \"BACKUP_DIR=%TEMP%\\anabasis_backup_%RANDOM%%RANDOM%\"",
"set \"UPDATE_LOG=%APP_DIR%\\update_error.log\"",
"echo [%DATE% %TIME%] Update start > \"%UPDATE_LOG%\"",
"if not exist \"%SRC_DIR%\\%EXE_NAME%\" (",
" echo Source executable not found: \"%SRC_DIR%\\%EXE_NAME%\" >> \"%UPDATE_LOG%\"",
" exit /b 3",
")",
"set /a WAIT_LOOPS=0",
":wait_for_exit",
"tasklist /FI \"PID eq %TARGET_PID%\" | find \"%TARGET_PID%\" >nul",
"if %ERRORLEVEL% EQU 0 (",
" set /a WAIT_LOOPS+=1",
" if %WAIT_LOOPS% GEQ 180 (",
" echo Timeout waiting for process %TARGET_PID% to exit >> \"%UPDATE_LOG%\"",
" goto :backup",
" )",
" timeout /t 1 /nobreak >nul",
" goto :wait_for_exit",
")",
":backup",
"timeout /t 1 /nobreak >nul",
"mkdir \"%BACKUP_DIR%\" >nul 2>&1",
"robocopy \"%APP_DIR%\" \"%BACKUP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:6 /W:2 >nul",
"set \"RC=%ERRORLEVEL%\"",
"if %RC% GEQ 8 goto :backup_error",
"robocopy \"%SRC_DIR%\" \"%APP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:12 /W:2 >nul",
"set \"RC=%ERRORLEVEL%\"",
"if %RC% GEQ 8 goto :rollback",
"start \"\" \"%APP_DIR%\\%EXE_NAME%\"",
"timeout /t 2 /nobreak >nul",
"tasklist /FI \"IMAGENAME eq %EXE_NAME%\" | find /I \"%EXE_NAME%\" >nul",
"if %ERRORLEVEL% NEQ 0 goto :rollback",
"echo Update success >> \"%UPDATE_LOG%\"",
"rmdir /S /Q \"%BACKUP_DIR%\" >nul 2>&1",
"exit /b 0",
":rollback",
"robocopy \"%BACKUP_DIR%\" \"%APP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:6 /W:2 >nul",
"start \"\" \"%APP_DIR%\\%EXE_NAME%\"",
"echo Auto-update failed. Rollback executed. >> \"%UPDATE_LOG%\"",
"exit /b 2",
":backup_error",
"echo Auto-update failed during backup. Code %RC% >> \"%UPDATE_LOG%\"",
"exit /b %RC%",
]
with open(script_path, "w", encoding="utf-8", newline="\r\n") as f:
f.write("\r\n".join(script_lines) + "\r\n")
return script_path
@staticmethod
def launch_update_script(script_path, work_dir):
creation_flags = 0
if hasattr(subprocess, "CREATE_NEW_PROCESS_GROUP"):
creation_flags |= subprocess.CREATE_NEW_PROCESS_GROUP
if hasattr(subprocess, "DETACHED_PROCESS"):
creation_flags |= subprocess.DETACHED_PROCESS
subprocess.Popen(
["cmd.exe", "/c", script_path],
cwd=work_dir,
creationflags=creation_flags,
)
@classmethod
def prepare_update(cls, download_url, checksum_url, download_name):
work_dir = tempfile.mkdtemp(prefix="anabasis_update_")
zip_path = os.path.join(work_dir, "update.zip")
unpack_dir = os.path.join(work_dir, "extracted")
cls.download_update_archive(download_url, zip_path)
cls.verify_update_checksum(zip_path, checksum_url, download_name)
os.makedirs(unpack_dir, exist_ok=True)
with zipfile.ZipFile(zip_path, "r") as archive:
archive.extractall(unpack_dir)
source_dir = cls.locate_extracted_root(unpack_dir)
return work_dir, source_dir

46
services/chat_actions.py Normal file
View File

@@ -0,0 +1,46 @@
from urllib.parse import urlparse
def resolve_user_ids(vk_call_with_retry, vk_api, links):
resolved_ids = []
failed_links = []
for link in links:
try:
path = urlparse(link).path
screen_name = path.split("/")[-1] if path else ""
if not screen_name and len(path.split("/")) > 1:
screen_name = path.split("/")[-2]
if not screen_name:
failed_links.append((link, None))
continue
resolved_object = vk_call_with_retry(vk_api.utils.resolveScreenName, screen_name=screen_name)
if resolved_object and resolved_object.get("type") == "user":
resolved_ids.append(resolved_object["object_id"])
else:
failed_links.append((link, None))
except Exception as e:
failed_links.append((link, e))
return resolved_ids, failed_links
def load_chat_conversations(vk_call_with_retry, vk_api):
conversations = []
start_from = None
seen_start_tokens = set()
while True:
params = {"count": 200, "filter": "all"}
if start_from:
if start_from in seen_start_tokens:
break
params["start_from"] = start_from
seen_start_tokens.add(start_from)
response = vk_call_with_retry(vk_api.messages.getConversations, **params)
page_items = response.get("items", [])
if not page_items:
break
conversations.extend(page_items)
start_from = response.get("next_from")
if not start_from:
break
return conversations

136
services/token_store.py Normal file
View File

@@ -0,0 +1,136 @@
import base64
import ctypes
import json
import os
import time
from ctypes import wintypes
class _DataBlob(ctypes.Structure):
_fields_ = [("cbData", wintypes.DWORD), ("pbData", ctypes.POINTER(ctypes.c_byte))]
_crypt32 = None
_kernel32 = None
if os.name == "nt":
_crypt32 = ctypes.WinDLL("crypt32", use_last_error=True)
_kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
_crypt32.CryptProtectData.argtypes = [
ctypes.POINTER(_DataBlob),
wintypes.LPCWSTR,
ctypes.POINTER(_DataBlob),
ctypes.c_void_p,
ctypes.c_void_p,
wintypes.DWORD,
ctypes.POINTER(_DataBlob),
]
_crypt32.CryptProtectData.restype = wintypes.BOOL
_crypt32.CryptUnprotectData.argtypes = [
ctypes.POINTER(_DataBlob),
ctypes.POINTER(wintypes.LPWSTR),
ctypes.POINTER(_DataBlob),
ctypes.c_void_p,
ctypes.c_void_p,
wintypes.DWORD,
ctypes.POINTER(_DataBlob),
]
_crypt32.CryptUnprotectData.restype = wintypes.BOOL
def _crypt_protect_data(data, description=""):
buffer = ctypes.create_string_buffer(data)
data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte)))
data_out = _DataBlob()
if not _crypt32.CryptProtectData(ctypes.byref(data_in), description, None, None, None, 0, ctypes.byref(data_out)):
raise ctypes.WinError(ctypes.get_last_error())
try:
return ctypes.string_at(data_out.pbData, data_out.cbData)
finally:
_kernel32.LocalFree(data_out.pbData)
def _crypt_unprotect_data(data):
buffer = ctypes.create_string_buffer(data)
data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte)))
data_out = _DataBlob()
if not _crypt32.CryptUnprotectData(ctypes.byref(data_in), None, None, None, None, 0, ctypes.byref(data_out)):
raise ctypes.WinError(ctypes.get_last_error())
try:
return ctypes.string_at(data_out.pbData, data_out.cbData)
finally:
_kernel32.LocalFree(data_out.pbData)
def _encrypt_token(token):
if os.name != "nt":
raise RuntimeError("DPAPI is available only on Windows.")
encrypted_bytes = _crypt_protect_data(token.encode("utf-8"))
return base64.b64encode(encrypted_bytes).decode("ascii")
def _decrypt_token(token_data):
if os.name != "nt":
raise RuntimeError("DPAPI is available only on Windows.")
encrypted_bytes = base64.b64decode(token_data.encode("ascii"))
decrypted_bytes = _crypt_unprotect_data(encrypted_bytes)
return decrypted_bytes.decode("utf-8")
def save_token(token, token_file, app_data_dir, expires_in=0):
try:
expires_in = int(expires_in)
except (ValueError, TypeError):
expires_in = 0
os.makedirs(app_data_dir, exist_ok=True)
expiration_time = (time.time() + expires_in) if expires_in > 0 else 0
stored_token = token
encrypted = False
if os.name == "nt":
try:
stored_token = _encrypt_token(token)
encrypted = True
except Exception:
pass
data = {
"token": stored_token,
"expiration_time": expiration_time,
"encrypted": encrypted,
}
with open(token_file, "w", encoding="utf-8") as f:
json.dump(data, f)
return expiration_time
def load_token(token_file):
if not os.path.exists(token_file):
return None, None
with open(token_file, "r", encoding="utf-8") as f:
data = json.load(f)
token = data.get("token")
encrypted = data.get("encrypted", False)
if token and encrypted:
try:
token = _decrypt_token(token)
except Exception:
try:
os.remove(token_file)
except Exception:
pass
return None, None
expiration_time = data.get("expiration_time")
if token and (expiration_time == 0 or expiration_time > time.time()):
return token, expiration_time
try:
os.remove(token_file)
except Exception:
pass
return None, None

163
services/update_service.py Normal file
View File

@@ -0,0 +1,163 @@
import json
import os
import re
import urllib.error
import urllib.request
from urllib.parse import urlparse
from PySide6.QtCore import QObject, Signal
def _version_key(version_text):
parts = [int(x) for x in re.findall(r"\d+", str(version_text))]
if not parts:
return (0, 0, 0)
while len(parts) < 3:
parts.append(0)
return tuple(parts[:3])
def _is_newer_version(latest_version, current_version):
latest_key = _version_key(latest_version)
current_key = _version_key(current_version)
return latest_key > current_key
def _sanitize_repo_url(value):
value = (value or "").strip()
if not value:
return ""
if "://" not in value and value.count("/") == 1:
return f"https://github.com/{value}"
parsed = urlparse(value)
if not parsed.scheme or not parsed.netloc:
return ""
clean_path = parsed.path.rstrip("/")
if clean_path.endswith(".git"):
clean_path = clean_path[:-4]
return f"{parsed.scheme}://{parsed.netloc}{clean_path}"
def detect_update_repository_url(configured_url="", configured_repo=""):
env_url = _sanitize_repo_url(os.getenv("ANABASIS_UPDATE_URL", ""))
if env_url:
return env_url
env_repo = _sanitize_repo_url(os.getenv("ANABASIS_UPDATE_REPOSITORY", ""))
if env_repo:
return env_repo
cfg_url = _sanitize_repo_url(configured_url)
if cfg_url:
return cfg_url
cfg_repo = _sanitize_repo_url(configured_repo)
if cfg_repo:
return cfg_repo
git_config_path = os.path.join(os.path.abspath("."), ".git", "config")
if not os.path.exists(git_config_path):
return ""
try:
with open(git_config_path, "r", encoding="utf-8") as f:
content = f.read()
match = re.search(r"url\s*=\s*((?:https?://|git@)[^\s]+)", content)
if not match:
return ""
remote = match.group(1).strip()
if remote.startswith("git@"):
ssh_match = re.match(r"git@([^:]+):(.+?)(?:\.git)?$", remote)
if ssh_match:
return _sanitize_repo_url(f"https://{ssh_match.group(1)}/{ssh_match.group(2)}")
return _sanitize_repo_url(remote)
except Exception:
return ""
class UpdateChecker(QObject):
check_finished = Signal(dict)
check_failed = Signal(str)
def __init__(self, repository_url, current_version, request_timeout=8):
super().__init__()
self.repository_url = repository_url
self.current_version = current_version
self.request_timeout = request_timeout
def run(self):
if not self.repository_url:
self.check_failed.emit("Не задан URL репозитория обновлений.")
return
parsed = urlparse(self.repository_url)
base_url = f"{parsed.scheme}://{parsed.netloc}"
repo_path = parsed.path.strip("/")
if not repo_path or repo_path.count("/") < 1:
self.check_failed.emit("Некорректный URL репозитория обновлений.")
return
if parsed.netloc.lower().endswith("github.com"):
api_url = f"https://api.github.com/repos/{repo_path}/releases/latest"
else:
api_url = f"{base_url}/api/v1/repos/{repo_path}/releases/latest"
releases_url = f"{base_url}/{repo_path}/releases"
request = urllib.request.Request(
api_url,
headers={
"Accept": "application/vnd.github+json",
"User-Agent": "AnabasisManager-Updater",
},
)
try:
with urllib.request.urlopen(request, timeout=self.request_timeout) as response:
release_data = json.loads(response.read().decode("utf-8"))
except urllib.error.HTTPError as e:
self.check_failed.emit(f"Ошибка HTTP при проверке обновлений: {e.code}")
return
except urllib.error.URLError as e:
self.check_failed.emit(f"Сетевая ошибка при проверке обновлений: {e}")
return
except Exception as e:
self.check_failed.emit(f"Не удалось проверить обновления: {e}")
return
latest_tag = release_data.get("tag_name") or release_data.get("name") or ""
latest_version = latest_tag.lstrip("vV").strip()
html_url = release_data.get("html_url") or releases_url
assets = release_data.get("assets") or []
download_url = ""
download_name = ""
checksum_url = ""
for asset in assets:
url = asset.get("browser_download_url", "")
if url.lower().endswith(".zip"):
download_url = url
download_name = asset.get("name", "")
break
if not download_url and assets:
download_url = assets[0].get("browser_download_url", "")
download_name = assets[0].get("name", "")
for asset in assets:
name = asset.get("name", "").lower()
if not name:
continue
is_checksum_asset = name.endswith(".sha256") or name.endswith(".sha256.txt") or name in ("checksums.txt", "sha256sums.txt")
if not is_checksum_asset:
continue
if download_name and (download_name.lower() in name or name in (f"{download_name.lower()}.sha256", f"{download_name.lower()}.sha256.txt")):
checksum_url = asset.get("browser_download_url", "")
break
if not checksum_url:
checksum_url = asset.get("browser_download_url", "")
self.check_finished.emit(
{
"repository_url": self.repository_url,
"latest_version": latest_version,
"current_version": self.current_version,
"latest_tag": latest_tag,
"release_url": html_url,
"download_url": download_url,
"download_name": download_name,
"checksum_url": checksum_url,
"has_update": _is_newer_version(latest_version, self.current_version),
}
)

59
services/vk_service.py Normal file
View File

@@ -0,0 +1,59 @@
import os
import sys
import time
from vk_api import VkApi
from vk_api.exceptions import VkApiError
class VkService:
def __init__(self):
self.session = None
self.api = None
def set_token(self, token):
self.session = VkApi(token=token)
self.api = self.session.get_api()
def clear(self):
self.session = None
self.api = None
@staticmethod
def build_auth_command(auth_url, output_path, entry_script_path=None):
if getattr(sys, "frozen", False):
return sys.executable, ["--auth", auth_url, output_path]
script_path = entry_script_path or os.path.abspath(__file__)
return sys.executable, [script_path, "--auth", auth_url, output_path]
@staticmethod
def vk_error_code(exc):
error = getattr(exc, "error", None)
if isinstance(error, dict):
return error.get("error_code")
return getattr(exc, "code", None)
@classmethod
def is_auth_error(cls, exc, formatted_message=None):
code = cls.vk_error_code(exc)
if code == 5:
return True
message = (formatted_message or str(exc)).lower()
return "invalid_access_token" in message or "user authorization failed" in message
@classmethod
def is_retryable_error(cls, exc):
return cls.vk_error_code(exc) in (6, 9, 10)
def call_with_retry(self, func, *args, **kwargs):
max_attempts = 5
for attempt in range(1, max_attempts + 1):
try:
return func(*args, **kwargs)
except VkApiError as e:
if not self.is_retryable_error(e) or attempt == max_attempts:
raise
delay = min(2.0, 0.35 * (2 ** (attempt - 1)))
if self.vk_error_code(e) == 9:
delay = max(delay, 1.0)
time.sleep(delay)

View File

@@ -0,0 +1,60 @@
import unittest
from pathlib import Path
class AuthReloginSmokeTests(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.main_source = Path("main.py").read_text(encoding="utf-8")
cls.vk_source = Path("services/vk_service.py").read_text(encoding="utf-8")
cls.update_source = Path("services/update_service.py").read_text(encoding="utf-8")
def test_auth_command_builder_handles_frozen_and_source(self):
self.assertIn("def _build_auth_command(self, auth_url, output_path):", self.main_source)
self.assertIn("entry_script_path=os.path.abspath(__file__)", self.main_source)
self.assertIn('return sys.executable, ["--auth", auth_url, output_path]', self.vk_source)
self.assertIn("script_path = entry_script_path or os.path.abspath(__file__)", self.vk_source)
def test_auth_runs_via_qprocess(self):
self.assertIn("process = QProcess(self)", self.main_source)
self.assertIn("process.start(program, args)", self.main_source)
self.assertIn("def _on_auth_process_finished(self, exit_code, _exit_status):", self.main_source)
self.assertIn("if self.auth_process and self.auth_process.state() == QProcess.NotRunning:", self.main_source)
def test_force_relogin_has_backoff_and_event_log(self):
self.assertIn("AUTH_RELOGIN_BACKOFF_SECONDS = 5.0", self.main_source)
self.assertIn("if self._auth_relogin_in_progress:", self.main_source)
self.assertIn("force_relogin_backoff", self.main_source)
self.assertIn("force_relogin", self.main_source)
def test_auth_error_paths_trigger_force_relogin(self):
self.assertIn(
"def _handle_vk_api_error(self, context, exc, action_name=None, ui_message_prefix=None, disable_ui=False):",
self.main_source,
)
self.assertIn("self._force_relogin(exc, action_name or context)", self.main_source)
self.assertIn('"load_chats",', self.main_source)
self.assertIn('"execute_user_action",', self.main_source)
self.assertIn('"set_user_admin",', self.main_source)
def test_tab_checkbox_lists_use_existing_attributes(self):
self.assertIn("self.warehouse_chat_checkboxes", self.main_source)
self.assertIn("self.coffee_chat_checkboxes", self.main_source)
self.assertNotIn("self.retail_warehouse_checkboxes", self.main_source)
self.assertNotIn("self.retail_coffee_checkboxes", self.main_source)
def test_update_check_actions_exist(self):
self.assertIn("from app_version import APP_VERSION", self.main_source)
self.assertIn("from services import (", self.main_source)
self.assertIn("UpdateChecker", self.main_source)
self.assertIn("detect_update_repository_url", self.main_source)
self.assertIn('QAction("Проверить обновления", self)', self.main_source)
self.assertIn("def check_for_updates(self, silent_no_updates=False):", self.main_source)
self.assertIn("class UpdateChecker(QObject):", self.update_source)
self.assertIn("def _start_auto_update(self, download_url, latest_version, checksum_url=\"\", download_name=\"\"):", self.main_source)
self.assertIn("AutoUpdateService.prepare_update", self.main_source)
self.assertIn("AutoUpdateService.build_update_script", self.main_source)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,51 @@
import hashlib
import importlib.util
import tempfile
import unittest
from pathlib import Path
_SPEC = importlib.util.spec_from_file_location(
"auto_update_service",
Path("services/auto_update_service.py"),
)
_MODULE = importlib.util.module_from_spec(_SPEC)
_SPEC.loader.exec_module(_MODULE)
AutoUpdateService = _MODULE.AutoUpdateService
class AutoUpdateServiceTests(unittest.TestCase):
def test_extract_sha256_from_text(self):
digest = "a" * 64
text = f"{digest} AnabasisManager-1.0.0-win.zip\n"
extracted = AutoUpdateService.extract_sha256_from_text(
text,
"AnabasisManager-1.0.0-win.zip",
)
self.assertEqual(extracted, digest)
def test_sha256_file(self):
with tempfile.TemporaryDirectory() as td:
path = Path(td) / "payload.bin"
payload = b"anabasis"
path.write_bytes(payload)
expected = hashlib.sha256(payload).hexdigest()
self.assertEqual(AutoUpdateService.sha256_file(str(path)), expected)
def test_build_update_script_contains_core_vars(self):
script = AutoUpdateService.build_update_script(
app_dir=r"C:\Apps\AnabasisManager",
source_dir=r"C:\Temp\Extracted",
exe_name="AnabasisManager.exe",
target_pid=1234,
)
script_text = Path(script).read_text(encoding="utf-8")
self.assertIn("set \"APP_DIR=", script_text)
self.assertIn("set \"SRC_DIR=", script_text)
self.assertIn("set \"EXE_NAME=", script_text)
self.assertIn("set \"TARGET_PID=", script_text)
self.assertIn(":rollback", script_text)
self.assertIn("if not exist \"%SRC_DIR%\\%EXE_NAME%\"", script_text)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,65 @@
import unittest
import importlib.util
from types import SimpleNamespace
from pathlib import Path
_SPEC = importlib.util.spec_from_file_location(
"chat_actions",
Path("services/chat_actions.py"),
)
_MODULE = importlib.util.module_from_spec(_SPEC)
_SPEC.loader.exec_module(_MODULE)
load_chat_conversations = _MODULE.load_chat_conversations
resolve_user_ids = _MODULE.resolve_user_ids
class ChatActionsTests(unittest.TestCase):
def test_resolve_user_ids_mixed_results(self):
mapping = {
"id1": {"type": "user", "object_id": 1},
"id2": {"type": "group", "object_id": 2},
}
def call_with_retry(func, **kwargs):
return func(**kwargs)
def resolve_screen_name(screen_name):
if screen_name == "boom":
raise RuntimeError("boom")
return mapping.get(screen_name)
vk_api = SimpleNamespace(utils=SimpleNamespace(resolveScreenName=resolve_screen_name))
links = [
"https://vk.com/id1",
"https://vk.com/id2",
"https://vk.com/boom",
"https://vk.com/",
]
resolved, failed = resolve_user_ids(call_with_retry, vk_api, links)
self.assertEqual(resolved, [1])
self.assertEqual(len(failed), 3)
self.assertEqual(failed[0][0], "https://vk.com/id2")
self.assertIsNone(failed[0][1])
def test_load_chat_conversations_paginated(self):
pages = [
{"items": [{"id": 1}], "next_from": "page-2"},
{"items": [{"id": 2}]},
]
def get_conversations(**kwargs):
if kwargs.get("start_from") == "page-2":
return pages[1]
return pages[0]
def call_with_retry(func, **kwargs):
return func(**kwargs)
vk_api = SimpleNamespace(messages=SimpleNamespace(getConversations=get_conversations))
items = load_chat_conversations(call_with_retry, vk_api)
self.assertEqual(items, [{"id": 1}, {"id": 2}])
if __name__ == "__main__":
unittest.main()

53
tests/test_token_store.py Normal file
View File

@@ -0,0 +1,53 @@
import tempfile
import unittest
import importlib.util
from pathlib import Path
from unittest.mock import patch
_SPEC = importlib.util.spec_from_file_location(
"token_store",
Path("services/token_store.py"),
)
_MODULE = importlib.util.module_from_spec(_SPEC)
_SPEC.loader.exec_module(_MODULE)
load_token = _MODULE.load_token
save_token = _MODULE.save_token
class TokenStoreTests(unittest.TestCase):
def test_save_and_load_non_expiring_token(self):
with tempfile.TemporaryDirectory() as td:
token_file = Path(td) / "token.json"
with patch.object(_MODULE.os, "name", "posix"):
expiration = save_token(
token="abc123",
token_file=str(token_file),
app_data_dir=td,
expires_in=0,
)
token, loaded_expiration = load_token(str(token_file))
self.assertEqual(expiration, 0)
self.assertEqual(token, "abc123")
self.assertEqual(loaded_expiration, 0)
def test_expired_token_is_removed(self):
with tempfile.TemporaryDirectory() as td:
token_file = Path(td) / "token.json"
with patch.object(_MODULE.os, "name", "posix"):
with patch.object(_MODULE.time, "time", return_value=1000):
save_token(
token="abc123",
token_file=str(token_file),
app_data_dir=td,
expires_in=1,
)
with patch.object(_MODULE.time, "time", return_value=2000):
token, expiration = load_token(str(token_file))
self.assertIsNone(token)
self.assertIsNone(expiration)
if __name__ == "__main__":
unittest.main()

25
ui/dialogs.py Normal file
View File

@@ -0,0 +1,25 @@
from PySide6.QtWidgets import QDialog, QDialogButtonBox, QLabel, QTextEdit, QVBoxLayout
class MultiLinkDialog(QDialog):
def __init__(self, parent=None):
super().__init__(parent)
self.setWindowTitle("Ввод нескольких ссылок")
self.setMinimumSize(400, 300)
layout = QVBoxLayout(self)
label = QLabel("Вставьте ссылки на страницы VK, каждая с новой строки:")
layout.addWidget(label)
self.links_text_edit = QTextEdit()
layout.addWidget(self.links_text_edit)
button_box = QDialogButtonBox()
button_box.addButton("ОК", QDialogButtonBox.AcceptRole)
button_box.addButton("Отмена", QDialogButtonBox.RejectRole)
button_box.accepted.connect(self.accept)
button_box.rejected.connect(self.reject)
layout.addWidget(button_box)
def get_links(self):
return [line.strip() for line in self.links_text_edit.toPlainText().strip().split("\n") if line.strip()]

9
ui/main_window.py Normal file
View File

@@ -0,0 +1,9 @@
def instructions_text():
return (
"Инструкция:\n"
"1. Авторизуйтесь через VK.\n"
"2. Выберите чаты.\n"
"3. Вставьте ссылку на пользователя в поле ниже. ID определится автоматически.\n"
"4. Для массовых операций нажмите кнопку 'Список' и вставьте ссылки в окне.\n"
"5. Нажмите 'ИСКЛЮЧИТЬ' или 'ПРИГЛАСИТЬ'."
)