merge: release 1.7.0
Some checks failed
Desktop CI / tests (push) Successful in 12s
Desktop Release / release (push) Failing after 1m49s

This commit is contained in:
2026-02-15 20:38:32 +03:00
12 changed files with 731 additions and 15 deletions

View File

@@ -1 +1 @@
APP_VERSION = "1.6.4"
APP_VERSION = "1.7.0"

172
main.py
View File

@@ -14,7 +14,18 @@ import tempfile
import urllib.request
import zipfile
from app_version import APP_VERSION
from services import UpdateChecker, VkService, detect_update_repository_url
from services import (
AutoUpdateService,
UpdateChecker,
VkService,
detect_update_repository_url,
load_chat_conversations,
load_token as token_store_load_token,
resolve_user_ids as chat_resolve_user_ids,
save_token as token_store_save_token,
)
from ui.dialogs import MultiLinkDialog as UIMultiLinkDialog
from ui.main_window import instructions_text
from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit,
QPushButton, QVBoxLayout, QWidget, QMessageBox,
QTextBrowser, QScrollArea, QCheckBox, QHBoxLayout,
@@ -22,7 +33,7 @@ from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit,
QProgressBar)
from PySide6.QtCore import Qt, QUrl, QDateTime, QTimer
from PySide6.QtGui import QIcon, QAction, QDesktopServices
from urllib.parse import urlparse, parse_qs, unquote
from urllib.parse import parse_qs, unquote
from vk_api.exceptions import VkApiError
from PySide6.QtCore import QStandardPaths
from PySide6.QtCore import QProcess
@@ -272,6 +283,7 @@ class VkChatManager(QMainWindow):
"5. Нажмите 'ИСКЛЮЧИТЬ' или 'ПРИГЛАСИТЬ'."
)
self.instructions.setFixedHeight(120)
self.instructions.setPlainText(instructions_text())
layout.addWidget(self.instructions)
layout.addWidget(QLabel("Access Token VK:"))
@@ -359,7 +371,7 @@ class VkChatManager(QMainWindow):
self.resolve_timer.start()
def open_multi_link_dialog(self):
dialog = MultiLinkDialog(self)
dialog = UIMultiLinkDialog(self)
if dialog.exec():
links = dialog.get_links()
if links:
@@ -931,7 +943,7 @@ class VkChatManager(QMainWindow):
raise last_error
def load_saved_token_on_startup(self):
loaded_token, expiration_time = load_token()
loaded_token, expiration_time = token_store_load_token(TOKEN_FILE)
if loaded_token:
self.handle_auth_token_on_load(loaded_token, expiration_time)
else:
@@ -1057,7 +1069,12 @@ class VkChatManager(QMainWindow):
self.token = token
# Сохраняем и получаем корректный expiration_time (0 или будущее время)
self.token_expiration_time = save_token(self.token, expires_in)
self.token_expiration_time = token_store_save_token(
self.token,
TOKEN_FILE,
APP_DATA_DIR,
expires_in=expires_in,
)
self.token_input.setText(self.token[:50] + "...")
self.status_label.setText("Статус: авторизован")
@@ -1432,6 +1449,151 @@ class VkChatManager(QMainWindow):
self.user_ids_to_process.clear()
self.set_ui_state(self.token is not None)
# Refactor overrides: keep logic in service modules and thin UI orchestration here.
def _process_links_list(self, links_list):
if not self.vk:
QMessageBox.warning(self, "Ошибка", "Сначала авторизуйтесь.")
return
self.user_ids_to_process.clear()
resolved_ids = []
failed_links = []
self._set_busy(True, "Статус: Определяю ID...")
try:
resolved_ids, failed_items = chat_resolve_user_ids(
self._vk_call_with_retry,
self.vk,
links_list,
)
for failed_link, failed_exc in failed_items:
if isinstance(failed_exc, VkApiError):
if self._handle_vk_api_error("resolveScreenName", failed_exc, action_name="получения ID пользователей"):
return
failed_links.append(f"{failed_link} ({self._format_vk_error(failed_exc)})")
else:
failed_links.append(failed_link)
finally:
self._set_busy(False)
self.user_ids_to_process = resolved_ids
status_message = f"Статус: Готово к работе с {len(resolved_ids)} пользователем(ем/ями)."
if len(links_list) > 1:
self._set_vk_url_input_text(f"Загружено {len(resolved_ids)}/{len(links_list)} из списка")
if failed_links:
QMessageBox.warning(
self,
"Ошибка получения ID",
"Не удалось получить ID для следующих ссылок:\n" + "\n".join(failed_links),
)
self.status_label.setText(status_message)
self.set_ui_state(self.token is not None)
def load_chats(self):
self._clear_chat_tabs()
layouts = [
self.office_tab.findChild(QWidget).findChild(QVBoxLayout),
self.retail_tab.findChild(QWidget).findChild(QVBoxLayout),
self.warehouse_tab.findChild(QWidget).findChild(QVBoxLayout),
self.coffee_tab.findChild(QWidget).findChild(QVBoxLayout),
self.other_tab.findChild(QWidget).findChild(QVBoxLayout),
]
try:
self._set_busy(True, "Статус: загрузка чатов...")
conversations = load_chat_conversations(self._vk_call_with_retry, self.vk)
for conv in conversations:
if conv["conversation"]["peer"]["type"] != "chat":
continue
chat_id = conv["conversation"]["peer"]["local_id"]
title = conv["conversation"]["chat_settings"]["title"]
self.chats.append({"id": chat_id, "title": title})
checkbox = QCheckBox(f"{title} (id: {chat_id})")
checkbox.setProperty("chat_id", chat_id)
if "AG офис" in title:
layouts[0].insertWidget(layouts[0].count() - 1, checkbox)
self.office_chat_checkboxes.append(checkbox)
elif "AG розница" in title:
layouts[1].insertWidget(layouts[1].count() - 1, checkbox)
self.retail_chat_checkboxes.append(checkbox)
elif "AG склад" in title:
layouts[2].insertWidget(layouts[2].count() - 1, checkbox)
self.warehouse_chat_checkboxes.append(checkbox)
elif "AG кофейни" in title:
layouts[3].insertWidget(layouts[3].count() - 1, checkbox)
self.coffee_chat_checkboxes.append(checkbox)
else:
layouts[4].insertWidget(layouts[4].count() - 1, checkbox)
self.other_chat_checkboxes.append(checkbox)
self.chat_tabs.setTabText(0, f"AG Офис ({len(self.office_chat_checkboxes)})")
self.chat_tabs.setTabText(1, f"AG Розница ({len(self.retail_chat_checkboxes)})")
self.chat_tabs.setTabText(2, f"AG Склад ({len(self.warehouse_chat_checkboxes)})")
self.chat_tabs.setTabText(3, f"AG Кофейни ({len(self.coffee_chat_checkboxes)})")
self.chat_tabs.setTabText(4, f"Прочие ({len(self.other_chat_checkboxes)})")
except VkApiError as e:
if self._handle_vk_api_error("load_chats", e, action_name="загрузки чатов"):
return
QMessageBox.critical(self, "Ошибка", f"Не удалось загрузить чаты: {self._format_vk_error(e)}")
self.set_ui_state(False)
finally:
self._set_busy(False)
def _start_auto_update(self, download_url, latest_version, checksum_url="", download_name=""):
if os.name != "nt":
QMessageBox.information(
self,
"Автообновление",
"Автообновление пока поддерживается только в Windows-сборке.",
)
return False
if not getattr(sys, "frozen", False):
QMessageBox.information(
self,
"Автообновление",
"Автообновление доступно в собранной версии приложения (.exe).",
)
return False
if not download_url:
QMessageBox.warning(self, "Автообновление", "В релизе нет ссылки на файл для обновления.")
return False
self.status_label.setText(f"Статус: загрузка обновления {latest_version}...")
self._set_busy(True)
try:
work_dir, source_dir = AutoUpdateService.prepare_update(
download_url=download_url,
checksum_url=checksum_url,
download_name=download_name,
)
app_exe = sys.executable
script_path = AutoUpdateService.build_update_script(
app_dir=os.path.dirname(app_exe),
source_dir=source_dir,
exe_name=os.path.basename(app_exe),
target_pid=os.getpid(),
)
AutoUpdateService.launch_update_script(script_path, work_dir)
self._log_event("auto_update", f"Update {latest_version} started from {download_url}")
QMessageBox.information(
self,
"Обновление запущено",
"Обновление скачано. Приложение будет перезапущено.",
)
QApplication.instance().quit()
return True
except Exception as e:
self._log_event("auto_update_failed", str(e), level="ERROR")
QMessageBox.warning(self, "Автообновление", f"Не удалось выполнить автообновление: {e}")
return False
finally:
self._set_busy(False)
if __name__ == "__main__":
if "--auth" in sys.argv:

