Compare commits
22 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 02350cfca1 | |||
| 68fa841857 | |||
| bca9007463 | |||
| 52b1301982 | |||
| 90b3b4fc9d | |||
| 190e67c931 | |||
| 2eb4c52b81 | |||
| 3d73a504d2 | |||
| 1524271be7 | |||
| 561cf43e09 | |||
| e8930f7550 | |||
| c8da0f9191 | |||
| 37ce500fd2 | |||
| 098a84e5bd | |||
| 5aa17c1a84 | |||
| dde14f3714 | |||
| fa5d4c6993 | |||
| f9e0225243 | |||
| c42b23bea5 | |||
| b52cdea425 | |||
| b7fad78a71 | |||
| e590a6cde0 |
35
.gitea/workflows/ci.yml
Normal file
35
.gitea/workflows/ci.yml
Normal file
@@ -0,0 +1,35 @@
|
||||
name: Desktop CI
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- master
|
||||
pull_request:
|
||||
|
||||
jobs:
|
||||
tests:
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: https://git.daemonlord.ru/actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Set up Python
|
||||
uses: https://git.daemonlord.ru/actions/setup-python@v5
|
||||
with:
|
||||
python-version: "3.13"
|
||||
|
||||
- name: Install dependencies
|
||||
run: |
|
||||
python -m pip install --upgrade pip
|
||||
pip install -r requirements.txt
|
||||
|
||||
- name: Validate syntax
|
||||
run: |
|
||||
python -m py_compile app_version.py main.py build.py tests/test_auth_relogin_smoke.py
|
||||
|
||||
- name: Run tests
|
||||
run: |
|
||||
python -m unittest tests/test_auth_relogin_smoke.py
|
||||
133
.gitea/workflows/release.yml
Normal file
133
.gitea/workflows/release.yml
Normal file
@@ -0,0 +1,133 @@
|
||||
name: Desktop Release
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- master
|
||||
|
||||
jobs:
|
||||
release:
|
||||
runs-on: windows
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: https://git.daemonlord.ru/actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0
|
||||
tags: true
|
||||
|
||||
- name: Ensure Python 3.13
|
||||
shell: powershell
|
||||
run: |
|
||||
if (Get-Command python -ErrorAction SilentlyContinue) {
|
||||
python --version
|
||||
} elseif (Get-Command py -ErrorAction SilentlyContinue) {
|
||||
$pyExe = py -3.13 -c "import sys; print(sys.executable)"
|
||||
if (-not $pyExe) {
|
||||
throw "Python 3.13 launcher is available, but interpreter was not found."
|
||||
}
|
||||
Split-Path $pyExe | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append
|
||||
python --version
|
||||
} else {
|
||||
throw "Python is not installed on runner. Install Python 3.13 and restart runner service."
|
||||
}
|
||||
|
||||
- name: Install dependencies
|
||||
shell: powershell
|
||||
run: |
|
||||
python -m pip install --upgrade pip
|
||||
pip install -r requirements.txt pyinstaller
|
||||
|
||||
- name: Extract app version
|
||||
id: extract_version
|
||||
shell: powershell
|
||||
run: |
|
||||
$version = (python -c "from app_version import APP_VERSION; print(APP_VERSION)").Trim()
|
||||
$utf8NoBom = New-Object System.Text.UTF8Encoding($false)
|
||||
[System.IO.File]::AppendAllText($env:GITHUB_OUTPUT, "version=$version`n", $utf8NoBom)
|
||||
Write-Host "Detected version: $version"
|
||||
|
||||
- name: Stop if version already released
|
||||
id: stop
|
||||
shell: powershell
|
||||
run: |
|
||||
$version = "${{ steps.extract_version.outputs.version }}"
|
||||
$tag = "v$version"
|
||||
git show-ref --tags --quiet --verify "refs/tags/$tag"
|
||||
$tagExists = ($LASTEXITCODE -eq 0)
|
||||
$global:LASTEXITCODE = 0
|
||||
$utf8NoBom = New-Object System.Text.UTF8Encoding($false)
|
||||
if ($tagExists) {
|
||||
Write-Host "Version $tag already released, stopping job."
|
||||
[System.IO.File]::AppendAllText($env:GITHUB_ENV, "CONTINUE=false`n", $utf8NoBom)
|
||||
} else {
|
||||
Write-Host "Version $tag not released yet, continuing workflow..."
|
||||
[System.IO.File]::AppendAllText($env:GITHUB_ENV, "CONTINUE=true`n", $utf8NoBom)
|
||||
}
|
||||
exit 0
|
||||
|
||||
- name: Run tests
|
||||
if: env.CONTINUE == 'true'
|
||||
shell: powershell
|
||||
run: |
|
||||
python -m py_compile app_version.py main.py build.py tests/test_auth_relogin_smoke.py
|
||||
python -m unittest tests/test_auth_relogin_smoke.py
|
||||
|
||||
- name: Build release zip
|
||||
if: env.CONTINUE == 'true'
|
||||
shell: powershell
|
||||
run: |
|
||||
python build.py
|
||||
|
||||
- name: Ensure archive exists
|
||||
if: env.CONTINUE == 'true'
|
||||
shell: powershell
|
||||
run: |
|
||||
$version = "${{ steps.extract_version.outputs.version }}"
|
||||
$archivePath = "dist/AnabasisManager-$version.zip"
|
||||
if (-not (Test-Path $archivePath)) {
|
||||
throw "Archive not found: $archivePath"
|
||||
}
|
||||
|
||||
- name: Generate SHA256 checksum
|
||||
if: env.CONTINUE == 'true'
|
||||
shell: powershell
|
||||
run: |
|
||||
$version = "${{ steps.extract_version.outputs.version }}"
|
||||
$archiveName = "AnabasisManager-$version.zip"
|
||||
$archivePath = "dist/$archiveName"
|
||||
$checksumPath = "dist/$archiveName.sha256"
|
||||
$hash = (Get-FileHash -Path $archivePath -Algorithm SHA256).Hash.ToLower()
|
||||
"$hash $archiveName" | Set-Content -Path $checksumPath -Encoding UTF8
|
||||
Write-Host "Checksum created: $checksumPath"
|
||||
|
||||
- name: Configure git identity
|
||||
if: env.CONTINUE == 'true'
|
||||
shell: powershell
|
||||
run: |
|
||||
git config user.name "gitea-actions"
|
||||
git config user.email "gitea-actions@daemonlord.ru"
|
||||
|
||||
- name: Create git tag
|
||||
if: env.CONTINUE == 'true'
|
||||
shell: powershell
|
||||
run: |
|
||||
$version = "${{ steps.extract_version.outputs.version }}"
|
||||
$tag = "v$version"
|
||||
git tag "$tag"
|
||||
git push origin "$tag"
|
||||
|
||||
- name: Create Gitea Release
|
||||
if: env.CONTINUE == 'true'
|
||||
uses: https://git.daemonlord.ru/actions/gitea-release-action@v1
|
||||
with:
|
||||
server_url: https://git.daemonlord.ru
|
||||
repository: ${{ gitea.repository }}
|
||||
token: ${{ secrets.API_TOKEN }}
|
||||
tag_name: v${{ steps.extract_version.outputs.version }}
|
||||
name: Anabasis Manager ${{ steps.extract_version.outputs.version }}
|
||||
body: |
|
||||
Desktop release v${{ steps.extract_version.outputs.version }}
|
||||
files: |
|
||||
dist/AnabasisManager-${{ steps.extract_version.outputs.version }}.zip
|
||||
dist/AnabasisManager-${{ steps.extract_version.outputs.version }}.zip.sha256
|
||||
7
.gitignore
vendored
7
.gitignore
vendored
@@ -4,3 +4,10 @@
|
||||
/build_linux/
|
||||
/build_win32/
|
||||
/build_darwin/
|
||||
.idea/
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
tests/__pycache__/
|
||||
build/
|
||||
dist/
|
||||
AnabasisManager.spec
|
||||
|
||||
1
app_version.py
Normal file
1
app_version.py
Normal file
@@ -0,0 +1 @@
|
||||
APP_VERSION = "1.6.4"
|
||||
3
build.py
3
build.py
@@ -2,10 +2,11 @@ import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
from app_version import APP_VERSION
|
||||
|
||||
# --- Конфигурация ---
|
||||
APP_NAME = "AnabasisManager"
|
||||
VERSION = "1.5" # Ваша версия
|
||||
VERSION = APP_VERSION # Единая версия приложения
|
||||
MAIN_SCRIPT = "main.py"
|
||||
ICON_PATH = "icon.ico"
|
||||
DIST_DIR = os.path.join("dist", APP_NAME)
|
||||
|
||||
795
main.py
795
main.py
@@ -2,21 +2,31 @@ import sys
|
||||
import base64
|
||||
import ctypes
|
||||
import shutil
|
||||
import subprocess
|
||||
from vk_api import VkApi
|
||||
import json
|
||||
import time
|
||||
import auth_webview
|
||||
import os
|
||||
import re
|
||||
import hashlib
|
||||
import subprocess
|
||||
import threading
|
||||
import tempfile
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
import zipfile
|
||||
from app_version import APP_VERSION
|
||||
from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit,
|
||||
QPushButton, QVBoxLayout, QWidget, QMessageBox,
|
||||
QTextBrowser, QScrollArea, QCheckBox, QHBoxLayout,
|
||||
QSizePolicy, QDialog, QTextEdit, QTabWidget, QDialogButtonBox)
|
||||
from PySide6.QtCore import Qt, QUrl, QDateTime, Signal, QTimer
|
||||
from PySide6.QtGui import QIcon, QAction
|
||||
QSizePolicy, QDialog, QTextEdit, QTabWidget, QDialogButtonBox,
|
||||
QProgressBar)
|
||||
from PySide6.QtCore import Qt, QUrl, QDateTime, Signal, QTimer, QObject
|
||||
from PySide6.QtGui import QIcon, QAction, QDesktopServices
|
||||
from urllib.parse import urlparse, parse_qs, unquote
|
||||
from vk_api.exceptions import VkApiError
|
||||
from PySide6.QtCore import QStandardPaths
|
||||
from PySide6.QtCore import QProcess
|
||||
from ctypes import wintypes
|
||||
|
||||
# --- Управление токенами и настройками ---
|
||||
@@ -27,12 +37,81 @@ CACHE_CLEANUP_MARKER = os.path.join(APP_DATA_DIR, "pending_cache_cleanup")
|
||||
LOG_FILE = os.path.join(APP_DATA_DIR, "app.log")
|
||||
LOG_MAX_BYTES = 1024 * 1024 # 1 MB
|
||||
LOG_BACKUP_FILE = os.path.join(APP_DATA_DIR, "app.log.1")
|
||||
AUTH_RELOGIN_BACKOFF_SECONDS = 5.0
|
||||
# Legacy owner/repo format for GitHub-only fallback.
|
||||
UPDATE_REPOSITORY = ""
|
||||
# Full repository URL is preferred (supports GitHub/Gitea).
|
||||
UPDATE_REPOSITORY_URL = "https://git.daemonlord.ru/benya/AnabasisChatRemove"
|
||||
UPDATE_REQUEST_TIMEOUT = 8
|
||||
|
||||
|
||||
class _DataBlob(ctypes.Structure):
|
||||
_fields_ = [("cbData", wintypes.DWORD), ("pbData", ctypes.POINTER(ctypes.c_byte))]
|
||||
|
||||
|
||||
def _version_key(version_text):
|
||||
parts = [int(x) for x in re.findall(r"\d+", str(version_text))]
|
||||
if not parts:
|
||||
return (0, 0, 0)
|
||||
while len(parts) < 3:
|
||||
parts.append(0)
|
||||
return tuple(parts[:3])
|
||||
|
||||
|
||||
def _is_newer_version(latest_version, current_version):
|
||||
latest_key = _version_key(latest_version)
|
||||
current_key = _version_key(current_version)
|
||||
return latest_key > current_key
|
||||
|
||||
|
||||
def _sanitize_repo_url(value):
|
||||
value = (value or "").strip()
|
||||
if not value:
|
||||
return ""
|
||||
if "://" not in value and value.count("/") == 1:
|
||||
return f"https://github.com/{value}"
|
||||
parsed = urlparse(value)
|
||||
if not parsed.scheme or not parsed.netloc:
|
||||
return ""
|
||||
clean_path = parsed.path.rstrip("/")
|
||||
if clean_path.endswith(".git"):
|
||||
clean_path = clean_path[:-4]
|
||||
return f"{parsed.scheme}://{parsed.netloc}{clean_path}"
|
||||
|
||||
|
||||
def _detect_update_repository_url():
|
||||
env_url = _sanitize_repo_url(os.getenv("ANABASIS_UPDATE_URL", ""))
|
||||
if env_url:
|
||||
return env_url
|
||||
env_repo = _sanitize_repo_url(os.getenv("ANABASIS_UPDATE_REPOSITORY", ""))
|
||||
if env_repo:
|
||||
return env_repo
|
||||
configured_url = _sanitize_repo_url(UPDATE_REPOSITORY_URL)
|
||||
if configured_url:
|
||||
return configured_url
|
||||
configured_repo = _sanitize_repo_url(UPDATE_REPOSITORY)
|
||||
if configured_repo:
|
||||
return configured_repo
|
||||
git_config_path = os.path.join(os.path.abspath("."), ".git", "config")
|
||||
if not os.path.exists(git_config_path):
|
||||
return ""
|
||||
try:
|
||||
with open(git_config_path, "r", encoding="utf-8") as f:
|
||||
content = f.read()
|
||||
match = re.search(r"url\s*=\s*((?:https?://|git@)[^\s]+)", content)
|
||||
if match:
|
||||
remote = match.group(1).strip()
|
||||
if remote.startswith("git@"):
|
||||
# git@host:owner/repo(.git)
|
||||
ssh_match = re.match(r"git@([^:]+):(.+?)(?:\.git)?$", remote)
|
||||
if ssh_match:
|
||||
return _sanitize_repo_url(f"https://{ssh_match.group(1)}/{ssh_match.group(2)}")
|
||||
return _sanitize_repo_url(remote)
|
||||
except Exception:
|
||||
return ""
|
||||
return ""
|
||||
|
||||
|
||||
_crypt32 = None
|
||||
_kernel32 = None
|
||||
if os.name == "nt":
|
||||
@@ -176,6 +255,149 @@ def load_token():
|
||||
print(f"Ошибка загрузки: {e}")
|
||||
return None, None
|
||||
|
||||
class VkService:
|
||||
def __init__(self):
|
||||
self.session = None
|
||||
self.api = None
|
||||
|
||||
def set_token(self, token):
|
||||
self.session = VkApi(token=token)
|
||||
self.api = self.session.get_api()
|
||||
|
||||
def clear(self):
|
||||
self.session = None
|
||||
self.api = None
|
||||
|
||||
@staticmethod
|
||||
def build_auth_command(auth_url, output_path):
|
||||
if getattr(sys, "frozen", False):
|
||||
return sys.executable, ["--auth", auth_url, output_path]
|
||||
return sys.executable, [os.path.abspath(__file__), "--auth", auth_url, output_path]
|
||||
|
||||
@staticmethod
|
||||
def vk_error_code(exc):
|
||||
error = getattr(exc, "error", None)
|
||||
if isinstance(error, dict):
|
||||
return error.get("error_code")
|
||||
return getattr(exc, "code", None)
|
||||
|
||||
@classmethod
|
||||
def is_auth_error(cls, exc, formatted_message=None):
|
||||
code = cls.vk_error_code(exc)
|
||||
if code == 5:
|
||||
return True
|
||||
message = (formatted_message or str(exc)).lower()
|
||||
return "invalid_access_token" in message or "user authorization failed" in message
|
||||
|
||||
@classmethod
|
||||
def is_retryable_error(cls, exc):
|
||||
return cls.vk_error_code(exc) in (6, 9, 10)
|
||||
|
||||
def call_with_retry(self, func, *args, **kwargs):
|
||||
max_attempts = 5
|
||||
for attempt in range(1, max_attempts + 1):
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except VkApiError as e:
|
||||
if not self.is_retryable_error(e) or attempt == max_attempts:
|
||||
raise
|
||||
delay = min(2.0, 0.35 * (2 ** (attempt - 1)))
|
||||
if self.vk_error_code(e) == 9:
|
||||
delay = max(delay, 1.0)
|
||||
time.sleep(delay)
|
||||
|
||||
|
||||
class UpdateChecker(QObject):
|
||||
check_finished = Signal(dict)
|
||||
check_failed = Signal(str)
|
||||
|
||||
def __init__(self, repository_url, current_version):
|
||||
super().__init__()
|
||||
self.repository_url = repository_url
|
||||
self.current_version = current_version
|
||||
|
||||
def run(self):
|
||||
if not self.repository_url:
|
||||
self.check_failed.emit("Не задан URL репозитория обновлений.")
|
||||
return
|
||||
|
||||
parsed = urlparse(self.repository_url)
|
||||
base_url = f"{parsed.scheme}://{parsed.netloc}"
|
||||
repo_path = parsed.path.strip("/")
|
||||
if not repo_path or repo_path.count("/") < 1:
|
||||
self.check_failed.emit("Некорректный URL репозитория обновлений.")
|
||||
return
|
||||
|
||||
if parsed.netloc.lower().endswith("github.com"):
|
||||
api_url = f"https://api.github.com/repos/{repo_path}/releases/latest"
|
||||
else:
|
||||
api_url = f"{base_url}/api/v1/repos/{repo_path}/releases/latest"
|
||||
releases_url = f"{base_url}/{repo_path}/releases"
|
||||
request = urllib.request.Request(
|
||||
api_url,
|
||||
headers={
|
||||
"Accept": "application/vnd.github+json",
|
||||
"User-Agent": "AnabasisManager-Updater",
|
||||
},
|
||||
)
|
||||
try:
|
||||
with urllib.request.urlopen(request, timeout=UPDATE_REQUEST_TIMEOUT) as response:
|
||||
release_data = json.loads(response.read().decode("utf-8"))
|
||||
except urllib.error.HTTPError as e:
|
||||
self.check_failed.emit(f"Ошибка HTTP при проверке обновлений: {e.code}")
|
||||
return
|
||||
except urllib.error.URLError as e:
|
||||
self.check_failed.emit(f"Сетевая ошибка при проверке обновлений: {e}")
|
||||
return
|
||||
except Exception as e:
|
||||
self.check_failed.emit(f"Не удалось проверить обновления: {e}")
|
||||
return
|
||||
|
||||
latest_tag = release_data.get("tag_name") or release_data.get("name") or ""
|
||||
latest_version = latest_tag.lstrip("vV").strip()
|
||||
html_url = release_data.get("html_url") or releases_url
|
||||
assets = release_data.get("assets") or []
|
||||
download_url = ""
|
||||
download_name = ""
|
||||
checksum_url = ""
|
||||
for asset in assets:
|
||||
url = asset.get("browser_download_url", "")
|
||||
if url.lower().endswith(".zip"):
|
||||
download_url = url
|
||||
download_name = asset.get("name", "")
|
||||
break
|
||||
if not download_url and assets:
|
||||
download_url = assets[0].get("browser_download_url", "")
|
||||
download_name = assets[0].get("name", "")
|
||||
|
||||
for asset in assets:
|
||||
name = asset.get("name", "").lower()
|
||||
if not name:
|
||||
continue
|
||||
is_checksum_asset = name.endswith(".sha256") or name.endswith(".sha256.txt") or name in ("checksums.txt", "sha256sums.txt")
|
||||
if not is_checksum_asset:
|
||||
continue
|
||||
if download_name and (download_name.lower() in name or name in (f"{download_name.lower()}.sha256", f"{download_name.lower()}.sha256.txt")):
|
||||
checksum_url = asset.get("browser_download_url", "")
|
||||
break
|
||||
if not checksum_url:
|
||||
checksum_url = asset.get("browser_download_url", "")
|
||||
|
||||
self.check_finished.emit(
|
||||
{
|
||||
"repository_url": self.repository_url,
|
||||
"latest_version": latest_version,
|
||||
"current_version": self.current_version,
|
||||
"latest_tag": latest_tag,
|
||||
"release_url": html_url,
|
||||
"download_url": download_url,
|
||||
"download_name": download_name,
|
||||
"checksum_url": checksum_url,
|
||||
"has_update": _is_newer_version(latest_version, self.current_version),
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
class MultiLinkDialog(QDialog):
|
||||
def __init__(self, parent=None):
|
||||
super().__init__(parent)
|
||||
@@ -211,11 +433,22 @@ class VkChatManager(QMainWindow):
|
||||
self.warehouse_chat_checkboxes = []
|
||||
self.coffee_chat_checkboxes = []
|
||||
self.other_chat_checkboxes = []
|
||||
self.vk_service = VkService()
|
||||
self.vk_session = None
|
||||
self.vk = None
|
||||
self.user_ids_to_process = []
|
||||
self._busy = False
|
||||
self.suppress_resolve = False
|
||||
self.auth_process = None
|
||||
self.auth_output_path = None
|
||||
self._auth_process_error_text = None
|
||||
self._auth_ui_busy = False
|
||||
self._auth_relogin_in_progress = False
|
||||
self._last_auth_relogin_ts = 0.0
|
||||
self.update_repository_url = _detect_update_repository_url()
|
||||
self.update_checker = None
|
||||
self.update_thread = None
|
||||
self._update_check_silent = False
|
||||
|
||||
self.resolve_timer = QTimer(self)
|
||||
self.resolve_timer.setSingleShot(True)
|
||||
@@ -227,6 +460,7 @@ class VkChatManager(QMainWindow):
|
||||
self.init_ui()
|
||||
self.load_saved_token_on_startup()
|
||||
self.setup_token_timer()
|
||||
QTimer.singleShot(1800, lambda: self.check_for_updates(silent_no_updates=True))
|
||||
|
||||
def init_ui(self):
|
||||
central_widget = QWidget()
|
||||
@@ -260,6 +494,11 @@ class VkChatManager(QMainWindow):
|
||||
self.auth_btn = QPushButton("Авторизоваться через VK")
|
||||
self.auth_btn.clicked.connect(self.start_auth)
|
||||
layout.addWidget(self.auth_btn)
|
||||
self.auth_progress = QProgressBar()
|
||||
self.auth_progress.setRange(0, 0)
|
||||
self.auth_progress.setTextVisible(False)
|
||||
self.auth_progress.hide()
|
||||
layout.addWidget(self.auth_progress)
|
||||
|
||||
self.chat_tabs = QTabWidget()
|
||||
self.chat_tabs.hide()
|
||||
@@ -373,7 +612,8 @@ class VkChatManager(QMainWindow):
|
||||
else:
|
||||
failed_links.append(link)
|
||||
except VkApiError as e:
|
||||
self._log_error("resolveScreenName", e)
|
||||
if self._handle_vk_api_error("resolveScreenName", e, action_name="получения ID пользователей"):
|
||||
return
|
||||
failed_links.append(f"{link} ({self._format_vk_error(e)})")
|
||||
except Exception:
|
||||
failed_links.append(link)
|
||||
@@ -405,11 +645,19 @@ class VkChatManager(QMainWindow):
|
||||
make_admin_action.setStatusTip("Назначить выбранных пользователей администраторами в выбранных чатах")
|
||||
make_admin_action.triggered.connect(self.set_user_admin)
|
||||
tools_menu.addAction(make_admin_action)
|
||||
self.make_admin_action = make_admin_action
|
||||
|
||||
check_updates_action = QAction("Проверить обновления", self)
|
||||
check_updates_action.setStatusTip("Проверить наличие новой версии приложения")
|
||||
check_updates_action.triggered.connect(self.check_for_updates)
|
||||
tools_menu.addAction(check_updates_action)
|
||||
self.check_updates_action = check_updates_action
|
||||
|
||||
logout_action = QAction("Выйти и очистить", self)
|
||||
logout_action.setStatusTip("Выйти, удалить токен и кэш")
|
||||
logout_action.triggered.connect(self.logout_and_clear)
|
||||
tools_menu.addAction(logout_action)
|
||||
self.logout_action = logout_action
|
||||
|
||||
def create_chat_tab(self):
|
||||
# This implementation correctly creates a scrollable area for chat lists.
|
||||
@@ -435,6 +683,264 @@ class VkChatManager(QMainWindow):
|
||||
|
||||
return tab_content_widget
|
||||
|
||||
def _set_update_action_state(self, in_progress):
|
||||
if hasattr(self, "check_updates_action"):
|
||||
self.check_updates_action.setEnabled(not in_progress)
|
||||
|
||||
def check_for_updates(self, silent_no_updates=False):
|
||||
if self.update_thread and self.update_thread.is_alive():
|
||||
return
|
||||
|
||||
self._update_check_silent = silent_no_updates
|
||||
self._set_update_action_state(True)
|
||||
self.status_label.setText("Статус: проверка обновлений...")
|
||||
|
||||
self.update_checker = UpdateChecker(self.update_repository_url, APP_VERSION)
|
||||
self.update_checker.check_finished.connect(self._on_update_check_finished)
|
||||
self.update_checker.check_failed.connect(self._on_update_check_failed)
|
||||
self.update_thread = threading.Thread(target=self.update_checker.run, daemon=True)
|
||||
self.update_thread.start()
|
||||
|
||||
def _on_update_check_finished(self, result):
|
||||
self._set_update_action_state(False)
|
||||
self.update_checker = None
|
||||
self.update_thread = None
|
||||
|
||||
if result.get("has_update"):
|
||||
latest_version = result.get("latest_version") or result.get("latest_tag") or "unknown"
|
||||
self.status_label.setText(f"Статус: доступно обновление {latest_version}")
|
||||
|
||||
message_box = QMessageBox(self)
|
||||
message_box.setIcon(QMessageBox.Information)
|
||||
message_box.setWindowTitle("Доступно обновление")
|
||||
message_box.setText(
|
||||
f"Текущая версия: {result.get('current_version')}\n"
|
||||
f"Доступная версия: {latest_version}\n\n"
|
||||
"Открыть страницу загрузки?"
|
||||
)
|
||||
update_now_button = message_box.addButton("Обновить сейчас", QMessageBox.AcceptRole)
|
||||
download_button = message_box.addButton("Скачать", QMessageBox.AcceptRole)
|
||||
releases_button = message_box.addButton("Релизы", QMessageBox.ActionRole)
|
||||
cancel_button = message_box.addButton("Позже", QMessageBox.RejectRole)
|
||||
message_box.setDefaultButton(update_now_button)
|
||||
message_box.exec()
|
||||
|
||||
clicked = message_box.clickedButton()
|
||||
download_url = result.get("download_url")
|
||||
checksum_url = result.get("checksum_url")
|
||||
download_name = result.get("download_name")
|
||||
release_url = result.get("release_url")
|
||||
if clicked is update_now_button and download_url:
|
||||
if not self._start_auto_update(download_url, latest_version, checksum_url, download_name):
|
||||
if release_url:
|
||||
QDesktopServices.openUrl(QUrl(release_url))
|
||||
return
|
||||
if clicked is download_button and download_url:
|
||||
QDesktopServices.openUrl(QUrl(download_url))
|
||||
elif clicked in (download_button, releases_button) and release_url:
|
||||
QDesktopServices.openUrl(QUrl(release_url))
|
||||
elif clicked not in (cancel_button,):
|
||||
self._log_event("update_check", "Диалог обновления закрыт без действия.", level="WARN")
|
||||
return
|
||||
|
||||
self.status_label.setText(f"Статус: используется актуальная версия ({APP_VERSION}).")
|
||||
if not self._update_check_silent:
|
||||
QMessageBox.information(self, "Обновления", "Установлена актуальная версия.")
|
||||
|
||||
def _on_update_check_failed(self, error_text):
|
||||
self._set_update_action_state(False)
|
||||
self.update_checker = None
|
||||
self.update_thread = None
|
||||
self._log_event("update_check_failed", error_text, level="WARN")
|
||||
if not self.update_repository_url:
|
||||
self.status_label.setText("Статус: обновления не настроены (URL репозитория не задан).")
|
||||
if not self._update_check_silent:
|
||||
QMessageBox.warning(
|
||||
self,
|
||||
"Обновления не настроены",
|
||||
"Не задан URL репозитория для обновлений.\n"
|
||||
"Укажите UPDATE_REPOSITORY_URL в main.py или переменную окружения "
|
||||
"ANABASIS_UPDATE_URL (например: https://git.daemonlord.ru/owner/repo).",
|
||||
)
|
||||
return
|
||||
self.status_label.setText("Статус: не удалось проверить обновления.")
|
||||
if not self._update_check_silent:
|
||||
QMessageBox.warning(self, "Проверка обновлений", error_text)
|
||||
|
||||
def _download_update_archive(self, download_url, destination_path):
|
||||
request = urllib.request.Request(
|
||||
download_url,
|
||||
headers={"User-Agent": "AnabasisManager-Updater"},
|
||||
)
|
||||
with urllib.request.urlopen(request, timeout=60) as response:
|
||||
with open(destination_path, "wb") as f:
|
||||
shutil.copyfileobj(response, f)
|
||||
|
||||
def _download_update_text(self, url):
|
||||
request = urllib.request.Request(
|
||||
url,
|
||||
headers={"User-Agent": "AnabasisManager-Updater"},
|
||||
)
|
||||
with urllib.request.urlopen(request, timeout=30) as response:
|
||||
return response.read().decode("utf-8", errors="replace")
|
||||
|
||||
@staticmethod
|
||||
def _sha256_file(path):
|
||||
digest = hashlib.sha256()
|
||||
with open(path, "rb") as f:
|
||||
for chunk in iter(lambda: f.read(1024 * 1024), b""):
|
||||
digest.update(chunk)
|
||||
return digest.hexdigest().lower()
|
||||
|
||||
@staticmethod
|
||||
def _extract_sha256_from_text(checksum_text, target_file_name):
|
||||
target = (target_file_name or "").strip().lower()
|
||||
for raw_line in checksum_text.splitlines():
|
||||
line = raw_line.strip()
|
||||
if not line:
|
||||
continue
|
||||
match = re.search(r"\b([A-Fa-f0-9]{64})\b", line)
|
||||
if not match:
|
||||
continue
|
||||
checksum = match.group(1).lower()
|
||||
if not target:
|
||||
return checksum
|
||||
line_lower = line.lower()
|
||||
if target in line_lower:
|
||||
return checksum
|
||||
if os.path.basename(target) in line_lower:
|
||||
return checksum
|
||||
return ""
|
||||
|
||||
def _verify_update_checksum(self, zip_path, checksum_url, download_name):
|
||||
if not checksum_url:
|
||||
raise RuntimeError("В релизе нет файла SHA256. Автообновление остановлено.")
|
||||
checksum_text = self._download_update_text(checksum_url)
|
||||
expected_hash = self._extract_sha256_from_text(checksum_text, download_name or os.path.basename(zip_path))
|
||||
if not expected_hash:
|
||||
raise RuntimeError("Не удалось найти SHA256 для архива обновления.")
|
||||
actual_hash = self._sha256_file(zip_path)
|
||||
if actual_hash != expected_hash:
|
||||
raise RuntimeError("SHA256 не совпадает, обновление отменено.")
|
||||
|
||||
def _locate_extracted_root(self, extracted_dir):
|
||||
entries = []
|
||||
for name in os.listdir(extracted_dir):
|
||||
full_path = os.path.join(extracted_dir, name)
|
||||
if os.path.isdir(full_path):
|
||||
entries.append(full_path)
|
||||
if len(entries) == 1:
|
||||
candidate = entries[0]
|
||||
if os.path.exists(os.path.join(candidate, "AnabasisManager.exe")):
|
||||
return candidate
|
||||
return extracted_dir
|
||||
|
||||
def _build_update_script(self, app_dir, source_dir, exe_name, target_pid):
|
||||
script_path = os.path.join(tempfile.gettempdir(), "anabasis_apply_update.cmd")
|
||||
script_lines = [
|
||||
"@echo off",
|
||||
"setlocal",
|
||||
f"set APP_DIR={app_dir}",
|
||||
f"set SRC_DIR={source_dir}",
|
||||
f"set EXE_NAME={exe_name}",
|
||||
f"set TARGET_PID={target_pid}",
|
||||
"set BACKUP_DIR=%TEMP%\\anabasis_backup_%RANDOM%%RANDOM%",
|
||||
":wait_for_exit",
|
||||
"tasklist /FI \"PID eq %TARGET_PID%\" | find \"%TARGET_PID%\" >nul",
|
||||
"if %ERRORLEVEL% EQU 0 (",
|
||||
" timeout /t 1 /nobreak >nul",
|
||||
" goto :wait_for_exit",
|
||||
")",
|
||||
"timeout /t 1 /nobreak >nul",
|
||||
"mkdir \"%BACKUP_DIR%\" >nul 2>&1",
|
||||
"robocopy \"%APP_DIR%\" \"%BACKUP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:6 /W:2 >nul",
|
||||
"set RC=%ERRORLEVEL%",
|
||||
"if %RC% GEQ 8 goto :backup_error",
|
||||
"robocopy \"%SRC_DIR%\" \"%APP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:12 /W:2 >nul",
|
||||
"set RC=%ERRORLEVEL%",
|
||||
"if %RC% GEQ 8 goto :rollback",
|
||||
"start \"\" \"%APP_DIR%\\%EXE_NAME%\"",
|
||||
"timeout /t 2 /nobreak >nul",
|
||||
"tasklist /FI \"IMAGENAME eq %EXE_NAME%\" | find /I \"%EXE_NAME%\" >nul",
|
||||
"if %ERRORLEVEL% NEQ 0 goto :rollback",
|
||||
"rmdir /S /Q \"%BACKUP_DIR%\" >nul 2>&1",
|
||||
"exit /b 0",
|
||||
":rollback",
|
||||
"robocopy \"%BACKUP_DIR%\" \"%APP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:6 /W:2 >nul",
|
||||
"start \"\" \"%APP_DIR%\\%EXE_NAME%\"",
|
||||
"echo Auto-update failed. Rollback executed. > \"%APP_DIR%\\update_error.log\"",
|
||||
"exit /b 2",
|
||||
":backup_error",
|
||||
"echo Auto-update failed during backup. Code %RC% > \"%APP_DIR%\\update_error.log\"",
|
||||
"exit /b %RC%",
|
||||
]
|
||||
with open(script_path, "w", encoding="utf-8", newline="\r\n") as f:
|
||||
f.write("\r\n".join(script_lines) + "\r\n")
|
||||
return script_path
|
||||
|
||||
def _start_auto_update(self, download_url, latest_version, checksum_url="", download_name=""):
|
||||
if os.name != "nt":
|
||||
QMessageBox.information(
|
||||
self,
|
||||
"Автообновление",
|
||||
"Автообновление пока поддерживается только в Windows-сборке.",
|
||||
)
|
||||
return False
|
||||
if not getattr(sys, "frozen", False):
|
||||
QMessageBox.information(
|
||||
self,
|
||||
"Автообновление",
|
||||
"Автообновление доступно в собранной версии приложения (.exe).",
|
||||
)
|
||||
return False
|
||||
if not download_url:
|
||||
QMessageBox.warning(self, "Автообновление", "В релизе нет ссылки на файл для обновления.")
|
||||
return False
|
||||
|
||||
self.status_label.setText(f"Статус: загрузка обновления {latest_version}...")
|
||||
self._set_busy(True)
|
||||
work_dir = tempfile.mkdtemp(prefix="anabasis_update_")
|
||||
zip_path = os.path.join(work_dir, "update.zip")
|
||||
unpack_dir = os.path.join(work_dir, "extracted")
|
||||
try:
|
||||
self._download_update_archive(download_url, zip_path)
|
||||
self._verify_update_checksum(zip_path, checksum_url, download_name)
|
||||
os.makedirs(unpack_dir, exist_ok=True)
|
||||
with zipfile.ZipFile(zip_path, "r") as archive:
|
||||
archive.extractall(unpack_dir)
|
||||
|
||||
source_dir = self._locate_extracted_root(unpack_dir)
|
||||
app_exe = sys.executable
|
||||
app_dir = os.path.dirname(app_exe)
|
||||
exe_name = os.path.basename(app_exe)
|
||||
script_path = self._build_update_script(app_dir, source_dir, exe_name, os.getpid())
|
||||
|
||||
creation_flags = 0
|
||||
if hasattr(subprocess, "CREATE_NEW_PROCESS_GROUP"):
|
||||
creation_flags |= subprocess.CREATE_NEW_PROCESS_GROUP
|
||||
if hasattr(subprocess, "DETACHED_PROCESS"):
|
||||
creation_flags |= subprocess.DETACHED_PROCESS
|
||||
|
||||
subprocess.Popen(
|
||||
["cmd.exe", "/c", script_path],
|
||||
cwd=work_dir,
|
||||
creationflags=creation_flags,
|
||||
)
|
||||
self._log_event("auto_update", f"Update {latest_version} started from {download_url}")
|
||||
QMessageBox.information(
|
||||
self,
|
||||
"Обновление запущено",
|
||||
"Обновление скачано. Приложение будет перезапущено.",
|
||||
)
|
||||
QApplication.instance().quit()
|
||||
return True
|
||||
except Exception as e:
|
||||
self._log_event("auto_update_failed", str(e), level="ERROR")
|
||||
QMessageBox.warning(self, "Автообновление", f"Не удалось выполнить автообновление: {e}")
|
||||
return False
|
||||
finally:
|
||||
self._set_busy(False)
|
||||
|
||||
def setup_token_timer(self):
|
||||
self.token_countdown_timer = QTimer(self)
|
||||
self.token_countdown_timer.timeout.connect(self.update_token_timer_display)
|
||||
@@ -467,7 +973,7 @@ class VkChatManager(QMainWindow):
|
||||
self.token_timer_label.setText(f"Срок: {hours:02d}ч {minutes:02d}м {seconds:02d}с")
|
||||
|
||||
def set_ui_state(self, authorized):
|
||||
self.auth_btn.setEnabled(not authorized)
|
||||
self.auth_btn.setEnabled((not authorized) and (not self._auth_ui_busy))
|
||||
for btn in [self.select_all_btn, self.deselect_all_btn, self.refresh_chats_btn,
|
||||
self.vk_url_input, self.multi_link_btn,
|
||||
self.visible_messages_checkbox]:
|
||||
@@ -489,6 +995,19 @@ class VkChatManager(QMainWindow):
|
||||
self.user_ids_to_process.clear()
|
||||
self._set_vk_url_input_text("")
|
||||
self._clear_chat_tabs()
|
||||
if hasattr(self, "make_admin_action"):
|
||||
self.make_admin_action.setEnabled(authorized and (not self._auth_ui_busy))
|
||||
if hasattr(self, "logout_action"):
|
||||
self.logout_action.setEnabled(not self._auth_ui_busy)
|
||||
|
||||
def _set_auth_ui_state(self, in_progress):
|
||||
self._auth_ui_busy = in_progress
|
||||
self.auth_progress.setVisible(in_progress)
|
||||
if hasattr(self, "logout_action"):
|
||||
self.logout_action.setEnabled(not in_progress)
|
||||
if hasattr(self, "make_admin_action"):
|
||||
self.make_admin_action.setEnabled(not in_progress and self.token is not None)
|
||||
self.auth_btn.setEnabled((self.token is None) and (not in_progress))
|
||||
|
||||
def _set_busy(self, busy, status_text=None):
|
||||
if status_text:
|
||||
@@ -513,17 +1032,22 @@ class VkChatManager(QMainWindow):
|
||||
def _ensure_log_dir(self):
|
||||
os.makedirs(APP_DATA_DIR, exist_ok=True)
|
||||
|
||||
def _log_error(self, context, exc):
|
||||
def _log(self, level, context, message):
|
||||
try:
|
||||
os.makedirs(APP_DATA_DIR, exist_ok=True)
|
||||
self._rotate_log_if_needed()
|
||||
timestamp = QDateTime.currentDateTime().toString("yyyy-MM-dd HH:mm:ss")
|
||||
message = self._format_vk_error(exc)
|
||||
with open(LOG_FILE, "a", encoding="utf-8") as f:
|
||||
f.write(f"[{timestamp}] {context}: {message}\n")
|
||||
f.write(f"[{timestamp}] [{level}] {context}: {message}\n")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _log_error(self, context, exc):
|
||||
self._log("ERROR", context, self._format_vk_error(exc))
|
||||
|
||||
def _log_event(self, context, message, level="INFO"):
|
||||
self._log(level, context, message)
|
||||
|
||||
def _rotate_log_if_needed(self):
|
||||
try:
|
||||
if not os.path.exists(LOG_FILE):
|
||||
@@ -579,30 +1103,11 @@ class VkChatManager(QMainWindow):
|
||||
if confirm != QMessageBox.Yes:
|
||||
return
|
||||
|
||||
self.token = None
|
||||
self.token_expiration_time = None
|
||||
self.vk_session = None
|
||||
self.vk = None
|
||||
self.user_ids_to_process.clear()
|
||||
self._set_vk_url_input_text("")
|
||||
self.token_input.clear()
|
||||
self.token_timer_label.setText("Срок действия токена: Н/Д")
|
||||
self.status_label.setText("Статус: не авторизован")
|
||||
if self.token_countdown_timer.isActive():
|
||||
self.token_countdown_timer.stop()
|
||||
self._clear_chat_tabs()
|
||||
self.set_ui_state(False)
|
||||
|
||||
try:
|
||||
if os.path.exists(TOKEN_FILE):
|
||||
os.remove(TOKEN_FILE)
|
||||
except Exception as e:
|
||||
print(f"Ошибка удаления токена: {e}")
|
||||
|
||||
self._clear_auth_state(stop_timer=True, remove_token_file=True)
|
||||
try:
|
||||
self._try_remove_web_cache()
|
||||
except Exception as e:
|
||||
print(f"Ошибка удаления кэша: {e}")
|
||||
self._log_event("logout_and_clear", f"Cache cleanup failed: {e}", level="WARN")
|
||||
|
||||
def _cleanup_cache_if_needed(self):
|
||||
if os.path.exists(CACHE_CLEANUP_MARKER):
|
||||
@@ -641,13 +1146,88 @@ class VkChatManager(QMainWindow):
|
||||
|
||||
def set_all_checkboxes_on_current_tab(self, checked):
|
||||
current_index = self.chat_tabs.currentIndex()
|
||||
checkbox_lists = [self.office_chat_checkboxes, self.retail_chat_checkboxes, self.retail_warehouse_checkboxes, self.retail_coffee_checkboxes, self.other_chat_checkboxes]
|
||||
checkbox_lists = [self.office_chat_checkboxes, self.retail_chat_checkboxes, self.warehouse_chat_checkboxes, self.coffee_chat_checkboxes, self.other_chat_checkboxes]
|
||||
if 0 <= current_index < len(checkbox_lists):
|
||||
for checkbox in checkbox_lists[current_index]:
|
||||
checkbox.setChecked(checked)
|
||||
|
||||
def start_auth(self):
|
||||
self.status_label.setText("Статус: ожидание авторизации...")
|
||||
def _build_auth_command(self, auth_url, output_path):
|
||||
return self.vk_service.build_auth_command(auth_url, output_path)
|
||||
|
||||
def _on_auth_process_error(self, process_error):
|
||||
self._auth_process_error_text = f"Ошибка запуска авторизации: {process_error}"
|
||||
# For failed starts Qt may not emit finished(), so release UI here.
|
||||
if self.auth_process and self.auth_process.state() == QProcess.NotRunning:
|
||||
output_path = self.auth_output_path
|
||||
self.auth_output_path = None
|
||||
self.auth_process = None
|
||||
self._set_auth_ui_state(False)
|
||||
self.status_label.setText(f"Статус: {self._auth_process_error_text}")
|
||||
self._log_event("auth_process_error", self._auth_process_error_text, level="ERROR")
|
||||
self._auth_process_error_text = None
|
||||
self._auth_relogin_in_progress = False
|
||||
self.set_ui_state(self.token is not None)
|
||||
try:
|
||||
if output_path and os.path.exists(output_path):
|
||||
os.remove(output_path)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _on_auth_process_finished(self, exit_code, _exit_status):
|
||||
output_path = self.auth_output_path
|
||||
self.auth_output_path = None
|
||||
self.auth_process = None
|
||||
self._set_auth_ui_state(False)
|
||||
|
||||
if self._auth_process_error_text:
|
||||
self.status_label.setText(f"Статус: {self._auth_process_error_text}")
|
||||
self._log_event("auth_process_error", self._auth_process_error_text, level="ERROR")
|
||||
self._auth_process_error_text = None
|
||||
self._auth_relogin_in_progress = False
|
||||
self.set_ui_state(self.token is not None)
|
||||
return
|
||||
|
||||
if exit_code != 0:
|
||||
self.status_label.setText(f"Статус: авторизация не удалась (код {exit_code}).")
|
||||
self._log_event("auth_process_finished", f"exit_code={exit_code}", level="WARN")
|
||||
self._auth_relogin_in_progress = False
|
||||
self.set_ui_state(self.token is not None)
|
||||
return
|
||||
|
||||
token = None
|
||||
expires_in = 0
|
||||
if output_path and os.path.exists(output_path):
|
||||
try:
|
||||
with open(output_path, "r", encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
token = data.get("token")
|
||||
expires_in = data.get("expires_in", 0)
|
||||
except Exception as e:
|
||||
self._log_event("auth_result_parse", f"Ошибка чтения результата авторизации: {e}", level="ERROR")
|
||||
finally:
|
||||
try:
|
||||
if os.path.exists(output_path):
|
||||
os.remove(output_path)
|
||||
except Exception:
|
||||
pass
|
||||
else:
|
||||
self._log_event("auth_result", "Файл результата авторизации не найден.", level="WARN")
|
||||
|
||||
self._auth_relogin_in_progress = False
|
||||
self.handle_new_auth_token(token, expires_in)
|
||||
|
||||
def start_auth(self, keep_status_text=False):
|
||||
if self.auth_process and self.auth_process.state() != QProcess.NotRunning:
|
||||
self._log_event("auth_process", "Авторизация уже запущена, повторный запуск пропущен.")
|
||||
return
|
||||
|
||||
if keep_status_text and hasattr(self, "_relogin_status_text"):
|
||||
status_text = self._relogin_status_text
|
||||
self._relogin_status_text = None
|
||||
else:
|
||||
status_text = "Статус: ожидание авторизации..."
|
||||
self.status_label.setText(status_text)
|
||||
|
||||
auth_url = (
|
||||
"https://oauth.vk.com/authorize?"
|
||||
"client_id=2685278&"
|
||||
@@ -664,38 +1244,22 @@ class VkChatManager(QMainWindow):
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
cmd = [sys.executable, "--auth", auth_url, output_path]
|
||||
try:
|
||||
subprocess.check_call(cmd)
|
||||
except Exception as e:
|
||||
self.status_label.setText(f"Статус: ошибка запуска авторизации: {e}")
|
||||
return
|
||||
program, args = self._build_auth_command(auth_url, output_path)
|
||||
self.auth_output_path = output_path
|
||||
self._auth_process_error_text = None
|
||||
|
||||
if not os.path.exists(output_path):
|
||||
self.status_label.setText("Статус: авторизация не удалась")
|
||||
return
|
||||
|
||||
try:
|
||||
with open(output_path, "r", encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
token = data.get("token")
|
||||
expires_in = data.get("expires_in", 0)
|
||||
except Exception:
|
||||
token = None
|
||||
expires_in = 0
|
||||
|
||||
try:
|
||||
if os.path.exists(output_path):
|
||||
os.remove(output_path)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
self.handle_new_auth_token(token, expires_in)
|
||||
process = QProcess(self)
|
||||
process.finished.connect(self._on_auth_process_finished)
|
||||
process.errorOccurred.connect(self._on_auth_process_error)
|
||||
self.auth_process = process
|
||||
self._set_auth_ui_state(True)
|
||||
process.start(program, args)
|
||||
|
||||
def handle_new_auth_token(self, token, expires_in):
|
||||
if not token:
|
||||
self.status_label.setText("Статус: Авторизация не удалась")
|
||||
self.set_ui_state(False)
|
||||
self._auth_relogin_in_progress = False
|
||||
return
|
||||
|
||||
self.token = token
|
||||
@@ -704,9 +1268,11 @@ class VkChatManager(QMainWindow):
|
||||
|
||||
self.token_input.setText(self.token[:50] + "...")
|
||||
self.status_label.setText("Статус: авторизован")
|
||||
self.vk_session = VkApi(token=self.token)
|
||||
self.vk = self.vk_session.get_api()
|
||||
self.vk_service.set_token(self.token)
|
||||
self.vk_session = self.vk_service.session
|
||||
self.vk = self.vk_service.api
|
||||
self.set_ui_state(True)
|
||||
self._auth_relogin_in_progress = False
|
||||
self.load_chats()
|
||||
|
||||
def handle_auth_token_on_load(self, token, expiration_time):
|
||||
@@ -715,9 +1281,11 @@ class VkChatManager(QMainWindow):
|
||||
|
||||
self.token_input.setText(self.token[:50] + "...")
|
||||
self.status_label.setText("Статус: авторизован (токен загружен)")
|
||||
self.vk_session = VkApi(token=self.token)
|
||||
self.vk = self.vk_session.get_api()
|
||||
self.vk_service.set_token(self.token)
|
||||
self.vk_session = self.vk_service.session
|
||||
self.vk = self.vk_service.api
|
||||
self.set_ui_state(True)
|
||||
self._auth_relogin_in_progress = False
|
||||
self.load_chats()
|
||||
|
||||
def _clear_chat_tabs(self):
|
||||
@@ -729,24 +1297,80 @@ class VkChatManager(QMainWindow):
|
||||
chk_list.clear()
|
||||
|
||||
def _vk_error_code(self, exc):
|
||||
error = getattr(exc, "error", None)
|
||||
if isinstance(error, dict):
|
||||
return error.get("error_code")
|
||||
return getattr(exc, "code", None)
|
||||
return self.vk_service.vk_error_code(exc)
|
||||
|
||||
def _is_auth_token_error(self, exc):
|
||||
message = self._format_vk_error(exc).lower()
|
||||
return self.vk_service.is_auth_error(exc, message)
|
||||
|
||||
def _clear_auth_state(self, stop_timer=False, remove_token_file=True):
|
||||
self.token = None
|
||||
self.token_expiration_time = None
|
||||
self.vk_service.clear()
|
||||
self.vk_session = None
|
||||
self.vk = None
|
||||
self.user_ids_to_process.clear()
|
||||
self._set_vk_url_input_text("")
|
||||
self.token_input.clear()
|
||||
if stop_timer and self.token_countdown_timer.isActive():
|
||||
self.token_countdown_timer.stop()
|
||||
self.token_timer_label.setText("Срок действия токена: Н/Д")
|
||||
self.status_label.setText("Статус: не авторизован")
|
||||
self._clear_chat_tabs()
|
||||
self.set_ui_state(False)
|
||||
if remove_token_file:
|
||||
try:
|
||||
if os.path.exists(TOKEN_FILE):
|
||||
os.remove(TOKEN_FILE)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _force_relogin(self, exc, action_name):
|
||||
now = time.monotonic()
|
||||
if self._auth_relogin_in_progress:
|
||||
self._log_event("force_relogin_skip", f"already_in_progress action={action_name}", level="WARN")
|
||||
return
|
||||
|
||||
elapsed = now - self._last_auth_relogin_ts
|
||||
if elapsed < AUTH_RELOGIN_BACKOFF_SECONDS:
|
||||
wait_seconds = int(AUTH_RELOGIN_BACKOFF_SECONDS - elapsed) + 1
|
||||
self.status_label.setText(f"Статус: повторная авторизация через {wait_seconds} сек.")
|
||||
self._log_event("force_relogin_backoff", f"action={action_name}; wait={wait_seconds}s", level="WARN")
|
||||
return
|
||||
|
||||
self._auth_relogin_in_progress = True
|
||||
self._last_auth_relogin_ts = now
|
||||
error_code = self._vk_error_code(exc)
|
||||
self._log_event(
|
||||
"force_relogin",
|
||||
f"action={action_name}; code={error_code}; message={self._format_vk_error(exc)}",
|
||||
level="WARN",
|
||||
)
|
||||
|
||||
self._clear_auth_state()
|
||||
self._relogin_status_text = "Статус: Токен отозван VK, выполните повторный вход."
|
||||
QMessageBox.warning(
|
||||
self,
|
||||
"Требуется повторная авторизация",
|
||||
f"Во время {action_name} получена ошибка авторизации:\n"
|
||||
f"{self._format_vk_error(exc)}\n\n"
|
||||
"Сейчас откроется окно авторизации VK."
|
||||
)
|
||||
self.start_auth(keep_status_text=True)
|
||||
|
||||
def _handle_vk_api_error(self, context, exc, action_name=None, ui_message_prefix=None, disable_ui=False):
|
||||
self._log_error(context, exc)
|
||||
if self._is_auth_token_error(exc):
|
||||
self._force_relogin(exc, action_name or context)
|
||||
return True
|
||||
if ui_message_prefix:
|
||||
QMessageBox.critical(self, "Ошибка", f"{ui_message_prefix}: {self._format_vk_error(exc)}")
|
||||
if disable_ui:
|
||||
self.set_ui_state(False)
|
||||
return False
|
||||
|
||||
def _vk_call_with_retry(self, func, *args, **kwargs):
|
||||
max_attempts = 5
|
||||
for attempt in range(1, max_attempts + 1):
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except VkApiError as e:
|
||||
code = self._vk_error_code(e)
|
||||
if code not in (6, 9, 10) or attempt == max_attempts:
|
||||
raise
|
||||
delay = min(2.0, 0.35 * (2 ** (attempt - 1)))
|
||||
if code == 9:
|
||||
delay = max(delay, 1.0)
|
||||
time.sleep(delay)
|
||||
return self.vk_service.call_with_retry(func, *args, **kwargs)
|
||||
|
||||
def load_chats(self):
|
||||
self._clear_chat_tabs()
|
||||
@@ -814,7 +1438,12 @@ class VkChatManager(QMainWindow):
|
||||
self.chat_tabs.setTabText(3, f"AG Кофейни ({len(self.coffee_chat_checkboxes)})")
|
||||
self.chat_tabs.setTabText(4, f"Прочие ({len(self.other_chat_checkboxes)})")
|
||||
except VkApiError as e:
|
||||
self._log_error("load_chats", e)
|
||||
if self._handle_vk_api_error(
|
||||
"load_chats",
|
||||
e,
|
||||
action_name="загрузки чатов",
|
||||
):
|
||||
return
|
||||
QMessageBox.critical(self, "Ошибка", f"Не удалось загрузить чаты: {self._format_vk_error(e)}")
|
||||
self.set_ui_state(False)
|
||||
finally:
|
||||
@@ -913,7 +1542,8 @@ class VkChatManager(QMainWindow):
|
||||
self._vk_call_with_retry(self.vk.messages.addChatUser, **params)
|
||||
results.append(f"✓ '{user_info}' приглашен в '{chat['title']}'.")
|
||||
except VkApiError as e:
|
||||
self._log_error("execute_user_action", e)
|
||||
if self._handle_vk_api_error("execute_user_action", e, action_name="выполнения операций с пользователями"):
|
||||
return
|
||||
results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {self._format_vk_error(e)}")
|
||||
finally:
|
||||
processed += 1
|
||||
@@ -992,7 +1622,8 @@ class VkChatManager(QMainWindow):
|
||||
)
|
||||
results.append(f"✓ '{user_info}' назначен админом в '{chat['title']}'.")
|
||||
except VkApiError as e:
|
||||
self._log_error("set_user_admin", e)
|
||||
if self._handle_vk_api_error("set_user_admin", e, action_name="назначения администраторов"):
|
||||
return
|
||||
results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {self._format_vk_error(e)}")
|
||||
finally:
|
||||
processed += 1
|
||||
|
||||
56
tests/test_auth_relogin_smoke.py
Normal file
56
tests/test_auth_relogin_smoke.py
Normal file
@@ -0,0 +1,56 @@
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class AuthReloginSmokeTests(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
cls.source = Path("main.py").read_text(encoding="utf-8")
|
||||
|
||||
def test_auth_command_builder_handles_frozen_and_source(self):
|
||||
self.assertIn("def _build_auth_command(self, auth_url, output_path):", self.source)
|
||||
self.assertIn('return sys.executable, ["--auth", auth_url, output_path]', self.source)
|
||||
self.assertIn('return sys.executable, [os.path.abspath(__file__), "--auth", auth_url, output_path]', self.source)
|
||||
|
||||
def test_auth_runs_via_qprocess(self):
|
||||
self.assertIn("process = QProcess(self)", self.source)
|
||||
self.assertIn("process.start(program, args)", self.source)
|
||||
self.assertIn("def _on_auth_process_finished(self, exit_code, _exit_status):", self.source)
|
||||
self.assertIn("if self.auth_process and self.auth_process.state() == QProcess.NotRunning:", self.source)
|
||||
|
||||
def test_force_relogin_has_backoff_and_event_log(self):
|
||||
self.assertIn("AUTH_RELOGIN_BACKOFF_SECONDS = 5.0", self.source)
|
||||
self.assertIn("if self._auth_relogin_in_progress:", self.source)
|
||||
self.assertIn("force_relogin_backoff", self.source)
|
||||
self.assertIn("force_relogin", self.source)
|
||||
|
||||
def test_auth_error_paths_trigger_force_relogin(self):
|
||||
self.assertIn("def _handle_vk_api_error(self, context, exc, action_name=None, ui_message_prefix=None, disable_ui=False):", self.source)
|
||||
self.assertIn("self._force_relogin(exc, action_name or context)", self.source)
|
||||
self.assertIn('"load_chats",', self.source)
|
||||
self.assertIn('"execute_user_action",', self.source)
|
||||
self.assertIn('"set_user_admin",', self.source)
|
||||
|
||||
def test_tab_checkbox_lists_use_existing_attributes(self):
|
||||
self.assertIn("self.warehouse_chat_checkboxes", self.source)
|
||||
self.assertIn("self.coffee_chat_checkboxes", self.source)
|
||||
self.assertNotIn("self.retail_warehouse_checkboxes", self.source)
|
||||
self.assertNotIn("self.retail_coffee_checkboxes", self.source)
|
||||
|
||||
def test_update_check_actions_exist(self):
|
||||
self.assertIn("from app_version import APP_VERSION", self.source)
|
||||
self.assertIn("UPDATE_REPOSITORY = ", self.source)
|
||||
self.assertIn('QAction("Проверить обновления", self)', self.source)
|
||||
self.assertIn("def check_for_updates(self, silent_no_updates=False):", self.source)
|
||||
self.assertIn("class UpdateChecker(QObject):", self.source)
|
||||
self.assertIn('message_box.addButton("Обновить сейчас", QMessageBox.AcceptRole)', self.source)
|
||||
self.assertIn("def _start_auto_update(self, download_url, latest_version, checksum_url=\"\", download_name=\"\"):", self.source)
|
||||
self.assertIn("def _verify_update_checksum(self, zip_path, checksum_url, download_name):", self.source)
|
||||
self.assertIn("def _build_update_script(self, app_dir, source_dir, exe_name, target_pid):", self.source)
|
||||
self.assertIn("set TARGET_PID=", self.source)
|
||||
self.assertIn("set BACKUP_DIR=", self.source)
|
||||
self.assertIn(":rollback", self.source)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user