diff --git a/main.py b/main.py index a1ac4df..8ab6f33 100644 --- a/main.py +++ b/main.py @@ -14,7 +14,18 @@ import tempfile import urllib.request import zipfile from app_version import APP_VERSION -from services import UpdateChecker, VkService, detect_update_repository_url +from services import ( + AutoUpdateService, + UpdateChecker, + VkService, + detect_update_repository_url, + load_chat_conversations, + load_token as token_store_load_token, + resolve_user_ids as chat_resolve_user_ids, + save_token as token_store_save_token, +) +from ui.dialogs import MultiLinkDialog as UIMultiLinkDialog +from ui.main_window import instructions_text from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit, QPushButton, QVBoxLayout, QWidget, QMessageBox, QTextBrowser, QScrollArea, QCheckBox, QHBoxLayout, @@ -22,7 +33,7 @@ from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit, QProgressBar) from PySide6.QtCore import Qt, QUrl, QDateTime, QTimer from PySide6.QtGui import QIcon, QAction, QDesktopServices -from urllib.parse import urlparse, parse_qs, unquote +from urllib.parse import parse_qs, unquote from vk_api.exceptions import VkApiError from PySide6.QtCore import QStandardPaths from PySide6.QtCore import QProcess @@ -272,6 +283,7 @@ class VkChatManager(QMainWindow): "5. Нажмите 'ИСКЛЮЧИТЬ' или 'ПРИГЛАСИТЬ'." ) self.instructions.setFixedHeight(120) + self.instructions.setPlainText(instructions_text()) layout.addWidget(self.instructions) layout.addWidget(QLabel("Access Token VK:")) @@ -359,7 +371,7 @@ class VkChatManager(QMainWindow): self.resolve_timer.start() def open_multi_link_dialog(self): - dialog = MultiLinkDialog(self) + dialog = UIMultiLinkDialog(self) if dialog.exec(): links = dialog.get_links() if links: @@ -931,7 +943,7 @@ class VkChatManager(QMainWindow): raise last_error def load_saved_token_on_startup(self): - loaded_token, expiration_time = load_token() + loaded_token, expiration_time = token_store_load_token(TOKEN_FILE) if loaded_token: self.handle_auth_token_on_load(loaded_token, expiration_time) else: @@ -1057,7 +1069,12 @@ class VkChatManager(QMainWindow): self.token = token # Сохраняем и получаем корректный expiration_time (0 или будущее время) - self.token_expiration_time = save_token(self.token, expires_in) + self.token_expiration_time = token_store_save_token( + self.token, + TOKEN_FILE, + APP_DATA_DIR, + expires_in=expires_in, + ) self.token_input.setText(self.token[:50] + "...") self.status_label.setText("Статус: авторизован") @@ -1432,6 +1449,151 @@ class VkChatManager(QMainWindow): self.user_ids_to_process.clear() self.set_ui_state(self.token is not None) + # Refactor overrides: keep logic in service modules and thin UI orchestration here. + def _process_links_list(self, links_list): + if not self.vk: + QMessageBox.warning(self, "Ошибка", "Сначала авторизуйтесь.") + return + + self.user_ids_to_process.clear() + resolved_ids = [] + failed_links = [] + + self._set_busy(True, "Статус: Определяю ID...") + try: + resolved_ids, failed_items = chat_resolve_user_ids( + self._vk_call_with_retry, + self.vk, + links_list, + ) + for failed_link, failed_exc in failed_items: + if isinstance(failed_exc, VkApiError): + if self._handle_vk_api_error("resolveScreenName", failed_exc, action_name="получения ID пользователей"): + return + failed_links.append(f"{failed_link} ({self._format_vk_error(failed_exc)})") + else: + failed_links.append(failed_link) + finally: + self._set_busy(False) + + self.user_ids_to_process = resolved_ids + status_message = f"Статус: Готово к работе с {len(resolved_ids)} пользователем(ем/ями)." + if len(links_list) > 1: + self._set_vk_url_input_text(f"Загружено {len(resolved_ids)}/{len(links_list)} из списка") + + if failed_links: + QMessageBox.warning( + self, + "Ошибка получения ID", + "Не удалось получить ID для следующих ссылок:\n" + "\n".join(failed_links), + ) + + self.status_label.setText(status_message) + self.set_ui_state(self.token is not None) + + def load_chats(self): + self._clear_chat_tabs() + + layouts = [ + self.office_tab.findChild(QWidget).findChild(QVBoxLayout), + self.retail_tab.findChild(QWidget).findChild(QVBoxLayout), + self.warehouse_tab.findChild(QWidget).findChild(QVBoxLayout), + self.coffee_tab.findChild(QWidget).findChild(QVBoxLayout), + self.other_tab.findChild(QWidget).findChild(QVBoxLayout), + ] + + try: + self._set_busy(True, "Статус: загрузка чатов...") + conversations = load_chat_conversations(self._vk_call_with_retry, self.vk) + for conv in conversations: + if conv["conversation"]["peer"]["type"] != "chat": + continue + + chat_id = conv["conversation"]["peer"]["local_id"] + title = conv["conversation"]["chat_settings"]["title"] + self.chats.append({"id": chat_id, "title": title}) + checkbox = QCheckBox(f"{title} (id: {chat_id})") + checkbox.setProperty("chat_id", chat_id) + + if "AG офис" in title: + layouts[0].insertWidget(layouts[0].count() - 1, checkbox) + self.office_chat_checkboxes.append(checkbox) + elif "AG розница" in title: + layouts[1].insertWidget(layouts[1].count() - 1, checkbox) + self.retail_chat_checkboxes.append(checkbox) + elif "AG склад" in title: + layouts[2].insertWidget(layouts[2].count() - 1, checkbox) + self.warehouse_chat_checkboxes.append(checkbox) + elif "AG кофейни" in title: + layouts[3].insertWidget(layouts[3].count() - 1, checkbox) + self.coffee_chat_checkboxes.append(checkbox) + else: + layouts[4].insertWidget(layouts[4].count() - 1, checkbox) + self.other_chat_checkboxes.append(checkbox) + + self.chat_tabs.setTabText(0, f"AG Офис ({len(self.office_chat_checkboxes)})") + self.chat_tabs.setTabText(1, f"AG Розница ({len(self.retail_chat_checkboxes)})") + self.chat_tabs.setTabText(2, f"AG Склад ({len(self.warehouse_chat_checkboxes)})") + self.chat_tabs.setTabText(3, f"AG Кофейни ({len(self.coffee_chat_checkboxes)})") + self.chat_tabs.setTabText(4, f"Прочие ({len(self.other_chat_checkboxes)})") + except VkApiError as e: + if self._handle_vk_api_error("load_chats", e, action_name="загрузки чатов"): + return + QMessageBox.critical(self, "Ошибка", f"Не удалось загрузить чаты: {self._format_vk_error(e)}") + self.set_ui_state(False) + finally: + self._set_busy(False) + + def _start_auto_update(self, download_url, latest_version, checksum_url="", download_name=""): + if os.name != "nt": + QMessageBox.information( + self, + "Автообновление", + "Автообновление пока поддерживается только в Windows-сборке.", + ) + return False + if not getattr(sys, "frozen", False): + QMessageBox.information( + self, + "Автообновление", + "Автообновление доступно в собранной версии приложения (.exe).", + ) + return False + if not download_url: + QMessageBox.warning(self, "Автообновление", "В релизе нет ссылки на файл для обновления.") + return False + + self.status_label.setText(f"Статус: загрузка обновления {latest_version}...") + self._set_busy(True) + try: + work_dir, source_dir = AutoUpdateService.prepare_update( + download_url=download_url, + checksum_url=checksum_url, + download_name=download_name, + ) + app_exe = sys.executable + script_path = AutoUpdateService.build_update_script( + app_dir=os.path.dirname(app_exe), + source_dir=source_dir, + exe_name=os.path.basename(app_exe), + target_pid=os.getpid(), + ) + AutoUpdateService.launch_update_script(script_path, work_dir) + self._log_event("auto_update", f"Update {latest_version} started from {download_url}") + QMessageBox.information( + self, + "Обновление запущено", + "Обновление скачано. Приложение будет перезапущено.", + ) + QApplication.instance().quit() + return True + except Exception as e: + self._log_event("auto_update_failed", str(e), level="ERROR") + QMessageBox.warning(self, "Автообновление", f"Не удалось выполнить автообновление: {e}") + return False + finally: + self._set_busy(False) + if __name__ == "__main__": if "--auth" in sys.argv: diff --git a/services/__init__.py b/services/__init__.py index 4ee06d9..2cfba1c 100644 --- a/services/__init__.py +++ b/services/__init__.py @@ -1,3 +1,5 @@ +from .auto_update_service import AutoUpdateService +from .chat_actions import load_chat_conversations, resolve_user_ids +from .token_store import load_token, save_token from .update_service import UpdateChecker, detect_update_repository_url from .vk_service import VkService - diff --git a/services/auto_update_service.py b/services/auto_update_service.py new file mode 100644 index 0000000..daaabaf --- /dev/null +++ b/services/auto_update_service.py @@ -0,0 +1,152 @@ +import hashlib +import os +import re +import shutil +import subprocess +import tempfile +import urllib.request +import zipfile + + +class AutoUpdateService: + @staticmethod + def download_update_archive(download_url, destination_path): + request = urllib.request.Request( + download_url, + headers={"User-Agent": "AnabasisManager-Updater"}, + ) + with urllib.request.urlopen(request, timeout=60) as response: + with open(destination_path, "wb") as f: + shutil.copyfileobj(response, f) + + @staticmethod + def download_update_text(url): + request = urllib.request.Request( + url, + headers={"User-Agent": "AnabasisManager-Updater"}, + ) + with urllib.request.urlopen(request, timeout=30) as response: + return response.read().decode("utf-8", errors="replace") + + @staticmethod + def sha256_file(path): + digest = hashlib.sha256() + with open(path, "rb") as f: + for chunk in iter(lambda: f.read(1024 * 1024), b""): + digest.update(chunk) + return digest.hexdigest().lower() + + @staticmethod + def extract_sha256_from_text(checksum_text, target_file_name): + target = (target_file_name or "").strip().lower() + for raw_line in checksum_text.splitlines(): + line = raw_line.strip() + if not line: + continue + match = re.search(r"\b([A-Fa-f0-9]{64})\b", line) + if not match: + continue + checksum = match.group(1).lower() + if not target: + return checksum + line_lower = line.lower() + if target in line_lower: + return checksum + if os.path.basename(target) in line_lower: + return checksum + return "" + + @classmethod + def verify_update_checksum(cls, zip_path, checksum_url, download_name): + if not checksum_url: + raise RuntimeError("В релизе нет файла SHA256. Автообновление остановлено.") + checksum_text = cls.download_update_text(checksum_url) + expected_hash = cls.extract_sha256_from_text(checksum_text, download_name or os.path.basename(zip_path)) + if not expected_hash: + raise RuntimeError("Не удалось найти SHA256 для архива обновления.") + actual_hash = cls.sha256_file(zip_path) + if actual_hash != expected_hash: + raise RuntimeError("SHA256 не совпадает, обновление отменено.") + + @staticmethod + def locate_extracted_root(extracted_dir): + entries = [] + for name in os.listdir(extracted_dir): + full_path = os.path.join(extracted_dir, name) + if os.path.isdir(full_path): + entries.append(full_path) + if len(entries) == 1: + candidate = entries[0] + if os.path.exists(os.path.join(candidate, "AnabasisManager.exe")): + return candidate + return extracted_dir + + @staticmethod + def build_update_script(app_dir, source_dir, exe_name, target_pid): + script_path = os.path.join(tempfile.gettempdir(), "anabasis_apply_update.cmd") + script_lines = [ + "@echo off", + "setlocal", + f"set APP_DIR={app_dir}", + f"set SRC_DIR={source_dir}", + f"set EXE_NAME={exe_name}", + f"set TARGET_PID={target_pid}", + "set BACKUP_DIR=%TEMP%\\anabasis_backup_%RANDOM%%RANDOM%", + ":wait_for_exit", + "tasklist /FI \"PID eq %TARGET_PID%\" | find \"%TARGET_PID%\" >nul", + "if %ERRORLEVEL% EQU 0 (", + " timeout /t 1 /nobreak >nul", + " goto :wait_for_exit", + ")", + "timeout /t 1 /nobreak >nul", + "mkdir \"%BACKUP_DIR%\" >nul 2>&1", + "robocopy \"%APP_DIR%\" \"%BACKUP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:6 /W:2 >nul", + "set RC=%ERRORLEVEL%", + "if %RC% GEQ 8 goto :backup_error", + "robocopy \"%SRC_DIR%\" \"%APP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:12 /W:2 >nul", + "set RC=%ERRORLEVEL%", + "if %RC% GEQ 8 goto :rollback", + "start \"\" \"%APP_DIR%\\%EXE_NAME%\"", + "timeout /t 2 /nobreak >nul", + "tasklist /FI \"IMAGENAME eq %EXE_NAME%\" | find /I \"%EXE_NAME%\" >nul", + "if %ERRORLEVEL% NEQ 0 goto :rollback", + "rmdir /S /Q \"%BACKUP_DIR%\" >nul 2>&1", + "exit /b 0", + ":rollback", + "robocopy \"%BACKUP_DIR%\" \"%APP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:6 /W:2 >nul", + "start \"\" \"%APP_DIR%\\%EXE_NAME%\"", + "echo Auto-update failed. Rollback executed. > \"%APP_DIR%\\update_error.log\"", + "exit /b 2", + ":backup_error", + "echo Auto-update failed during backup. Code %RC% > \"%APP_DIR%\\update_error.log\"", + "exit /b %RC%", + ] + with open(script_path, "w", encoding="utf-8", newline="\r\n") as f: + f.write("\r\n".join(script_lines) + "\r\n") + return script_path + + @staticmethod + def launch_update_script(script_path, work_dir): + creation_flags = 0 + if hasattr(subprocess, "CREATE_NEW_PROCESS_GROUP"): + creation_flags |= subprocess.CREATE_NEW_PROCESS_GROUP + if hasattr(subprocess, "DETACHED_PROCESS"): + creation_flags |= subprocess.DETACHED_PROCESS + subprocess.Popen( + ["cmd.exe", "/c", script_path], + cwd=work_dir, + creationflags=creation_flags, + ) + + @classmethod + def prepare_update(cls, download_url, checksum_url, download_name): + work_dir = tempfile.mkdtemp(prefix="anabasis_update_") + zip_path = os.path.join(work_dir, "update.zip") + unpack_dir = os.path.join(work_dir, "extracted") + cls.download_update_archive(download_url, zip_path) + cls.verify_update_checksum(zip_path, checksum_url, download_name) + os.makedirs(unpack_dir, exist_ok=True) + with zipfile.ZipFile(zip_path, "r") as archive: + archive.extractall(unpack_dir) + source_dir = cls.locate_extracted_root(unpack_dir) + return work_dir, source_dir diff --git a/services/chat_actions.py b/services/chat_actions.py new file mode 100644 index 0000000..0b22475 --- /dev/null +++ b/services/chat_actions.py @@ -0,0 +1,46 @@ +from urllib.parse import urlparse + + +def resolve_user_ids(vk_call_with_retry, vk_api, links): + resolved_ids = [] + failed_links = [] + for link in links: + try: + path = urlparse(link).path + screen_name = path.split("/")[-1] if path else "" + if not screen_name and len(path.split("/")) > 1: + screen_name = path.split("/")[-2] + if not screen_name: + failed_links.append((link, None)) + continue + resolved_object = vk_call_with_retry(vk_api.utils.resolveScreenName, screen_name=screen_name) + if resolved_object and resolved_object.get("type") == "user": + resolved_ids.append(resolved_object["object_id"]) + else: + failed_links.append((link, None)) + except Exception as e: + failed_links.append((link, e)) + return resolved_ids, failed_links + + +def load_chat_conversations(vk_call_with_retry, vk_api): + conversations = [] + start_from = None + seen_start_tokens = set() + while True: + params = {"count": 200, "filter": "all"} + if start_from: + if start_from in seen_start_tokens: + break + params["start_from"] = start_from + seen_start_tokens.add(start_from) + response = vk_call_with_retry(vk_api.messages.getConversations, **params) + page_items = response.get("items", []) + if not page_items: + break + conversations.extend(page_items) + start_from = response.get("next_from") + if not start_from: + break + return conversations + diff --git a/services/token_store.py b/services/token_store.py new file mode 100644 index 0000000..6e76f95 --- /dev/null +++ b/services/token_store.py @@ -0,0 +1,136 @@ +import base64 +import ctypes +import json +import os +import time +from ctypes import wintypes + + +class _DataBlob(ctypes.Structure): + _fields_ = [("cbData", wintypes.DWORD), ("pbData", ctypes.POINTER(ctypes.c_byte))] + + +_crypt32 = None +_kernel32 = None +if os.name == "nt": + _crypt32 = ctypes.WinDLL("crypt32", use_last_error=True) + _kernel32 = ctypes.WinDLL("kernel32", use_last_error=True) + _crypt32.CryptProtectData.argtypes = [ + ctypes.POINTER(_DataBlob), + wintypes.LPCWSTR, + ctypes.POINTER(_DataBlob), + ctypes.c_void_p, + ctypes.c_void_p, + wintypes.DWORD, + ctypes.POINTER(_DataBlob), + ] + _crypt32.CryptProtectData.restype = wintypes.BOOL + _crypt32.CryptUnprotectData.argtypes = [ + ctypes.POINTER(_DataBlob), + ctypes.POINTER(wintypes.LPWSTR), + ctypes.POINTER(_DataBlob), + ctypes.c_void_p, + ctypes.c_void_p, + wintypes.DWORD, + ctypes.POINTER(_DataBlob), + ] + _crypt32.CryptUnprotectData.restype = wintypes.BOOL + + +def _crypt_protect_data(data, description=""): + buffer = ctypes.create_string_buffer(data) + data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte))) + data_out = _DataBlob() + if not _crypt32.CryptProtectData(ctypes.byref(data_in), description, None, None, None, 0, ctypes.byref(data_out)): + raise ctypes.WinError(ctypes.get_last_error()) + try: + return ctypes.string_at(data_out.pbData, data_out.cbData) + finally: + _kernel32.LocalFree(data_out.pbData) + + +def _crypt_unprotect_data(data): + buffer = ctypes.create_string_buffer(data) + data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte))) + data_out = _DataBlob() + if not _crypt32.CryptUnprotectData(ctypes.byref(data_in), None, None, None, None, 0, ctypes.byref(data_out)): + raise ctypes.WinError(ctypes.get_last_error()) + try: + return ctypes.string_at(data_out.pbData, data_out.cbData) + finally: + _kernel32.LocalFree(data_out.pbData) + + +def _encrypt_token(token): + if os.name != "nt": + raise RuntimeError("DPAPI is available only on Windows.") + encrypted_bytes = _crypt_protect_data(token.encode("utf-8")) + return base64.b64encode(encrypted_bytes).decode("ascii") + + +def _decrypt_token(token_data): + if os.name != "nt": + raise RuntimeError("DPAPI is available only on Windows.") + encrypted_bytes = base64.b64decode(token_data.encode("ascii")) + decrypted_bytes = _crypt_unprotect_data(encrypted_bytes) + return decrypted_bytes.decode("utf-8") + + +def save_token(token, token_file, app_data_dir, expires_in=0): + try: + expires_in = int(expires_in) + except (ValueError, TypeError): + expires_in = 0 + + os.makedirs(app_data_dir, exist_ok=True) + expiration_time = (time.time() + expires_in) if expires_in > 0 else 0 + + stored_token = token + encrypted = False + if os.name == "nt": + try: + stored_token = _encrypt_token(token) + encrypted = True + except Exception: + pass + + data = { + "token": stored_token, + "expiration_time": expiration_time, + "encrypted": encrypted, + } + + with open(token_file, "w", encoding="utf-8") as f: + json.dump(data, f) + return expiration_time + + +def load_token(token_file): + if not os.path.exists(token_file): + return None, None + + with open(token_file, "r", encoding="utf-8") as f: + data = json.load(f) + + token = data.get("token") + encrypted = data.get("encrypted", False) + if token and encrypted: + try: + token = _decrypt_token(token) + except Exception: + try: + os.remove(token_file) + except Exception: + pass + return None, None + + expiration_time = data.get("expiration_time") + if token and (expiration_time == 0 or expiration_time > time.time()): + return token, expiration_time + + try: + os.remove(token_file) + except Exception: + pass + return None, None + diff --git a/tests/test_auth_relogin_smoke.py b/tests/test_auth_relogin_smoke.py index 6ebe384..9dc0888 100644 --- a/tests/test_auth_relogin_smoke.py +++ b/tests/test_auth_relogin_smoke.py @@ -28,7 +28,10 @@ class AuthReloginSmokeTests(unittest.TestCase): self.assertIn("force_relogin", self.main_source) def test_auth_error_paths_trigger_force_relogin(self): - self.assertIn("def _handle_vk_api_error(self, context, exc, action_name=None, ui_message_prefix=None, disable_ui=False):", self.main_source) + self.assertIn( + "def _handle_vk_api_error(self, context, exc, action_name=None, ui_message_prefix=None, disable_ui=False):", + self.main_source, + ) self.assertIn("self._force_relogin(exc, action_name or context)", self.main_source) self.assertIn('"load_chats",', self.main_source) self.assertIn('"execute_user_action",', self.main_source) @@ -42,17 +45,15 @@ class AuthReloginSmokeTests(unittest.TestCase): def test_update_check_actions_exist(self): self.assertIn("from app_version import APP_VERSION", self.main_source) - self.assertIn("from services import UpdateChecker, VkService, detect_update_repository_url", self.main_source) + self.assertIn("from services import (", self.main_source) + self.assertIn("UpdateChecker", self.main_source) + self.assertIn("detect_update_repository_url", self.main_source) self.assertIn('QAction("Проверить обновления", self)', self.main_source) self.assertIn("def check_for_updates(self, silent_no_updates=False):", self.main_source) self.assertIn("class UpdateChecker(QObject):", self.update_source) - self.assertIn('message_box.addButton("Обновить сейчас", QMessageBox.AcceptRole)', self.main_source) self.assertIn("def _start_auto_update(self, download_url, latest_version, checksum_url=\"\", download_name=\"\"):", self.main_source) - self.assertIn("def _verify_update_checksum(self, zip_path, checksum_url, download_name):", self.main_source) - self.assertIn("def _build_update_script(self, app_dir, source_dir, exe_name, target_pid):", self.main_source) - self.assertIn("set TARGET_PID=", self.main_source) - self.assertIn("set BACKUP_DIR=", self.main_source) - self.assertIn(":rollback", self.main_source) + self.assertIn("AutoUpdateService.prepare_update", self.main_source) + self.assertIn("AutoUpdateService.build_update_script", self.main_source) if __name__ == "__main__": diff --git a/tests/test_auto_update_service.py b/tests/test_auto_update_service.py new file mode 100644 index 0000000..37fbbc9 --- /dev/null +++ b/tests/test_auto_update_service.py @@ -0,0 +1,50 @@ +import hashlib +import importlib.util +import tempfile +import unittest +from pathlib import Path + +_SPEC = importlib.util.spec_from_file_location( + "auto_update_service", + Path("services/auto_update_service.py"), +) +_MODULE = importlib.util.module_from_spec(_SPEC) +_SPEC.loader.exec_module(_MODULE) +AutoUpdateService = _MODULE.AutoUpdateService + + +class AutoUpdateServiceTests(unittest.TestCase): + def test_extract_sha256_from_text(self): + digest = "a" * 64 + text = f"{digest} AnabasisManager-1.0.0-win.zip\n" + extracted = AutoUpdateService.extract_sha256_from_text( + text, + "AnabasisManager-1.0.0-win.zip", + ) + self.assertEqual(extracted, digest) + + def test_sha256_file(self): + with tempfile.TemporaryDirectory() as td: + path = Path(td) / "payload.bin" + payload = b"anabasis" + path.write_bytes(payload) + expected = hashlib.sha256(payload).hexdigest() + self.assertEqual(AutoUpdateService.sha256_file(str(path)), expected) + + def test_build_update_script_contains_core_vars(self): + script = AutoUpdateService.build_update_script( + app_dir=r"C:\Apps\AnabasisManager", + source_dir=r"C:\Temp\Extracted", + exe_name="AnabasisManager.exe", + target_pid=1234, + ) + script_text = Path(script).read_text(encoding="utf-8") + self.assertIn("set APP_DIR=", script_text) + self.assertIn("set SRC_DIR=", script_text) + self.assertIn("set EXE_NAME=", script_text) + self.assertIn("set TARGET_PID=", script_text) + self.assertIn(":rollback", script_text) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_chat_actions.py b/tests/test_chat_actions.py new file mode 100644 index 0000000..faee0a8 --- /dev/null +++ b/tests/test_chat_actions.py @@ -0,0 +1,65 @@ +import unittest +import importlib.util +from types import SimpleNamespace +from pathlib import Path + +_SPEC = importlib.util.spec_from_file_location( + "chat_actions", + Path("services/chat_actions.py"), +) +_MODULE = importlib.util.module_from_spec(_SPEC) +_SPEC.loader.exec_module(_MODULE) +load_chat_conversations = _MODULE.load_chat_conversations +resolve_user_ids = _MODULE.resolve_user_ids + + +class ChatActionsTests(unittest.TestCase): + def test_resolve_user_ids_mixed_results(self): + mapping = { + "id1": {"type": "user", "object_id": 1}, + "id2": {"type": "group", "object_id": 2}, + } + + def call_with_retry(func, **kwargs): + return func(**kwargs) + + def resolve_screen_name(screen_name): + if screen_name == "boom": + raise RuntimeError("boom") + return mapping.get(screen_name) + + vk_api = SimpleNamespace(utils=SimpleNamespace(resolveScreenName=resolve_screen_name)) + links = [ + "https://vk.com/id1", + "https://vk.com/id2", + "https://vk.com/boom", + "https://vk.com/", + ] + resolved, failed = resolve_user_ids(call_with_retry, vk_api, links) + + self.assertEqual(resolved, [1]) + self.assertEqual(len(failed), 3) + self.assertEqual(failed[0][0], "https://vk.com/id2") + self.assertIsNone(failed[0][1]) + + def test_load_chat_conversations_paginated(self): + pages = [ + {"items": [{"id": 1}], "next_from": "page-2"}, + {"items": [{"id": 2}]}, + ] + + def get_conversations(**kwargs): + if kwargs.get("start_from") == "page-2": + return pages[1] + return pages[0] + + def call_with_retry(func, **kwargs): + return func(**kwargs) + + vk_api = SimpleNamespace(messages=SimpleNamespace(getConversations=get_conversations)) + items = load_chat_conversations(call_with_retry, vk_api) + self.assertEqual(items, [{"id": 1}, {"id": 2}]) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_token_store.py b/tests/test_token_store.py new file mode 100644 index 0000000..c1ff986 --- /dev/null +++ b/tests/test_token_store.py @@ -0,0 +1,53 @@ +import tempfile +import unittest +import importlib.util +from pathlib import Path +from unittest.mock import patch + +_SPEC = importlib.util.spec_from_file_location( + "token_store", + Path("services/token_store.py"), +) +_MODULE = importlib.util.module_from_spec(_SPEC) +_SPEC.loader.exec_module(_MODULE) +load_token = _MODULE.load_token +save_token = _MODULE.save_token + + +class TokenStoreTests(unittest.TestCase): + def test_save_and_load_non_expiring_token(self): + with tempfile.TemporaryDirectory() as td: + token_file = Path(td) / "token.json" + with patch.object(_MODULE.os, "name", "posix"): + expiration = save_token( + token="abc123", + token_file=str(token_file), + app_data_dir=td, + expires_in=0, + ) + token, loaded_expiration = load_token(str(token_file)) + + self.assertEqual(expiration, 0) + self.assertEqual(token, "abc123") + self.assertEqual(loaded_expiration, 0) + + def test_expired_token_is_removed(self): + with tempfile.TemporaryDirectory() as td: + token_file = Path(td) / "token.json" + with patch.object(_MODULE.os, "name", "posix"): + with patch.object(_MODULE.time, "time", return_value=1000): + save_token( + token="abc123", + token_file=str(token_file), + app_data_dir=td, + expires_in=1, + ) + with patch.object(_MODULE.time, "time", return_value=2000): + token, expiration = load_token(str(token_file)) + + self.assertIsNone(token) + self.assertIsNone(expiration) + + +if __name__ == "__main__": + unittest.main() diff --git a/ui/dialogs.py b/ui/dialogs.py new file mode 100644 index 0000000..b2d43d9 --- /dev/null +++ b/ui/dialogs.py @@ -0,0 +1,25 @@ +from PySide6.QtWidgets import QDialog, QDialogButtonBox, QLabel, QTextEdit, QVBoxLayout + + +class MultiLinkDialog(QDialog): + def __init__(self, parent=None): + super().__init__(parent) + self.setWindowTitle("Ввод нескольких ссылок") + self.setMinimumSize(400, 300) + + layout = QVBoxLayout(self) + label = QLabel("Вставьте ссылки на страницы VK, каждая с новой строки:") + layout.addWidget(label) + + self.links_text_edit = QTextEdit() + layout.addWidget(self.links_text_edit) + + button_box = QDialogButtonBox() + button_box.addButton("ОК", QDialogButtonBox.AcceptRole) + button_box.addButton("Отмена", QDialogButtonBox.RejectRole) + button_box.accepted.connect(self.accept) + button_box.rejected.connect(self.reject) + layout.addWidget(button_box) + + def get_links(self): + return [line.strip() for line in self.links_text_edit.toPlainText().strip().split("\n") if line.strip()] diff --git a/ui/main_window.py b/ui/main_window.py new file mode 100644 index 0000000..557a5b3 --- /dev/null +++ b/ui/main_window.py @@ -0,0 +1,9 @@ +def instructions_text(): + return ( + "Инструкция:\n" + "1. Авторизуйтесь через VK.\n" + "2. Выберите чаты.\n" + "3. Вставьте ссылку на пользователя в поле ниже. ID определится автоматически.\n" + "4. Для массовых операций нажмите кнопку 'Список' и вставьте ссылки в окне.\n" + "5. Нажмите 'ИСКЛЮЧИТЬ' или 'ПРИГЛАСИТЬ'." + )