View File

@@ -1,3 +1,5 @@
from .auto_update_service import AutoUpdateService
from .chat_actions import load_chat_conversations, resolve_user_ids
from .token_store import load_token, save_token
from .update_service import UpdateChecker, detect_update_repository_url
from .vk_service import VkService

View File

@@ -0,0 +1,166 @@
import hashlib
import os
import re
import shutil
import subprocess
import tempfile
import urllib.request
import zipfile
class AutoUpdateService:
@staticmethod
def download_update_archive(download_url, destination_path):
request = urllib.request.Request(
download_url,
headers={"User-Agent": "AnabasisManager-Updater"},
)
with urllib.request.urlopen(request, timeout=60) as response:
with open(destination_path, "wb") as f:
shutil.copyfileobj(response, f)
@staticmethod
def download_update_text(url):
request = urllib.request.Request(
url,
headers={"User-Agent": "AnabasisManager-Updater"},
)
with urllib.request.urlopen(request, timeout=30) as response:
return response.read().decode("utf-8", errors="replace")
@staticmethod
def sha256_file(path):
digest = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(1024 * 1024), b""):
digest.update(chunk)
return digest.hexdigest().lower()
@staticmethod
def extract_sha256_from_text(checksum_text, target_file_name):
target = (target_file_name or "").strip().lower()
for raw_line in checksum_text.splitlines():
line = raw_line.strip()
if not line:
continue
match = re.search(r"\b([A-Fa-f0-9]{64})\b", line)
if not match:
continue
checksum = match.group(1).lower()
if not target:
return checksum
line_lower = line.lower()
if target in line_lower:
return checksum
if os.path.basename(target) in line_lower:
return checksum
return ""
@classmethod
def verify_update_checksum(cls, zip_path, checksum_url, download_name):
if not checksum_url:
raise RuntimeError("В релизе нет файла SHA256. Автообновление остановлено.")
checksum_text = cls.download_update_text(checksum_url)
expected_hash = cls.extract_sha256_from_text(checksum_text, download_name or os.path.basename(zip_path))
if not expected_hash:
raise RuntimeError("Не удалось найти SHA256 для архива обновления.")
actual_hash = cls.sha256_file(zip_path)
if actual_hash != expected_hash:
raise RuntimeError("SHA256 не совпадает, обновление отменено.")
@staticmethod
def locate_extracted_root(extracted_dir):
entries = []
for name in os.listdir(extracted_dir):
full_path = os.path.join(extracted_dir, name)
if os.path.isdir(full_path):
entries.append(full_path)
if len(entries) == 1:
candidate = entries[0]
if os.path.exists(os.path.join(candidate, "AnabasisManager.exe")):
return candidate
return extracted_dir
@staticmethod
def build_update_script(app_dir, source_dir, exe_name, target_pid):
script_path = os.path.join(tempfile.gettempdir(), "anabasis_apply_update.cmd")
script_lines = [
"@echo off",
"setlocal EnableExtensions",
f"set \"APP_DIR={app_dir}\"",
f"set \"SRC_DIR={source_dir}\"",
f"set \"EXE_NAME={exe_name}\"",
f"set \"TARGET_PID={target_pid}\"",
"set \"BACKUP_DIR=%TEMP%\\anabasis_backup_%RANDOM%%RANDOM%\"",
"set \"UPDATE_LOG=%APP_DIR%\\update_error.log\"",
"echo [%DATE% %TIME%] Update start > \"%UPDATE_LOG%\"",
"if not exist \"%SRC_DIR%\\%EXE_NAME%\" (",
" echo Source executable not found: \"%SRC_DIR%\\%EXE_NAME%\" >> \"%UPDATE_LOG%\"",
" exit /b 3",
")",
"set /a WAIT_LOOPS=0",
":wait_for_exit",
"tasklist /FI \"PID eq %TARGET_PID%\" | find \"%TARGET_PID%\" >nul",
"if %ERRORLEVEL% EQU 0 (",
" set /a WAIT_LOOPS+=1",
" if %WAIT_LOOPS% GEQ 180 (",
" echo Timeout waiting for process %TARGET_PID% to exit >> \"%UPDATE_LOG%\"",
" goto :backup",
" )",
" timeout /t 1 /nobreak >nul",
" goto :wait_for_exit",
")",
":backup",
"timeout /t 1 /nobreak >nul",
"mkdir \"%BACKUP_DIR%\" >nul 2>&1",
"robocopy \"%APP_DIR%\" \"%BACKUP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:6 /W:2 >nul",
"set \"RC=%ERRORLEVEL%\"",
"if %RC% GEQ 8 goto :backup_error",
"robocopy \"%SRC_DIR%\" \"%APP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:12 /W:2 >nul",
"set \"RC=%ERRORLEVEL%\"",
"if %RC% GEQ 8 goto :rollback",
"start \"\" \"%APP_DIR%\\%EXE_NAME%\"",
"timeout /t 2 /nobreak >nul",
"tasklist /FI \"IMAGENAME eq %EXE_NAME%\" | find /I \"%EXE_NAME%\" >nul",
"if %ERRORLEVEL% NEQ 0 goto :rollback",
"echo Update success >> \"%UPDATE_LOG%\"",
"rmdir /S /Q \"%BACKUP_DIR%\" >nul 2>&1",
"exit /b 0",
":rollback",
"robocopy \"%BACKUP_DIR%\" \"%APP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:6 /W:2 >nul",
"start \"\" \"%APP_DIR%\\%EXE_NAME%\"",
"echo Auto-update failed. Rollback executed. >> \"%UPDATE_LOG%\"",
"exit /b 2",
":backup_error",
"echo Auto-update failed during backup. Code %RC% >> \"%UPDATE_LOG%\"",
"exit /b %RC%",
]
with open(script_path, "w", encoding="utf-8", newline="\r\n") as f:
f.write("\r\n".join(script_lines) + "\r\n")
return script_path
@staticmethod
def launch_update_script(script_path, work_dir):
creation_flags = 0
if hasattr(subprocess, "CREATE_NEW_PROCESS_GROUP"):
creation_flags |= subprocess.CREATE_NEW_PROCESS_GROUP
if hasattr(subprocess, "DETACHED_PROCESS"):
creation_flags |= subprocess.DETACHED_PROCESS
subprocess.Popen(
["cmd.exe", "/c", script_path],
cwd=work_dir,
creationflags=creation_flags,
)
@classmethod
def prepare_update(cls, download_url, checksum_url, download_name):
work_dir = tempfile.mkdtemp(prefix="anabasis_update_")
zip_path = os.path.join(work_dir, "update.zip")
unpack_dir = os.path.join(work_dir, "extracted")
cls.download_update_archive(download_url, zip_path)
cls.verify_update_checksum(zip_path, checksum_url, download_name)
os.makedirs(unpack_dir, exist_ok=True)
with zipfile.ZipFile(zip_path, "r") as archive:
archive.extractall(unpack_dir)
source_dir = cls.locate_extracted_root(unpack_dir)
return work_dir, source_dir

