import sys import base64 import ctypes import shutil from vk_api import VkApi import json import time import auth_webview import os import re import hashlib import subprocess import threading import tempfile import urllib.error import urllib.request import zipfile from app_version import APP_VERSION from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit, QPushButton, QVBoxLayout, QWidget, QMessageBox, QTextBrowser, QScrollArea, QCheckBox, QHBoxLayout, QSizePolicy, QDialog, QTextEdit, QTabWidget, QDialogButtonBox, QProgressBar) from PySide6.QtCore import Qt, QUrl, QDateTime, Signal, QTimer, QObject from PySide6.QtGui import QIcon, QAction, QDesktopServices from urllib.parse import urlparse, parse_qs, unquote from vk_api.exceptions import VkApiError from PySide6.QtCore import QStandardPaths from PySide6.QtCore import QProcess from ctypes import wintypes # --- Управление токенами и настройками --- APP_DATA_DIR = os.path.join(QStandardPaths.writableLocation(QStandardPaths.AppDataLocation), "AnabasisVKChatManager") TOKEN_FILE = os.path.join(APP_DATA_DIR, "token.json") WEB_ENGINE_CACHE_DIR = os.path.join(APP_DATA_DIR, "web_engine_cache") CACHE_CLEANUP_MARKER = os.path.join(APP_DATA_DIR, "pending_cache_cleanup") LOG_FILE = os.path.join(APP_DATA_DIR, "app.log") LOG_MAX_BYTES = 1024 * 1024 # 1 MB LOG_BACKUP_FILE = os.path.join(APP_DATA_DIR, "app.log.1") AUTH_RELOGIN_BACKOFF_SECONDS = 5.0 # Legacy owner/repo format for GitHub-only fallback. UPDATE_REPOSITORY = "" # Full repository URL is preferred (supports GitHub/Gitea). UPDATE_REPOSITORY_URL = "https://git.daemonlord.ru/benya/AnabasisChatRemove" UPDATE_REQUEST_TIMEOUT = 8 class _DataBlob(ctypes.Structure): _fields_ = [("cbData", wintypes.DWORD), ("pbData", ctypes.POINTER(ctypes.c_byte))] def _version_key(version_text): parts = [int(x) for x in re.findall(r"\d+", str(version_text))] if not parts: return (0, 0, 0) while len(parts) < 3: parts.append(0) return tuple(parts[:3]) def _is_newer_version(latest_version, current_version): latest_key = _version_key(latest_version) current_key = _version_key(current_version) return latest_key > current_key def _sanitize_repo_url(value): value = (value or "").strip() if not value: return "" if "://" not in value and value.count("/") == 1: return f"https://github.com/{value}" parsed = urlparse(value) if not parsed.scheme or not parsed.netloc: return "" clean_path = parsed.path.rstrip("/") if clean_path.endswith(".git"): clean_path = clean_path[:-4] return f"{parsed.scheme}://{parsed.netloc}{clean_path}" def _detect_update_repository_url(): env_url = _sanitize_repo_url(os.getenv("ANABASIS_UPDATE_URL", "")) if env_url: return env_url env_repo = _sanitize_repo_url(os.getenv("ANABASIS_UPDATE_REPOSITORY", "")) if env_repo: return env_repo configured_url = _sanitize_repo_url(UPDATE_REPOSITORY_URL) if configured_url: return configured_url configured_repo = _sanitize_repo_url(UPDATE_REPOSITORY) if configured_repo: return configured_repo git_config_path = os.path.join(os.path.abspath("."), ".git", "config") if not os.path.exists(git_config_path): return "" try: with open(git_config_path, "r", encoding="utf-8") as f: content = f.read() match = re.search(r"url\s*=\s*((?:https?://|git@)[^\s]+)", content) if match: remote = match.group(1).strip() if remote.startswith("git@"): # git@host:owner/repo(.git) ssh_match = re.match(r"git@([^:]+):(.+?)(?:\.git)?$", remote) if ssh_match: return _sanitize_repo_url(f"https://{ssh_match.group(1)}/{ssh_match.group(2)}") return _sanitize_repo_url(remote) except Exception: return "" return "" _crypt32 = None _kernel32 = None if os.name == "nt": _crypt32 = ctypes.WinDLL("crypt32", use_last_error=True) _kernel32 = ctypes.WinDLL("kernel32", use_last_error=True) _crypt32.CryptProtectData.argtypes = [ ctypes.POINTER(_DataBlob), wintypes.LPCWSTR, ctypes.POINTER(_DataBlob), ctypes.c_void_p, ctypes.c_void_p, wintypes.DWORD, ctypes.POINTER(_DataBlob), ] _crypt32.CryptProtectData.restype = wintypes.BOOL _crypt32.CryptUnprotectData.argtypes = [ ctypes.POINTER(_DataBlob), ctypes.POINTER(wintypes.LPWSTR), ctypes.POINTER(_DataBlob), ctypes.c_void_p, ctypes.c_void_p, wintypes.DWORD, ctypes.POINTER(_DataBlob), ] _crypt32.CryptUnprotectData.restype = wintypes.BOOL def _crypt_protect_data(data, description=""): buffer = ctypes.create_string_buffer(data) data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte))) data_out = _DataBlob() if not _crypt32.CryptProtectData(ctypes.byref(data_in), description, None, None, None, 0, ctypes.byref(data_out)): raise ctypes.WinError(ctypes.get_last_error()) try: return ctypes.string_at(data_out.pbData, data_out.cbData) finally: _kernel32.LocalFree(data_out.pbData) def _crypt_unprotect_data(data): buffer = ctypes.create_string_buffer(data) data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte))) data_out = _DataBlob() if not _crypt32.CryptUnprotectData(ctypes.byref(data_in), None, None, None, None, 0, ctypes.byref(data_out)): raise ctypes.WinError(ctypes.get_last_error()) try: return ctypes.string_at(data_out.pbData, data_out.cbData) finally: _kernel32.LocalFree(data_out.pbData) def _encrypt_token(token): if os.name != "nt": raise RuntimeError("DPAPI is available only on Windows.") encrypted_bytes = _crypt_protect_data(token.encode("utf-8")) return base64.b64encode(encrypted_bytes).decode("ascii") def _decrypt_token(token_data): if os.name != "nt": raise RuntimeError("DPAPI is available only on Windows.") encrypted_bytes = base64.b64decode(token_data.encode("ascii")) decrypted_bytes = _crypt_unprotect_data(encrypted_bytes) return decrypted_bytes.decode("utf-8") def get_resource_path(relative_path): """ Получает абсолютный путь к ресурсу, работает для скрипта и для .exe """ if hasattr(sys, '_MEIPASS'): # Для PyInstaller (на всякий случай) return os.path.join(sys._MEIPASS, relative_path) # Для cx_Freeze и обычного запуска return os.path.join(os.path.abspath("."), relative_path) def save_token(token, expires_in=0): """Сохраняет токен. Если expires_in=0, токен считается бессрочным.""" try: expires_in = int(expires_in) except (ValueError, TypeError): expires_in = 0 os.makedirs(APP_DATA_DIR, exist_ok=True) # ИСПРАВЛЕНИЕ: если expires_in равно 0, то и время истечения ставим 0 expiration_time = (time.time() + expires_in) if expires_in > 0 else 0 stored_token = token encrypted = False if os.name == "nt": try: stored_token = _encrypt_token(token) encrypted = True except Exception as e: print(f"Ошибка шифрования токена: {e}") data = { "token": stored_token, "expiration_time": expiration_time, "encrypted": encrypted } try: with open(TOKEN_FILE, "w") as f: json.dump(data, f) status = "Бессрочно" if expiration_time == 0 else QDateTime.fromSecsSinceEpoch(int(expiration_time)).toString() print(f"Токен сохранен. Срок действия: {status}") return expiration_time except IOError as e: print(f"Ошибка сохранения токена: {e}") return None def load_token(): """Загружает токен и проверяет его валидность.""" try: if not os.path.exists(TOKEN_FILE): return None, None with open(TOKEN_FILE, "r") as f: data = json.load(f) token = data.get("token") encrypted = data.get("encrypted", False) if token and encrypted: try: token = _decrypt_token(token) except Exception as e: print(f"Ошибка расшифровки токена: {e}") if os.path.exists(TOKEN_FILE): os.remove(TOKEN_FILE) return None, None expiration_time = data.get("expiration_time") # ИСПРАВЛЕНИЕ: токен валиден, если время истечения 0 ИЛИ оно больше текущего if token and (expiration_time == 0 or expiration_time > time.time()): return token, expiration_time else: if os.path.exists(TOKEN_FILE): os.remove(TOKEN_FILE) return None, None except Exception as e: print(f"Ошибка загрузки: {e}") return None, None class VkService: def __init__(self): self.session = None self.api = None def set_token(self, token): self.session = VkApi(token=token) self.api = self.session.get_api() def clear(self): self.session = None self.api = None @staticmethod def build_auth_command(auth_url, output_path): if getattr(sys, "frozen", False): return sys.executable, ["--auth", auth_url, output_path] return sys.executable, [os.path.abspath(__file__), "--auth", auth_url, output_path] @staticmethod def vk_error_code(exc): error = getattr(exc, "error", None) if isinstance(error, dict): return error.get("error_code") return getattr(exc, "code", None) @classmethod def is_auth_error(cls, exc, formatted_message=None): code = cls.vk_error_code(exc) if code == 5: return True message = (formatted_message or str(exc)).lower() return "invalid_access_token" in message or "user authorization failed" in message @classmethod def is_retryable_error(cls, exc): return cls.vk_error_code(exc) in (6, 9, 10) def call_with_retry(self, func, *args, **kwargs): max_attempts = 5 for attempt in range(1, max_attempts + 1): try: return func(*args, **kwargs) except VkApiError as e: if not self.is_retryable_error(e) or attempt == max_attempts: raise delay = min(2.0, 0.35 * (2 ** (attempt - 1))) if self.vk_error_code(e) == 9: delay = max(delay, 1.0) time.sleep(delay) class UpdateChecker(QObject): check_finished = Signal(dict) check_failed = Signal(str) def __init__(self, repository_url, current_version): super().__init__() self.repository_url = repository_url self.current_version = current_version def run(self): if not self.repository_url: self.check_failed.emit("Не задан URL репозитория обновлений.") return parsed = urlparse(self.repository_url) base_url = f"{parsed.scheme}://{parsed.netloc}" repo_path = parsed.path.strip("/") if not repo_path or repo_path.count("/") < 1: self.check_failed.emit("Некорректный URL репозитория обновлений.") return if parsed.netloc.lower().endswith("github.com"): api_url = f"https://api.github.com/repos/{repo_path}/releases/latest" else: api_url = f"{base_url}/api/v1/repos/{repo_path}/releases/latest" releases_url = f"{base_url}/{repo_path}/releases" request = urllib.request.Request( api_url, headers={ "Accept": "application/vnd.github+json", "User-Agent": "AnabasisManager-Updater", }, ) try: with urllib.request.urlopen(request, timeout=UPDATE_REQUEST_TIMEOUT) as response: release_data = json.loads(response.read().decode("utf-8")) except urllib.error.HTTPError as e: self.check_failed.emit(f"Ошибка HTTP при проверке обновлений: {e.code}") return except urllib.error.URLError as e: self.check_failed.emit(f"Сетевая ошибка при проверке обновлений: {e}") return except Exception as e: self.check_failed.emit(f"Не удалось проверить обновления: {e}") return latest_tag = release_data.get("tag_name") or release_data.get("name") or "" latest_version = latest_tag.lstrip("vV").strip() html_url = release_data.get("html_url") or releases_url assets = release_data.get("assets") or [] download_url = "" download_name = "" checksum_url = "" for asset in assets: url = asset.get("browser_download_url", "") if url.lower().endswith(".zip"): download_url = url download_name = asset.get("name", "") break if not download_url and assets: download_url = assets[0].get("browser_download_url", "") download_name = assets[0].get("name", "") for asset in assets: name = asset.get("name", "").lower() if not name: continue is_checksum_asset = name.endswith(".sha256") or name.endswith(".sha256.txt") or name in ("checksums.txt", "sha256sums.txt") if not is_checksum_asset: continue if download_name and (download_name.lower() in name or name in (f"{download_name.lower()}.sha256", f"{download_name.lower()}.sha256.txt")): checksum_url = asset.get("browser_download_url", "") break if not checksum_url: checksum_url = asset.get("browser_download_url", "") self.check_finished.emit( { "repository_url": self.repository_url, "latest_version": latest_version, "current_version": self.current_version, "latest_tag": latest_tag, "release_url": html_url, "download_url": download_url, "download_name": download_name, "checksum_url": checksum_url, "has_update": _is_newer_version(latest_version, self.current_version), } ) class MultiLinkDialog(QDialog): def __init__(self, parent=None): super().__init__(parent) self.setWindowTitle("Ввод нескольких ссылок") self.setMinimumSize(400, 300) layout = QVBoxLayout(self) label = QLabel("Вставьте ссылки на страницы VK, каждая с новой строки:") layout.addWidget(label) self.links_text_edit = QTextEdit() layout.addWidget(self.links_text_edit) button_box = QDialogButtonBox() button_box.addButton("ОК", QDialogButtonBox.AcceptRole) button_box.addButton("Отмена", QDialogButtonBox.RejectRole) button_box.accepted.connect(self.accept) button_box.rejected.connect(self.reject) layout.addWidget(button_box) def get_links(self): return [line.strip() for line in self.links_text_edit.toPlainText().strip().split('\n') if line.strip()] class VkChatManager(QMainWindow): def __init__(self): super().__init__() self.setWindowTitle("Anabasis Chat Manager") self.setGeometry(300, 300, 600, 800) self.token = None self.token_expiration_time = None self.chats = [] self.office_chat_checkboxes = [] self.retail_chat_checkboxes = [] self.warehouse_chat_checkboxes = [] self.coffee_chat_checkboxes = [] self.other_chat_checkboxes = [] self.vk_service = VkService() self.vk_session = None self.vk = None self.user_ids_to_process = [] self._busy = False self.suppress_resolve = False self.auth_process = None self.auth_output_path = None self._auth_process_error_text = None self._auth_ui_busy = False self._auth_relogin_in_progress = False self._last_auth_relogin_ts = 0.0 self.update_repository_url = _detect_update_repository_url() self.update_checker = None self.update_thread = None self._update_check_silent = False self.resolve_timer = QTimer(self) self.resolve_timer.setSingleShot(True) self.resolve_timer.setInterval(750) self.resolve_timer.timeout.connect(self.resolve_single_user_id_from_input) self._cleanup_cache_if_needed() self._ensure_log_dir() self.init_ui() self.load_saved_token_on_startup() self.setup_token_timer() QTimer.singleShot(1800, lambda: self.check_for_updates(silent_no_updates=True)) def init_ui(self): central_widget = QWidget() self.setCentralWidget(central_widget) layout = QVBoxLayout(central_widget) layout.setContentsMargins(10, 10, 10, 10) layout.setSpacing(5) self.instructions = QTextBrowser() self.instructions.setPlainText( "Инструкция:\n" "1. Авторизуйтесь через VK.\n" "2. Выберите чаты.\n" "3. Вставьте ссылку на пользователя в поле ниже. ID определится автоматически.\n" "4. Для массовых операций, нажмите кнопку 'Список' и вставьте ссылки в окне.\n" "5. Нажмите 'ИСКЛЮЧИТЬ' или 'ПРИГЛАСИТЬ'." ) self.instructions.setFixedHeight(120) layout.addWidget(self.instructions) layout.addWidget(QLabel("Access Token VK:")) self.token_input = QLineEdit() self.token_input.setPlaceholderText("Токен появится здесь после авторизации...") self.token_input.setReadOnly(True) layout.addWidget(self.token_input) self.token_timer_label = QLabel("Срок действия токена: Н/Д") self.token_timer_label.setAlignment(Qt.AlignRight) layout.addWidget(self.token_timer_label) self.auth_btn = QPushButton("Авторизоваться через VK") self.auth_btn.clicked.connect(self.start_auth) layout.addWidget(self.auth_btn) self.auth_progress = QProgressBar() self.auth_progress.setRange(0, 0) self.auth_progress.setTextVisible(False) self.auth_progress.hide() layout.addWidget(self.auth_progress) self.chat_tabs = QTabWidget() self.chat_tabs.hide() self.office_tab = self.create_chat_tab() self.chat_tabs.addTab(self.office_tab, "AG Офис") self.retail_tab = self.create_chat_tab() self.chat_tabs.addTab(self.retail_tab, "AG Розница") self.warehouse_tab = self.create_chat_tab() self.chat_tabs.addTab(self.warehouse_tab, "AG Склад") self.coffee_tab = self.create_chat_tab() self.chat_tabs.addTab(self.coffee_tab, "AG Кофейни") self.other_tab = self.create_chat_tab() self.chat_tabs.addTab(self.other_tab, "Прочие") layout.addWidget(QLabel("Выберите чаты:")) select_buttons_layout = QHBoxLayout() self.select_all_btn = QPushButton("Выбрать все на вкладке") self.select_all_btn.clicked.connect(lambda: self.set_all_checkboxes_on_current_tab(True)) self.deselect_all_btn = QPushButton("Снять выбор на вкладке") self.deselect_all_btn.clicked.connect(lambda: self.set_all_checkboxes_on_current_tab(False)) self.refresh_chats_btn = QPushButton("Обновить чаты") self.refresh_chats_btn.clicked.connect(self.load_chats) select_buttons_layout.addWidget(self.select_all_btn) select_buttons_layout.addWidget(self.deselect_all_btn) select_buttons_layout.addWidget(self.refresh_chats_btn) layout.addLayout(select_buttons_layout) layout.addWidget(self.chat_tabs) layout.addWidget(QLabel("Ссылка на страницу VK (ID определится автоматически):")) link_input_layout = QHBoxLayout() self.vk_url_input = QLineEdit() self.vk_url_input.setPlaceholderText("https://vk.com/id1") self.vk_url_input.textChanged.connect(self.on_vk_url_input_changed) link_input_layout.addWidget(self.vk_url_input) self.multi_link_btn = QPushButton("Список") self.multi_link_btn.setToolTip("Ввести несколько ссылок списком") self.multi_link_btn.clicked.connect(self.open_multi_link_dialog) link_input_layout.addWidget(self.multi_link_btn) layout.addLayout(link_input_layout) self.remove_user_btn = QPushButton("ИСКЛЮЧИТЬ ПОЛЬЗОВАТЕЛЕЙ") self.remove_user_btn.setMinimumHeight(50) self.remove_user_btn.clicked.connect(self.remove_user) layout.addWidget(self.remove_user_btn) self.visible_messages_checkbox = QCheckBox("Показать 250 последних сообщений при добавлении") layout.addWidget(self.visible_messages_checkbox) self.add_user_btn = QPushButton("ПРИГЛАСИТЬ ПОЛЬЗОВАТЕЛЕЙ") self.add_user_btn.setMinimumHeight(50) self.add_user_btn.clicked.connect(self.add_user_to_chat) layout.addWidget(self.add_user_btn) self.status_label = QLabel("Статус: не авторизован") self.status_label.setAlignment(Qt.AlignCenter) layout.addWidget(self.status_label) layout.addStretch(1) self.create_menu() self.set_ui_state(False) def on_vk_url_input_changed(self, text): if self.suppress_resolve: return if self.vk_url_input.hasFocus(): self.resolve_timer.start() def open_multi_link_dialog(self): dialog = MultiLinkDialog(self) if dialog.exec(): links = dialog.get_links() if links: self._set_vk_url_input_text("") self._process_links_list(links) else: QMessageBox.information(self, "Информация", "Список ссылок пуст.") def resolve_single_user_id_from_input(self): url = self.vk_url_input.text().strip() if not url: self.user_ids_to_process.clear() self.status_label.setText("Статус: Введите ссылку или откройте список.") self.set_ui_state(self.token is not None) return self._process_links_list([url]) def _process_links_list(self, links_list): if not self.vk: QMessageBox.warning(self, "Ошибка", "Сначала авторизуйтесь.") return self.user_ids_to_process.clear() resolved_ids = [] failed_links = [] self._set_busy(True, "Статус: Определяю ID...") try: for link in links_list: try: path = urlparse(link).path screen_name = path.split('/')[-1] if path else '' if not screen_name and len(path.split('/')) > 1: screen_name = path.split('/')[-2] if not screen_name: failed_links.append(link) continue resolved_object = self._vk_call_with_retry(self.vk.utils.resolveScreenName, screen_name=screen_name) if resolved_object and resolved_object.get('type') == 'user': resolved_ids.append(resolved_object['object_id']) else: failed_links.append(link) except VkApiError as e: if self._handle_vk_api_error("resolveScreenName", e, action_name="получения ID пользователей"): return failed_links.append(f"{link} ({self._format_vk_error(e)})") except Exception: failed_links.append(link) finally: self._set_busy(False) self.user_ids_to_process = resolved_ids status_message = f"Статус: Готово к работе с {len(resolved_ids)} пользовател(ем/ями)." if len(links_list) > 1: self._set_vk_url_input_text(f"Загружено {len(resolved_ids)}/{len(links_list)} из списка") if failed_links: QMessageBox.warning(self, "Ошибка получения ID", f"Не удалось получить ID для следующих ссылок:\n" + "\n".join(failed_links)) self.status_label.setText(status_message) self.set_ui_state(self.token is not None) def create_menu(self): """Создает верхнее меню.""" menu_bar = self.menuBar() # Меню "Инструменты" tools_menu = menu_bar.addMenu("Инструменты") # Действие "Назначить администратором" make_admin_action = QAction("Назначить администратором", self) make_admin_action.setStatusTip("Назначить выбранных пользователей администраторами в выбранных чатах") make_admin_action.triggered.connect(self.set_user_admin) tools_menu.addAction(make_admin_action) self.make_admin_action = make_admin_action check_updates_action = QAction("Проверить обновления", self) check_updates_action.setStatusTip("Проверить наличие новой версии приложения") check_updates_action.triggered.connect(self.check_for_updates) tools_menu.addAction(check_updates_action) self.check_updates_action = check_updates_action logout_action = QAction("Выйти и очистить", self) logout_action.setStatusTip("Выйти, удалить токен и кэш") logout_action.triggered.connect(self.logout_and_clear) tools_menu.addAction(logout_action) self.logout_action = logout_action def create_chat_tab(self): # This implementation correctly creates a scrollable area for chat lists. tab_content_widget = QWidget() tab_layout = QVBoxLayout(tab_content_widget) tab_layout.setContentsMargins(0, 0, 0, 0) tab_layout.setSpacing(0) scroll_area = QScrollArea() scroll_area.setWidgetResizable(True) scroll_area.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOff) scroll_area.setVerticalScrollBarPolicy(Qt.ScrollBarAsNeeded) tab_layout.addWidget(scroll_area) checkbox_container_widget = QWidget() checkbox_layout = QVBoxLayout(checkbox_container_widget) checkbox_layout.setContentsMargins(5, 5, 5, 5) checkbox_layout.setSpacing(2) checkbox_layout.addStretch() scroll_area.setWidget(checkbox_container_widget) return tab_content_widget def _set_update_action_state(self, in_progress): if hasattr(self, "check_updates_action"): self.check_updates_action.setEnabled(not in_progress) def check_for_updates(self, silent_no_updates=False): if self.update_thread and self.update_thread.is_alive(): return self._update_check_silent = silent_no_updates self._set_update_action_state(True) self.status_label.setText("Статус: проверка обновлений...") self.update_checker = UpdateChecker(self.update_repository_url, APP_VERSION) self.update_checker.check_finished.connect(self._on_update_check_finished) self.update_checker.check_failed.connect(self._on_update_check_failed) self.update_thread = threading.Thread(target=self.update_checker.run, daemon=True) self.update_thread.start() def _on_update_check_finished(self, result): self._set_update_action_state(False) self.update_checker = None self.update_thread = None if result.get("has_update"): latest_version = result.get("latest_version") or result.get("latest_tag") or "unknown" self.status_label.setText(f"Статус: доступно обновление {latest_version}") message_box = QMessageBox(self) message_box.setIcon(QMessageBox.Information) message_box.setWindowTitle("Доступно обновление") message_box.setText( f"Текущая версия: {result.get('current_version')}\n" f"Доступная версия: {latest_version}\n\n" "Открыть страницу загрузки?" ) update_now_button = message_box.addButton("Обновить сейчас", QMessageBox.AcceptRole) download_button = message_box.addButton("Скачать", QMessageBox.AcceptRole) releases_button = message_box.addButton("Релизы", QMessageBox.ActionRole) cancel_button = message_box.addButton("Позже", QMessageBox.RejectRole) message_box.setDefaultButton(update_now_button) message_box.exec() clicked = message_box.clickedButton() download_url = result.get("download_url") checksum_url = result.get("checksum_url") download_name = result.get("download_name") release_url = result.get("release_url") if clicked is update_now_button and download_url: if not self._start_auto_update(download_url, latest_version, checksum_url, download_name): if release_url: QDesktopServices.openUrl(QUrl(release_url)) return if clicked is download_button and download_url: QDesktopServices.openUrl(QUrl(download_url)) elif clicked in (download_button, releases_button) and release_url: QDesktopServices.openUrl(QUrl(release_url)) elif clicked not in (cancel_button,): self._log_event("update_check", "Диалог обновления закрыт без действия.", level="WARN") return self.status_label.setText(f"Статус: используется актуальная версия ({APP_VERSION}).") if not self._update_check_silent: QMessageBox.information(self, "Обновления", "Установлена актуальная версия.") def _on_update_check_failed(self, error_text): self._set_update_action_state(False) self.update_checker = None self.update_thread = None self._log_event("update_check_failed", error_text, level="WARN") if not self.update_repository_url: self.status_label.setText("Статус: обновления не настроены (URL репозитория не задан).") if not self._update_check_silent: QMessageBox.warning( self, "Обновления не настроены", "Не задан URL репозитория для обновлений.\n" "Укажите UPDATE_REPOSITORY_URL в main.py или переменную окружения " "ANABASIS_UPDATE_URL (например: https://git.daemonlord.ru/owner/repo).", ) return self.status_label.setText("Статус: не удалось проверить обновления.") if not self._update_check_silent: QMessageBox.warning(self, "Проверка обновлений", error_text) def _download_update_archive(self, download_url, destination_path): request = urllib.request.Request( download_url, headers={"User-Agent": "AnabasisManager-Updater"}, ) with urllib.request.urlopen(request, timeout=60) as response: with open(destination_path, "wb") as f: shutil.copyfileobj(response, f) def _download_update_text(self, url): request = urllib.request.Request( url, headers={"User-Agent": "AnabasisManager-Updater"}, ) with urllib.request.urlopen(request, timeout=30) as response: return response.read().decode("utf-8", errors="replace") @staticmethod def _sha256_file(path): digest = hashlib.sha256() with open(path, "rb") as f: for chunk in iter(lambda: f.read(1024 * 1024), b""): digest.update(chunk) return digest.hexdigest().lower() @staticmethod def _extract_sha256_from_text(checksum_text, target_file_name): target = (target_file_name or "").strip().lower() for raw_line in checksum_text.splitlines(): line = raw_line.strip() if not line: continue match = re.search(r"\b([A-Fa-f0-9]{64})\b", line) if not match: continue checksum = match.group(1).lower() if not target: return checksum line_lower = line.lower() if target in line_lower: return checksum if os.path.basename(target) in line_lower: return checksum return "" def _verify_update_checksum(self, zip_path, checksum_url, download_name): if not checksum_url: raise RuntimeError("В релизе нет файла SHA256. Автообновление остановлено.") checksum_text = self._download_update_text(checksum_url) expected_hash = self._extract_sha256_from_text(checksum_text, download_name or os.path.basename(zip_path)) if not expected_hash: raise RuntimeError("Не удалось найти SHA256 для архива обновления.") actual_hash = self._sha256_file(zip_path) if actual_hash != expected_hash: raise RuntimeError("SHA256 не совпадает, обновление отменено.") def _locate_extracted_root(self, extracted_dir): entries = [] for name in os.listdir(extracted_dir): full_path = os.path.join(extracted_dir, name) if os.path.isdir(full_path): entries.append(full_path) if len(entries) == 1: candidate = entries[0] if os.path.exists(os.path.join(candidate, "AnabasisManager.exe")): return candidate return extracted_dir def _build_update_script(self, app_dir, source_dir, exe_name): script_path = os.path.join(tempfile.gettempdir(), "anabasis_apply_update.cmd") script_lines = [ "@echo off", "setlocal", f"set APP_DIR={app_dir}", f"set SRC_DIR={source_dir}", f"set EXE_NAME={exe_name}", "timeout /t 2 /nobreak >nul", "robocopy \"%SRC_DIR%\" \"%APP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:3 /W:1 >nul", "set RC=%ERRORLEVEL%", "if %RC% GEQ 8 goto :copy_error", "start \"\" \"%APP_DIR%\\%EXE_NAME%\"", "exit /b 0", ":copy_error", "echo Auto-update failed with code %RC% > \"%APP_DIR%\\update_error.log\"", "exit /b %RC%", ] with open(script_path, "w", encoding="utf-8", newline="\r\n") as f: f.write("\r\n".join(script_lines) + "\r\n") return script_path def _start_auto_update(self, download_url, latest_version, checksum_url="", download_name=""): if os.name != "nt": QMessageBox.information( self, "Автообновление", "Автообновление пока поддерживается только в Windows-сборке.", ) return False if not getattr(sys, "frozen", False): QMessageBox.information( self, "Автообновление", "Автообновление доступно в собранной версии приложения (.exe).", ) return False if not download_url: QMessageBox.warning(self, "Автообновление", "В релизе нет ссылки на файл для обновления.") return False self.status_label.setText(f"Статус: загрузка обновления {latest_version}...") self._set_busy(True) work_dir = tempfile.mkdtemp(prefix="anabasis_update_") zip_path = os.path.join(work_dir, "update.zip") unpack_dir = os.path.join(work_dir, "extracted") try: self._download_update_archive(download_url, zip_path) self._verify_update_checksum(zip_path, checksum_url, download_name) os.makedirs(unpack_dir, exist_ok=True) with zipfile.ZipFile(zip_path, "r") as archive: archive.extractall(unpack_dir) source_dir = self._locate_extracted_root(unpack_dir) app_exe = sys.executable app_dir = os.path.dirname(app_exe) exe_name = os.path.basename(app_exe) script_path = self._build_update_script(app_dir, source_dir, exe_name) creation_flags = 0 if hasattr(subprocess, "CREATE_NEW_PROCESS_GROUP"): creation_flags |= subprocess.CREATE_NEW_PROCESS_GROUP if hasattr(subprocess, "DETACHED_PROCESS"): creation_flags |= subprocess.DETACHED_PROCESS subprocess.Popen( ["cmd.exe", "/c", script_path], cwd=work_dir, creationflags=creation_flags, ) self._log_event("auto_update", f"Update {latest_version} started from {download_url}") QMessageBox.information( self, "Обновление запущено", "Обновление скачано. Приложение будет перезапущено.", ) QTimer.singleShot(150, QApplication.instance().quit) return True except Exception as e: self._log_event("auto_update_failed", str(e), level="ERROR") QMessageBox.warning(self, "Автообновление", f"Не удалось выполнить автообновление: {e}") return False finally: self._set_busy(False) def setup_token_timer(self): self.token_countdown_timer = QTimer(self) self.token_countdown_timer.timeout.connect(self.update_token_timer_display) self.token_countdown_timer.start(1000) def update_token_timer_display(self): if self.token_expiration_time is None: self.token_timer_label.setText("Срок действия токена: Н/Д") return # ИСПРАВЛЕНИЕ: обрабатываем бессрочный токен if self.token_expiration_time == 0: self.token_timer_label.setText("Срок действия: Бессрочно") return remaining_seconds = int(self.token_expiration_time - time.time()) if remaining_seconds <= 0: self.token_timer_label.setText("Срок действия истек!") if self.token_countdown_timer.isActive(): self.token_countdown_timer.stop() self.set_ui_state(False) self.status_label.setText("Статус: Срок действия истек, авторизуйтесь заново.") self.token, self.token_expiration_time = None, None self.token_input.clear() return minutes, seconds = divmod(remaining_seconds, 60) hours, minutes = divmod(minutes, 60) self.token_timer_label.setText(f"Срок: {hours:02d}ч {minutes:02d}м {seconds:02d}с") def set_ui_state(self, authorized): self.auth_btn.setEnabled((not authorized) and (not self._auth_ui_busy)) for btn in [self.select_all_btn, self.deselect_all_btn, self.refresh_chats_btn, self.vk_url_input, self.multi_link_btn, self.visible_messages_checkbox]: btn.setEnabled(authorized) has_ids = authorized and bool(self.user_ids_to_process) self.remove_user_btn.setEnabled(has_ids) self.add_user_btn.setEnabled(has_ids) self.chat_tabs.setVisible(authorized) if authorized: # Когда авторизованы, задаем минимальную высоту, достаточную для ~10-12 чатов self.chat_tabs.setMinimumHeight(300) else: # Когда не авторизованы, сбрасываем минимальную высоту self.chat_tabs.setMinimumHeight(0) if not authorized: self.user_ids_to_process.clear() self._set_vk_url_input_text("") self._clear_chat_tabs() if hasattr(self, "make_admin_action"): self.make_admin_action.setEnabled(authorized and (not self._auth_ui_busy)) if hasattr(self, "logout_action"): self.logout_action.setEnabled(not self._auth_ui_busy) def _set_auth_ui_state(self, in_progress): self._auth_ui_busy = in_progress self.auth_progress.setVisible(in_progress) if hasattr(self, "logout_action"): self.logout_action.setEnabled(not in_progress) if hasattr(self, "make_admin_action"): self.make_admin_action.setEnabled(not in_progress and self.token is not None) self.auth_btn.setEnabled((self.token is None) and (not in_progress)) def _set_busy(self, busy, status_text=None): if status_text: self.status_label.setText(status_text) if busy: self._busy = True QApplication.setOverrideCursor(Qt.WaitCursor) for widget in [ self.refresh_chats_btn, self.select_all_btn, self.deselect_all_btn, self.vk_url_input, self.multi_link_btn, self.visible_messages_checkbox, self.remove_user_btn, self.add_user_btn ]: widget.setEnabled(False) else: self._busy = False QApplication.restoreOverrideCursor() if self.token is None: self.set_ui_state(False) else: self.set_ui_state(True) def _ensure_log_dir(self): os.makedirs(APP_DATA_DIR, exist_ok=True) def _log(self, level, context, message): try: os.makedirs(APP_DATA_DIR, exist_ok=True) self._rotate_log_if_needed() timestamp = QDateTime.currentDateTime().toString("yyyy-MM-dd HH:mm:ss") with open(LOG_FILE, "a", encoding="utf-8") as f: f.write(f"[{timestamp}] [{level}] {context}: {message}\n") except Exception: pass def _log_error(self, context, exc): self._log("ERROR", context, self._format_vk_error(exc)) def _log_event(self, context, message, level="INFO"): self._log(level, context, message) def _rotate_log_if_needed(self): try: if not os.path.exists(LOG_FILE): return if os.path.getsize(LOG_FILE) < LOG_MAX_BYTES: return if os.path.exists(LOG_BACKUP_FILE): os.remove(LOG_BACKUP_FILE) os.replace(LOG_FILE, LOG_BACKUP_FILE) except Exception: pass def _format_vk_error(self, exc): error = getattr(exc, "error", None) code = None message = str(exc) if isinstance(error, dict): code = error.get("error_code") message = error.get("error_msg") or message hints = { 5: "Ошибка авторизации. Проверьте токен.", 6: "Слишком много запросов. Подождите и повторите.", 7: "Недостаточно прав.", 9: "Слишком много однотипных действий.", 10: "Внутренняя ошибка VK. Повторите позже.", 15: "Доступ запрещен.", 100: "Некорректный параметр запроса.", 113: "Неверный идентификатор пользователя.", 200: "Доступ к чату запрещен.", } if code in hints: message = f"{message} ({hints[code]})" if code is not None: return f"[{code}] {message}" return message def _set_vk_url_input_text(self, text): self.suppress_resolve = True try: self.vk_url_input.blockSignals(True) self.vk_url_input.setText(text) finally: self.vk_url_input.blockSignals(False) self.suppress_resolve = False def logout_and_clear(self): confirm = QMessageBox.question( self, "Подтверждение выхода", "Вы уверены, что хотите выйти и удалить сохраненный токен и кэш?", QMessageBox.Yes | QMessageBox.No ) if confirm != QMessageBox.Yes: return self._clear_auth_state(stop_timer=True, remove_token_file=True) try: self._try_remove_web_cache() except Exception as e: self._log_event("logout_and_clear", f"Cache cleanup failed: {e}", level="WARN") def _cleanup_cache_if_needed(self): if os.path.exists(CACHE_CLEANUP_MARKER): try: self._try_remove_web_cache() if os.path.exists(CACHE_CLEANUP_MARKER): os.remove(CACHE_CLEANUP_MARKER) except Exception as e: print(f"Ошибка отложенной очистки кэша: {e}") def _try_remove_web_cache(self): if not os.path.exists(WEB_ENGINE_CACHE_DIR): return attempts = 5 last_error = None for _ in range(attempts): try: shutil.rmtree(WEB_ENGINE_CACHE_DIR) last_error = None break except Exception as e: last_error = e time.sleep(0.2) if last_error: os.makedirs(APP_DATA_DIR, exist_ok=True) with open(CACHE_CLEANUP_MARKER, "w") as f: f.write("pending") raise last_error def load_saved_token_on_startup(self): loaded_token, expiration_time = load_token() if loaded_token: self.handle_auth_token_on_load(loaded_token, expiration_time) else: self.set_ui_state(False) def set_all_checkboxes_on_current_tab(self, checked): current_index = self.chat_tabs.currentIndex() checkbox_lists = [self.office_chat_checkboxes, self.retail_chat_checkboxes, self.warehouse_chat_checkboxes, self.coffee_chat_checkboxes, self.other_chat_checkboxes] if 0 <= current_index < len(checkbox_lists): for checkbox in checkbox_lists[current_index]: checkbox.setChecked(checked) def _build_auth_command(self, auth_url, output_path): return self.vk_service.build_auth_command(auth_url, output_path) def _on_auth_process_error(self, process_error): self._auth_process_error_text = f"Ошибка запуска авторизации: {process_error}" # For failed starts Qt may not emit finished(), so release UI here. if self.auth_process and self.auth_process.state() == QProcess.NotRunning: output_path = self.auth_output_path self.auth_output_path = None self.auth_process = None self._set_auth_ui_state(False) self.status_label.setText(f"Статус: {self._auth_process_error_text}") self._log_event("auth_process_error", self._auth_process_error_text, level="ERROR") self._auth_process_error_text = None self._auth_relogin_in_progress = False self.set_ui_state(self.token is not None) try: if output_path and os.path.exists(output_path): os.remove(output_path) except Exception: pass def _on_auth_process_finished(self, exit_code, _exit_status): output_path = self.auth_output_path self.auth_output_path = None self.auth_process = None self._set_auth_ui_state(False) if self._auth_process_error_text: self.status_label.setText(f"Статус: {self._auth_process_error_text}") self._log_event("auth_process_error", self._auth_process_error_text, level="ERROR") self._auth_process_error_text = None self._auth_relogin_in_progress = False self.set_ui_state(self.token is not None) return if exit_code != 0: self.status_label.setText(f"Статус: авторизация не удалась (код {exit_code}).") self._log_event("auth_process_finished", f"exit_code={exit_code}", level="WARN") self._auth_relogin_in_progress = False self.set_ui_state(self.token is not None) return token = None expires_in = 0 if output_path and os.path.exists(output_path): try: with open(output_path, "r", encoding="utf-8") as f: data = json.load(f) token = data.get("token") expires_in = data.get("expires_in", 0) except Exception as e: self._log_event("auth_result_parse", f"Ошибка чтения результата авторизации: {e}", level="ERROR") finally: try: if os.path.exists(output_path): os.remove(output_path) except Exception: pass else: self._log_event("auth_result", "Файл результата авторизации не найден.", level="WARN") self._auth_relogin_in_progress = False self.handle_new_auth_token(token, expires_in) def start_auth(self, keep_status_text=False): if self.auth_process and self.auth_process.state() != QProcess.NotRunning: self._log_event("auth_process", "Авторизация уже запущена, повторный запуск пропущен.") return if keep_status_text and hasattr(self, "_relogin_status_text"): status_text = self._relogin_status_text self._relogin_status_text = None else: status_text = "Статус: ожидание авторизации..." self.status_label.setText(status_text) auth_url = ( "https://oauth.vk.com/authorize?" "client_id=2685278&" "display=page&" "redirect_uri=https://oauth.vk.com/blank.html&" "scope=1073737727&" "response_type=token&" "v=5.131" ) output_path = os.path.join(APP_DATA_DIR, "auth_result.json") try: if os.path.exists(output_path): os.remove(output_path) except Exception: pass program, args = self._build_auth_command(auth_url, output_path) self.auth_output_path = output_path self._auth_process_error_text = None process = QProcess(self) process.finished.connect(self._on_auth_process_finished) process.errorOccurred.connect(self._on_auth_process_error) self.auth_process = process self._set_auth_ui_state(True) process.start(program, args) def handle_new_auth_token(self, token, expires_in): if not token: self.status_label.setText("Статус: Авторизация не удалась") self.set_ui_state(False) self._auth_relogin_in_progress = False return self.token = token # Сохраняем и получаем корректный expiration_time (0 или будущее время) self.token_expiration_time = save_token(self.token, expires_in) self.token_input.setText(self.token[:50] + "...") self.status_label.setText("Статус: авторизован") self.vk_service.set_token(self.token) self.vk_session = self.vk_service.session self.vk = self.vk_service.api self.set_ui_state(True) self._auth_relogin_in_progress = False self.load_chats() def handle_auth_token_on_load(self, token, expiration_time): self.token = token self.token_expiration_time = expiration_time self.token_input.setText(self.token[:50] + "...") self.status_label.setText("Статус: авторизован (токен загружен)") self.vk_service.set_token(self.token) self.vk_session = self.vk_service.session self.vk = self.vk_service.api self.set_ui_state(True) self._auth_relogin_in_progress = False self.load_chats() def _clear_chat_tabs(self): self.chats.clear() for chk_list in [self.office_chat_checkboxes, self.retail_chat_checkboxes, self.warehouse_chat_checkboxes, self.coffee_chat_checkboxes, self.other_chat_checkboxes]: for checkbox in chk_list: checkbox.setParent(None) checkbox.deleteLater() chk_list.clear() def _vk_error_code(self, exc): return self.vk_service.vk_error_code(exc) def _is_auth_token_error(self, exc): message = self._format_vk_error(exc).lower() return self.vk_service.is_auth_error(exc, message) def _clear_auth_state(self, stop_timer=False, remove_token_file=True): self.token = None self.token_expiration_time = None self.vk_service.clear() self.vk_session = None self.vk = None self.user_ids_to_process.clear() self._set_vk_url_input_text("") self.token_input.clear() if stop_timer and self.token_countdown_timer.isActive(): self.token_countdown_timer.stop() self.token_timer_label.setText("Срок действия токена: Н/Д") self.status_label.setText("Статус: не авторизован") self._clear_chat_tabs() self.set_ui_state(False) if remove_token_file: try: if os.path.exists(TOKEN_FILE): os.remove(TOKEN_FILE) except Exception: pass def _force_relogin(self, exc, action_name): now = time.monotonic() if self._auth_relogin_in_progress: self._log_event("force_relogin_skip", f"already_in_progress action={action_name}", level="WARN") return elapsed = now - self._last_auth_relogin_ts if elapsed < AUTH_RELOGIN_BACKOFF_SECONDS: wait_seconds = int(AUTH_RELOGIN_BACKOFF_SECONDS - elapsed) + 1 self.status_label.setText(f"Статус: повторная авторизация через {wait_seconds} сек.") self._log_event("force_relogin_backoff", f"action={action_name}; wait={wait_seconds}s", level="WARN") return self._auth_relogin_in_progress = True self._last_auth_relogin_ts = now error_code = self._vk_error_code(exc) self._log_event( "force_relogin", f"action={action_name}; code={error_code}; message={self._format_vk_error(exc)}", level="WARN", ) self._clear_auth_state() self._relogin_status_text = "Статус: Токен отозван VK, выполните повторный вход." QMessageBox.warning( self, "Требуется повторная авторизация", f"Во время {action_name} получена ошибка авторизации:\n" f"{self._format_vk_error(exc)}\n\n" "Сейчас откроется окно авторизации VK." ) self.start_auth(keep_status_text=True) def _handle_vk_api_error(self, context, exc, action_name=None, ui_message_prefix=None, disable_ui=False): self._log_error(context, exc) if self._is_auth_token_error(exc): self._force_relogin(exc, action_name or context) return True if ui_message_prefix: QMessageBox.critical(self, "Ошибка", f"{ui_message_prefix}: {self._format_vk_error(exc)}") if disable_ui: self.set_ui_state(False) return False def _vk_call_with_retry(self, func, *args, **kwargs): return self.vk_service.call_with_retry(func, *args, **kwargs) def load_chats(self): self._clear_chat_tabs() # Get the checkbox layouts from each tab layouts = [ self.office_tab.findChild(QWidget).findChild(QVBoxLayout), self.retail_tab.findChild(QWidget).findChild(QVBoxLayout), self.warehouse_tab.findChild(QWidget).findChild(QVBoxLayout), self.coffee_tab.findChild(QWidget).findChild(QVBoxLayout), self.other_tab.findChild(QWidget).findChild(QVBoxLayout) ] try: self._set_busy(True, "Статус: загрузка чатов...") conversations = [] start_from = None seen_start_tokens = set() while True: params = {"count": 200, "filter": "all"} if start_from: if start_from in seen_start_tokens: break params["start_from"] = start_from seen_start_tokens.add(start_from) response = self._vk_call_with_retry(self.vk.messages.getConversations, **params) page_items = response.get("items", []) if not page_items: break conversations.extend(page_items) start_from = response.get("next_from") if not start_from: break for conv in conversations: if conv['conversation']['peer']['type'] == 'chat': chat_id = conv['conversation']['peer']['local_id'] title = conv['conversation']['chat_settings']['title'] self.chats.append({'id': chat_id, 'title': title}) checkbox = QCheckBox(f"{title} (id: {chat_id})") checkbox.setProperty("chat_id", chat_id) # Insert checkbox at the top of the layout (before the stretch) if "AG офис" in title: layouts[0].insertWidget(layouts[0].count() - 1, checkbox) self.office_chat_checkboxes.append(checkbox) elif "AG розница" in title: layouts[1].insertWidget(layouts[1].count() - 1, checkbox) self.retail_chat_checkboxes.append(checkbox) elif "AG склад" in title: layouts[2].insertWidget(layouts[2].count() - 1, checkbox) self.warehouse_chat_checkboxes.append(checkbox) elif "AG кофейни" in title: layouts[3].insertWidget(layouts[3].count() - 1, checkbox) self.coffee_chat_checkboxes.append(checkbox) else: layouts[4].insertWidget(layouts[4].count() - 1, checkbox) self.other_chat_checkboxes.append(checkbox) self.chat_tabs.setTabText(0, f"AG Офис ({len(self.office_chat_checkboxes)})") self.chat_tabs.setTabText(1, f"AG Розница ({len(self.retail_chat_checkboxes)})") self.chat_tabs.setTabText(2, f"AG Склад ({len(self.warehouse_chat_checkboxes)})") self.chat_tabs.setTabText(3, f"AG Кофейни ({len(self.coffee_chat_checkboxes)})") self.chat_tabs.setTabText(4, f"Прочие ({len(self.other_chat_checkboxes)})") except VkApiError as e: if self._handle_vk_api_error( "load_chats", e, action_name="загрузки чатов", ): return QMessageBox.critical(self, "Ошибка", f"Не удалось загрузить чаты: {self._format_vk_error(e)}") self.set_ui_state(False) finally: self._set_busy(False) def get_user_info_by_id(self, user_id): try: user = self.vk.users.get(user_ids=user_id)[0] return f"{user.get('first_name', '')} {user.get('last_name', '')}" except Exception: return f"Пользователь {user_id}" def _get_selected_chats(self): selected = [] for chk in self.office_chat_checkboxes + self.retail_chat_checkboxes + self.warehouse_chat_checkboxes + self.coffee_chat_checkboxes + self.other_chat_checkboxes: if chk.isChecked(): chat_id = chk.property("chat_id") title = next((c['title'] for c in self.chats if c['id'] == chat_id), "") selected.append({'id': chat_id, 'title': title}) return selected def _execute_user_action(self, action_type): if not self.user_ids_to_process: QMessageBox.warning(self, "Ошибка", f"Нет ID пользователей для операции.") return selected_chats = self._get_selected_chats() if not selected_chats: QMessageBox.warning(self, "Ошибка", "Выберите хотя бы один чат.") return user_infos = {uid: self.get_user_info_by_id(uid) for uid in self.user_ids_to_process} action_verb = "исключить" if action_type == "remove" else "пригласить" preposition = "из" if action_type == "remove" else "в" user_names_list = list(user_infos.values()) user_names_str = "\n".join([f"• {name}" for name in user_names_list]) chat_count = len(selected_chats) chat_str = "" # Финальная логика склонения с учетом падежа (для "из" и "в") if chat_count % 10 == 1 and chat_count % 100 != 11: if action_type == 'remove': # из 1 выбранного чата (Родительный падеж, ед.ч.) chat_str = f"{chat_count} выбранного чата" else: # в 1 выбранный чат (Винительный падеж, ед.ч.) chat_str = f"{chat_count} выбранный чат" elif 2 <= chat_count % 10 <= 4 and (chat_count % 100 < 10 or chat_count % 100 >= 20): if action_type == 'remove': # из 3 выбранных чатов (Родительный падеж, мн.ч.) chat_str = f"{chat_count} выбранных чатов" else: # в 3 выбранных чата (Родительный падеж, ед.ч.) chat_str = f"{chat_count} выбранных чата" else: # из 5 выбранных чатов / в 5 выбранных чатов (Родительный падеж, мн.ч.) chat_str = f"{chat_count} выбранных чатов" msg = ( f"Вы уверены, что хотите {action_verb} следующих пользователей:\n\n" f"{user_names_str}\n\n" f"{preposition} {chat_str}?" ) confirm_dialog = QMessageBox(self) confirm_dialog.setWindowTitle("Подтверждение действия") confirm_dialog.setText(msg) confirm_dialog.setIcon(QMessageBox.Question) yes_button = confirm_dialog.addButton("Да", QMessageBox.YesRole) no_button = confirm_dialog.addButton("Нет", QMessageBox.NoRole) confirm_dialog.setDefaultButton(no_button) # "Нет" - кнопка по умолчанию confirm_dialog.exec() if confirm_dialog.clickedButton() != yes_button: return results = [] total = len(selected_chats) * len(user_infos) processed = 0 try: action_label = "исключение" if action_type == "remove" else "приглашение" self._set_busy(True, f"Статус: выполняется {action_label} (0/{total})...") for chat in selected_chats: for user_id, user_info in user_infos.items(): try: if action_type == "remove": self._vk_call_with_retry(self.vk.messages.removeChatUser, chat_id=chat['id'], member_id=user_id) results.append(f"✓ '{user_info}' исключен из '{chat['title']}'.") else: params = {'chat_id': chat['id'], 'user_id': user_id} if self.visible_messages_checkbox.isChecked(): params['visible_messages_count'] = 250 self._vk_call_with_retry(self.vk.messages.addChatUser, **params) results.append(f"✓ '{user_info}' приглашен в '{chat['title']}'.") except VkApiError as e: if self._handle_vk_api_error("execute_user_action", e, action_name="выполнения операций с пользователями"): return results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {self._format_vk_error(e)}") finally: processed += 1 self.status_label.setText(f"Статус: выполняется {action_label} ({processed}/{total})...") finally: self._set_busy(False) QMessageBox.information(self, "Результаты", "\n".join(results)) self.vk_url_input.clear() self.user_ids_to_process.clear() self.set_ui_state(self.token is not None) def remove_user(self): self._execute_user_action("remove") def add_user_to_chat(self): self._execute_user_action("add") def set_user_admin(self): """Назначает пользователя администратором чата.""" # 1. Проверки на наличие выбранных пользователей и чатов if not self.user_ids_to_process: QMessageBox.warning(self, "Ошибка", "Нет ID пользователей для операции.") return selected_chats = self._get_selected_chats() if not selected_chats: QMessageBox.warning(self, "Ошибка", "Выберите хотя бы один чат.") return # 2. Подготовка данных для подтверждения user_infos = {uid: self.get_user_info_by_id(uid) for uid in self.user_ids_to_process} user_names_str = "\n".join([f"• {name}" for name in user_infos.values()]) msg = ( f"Вы уверены, что хотите назначить АДМИНИСТРАТОРАМИ следующих пользователей:\n\n" f"{user_names_str}\n\n" f"в {len(selected_chats)} выбранных чатах?" ) # 3. Диалог подтверждения confirm_dialog = QMessageBox(self) confirm_dialog.setWindowTitle("Подтверждение прав") confirm_dialog.setText(msg) confirm_dialog.setIcon(QMessageBox.Question) yes_button = confirm_dialog.addButton("Да", QMessageBox.YesRole) no_button = confirm_dialog.addButton("Нет", QMessageBox.NoRole) confirm_dialog.setDefaultButton(no_button) confirm_dialog.exec() if confirm_dialog.clickedButton() != yes_button: return # 4. Выполнение API запросов results = [] total = len(selected_chats) * len(user_infos) processed = 0 try: self._set_busy(True, f"Статус: назначение админов (0/{total})...") for chat in selected_chats: # VK API требует peer_id. Для чатов это 2000000000 + local_id try: peer_id = 2000000000 + int(chat['id']) except ValueError: results.append(f"✗ Ошибка ID чата: {chat['id']}") continue for user_id, user_info in user_infos.items(): try: self._vk_call_with_retry( self.vk.messages.setMemberRole, peer_id=peer_id, member_id=user_id, role="admin" ) results.append(f"✓ '{user_info}' назначен админом в '{chat['title']}'.") except VkApiError as e: if self._handle_vk_api_error("set_user_admin", e, action_name="назначения администраторов"): return results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {self._format_vk_error(e)}") finally: processed += 1 self.status_label.setText(f"Статус: назначение админов ({processed}/{total})...") finally: self._set_busy(False) # 5. Вывод результата QMessageBox.information(self, "Результаты назначения", "\n".join(results)) # Очистка полей (по желанию, можно убрать эти две строки, если хотите оставить ввод) self.vk_url_input.clear() self.user_ids_to_process.clear() self.set_ui_state(self.token is not None) if __name__ == "__main__": if "--auth" in sys.argv: try: idx = sys.argv.index("--auth") auth_url = sys.argv[idx + 1] output_path = sys.argv[idx + 2] except Exception: sys.exit(1) auth_webview.main_auth(auth_url, output_path) sys.exit(0) app = QApplication(sys.argv) app.setStyle("Fusion") app.setPalette(app.style().standardPalette()) # Установка иконки для ВСЕХ окон приложения icon_path = get_resource_path("icon.ico") if os.path.exists(icon_path): app.setWindowIcon(QIcon(icon_path)) window = VkChatManager() window.show() sys.exit(app.exec())