Compare commits
39 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 147988242f | |||
| 44deba1382 | |||
| eda8d43b9c | |||
| cf6d6bcbd0 | |||
| 61948a51c6 | |||
| 97c52c5a51 | |||
| 862c2c8899 | |||
| 1013a1ce38 | |||
| f15e71996b | |||
| 34272d01c8 | |||
| 4e6502bab7 | |||
| 89237590c7 | |||
| aca2bdfa85 | |||
| 9d40f0017e | |||
| 798eacbf9a | |||
| a9a394cf7d | |||
| e1e2f8f0e8 | |||
| 4d84d2ebe5 | |||
| 02350cfca1 | |||
| 68fa841857 | |||
| bca9007463 | |||
| 52b1301982 | |||
| 90b3b4fc9d | |||
| 190e67c931 | |||
| 2eb4c52b81 | |||
| 3d73a504d2 | |||
| 1524271be7 | |||
| 561cf43e09 | |||
| e8930f7550 | |||
| c8da0f9191 | |||
| 37ce500fd2 | |||
| 098a84e5bd | |||
| 5aa17c1a84 | |||
| dde14f3714 | |||
| fa5d4c6993 | |||
| f9e0225243 | |||
| c42b23bea5 | |||
| b52cdea425 | |||
| b7fad78a71 |
35
.gitea/workflows/ci.yml
Normal file
35
.gitea/workflows/ci.yml
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
name: Desktop CI
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
branches:
|
||||||
|
- master
|
||||||
|
pull_request:
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
tests:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- name: Checkout
|
||||||
|
uses: https://git.daemonlord.ru/actions/checkout@v4
|
||||||
|
with:
|
||||||
|
fetch-depth: 0
|
||||||
|
|
||||||
|
- name: Set up Python
|
||||||
|
uses: https://git.daemonlord.ru/actions/setup-python@v5
|
||||||
|
with:
|
||||||
|
python-version: "3.13"
|
||||||
|
|
||||||
|
- name: Install dependencies
|
||||||
|
run: |
|
||||||
|
python -m pip install --upgrade pip
|
||||||
|
pip install -r requirements.txt
|
||||||
|
|
||||||
|
- name: Validate syntax
|
||||||
|
run: |
|
||||||
|
python -m py_compile app_version.py main.py build.py tests/test_auth_relogin_smoke.py tests/test_auto_update_service.py tests/test_chat_actions.py tests/test_token_store.py
|
||||||
|
|
||||||
|
- name: Run tests
|
||||||
|
run: |
|
||||||
|
python -m unittest discover -s tests -p "test_*.py" -v
|
||||||
154
.gitea/workflows/release.yml
Normal file
154
.gitea/workflows/release.yml
Normal file
@@ -0,0 +1,154 @@
|
|||||||
|
name: Desktop Release
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
branches:
|
||||||
|
- master
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
release:
|
||||||
|
runs-on: windows
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- name: Checkout
|
||||||
|
uses: https://git.daemonlord.ru/actions/checkout@v4
|
||||||
|
with:
|
||||||
|
fetch-depth: 0
|
||||||
|
tags: true
|
||||||
|
|
||||||
|
- name: Ensure Python 3.13
|
||||||
|
shell: powershell
|
||||||
|
run: |
|
||||||
|
if (Get-Command python -ErrorAction SilentlyContinue) {
|
||||||
|
python --version
|
||||||
|
} elseif (Get-Command py -ErrorAction SilentlyContinue) {
|
||||||
|
$pyExe = py -3.13 -c "import sys; print(sys.executable)"
|
||||||
|
if (-not $pyExe) {
|
||||||
|
throw "Python 3.13 launcher is available, but interpreter was not found."
|
||||||
|
}
|
||||||
|
Split-Path $pyExe | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append
|
||||||
|
python --version
|
||||||
|
} else {
|
||||||
|
throw "Python is not installed on runner. Install Python 3.13 and restart runner service."
|
||||||
|
}
|
||||||
|
|
||||||
|
- name: Install dependencies
|
||||||
|
shell: powershell
|
||||||
|
run: |
|
||||||
|
python -m pip install --upgrade pip
|
||||||
|
pip install -r requirements.txt pyinstaller
|
||||||
|
|
||||||
|
- name: Extract app version
|
||||||
|
id: extract_version
|
||||||
|
shell: powershell
|
||||||
|
run: |
|
||||||
|
$version = (python -c "from app_version import APP_VERSION; print(APP_VERSION)").Trim()
|
||||||
|
$utf8NoBom = New-Object System.Text.UTF8Encoding($false)
|
||||||
|
[System.IO.File]::AppendAllText($env:GITHUB_OUTPUT, "version=$version`n", $utf8NoBom)
|
||||||
|
Write-Host "Detected version: $version"
|
||||||
|
|
||||||
|
- name: Initialize release flow
|
||||||
|
id: flow_init
|
||||||
|
shell: powershell
|
||||||
|
run: |
|
||||||
|
$utf8NoBom = New-Object System.Text.UTF8Encoding($false)
|
||||||
|
[System.IO.File]::AppendAllText($env:GITHUB_ENV, "CONTINUE=true`n", $utf8NoBom)
|
||||||
|
exit 0
|
||||||
|
|
||||||
|
- name: Stop if release already exists
|
||||||
|
shell: powershell
|
||||||
|
run: |
|
||||||
|
$version = "${{ steps.extract_version.outputs.version }}"
|
||||||
|
$tag = "v$version"
|
||||||
|
$apiUrl = "https://git.daemonlord.ru/api/v1/repos/${{ gitea.repository }}/releases?page=1&limit=100"
|
||||||
|
$headers = @{ Authorization = "token ${{ secrets.API_TOKEN }}" }
|
||||||
|
$utf8NoBom = New-Object System.Text.UTF8Encoding($false)
|
||||||
|
try {
|
||||||
|
$response = Invoke-RestMethod -Uri $apiUrl -Headers $headers -Method Get
|
||||||
|
$found = $false
|
||||||
|
foreach ($release in $response) {
|
||||||
|
if ($release.tag_name -eq $tag) {
|
||||||
|
$found = $true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if ($found) {
|
||||||
|
Write-Host "Release $tag already exists, stopping job."
|
||||||
|
[System.IO.File]::AppendAllText($env:GITHUB_ENV, "CONTINUE=false`n", $utf8NoBom)
|
||||||
|
} else {
|
||||||
|
Write-Host "Release $tag not found, continuing workflow..."
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
Write-Host "Failed to query releases list, continuing workflow..."
|
||||||
|
}
|
||||||
|
|
||||||
|
- name: Run tests
|
||||||
|
if: env.CONTINUE == 'true'
|
||||||
|
shell: powershell
|
||||||
|
run: |
|
||||||
|
python -m py_compile app_version.py main.py build.py tests/test_auth_relogin_smoke.py tests/test_auto_update_service.py tests/test_chat_actions.py tests/test_token_store.py
|
||||||
|
python -m unittest discover -s tests -p "test_*.py" -v
|
||||||
|
|
||||||
|
- name: Build release zip
|
||||||
|
if: env.CONTINUE == 'true'
|
||||||
|
shell: powershell
|
||||||
|
run: |
|
||||||
|
python build.py
|
||||||
|
|
||||||
|
- name: Ensure archive exists
|
||||||
|
if: env.CONTINUE == 'true'
|
||||||
|
shell: powershell
|
||||||
|
run: |
|
||||||
|
$version = "${{ steps.extract_version.outputs.version }}"
|
||||||
|
$archivePath = "dist/AnabasisManager-$version.zip"
|
||||||
|
if (-not (Test-Path $archivePath)) {
|
||||||
|
throw "Archive not found: $archivePath"
|
||||||
|
}
|
||||||
|
|
||||||
|
- name: Generate SHA256 checksum
|
||||||
|
if: env.CONTINUE == 'true'
|
||||||
|
shell: powershell
|
||||||
|
run: |
|
||||||
|
$version = "${{ steps.extract_version.outputs.version }}"
|
||||||
|
$archiveName = "AnabasisManager-$version.zip"
|
||||||
|
$archivePath = "dist/$archiveName"
|
||||||
|
$checksumPath = "dist/$archiveName.sha256"
|
||||||
|
$hash = (Get-FileHash -Path $archivePath -Algorithm SHA256).Hash.ToLower()
|
||||||
|
"$hash $archiveName" | Set-Content -Path $checksumPath -Encoding UTF8
|
||||||
|
Write-Host "Checksum created: $checksumPath"
|
||||||
|
|
||||||
|
- name: Configure git identity
|
||||||
|
if: env.CONTINUE == 'true'
|
||||||
|
shell: powershell
|
||||||
|
run: |
|
||||||
|
git config user.name "gitea-actions"
|
||||||
|
git config user.email "gitea-actions@daemonlord.ru"
|
||||||
|
|
||||||
|
- name: Create git tag
|
||||||
|
if: env.CONTINUE == 'true'
|
||||||
|
shell: powershell
|
||||||
|
run: |
|
||||||
|
$version = "${{ steps.extract_version.outputs.version }}"
|
||||||
|
$tag = "v$version"
|
||||||
|
$tagLine = (git ls-remote --tags origin "refs/tags/$tag" | Select-Object -First 1)
|
||||||
|
if ([string]::IsNullOrWhiteSpace($tagLine)) {
|
||||||
|
git tag "$tag"
|
||||||
|
git push origin "$tag"
|
||||||
|
} else {
|
||||||
|
Write-Host "Tag $tag already exists on origin, skipping tag push."
|
||||||
|
}
|
||||||
|
|
||||||
|
- name: Create Gitea Release
|
||||||
|
if: env.CONTINUE == 'true'
|
||||||
|
uses: https://git.daemonlord.ru/actions/gitea-release-action@v1
|
||||||
|
with:
|
||||||
|
server_url: https://git.daemonlord.ru
|
||||||
|
repository: ${{ gitea.repository }}
|
||||||
|
token: ${{ secrets.API_TOKEN }}
|
||||||
|
tag_name: v${{ steps.extract_version.outputs.version }}
|
||||||
|
name: Anabasis Manager ${{ steps.extract_version.outputs.version }}
|
||||||
|
body: |
|
||||||
|
Desktop release v${{ steps.extract_version.outputs.version }}
|
||||||
|
files: |
|
||||||
|
dist/AnabasisManager-${{ steps.extract_version.outputs.version }}.zip
|
||||||
|
dist/AnabasisManager-${{ steps.extract_version.outputs.version }}.zip.sha256
|
||||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -10,3 +10,4 @@ __pycache__/
|
|||||||
tests/__pycache__/
|
tests/__pycache__/
|
||||||
build/
|
build/
|
||||||
dist/
|
dist/
|
||||||
|
AnabasisManager.spec
|
||||||
|
|||||||
1
app_version.py
Normal file
1
app_version.py
Normal file
@@ -0,0 +1 @@
|
|||||||
|
APP_VERSION = "2.0.0"
|
||||||
3
build.py
3
build.py
@@ -2,10 +2,11 @@ import os
|
|||||||
import shutil
|
import shutil
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
|
from app_version import APP_VERSION
|
||||||
|
|
||||||
# --- Конфигурация ---
|
# --- Конфигурация ---
|
||||||
APP_NAME = "AnabasisManager"
|
APP_NAME = "AnabasisManager"
|
||||||
VERSION = "1.5.1" # Ваша версия
|
VERSION = APP_VERSION # Единая версия приложения
|
||||||
MAIN_SCRIPT = "main.py"
|
MAIN_SCRIPT = "main.py"
|
||||||
ICON_PATH = "icon.ico"
|
ICON_PATH = "icon.ico"
|
||||||
DIST_DIR = os.path.join("dist", APP_NAME)
|
DIST_DIR = os.path.join("dist", APP_NAME)
|
||||||
|
|||||||
719
main.py
719
main.py
@@ -1,28 +1,33 @@
|
|||||||
import sys
|
import sys
|
||||||
import base64
|
|
||||||
import ctypes
|
|
||||||
import shutil
|
|
||||||
from vk_api import VkApi
|
|
||||||
import json
|
import json
|
||||||
import time
|
import time
|
||||||
|
import shutil
|
||||||
import auth_webview
|
import auth_webview
|
||||||
import os
|
import os
|
||||||
import re
|
|
||||||
import threading
|
import threading
|
||||||
import urllib.error
|
from app_version import APP_VERSION
|
||||||
import urllib.request
|
from services import (
|
||||||
|
AutoUpdateService,
|
||||||
|
UpdateChecker,
|
||||||
|
VkService,
|
||||||
|
detect_update_repository_url,
|
||||||
|
load_chat_conversations,
|
||||||
|
load_token as token_store_load_token,
|
||||||
|
resolve_user_ids as chat_resolve_user_ids,
|
||||||
|
save_token as token_store_save_token,
|
||||||
|
)
|
||||||
|
from ui.dialogs import MultiLinkDialog as UIMultiLinkDialog
|
||||||
|
from ui.main_window import instructions_text
|
||||||
from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit,
|
from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit,
|
||||||
QPushButton, QVBoxLayout, QWidget, QMessageBox,
|
QPushButton, QVBoxLayout, QWidget, QMessageBox,
|
||||||
QTextBrowser, QScrollArea, QCheckBox, QHBoxLayout,
|
QTextBrowser, QScrollArea, QCheckBox, QHBoxLayout,
|
||||||
QSizePolicy, QDialog, QTextEdit, QTabWidget, QDialogButtonBox,
|
QSizePolicy, QTabWidget, QDialog, QDialogButtonBox,
|
||||||
QProgressBar)
|
QProgressBar)
|
||||||
from PySide6.QtCore import Qt, QUrl, QDateTime, Signal, QTimer, QObject
|
from PySide6.QtCore import Qt, QUrl, QDateTime, QTimer
|
||||||
from PySide6.QtGui import QIcon, QAction, QDesktopServices
|
from PySide6.QtGui import QIcon, QAction, QDesktopServices
|
||||||
from urllib.parse import urlparse, parse_qs, unquote
|
|
||||||
from vk_api.exceptions import VkApiError
|
from vk_api.exceptions import VkApiError
|
||||||
from PySide6.QtCore import QStandardPaths
|
from PySide6.QtCore import QStandardPaths
|
||||||
from PySide6.QtCore import QProcess
|
from PySide6.QtCore import QProcess
|
||||||
from ctypes import wintypes
|
|
||||||
|
|
||||||
# --- Управление токенами и настройками ---
|
# --- Управление токенами и настройками ---
|
||||||
APP_DATA_DIR = os.path.join(QStandardPaths.writableLocation(QStandardPaths.AppDataLocation), "AnabasisVKChatManager")
|
APP_DATA_DIR = os.path.join(QStandardPaths.writableLocation(QStandardPaths.AppDataLocation), "AnabasisVKChatManager")
|
||||||
@@ -33,7 +38,6 @@ LOG_FILE = os.path.join(APP_DATA_DIR, "app.log")
|
|||||||
LOG_MAX_BYTES = 1024 * 1024 # 1 MB
|
LOG_MAX_BYTES = 1024 * 1024 # 1 MB
|
||||||
LOG_BACKUP_FILE = os.path.join(APP_DATA_DIR, "app.log.1")
|
LOG_BACKUP_FILE = os.path.join(APP_DATA_DIR, "app.log.1")
|
||||||
AUTH_RELOGIN_BACKOFF_SECONDS = 5.0
|
AUTH_RELOGIN_BACKOFF_SECONDS = 5.0
|
||||||
APP_VERSION = "1.5.1"
|
|
||||||
# Legacy owner/repo format for GitHub-only fallback.
|
# Legacy owner/repo format for GitHub-only fallback.
|
||||||
UPDATE_REPOSITORY = ""
|
UPDATE_REPOSITORY = ""
|
||||||
# Full repository URL is preferred (supports GitHub/Gitea).
|
# Full repository URL is preferred (supports GitHub/Gitea).
|
||||||
@@ -41,138 +45,6 @@ UPDATE_REPOSITORY_URL = "https://git.daemonlord.ru/benya/AnabasisChatRemove"
|
|||||||
UPDATE_REQUEST_TIMEOUT = 8
|
UPDATE_REQUEST_TIMEOUT = 8
|
||||||
|
|
||||||
|
|
||||||
class _DataBlob(ctypes.Structure):
|
|
||||||
_fields_ = [("cbData", wintypes.DWORD), ("pbData", ctypes.POINTER(ctypes.c_byte))]
|
|
||||||
|
|
||||||
|
|
||||||
def _version_key(version_text):
|
|
||||||
parts = [int(x) for x in re.findall(r"\d+", str(version_text))]
|
|
||||||
if not parts:
|
|
||||||
return (0, 0, 0)
|
|
||||||
while len(parts) < 3:
|
|
||||||
parts.append(0)
|
|
||||||
return tuple(parts[:3])
|
|
||||||
|
|
||||||
|
|
||||||
def _is_newer_version(latest_version, current_version):
|
|
||||||
latest_key = _version_key(latest_version)
|
|
||||||
current_key = _version_key(current_version)
|
|
||||||
return latest_key > current_key
|
|
||||||
|
|
||||||
|
|
||||||
def _sanitize_repo_url(value):
|
|
||||||
value = (value or "").strip()
|
|
||||||
if not value:
|
|
||||||
return ""
|
|
||||||
if "://" not in value and value.count("/") == 1:
|
|
||||||
return f"https://github.com/{value}"
|
|
||||||
parsed = urlparse(value)
|
|
||||||
if not parsed.scheme or not parsed.netloc:
|
|
||||||
return ""
|
|
||||||
clean_path = parsed.path.rstrip("/")
|
|
||||||
if clean_path.endswith(".git"):
|
|
||||||
clean_path = clean_path[:-4]
|
|
||||||
return f"{parsed.scheme}://{parsed.netloc}{clean_path}"
|
|
||||||
|
|
||||||
|
|
||||||
def _detect_update_repository_url():
|
|
||||||
env_url = _sanitize_repo_url(os.getenv("ANABASIS_UPDATE_URL", ""))
|
|
||||||
if env_url:
|
|
||||||
return env_url
|
|
||||||
env_repo = _sanitize_repo_url(os.getenv("ANABASIS_UPDATE_REPOSITORY", ""))
|
|
||||||
if env_repo:
|
|
||||||
return env_repo
|
|
||||||
configured_url = _sanitize_repo_url(UPDATE_REPOSITORY_URL)
|
|
||||||
if configured_url:
|
|
||||||
return configured_url
|
|
||||||
configured_repo = _sanitize_repo_url(UPDATE_REPOSITORY)
|
|
||||||
if configured_repo:
|
|
||||||
return configured_repo
|
|
||||||
git_config_path = os.path.join(os.path.abspath("."), ".git", "config")
|
|
||||||
if not os.path.exists(git_config_path):
|
|
||||||
return ""
|
|
||||||
try:
|
|
||||||
with open(git_config_path, "r", encoding="utf-8") as f:
|
|
||||||
content = f.read()
|
|
||||||
match = re.search(r"url\s*=\s*((?:https?://|git@)[^\s]+)", content)
|
|
||||||
if match:
|
|
||||||
remote = match.group(1).strip()
|
|
||||||
if remote.startswith("git@"):
|
|
||||||
# git@host:owner/repo(.git)
|
|
||||||
ssh_match = re.match(r"git@([^:]+):(.+?)(?:\.git)?$", remote)
|
|
||||||
if ssh_match:
|
|
||||||
return _sanitize_repo_url(f"https://{ssh_match.group(1)}/{ssh_match.group(2)}")
|
|
||||||
return _sanitize_repo_url(remote)
|
|
||||||
except Exception:
|
|
||||||
return ""
|
|
||||||
return ""
|
|
||||||
|
|
||||||
|
|
||||||
_crypt32 = None
|
|
||||||
_kernel32 = None
|
|
||||||
if os.name == "nt":
|
|
||||||
_crypt32 = ctypes.WinDLL("crypt32", use_last_error=True)
|
|
||||||
_kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
|
|
||||||
_crypt32.CryptProtectData.argtypes = [
|
|
||||||
ctypes.POINTER(_DataBlob),
|
|
||||||
wintypes.LPCWSTR,
|
|
||||||
ctypes.POINTER(_DataBlob),
|
|
||||||
ctypes.c_void_p,
|
|
||||||
ctypes.c_void_p,
|
|
||||||
wintypes.DWORD,
|
|
||||||
ctypes.POINTER(_DataBlob),
|
|
||||||
]
|
|
||||||
_crypt32.CryptProtectData.restype = wintypes.BOOL
|
|
||||||
_crypt32.CryptUnprotectData.argtypes = [
|
|
||||||
ctypes.POINTER(_DataBlob),
|
|
||||||
ctypes.POINTER(wintypes.LPWSTR),
|
|
||||||
ctypes.POINTER(_DataBlob),
|
|
||||||
ctypes.c_void_p,
|
|
||||||
ctypes.c_void_p,
|
|
||||||
wintypes.DWORD,
|
|
||||||
ctypes.POINTER(_DataBlob),
|
|
||||||
]
|
|
||||||
_crypt32.CryptUnprotectData.restype = wintypes.BOOL
|
|
||||||
|
|
||||||
|
|
||||||
def _crypt_protect_data(data, description=""):
|
|
||||||
buffer = ctypes.create_string_buffer(data)
|
|
||||||
data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte)))
|
|
||||||
data_out = _DataBlob()
|
|
||||||
if not _crypt32.CryptProtectData(ctypes.byref(data_in), description, None, None, None, 0, ctypes.byref(data_out)):
|
|
||||||
raise ctypes.WinError(ctypes.get_last_error())
|
|
||||||
try:
|
|
||||||
return ctypes.string_at(data_out.pbData, data_out.cbData)
|
|
||||||
finally:
|
|
||||||
_kernel32.LocalFree(data_out.pbData)
|
|
||||||
|
|
||||||
|
|
||||||
def _crypt_unprotect_data(data):
|
|
||||||
buffer = ctypes.create_string_buffer(data)
|
|
||||||
data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte)))
|
|
||||||
data_out = _DataBlob()
|
|
||||||
if not _crypt32.CryptUnprotectData(ctypes.byref(data_in), None, None, None, None, 0, ctypes.byref(data_out)):
|
|
||||||
raise ctypes.WinError(ctypes.get_last_error())
|
|
||||||
try:
|
|
||||||
return ctypes.string_at(data_out.pbData, data_out.cbData)
|
|
||||||
finally:
|
|
||||||
_kernel32.LocalFree(data_out.pbData)
|
|
||||||
|
|
||||||
|
|
||||||
def _encrypt_token(token):
|
|
||||||
if os.name != "nt":
|
|
||||||
raise RuntimeError("DPAPI is available only on Windows.")
|
|
||||||
encrypted_bytes = _crypt_protect_data(token.encode("utf-8"))
|
|
||||||
return base64.b64encode(encrypted_bytes).decode("ascii")
|
|
||||||
|
|
||||||
|
|
||||||
def _decrypt_token(token_data):
|
|
||||||
if os.name != "nt":
|
|
||||||
raise RuntimeError("DPAPI is available only on Windows.")
|
|
||||||
encrypted_bytes = base64.b64decode(token_data.encode("ascii"))
|
|
||||||
decrypted_bytes = _crypt_unprotect_data(encrypted_bytes)
|
|
||||||
return decrypted_bytes.decode("utf-8")
|
|
||||||
|
|
||||||
def get_resource_path(relative_path):
|
def get_resource_path(relative_path):
|
||||||
""" Получает абсолютный путь к ресурсу, работает для скрипта и для .exe """
|
""" Получает абсолютный путь к ресурсу, работает для скрипта и для .exe """
|
||||||
if hasattr(sys, '_MEIPASS'): # Для PyInstaller (на всякий случай)
|
if hasattr(sys, '_MEIPASS'): # Для PyInstaller (на всякий случай)
|
||||||
@@ -180,222 +52,6 @@ def get_resource_path(relative_path):
|
|||||||
# Для cx_Freeze и обычного запуска
|
# Для cx_Freeze и обычного запуска
|
||||||
return os.path.join(os.path.abspath("."), relative_path)
|
return os.path.join(os.path.abspath("."), relative_path)
|
||||||
|
|
||||||
def save_token(token, expires_in=0):
|
|
||||||
"""Сохраняет токен. Если expires_in=0, токен считается бессрочным."""
|
|
||||||
try:
|
|
||||||
expires_in = int(expires_in)
|
|
||||||
except (ValueError, TypeError):
|
|
||||||
expires_in = 0
|
|
||||||
|
|
||||||
os.makedirs(APP_DATA_DIR, exist_ok=True)
|
|
||||||
|
|
||||||
# ИСПРАВЛЕНИЕ: если expires_in равно 0, то и время истечения ставим 0
|
|
||||||
expiration_time = (time.time() + expires_in) if expires_in > 0 else 0
|
|
||||||
|
|
||||||
stored_token = token
|
|
||||||
encrypted = False
|
|
||||||
if os.name == "nt":
|
|
||||||
try:
|
|
||||||
stored_token = _encrypt_token(token)
|
|
||||||
encrypted = True
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Ошибка шифрования токена: {e}")
|
|
||||||
|
|
||||||
data = {
|
|
||||||
"token": stored_token,
|
|
||||||
"expiration_time": expiration_time,
|
|
||||||
"encrypted": encrypted
|
|
||||||
}
|
|
||||||
|
|
||||||
try:
|
|
||||||
with open(TOKEN_FILE, "w") as f:
|
|
||||||
json.dump(data, f)
|
|
||||||
|
|
||||||
status = "Бессрочно" if expiration_time == 0 else QDateTime.fromSecsSinceEpoch(int(expiration_time)).toString()
|
|
||||||
print(f"Токен сохранен. Срок действия: {status}")
|
|
||||||
return expiration_time
|
|
||||||
except IOError as e:
|
|
||||||
print(f"Ошибка сохранения токена: {e}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def load_token():
|
|
||||||
"""Загружает токен и проверяет его валидность."""
|
|
||||||
try:
|
|
||||||
if not os.path.exists(TOKEN_FILE):
|
|
||||||
return None, None
|
|
||||||
|
|
||||||
with open(TOKEN_FILE, "r") as f:
|
|
||||||
data = json.load(f)
|
|
||||||
|
|
||||||
token = data.get("token")
|
|
||||||
encrypted = data.get("encrypted", False)
|
|
||||||
if token and encrypted:
|
|
||||||
try:
|
|
||||||
token = _decrypt_token(token)
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Ошибка расшифровки токена: {e}")
|
|
||||||
if os.path.exists(TOKEN_FILE):
|
|
||||||
os.remove(TOKEN_FILE)
|
|
||||||
return None, None
|
|
||||||
expiration_time = data.get("expiration_time")
|
|
||||||
|
|
||||||
# ИСПРАВЛЕНИЕ: токен валиден, если время истечения 0 ИЛИ оно больше текущего
|
|
||||||
if token and (expiration_time == 0 or expiration_time > time.time()):
|
|
||||||
return token, expiration_time
|
|
||||||
else:
|
|
||||||
if os.path.exists(TOKEN_FILE):
|
|
||||||
os.remove(TOKEN_FILE)
|
|
||||||
return None, None
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Ошибка загрузки: {e}")
|
|
||||||
return None, None
|
|
||||||
|
|
||||||
class VkService:
|
|
||||||
def __init__(self):
|
|
||||||
self.session = None
|
|
||||||
self.api = None
|
|
||||||
|
|
||||||
def set_token(self, token):
|
|
||||||
self.session = VkApi(token=token)
|
|
||||||
self.api = self.session.get_api()
|
|
||||||
|
|
||||||
def clear(self):
|
|
||||||
self.session = None
|
|
||||||
self.api = None
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def build_auth_command(auth_url, output_path):
|
|
||||||
if getattr(sys, "frozen", False):
|
|
||||||
return sys.executable, ["--auth", auth_url, output_path]
|
|
||||||
return sys.executable, [os.path.abspath(__file__), "--auth", auth_url, output_path]
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def vk_error_code(exc):
|
|
||||||
error = getattr(exc, "error", None)
|
|
||||||
if isinstance(error, dict):
|
|
||||||
return error.get("error_code")
|
|
||||||
return getattr(exc, "code", None)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def is_auth_error(cls, exc, formatted_message=None):
|
|
||||||
code = cls.vk_error_code(exc)
|
|
||||||
if code == 5:
|
|
||||||
return True
|
|
||||||
message = (formatted_message or str(exc)).lower()
|
|
||||||
return "invalid_access_token" in message or "user authorization failed" in message
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def is_retryable_error(cls, exc):
|
|
||||||
return cls.vk_error_code(exc) in (6, 9, 10)
|
|
||||||
|
|
||||||
def call_with_retry(self, func, *args, **kwargs):
|
|
||||||
max_attempts = 5
|
|
||||||
for attempt in range(1, max_attempts + 1):
|
|
||||||
try:
|
|
||||||
return func(*args, **kwargs)
|
|
||||||
except VkApiError as e:
|
|
||||||
if not self.is_retryable_error(e) or attempt == max_attempts:
|
|
||||||
raise
|
|
||||||
delay = min(2.0, 0.35 * (2 ** (attempt - 1)))
|
|
||||||
if self.vk_error_code(e) == 9:
|
|
||||||
delay = max(delay, 1.0)
|
|
||||||
time.sleep(delay)
|
|
||||||
|
|
||||||
|
|
||||||
class UpdateChecker(QObject):
|
|
||||||
check_finished = Signal(dict)
|
|
||||||
check_failed = Signal(str)
|
|
||||||
|
|
||||||
def __init__(self, repository_url, current_version):
|
|
||||||
super().__init__()
|
|
||||||
self.repository_url = repository_url
|
|
||||||
self.current_version = current_version
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
if not self.repository_url:
|
|
||||||
self.check_failed.emit("Не задан URL репозитория обновлений.")
|
|
||||||
return
|
|
||||||
|
|
||||||
parsed = urlparse(self.repository_url)
|
|
||||||
base_url = f"{parsed.scheme}://{parsed.netloc}"
|
|
||||||
repo_path = parsed.path.strip("/")
|
|
||||||
if not repo_path or repo_path.count("/") < 1:
|
|
||||||
self.check_failed.emit("Некорректный URL репозитория обновлений.")
|
|
||||||
return
|
|
||||||
|
|
||||||
if parsed.netloc.lower().endswith("github.com"):
|
|
||||||
api_url = f"https://api.github.com/repos/{repo_path}/releases/latest"
|
|
||||||
else:
|
|
||||||
api_url = f"{base_url}/api/v1/repos/{repo_path}/releases/latest"
|
|
||||||
releases_url = f"{base_url}/{repo_path}/releases"
|
|
||||||
request = urllib.request.Request(
|
|
||||||
api_url,
|
|
||||||
headers={
|
|
||||||
"Accept": "application/vnd.github+json",
|
|
||||||
"User-Agent": "AnabasisManager-Updater",
|
|
||||||
},
|
|
||||||
)
|
|
||||||
try:
|
|
||||||
with urllib.request.urlopen(request, timeout=UPDATE_REQUEST_TIMEOUT) as response:
|
|
||||||
release_data = json.loads(response.read().decode("utf-8"))
|
|
||||||
except urllib.error.HTTPError as e:
|
|
||||||
self.check_failed.emit(f"Ошибка HTTP при проверке обновлений: {e.code}")
|
|
||||||
return
|
|
||||||
except urllib.error.URLError as e:
|
|
||||||
self.check_failed.emit(f"Сетевая ошибка при проверке обновлений: {e}")
|
|
||||||
return
|
|
||||||
except Exception as e:
|
|
||||||
self.check_failed.emit(f"Не удалось проверить обновления: {e}")
|
|
||||||
return
|
|
||||||
|
|
||||||
latest_tag = release_data.get("tag_name") or release_data.get("name") or ""
|
|
||||||
latest_version = latest_tag.lstrip("vV").strip()
|
|
||||||
html_url = release_data.get("html_url") or releases_url
|
|
||||||
assets = release_data.get("assets") or []
|
|
||||||
download_url = ""
|
|
||||||
for asset in assets:
|
|
||||||
url = asset.get("browser_download_url", "")
|
|
||||||
if url.lower().endswith(".zip"):
|
|
||||||
download_url = url
|
|
||||||
break
|
|
||||||
if not download_url and assets:
|
|
||||||
download_url = assets[0].get("browser_download_url", "")
|
|
||||||
|
|
||||||
self.check_finished.emit(
|
|
||||||
{
|
|
||||||
"repository_url": self.repository_url,
|
|
||||||
"latest_version": latest_version,
|
|
||||||
"current_version": self.current_version,
|
|
||||||
"latest_tag": latest_tag,
|
|
||||||
"release_url": html_url,
|
|
||||||
"download_url": download_url,
|
|
||||||
"has_update": _is_newer_version(latest_version, self.current_version),
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class MultiLinkDialog(QDialog):
|
|
||||||
def __init__(self, parent=None):
|
|
||||||
super().__init__(parent)
|
|
||||||
self.setWindowTitle("Ввод нескольких ссылок")
|
|
||||||
self.setMinimumSize(400, 300)
|
|
||||||
layout = QVBoxLayout(self)
|
|
||||||
label = QLabel("Вставьте ссылки на страницы VK, каждая с новой строки:")
|
|
||||||
layout.addWidget(label)
|
|
||||||
self.links_text_edit = QTextEdit()
|
|
||||||
layout.addWidget(self.links_text_edit)
|
|
||||||
button_box = QDialogButtonBox()
|
|
||||||
button_box.addButton("ОК", QDialogButtonBox.AcceptRole)
|
|
||||||
button_box.addButton("Отмена", QDialogButtonBox.RejectRole)
|
|
||||||
button_box.accepted.connect(self.accept)
|
|
||||||
button_box.rejected.connect(self.reject)
|
|
||||||
layout.addWidget(button_box)
|
|
||||||
|
|
||||||
def get_links(self):
|
|
||||||
return [line.strip() for line in self.links_text_edit.toPlainText().strip().split('\n') if line.strip()]
|
|
||||||
|
|
||||||
|
|
||||||
class VkChatManager(QMainWindow):
|
class VkChatManager(QMainWindow):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
@@ -422,7 +78,7 @@ class VkChatManager(QMainWindow):
|
|||||||
self._auth_ui_busy = False
|
self._auth_ui_busy = False
|
||||||
self._auth_relogin_in_progress = False
|
self._auth_relogin_in_progress = False
|
||||||
self._last_auth_relogin_ts = 0.0
|
self._last_auth_relogin_ts = 0.0
|
||||||
self.update_repository_url = _detect_update_repository_url()
|
self.update_repository_url = detect_update_repository_url(UPDATE_REPOSITORY_URL, UPDATE_REPOSITORY)
|
||||||
self.update_checker = None
|
self.update_checker = None
|
||||||
self.update_thread = None
|
self.update_thread = None
|
||||||
self._update_check_silent = False
|
self._update_check_silent = False
|
||||||
@@ -447,15 +103,8 @@ class VkChatManager(QMainWindow):
|
|||||||
layout.setSpacing(5)
|
layout.setSpacing(5)
|
||||||
|
|
||||||
self.instructions = QTextBrowser()
|
self.instructions = QTextBrowser()
|
||||||
self.instructions.setPlainText(
|
|
||||||
"Инструкция:\n"
|
|
||||||
"1. Авторизуйтесь через VK.\n"
|
|
||||||
"2. Выберите чаты.\n"
|
|
||||||
"3. Вставьте ссылку на пользователя в поле ниже. ID определится автоматически.\n"
|
|
||||||
"4. Для массовых операций, нажмите кнопку 'Список' и вставьте ссылки в окне.\n"
|
|
||||||
"5. Нажмите 'ИСКЛЮЧИТЬ' или 'ПРИГЛАСИТЬ'."
|
|
||||||
)
|
|
||||||
self.instructions.setFixedHeight(120)
|
self.instructions.setFixedHeight(120)
|
||||||
|
self.instructions.setPlainText(instructions_text())
|
||||||
layout.addWidget(self.instructions)
|
layout.addWidget(self.instructions)
|
||||||
|
|
||||||
layout.addWidget(QLabel("Access Token VK:"))
|
layout.addWidget(QLabel("Access Token VK:"))
|
||||||
@@ -543,7 +192,7 @@ class VkChatManager(QMainWindow):
|
|||||||
self.resolve_timer.start()
|
self.resolve_timer.start()
|
||||||
|
|
||||||
def open_multi_link_dialog(self):
|
def open_multi_link_dialog(self):
|
||||||
dialog = MultiLinkDialog(self)
|
dialog = UIMultiLinkDialog(self)
|
||||||
if dialog.exec():
|
if dialog.exec():
|
||||||
links = dialog.get_links()
|
links = dialog.get_links()
|
||||||
if links:
|
if links:
|
||||||
@@ -561,55 +210,6 @@ class VkChatManager(QMainWindow):
|
|||||||
return
|
return
|
||||||
self._process_links_list([url])
|
self._process_links_list([url])
|
||||||
|
|
||||||
def _process_links_list(self, links_list):
|
|
||||||
if not self.vk:
|
|
||||||
QMessageBox.warning(self, "Ошибка", "Сначала авторизуйтесь.")
|
|
||||||
return
|
|
||||||
|
|
||||||
self.user_ids_to_process.clear()
|
|
||||||
resolved_ids = []
|
|
||||||
failed_links = []
|
|
||||||
|
|
||||||
self._set_busy(True, "Статус: Определяю ID...")
|
|
||||||
try:
|
|
||||||
for link in links_list:
|
|
||||||
try:
|
|
||||||
path = urlparse(link).path
|
|
||||||
screen_name = path.split('/')[-1] if path else ''
|
|
||||||
if not screen_name and len(path.split('/')) > 1:
|
|
||||||
screen_name = path.split('/')[-2]
|
|
||||||
|
|
||||||
if not screen_name:
|
|
||||||
failed_links.append(link)
|
|
||||||
continue
|
|
||||||
|
|
||||||
resolved_object = self._vk_call_with_retry(self.vk.utils.resolveScreenName, screen_name=screen_name)
|
|
||||||
if resolved_object and resolved_object.get('type') == 'user':
|
|
||||||
resolved_ids.append(resolved_object['object_id'])
|
|
||||||
else:
|
|
||||||
failed_links.append(link)
|
|
||||||
except VkApiError as e:
|
|
||||||
if self._handle_vk_api_error("resolveScreenName", e, action_name="получения ID пользователей"):
|
|
||||||
return
|
|
||||||
failed_links.append(f"{link} ({self._format_vk_error(e)})")
|
|
||||||
except Exception:
|
|
||||||
failed_links.append(link)
|
|
||||||
finally:
|
|
||||||
self._set_busy(False)
|
|
||||||
|
|
||||||
self.user_ids_to_process = resolved_ids
|
|
||||||
|
|
||||||
status_message = f"Статус: Готово к работе с {len(resolved_ids)} пользовател(ем/ями)."
|
|
||||||
if len(links_list) > 1:
|
|
||||||
self._set_vk_url_input_text(f"Загружено {len(resolved_ids)}/{len(links_list)} из списка")
|
|
||||||
|
|
||||||
if failed_links:
|
|
||||||
QMessageBox.warning(self, "Ошибка получения ID",
|
|
||||||
f"Не удалось получить ID для следующих ссылок:\n" + "\n".join(failed_links))
|
|
||||||
|
|
||||||
self.status_label.setText(status_message)
|
|
||||||
self.set_ui_state(self.token is not None)
|
|
||||||
|
|
||||||
def create_menu(self):
|
def create_menu(self):
|
||||||
"""Создает верхнее меню."""
|
"""Создает верхнее меню."""
|
||||||
menu_bar = self.menuBar()
|
menu_bar = self.menuBar()
|
||||||
@@ -636,6 +236,42 @@ class VkChatManager(QMainWindow):
|
|||||||
tools_menu.addAction(logout_action)
|
tools_menu.addAction(logout_action)
|
||||||
self.logout_action = logout_action
|
self.logout_action = logout_action
|
||||||
|
|
||||||
|
help_menu = menu_bar.addMenu("Справка")
|
||||||
|
about_action = QAction("О приложении", self)
|
||||||
|
about_action.setStatusTip("Показать информацию о приложении")
|
||||||
|
about_action.triggered.connect(self.show_about_dialog)
|
||||||
|
help_menu.addAction(about_action)
|
||||||
|
self.about_action = about_action
|
||||||
|
|
||||||
|
def show_about_dialog(self):
|
||||||
|
dialog = QDialog(self)
|
||||||
|
dialog.setWindowTitle("О приложении")
|
||||||
|
dialog.setMinimumWidth(460)
|
||||||
|
repo_url = self.update_repository_url
|
||||||
|
if repo_url:
|
||||||
|
repo_html = f'<a href="{repo_url}">{repo_url}</a>'
|
||||||
|
else:
|
||||||
|
repo_html = "не указан"
|
||||||
|
content = QLabel(
|
||||||
|
f"<b>Anabasis Chat Manager</b><br>"
|
||||||
|
f"Версия: {APP_VERSION}<br><br>"
|
||||||
|
"Инструмент для массового управления пользователями в чатах VK.<br>"
|
||||||
|
"Поддерживается проверка обновлений и автообновление Windows-сборки.<br><br>"
|
||||||
|
f"Репозиторий: {repo_html}"
|
||||||
|
)
|
||||||
|
content.setTextFormat(Qt.RichText)
|
||||||
|
content.setTextInteractionFlags(Qt.TextBrowserInteraction)
|
||||||
|
content.setOpenExternalLinks(True)
|
||||||
|
content.setWordWrap(True)
|
||||||
|
|
||||||
|
button_box = QDialogButtonBox(QDialogButtonBox.Ok, parent=dialog)
|
||||||
|
button_box.accepted.connect(dialog.accept)
|
||||||
|
|
||||||
|
layout = QVBoxLayout(dialog)
|
||||||
|
layout.addWidget(content)
|
||||||
|
layout.addWidget(button_box)
|
||||||
|
dialog.exec()
|
||||||
|
|
||||||
def create_chat_tab(self):
|
def create_chat_tab(self):
|
||||||
# This implementation correctly creates a scrollable area for chat lists.
|
# This implementation correctly creates a scrollable area for chat lists.
|
||||||
tab_content_widget = QWidget()
|
tab_content_widget = QWidget()
|
||||||
@@ -672,7 +308,7 @@ class VkChatManager(QMainWindow):
|
|||||||
self._set_update_action_state(True)
|
self._set_update_action_state(True)
|
||||||
self.status_label.setText("Статус: проверка обновлений...")
|
self.status_label.setText("Статус: проверка обновлений...")
|
||||||
|
|
||||||
self.update_checker = UpdateChecker(self.update_repository_url, APP_VERSION)
|
self.update_checker = UpdateChecker(self.update_repository_url, APP_VERSION, request_timeout=UPDATE_REQUEST_TIMEOUT)
|
||||||
self.update_checker.check_finished.connect(self._on_update_check_finished)
|
self.update_checker.check_finished.connect(self._on_update_check_finished)
|
||||||
self.update_checker.check_failed.connect(self._on_update_check_failed)
|
self.update_checker.check_failed.connect(self._on_update_check_failed)
|
||||||
self.update_thread = threading.Thread(target=self.update_checker.run, daemon=True)
|
self.update_thread = threading.Thread(target=self.update_checker.run, daemon=True)
|
||||||
@@ -695,15 +331,23 @@ class VkChatManager(QMainWindow):
|
|||||||
f"Доступная версия: {latest_version}\n\n"
|
f"Доступная версия: {latest_version}\n\n"
|
||||||
"Открыть страницу загрузки?"
|
"Открыть страницу загрузки?"
|
||||||
)
|
)
|
||||||
|
update_now_button = message_box.addButton("Обновить сейчас", QMessageBox.AcceptRole)
|
||||||
download_button = message_box.addButton("Скачать", QMessageBox.AcceptRole)
|
download_button = message_box.addButton("Скачать", QMessageBox.AcceptRole)
|
||||||
releases_button = message_box.addButton("Релизы", QMessageBox.ActionRole)
|
releases_button = message_box.addButton("Релизы", QMessageBox.ActionRole)
|
||||||
cancel_button = message_box.addButton("Позже", QMessageBox.RejectRole)
|
cancel_button = message_box.addButton("Позже", QMessageBox.RejectRole)
|
||||||
message_box.setDefaultButton(download_button)
|
message_box.setDefaultButton(update_now_button)
|
||||||
message_box.exec()
|
message_box.exec()
|
||||||
|
|
||||||
clicked = message_box.clickedButton()
|
clicked = message_box.clickedButton()
|
||||||
download_url = result.get("download_url")
|
download_url = result.get("download_url")
|
||||||
|
checksum_url = result.get("checksum_url")
|
||||||
|
download_name = result.get("download_name")
|
||||||
release_url = result.get("release_url")
|
release_url = result.get("release_url")
|
||||||
|
if clicked is update_now_button and download_url:
|
||||||
|
if not self._start_auto_update(download_url, latest_version, checksum_url, download_name):
|
||||||
|
if release_url:
|
||||||
|
QDesktopServices.openUrl(QUrl(release_url))
|
||||||
|
return
|
||||||
if clicked is download_button and download_url:
|
if clicked is download_button and download_url:
|
||||||
QDesktopServices.openUrl(QUrl(download_url))
|
QDesktopServices.openUrl(QUrl(download_url))
|
||||||
elif clicked in (download_button, releases_button) and release_url:
|
elif clicked in (download_button, releases_button) and release_url:
|
||||||
@@ -933,7 +577,7 @@ class VkChatManager(QMainWindow):
|
|||||||
raise last_error
|
raise last_error
|
||||||
|
|
||||||
def load_saved_token_on_startup(self):
|
def load_saved_token_on_startup(self):
|
||||||
loaded_token, expiration_time = load_token()
|
loaded_token, expiration_time = token_store_load_token(TOKEN_FILE)
|
||||||
if loaded_token:
|
if loaded_token:
|
||||||
self.handle_auth_token_on_load(loaded_token, expiration_time)
|
self.handle_auth_token_on_load(loaded_token, expiration_time)
|
||||||
else:
|
else:
|
||||||
@@ -947,7 +591,7 @@ class VkChatManager(QMainWindow):
|
|||||||
checkbox.setChecked(checked)
|
checkbox.setChecked(checked)
|
||||||
|
|
||||||
def _build_auth_command(self, auth_url, output_path):
|
def _build_auth_command(self, auth_url, output_path):
|
||||||
return self.vk_service.build_auth_command(auth_url, output_path)
|
return self.vk_service.build_auth_command(auth_url, output_path, entry_script_path=os.path.abspath(__file__))
|
||||||
|
|
||||||
def _on_auth_process_error(self, process_error):
|
def _on_auth_process_error(self, process_error):
|
||||||
self._auth_process_error_text = f"Ошибка запуска авторизации: {process_error}"
|
self._auth_process_error_text = f"Ошибка запуска авторизации: {process_error}"
|
||||||
@@ -1059,7 +703,12 @@ class VkChatManager(QMainWindow):
|
|||||||
|
|
||||||
self.token = token
|
self.token = token
|
||||||
# Сохраняем и получаем корректный expiration_time (0 или будущее время)
|
# Сохраняем и получаем корректный expiration_time (0 или будущее время)
|
||||||
self.token_expiration_time = save_token(self.token, expires_in)
|
self.token_expiration_time = token_store_save_token(
|
||||||
|
self.token,
|
||||||
|
TOKEN_FILE,
|
||||||
|
APP_DATA_DIR,
|
||||||
|
expires_in=expires_in,
|
||||||
|
)
|
||||||
|
|
||||||
self.token_input.setText(self.token[:50] + "...")
|
self.token_input.setText(self.token[:50] + "...")
|
||||||
self.status_label.setText("Статус: авторизован")
|
self.status_label.setText("Статус: авторизован")
|
||||||
@@ -1167,83 +816,6 @@ class VkChatManager(QMainWindow):
|
|||||||
def _vk_call_with_retry(self, func, *args, **kwargs):
|
def _vk_call_with_retry(self, func, *args, **kwargs):
|
||||||
return self.vk_service.call_with_retry(func, *args, **kwargs)
|
return self.vk_service.call_with_retry(func, *args, **kwargs)
|
||||||
|
|
||||||
def load_chats(self):
|
|
||||||
self._clear_chat_tabs()
|
|
||||||
|
|
||||||
# Get the checkbox layouts from each tab
|
|
||||||
layouts = [
|
|
||||||
self.office_tab.findChild(QWidget).findChild(QVBoxLayout),
|
|
||||||
self.retail_tab.findChild(QWidget).findChild(QVBoxLayout),
|
|
||||||
self.warehouse_tab.findChild(QWidget).findChild(QVBoxLayout),
|
|
||||||
self.coffee_tab.findChild(QWidget).findChild(QVBoxLayout),
|
|
||||||
self.other_tab.findChild(QWidget).findChild(QVBoxLayout)
|
|
||||||
]
|
|
||||||
|
|
||||||
try:
|
|
||||||
self._set_busy(True, "Статус: загрузка чатов...")
|
|
||||||
conversations = []
|
|
||||||
start_from = None
|
|
||||||
seen_start_tokens = set()
|
|
||||||
while True:
|
|
||||||
params = {"count": 200, "filter": "all"}
|
|
||||||
if start_from:
|
|
||||||
if start_from in seen_start_tokens:
|
|
||||||
break
|
|
||||||
params["start_from"] = start_from
|
|
||||||
seen_start_tokens.add(start_from)
|
|
||||||
|
|
||||||
response = self._vk_call_with_retry(self.vk.messages.getConversations, **params)
|
|
||||||
page_items = response.get("items", [])
|
|
||||||
if not page_items:
|
|
||||||
break
|
|
||||||
conversations.extend(page_items)
|
|
||||||
|
|
||||||
start_from = response.get("next_from")
|
|
||||||
if not start_from:
|
|
||||||
break
|
|
||||||
for conv in conversations:
|
|
||||||
if conv['conversation']['peer']['type'] == 'chat':
|
|
||||||
chat_id = conv['conversation']['peer']['local_id']
|
|
||||||
title = conv['conversation']['chat_settings']['title']
|
|
||||||
self.chats.append({'id': chat_id, 'title': title})
|
|
||||||
|
|
||||||
checkbox = QCheckBox(f"{title} (id: {chat_id})")
|
|
||||||
checkbox.setProperty("chat_id", chat_id)
|
|
||||||
|
|
||||||
# Insert checkbox at the top of the layout (before the stretch)
|
|
||||||
if "AG офис" in title:
|
|
||||||
layouts[0].insertWidget(layouts[0].count() - 1, checkbox)
|
|
||||||
self.office_chat_checkboxes.append(checkbox)
|
|
||||||
elif "AG розница" in title:
|
|
||||||
layouts[1].insertWidget(layouts[1].count() - 1, checkbox)
|
|
||||||
self.retail_chat_checkboxes.append(checkbox)
|
|
||||||
elif "AG склад" in title:
|
|
||||||
layouts[2].insertWidget(layouts[2].count() - 1, checkbox)
|
|
||||||
self.warehouse_chat_checkboxes.append(checkbox)
|
|
||||||
elif "AG кофейни" in title:
|
|
||||||
layouts[3].insertWidget(layouts[3].count() - 1, checkbox)
|
|
||||||
self.coffee_chat_checkboxes.append(checkbox)
|
|
||||||
else:
|
|
||||||
layouts[4].insertWidget(layouts[4].count() - 1, checkbox)
|
|
||||||
self.other_chat_checkboxes.append(checkbox)
|
|
||||||
|
|
||||||
self.chat_tabs.setTabText(0, f"AG Офис ({len(self.office_chat_checkboxes)})")
|
|
||||||
self.chat_tabs.setTabText(1, f"AG Розница ({len(self.retail_chat_checkboxes)})")
|
|
||||||
self.chat_tabs.setTabText(2, f"AG Склад ({len(self.warehouse_chat_checkboxes)})")
|
|
||||||
self.chat_tabs.setTabText(3, f"AG Кофейни ({len(self.coffee_chat_checkboxes)})")
|
|
||||||
self.chat_tabs.setTabText(4, f"Прочие ({len(self.other_chat_checkboxes)})")
|
|
||||||
except VkApiError as e:
|
|
||||||
if self._handle_vk_api_error(
|
|
||||||
"load_chats",
|
|
||||||
e,
|
|
||||||
action_name="загрузки чатов",
|
|
||||||
):
|
|
||||||
return
|
|
||||||
QMessageBox.critical(self, "Ошибка", f"Не удалось загрузить чаты: {self._format_vk_error(e)}")
|
|
||||||
self.set_ui_state(False)
|
|
||||||
finally:
|
|
||||||
self._set_busy(False)
|
|
||||||
|
|
||||||
def get_user_info_by_id(self, user_id):
|
def get_user_info_by_id(self, user_id):
|
||||||
try:
|
try:
|
||||||
user = self.vk.users.get(user_ids=user_id)[0]
|
user = self.vk.users.get(user_ids=user_id)[0]
|
||||||
@@ -1434,6 +1006,151 @@ class VkChatManager(QMainWindow):
|
|||||||
self.user_ids_to_process.clear()
|
self.user_ids_to_process.clear()
|
||||||
self.set_ui_state(self.token is not None)
|
self.set_ui_state(self.token is not None)
|
||||||
|
|
||||||
|
# Refactor overrides: keep logic in service modules and thin UI orchestration here.
|
||||||
|
def _process_links_list(self, links_list):
|
||||||
|
if not self.vk:
|
||||||
|
QMessageBox.warning(self, "Ошибка", "Сначала авторизуйтесь.")
|
||||||
|
return
|
||||||
|
|
||||||
|
self.user_ids_to_process.clear()
|
||||||
|
resolved_ids = []
|
||||||
|
failed_links = []
|
||||||
|
|
||||||
|
self._set_busy(True, "Статус: Определяю ID...")
|
||||||
|
try:
|
||||||
|
resolved_ids, failed_items = chat_resolve_user_ids(
|
||||||
|
self._vk_call_with_retry,
|
||||||
|
self.vk,
|
||||||
|
links_list,
|
||||||
|
)
|
||||||
|
for failed_link, failed_exc in failed_items:
|
||||||
|
if isinstance(failed_exc, VkApiError):
|
||||||
|
if self._handle_vk_api_error("resolveScreenName", failed_exc, action_name="получения ID пользователей"):
|
||||||
|
return
|
||||||
|
failed_links.append(f"{failed_link} ({self._format_vk_error(failed_exc)})")
|
||||||
|
else:
|
||||||
|
failed_links.append(failed_link)
|
||||||
|
finally:
|
||||||
|
self._set_busy(False)
|
||||||
|
|
||||||
|
self.user_ids_to_process = resolved_ids
|
||||||
|
status_message = f"Статус: Готово к работе с {len(resolved_ids)} пользователем(ем/ями)."
|
||||||
|
if len(links_list) > 1:
|
||||||
|
self._set_vk_url_input_text(f"Загружено {len(resolved_ids)}/{len(links_list)} из списка")
|
||||||
|
|
||||||
|
if failed_links:
|
||||||
|
QMessageBox.warning(
|
||||||
|
self,
|
||||||
|
"Ошибка получения ID",
|
||||||
|
"Не удалось получить ID для следующих ссылок:\n" + "\n".join(failed_links),
|
||||||
|
)
|
||||||
|
|
||||||
|
self.status_label.setText(status_message)
|
||||||
|
self.set_ui_state(self.token is not None)
|
||||||
|
|
||||||
|
def load_chats(self):
|
||||||
|
self._clear_chat_tabs()
|
||||||
|
|
||||||
|
layouts = [
|
||||||
|
self.office_tab.findChild(QWidget).findChild(QVBoxLayout),
|
||||||
|
self.retail_tab.findChild(QWidget).findChild(QVBoxLayout),
|
||||||
|
self.warehouse_tab.findChild(QWidget).findChild(QVBoxLayout),
|
||||||
|
self.coffee_tab.findChild(QWidget).findChild(QVBoxLayout),
|
||||||
|
self.other_tab.findChild(QWidget).findChild(QVBoxLayout),
|
||||||
|
]
|
||||||
|
|
||||||
|
try:
|
||||||
|
self._set_busy(True, "Статус: загрузка чатов...")
|
||||||
|
conversations = load_chat_conversations(self._vk_call_with_retry, self.vk)
|
||||||
|
for conv in conversations:
|
||||||
|
if conv["conversation"]["peer"]["type"] != "chat":
|
||||||
|
continue
|
||||||
|
|
||||||
|
chat_id = conv["conversation"]["peer"]["local_id"]
|
||||||
|
title = conv["conversation"]["chat_settings"]["title"]
|
||||||
|
self.chats.append({"id": chat_id, "title": title})
|
||||||
|
checkbox = QCheckBox(f"{title} (id: {chat_id})")
|
||||||
|
checkbox.setProperty("chat_id", chat_id)
|
||||||
|
|
||||||
|
if "AG офис" in title:
|
||||||
|
layouts[0].insertWidget(layouts[0].count() - 1, checkbox)
|
||||||
|
self.office_chat_checkboxes.append(checkbox)
|
||||||
|
elif "AG розница" in title:
|
||||||
|
layouts[1].insertWidget(layouts[1].count() - 1, checkbox)
|
||||||
|
self.retail_chat_checkboxes.append(checkbox)
|
||||||
|
elif "AG склад" in title:
|
||||||
|
layouts[2].insertWidget(layouts[2].count() - 1, checkbox)
|
||||||
|
self.warehouse_chat_checkboxes.append(checkbox)
|
||||||
|
elif "AG кофейни" in title:
|
||||||
|
layouts[3].insertWidget(layouts[3].count() - 1, checkbox)
|
||||||
|
self.coffee_chat_checkboxes.append(checkbox)
|
||||||
|
else:
|
||||||
|
layouts[4].insertWidget(layouts[4].count() - 1, checkbox)
|
||||||
|
self.other_chat_checkboxes.append(checkbox)
|
||||||
|
|
||||||
|
self.chat_tabs.setTabText(0, f"AG Офис ({len(self.office_chat_checkboxes)})")
|
||||||
|
self.chat_tabs.setTabText(1, f"AG Розница ({len(self.retail_chat_checkboxes)})")
|
||||||
|
self.chat_tabs.setTabText(2, f"AG Склад ({len(self.warehouse_chat_checkboxes)})")
|
||||||
|
self.chat_tabs.setTabText(3, f"AG Кофейни ({len(self.coffee_chat_checkboxes)})")
|
||||||
|
self.chat_tabs.setTabText(4, f"Прочие ({len(self.other_chat_checkboxes)})")
|
||||||
|
except VkApiError as e:
|
||||||
|
if self._handle_vk_api_error("load_chats", e, action_name="загрузки чатов"):
|
||||||
|
return
|
||||||
|
QMessageBox.critical(self, "Ошибка", f"Не удалось загрузить чаты: {self._format_vk_error(e)}")
|
||||||
|
self.set_ui_state(False)
|
||||||
|
finally:
|
||||||
|
self._set_busy(False)
|
||||||
|
|
||||||
|
def _start_auto_update(self, download_url, latest_version, checksum_url="", download_name=""):
|
||||||
|
if os.name != "nt":
|
||||||
|
QMessageBox.information(
|
||||||
|
self,
|
||||||
|
"Автообновление",
|
||||||
|
"Автообновление пока поддерживается только в Windows-сборке.",
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
if not getattr(sys, "frozen", False):
|
||||||
|
QMessageBox.information(
|
||||||
|
self,
|
||||||
|
"Автообновление",
|
||||||
|
"Автообновление доступно в собранной версии приложения (.exe).",
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
if not download_url:
|
||||||
|
QMessageBox.warning(self, "Автообновление", "В релизе нет ссылки на файл для обновления.")
|
||||||
|
return False
|
||||||
|
|
||||||
|
self.status_label.setText(f"Статус: загрузка обновления {latest_version}...")
|
||||||
|
self._set_busy(True)
|
||||||
|
try:
|
||||||
|
work_dir, source_dir = AutoUpdateService.prepare_update(
|
||||||
|
download_url=download_url,
|
||||||
|
checksum_url=checksum_url,
|
||||||
|
download_name=download_name,
|
||||||
|
)
|
||||||
|
app_exe = sys.executable
|
||||||
|
script_path = AutoUpdateService.build_update_script(
|
||||||
|
app_dir=os.path.dirname(app_exe),
|
||||||
|
source_dir=source_dir,
|
||||||
|
exe_name=os.path.basename(app_exe),
|
||||||
|
target_pid=os.getpid(),
|
||||||
|
)
|
||||||
|
AutoUpdateService.launch_update_script(script_path, work_dir)
|
||||||
|
self._log_event("auto_update", f"Update {latest_version} started from {download_url}")
|
||||||
|
QMessageBox.information(
|
||||||
|
self,
|
||||||
|
"Обновление запущено",
|
||||||
|
"Обновление скачано. Приложение будет перезапущено.",
|
||||||
|
)
|
||||||
|
QApplication.instance().quit()
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
self._log_event("auto_update_failed", str(e), level="ERROR")
|
||||||
|
QMessageBox.warning(self, "Автообновление", f"Не удалось выполнить автообновление: {e}")
|
||||||
|
return False
|
||||||
|
finally:
|
||||||
|
self._set_busy(False)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
if "--auth" in sys.argv:
|
if "--auth" in sys.argv:
|
||||||
|
|||||||
5
services/__init__.py
Normal file
5
services/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
from .auto_update_service import AutoUpdateService
|
||||||
|
from .chat_actions import load_chat_conversations, resolve_user_ids
|
||||||
|
from .token_store import load_token, save_token
|
||||||
|
from .update_service import UpdateChecker, detect_update_repository_url
|
||||||
|
from .vk_service import VkService
|
||||||
166
services/auto_update_service.py
Normal file
166
services/auto_update_service.py
Normal file
@@ -0,0 +1,166 @@
|
|||||||
|
import hashlib
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
import shutil
|
||||||
|
import subprocess
|
||||||
|
import tempfile
|
||||||
|
import urllib.request
|
||||||
|
import zipfile
|
||||||
|
|
||||||
|
|
||||||
|
class AutoUpdateService:
|
||||||
|
@staticmethod
|
||||||
|
def download_update_archive(download_url, destination_path):
|
||||||
|
request = urllib.request.Request(
|
||||||
|
download_url,
|
||||||
|
headers={"User-Agent": "AnabasisManager-Updater"},
|
||||||
|
)
|
||||||
|
with urllib.request.urlopen(request, timeout=60) as response:
|
||||||
|
with open(destination_path, "wb") as f:
|
||||||
|
shutil.copyfileobj(response, f)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def download_update_text(url):
|
||||||
|
request = urllib.request.Request(
|
||||||
|
url,
|
||||||
|
headers={"User-Agent": "AnabasisManager-Updater"},
|
||||||
|
)
|
||||||
|
with urllib.request.urlopen(request, timeout=30) as response:
|
||||||
|
return response.read().decode("utf-8", errors="replace")
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def sha256_file(path):
|
||||||
|
digest = hashlib.sha256()
|
||||||
|
with open(path, "rb") as f:
|
||||||
|
for chunk in iter(lambda: f.read(1024 * 1024), b""):
|
||||||
|
digest.update(chunk)
|
||||||
|
return digest.hexdigest().lower()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def extract_sha256_from_text(checksum_text, target_file_name):
|
||||||
|
target = (target_file_name or "").strip().lower()
|
||||||
|
for raw_line in checksum_text.splitlines():
|
||||||
|
line = raw_line.strip()
|
||||||
|
if not line:
|
||||||
|
continue
|
||||||
|
match = re.search(r"\b([A-Fa-f0-9]{64})\b", line)
|
||||||
|
if not match:
|
||||||
|
continue
|
||||||
|
checksum = match.group(1).lower()
|
||||||
|
if not target:
|
||||||
|
return checksum
|
||||||
|
line_lower = line.lower()
|
||||||
|
if target in line_lower:
|
||||||
|
return checksum
|
||||||
|
if os.path.basename(target) in line_lower:
|
||||||
|
return checksum
|
||||||
|
return ""
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def verify_update_checksum(cls, zip_path, checksum_url, download_name):
|
||||||
|
if not checksum_url:
|
||||||
|
raise RuntimeError("В релизе нет файла SHA256. Автообновление остановлено.")
|
||||||
|
checksum_text = cls.download_update_text(checksum_url)
|
||||||
|
expected_hash = cls.extract_sha256_from_text(checksum_text, download_name or os.path.basename(zip_path))
|
||||||
|
if not expected_hash:
|
||||||
|
raise RuntimeError("Не удалось найти SHA256 для архива обновления.")
|
||||||
|
actual_hash = cls.sha256_file(zip_path)
|
||||||
|
if actual_hash != expected_hash:
|
||||||
|
raise RuntimeError("SHA256 не совпадает, обновление отменено.")
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def locate_extracted_root(extracted_dir):
|
||||||
|
entries = []
|
||||||
|
for name in os.listdir(extracted_dir):
|
||||||
|
full_path = os.path.join(extracted_dir, name)
|
||||||
|
if os.path.isdir(full_path):
|
||||||
|
entries.append(full_path)
|
||||||
|
if len(entries) == 1:
|
||||||
|
candidate = entries[0]
|
||||||
|
if os.path.exists(os.path.join(candidate, "AnabasisManager.exe")):
|
||||||
|
return candidate
|
||||||
|
return extracted_dir
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def build_update_script(app_dir, source_dir, exe_name, target_pid):
|
||||||
|
script_path = os.path.join(tempfile.gettempdir(), "anabasis_apply_update.cmd")
|
||||||
|
script_lines = [
|
||||||
|
"@echo off",
|
||||||
|
"setlocal EnableExtensions",
|
||||||
|
f"set \"APP_DIR={app_dir}\"",
|
||||||
|
f"set \"SRC_DIR={source_dir}\"",
|
||||||
|
f"set \"EXE_NAME={exe_name}\"",
|
||||||
|
f"set \"TARGET_PID={target_pid}\"",
|
||||||
|
"set \"BACKUP_DIR=%TEMP%\\anabasis_backup_%RANDOM%%RANDOM%\"",
|
||||||
|
"set \"UPDATE_LOG=%APP_DIR%\\update_error.log\"",
|
||||||
|
"echo [%DATE% %TIME%] Update start > \"%UPDATE_LOG%\"",
|
||||||
|
"if not exist \"%SRC_DIR%\\%EXE_NAME%\" (",
|
||||||
|
" echo Source executable not found: \"%SRC_DIR%\\%EXE_NAME%\" >> \"%UPDATE_LOG%\"",
|
||||||
|
" exit /b 3",
|
||||||
|
")",
|
||||||
|
"set /a WAIT_LOOPS=0",
|
||||||
|
":wait_for_exit",
|
||||||
|
"tasklist /FI \"PID eq %TARGET_PID%\" | find \"%TARGET_PID%\" >nul",
|
||||||
|
"if %ERRORLEVEL% EQU 0 (",
|
||||||
|
" set /a WAIT_LOOPS+=1",
|
||||||
|
" if %WAIT_LOOPS% GEQ 180 (",
|
||||||
|
" echo Timeout waiting for process %TARGET_PID% to exit >> \"%UPDATE_LOG%\"",
|
||||||
|
" goto :backup",
|
||||||
|
" )",
|
||||||
|
" timeout /t 1 /nobreak >nul",
|
||||||
|
" goto :wait_for_exit",
|
||||||
|
")",
|
||||||
|
":backup",
|
||||||
|
"timeout /t 1 /nobreak >nul",
|
||||||
|
"mkdir \"%BACKUP_DIR%\" >nul 2>&1",
|
||||||
|
"robocopy \"%APP_DIR%\" \"%BACKUP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:6 /W:2 >nul",
|
||||||
|
"set \"RC=%ERRORLEVEL%\"",
|
||||||
|
"if %RC% GEQ 8 goto :backup_error",
|
||||||
|
"robocopy \"%SRC_DIR%\" \"%APP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:12 /W:2 >nul",
|
||||||
|
"set \"RC=%ERRORLEVEL%\"",
|
||||||
|
"if %RC% GEQ 8 goto :rollback",
|
||||||
|
"start \"\" \"%APP_DIR%\\%EXE_NAME%\"",
|
||||||
|
"timeout /t 2 /nobreak >nul",
|
||||||
|
"tasklist /FI \"IMAGENAME eq %EXE_NAME%\" | find /I \"%EXE_NAME%\" >nul",
|
||||||
|
"if %ERRORLEVEL% NEQ 0 goto :rollback",
|
||||||
|
"echo Update success >> \"%UPDATE_LOG%\"",
|
||||||
|
"rmdir /S /Q \"%BACKUP_DIR%\" >nul 2>&1",
|
||||||
|
"exit /b 0",
|
||||||
|
":rollback",
|
||||||
|
"robocopy \"%BACKUP_DIR%\" \"%APP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:6 /W:2 >nul",
|
||||||
|
"start \"\" \"%APP_DIR%\\%EXE_NAME%\"",
|
||||||
|
"echo Auto-update failed. Rollback executed. >> \"%UPDATE_LOG%\"",
|
||||||
|
"exit /b 2",
|
||||||
|
":backup_error",
|
||||||
|
"echo Auto-update failed during backup. Code %RC% >> \"%UPDATE_LOG%\"",
|
||||||
|
"exit /b %RC%",
|
||||||
|
]
|
||||||
|
with open(script_path, "w", encoding="utf-8", newline="\r\n") as f:
|
||||||
|
f.write("\r\n".join(script_lines) + "\r\n")
|
||||||
|
return script_path
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def launch_update_script(script_path, work_dir):
|
||||||
|
creation_flags = 0
|
||||||
|
if hasattr(subprocess, "CREATE_NEW_PROCESS_GROUP"):
|
||||||
|
creation_flags |= subprocess.CREATE_NEW_PROCESS_GROUP
|
||||||
|
if hasattr(subprocess, "DETACHED_PROCESS"):
|
||||||
|
creation_flags |= subprocess.DETACHED_PROCESS
|
||||||
|
subprocess.Popen(
|
||||||
|
["cmd.exe", "/c", script_path],
|
||||||
|
cwd=work_dir,
|
||||||
|
creationflags=creation_flags,
|
||||||
|
)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def prepare_update(cls, download_url, checksum_url, download_name):
|
||||||
|
work_dir = tempfile.mkdtemp(prefix="anabasis_update_")
|
||||||
|
zip_path = os.path.join(work_dir, "update.zip")
|
||||||
|
unpack_dir = os.path.join(work_dir, "extracted")
|
||||||
|
cls.download_update_archive(download_url, zip_path)
|
||||||
|
cls.verify_update_checksum(zip_path, checksum_url, download_name)
|
||||||
|
os.makedirs(unpack_dir, exist_ok=True)
|
||||||
|
with zipfile.ZipFile(zip_path, "r") as archive:
|
||||||
|
archive.extractall(unpack_dir)
|
||||||
|
source_dir = cls.locate_extracted_root(unpack_dir)
|
||||||
|
return work_dir, source_dir
|
||||||
46
services/chat_actions.py
Normal file
46
services/chat_actions.py
Normal file
@@ -0,0 +1,46 @@
|
|||||||
|
from urllib.parse import urlparse
|
||||||
|
|
||||||
|
|
||||||
|
def resolve_user_ids(vk_call_with_retry, vk_api, links):
|
||||||
|
resolved_ids = []
|
||||||
|
failed_links = []
|
||||||
|
for link in links:
|
||||||
|
try:
|
||||||
|
path = urlparse(link).path
|
||||||
|
screen_name = path.split("/")[-1] if path else ""
|
||||||
|
if not screen_name and len(path.split("/")) > 1:
|
||||||
|
screen_name = path.split("/")[-2]
|
||||||
|
if not screen_name:
|
||||||
|
failed_links.append((link, None))
|
||||||
|
continue
|
||||||
|
resolved_object = vk_call_with_retry(vk_api.utils.resolveScreenName, screen_name=screen_name)
|
||||||
|
if resolved_object and resolved_object.get("type") == "user":
|
||||||
|
resolved_ids.append(resolved_object["object_id"])
|
||||||
|
else:
|
||||||
|
failed_links.append((link, None))
|
||||||
|
except Exception as e:
|
||||||
|
failed_links.append((link, e))
|
||||||
|
return resolved_ids, failed_links
|
||||||
|
|
||||||
|
|
||||||
|
def load_chat_conversations(vk_call_with_retry, vk_api):
|
||||||
|
conversations = []
|
||||||
|
start_from = None
|
||||||
|
seen_start_tokens = set()
|
||||||
|
while True:
|
||||||
|
params = {"count": 200, "filter": "all"}
|
||||||
|
if start_from:
|
||||||
|
if start_from in seen_start_tokens:
|
||||||
|
break
|
||||||
|
params["start_from"] = start_from
|
||||||
|
seen_start_tokens.add(start_from)
|
||||||
|
response = vk_call_with_retry(vk_api.messages.getConversations, **params)
|
||||||
|
page_items = response.get("items", [])
|
||||||
|
if not page_items:
|
||||||
|
break
|
||||||
|
conversations.extend(page_items)
|
||||||
|
start_from = response.get("next_from")
|
||||||
|
if not start_from:
|
||||||
|
break
|
||||||
|
return conversations
|
||||||
|
|
||||||
136
services/token_store.py
Normal file
136
services/token_store.py
Normal file
@@ -0,0 +1,136 @@
|
|||||||
|
import base64
|
||||||
|
import ctypes
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
from ctypes import wintypes
|
||||||
|
|
||||||
|
|
||||||
|
class _DataBlob(ctypes.Structure):
|
||||||
|
_fields_ = [("cbData", wintypes.DWORD), ("pbData", ctypes.POINTER(ctypes.c_byte))]
|
||||||
|
|
||||||
|
|
||||||
|
_crypt32 = None
|
||||||
|
_kernel32 = None
|
||||||
|
if os.name == "nt":
|
||||||
|
_crypt32 = ctypes.WinDLL("crypt32", use_last_error=True)
|
||||||
|
_kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
|
||||||
|
_crypt32.CryptProtectData.argtypes = [
|
||||||
|
ctypes.POINTER(_DataBlob),
|
||||||
|
wintypes.LPCWSTR,
|
||||||
|
ctypes.POINTER(_DataBlob),
|
||||||
|
ctypes.c_void_p,
|
||||||
|
ctypes.c_void_p,
|
||||||
|
wintypes.DWORD,
|
||||||
|
ctypes.POINTER(_DataBlob),
|
||||||
|
]
|
||||||
|
_crypt32.CryptProtectData.restype = wintypes.BOOL
|
||||||
|
_crypt32.CryptUnprotectData.argtypes = [
|
||||||
|
ctypes.POINTER(_DataBlob),
|
||||||
|
ctypes.POINTER(wintypes.LPWSTR),
|
||||||
|
ctypes.POINTER(_DataBlob),
|
||||||
|
ctypes.c_void_p,
|
||||||
|
ctypes.c_void_p,
|
||||||
|
wintypes.DWORD,
|
||||||
|
ctypes.POINTER(_DataBlob),
|
||||||
|
]
|
||||||
|
_crypt32.CryptUnprotectData.restype = wintypes.BOOL
|
||||||
|
|
||||||
|
|
||||||
|
def _crypt_protect_data(data, description=""):
|
||||||
|
buffer = ctypes.create_string_buffer(data)
|
||||||
|
data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte)))
|
||||||
|
data_out = _DataBlob()
|
||||||
|
if not _crypt32.CryptProtectData(ctypes.byref(data_in), description, None, None, None, 0, ctypes.byref(data_out)):
|
||||||
|
raise ctypes.WinError(ctypes.get_last_error())
|
||||||
|
try:
|
||||||
|
return ctypes.string_at(data_out.pbData, data_out.cbData)
|
||||||
|
finally:
|
||||||
|
_kernel32.LocalFree(data_out.pbData)
|
||||||
|
|
||||||
|
|
||||||
|
def _crypt_unprotect_data(data):
|
||||||
|
buffer = ctypes.create_string_buffer(data)
|
||||||
|
data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte)))
|
||||||
|
data_out = _DataBlob()
|
||||||
|
if not _crypt32.CryptUnprotectData(ctypes.byref(data_in), None, None, None, None, 0, ctypes.byref(data_out)):
|
||||||
|
raise ctypes.WinError(ctypes.get_last_error())
|
||||||
|
try:
|
||||||
|
return ctypes.string_at(data_out.pbData, data_out.cbData)
|
||||||
|
finally:
|
||||||
|
_kernel32.LocalFree(data_out.pbData)
|
||||||
|
|
||||||
|
|
||||||
|
def _encrypt_token(token):
|
||||||
|
if os.name != "nt":
|
||||||
|
raise RuntimeError("DPAPI is available only on Windows.")
|
||||||
|
encrypted_bytes = _crypt_protect_data(token.encode("utf-8"))
|
||||||
|
return base64.b64encode(encrypted_bytes).decode("ascii")
|
||||||
|
|
||||||
|
|
||||||
|
def _decrypt_token(token_data):
|
||||||
|
if os.name != "nt":
|
||||||
|
raise RuntimeError("DPAPI is available only on Windows.")
|
||||||
|
encrypted_bytes = base64.b64decode(token_data.encode("ascii"))
|
||||||
|
decrypted_bytes = _crypt_unprotect_data(encrypted_bytes)
|
||||||
|
return decrypted_bytes.decode("utf-8")
|
||||||
|
|
||||||
|
|
||||||
|
def save_token(token, token_file, app_data_dir, expires_in=0):
|
||||||
|
try:
|
||||||
|
expires_in = int(expires_in)
|
||||||
|
except (ValueError, TypeError):
|
||||||
|
expires_in = 0
|
||||||
|
|
||||||
|
os.makedirs(app_data_dir, exist_ok=True)
|
||||||
|
expiration_time = (time.time() + expires_in) if expires_in > 0 else 0
|
||||||
|
|
||||||
|
stored_token = token
|
||||||
|
encrypted = False
|
||||||
|
if os.name == "nt":
|
||||||
|
try:
|
||||||
|
stored_token = _encrypt_token(token)
|
||||||
|
encrypted = True
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
data = {
|
||||||
|
"token": stored_token,
|
||||||
|
"expiration_time": expiration_time,
|
||||||
|
"encrypted": encrypted,
|
||||||
|
}
|
||||||
|
|
||||||
|
with open(token_file, "w", encoding="utf-8") as f:
|
||||||
|
json.dump(data, f)
|
||||||
|
return expiration_time
|
||||||
|
|
||||||
|
|
||||||
|
def load_token(token_file):
|
||||||
|
if not os.path.exists(token_file):
|
||||||
|
return None, None
|
||||||
|
|
||||||
|
with open(token_file, "r", encoding="utf-8") as f:
|
||||||
|
data = json.load(f)
|
||||||
|
|
||||||
|
token = data.get("token")
|
||||||
|
encrypted = data.get("encrypted", False)
|
||||||
|
if token and encrypted:
|
||||||
|
try:
|
||||||
|
token = _decrypt_token(token)
|
||||||
|
except Exception:
|
||||||
|
try:
|
||||||
|
os.remove(token_file)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
return None, None
|
||||||
|
|
||||||
|
expiration_time = data.get("expiration_time")
|
||||||
|
if token and (expiration_time == 0 or expiration_time > time.time()):
|
||||||
|
return token, expiration_time
|
||||||
|
|
||||||
|
try:
|
||||||
|
os.remove(token_file)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
return None, None
|
||||||
|
|
||||||
163
services/update_service.py
Normal file
163
services/update_service.py
Normal file
@@ -0,0 +1,163 @@
|
|||||||
|
import json
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
import urllib.error
|
||||||
|
import urllib.request
|
||||||
|
from urllib.parse import urlparse
|
||||||
|
|
||||||
|
from PySide6.QtCore import QObject, Signal
|
||||||
|
|
||||||
|
|
||||||
|
def _version_key(version_text):
|
||||||
|
parts = [int(x) for x in re.findall(r"\d+", str(version_text))]
|
||||||
|
if not parts:
|
||||||
|
return (0, 0, 0)
|
||||||
|
while len(parts) < 3:
|
||||||
|
parts.append(0)
|
||||||
|
return tuple(parts[:3])
|
||||||
|
|
||||||
|
|
||||||
|
def _is_newer_version(latest_version, current_version):
|
||||||
|
latest_key = _version_key(latest_version)
|
||||||
|
current_key = _version_key(current_version)
|
||||||
|
return latest_key > current_key
|
||||||
|
|
||||||
|
|
||||||
|
def _sanitize_repo_url(value):
|
||||||
|
value = (value or "").strip()
|
||||||
|
if not value:
|
||||||
|
return ""
|
||||||
|
if "://" not in value and value.count("/") == 1:
|
||||||
|
return f"https://github.com/{value}"
|
||||||
|
parsed = urlparse(value)
|
||||||
|
if not parsed.scheme or not parsed.netloc:
|
||||||
|
return ""
|
||||||
|
clean_path = parsed.path.rstrip("/")
|
||||||
|
if clean_path.endswith(".git"):
|
||||||
|
clean_path = clean_path[:-4]
|
||||||
|
return f"{parsed.scheme}://{parsed.netloc}{clean_path}"
|
||||||
|
|
||||||
|
|
||||||
|
def detect_update_repository_url(configured_url="", configured_repo=""):
|
||||||
|
env_url = _sanitize_repo_url(os.getenv("ANABASIS_UPDATE_URL", ""))
|
||||||
|
if env_url:
|
||||||
|
return env_url
|
||||||
|
env_repo = _sanitize_repo_url(os.getenv("ANABASIS_UPDATE_REPOSITORY", ""))
|
||||||
|
if env_repo:
|
||||||
|
return env_repo
|
||||||
|
cfg_url = _sanitize_repo_url(configured_url)
|
||||||
|
if cfg_url:
|
||||||
|
return cfg_url
|
||||||
|
cfg_repo = _sanitize_repo_url(configured_repo)
|
||||||
|
if cfg_repo:
|
||||||
|
return cfg_repo
|
||||||
|
git_config_path = os.path.join(os.path.abspath("."), ".git", "config")
|
||||||
|
if not os.path.exists(git_config_path):
|
||||||
|
return ""
|
||||||
|
try:
|
||||||
|
with open(git_config_path, "r", encoding="utf-8") as f:
|
||||||
|
content = f.read()
|
||||||
|
match = re.search(r"url\s*=\s*((?:https?://|git@)[^\s]+)", content)
|
||||||
|
if not match:
|
||||||
|
return ""
|
||||||
|
remote = match.group(1).strip()
|
||||||
|
if remote.startswith("git@"):
|
||||||
|
ssh_match = re.match(r"git@([^:]+):(.+?)(?:\.git)?$", remote)
|
||||||
|
if ssh_match:
|
||||||
|
return _sanitize_repo_url(f"https://{ssh_match.group(1)}/{ssh_match.group(2)}")
|
||||||
|
return _sanitize_repo_url(remote)
|
||||||
|
except Exception:
|
||||||
|
return ""
|
||||||
|
|
||||||
|
|
||||||
|
class UpdateChecker(QObject):
|
||||||
|
check_finished = Signal(dict)
|
||||||
|
check_failed = Signal(str)
|
||||||
|
|
||||||
|
def __init__(self, repository_url, current_version, request_timeout=8):
|
||||||
|
super().__init__()
|
||||||
|
self.repository_url = repository_url
|
||||||
|
self.current_version = current_version
|
||||||
|
self.request_timeout = request_timeout
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
if not self.repository_url:
|
||||||
|
self.check_failed.emit("Не задан URL репозитория обновлений.")
|
||||||
|
return
|
||||||
|
|
||||||
|
parsed = urlparse(self.repository_url)
|
||||||
|
base_url = f"{parsed.scheme}://{parsed.netloc}"
|
||||||
|
repo_path = parsed.path.strip("/")
|
||||||
|
if not repo_path or repo_path.count("/") < 1:
|
||||||
|
self.check_failed.emit("Некорректный URL репозитория обновлений.")
|
||||||
|
return
|
||||||
|
|
||||||
|
if parsed.netloc.lower().endswith("github.com"):
|
||||||
|
api_url = f"https://api.github.com/repos/{repo_path}/releases/latest"
|
||||||
|
else:
|
||||||
|
api_url = f"{base_url}/api/v1/repos/{repo_path}/releases/latest"
|
||||||
|
releases_url = f"{base_url}/{repo_path}/releases"
|
||||||
|
request = urllib.request.Request(
|
||||||
|
api_url,
|
||||||
|
headers={
|
||||||
|
"Accept": "application/vnd.github+json",
|
||||||
|
"User-Agent": "AnabasisManager-Updater",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
with urllib.request.urlopen(request, timeout=self.request_timeout) as response:
|
||||||
|
release_data = json.loads(response.read().decode("utf-8"))
|
||||||
|
except urllib.error.HTTPError as e:
|
||||||
|
self.check_failed.emit(f"Ошибка HTTP при проверке обновлений: {e.code}")
|
||||||
|
return
|
||||||
|
except urllib.error.URLError as e:
|
||||||
|
self.check_failed.emit(f"Сетевая ошибка при проверке обновлений: {e}")
|
||||||
|
return
|
||||||
|
except Exception as e:
|
||||||
|
self.check_failed.emit(f"Не удалось проверить обновления: {e}")
|
||||||
|
return
|
||||||
|
|
||||||
|
latest_tag = release_data.get("tag_name") or release_data.get("name") or ""
|
||||||
|
latest_version = latest_tag.lstrip("vV").strip()
|
||||||
|
html_url = release_data.get("html_url") or releases_url
|
||||||
|
assets = release_data.get("assets") or []
|
||||||
|
download_url = ""
|
||||||
|
download_name = ""
|
||||||
|
checksum_url = ""
|
||||||
|
for asset in assets:
|
||||||
|
url = asset.get("browser_download_url", "")
|
||||||
|
if url.lower().endswith(".zip"):
|
||||||
|
download_url = url
|
||||||
|
download_name = asset.get("name", "")
|
||||||
|
break
|
||||||
|
if not download_url and assets:
|
||||||
|
download_url = assets[0].get("browser_download_url", "")
|
||||||
|
download_name = assets[0].get("name", "")
|
||||||
|
|
||||||
|
for asset in assets:
|
||||||
|
name = asset.get("name", "").lower()
|
||||||
|
if not name:
|
||||||
|
continue
|
||||||
|
is_checksum_asset = name.endswith(".sha256") or name.endswith(".sha256.txt") or name in ("checksums.txt", "sha256sums.txt")
|
||||||
|
if not is_checksum_asset:
|
||||||
|
continue
|
||||||
|
if download_name and (download_name.lower() in name or name in (f"{download_name.lower()}.sha256", f"{download_name.lower()}.sha256.txt")):
|
||||||
|
checksum_url = asset.get("browser_download_url", "")
|
||||||
|
break
|
||||||
|
if not checksum_url:
|
||||||
|
checksum_url = asset.get("browser_download_url", "")
|
||||||
|
|
||||||
|
self.check_finished.emit(
|
||||||
|
{
|
||||||
|
"repository_url": self.repository_url,
|
||||||
|
"latest_version": latest_version,
|
||||||
|
"current_version": self.current_version,
|
||||||
|
"latest_tag": latest_tag,
|
||||||
|
"release_url": html_url,
|
||||||
|
"download_url": download_url,
|
||||||
|
"download_name": download_name,
|
||||||
|
"checksum_url": checksum_url,
|
||||||
|
"has_update": _is_newer_version(latest_version, self.current_version),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
59
services/vk_service.py
Normal file
59
services/vk_service.py
Normal file
@@ -0,0 +1,59 @@
|
|||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
|
||||||
|
from vk_api import VkApi
|
||||||
|
from vk_api.exceptions import VkApiError
|
||||||
|
|
||||||
|
|
||||||
|
class VkService:
|
||||||
|
def __init__(self):
|
||||||
|
self.session = None
|
||||||
|
self.api = None
|
||||||
|
|
||||||
|
def set_token(self, token):
|
||||||
|
self.session = VkApi(token=token)
|
||||||
|
self.api = self.session.get_api()
|
||||||
|
|
||||||
|
def clear(self):
|
||||||
|
self.session = None
|
||||||
|
self.api = None
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def build_auth_command(auth_url, output_path, entry_script_path=None):
|
||||||
|
if getattr(sys, "frozen", False):
|
||||||
|
return sys.executable, ["--auth", auth_url, output_path]
|
||||||
|
script_path = entry_script_path or os.path.abspath(__file__)
|
||||||
|
return sys.executable, [script_path, "--auth", auth_url, output_path]
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def vk_error_code(exc):
|
||||||
|
error = getattr(exc, "error", None)
|
||||||
|
if isinstance(error, dict):
|
||||||
|
return error.get("error_code")
|
||||||
|
return getattr(exc, "code", None)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def is_auth_error(cls, exc, formatted_message=None):
|
||||||
|
code = cls.vk_error_code(exc)
|
||||||
|
if code == 5:
|
||||||
|
return True
|
||||||
|
message = (formatted_message or str(exc)).lower()
|
||||||
|
return "invalid_access_token" in message or "user authorization failed" in message
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def is_retryable_error(cls, exc):
|
||||||
|
return cls.vk_error_code(exc) in (6, 9, 10)
|
||||||
|
|
||||||
|
def call_with_retry(self, func, *args, **kwargs):
|
||||||
|
max_attempts = 5
|
||||||
|
for attempt in range(1, max_attempts + 1):
|
||||||
|
try:
|
||||||
|
return func(*args, **kwargs)
|
||||||
|
except VkApiError as e:
|
||||||
|
if not self.is_retryable_error(e) or attempt == max_attempts:
|
||||||
|
raise
|
||||||
|
delay = min(2.0, 0.35 * (2 ** (attempt - 1)))
|
||||||
|
if self.vk_error_code(e) == 9:
|
||||||
|
delay = max(delay, 1.0)
|
||||||
|
time.sleep(delay)
|
||||||
@@ -5,44 +5,55 @@ from pathlib import Path
|
|||||||
class AuthReloginSmokeTests(unittest.TestCase):
|
class AuthReloginSmokeTests(unittest.TestCase):
|
||||||
@classmethod
|
@classmethod
|
||||||
def setUpClass(cls):
|
def setUpClass(cls):
|
||||||
cls.source = Path("main.py").read_text(encoding="utf-8")
|
cls.main_source = Path("main.py").read_text(encoding="utf-8")
|
||||||
|
cls.vk_source = Path("services/vk_service.py").read_text(encoding="utf-8")
|
||||||
|
cls.update_source = Path("services/update_service.py").read_text(encoding="utf-8")
|
||||||
|
|
||||||
def test_auth_command_builder_handles_frozen_and_source(self):
|
def test_auth_command_builder_handles_frozen_and_source(self):
|
||||||
self.assertIn("def _build_auth_command(self, auth_url, output_path):", self.source)
|
self.assertIn("def _build_auth_command(self, auth_url, output_path):", self.main_source)
|
||||||
self.assertIn('return sys.executable, ["--auth", auth_url, output_path]', self.source)
|
self.assertIn("entry_script_path=os.path.abspath(__file__)", self.main_source)
|
||||||
self.assertIn('return sys.executable, [os.path.abspath(__file__), "--auth", auth_url, output_path]', self.source)
|
self.assertIn('return sys.executable, ["--auth", auth_url, output_path]', self.vk_source)
|
||||||
|
self.assertIn("script_path = entry_script_path or os.path.abspath(__file__)", self.vk_source)
|
||||||
|
|
||||||
def test_auth_runs_via_qprocess(self):
|
def test_auth_runs_via_qprocess(self):
|
||||||
self.assertIn("process = QProcess(self)", self.source)
|
self.assertIn("process = QProcess(self)", self.main_source)
|
||||||
self.assertIn("process.start(program, args)", self.source)
|
self.assertIn("process.start(program, args)", self.main_source)
|
||||||
self.assertIn("def _on_auth_process_finished(self, exit_code, _exit_status):", self.source)
|
self.assertIn("def _on_auth_process_finished(self, exit_code, _exit_status):", self.main_source)
|
||||||
self.assertIn("if self.auth_process and self.auth_process.state() == QProcess.NotRunning:", self.source)
|
self.assertIn("if self.auth_process and self.auth_process.state() == QProcess.NotRunning:", self.main_source)
|
||||||
|
|
||||||
def test_force_relogin_has_backoff_and_event_log(self):
|
def test_force_relogin_has_backoff_and_event_log(self):
|
||||||
self.assertIn("AUTH_RELOGIN_BACKOFF_SECONDS = 5.0", self.source)
|
self.assertIn("AUTH_RELOGIN_BACKOFF_SECONDS = 5.0", self.main_source)
|
||||||
self.assertIn("if self._auth_relogin_in_progress:", self.source)
|
self.assertIn("if self._auth_relogin_in_progress:", self.main_source)
|
||||||
self.assertIn("force_relogin_backoff", self.source)
|
self.assertIn("force_relogin_backoff", self.main_source)
|
||||||
self.assertIn("force_relogin", self.source)
|
self.assertIn("force_relogin", self.main_source)
|
||||||
|
|
||||||
def test_auth_error_paths_trigger_force_relogin(self):
|
def test_auth_error_paths_trigger_force_relogin(self):
|
||||||
self.assertIn("def _handle_vk_api_error(self, context, exc, action_name=None, ui_message_prefix=None, disable_ui=False):", self.source)
|
self.assertIn(
|
||||||
self.assertIn("self._force_relogin(exc, action_name or context)", self.source)
|
"def _handle_vk_api_error(self, context, exc, action_name=None, ui_message_prefix=None, disable_ui=False):",
|
||||||
self.assertIn('"load_chats",', self.source)
|
self.main_source,
|
||||||
self.assertIn('"execute_user_action",', self.source)
|
)
|
||||||
self.assertIn('"set_user_admin",', self.source)
|
self.assertIn("self._force_relogin(exc, action_name or context)", self.main_source)
|
||||||
|
self.assertIn('"load_chats",', self.main_source)
|
||||||
|
self.assertIn('"execute_user_action",', self.main_source)
|
||||||
|
self.assertIn('"set_user_admin",', self.main_source)
|
||||||
|
|
||||||
def test_tab_checkbox_lists_use_existing_attributes(self):
|
def test_tab_checkbox_lists_use_existing_attributes(self):
|
||||||
self.assertIn("self.warehouse_chat_checkboxes", self.source)
|
self.assertIn("self.warehouse_chat_checkboxes", self.main_source)
|
||||||
self.assertIn("self.coffee_chat_checkboxes", self.source)
|
self.assertIn("self.coffee_chat_checkboxes", self.main_source)
|
||||||
self.assertNotIn("self.retail_warehouse_checkboxes", self.source)
|
self.assertNotIn("self.retail_warehouse_checkboxes", self.main_source)
|
||||||
self.assertNotIn("self.retail_coffee_checkboxes", self.source)
|
self.assertNotIn("self.retail_coffee_checkboxes", self.main_source)
|
||||||
|
|
||||||
def test_update_check_actions_exist(self):
|
def test_update_check_actions_exist(self):
|
||||||
self.assertIn("APP_VERSION = ", self.source)
|
self.assertIn("from app_version import APP_VERSION", self.main_source)
|
||||||
self.assertIn("UPDATE_REPOSITORY = ", self.source)
|
self.assertIn("from services import (", self.main_source)
|
||||||
self.assertIn('QAction("Проверить обновления", self)', self.source)
|
self.assertIn("UpdateChecker", self.main_source)
|
||||||
self.assertIn("def check_for_updates(self, silent_no_updates=False):", self.source)
|
self.assertIn("detect_update_repository_url", self.main_source)
|
||||||
self.assertIn("class UpdateChecker(QObject):", self.source)
|
self.assertIn('QAction("Проверить обновления", self)', self.main_source)
|
||||||
|
self.assertIn("def check_for_updates(self, silent_no_updates=False):", self.main_source)
|
||||||
|
self.assertIn("class UpdateChecker(QObject):", self.update_source)
|
||||||
|
self.assertIn("def _start_auto_update(self, download_url, latest_version, checksum_url=\"\", download_name=\"\"):", self.main_source)
|
||||||
|
self.assertIn("AutoUpdateService.prepare_update", self.main_source)
|
||||||
|
self.assertIn("AutoUpdateService.build_update_script", self.main_source)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
51
tests/test_auto_update_service.py
Normal file
51
tests/test_auto_update_service.py
Normal file
@@ -0,0 +1,51 @@
|
|||||||
|
import hashlib
|
||||||
|
import importlib.util
|
||||||
|
import tempfile
|
||||||
|
import unittest
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
_SPEC = importlib.util.spec_from_file_location(
|
||||||
|
"auto_update_service",
|
||||||
|
Path("services/auto_update_service.py"),
|
||||||
|
)
|
||||||
|
_MODULE = importlib.util.module_from_spec(_SPEC)
|
||||||
|
_SPEC.loader.exec_module(_MODULE)
|
||||||
|
AutoUpdateService = _MODULE.AutoUpdateService
|
||||||
|
|
||||||
|
|
||||||
|
class AutoUpdateServiceTests(unittest.TestCase):
|
||||||
|
def test_extract_sha256_from_text(self):
|
||||||
|
digest = "a" * 64
|
||||||
|
text = f"{digest} AnabasisManager-1.0.0-win.zip\n"
|
||||||
|
extracted = AutoUpdateService.extract_sha256_from_text(
|
||||||
|
text,
|
||||||
|
"AnabasisManager-1.0.0-win.zip",
|
||||||
|
)
|
||||||
|
self.assertEqual(extracted, digest)
|
||||||
|
|
||||||
|
def test_sha256_file(self):
|
||||||
|
with tempfile.TemporaryDirectory() as td:
|
||||||
|
path = Path(td) / "payload.bin"
|
||||||
|
payload = b"anabasis"
|
||||||
|
path.write_bytes(payload)
|
||||||
|
expected = hashlib.sha256(payload).hexdigest()
|
||||||
|
self.assertEqual(AutoUpdateService.sha256_file(str(path)), expected)
|
||||||
|
|
||||||
|
def test_build_update_script_contains_core_vars(self):
|
||||||
|
script = AutoUpdateService.build_update_script(
|
||||||
|
app_dir=r"C:\Apps\AnabasisManager",
|
||||||
|
source_dir=r"C:\Temp\Extracted",
|
||||||
|
exe_name="AnabasisManager.exe",
|
||||||
|
target_pid=1234,
|
||||||
|
)
|
||||||
|
script_text = Path(script).read_text(encoding="utf-8")
|
||||||
|
self.assertIn("set \"APP_DIR=", script_text)
|
||||||
|
self.assertIn("set \"SRC_DIR=", script_text)
|
||||||
|
self.assertIn("set \"EXE_NAME=", script_text)
|
||||||
|
self.assertIn("set \"TARGET_PID=", script_text)
|
||||||
|
self.assertIn(":rollback", script_text)
|
||||||
|
self.assertIn("if not exist \"%SRC_DIR%\\%EXE_NAME%\"", script_text)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
65
tests/test_chat_actions.py
Normal file
65
tests/test_chat_actions.py
Normal file
@@ -0,0 +1,65 @@
|
|||||||
|
import unittest
|
||||||
|
import importlib.util
|
||||||
|
from types import SimpleNamespace
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
_SPEC = importlib.util.spec_from_file_location(
|
||||||
|
"chat_actions",
|
||||||
|
Path("services/chat_actions.py"),
|
||||||
|
)
|
||||||
|
_MODULE = importlib.util.module_from_spec(_SPEC)
|
||||||
|
_SPEC.loader.exec_module(_MODULE)
|
||||||
|
load_chat_conversations = _MODULE.load_chat_conversations
|
||||||
|
resolve_user_ids = _MODULE.resolve_user_ids
|
||||||
|
|
||||||
|
|
||||||
|
class ChatActionsTests(unittest.TestCase):
|
||||||
|
def test_resolve_user_ids_mixed_results(self):
|
||||||
|
mapping = {
|
||||||
|
"id1": {"type": "user", "object_id": 1},
|
||||||
|
"id2": {"type": "group", "object_id": 2},
|
||||||
|
}
|
||||||
|
|
||||||
|
def call_with_retry(func, **kwargs):
|
||||||
|
return func(**kwargs)
|
||||||
|
|
||||||
|
def resolve_screen_name(screen_name):
|
||||||
|
if screen_name == "boom":
|
||||||
|
raise RuntimeError("boom")
|
||||||
|
return mapping.get(screen_name)
|
||||||
|
|
||||||
|
vk_api = SimpleNamespace(utils=SimpleNamespace(resolveScreenName=resolve_screen_name))
|
||||||
|
links = [
|
||||||
|
"https://vk.com/id1",
|
||||||
|
"https://vk.com/id2",
|
||||||
|
"https://vk.com/boom",
|
||||||
|
"https://vk.com/",
|
||||||
|
]
|
||||||
|
resolved, failed = resolve_user_ids(call_with_retry, vk_api, links)
|
||||||
|
|
||||||
|
self.assertEqual(resolved, [1])
|
||||||
|
self.assertEqual(len(failed), 3)
|
||||||
|
self.assertEqual(failed[0][0], "https://vk.com/id2")
|
||||||
|
self.assertIsNone(failed[0][1])
|
||||||
|
|
||||||
|
def test_load_chat_conversations_paginated(self):
|
||||||
|
pages = [
|
||||||
|
{"items": [{"id": 1}], "next_from": "page-2"},
|
||||||
|
{"items": [{"id": 2}]},
|
||||||
|
]
|
||||||
|
|
||||||
|
def get_conversations(**kwargs):
|
||||||
|
if kwargs.get("start_from") == "page-2":
|
||||||
|
return pages[1]
|
||||||
|
return pages[0]
|
||||||
|
|
||||||
|
def call_with_retry(func, **kwargs):
|
||||||
|
return func(**kwargs)
|
||||||
|
|
||||||
|
vk_api = SimpleNamespace(messages=SimpleNamespace(getConversations=get_conversations))
|
||||||
|
items = load_chat_conversations(call_with_retry, vk_api)
|
||||||
|
self.assertEqual(items, [{"id": 1}, {"id": 2}])
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
53
tests/test_token_store.py
Normal file
53
tests/test_token_store.py
Normal file
@@ -0,0 +1,53 @@
|
|||||||
|
import tempfile
|
||||||
|
import unittest
|
||||||
|
import importlib.util
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
_SPEC = importlib.util.spec_from_file_location(
|
||||||
|
"token_store",
|
||||||
|
Path("services/token_store.py"),
|
||||||
|
)
|
||||||
|
_MODULE = importlib.util.module_from_spec(_SPEC)
|
||||||
|
_SPEC.loader.exec_module(_MODULE)
|
||||||
|
load_token = _MODULE.load_token
|
||||||
|
save_token = _MODULE.save_token
|
||||||
|
|
||||||
|
|
||||||
|
class TokenStoreTests(unittest.TestCase):
|
||||||
|
def test_save_and_load_non_expiring_token(self):
|
||||||
|
with tempfile.TemporaryDirectory() as td:
|
||||||
|
token_file = Path(td) / "token.json"
|
||||||
|
with patch.object(_MODULE.os, "name", "posix"):
|
||||||
|
expiration = save_token(
|
||||||
|
token="abc123",
|
||||||
|
token_file=str(token_file),
|
||||||
|
app_data_dir=td,
|
||||||
|
expires_in=0,
|
||||||
|
)
|
||||||
|
token, loaded_expiration = load_token(str(token_file))
|
||||||
|
|
||||||
|
self.assertEqual(expiration, 0)
|
||||||
|
self.assertEqual(token, "abc123")
|
||||||
|
self.assertEqual(loaded_expiration, 0)
|
||||||
|
|
||||||
|
def test_expired_token_is_removed(self):
|
||||||
|
with tempfile.TemporaryDirectory() as td:
|
||||||
|
token_file = Path(td) / "token.json"
|
||||||
|
with patch.object(_MODULE.os, "name", "posix"):
|
||||||
|
with patch.object(_MODULE.time, "time", return_value=1000):
|
||||||
|
save_token(
|
||||||
|
token="abc123",
|
||||||
|
token_file=str(token_file),
|
||||||
|
app_data_dir=td,
|
||||||
|
expires_in=1,
|
||||||
|
)
|
||||||
|
with patch.object(_MODULE.time, "time", return_value=2000):
|
||||||
|
token, expiration = load_token(str(token_file))
|
||||||
|
|
||||||
|
self.assertIsNone(token)
|
||||||
|
self.assertIsNone(expiration)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
25
ui/dialogs.py
Normal file
25
ui/dialogs.py
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
from PySide6.QtWidgets import QDialog, QDialogButtonBox, QLabel, QTextEdit, QVBoxLayout
|
||||||
|
|
||||||
|
|
||||||
|
class MultiLinkDialog(QDialog):
|
||||||
|
def __init__(self, parent=None):
|
||||||
|
super().__init__(parent)
|
||||||
|
self.setWindowTitle("Ввод нескольких ссылок")
|
||||||
|
self.setMinimumSize(400, 300)
|
||||||
|
|
||||||
|
layout = QVBoxLayout(self)
|
||||||
|
label = QLabel("Вставьте ссылки на страницы VK, каждая с новой строки:")
|
||||||
|
layout.addWidget(label)
|
||||||
|
|
||||||
|
self.links_text_edit = QTextEdit()
|
||||||
|
layout.addWidget(self.links_text_edit)
|
||||||
|
|
||||||
|
button_box = QDialogButtonBox()
|
||||||
|
button_box.addButton("ОК", QDialogButtonBox.AcceptRole)
|
||||||
|
button_box.addButton("Отмена", QDialogButtonBox.RejectRole)
|
||||||
|
button_box.accepted.connect(self.accept)
|
||||||
|
button_box.rejected.connect(self.reject)
|
||||||
|
layout.addWidget(button_box)
|
||||||
|
|
||||||
|
def get_links(self):
|
||||||
|
return [line.strip() for line in self.links_text_edit.toPlainText().strip().split("\n") if line.strip()]
|
||||||
9
ui/main_window.py
Normal file
9
ui/main_window.py
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
def instructions_text():
|
||||||
|
return (
|
||||||
|
"Инструкция:\n"
|
||||||
|
"1. Авторизуйтесь через VK.\n"
|
||||||
|
"2. Выберите чаты.\n"
|
||||||
|
"3. Вставьте ссылку на пользователя в поле ниже. ID определится автоматически.\n"
|
||||||
|
"4. Для массовых операций нажмите кнопку 'Список' и вставьте ссылки в окне.\n"
|
||||||
|
"5. Нажмите 'ИСКЛЮЧИТЬ' или 'ПРИГЛАСИТЬ'."
|
||||||
|
)
|
||||||
Reference in New Issue
Block a user