46
services/chat_actions.py Normal file
View File

@@ -0,0 +1,46 @@
from urllib.parse import urlparse
def resolve_user_ids(vk_call_with_retry, vk_api, links):
resolved_ids = []
failed_links = []
for link in links:
try:
path = urlparse(link).path
screen_name = path.split("/")[-1] if path else ""
if not screen_name and len(path.split("/")) > 1:
screen_name = path.split("/")[-2]
if not screen_name:
failed_links.append((link, None))
continue
resolved_object = vk_call_with_retry(vk_api.utils.resolveScreenName, screen_name=screen_name)
if resolved_object and resolved_object.get("type") == "user":
resolved_ids.append(resolved_object["object_id"])
else:
failed_links.append((link, None))
except Exception as e:
failed_links.append((link, e))
return resolved_ids, failed_links
def load_chat_conversations(vk_call_with_retry, vk_api):
conversations = []
start_from = None
seen_start_tokens = set()
while True:
params = {"count": 200, "filter": "all"}
if start_from:
if start_from in seen_start_tokens:
break
params["start_from"] = start_from
seen_start_tokens.add(start_from)
response = vk_call_with_retry(vk_api.messages.getConversations, **params)
page_items = response.get("items", [])
if not page_items:
break
conversations.extend(page_items)
start_from = response.get("next_from")
if not start_from:
break
return conversations

