Compare commits
37 Commits
v2.0.0-df3
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5a3e4c188e | ||
|
|
ac2013bcca | ||
| c77ca4652b | |||
| 4ec24c6d0f | |||
| 72edfffd9e | |||
| db5d901435 | |||
| cd5e6e1f6b | |||
| 4b3347a069 | |||
| e0628b1792 | |||
| 201184700f | |||
| 5253c942e8 | |||
| c645d964bf | |||
| 13890fbbfc | |||
| d7494c1092 | |||
| 67f6910435 | |||
| 2c502fe3bf | |||
| 02078282bc | |||
| b1ed97a826 | |||
| 5be8ab9af7 | |||
| 0f07fe250c | |||
| fc0c98ee49 | |||
| e22eac6de3 | |||
| bf7e5e599e | |||
| d1714a86c7 | |||
| 781bf679ff | |||
| 813dafd6b8 | |||
| 965d09d47c | |||
| 1b4760167f | |||
| 039c1fa38a | |||
| 147988242f | |||
| cf6d6bcbd0 | |||
| 61948a51c6 | |||
| 97c52c5a51 | |||
| 862c2c8899 | |||
| 1013a1ce38 | |||
| f15e71996b | |||
| 34272d01c8 |
@@ -28,7 +28,7 @@ jobs:
|
||||
|
||||
- name: Validate syntax
|
||||
run: |
|
||||
python -m py_compile app_version.py main.py build.py updater_gui.py tests/test_auth_relogin_smoke.py tests/test_auto_update_service.py tests/test_chat_actions.py tests/test_token_store.py
|
||||
python -m py_compile app_version.py main.py build.py updater_gui.py tests/test_auto_update_service.py tests/test_chat_actions.py tests/test_main_contracts.py tests/test_token_store.py tests/test_update_reentry_runtime.py tests/test_update_service.py tests/test_updater_gui.py
|
||||
|
||||
- name: Run tests
|
||||
run: |
|
||||
|
||||
@@ -38,6 +38,28 @@ jobs:
|
||||
python -m pip install --upgrade pip
|
||||
pip install -r requirements.txt pyinstaller
|
||||
|
||||
- name: Ensure Inno Setup 6
|
||||
shell: powershell
|
||||
run: |
|
||||
$isccPath = ""
|
||||
$inPath = Get-Command iscc.exe -ErrorAction SilentlyContinue
|
||||
if ($inPath) {
|
||||
$isccPath = $inPath.Source
|
||||
Write-Host "Inno Setup compiler found in PATH."
|
||||
} elseif (Test-Path "C:\Program Files (x86)\Inno Setup 6\ISCC.exe") {
|
||||
"C:\Program Files (x86)\Inno Setup 6" | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append
|
||||
$isccPath = "C:\Program Files (x86)\Inno Setup 6\ISCC.exe"
|
||||
Write-Host "Inno Setup compiler found in Program Files (x86)."
|
||||
} elseif (Test-Path "C:\Program Files\Inno Setup 6\ISCC.exe") {
|
||||
"C:\Program Files\Inno Setup 6" | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append
|
||||
$isccPath = "C:\Program Files\Inno Setup 6\ISCC.exe"
|
||||
Write-Host "Inno Setup compiler found in Program Files."
|
||||
} else {
|
||||
throw "Inno Setup 6 is not installed on runner. Install Inno Setup and restart runner service."
|
||||
}
|
||||
Write-Host "Using ISCC: $isccPath"
|
||||
exit 0
|
||||
|
||||
- name: Extract app version
|
||||
id: extract_version
|
||||
shell: powershell
|
||||
@@ -47,24 +69,40 @@ jobs:
|
||||
[System.IO.File]::AppendAllText($env:GITHUB_OUTPUT, "version=$version`n", $utf8NoBom)
|
||||
Write-Host "Detected version: $version"
|
||||
|
||||
- name: Stop if version already released
|
||||
id: stop
|
||||
- name: Initialize release flow
|
||||
id: flow_init
|
||||
shell: powershell
|
||||
run: |
|
||||
$utf8NoBom = New-Object System.Text.UTF8Encoding($false)
|
||||
[System.IO.File]::AppendAllText($env:GITHUB_ENV, "CONTINUE=true`n", $utf8NoBom)
|
||||
exit 0
|
||||
|
||||
- name: Stop if release already exists
|
||||
shell: powershell
|
||||
run: |
|
||||
$version = "${{ steps.extract_version.outputs.version }}"
|
||||
$tag = "v$version"
|
||||
git show-ref --tags --quiet --verify "refs/tags/$tag"
|
||||
$tagExists = ($LASTEXITCODE -eq 0)
|
||||
$global:LASTEXITCODE = 0
|
||||
$apiUrl = "https://git.daemonlord.ru/api/v1/repos/${{ gitea.repository }}/releases?page=1&limit=100"
|
||||
$headers = @{ Authorization = "token ${{ secrets.API_TOKEN }}" }
|
||||
$utf8NoBom = New-Object System.Text.UTF8Encoding($false)
|
||||
if ($tagExists) {
|
||||
Write-Host "Version $tag already released, stopping job."
|
||||
try {
|
||||
$response = Invoke-RestMethod -Uri $apiUrl -Headers $headers -Method Get
|
||||
$found = $false
|
||||
foreach ($release in $response) {
|
||||
if ($release.tag_name -eq $tag) {
|
||||
$found = $true
|
||||
break
|
||||
}
|
||||
}
|
||||
if ($found) {
|
||||
Write-Host "Release $tag already exists, stopping job."
|
||||
[System.IO.File]::AppendAllText($env:GITHUB_ENV, "CONTINUE=false`n", $utf8NoBom)
|
||||
} else {
|
||||
Write-Host "Version $tag not released yet, continuing workflow..."
|
||||
[System.IO.File]::AppendAllText($env:GITHUB_ENV, "CONTINUE=true`n", $utf8NoBom)
|
||||
Write-Host "Release $tag not found, continuing workflow..."
|
||||
}
|
||||
} catch {
|
||||
Write-Host "Failed to query releases list, continuing workflow..."
|
||||
}
|
||||
exit 0
|
||||
|
||||
- name: Run tests
|
||||
if: env.CONTINUE == 'true'
|
||||
@@ -76,18 +114,43 @@ jobs:
|
||||
- name: Build release zip
|
||||
if: env.CONTINUE == 'true'
|
||||
shell: powershell
|
||||
env:
|
||||
PYTHONUTF8: "1"
|
||||
PYTHONIOENCODING: "utf-8"
|
||||
run: |
|
||||
python build.py
|
||||
$ErrorActionPreference = "Continue"
|
||||
$repoRoot = (git rev-parse --show-toplevel).Trim()
|
||||
if (-not [string]::IsNullOrWhiteSpace($repoRoot)) {
|
||||
Set-Location $repoRoot
|
||||
}
|
||||
$logDir = Join-Path $env:RUNNER_TEMP "anabasis-build"
|
||||
New-Item -ItemType Directory -Force -Path $logDir | Out-Null
|
||||
$buildLog = Join-Path $logDir "build.log"
|
||||
python build.py *>&1 | Tee-Object -FilePath $buildLog
|
||||
$code = $LASTEXITCODE
|
||||
if ($code -ne 0) {
|
||||
Write-Host "Build failed with exit code $code. Dumping build log:"
|
||||
if (Test-Path $buildLog) {
|
||||
Get-Content -Path $buildLog -Raw
|
||||
} else {
|
||||
Write-Host "Build log was not created: $buildLog"
|
||||
}
|
||||
exit $code
|
||||
}
|
||||
|
||||
- name: Ensure archive exists
|
||||
- name: Ensure artifacts exist
|
||||
if: env.CONTINUE == 'true'
|
||||
shell: powershell
|
||||
run: |
|
||||
$version = "${{ steps.extract_version.outputs.version }}"
|
||||
$archivePath = "dist/AnabasisManager-$version.zip"
|
||||
$installerPath = "dist/AnabasisManager-setup-$version.exe"
|
||||
if (-not (Test-Path $archivePath)) {
|
||||
throw "Archive not found: $archivePath"
|
||||
}
|
||||
if (-not (Test-Path $installerPath)) {
|
||||
throw "Installer not found: $installerPath"
|
||||
}
|
||||
|
||||
- name: Generate SHA256 checksum
|
||||
if: env.CONTINUE == 'true'
|
||||
@@ -95,11 +158,14 @@ jobs:
|
||||
run: |
|
||||
$version = "${{ steps.extract_version.outputs.version }}"
|
||||
$archiveName = "AnabasisManager-$version.zip"
|
||||
$archivePath = "dist/$archiveName"
|
||||
$checksumPath = "dist/$archiveName.sha256"
|
||||
$hash = (Get-FileHash -Path $archivePath -Algorithm SHA256).Hash.ToLower()
|
||||
"$hash $archiveName" | Set-Content -Path $checksumPath -Encoding UTF8
|
||||
$installerName = "AnabasisManager-setup-$version.exe"
|
||||
foreach ($name in @($archiveName, $installerName)) {
|
||||
$path = "dist/$name"
|
||||
$checksumPath = "dist/$name.sha256"
|
||||
$hash = (Get-FileHash -Path $path -Algorithm SHA256).Hash.ToLower()
|
||||
"$hash $name" | Set-Content -Path $checksumPath -Encoding UTF8
|
||||
Write-Host "Checksum created: $checksumPath"
|
||||
}
|
||||
|
||||
- name: Configure git identity
|
||||
if: env.CONTINUE == 'true'
|
||||
@@ -114,8 +180,14 @@ jobs:
|
||||
run: |
|
||||
$version = "${{ steps.extract_version.outputs.version }}"
|
||||
$tag = "v$version"
|
||||
git tag "$tag"
|
||||
$sha = "${{ gitea.sha }}"
|
||||
$tagLine = (git ls-remote --tags origin "refs/tags/$tag" | Select-Object -First 1)
|
||||
if ([string]::IsNullOrWhiteSpace($tagLine)) {
|
||||
git tag -a "$tag" -m "Release $tag" "$sha"
|
||||
git push origin "$tag"
|
||||
} else {
|
||||
Write-Host "Tag $tag already exists on origin, skipping tag push."
|
||||
}
|
||||
|
||||
- name: Create Gitea Release
|
||||
if: env.CONTINUE == 'true'
|
||||
@@ -125,9 +197,12 @@ jobs:
|
||||
repository: ${{ gitea.repository }}
|
||||
token: ${{ secrets.API_TOKEN }}
|
||||
tag_name: v${{ steps.extract_version.outputs.version }}
|
||||
target_commitish: ${{ gitea.sha }}
|
||||
name: Anabasis Manager ${{ steps.extract_version.outputs.version }}
|
||||
body: |
|
||||
Desktop release v${{ steps.extract_version.outputs.version }}
|
||||
files: |
|
||||
dist/AnabasisManager-${{ steps.extract_version.outputs.version }}.zip
|
||||
dist/AnabasisManager-${{ steps.extract_version.outputs.version }}.zip.sha256
|
||||
dist/AnabasisManager-setup-${{ steps.extract_version.outputs.version }}.exe
|
||||
dist/AnabasisManager-setup-${{ steps.extract_version.outputs.version }}.exe.sha256
|
||||
|
||||
13
README.md
13
README.md
@@ -13,9 +13,12 @@
|
||||
* Моментальная загрузка всех доступных чатов пользователя.
|
||||
* Групповой выбор чатов («Выбрать все» / «Снять выбор»).
|
||||
* Быстрое обновление списка бесед.
|
||||
* Выполнение массовых действий в фоновом потоке без подвисания интерфейса.
|
||||
* Визуальный прогресс-бар по ходу операции.
|
||||
* **👤 Интеллектуальный поиск ID:** Автоматическое распознавание ID пользователя из ссылок любого формата (например, `vk.com/id123`, `vk.com/durov` или просто `durov`).
|
||||
* **🛠 Управление в один клик:** Кнопки для мгновенного исключения или приглашения пользователя во все выбранные чаты одновременно.
|
||||
* **🛡 Стабильность:** Улучшенная обработка ошибок VK API и автоматическая реакция на смену IP-адреса.
|
||||
* **🔄 Безопасные обновления:** Проверка SHA256 и защищенная распаковка архива обновления.
|
||||
* **🛡 Стабильность и безопасность:** Улучшенная обработка ошибок VK API, автоматическая реакция на смену IP-адреса и безопасное хранение токена с шифрованием DPAPI в Windows.
|
||||
|
||||
---
|
||||
|
||||
@@ -47,7 +50,7 @@
|
||||
|
||||
3. **Установите зависимости:**
|
||||
```bash
|
||||
pip install PySide6 vk_api
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
4. **Запустите приложение:**
|
||||
@@ -68,6 +71,12 @@
|
||||
|
||||
## 📂 Техническая информация
|
||||
|
||||
### Последние обновления
|
||||
- Массовые операции VK (`remove/add/admin`) выполняются в фоновом потоке, чтобы интерфейс не зависал; добавлен визуальный прогресс-бар.
|
||||
- Распаковка архива автообновления теперь валидирует пути перед извлечением для защиты от path traversal.
|
||||
- Проверка обновлений переведена на `QThread` (модель потоков Qt) вместо Python `threading.Thread`.
|
||||
- В Windows сохранение токена требует успешного шифрования через DPAPI; при ошибке шифрования сессия продолжается, но токен не сохраняется на диск.
|
||||
|
||||
### Сборка проекта (для разработчиков)
|
||||
Проект использует кастомный скрипт автоматизации `build.py`, который оптимизирует зависимости `PySide6` и корректно упаковывает `QtWebEngineCore`.
|
||||
|
||||
|
||||
@@ -1 +1 @@
|
||||
APP_VERSION = "2.0.0"
|
||||
APP_VERSION = "2.2.5"
|
||||
|
||||
@@ -74,5 +74,21 @@ def main_auth(auth_url, output_path):
|
||||
webview.start(private_mode=False, storage_path=storage_path)
|
||||
|
||||
|
||||
def main():
|
||||
# Supports both: `python auth_webview.py <auth_url> <output_path>`
|
||||
# and: `python auth_webview.py --auth <auth_url> <output_path>`
|
||||
args = sys.argv[1:]
|
||||
if len(args) == 3 and args[0] == "--auth":
|
||||
auth_url, output_path = args[1], args[2]
|
||||
elif len(args) == 2:
|
||||
auth_url, output_path = args[0], args[1]
|
||||
else:
|
||||
print("Usage: auth_webview.py [--auth] <auth_url> <output_path>")
|
||||
return 1
|
||||
|
||||
main_auth(auth_url, output_path)
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
sys.exit(main())
|
||||
|
||||
154
build.py
154
build.py
@@ -4,15 +4,17 @@ import subprocess
|
||||
import sys
|
||||
from app_version import APP_VERSION
|
||||
|
||||
# --- Конфигурация ---
|
||||
# --- Configuration ---
|
||||
APP_NAME = "AnabasisManager"
|
||||
UPDATER_NAME = "AnabasisUpdater"
|
||||
VERSION = APP_VERSION # Единая версия приложения
|
||||
MAIN_SCRIPT = "main.py"
|
||||
UPDATER_SCRIPT = "updater_gui.py"
|
||||
ICON_PATH = "icon.ico"
|
||||
INSTALLER_SCRIPT = os.path.join("installer", "AnabasisManager.iss")
|
||||
DIST_DIR = os.path.join("dist", APP_NAME)
|
||||
ARCHIVE_NAME = f"{APP_NAME}-{VERSION}" # Формат Название-Версия
|
||||
INSTALLER_NAME = f"{APP_NAME}-setup-{VERSION}.exe"
|
||||
SAFE_CLEAN_ROOT_FILES = {"main.py", "updater_gui.py", "requirements.txt", "build.py"}
|
||||
REMOVE_LIST = [
|
||||
"Qt6Pdf.dll", "Qt6PdfQuick.dll", "Qt6PdfWidgets.dll",
|
||||
@@ -30,22 +32,38 @@ def write_version_marker():
|
||||
os.makedirs(DIST_DIR, exist_ok=True)
|
||||
with open(marker_path, "w", encoding="utf-8") as f:
|
||||
f.write(str(VERSION).strip() + "\n")
|
||||
print(f"[OK] Обновлен маркер версии: {marker_path}")
|
||||
print(f"[OK] Version marker written: {marker_path}")
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Не удалось записать version.txt: {e}")
|
||||
print(f"[ERROR] Failed to write version.txt: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def copy_icon_to_dist():
|
||||
icon_abs_path = os.path.abspath(ICON_PATH)
|
||||
if not os.path.exists(icon_abs_path):
|
||||
print("[WARN] icon.ico not found, skipping icon copy into dist.")
|
||||
return
|
||||
try:
|
||||
os.makedirs("dist", exist_ok=True)
|
||||
os.makedirs(DIST_DIR, exist_ok=True)
|
||||
shutil.copy2(icon_abs_path, os.path.join("dist", "icon.ico"))
|
||||
shutil.copy2(icon_abs_path, os.path.join(DIST_DIR, "icon.ico"))
|
||||
print("[OK] Icon copied to dist/icon.ico and dist/AnabasisManager/icon.ico")
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Failed to copy icon.ico into dist: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def ensure_project_root():
|
||||
missing = [name for name in SAFE_CLEAN_ROOT_FILES if not os.path.exists(name)]
|
||||
if missing:
|
||||
print("[ERROR] Скрипт нужно запускать из корня проекта.")
|
||||
print(f"[ERROR] Не найдены: {', '.join(missing)}")
|
||||
print("[ERROR] Run this script from the project root.")
|
||||
print(f"[ERROR] Missing files: {', '.join(missing)}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def run_build():
|
||||
print(f"--- 1. Запуск PyInstaller для {APP_NAME} v{VERSION} ---")
|
||||
print(f"--- 1. Running PyInstaller for {APP_NAME} v{VERSION} ---")
|
||||
icon_abs_path = os.path.abspath(ICON_PATH)
|
||||
has_icon = os.path.exists(icon_abs_path)
|
||||
|
||||
@@ -68,14 +86,14 @@ def run_build():
|
||||
|
||||
try:
|
||||
subprocess.check_call(command)
|
||||
print("\n[OK] Сборка PyInstaller завершена.")
|
||||
print("\n[OK] PyInstaller build completed.")
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"\n[ERROR] Ошибка при сборке: {e}")
|
||||
print(f"\n[ERROR] Build failed: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def run_updater_build():
|
||||
print(f"\n--- 1.2 Сборка {UPDATER_NAME} ---")
|
||||
print(f"\n--- 1.2 Building {UPDATER_NAME} ---")
|
||||
icon_abs_path = os.path.abspath(ICON_PATH)
|
||||
has_icon = os.path.exists(icon_abs_path)
|
||||
updater_spec_dir = os.path.join("build", "updater_spec")
|
||||
@@ -98,14 +116,14 @@ def run_updater_build():
|
||||
command = [arg for arg in command if arg]
|
||||
try:
|
||||
subprocess.check_call(command)
|
||||
print(f"[OK] {UPDATER_NAME} собран.")
|
||||
print(f"[OK] {UPDATER_NAME} built.")
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"[ERROR] Ошибка при сборке {UPDATER_NAME}: {e}")
|
||||
print(f"[ERROR] Failed to build {UPDATER_NAME}: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def run_cleanup():
|
||||
print(f"\n--- 2. Оптимизация папки {APP_NAME} ---")
|
||||
print(f"\n--- 2. Optimizing {APP_NAME} folder ---")
|
||||
|
||||
# Пытаемся найти папку PySide6 внутри сборки
|
||||
pyside_path = os.path.join(DIST_DIR, "PySide6")
|
||||
@@ -120,21 +138,116 @@ def run_cleanup():
|
||||
shutil.rmtree(path)
|
||||
else:
|
||||
os.remove(path)
|
||||
print(f"Удалено: {item}")
|
||||
print(f"Removed: {item}")
|
||||
except Exception as e:
|
||||
print(f"Пропуск {item}: {e}")
|
||||
print(f"Skipped {item}: {e}")
|
||||
|
||||
|
||||
def create_archive():
|
||||
print(f"\n--- 3. Создание архива {ARCHIVE_NAME}.zip ---")
|
||||
print(f"\n--- 3. Creating archive {ARCHIVE_NAME}.zip ---")
|
||||
|
||||
try:
|
||||
# Создаем zip-архив из папки DIST_DIR
|
||||
# base_name - имя файла без расширения, format - 'zip', root_dir - что упаковываем
|
||||
shutil.make_archive(os.path.join("dist", ARCHIVE_NAME), 'zip', DIST_DIR)
|
||||
print(f"[OK] Архив создан: dist/{ARCHIVE_NAME}.zip")
|
||||
print(f"[OK] Archive created: dist/{ARCHIVE_NAME}.zip")
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Не удалось создать архив: {e}")
|
||||
print(f"[ERROR] Failed to create archive: {e}")
|
||||
|
||||
|
||||
def _find_iscc():
|
||||
candidates = []
|
||||
iscc_env = os.getenv("ISCC_PATH", "").strip()
|
||||
if iscc_env:
|
||||
candidates.append(iscc_env)
|
||||
candidates.append(shutil.which("iscc"))
|
||||
candidates.append(shutil.which("ISCC.exe"))
|
||||
candidates.append(r"C:\Program Files (x86)\Inno Setup 6\ISCC.exe")
|
||||
candidates.append(r"C:\Program Files\Inno Setup 6\ISCC.exe")
|
||||
for candidate in candidates:
|
||||
if candidate and os.path.exists(candidate):
|
||||
return candidate
|
||||
return ""
|
||||
|
||||
|
||||
def _decode_process_output(raw_bytes):
|
||||
if raw_bytes is None:
|
||||
return ""
|
||||
if isinstance(raw_bytes, str):
|
||||
return raw_bytes
|
||||
for enc in ("utf-8-sig", "utf-16", "utf-16-le", "cp1251", "cp866", "latin-1"):
|
||||
try:
|
||||
return raw_bytes.decode(enc)
|
||||
except Exception:
|
||||
continue
|
||||
return raw_bytes.decode("utf-8", errors="replace")
|
||||
|
||||
|
||||
def build_installer():
|
||||
print(f"\n--- 4. Building installer {INSTALLER_NAME} ---")
|
||||
if os.name != "nt":
|
||||
print("[INFO] Inno Setup installer is built only on Windows. Step skipped.")
|
||||
return
|
||||
if not os.path.exists(INSTALLER_SCRIPT):
|
||||
print(f"[ERROR] Installer script not found: {INSTALLER_SCRIPT}")
|
||||
sys.exit(1)
|
||||
if not os.path.exists(DIST_DIR):
|
||||
print(f"[ERROR] Build output folder not found: {DIST_DIR}")
|
||||
sys.exit(1)
|
||||
iscc_path = _find_iscc()
|
||||
if not iscc_path:
|
||||
print("[ERROR] Inno Setup Compiler (ISCC.exe) not found.")
|
||||
print("[ERROR] Install Inno Setup 6 or set ISCC_PATH environment variable.")
|
||||
sys.exit(1)
|
||||
|
||||
project_root = os.path.abspath(".")
|
||||
source_dir = os.path.abspath(DIST_DIR)
|
||||
output_dir = os.path.abspath("dist")
|
||||
iss_path = os.path.abspath(INSTALLER_SCRIPT)
|
||||
icon_path = os.path.abspath(ICON_PATH)
|
||||
print(f"[INFO] ISCC source dir: {source_dir}")
|
||||
print(f"[INFO] ISCC output dir: {output_dir}")
|
||||
print(f"[INFO] ISCC script: {iss_path}")
|
||||
print(f"[INFO] ISCC icon path: {icon_path}")
|
||||
if not os.path.exists(source_dir):
|
||||
print(f"[ERROR] Source dir does not exist: {source_dir}")
|
||||
sys.exit(1)
|
||||
if not os.path.exists(iss_path):
|
||||
print(f"[ERROR] Installer script does not exist: {iss_path}")
|
||||
sys.exit(1)
|
||||
if not os.path.exists(icon_path):
|
||||
print(f"[ERROR] Icon file does not exist: {icon_path}")
|
||||
sys.exit(1)
|
||||
command = [
|
||||
iscc_path,
|
||||
f"/DMyAppVersion={VERSION}",
|
||||
f"/DMyIconFile={icon_path}",
|
||||
f"/O{output_dir}",
|
||||
iss_path,
|
||||
]
|
||||
try:
|
||||
completed = subprocess.run(
|
||||
command,
|
||||
capture_output=True,
|
||||
cwd=project_root,
|
||||
check=False,
|
||||
)
|
||||
stdout_text = _decode_process_output(completed.stdout)
|
||||
stderr_text = _decode_process_output(completed.stderr)
|
||||
if stdout_text:
|
||||
print(stdout_text.rstrip())
|
||||
if stderr_text:
|
||||
print(stderr_text.rstrip())
|
||||
if completed.returncode != 0:
|
||||
raise RuntimeError(f"ISCC exited with code {completed.returncode}")
|
||||
installer_path = os.path.join("dist", INSTALLER_NAME)
|
||||
if not os.path.exists(installer_path):
|
||||
print(f"[ERROR] Installer was not created: {installer_path}")
|
||||
sys.exit(1)
|
||||
print(f"[OK] Installer created: {installer_path}")
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Failed to build installer: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
@@ -147,10 +260,13 @@ if __name__ == "__main__":
|
||||
run_build()
|
||||
run_updater_build()
|
||||
run_cleanup()
|
||||
copy_icon_to_dist()
|
||||
write_version_marker()
|
||||
create_archive()
|
||||
build_installer()
|
||||
|
||||
print("\n" + "=" * 30)
|
||||
print("ПРОЦЕСС ЗАВЕРШЕН")
|
||||
print(f"Файл для отправки: dist/{ARCHIVE_NAME}.zip")
|
||||
print("BUILD COMPLETED")
|
||||
print(f"Release archive: dist/{ARCHIVE_NAME}.zip")
|
||||
print(f"Installer: dist/{INSTALLER_NAME}")
|
||||
print("=" * 30)
|
||||
|
||||
42
installer/AnabasisManager.iss
Normal file
42
installer/AnabasisManager.iss
Normal file
@@ -0,0 +1,42 @@
|
||||
#define MyAppName "Anabasis Manager"
|
||||
#ifndef MyAppVersion
|
||||
#define MyAppVersion "0.0.0"
|
||||
#endif
|
||||
#ifndef MyIconFile
|
||||
#define MyIconFile "..\icon.ico"
|
||||
#endif
|
||||
|
||||
[Setup]
|
||||
AppId={{6CD9D6F2-4B95-4E9C-A8D8-2A9C8F6AA741}
|
||||
AppName={#MyAppName}
|
||||
AppVersion={#MyAppVersion}
|
||||
AppPublisher=Benya
|
||||
DefaultDirName={localappdata}\Programs\Anabasis Manager
|
||||
DefaultGroupName=Anabasis Manager
|
||||
DisableProgramGroupPage=yes
|
||||
PrivilegesRequired=lowest
|
||||
OutputDir=..\dist
|
||||
OutputBaseFilename=AnabasisManager-setup-{#MyAppVersion}
|
||||
Compression=lzma2
|
||||
SolidCompression=yes
|
||||
WizardStyle=modern
|
||||
ArchitecturesInstallIn64BitMode=x64compatible
|
||||
UninstallDisplayIcon={app}\AnabasisManager.exe
|
||||
SetupIconFile={#MyIconFile}
|
||||
|
||||
[Languages]
|
||||
Name: "russian"; MessagesFile: "compiler:Languages\Russian.isl"
|
||||
Name: "english"; MessagesFile: "compiler:Default.isl"
|
||||
|
||||
[Tasks]
|
||||
Name: "desktopicon"; Description: "Создать ярлык на рабочем столе"; GroupDescription: "Дополнительные задачи:"
|
||||
|
||||
[Files]
|
||||
Source: "..\dist\AnabasisManager\*"; DestDir: "{app}"; Flags: ignoreversion recursesubdirs createallsubdirs
|
||||
|
||||
[Icons]
|
||||
Name: "{group}\Anabasis Manager"; Filename: "{app}\AnabasisManager.exe"
|
||||
Name: "{autodesktop}\Anabasis Manager"; Filename: "{app}\AnabasisManager.exe"; Tasks: desktopicon
|
||||
|
||||
[Run]
|
||||
Filename: "{app}\AnabasisManager.exe"; Description: "Запустить Anabasis Manager"; Flags: nowait postinstall skipifsilent
|
||||
435
main.py
435
main.py
@@ -1,13 +1,12 @@
|
||||
import json
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
|
||||
from PySide6.QtCore import QProcess
|
||||
from PySide6.QtCore import QStandardPaths
|
||||
from PySide6.QtCore import Qt, QUrl, QDateTime, QTimer
|
||||
from PySide6.QtCore import QObject, QThread, Signal, Qt, QUrl, QDateTime, QTimer
|
||||
from PySide6.QtGui import QIcon, QAction, QActionGroup, QDesktopServices
|
||||
from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit,
|
||||
QPushButton, QVBoxLayout, QWidget, QMessageBox,
|
||||
@@ -50,6 +49,7 @@ UPDATE_REPOSITORY = ""
|
||||
UPDATE_REPOSITORY_URL = "https://git.daemonlord.ru/benya/AnabasisChatRemove"
|
||||
UPDATE_CHANNEL_DEFAULT = "stable"
|
||||
UPDATE_REQUEST_TIMEOUT = 8
|
||||
AUTH_ERROR_CONTEXTS = ("load_chats", "execute_user_action", "set_user_admin")
|
||||
|
||||
|
||||
def get_resource_path(relative_path):
|
||||
@@ -59,6 +59,86 @@ def get_resource_path(relative_path):
|
||||
# Для cx_Freeze и обычного запуска
|
||||
return os.path.join(os.path.abspath("."), relative_path)
|
||||
|
||||
class BulkActionWorker(QObject):
|
||||
progress = Signal(int, int, str)
|
||||
finished = Signal(dict)
|
||||
auth_error = Signal(str, object, str)
|
||||
failed = Signal(str)
|
||||
done = Signal()
|
||||
|
||||
def __init__(self, vk_call_with_retry, vk_api, action_type, selected_chats, user_infos, visible_messages):
|
||||
super().__init__()
|
||||
self.vk_call_with_retry = vk_call_with_retry
|
||||
self.vk = vk_api
|
||||
self.action_type = action_type
|
||||
self.selected_chats = selected_chats
|
||||
self.user_infos = user_infos
|
||||
self.visible_messages = visible_messages
|
||||
self.total = len(self.selected_chats) * len(self.user_infos)
|
||||
|
||||
@staticmethod
|
||||
def _is_auth_error(exc):
|
||||
return VkService.is_auth_error(exc, str(exc).lower())
|
||||
|
||||
def _emit_progress(self, processed):
|
||||
label = "admin" if self.action_type == "admin" else ("remove" if self.action_type == "remove" else "add")
|
||||
self.progress.emit(processed, self.total, label)
|
||||
|
||||
def run(self):
|
||||
results = []
|
||||
processed = 0
|
||||
try:
|
||||
for chat in self.selected_chats:
|
||||
peer_id = None
|
||||
if self.action_type == "admin":
|
||||
try:
|
||||
peer_id = 2000000000 + int(chat["id"])
|
||||
except (ValueError, TypeError):
|
||||
for _user_id, user_info in self.user_infos.items():
|
||||
results.append(f"✗ Ошибка ID чата: {chat['id']} ({user_info})")
|
||||
processed += 1
|
||||
self._emit_progress(processed)
|
||||
continue
|
||||
|
||||
for user_id, user_info in self.user_infos.items():
|
||||
try:
|
||||
if self.action_type == "remove":
|
||||
self.vk_call_with_retry(self.vk.messages.removeChatUser, chat_id=chat["id"], member_id=user_id)
|
||||
results.append(f"✓ '{user_info}' исключен из '{chat['title']}'.")
|
||||
elif self.action_type == "add":
|
||||
params = {"chat_id": chat["id"], "user_id": user_id}
|
||||
if self.visible_messages:
|
||||
params["visible_messages_count"] = 250
|
||||
self.vk_call_with_retry(self.vk.messages.addChatUser, **params)
|
||||
results.append(f"✓ '{user_info}' приглашен в '{chat['title']}'.")
|
||||
elif self.action_type == "admin":
|
||||
self.vk_call_with_retry(
|
||||
self.vk.messages.setMemberRole,
|
||||
peer_id=peer_id,
|
||||
member_id=user_id,
|
||||
role="admin",
|
||||
)
|
||||
results.append(f"✓ '{user_info}' назначен админом в '{chat['title']}'.")
|
||||
else:
|
||||
raise RuntimeError(f"Unknown action: {self.action_type}")
|
||||
except VkApiError as exc:
|
||||
if self._is_auth_error(exc):
|
||||
context = "set_user_admin" if self.action_type == "admin" else "execute_user_action"
|
||||
action_name = "назначения администраторов" if self.action_type == "admin" else "выполнения операций с пользователями"
|
||||
self.auth_error.emit(context, exc, action_name)
|
||||
return
|
||||
results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {exc}")
|
||||
finally:
|
||||
processed += 1
|
||||
self._emit_progress(processed)
|
||||
|
||||
self.finished.emit({"results": results, "processed": processed, "total": self.total})
|
||||
except Exception as exc:
|
||||
self.failed.emit(str(exc))
|
||||
finally:
|
||||
self.done.emit()
|
||||
|
||||
|
||||
class VkChatManager(QMainWindow):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
@@ -85,11 +165,19 @@ class VkChatManager(QMainWindow):
|
||||
self._auth_ui_busy = False
|
||||
self._auth_relogin_in_progress = False
|
||||
self._last_auth_relogin_ts = 0.0
|
||||
self._active_action_button = None
|
||||
self._active_action_button_text = ""
|
||||
self.update_repository_url = detect_update_repository_url(UPDATE_REPOSITORY_URL, UPDATE_REPOSITORY)
|
||||
self.update_channel = UPDATE_CHANNEL_DEFAULT
|
||||
self.update_checker = None
|
||||
self.update_thread = None
|
||||
self._update_in_progress = False
|
||||
self._update_check_silent = False
|
||||
self._bulk_worker_thread = None
|
||||
self._bulk_worker = None
|
||||
self._bulk_action_context = None
|
||||
self._bulk_action_success_message_title = ""
|
||||
self._bulk_clear_inputs_on_success = True
|
||||
|
||||
self.resolve_timer = QTimer(self)
|
||||
self.resolve_timer.setSingleShot(True)
|
||||
@@ -184,6 +272,12 @@ class VkChatManager(QMainWindow):
|
||||
self.add_user_btn.setMinimumHeight(50)
|
||||
self.add_user_btn.clicked.connect(self.add_user_to_chat)
|
||||
layout.addWidget(self.add_user_btn)
|
||||
self.operation_progress = QProgressBar()
|
||||
self.operation_progress.setRange(0, 100)
|
||||
self.operation_progress.setValue(0)
|
||||
self.operation_progress.setTextVisible(True)
|
||||
self.operation_progress.hide()
|
||||
layout.addWidget(self.operation_progress)
|
||||
self.status_label = QLabel("Статус: не авторизован")
|
||||
self.status_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
|
||||
layout.addWidget(self.status_label)
|
||||
@@ -369,10 +463,12 @@ class VkChatManager(QMainWindow):
|
||||
self._log_event("update_channel", f"update_channel={self.update_channel}")
|
||||
|
||||
def check_for_updates(self, silent_no_updates=False):
|
||||
if self.update_thread and self.update_thread.is_alive():
|
||||
if self._update_in_progress:
|
||||
self.status_label.setText("Статус: проверка обновлений уже выполняется...")
|
||||
return
|
||||
|
||||
self._update_check_silent = silent_no_updates
|
||||
self._update_in_progress = True
|
||||
self._set_update_action_state(True)
|
||||
channel_label = "бета" if self.update_channel == "beta" else "релизы"
|
||||
self.status_label.setText(f"Статус: проверка обновлений ({channel_label})...")
|
||||
@@ -383,16 +479,20 @@ class VkChatManager(QMainWindow):
|
||||
request_timeout=UPDATE_REQUEST_TIMEOUT,
|
||||
channel=self.update_channel,
|
||||
)
|
||||
self.update_thread = QThread(self)
|
||||
self.update_checker.moveToThread(self.update_thread)
|
||||
self.update_thread.started.connect(self.update_checker.run)
|
||||
self.update_checker.check_finished.connect(self._on_update_check_finished)
|
||||
self.update_checker.check_failed.connect(self._on_update_check_failed)
|
||||
self.update_thread = threading.Thread(target=self.update_checker.run, daemon=True)
|
||||
self.update_checker.check_finished.connect(self.update_thread.quit)
|
||||
self.update_checker.check_failed.connect(self.update_thread.quit)
|
||||
self.update_checker.check_finished.connect(self.update_checker.deleteLater)
|
||||
self.update_checker.check_failed.connect(self.update_checker.deleteLater)
|
||||
self.update_thread.finished.connect(self._on_update_thread_finished)
|
||||
self.update_thread.finished.connect(self.update_thread.deleteLater)
|
||||
self.update_thread.start()
|
||||
|
||||
def _on_update_check_finished(self, result):
|
||||
self._set_update_action_state(False)
|
||||
self.update_checker = None
|
||||
self.update_thread = None
|
||||
|
||||
if result.get("has_update"):
|
||||
latest_version = result.get("latest_version") or result.get("latest_tag") or "unknown"
|
||||
self.status_label.setText(f"Статус: доступно обновление {latest_version}")
|
||||
@@ -405,8 +505,20 @@ class VkChatManager(QMainWindow):
|
||||
f"Доступная версия: {latest_version}\n\n"
|
||||
"Открыть страницу загрузки?"
|
||||
)
|
||||
release_notes = (result.get("release_notes") or "").strip()
|
||||
if release_notes:
|
||||
preview_lines = [line.strip() for line in release_notes.splitlines() if line.strip()]
|
||||
preview_text = "\n".join(preview_lines[:8])
|
||||
if len(preview_lines) > 8:
|
||||
preview_text += "\n..."
|
||||
message_box.setInformativeText(f"Что нового:\n{preview_text}")
|
||||
message_box.setDetailedText(release_notes)
|
||||
update_now_button = message_box.addButton("Обновить сейчас", QMessageBox.ButtonRole.AcceptRole)
|
||||
download_button = message_box.addButton("Скачать", QMessageBox.ButtonRole.AcceptRole)
|
||||
setup_button = None
|
||||
installer_url = result.get("installer_url")
|
||||
if installer_url:
|
||||
setup_button = message_box.addButton("Скачать и установить (setup)", QMessageBox.ButtonRole.AcceptRole)
|
||||
releases_button = message_box.addButton("Релизы", QMessageBox.ButtonRole.ActionRole)
|
||||
cancel_button = message_box.addButton("Позже", QMessageBox.ButtonRole.RejectRole)
|
||||
message_box.setDefaultButton(update_now_button)
|
||||
@@ -422,6 +534,9 @@ class VkChatManager(QMainWindow):
|
||||
if release_url:
|
||||
QDesktopServices.openUrl(QUrl(release_url))
|
||||
return
|
||||
if setup_button is not None and clicked is setup_button and installer_url:
|
||||
QDesktopServices.openUrl(QUrl(installer_url))
|
||||
return
|
||||
if clicked is download_button and download_url:
|
||||
QDesktopServices.openUrl(QUrl(download_url))
|
||||
elif clicked in (download_button, releases_button) and release_url:
|
||||
@@ -436,9 +551,6 @@ class VkChatManager(QMainWindow):
|
||||
QMessageBox.information(self, "Обновления", f"Установлена актуальная версия в канале {channel_label}.")
|
||||
|
||||
def _on_update_check_failed(self, error_text):
|
||||
self._set_update_action_state(False)
|
||||
self.update_checker = None
|
||||
self.update_thread = None
|
||||
self._log_event("update_check_failed", error_text, level="WARN")
|
||||
if not self.update_repository_url:
|
||||
self.status_label.setText("Статус: обновления не настроены (URL репозитория не задан).")
|
||||
@@ -455,6 +567,12 @@ class VkChatManager(QMainWindow):
|
||||
if not self._update_check_silent:
|
||||
QMessageBox.warning(self, "Проверка обновлений", error_text)
|
||||
|
||||
def _on_update_thread_finished(self):
|
||||
self._set_update_action_state(False)
|
||||
self._update_in_progress = False
|
||||
self.update_checker = None
|
||||
self.update_thread = None
|
||||
|
||||
def setup_token_timer(self):
|
||||
self.token_countdown_timer = QTimer(self)
|
||||
self.token_countdown_timer.timeout.connect(self.update_token_timer_display)
|
||||
@@ -543,6 +661,110 @@ class VkChatManager(QMainWindow):
|
||||
else:
|
||||
self.set_ui_state(True)
|
||||
|
||||
def _start_operation_progress(self, total, label_text, action_button=None):
|
||||
total = max(1, int(total))
|
||||
self.operation_progress.setRange(0, total)
|
||||
self.operation_progress.setValue(0)
|
||||
self.operation_progress.setFormat(f"{label_text}: %v/%m")
|
||||
self.operation_progress.show()
|
||||
self._active_action_button = action_button
|
||||
self._active_action_button_text = action_button.text() if action_button else ""
|
||||
if action_button is not None:
|
||||
action_button.setText(f"{label_text} (0/{total})")
|
||||
action_button.setEnabled(False)
|
||||
|
||||
def _update_operation_progress(self, processed, total, label_text):
|
||||
total = max(1, int(total))
|
||||
processed = max(0, min(int(processed), total))
|
||||
self.operation_progress.setRange(0, total)
|
||||
self.operation_progress.setValue(processed)
|
||||
self.operation_progress.setFormat(f"{label_text}: {processed}/{total}")
|
||||
if self._active_action_button is not None:
|
||||
self._active_action_button.setText(f"{label_text} ({processed}/{total})")
|
||||
|
||||
def _finish_operation_progress(self):
|
||||
self.operation_progress.hide()
|
||||
self.operation_progress.setValue(0)
|
||||
self.operation_progress.setFormat("%p%")
|
||||
if self._active_action_button is not None:
|
||||
self._active_action_button.setText(self._active_action_button_text or self._active_action_button.text())
|
||||
self._active_action_button = None
|
||||
self._active_action_button_text = ""
|
||||
|
||||
def _start_bulk_action_worker(
|
||||
self,
|
||||
action_type,
|
||||
selected_chats,
|
||||
user_infos,
|
||||
action_label,
|
||||
action_button=None,
|
||||
success_message_title="Результаты",
|
||||
clear_inputs_on_success=True,
|
||||
):
|
||||
total = max(1, len(selected_chats) * len(user_infos))
|
||||
self._bulk_action_context = "set_user_admin" if action_type == "admin" else "execute_user_action"
|
||||
self._bulk_action_success_message_title = success_message_title
|
||||
self._bulk_clear_inputs_on_success = clear_inputs_on_success
|
||||
self._log_event(
|
||||
"bulk_action",
|
||||
f"start action={action_type} chats={len(selected_chats)} users={len(user_infos)}",
|
||||
)
|
||||
|
||||
self._set_busy(True, f"Статус: выполняется {action_label} (0/{total})...")
|
||||
self._start_operation_progress(total, action_label, action_button=action_button)
|
||||
|
||||
worker = BulkActionWorker(
|
||||
vk_call_with_retry=self._vk_call_with_retry,
|
||||
vk_api=self.vk,
|
||||
action_type=action_type,
|
||||
selected_chats=selected_chats,
|
||||
user_infos=user_infos,
|
||||
visible_messages=self.visible_messages_checkbox.isChecked(),
|
||||
)
|
||||
thread = QThread(self)
|
||||
worker.moveToThread(thread)
|
||||
thread.started.connect(worker.run)
|
||||
worker.progress.connect(self._on_bulk_action_progress)
|
||||
worker.finished.connect(self._on_bulk_action_finished)
|
||||
worker.auth_error.connect(self._on_bulk_action_auth_error)
|
||||
worker.failed.connect(self._on_bulk_action_failed)
|
||||
worker.done.connect(self._on_bulk_action_done)
|
||||
worker.done.connect(thread.quit)
|
||||
worker.done.connect(worker.deleteLater)
|
||||
thread.finished.connect(thread.deleteLater)
|
||||
self._bulk_worker = worker
|
||||
self._bulk_worker_thread = thread
|
||||
thread.start()
|
||||
|
||||
def _on_bulk_action_progress(self, processed, total, label):
|
||||
label_text = {"remove": "исключение", "add": "приглашение", "admin": "назначение админов"}.get(label, label)
|
||||
self._update_operation_progress(processed, total, label_text)
|
||||
self.status_label.setText(f"Статус: выполняется {label_text} ({processed}/{max(1, total)})...")
|
||||
|
||||
def _on_bulk_action_finished(self, payload):
|
||||
results = payload.get("results", []) if isinstance(payload, dict) else []
|
||||
processed = payload.get("processed") if isinstance(payload, dict) else None
|
||||
total = payload.get("total") if isinstance(payload, dict) else None
|
||||
if processed is not None and total is not None:
|
||||
self._log_event("bulk_action", f"done processed={processed} total={total}")
|
||||
QMessageBox.information(self, self._bulk_action_success_message_title, "\n".join(results))
|
||||
if self._bulk_clear_inputs_on_success:
|
||||
self.vk_url_input.clear()
|
||||
self.user_ids_to_process.clear()
|
||||
self.set_ui_state(self.token is not None)
|
||||
|
||||
def _on_bulk_action_auth_error(self, context, exc, action_name):
|
||||
self._handle_vk_api_error(context or self._bulk_action_context or "bulk_action", exc, action_name=action_name)
|
||||
|
||||
def _on_bulk_action_failed(self, error_text):
|
||||
QMessageBox.warning(self, "Ошибка", f"Не удалось выполнить операцию: {error_text}")
|
||||
|
||||
def _on_bulk_action_done(self):
|
||||
self._finish_operation_progress()
|
||||
self._set_busy(False)
|
||||
self._bulk_worker = None
|
||||
self._bulk_worker_thread = None
|
||||
|
||||
def _ensure_log_dir(self):
|
||||
os.makedirs(APP_DATA_DIR, exist_ok=True)
|
||||
|
||||
@@ -553,8 +775,8 @@ class VkChatManager(QMainWindow):
|
||||
timestamp = QDateTime.currentDateTime().toString("yyyy-MM-dd HH:mm:ss")
|
||||
with open(LOG_FILE, "a", encoding="utf-8") as f:
|
||||
f.write(f"[{timestamp}] [{level}] {context}: {message}\n")
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as exc:
|
||||
sys.stderr.write(f"[WARN] log_write_failed: {exc}\n")
|
||||
|
||||
def _log_error(self, context, exc):
|
||||
self._log("ERROR", context, self._format_vk_error(exc))
|
||||
@@ -571,8 +793,8 @@ class VkChatManager(QMainWindow):
|
||||
if os.path.exists(LOG_BACKUP_FILE):
|
||||
os.remove(LOG_BACKUP_FILE)
|
||||
os.replace(LOG_FILE, LOG_BACKUP_FILE)
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as exc:
|
||||
sys.stderr.write(f"[WARN] log_rotate_failed: {exc}\n")
|
||||
|
||||
def _format_vk_error(self, exc):
|
||||
error = getattr(exc, "error", None)
|
||||
@@ -684,8 +906,8 @@ class VkChatManager(QMainWindow):
|
||||
try:
|
||||
if output_path and os.path.exists(output_path):
|
||||
os.remove(output_path)
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as exc:
|
||||
self._log_event("auth_result_cleanup", f"Не удалось удалить файл результата авторизации: {exc}", level="WARN")
|
||||
|
||||
def _on_auth_process_finished(self, exit_code, _exit_status):
|
||||
output_path = self.auth_output_path
|
||||
@@ -722,8 +944,8 @@ class VkChatManager(QMainWindow):
|
||||
try:
|
||||
if os.path.exists(output_path):
|
||||
os.remove(output_path)
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as exc:
|
||||
self._log_event("auth_result_cleanup", f"Не удалось удалить файл результата авторизации: {exc}", level="WARN")
|
||||
else:
|
||||
self._log_event("auth_result", "Файл результата авторизации не найден.", level="WARN")
|
||||
|
||||
@@ -735,6 +957,7 @@ class VkChatManager(QMainWindow):
|
||||
self._log_event("auth_process", "Авторизация уже запущена, повторный запуск пропущен.")
|
||||
return
|
||||
|
||||
self._log_event("auth_process", "start")
|
||||
if keep_status_text and hasattr(self, "_relogin_status_text"):
|
||||
status_text = self._relogin_status_text
|
||||
self._relogin_status_text = None
|
||||
@@ -755,8 +978,8 @@ class VkChatManager(QMainWindow):
|
||||
try:
|
||||
if os.path.exists(output_path):
|
||||
os.remove(output_path)
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as exc:
|
||||
self._log_event("auth_result_cleanup", f"Не удалось удалить старый файл результата авторизации: {exc}", level="WARN")
|
||||
|
||||
program, args = self._build_auth_command(auth_url, output_path)
|
||||
self.auth_output_path = output_path
|
||||
@@ -776,14 +999,29 @@ class VkChatManager(QMainWindow):
|
||||
self._auth_relogin_in_progress = False
|
||||
return
|
||||
|
||||
self._log_event("auth_process", f"success expires_in={expires_in}")
|
||||
self.token = token
|
||||
# Сохраняем и получаем корректный expiration_time (0 или будущее время)
|
||||
try:
|
||||
self.token_expiration_time = token_store_save_token(
|
||||
self.token,
|
||||
TOKEN_FILE,
|
||||
APP_DATA_DIR,
|
||||
expires_in=expires_in,
|
||||
)
|
||||
except Exception as e:
|
||||
try:
|
||||
expires_value = int(expires_in)
|
||||
except (TypeError, ValueError):
|
||||
expires_value = 0
|
||||
self.token_expiration_time = (time.time() + expires_value) if expires_value > 0 else 0
|
||||
self._log_event("token_store_save", str(e), level="WARN")
|
||||
QMessageBox.warning(
|
||||
self,
|
||||
"Предупреждение",
|
||||
"Не удалось безопасно сохранить токен на диске. "
|
||||
"Текущая сессия активна, но после перезапуска потребуется повторная авторизация.",
|
||||
)
|
||||
|
||||
self.token_input.setText(self.token[:50] + "...")
|
||||
self.status_label.setText("Статус: авторизован")
|
||||
@@ -895,7 +1133,8 @@ class VkChatManager(QMainWindow):
|
||||
try:
|
||||
user = self.vk.users.get(user_ids=user_id)[0]
|
||||
return f"{user.get('first_name', '')} {user.get('last_name', '')}"
|
||||
except Exception:
|
||||
except Exception as exc:
|
||||
self._log_event("get_user_info", f"Не удалось получить имя пользователя {user_id}: {exc}", level="WARN")
|
||||
return f"Пользователь {user_id}"
|
||||
|
||||
def _get_selected_chats(self):
|
||||
@@ -907,7 +1146,7 @@ class VkChatManager(QMainWindow):
|
||||
selected.append({'id': chat_id, 'title': title})
|
||||
return selected
|
||||
|
||||
def _execute_user_action(self, action_type):
|
||||
def _execute_user_action(self, action_type, action_button=None):
|
||||
if not self.user_ids_to_process:
|
||||
QMessageBox.warning(self, "Ошибка", f"Нет ID пользователей для операции.")
|
||||
return
|
||||
@@ -965,44 +1204,23 @@ class VkChatManager(QMainWindow):
|
||||
if confirm_dialog.clickedButton() != yes_button:
|
||||
return
|
||||
|
||||
results = []
|
||||
total = len(selected_chats) * len(user_infos)
|
||||
processed = 0
|
||||
try:
|
||||
action_label = "исключение" if action_type == "remove" else "приглашение"
|
||||
self._set_busy(True, f"Статус: выполняется {action_label} (0/{total})...")
|
||||
for chat in selected_chats:
|
||||
for user_id, user_info in user_infos.items():
|
||||
try:
|
||||
if action_type == "remove":
|
||||
self._vk_call_with_retry(self.vk.messages.removeChatUser, chat_id=chat['id'], member_id=user_id)
|
||||
results.append(f"✓ '{user_info}' исключен из '{chat['title']}'.")
|
||||
else:
|
||||
params = {'chat_id': chat['id'], 'user_id': user_id}
|
||||
if self.visible_messages_checkbox.isChecked():
|
||||
params['visible_messages_count'] = 250
|
||||
self._vk_call_with_retry(self.vk.messages.addChatUser, **params)
|
||||
results.append(f"✓ '{user_info}' приглашен в '{chat['title']}'.")
|
||||
except VkApiError as e:
|
||||
if self._handle_vk_api_error("execute_user_action", e, action_name="выполнения операций с пользователями"):
|
||||
self._start_bulk_action_worker(
|
||||
action_type=action_type,
|
||||
selected_chats=selected_chats,
|
||||
user_infos=user_infos,
|
||||
action_label=action_label,
|
||||
action_button=action_button,
|
||||
success_message_title="Результаты",
|
||||
clear_inputs_on_success=True,
|
||||
)
|
||||
return
|
||||
results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {self._format_vk_error(e)}")
|
||||
finally:
|
||||
processed += 1
|
||||
self.status_label.setText(f"Статус: выполняется {action_label} ({processed}/{total})...")
|
||||
finally:
|
||||
self._set_busy(False)
|
||||
|
||||
QMessageBox.information(self, "Результаты", "\n".join(results))
|
||||
self.vk_url_input.clear()
|
||||
self.user_ids_to_process.clear()
|
||||
self.set_ui_state(self.token is not None)
|
||||
|
||||
def remove_user(self):
|
||||
self._execute_user_action("remove")
|
||||
self._execute_user_action("remove", action_button=self.remove_user_btn)
|
||||
|
||||
def add_user_to_chat(self):
|
||||
self._execute_user_action("add")
|
||||
self._execute_user_action("add", action_button=self.add_user_btn)
|
||||
|
||||
def set_user_admin(self):
|
||||
"""Назначает пользователя администратором чата."""
|
||||
@@ -1040,46 +1258,16 @@ class VkChatManager(QMainWindow):
|
||||
if confirm_dialog.clickedButton() != yes_button:
|
||||
return
|
||||
|
||||
# 4. Выполнение API запросов
|
||||
results = []
|
||||
total = len(selected_chats) * len(user_infos)
|
||||
processed = 0
|
||||
try:
|
||||
self._set_busy(True, f"Статус: назначение админов (0/{total})...")
|
||||
for chat in selected_chats:
|
||||
# VK API требует peer_id. Для чатов это 2000000000 + local_id
|
||||
try:
|
||||
peer_id = 2000000000 + int(chat['id'])
|
||||
except ValueError:
|
||||
results.append(f"✗ Ошибка ID чата: {chat['id']}")
|
||||
continue
|
||||
|
||||
for user_id, user_info in user_infos.items():
|
||||
try:
|
||||
self._vk_call_with_retry(
|
||||
self.vk.messages.setMemberRole,
|
||||
peer_id=peer_id,
|
||||
member_id=user_id,
|
||||
role="admin"
|
||||
self._start_bulk_action_worker(
|
||||
action_type="admin",
|
||||
selected_chats=selected_chats,
|
||||
user_infos=user_infos,
|
||||
action_label="назначение админов",
|
||||
action_button=None,
|
||||
success_message_title="Результаты назначения",
|
||||
clear_inputs_on_success=True,
|
||||
)
|
||||
results.append(f"✓ '{user_info}' назначен админом в '{chat['title']}'.")
|
||||
except VkApiError as e:
|
||||
if self._handle_vk_api_error("set_user_admin", e, action_name="назначения администраторов"):
|
||||
return
|
||||
results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {self._format_vk_error(e)}")
|
||||
finally:
|
||||
processed += 1
|
||||
self.status_label.setText(f"Статус: назначение админов ({processed}/{total})...")
|
||||
finally:
|
||||
self._set_busy(False)
|
||||
|
||||
# 5. Вывод результата
|
||||
QMessageBox.information(self, "Результаты назначения", "\n".join(results))
|
||||
|
||||
# Очистка полей (по желанию, можно убрать эти две строки, если хотите оставить ввод)
|
||||
self.vk_url_input.clear()
|
||||
self.user_ids_to_process.clear()
|
||||
self.set_ui_state(self.token is not None)
|
||||
|
||||
# Refactor overrides: keep logic in service modules and thin UI orchestration here.
|
||||
def _process_links_list(self, links_list):
|
||||
@@ -1087,6 +1275,7 @@ class VkChatManager(QMainWindow):
|
||||
QMessageBox.warning(self, "Ошибка", "Сначала авторизуйтесь.")
|
||||
return
|
||||
|
||||
self._log_event("resolve_ids", f"start count={len(links_list)}")
|
||||
self.user_ids_to_process.clear()
|
||||
resolved_ids = []
|
||||
failed_links = []
|
||||
@@ -1110,6 +1299,10 @@ class VkChatManager(QMainWindow):
|
||||
|
||||
self.user_ids_to_process = resolved_ids
|
||||
status_message = f"Статус: Готово к работе с {len(resolved_ids)} пользователем(ем/ями)."
|
||||
self._log_event(
|
||||
"resolve_ids",
|
||||
f"done resolved={len(resolved_ids)} failed={len(failed_links)}",
|
||||
)
|
||||
if len(links_list) > 1:
|
||||
self._set_vk_url_input_text(f"Загружено {len(resolved_ids)}/{len(links_list)} из списка")
|
||||
|
||||
@@ -1136,17 +1329,40 @@ class VkChatManager(QMainWindow):
|
||||
|
||||
try:
|
||||
self._set_busy(True, "Статус: загрузка чатов...")
|
||||
conversations = load_chat_conversations(self._vk_call_with_retry, self.vk)
|
||||
self._log_event("load_chats", "start")
|
||||
conversations = load_chat_conversations(self._vk_call_with_retry, self.vk, log_func=self._log_event)
|
||||
type_counts = {}
|
||||
non_chat_samples = []
|
||||
missing_title_count = 0
|
||||
for conv in conversations:
|
||||
if conv["conversation"]["peer"]["type"] != "chat":
|
||||
conv_info = conv.get("conversation", {})
|
||||
peer = conv_info.get("peer", {})
|
||||
peer_type = peer.get("type", "unknown")
|
||||
type_counts[peer_type] = type_counts.get(peer_type, 0) + 1
|
||||
if peer_type != "chat":
|
||||
if len(non_chat_samples) < 30:
|
||||
non_chat_samples.append(
|
||||
{
|
||||
"type": peer_type,
|
||||
"peer_id": peer.get("id"),
|
||||
"local_id": peer.get("local_id"),
|
||||
"title": (conv_info.get("chat_settings") or {}).get("title", ""),
|
||||
}
|
||||
)
|
||||
continue
|
||||
|
||||
chat_id = conv["conversation"]["peer"]["local_id"]
|
||||
title = conv["conversation"]["chat_settings"]["title"]
|
||||
chat_id = peer.get("local_id")
|
||||
chat_settings = conv_info.get("chat_settings") or {}
|
||||
title = chat_settings.get("title", "")
|
||||
if not title:
|
||||
missing_title_count += 1
|
||||
self.chats.append({"id": chat_id, "title": title})
|
||||
checkbox = QCheckBox(f"{title} (id: {chat_id})")
|
||||
checkbox.setProperty("chat_id", chat_id)
|
||||
|
||||
if "группа магазинов" in title.casefold():
|
||||
self._log_event("load_chats", f"chat_match title='{title}' id={chat_id}")
|
||||
|
||||
if "AG офис" in title:
|
||||
layouts[0].insertWidget(layouts[0].count() - 1, checkbox)
|
||||
self.office_chat_checkboxes.append(checkbox)
|
||||
@@ -1168,6 +1384,17 @@ class VkChatManager(QMainWindow):
|
||||
self.chat_tabs.setTabText(2, f"AG Склад ({len(self.warehouse_chat_checkboxes)})")
|
||||
self.chat_tabs.setTabText(3, f"AG Кофейни ({len(self.coffee_chat_checkboxes)})")
|
||||
self.chat_tabs.setTabText(4, f"Прочие ({len(self.other_chat_checkboxes)})")
|
||||
self._log_event(
|
||||
"load_chats",
|
||||
(
|
||||
f"done total={len(conversations)} "
|
||||
f"chats={len(self.chats)} "
|
||||
f"type_counts={type_counts} "
|
||||
f"missing_titles={missing_title_count}"
|
||||
),
|
||||
)
|
||||
if non_chat_samples:
|
||||
self._log_event("load_chats", f"non_chat_samples={non_chat_samples}")
|
||||
except VkApiError as e:
|
||||
if self._handle_vk_api_error("load_chats", e, action_name="загрузки чатов"):
|
||||
return
|
||||
@@ -1212,12 +1439,9 @@ class VkChatManager(QMainWindow):
|
||||
version=latest_version,
|
||||
)
|
||||
self._log_event("auto_update", f"Update {latest_version} started from {download_url}")
|
||||
QMessageBox.information(
|
||||
self,
|
||||
"Обновление запущено",
|
||||
"Обновление скачано. Открылось окно обновления.",
|
||||
)
|
||||
QApplication.instance().quit()
|
||||
self.status_label.setText("Статус: обновление запущено, закрываю приложение...")
|
||||
self.close()
|
||||
QTimer.singleShot(0, QApplication.instance().quit)
|
||||
return True
|
||||
except Exception as e:
|
||||
self._log_event("auto_update_failed", str(e), level="ERROR")
|
||||
@@ -1233,7 +1457,8 @@ if __name__ == "__main__":
|
||||
idx = sys.argv.index("--auth")
|
||||
auth_url = sys.argv[idx + 1]
|
||||
output_path = sys.argv[idx + 2]
|
||||
except Exception:
|
||||
except Exception as exc:
|
||||
sys.stderr.write(f"[ERROR] auth_cli_args_invalid: {exc}\n")
|
||||
sys.exit(1)
|
||||
auth_webview.main_auth(auth_url, output_path)
|
||||
sys.exit(0)
|
||||
|
||||
@@ -9,6 +9,18 @@ import zipfile
|
||||
|
||||
|
||||
class AutoUpdateService:
|
||||
@staticmethod
|
||||
def _safe_extract_zip(archive, destination_dir):
|
||||
destination_real = os.path.realpath(destination_dir)
|
||||
for member in archive.infolist():
|
||||
member_name = member.filename or ""
|
||||
if not member_name:
|
||||
continue
|
||||
target_path = os.path.realpath(os.path.join(destination_dir, member_name))
|
||||
if target_path != destination_real and not target_path.startswith(destination_real + os.sep):
|
||||
raise RuntimeError(f"Unsafe path in update archive: {member_name}")
|
||||
archive.extractall(destination_dir)
|
||||
|
||||
@staticmethod
|
||||
def download_update_archive(download_url, destination_path):
|
||||
request = urllib.request.Request(
|
||||
@@ -215,6 +227,6 @@ class AutoUpdateService:
|
||||
cls.verify_update_checksum(zip_path, checksum_url, download_name)
|
||||
os.makedirs(unpack_dir, exist_ok=True)
|
||||
with zipfile.ZipFile(zip_path, "r") as archive:
|
||||
archive.extractall(unpack_dir)
|
||||
cls._safe_extract_zip(archive, unpack_dir)
|
||||
source_dir = cls.locate_extracted_root(unpack_dir)
|
||||
return work_dir, source_dir
|
||||
|
||||
@@ -1,6 +1,15 @@
|
||||
from urllib.parse import urlparse
|
||||
|
||||
|
||||
def _safe_log(log_func, context, message):
|
||||
if not log_func:
|
||||
return
|
||||
try:
|
||||
log_func(context, message)
|
||||
except TypeError:
|
||||
log_func(f"{context}: {message}")
|
||||
|
||||
|
||||
def resolve_user_ids(vk_call_with_retry, vk_api, links):
|
||||
resolved_ids = []
|
||||
failed_links = []
|
||||
@@ -23,24 +32,74 @@ def resolve_user_ids(vk_call_with_retry, vk_api, links):
|
||||
return resolved_ids, failed_links
|
||||
|
||||
|
||||
def load_chat_conversations(vk_call_with_retry, vk_api):
|
||||
def load_chat_conversations(vk_call_with_retry, vk_api, log_func=None):
|
||||
conversations = []
|
||||
start_from = None
|
||||
seen_start_tokens = set()
|
||||
total_count = None
|
||||
page_num = 0
|
||||
while True:
|
||||
params = {"count": 200, "filter": "all"}
|
||||
if start_from:
|
||||
if start_from in seen_start_tokens:
|
||||
_safe_log(log_func, "load_chats_page", f"stop duplicate next_from={start_from}")
|
||||
break
|
||||
params["start_from"] = start_from
|
||||
seen_start_tokens.add(start_from)
|
||||
response = vk_call_with_retry(vk_api.messages.getConversations, **params)
|
||||
page_num += 1
|
||||
if total_count is None:
|
||||
total_count = response.get("count")
|
||||
page_items = response.get("items", [])
|
||||
_safe_log(
|
||||
log_func,
|
||||
"load_chats_page",
|
||||
f"page={page_num} items={len(page_items)} next_from={response.get('next_from')} total={total_count}",
|
||||
)
|
||||
if not page_items:
|
||||
break
|
||||
conversations.extend(page_items)
|
||||
start_from = response.get("next_from")
|
||||
if not start_from:
|
||||
break
|
||||
|
||||
if total_count is not None and total_count > len(conversations):
|
||||
_safe_log(
|
||||
log_func,
|
||||
"load_chats_fallback",
|
||||
f"start offset pagination total={total_count} current={len(conversations)}",
|
||||
)
|
||||
seen_keys = set()
|
||||
for conv in conversations:
|
||||
peer = (conv.get("conversation") or {}).get("peer", {})
|
||||
key = (peer.get("type"), peer.get("id") or peer.get("local_id"))
|
||||
seen_keys.add(key)
|
||||
|
||||
offset = len(conversations)
|
||||
safety_pages = 0
|
||||
while offset < total_count:
|
||||
params = {"count": 200, "filter": "all", "offset": offset}
|
||||
response = vk_call_with_retry(vk_api.messages.getConversations, **params)
|
||||
page_items = response.get("items", [])
|
||||
_safe_log(
|
||||
log_func,
|
||||
"load_chats_fallback",
|
||||
f"offset={offset} items={len(page_items)} total={response.get('count')}",
|
||||
)
|
||||
if not page_items:
|
||||
break
|
||||
for item in page_items:
|
||||
peer = (item.get("conversation") or {}).get("peer", {})
|
||||
key = (peer.get("type"), peer.get("id") or peer.get("local_id"))
|
||||
if key in seen_keys:
|
||||
continue
|
||||
seen_keys.add(key)
|
||||
conversations.append(item)
|
||||
offset += len(page_items)
|
||||
safety_pages += 1
|
||||
if safety_pages > 50:
|
||||
_safe_log(log_func, "load_chats_fallback", "stop safety_pages>50")
|
||||
break
|
||||
|
||||
return conversations
|
||||
|
||||
|
||||
@@ -91,8 +91,8 @@ def save_token(token, token_file, app_data_dir, expires_in=0):
|
||||
try:
|
||||
stored_token = _encrypt_token(token)
|
||||
encrypted = True
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as exc:
|
||||
raise RuntimeError("Failed to securely store token with DPAPI.") from exc
|
||||
|
||||
data = {
|
||||
"token": stored_token,
|
||||
|
||||
@@ -5,7 +5,42 @@ import urllib.error
|
||||
import urllib.request
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from PySide6.QtCore import QObject, Signal
|
||||
try:
|
||||
from PySide6.QtCore import QObject, Signal
|
||||
except Exception:
|
||||
class _FallbackBoundSignal:
|
||||
def __init__(self):
|
||||
self._callbacks = []
|
||||
|
||||
def connect(self, callback):
|
||||
if callback is not None:
|
||||
self._callbacks.append(callback)
|
||||
|
||||
def emit(self, *args, **kwargs):
|
||||
for callback in list(self._callbacks):
|
||||
callback(*args, **kwargs)
|
||||
|
||||
class _FallbackSignalDescriptor:
|
||||
def __init__(self):
|
||||
self._storage_name = ""
|
||||
|
||||
def __set_name__(self, owner, name):
|
||||
self._storage_name = f"__fallback_signal_{name}"
|
||||
|
||||
def __get__(self, instance, owner):
|
||||
if instance is None:
|
||||
return self
|
||||
signal = instance.__dict__.get(self._storage_name)
|
||||
if signal is None:
|
||||
signal = _FallbackBoundSignal()
|
||||
instance.__dict__[self._storage_name] = signal
|
||||
return signal
|
||||
|
||||
class QObject:
|
||||
pass
|
||||
|
||||
def Signal(*_args, **_kwargs):
|
||||
return _FallbackSignalDescriptor()
|
||||
|
||||
|
||||
def _version_key(version_text):
|
||||
@@ -67,10 +102,13 @@ def _extract_release_payload(release_data, repository_url, current_version):
|
||||
latest_tag = release_data.get("tag_name") or release_data.get("name") or ""
|
||||
latest_version = latest_tag.lstrip("vV").strip()
|
||||
html_url = release_data.get("html_url") or releases_url
|
||||
release_notes = (release_data.get("body") or "").strip()
|
||||
assets = release_data.get("assets") or []
|
||||
download_url = ""
|
||||
download_name = ""
|
||||
checksum_url = ""
|
||||
installer_url = ""
|
||||
installer_name = ""
|
||||
for asset in assets:
|
||||
url = asset.get("browser_download_url", "")
|
||||
if url.lower().endswith(".zip"):
|
||||
@@ -81,6 +119,16 @@ def _extract_release_payload(release_data, repository_url, current_version):
|
||||
download_url = assets[0].get("browser_download_url", "")
|
||||
download_name = assets[0].get("name", "")
|
||||
|
||||
for asset in assets:
|
||||
url = asset.get("browser_download_url", "")
|
||||
name = asset.get("name", "")
|
||||
name_lower = name.lower()
|
||||
if installer_url:
|
||||
break
|
||||
if url.lower().endswith(".exe") and ("setup" in name_lower or "installer" in name_lower):
|
||||
installer_url = url
|
||||
installer_name = name
|
||||
|
||||
for asset in assets:
|
||||
name = asset.get("name", "").lower()
|
||||
if not name:
|
||||
@@ -100,8 +148,11 @@ def _extract_release_payload(release_data, repository_url, current_version):
|
||||
"current_version": current_version,
|
||||
"latest_tag": latest_tag,
|
||||
"release_url": html_url,
|
||||
"release_notes": release_notes,
|
||||
"download_url": download_url,
|
||||
"download_name": download_name,
|
||||
"installer_url": installer_url,
|
||||
"installer_name": installer_name,
|
||||
"checksum_url": checksum_url,
|
||||
"has_update": _is_newer_version(latest_version, current_version),
|
||||
}
|
||||
|
||||
@@ -1,60 +0,0 @@
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class AuthReloginSmokeTests(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
cls.main_source = Path("main.py").read_text(encoding="utf-8")
|
||||
cls.vk_source = Path("services/vk_service.py").read_text(encoding="utf-8")
|
||||
cls.update_source = Path("services/update_service.py").read_text(encoding="utf-8")
|
||||
|
||||
def test_auth_command_builder_handles_frozen_and_source(self):
|
||||
self.assertIn("def _build_auth_command(self, auth_url, output_path):", self.main_source)
|
||||
self.assertIn("entry_script_path=os.path.abspath(__file__)", self.main_source)
|
||||
self.assertIn('return sys.executable, ["--auth", auth_url, output_path]', self.vk_source)
|
||||
self.assertIn("script_path = entry_script_path or os.path.abspath(__file__)", self.vk_source)
|
||||
|
||||
def test_auth_runs_via_qprocess(self):
|
||||
self.assertIn("process = QProcess(self)", self.main_source)
|
||||
self.assertIn("process.start(program, args)", self.main_source)
|
||||
self.assertIn("def _on_auth_process_finished(self, exit_code, _exit_status):", self.main_source)
|
||||
self.assertIn("if self.auth_process and self.auth_process.state() == QProcess.ProcessState.NotRunning:", self.main_source)
|
||||
|
||||
def test_force_relogin_has_backoff_and_event_log(self):
|
||||
self.assertIn("AUTH_RELOGIN_BACKOFF_SECONDS = 5.0", self.main_source)
|
||||
self.assertIn("if self._auth_relogin_in_progress:", self.main_source)
|
||||
self.assertIn("force_relogin_backoff", self.main_source)
|
||||
self.assertIn("force_relogin", self.main_source)
|
||||
|
||||
def test_auth_error_paths_trigger_force_relogin(self):
|
||||
self.assertIn(
|
||||
"def _handle_vk_api_error(self, context, exc, action_name=None, ui_message_prefix=None, disable_ui=False):",
|
||||
self.main_source,
|
||||
)
|
||||
self.assertIn("self._force_relogin(exc, action_name or context)", self.main_source)
|
||||
self.assertIn('"load_chats",', self.main_source)
|
||||
self.assertIn('"execute_user_action",', self.main_source)
|
||||
self.assertIn('"set_user_admin",', self.main_source)
|
||||
|
||||
def test_tab_checkbox_lists_use_existing_attributes(self):
|
||||
self.assertIn("self.warehouse_chat_checkboxes", self.main_source)
|
||||
self.assertIn("self.coffee_chat_checkboxes", self.main_source)
|
||||
self.assertNotIn("self.retail_warehouse_checkboxes", self.main_source)
|
||||
self.assertNotIn("self.retail_coffee_checkboxes", self.main_source)
|
||||
|
||||
def test_update_check_actions_exist(self):
|
||||
self.assertIn("from app_version import APP_VERSION", self.main_source)
|
||||
self.assertIn("from services import (", self.main_source)
|
||||
self.assertIn("UpdateChecker", self.main_source)
|
||||
self.assertIn("detect_update_repository_url", self.main_source)
|
||||
self.assertIn('QAction("Проверить обновления", self)', self.main_source)
|
||||
self.assertIn("def check_for_updates(self, silent_no_updates=False):", self.main_source)
|
||||
self.assertIn("class UpdateChecker(QObject):", self.update_source)
|
||||
self.assertIn("def _start_auto_update(self, download_url, latest_version, checksum_url=\"\", download_name=\"\"):", self.main_source)
|
||||
self.assertIn("AutoUpdateService.prepare_update", self.main_source)
|
||||
self.assertIn("AutoUpdateService.launch_gui_updater", self.main_source)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
114
tests/test_main_contracts.py
Normal file
114
tests/test_main_contracts.py
Normal file
@@ -0,0 +1,114 @@
|
||||
import ast
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class MainContractsTests(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
cls.main_source = Path("main.py").read_text(encoding="utf-8-sig")
|
||||
cls.module = ast.parse(cls.main_source)
|
||||
cls.vk_chat_manager = cls._find_class("VkChatManager")
|
||||
|
||||
@classmethod
|
||||
def _find_class(cls, class_name):
|
||||
for node in cls.module.body:
|
||||
if isinstance(node, ast.ClassDef) and node.name == class_name:
|
||||
return node
|
||||
raise AssertionError(f"Class {class_name} not found")
|
||||
|
||||
def _find_method(self, method_name):
|
||||
for node in self.vk_chat_manager.body:
|
||||
if isinstance(node, ast.FunctionDef) and node.name == method_name:
|
||||
return node
|
||||
self.fail(f"Method {method_name} not found")
|
||||
|
||||
def _iter_nodes(self, node):
|
||||
return ast.walk(node)
|
||||
|
||||
def test_auth_error_contexts_contains_only_supported_contexts(self):
|
||||
expected_contexts = {"load_chats", "execute_user_action", "set_user_admin"}
|
||||
for node in self.module.body:
|
||||
if isinstance(node, ast.Assign):
|
||||
for target in node.targets:
|
||||
if isinstance(target, ast.Name) and target.id == "AUTH_ERROR_CONTEXTS":
|
||||
actual = set(ast.literal_eval(node.value))
|
||||
self.assertSetEqual(actual, expected_contexts)
|
||||
return
|
||||
self.fail("AUTH_ERROR_CONTEXTS assignment not found")
|
||||
|
||||
def test_check_for_updates_has_reentry_guard(self):
|
||||
method = self._find_method("check_for_updates")
|
||||
has_guard = False
|
||||
for node in method.body:
|
||||
if not isinstance(node, ast.If):
|
||||
continue
|
||||
test = node.test
|
||||
if (
|
||||
isinstance(test, ast.Attribute)
|
||||
and isinstance(test.value, ast.Name)
|
||||
and test.value.id == "self"
|
||||
and test.attr == "_update_in_progress"
|
||||
):
|
||||
has_guard = any(isinstance(stmt, ast.Return) for stmt in node.body)
|
||||
if has_guard:
|
||||
break
|
||||
self.assertTrue(has_guard, "check_for_updates must return when update is already in progress")
|
||||
|
||||
def test_check_for_updates_connects_thread_finish_handler(self):
|
||||
method = self._find_method("check_for_updates")
|
||||
for node in self._iter_nodes(method):
|
||||
if not isinstance(node, ast.Call):
|
||||
continue
|
||||
func = node.func
|
||||
if not (isinstance(func, ast.Attribute) and func.attr == "connect"):
|
||||
continue
|
||||
value = func.value
|
||||
if not (
|
||||
isinstance(value, ast.Attribute)
|
||||
and value.attr == "finished"
|
||||
and isinstance(value.value, ast.Attribute)
|
||||
and value.value.attr == "update_thread"
|
||||
and isinstance(value.value.value, ast.Name)
|
||||
and value.value.value.id == "self"
|
||||
):
|
||||
continue
|
||||
if len(node.args) != 1:
|
||||
continue
|
||||
arg = node.args[0]
|
||||
if (
|
||||
isinstance(arg, ast.Attribute)
|
||||
and arg.attr == "_on_update_thread_finished"
|
||||
and isinstance(arg.value, ast.Name)
|
||||
and arg.value.id == "self"
|
||||
):
|
||||
return
|
||||
self.fail("update_thread.finished must be connected to _on_update_thread_finished")
|
||||
|
||||
def test_on_update_thread_finished_clears_update_state(self):
|
||||
method = self._find_method("_on_update_thread_finished")
|
||||
assignments = {}
|
||||
for node in method.body:
|
||||
if not isinstance(node, ast.Assign) or len(node.targets) != 1:
|
||||
continue
|
||||
target = node.targets[0]
|
||||
if (
|
||||
isinstance(target, ast.Attribute)
|
||||
and isinstance(target.value, ast.Name)
|
||||
and target.value.id == "self"
|
||||
):
|
||||
assignments[target.attr] = node.value
|
||||
|
||||
self.assertIn("_update_in_progress", assignments)
|
||||
self.assertIn("update_checker", assignments)
|
||||
self.assertIn("update_thread", assignments)
|
||||
self.assertIsInstance(assignments["_update_in_progress"], ast.Constant)
|
||||
self.assertIs(assignments["_update_in_progress"].value, False)
|
||||
self.assertIsInstance(assignments["update_checker"], ast.Constant)
|
||||
self.assertIsNone(assignments["update_checker"].value)
|
||||
self.assertIsInstance(assignments["update_thread"], ast.Constant)
|
||||
self.assertIsNone(assignments["update_thread"].value)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
97
tests/test_update_reentry_runtime.py
Normal file
97
tests/test_update_reentry_runtime.py
Normal file
@@ -0,0 +1,97 @@
|
||||
import unittest
|
||||
from types import SimpleNamespace
|
||||
from unittest import mock
|
||||
|
||||
|
||||
class _DummySignal:
|
||||
def __init__(self):
|
||||
self._callbacks = []
|
||||
|
||||
def connect(self, callback):
|
||||
if callback is not None:
|
||||
self._callbacks.append(callback)
|
||||
|
||||
def emit(self, *args, **kwargs):
|
||||
for callback in list(self._callbacks):
|
||||
callback(*args, **kwargs)
|
||||
|
||||
|
||||
class _DummyThread:
|
||||
created = 0
|
||||
|
||||
def __init__(self, _parent=None):
|
||||
type(self).created += 1
|
||||
self.started = _DummySignal()
|
||||
self.finished = _DummySignal()
|
||||
|
||||
def start(self):
|
||||
self.started.emit()
|
||||
|
||||
def quit(self):
|
||||
self.finished.emit()
|
||||
|
||||
def deleteLater(self):
|
||||
return None
|
||||
|
||||
|
||||
class _DummyChecker:
|
||||
created = 0
|
||||
|
||||
def __init__(self, *_args, **_kwargs):
|
||||
type(self).created += 1
|
||||
self.check_finished = _DummySignal()
|
||||
self.check_failed = _DummySignal()
|
||||
|
||||
def moveToThread(self, _thread):
|
||||
return None
|
||||
|
||||
def run(self):
|
||||
return None
|
||||
|
||||
def deleteLater(self):
|
||||
return None
|
||||
|
||||
|
||||
class UpdateReentryRuntimeTests(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
try:
|
||||
import main # noqa: PLC0415
|
||||
except Exception as exc:
|
||||
raise unittest.SkipTest(f"main import unavailable: {exc}") from exc
|
||||
cls.main = main
|
||||
|
||||
def test_repeated_update_check_is_ignored_until_thread_finishes(self):
|
||||
_DummyChecker.created = 0
|
||||
_DummyThread.created = 0
|
||||
manager = self.main.VkChatManager.__new__(self.main.VkChatManager)
|
||||
manager._update_in_progress = False
|
||||
manager._update_check_silent = False
|
||||
manager.update_channel = "stable"
|
||||
manager.update_repository_url = "https://example.com/org/repo"
|
||||
manager.update_checker = None
|
||||
manager.update_thread = None
|
||||
manager.status_label = SimpleNamespace(setText=lambda *_args, **_kwargs: None)
|
||||
manager._log_event = lambda *_args, **_kwargs: None
|
||||
manager._set_update_action_state = lambda *_args, **_kwargs: None
|
||||
|
||||
with mock.patch.object(self.main, "UpdateChecker", _DummyChecker), mock.patch.object(self.main, "QThread", _DummyThread):
|
||||
self.main.VkChatManager.check_for_updates(manager, silent_no_updates=True)
|
||||
self.assertTrue(manager._update_in_progress)
|
||||
self.assertEqual(_DummyChecker.created, 1)
|
||||
self.assertEqual(_DummyThread.created, 1)
|
||||
first_thread = manager.update_thread
|
||||
|
||||
self.main.VkChatManager.check_for_updates(manager, silent_no_updates=True)
|
||||
self.assertEqual(_DummyChecker.created, 1)
|
||||
self.assertEqual(_DummyThread.created, 1)
|
||||
self.assertIs(manager.update_thread, first_thread)
|
||||
|
||||
manager.update_checker.check_finished.emit({"has_update": False, "current_version": self.main.APP_VERSION})
|
||||
self.assertFalse(manager._update_in_progress)
|
||||
self.assertIsNone(manager.update_checker)
|
||||
self.assertIsNone(manager.update_thread)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -33,6 +33,7 @@ class UpdateServiceTests(unittest.TestCase):
|
||||
"assets": [
|
||||
{"name": "notes.txt", "browser_download_url": "https://example.com/notes.txt"},
|
||||
{"name": "AnabasisManager-win64.zip", "browser_download_url": "https://example.com/app.zip"},
|
||||
{"name": "AnabasisManager-setup-1.7.2.exe", "browser_download_url": "https://example.com/setup.exe"},
|
||||
{"name": "AnabasisManager-win64.zip.sha256", "browser_download_url": "https://example.com/app.zip.sha256"},
|
||||
],
|
||||
}
|
||||
@@ -43,6 +44,7 @@ class UpdateServiceTests(unittest.TestCase):
|
||||
)
|
||||
self.assertEqual(payload["latest_version"], "1.7.2")
|
||||
self.assertEqual(payload["download_url"], "https://example.com/app.zip")
|
||||
self.assertEqual(payload["installer_url"], "https://example.com/setup.exe")
|
||||
self.assertEqual(payload["checksum_url"], "https://example.com/app.zip.sha256")
|
||||
self.assertTrue(payload["has_update"])
|
||||
|
||||
|
||||
@@ -1,10 +1,79 @@
|
||||
import importlib.util
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
import types
|
||||
|
||||
|
||||
def _install_pyside6_stubs():
|
||||
pyside6_module = types.ModuleType("PySide6")
|
||||
pyside6_module.__path__ = [] # treat as package
|
||||
qtcore_module = types.ModuleType("PySide6.QtCore")
|
||||
qtgui_module = types.ModuleType("PySide6.QtGui")
|
||||
qtwidgets_module = types.ModuleType("PySide6.QtWidgets")
|
||||
|
||||
class _Signal:
|
||||
def __init__(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def connect(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
class _QObject:
|
||||
pass
|
||||
|
||||
class _QThread:
|
||||
def __init__(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
class _QTimer:
|
||||
@staticmethod
|
||||
def singleShot(*args, **kwargs):
|
||||
pass
|
||||
|
||||
class _QUrl:
|
||||
@staticmethod
|
||||
def fromLocalFile(path):
|
||||
return path
|
||||
|
||||
class _QDesktopServices:
|
||||
@staticmethod
|
||||
def openUrl(*args, **kwargs):
|
||||
return True
|
||||
|
||||
class _Widget:
|
||||
def __init__(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
qtcore_module.QObject = _QObject
|
||||
qtcore_module.Qt = type("Qt", (), {})
|
||||
qtcore_module.QThread = _QThread
|
||||
qtcore_module.Signal = _Signal
|
||||
qtcore_module.QTimer = _QTimer
|
||||
qtcore_module.QUrl = _QUrl
|
||||
qtgui_module.QDesktopServices = _QDesktopServices
|
||||
qtwidgets_module.QApplication = _Widget
|
||||
qtwidgets_module.QLabel = _Widget
|
||||
qtwidgets_module.QProgressBar = _Widget
|
||||
qtwidgets_module.QVBoxLayout = _Widget
|
||||
qtwidgets_module.QWidget = _Widget
|
||||
qtwidgets_module.QPushButton = _Widget
|
||||
qtwidgets_module.QHBoxLayout = _Widget
|
||||
|
||||
# Force stubs even if real PySide6 was imported earlier in the process.
|
||||
for mod_name in list(sys.modules.keys()):
|
||||
if mod_name == "PySide6" or mod_name.startswith("PySide6."):
|
||||
del sys.modules[mod_name]
|
||||
|
||||
sys.modules["PySide6"] = pyside6_module
|
||||
sys.modules["PySide6.QtCore"] = qtcore_module
|
||||
sys.modules["PySide6.QtGui"] = qtgui_module
|
||||
sys.modules["PySide6.QtWidgets"] = qtwidgets_module
|
||||
|
||||
|
||||
MODULE_PATH = Path("updater_gui.py")
|
||||
_install_pyside6_stubs()
|
||||
SPEC = importlib.util.spec_from_file_location("updater_gui_under_test", MODULE_PATH)
|
||||
updater_gui = importlib.util.module_from_spec(SPEC)
|
||||
SPEC.loader.exec_module(updater_gui)
|
||||
|
||||
Reference in New Issue
Block a user