136
services/token_store.py Normal file
View File

@@ -0,0 +1,136 @@
import base64
import ctypes
import json
import os
import time
from ctypes import wintypes
class _DataBlob(ctypes.Structure):
_fields_ = [("cbData", wintypes.DWORD), ("pbData", ctypes.POINTER(ctypes.c_byte))]
_crypt32 = None
_kernel32 = None
if os.name == "nt":
_crypt32 = ctypes.WinDLL("crypt32", use_last_error=True)
_kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
_crypt32.CryptProtectData.argtypes = [
ctypes.POINTER(_DataBlob),
wintypes.LPCWSTR,
ctypes.POINTER(_DataBlob),
ctypes.c_void_p,
ctypes.c_void_p,
wintypes.DWORD,
ctypes.POINTER(_DataBlob),
]
_crypt32.CryptProtectData.restype = wintypes.BOOL
_crypt32.CryptUnprotectData.argtypes = [
ctypes.POINTER(_DataBlob),
ctypes.POINTER(wintypes.LPWSTR),
ctypes.POINTER(_DataBlob),
ctypes.c_void_p,
ctypes.c_void_p,
wintypes.DWORD,
ctypes.POINTER(_DataBlob),
]
_crypt32.CryptUnprotectData.restype = wintypes.BOOL
def _crypt_protect_data(data, description=""):
buffer = ctypes.create_string_buffer(data)
data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte)))
data_out = _DataBlob()
if not _crypt32.CryptProtectData(ctypes.byref(data_in), description, None, None, None, 0, ctypes.byref(data_out)):
raise ctypes.WinError(ctypes.get_last_error())
try:
return ctypes.string_at(data_out.pbData, data_out.cbData)
finally:
_kernel32.LocalFree(data_out.pbData)
def _crypt_unprotect_data(data):
buffer = ctypes.create_string_buffer(data)
data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte)))
data_out = _DataBlob()
if not _crypt32.CryptUnprotectData(ctypes.byref(data_in), None, None, None, None, 0, ctypes.byref(data_out)):
raise ctypes.WinError(ctypes.get_last_error())
try:
return ctypes.string_at(data_out.pbData, data_out.cbData)
finally:
_kernel32.LocalFree(data_out.pbData)
def _encrypt_token(token):
if os.name != "nt":
raise RuntimeError("DPAPI is available only on Windows.")
encrypted_bytes = _crypt_protect_data(token.encode("utf-8"))
return base64.b64encode(encrypted_bytes).decode("ascii")
def _decrypt_token(token_data):
if os.name != "nt":
raise RuntimeError("DPAPI is available only on Windows.")
encrypted_bytes = base64.b64decode(token_data.encode("ascii"))
decrypted_bytes = _crypt_unprotect_data(encrypted_bytes)
return decrypted_bytes.decode("utf-8")
def save_token(token, token_file, app_data_dir, expires_in=0):
try:
expires_in = int(expires_in)
except (ValueError, TypeError):
expires_in = 0
os.makedirs(app_data_dir, exist_ok=True)
expiration_time = (time.time() + expires_in) if expires_in > 0 else 0
stored_token = token
encrypted = False
if os.name == "nt":
try:
stored_token = _encrypt_token(token)
encrypted = True
except Exception:
pass
data = {
"token": stored_token,
"expiration_time": expiration_time,
"encrypted": encrypted,
}
with open(token_file, "w", encoding="utf-8") as f:
json.dump(data, f)
return expiration_time
def load_token(token_file):
if not os.path.exists(token_file):
return None, None
with open(token_file, "r", encoding="utf-8") as f:
data = json.load(f)
token = data.get("token")
encrypted = data.get("encrypted", False)
if token and encrypted:
try:
token = _decrypt_token(token)
except Exception:
try:
os.remove(token_file)
except Exception:
pass
return None, None
expiration_time = data.get("expiration_time")
if token and (expiration_time == 0 or expiration_time > time.time()):
return token, expiration_time
try:
os.remove(token_file)
except Exception:
pass
return None, None

View File

@@ -28,7 +28,10 @@ class AuthReloginSmokeTests(unittest.TestCase):
self.assertIn("force_relogin", self.main_source)
def test_auth_error_paths_trigger_force_relogin(self):
self.assertIn("def _handle_vk_api_error(self, context, exc, action_name=None, ui_message_prefix=None, disable_ui=False):", self.main_source)
self.assertIn(
"def _handle_vk_api_error(self, context, exc, action_name=None, ui_message_prefix=None, disable_ui=False):",
self.main_source,
)
self.assertIn("self._force_relogin(exc, action_name or context)", self.main_source)
self.assertIn('"load_chats",', self.main_source)
self.assertIn('"execute_user_action",', self.main_source)
@@ -42,17 +45,15 @@ class AuthReloginSmokeTests(unittest.TestCase):
def test_update_check_actions_exist(self):
self.assertIn("from app_version import APP_VERSION", self.main_source)
self.assertIn("from services import UpdateChecker, VkService, detect_update_repository_url", self.main_source)
self.assertIn("from services import (", self.main_source)
self.assertIn("UpdateChecker", self.main_source)
self.assertIn("detect_update_repository_url", self.main_source)
self.assertIn('QAction("Проверить обновления", self)', self.main_source)
self.assertIn("def check_for_updates(self, silent_no_updates=False):", self.main_source)
self.assertIn("class UpdateChecker(QObject):", self.update_source)
self.assertIn('message_box.addButton("Обновить сейчас", QMessageBox.AcceptRole)', self.main_source)
self.assertIn("def _start_auto_update(self, download_url, latest_version, checksum_url=\"\", download_name=\"\"):", self.main_source)
self.assertIn("def _verify_update_checksum(self, zip_path, checksum_url, download_name):", self.main_source)
self.assertIn("def _build_update_script(self, app_dir, source_dir, exe_name, target_pid):", self.main_source)
self.assertIn("set TARGET_PID=", self.main_source)
self.assertIn("set BACKUP_DIR=", self.main_source)
self.assertIn(":rollback", self.main_source)
self.assertIn("AutoUpdateService.prepare_update", self.main_source)
self.assertIn("AutoUpdateService.build_update_script", self.main_source)
if __name__ == "__main__":

View File

@@ -0,0 +1,51 @@
import hashlib
import importlib.util
import tempfile
import unittest
from pathlib import Path
_SPEC = importlib.util.spec_from_file_location(
"auto_update_service",
Path("services/auto_update_service.py"),
)
_MODULE = importlib.util.module_from_spec(_SPEC)
_SPEC.loader.exec_module(_MODULE)
AutoUpdateService = _MODULE.AutoUpdateService
class AutoUpdateServiceTests(unittest.TestCase):
def test_extract_sha256_from_text(self):
digest = "a" * 64
text = f"{digest} AnabasisManager-1.0.0-win.zip\n"
extracted = AutoUpdateService.extract_sha256_from_text(
text,
"AnabasisManager-1.0.0-win.zip",
)
self.assertEqual(extracted, digest)
def test_sha256_file(self):
with tempfile.TemporaryDirectory() as td:
path = Path(td) / "payload.bin"
payload = b"anabasis"
path.write_bytes(payload)
expected = hashlib.sha256(payload).hexdigest()
self.assertEqual(AutoUpdateService.sha256_file(str(path)), expected)
def test_build_update_script_contains_core_vars(self):
script = AutoUpdateService.build_update_script(
app_dir=r"C:\Apps\AnabasisManager",
source_dir=r"C:\Temp\Extracted",
exe_name="AnabasisManager.exe",
target_pid=1234,
)
script_text = Path(script).read_text(encoding="utf-8")
self.assertIn("set \"APP_DIR=", script_text)
self.assertIn("set \"SRC_DIR=", script_text)
self.assertIn("set \"EXE_NAME=", script_text)
self.assertIn("set \"TARGET_PID=", script_text)
self.assertIn(":rollback", script_text)
self.assertIn("if not exist \"%SRC_DIR%\\%EXE_NAME%\"", script_text)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,65 @@
import unittest
import importlib.util
from types import SimpleNamespace
from pathlib import Path
_SPEC = importlib.util.spec_from_file_location(
"chat_actions",
Path("services/chat_actions.py"),
)
_MODULE = importlib.util.module_from_spec(_SPEC)
_SPEC.loader.exec_module(_MODULE)
load_chat_conversations = _MODULE.load_chat_conversations
resolve_user_ids = _MODULE.resolve_user_ids
class ChatActionsTests(unittest.TestCase):
def test_resolve_user_ids_mixed_results(self):
mapping = {
"id1": {"type": "user", "object_id": 1},
"id2": {"type": "group", "object_id": 2},
}
def call_with_retry(func, **kwargs):
return func(**kwargs)
def resolve_screen_name(screen_name):
if screen_name == "boom":
raise RuntimeError("boom")
return mapping.get(screen_name)
vk_api = SimpleNamespace(utils=SimpleNamespace(resolveScreenName=resolve_screen_name))
links = [
"https://vk.com/id1",
"https://vk.com/id2",
"https://vk.com/boom",
"https://vk.com/",
]
resolved, failed = resolve_user_ids(call_with_retry, vk_api, links)
self.assertEqual(resolved, [1])
self.assertEqual(len(failed), 3)
self.assertEqual(failed[0][0], "https://vk.com/id2")
self.assertIsNone(failed[0][1])
def test_load_chat_conversations_paginated(self):
pages = [
{"items": [{"id": 1}], "next_from": "page-2"},
{"items": [{"id": 2}]},
]
def get_conversations(**kwargs):
if kwargs.get("start_from") == "page-2":
return pages[1]
return pages[0]
def call_with_retry(func, **kwargs):
return func(**kwargs)
vk_api = SimpleNamespace(messages=SimpleNamespace(getConversations=get_conversations))
items = load_chat_conversations(call_with_retry, vk_api)
self.assertEqual(items, [{"id": 1}, {"id": 2}])
if __name__ == "__main__":
unittest.main()

53
tests/test_token_store.py Normal file
View File

@@ -0,0 +1,53 @@
import tempfile
import unittest
import importlib.util
from pathlib import Path
from unittest.mock import patch
_SPEC = importlib.util.spec_from_file_location(
"token_store",
Path("services/token_store.py"),
)
_MODULE = importlib.util.module_from_spec(_SPEC)
_SPEC.loader.exec_module(_MODULE)
load_token = _MODULE.load_token
save_token = _MODULE.save_token
class TokenStoreTests(unittest.TestCase):
def test_save_and_load_non_expiring_token(self):
with tempfile.TemporaryDirectory() as td:
token_file = Path(td) / "token.json"
with patch.object(_MODULE.os, "name", "posix"):
expiration = save_token(
token="abc123",
token_file=str(token_file),
app_data_dir=td,
expires_in=0,
)
token, loaded_expiration = load_token(str(token_file))
self.assertEqual(expiration, 0)
self.assertEqual(token, "abc123")
self.assertEqual(loaded_expiration, 0)
def test_expired_token_is_removed(self):
with tempfile.TemporaryDirectory() as td:
token_file = Path(td) / "token.json"
with patch.object(_MODULE.os, "name", "posix"):
with patch.object(_MODULE.time, "time", return_value=1000):
save_token(
token="abc123",
token_file=str(token_file),
app_data_dir=td,
expires_in=1,
)
with patch.object(_MODULE.time, "time", return_value=2000):
token, expiration = load_token(str(token_file))
self.assertIsNone(token)
self.assertIsNone(expiration)
if __name__ == "__main__":
unittest.main()

25
ui/dialogs.py Normal file
View File

@@ -0,0 +1,25 @@
from PySide6.QtWidgets import QDialog, QDialogButtonBox, QLabel, QTextEdit, QVBoxLayout
class MultiLinkDialog(QDialog):
def __init__(self, parent=None):
super().__init__(parent)
self.setWindowTitle("Ввод нескольких ссылок")
self.setMinimumSize(400, 300)
layout = QVBoxLayout(self)
label = QLabel("Вставьте ссылки на страницы VK, каждая с новой строки:")
layout.addWidget(label)
self.links_text_edit = QTextEdit()
layout.addWidget(self.links_text_edit)
button_box = QDialogButtonBox()
button_box.addButton("ОК", QDialogButtonBox.AcceptRole)
button_box.addButton("Отмена", QDialogButtonBox.RejectRole)
button_box.accepted.connect(self.accept)
button_box.rejected.connect(self.reject)
layout.addWidget(button_box)
def get_links(self):
return [line.strip() for line in self.links_text_edit.toPlainText().strip().split("\n") if line.strip()]

9
ui/main_window.py Normal file
View File

@@ -0,0 +1,9 @@
def instructions_text():
return (
"Инструкция:\n"
"1. Авторизуйтесь через VK.\n"
"2. Выберите чаты.\n"
"3. Вставьте ссылку на пользователя в поле ниже. ID определится автоматически.\n"
"4. Для массовых операций нажмите кнопку 'Список' и вставьте ссылки в окне.\n"
"5. Нажмите 'ИСКЛЮЧИТЬ' или 'ПРИГЛАСИТЬ'."
)