import json import os import shutil import sys import time from PySide6.QtCore import QProcess from PySide6.QtCore import QStandardPaths from PySide6.QtCore import QObject, QThread, Signal, Qt, QUrl, QDateTime, QTimer from PySide6.QtGui import QIcon, QAction, QActionGroup, QDesktopServices from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit, QPushButton, QVBoxLayout, QWidget, QMessageBox, QTextBrowser, QScrollArea, QCheckBox, QHBoxLayout, QTabWidget, QDialog, QDialogButtonBox, QProgressBar) from vk_api.exceptions import VkApiError import auth_webview from app_version import APP_VERSION from services import ( AutoUpdateService, UpdateChecker, VkService, detect_update_repository_url, load_chat_conversations, load_token as token_store_load_token, resolve_user_ids as chat_resolve_user_ids, save_token as token_store_save_token, ) from ui.dialogs import MultiLinkDialog as UIMultiLinkDialog from ui.main_window import instructions_text # --- Управление токенами и настройками --- APP_DATA_DIR = os.path.join( QStandardPaths.writableLocation(QStandardPaths.StandardLocation.AppDataLocation), "AnabasisVKChatManager", ) TOKEN_FILE = os.path.join(APP_DATA_DIR, "token.json") SETTINGS_FILE = os.path.join(APP_DATA_DIR, "settings.json") WEB_ENGINE_CACHE_DIR = os.path.join(APP_DATA_DIR, "web_engine_cache") CACHE_CLEANUP_MARKER = os.path.join(APP_DATA_DIR, "pending_cache_cleanup") LOG_FILE = os.path.join(APP_DATA_DIR, "app.log") LOG_MAX_BYTES = 1024 * 1024 # 1 MB LOG_BACKUP_FILE = os.path.join(APP_DATA_DIR, "app.log.1") AUTH_RELOGIN_BACKOFF_SECONDS = 5.0 # Legacy owner/repo format for GitHub-only fallback. UPDATE_REPOSITORY = "" # Full repository URL is preferred (supports GitHub/Gitea). UPDATE_REPOSITORY_URL = "https://git.daemonlord.ru/benya/AnabasisChatRemove" UPDATE_CHANNEL_DEFAULT = "stable" UPDATE_REQUEST_TIMEOUT = 8 AUTH_ERROR_CONTEXTS = ("load_chats", "execute_user_action", "set_user_admin", "_unused") def get_resource_path(relative_path): """ Получает абсолютный путь к ресурсу, работает для скрипта и для .exe """ if hasattr(sys, '_MEIPASS'): # Для PyInstaller (на всякий случай) return os.path.join(sys._MEIPASS, relative_path) # Для cx_Freeze и обычного запуска return os.path.join(os.path.abspath("."), relative_path) class BulkActionWorker(QObject): progress = Signal(int, int, str) finished = Signal(dict) auth_error = Signal(str, object, str) failed = Signal(str) done = Signal() def __init__(self, vk_call_with_retry, vk_api, action_type, selected_chats, user_infos, visible_messages): super().__init__() self.vk_call_with_retry = vk_call_with_retry self.vk = vk_api self.action_type = action_type self.selected_chats = selected_chats self.user_infos = user_infos self.visible_messages = visible_messages self.total = len(self.selected_chats) * len(self.user_infos) @staticmethod def _is_auth_error(exc): return VkService.is_auth_error(exc, str(exc).lower()) def _emit_progress(self, processed): label = "admin" if self.action_type == "admin" else ("remove" if self.action_type == "remove" else "add") self.progress.emit(processed, self.total, label) def run(self): results = [] processed = 0 try: for chat in self.selected_chats: peer_id = None if self.action_type == "admin": try: peer_id = 2000000000 + int(chat["id"]) except (ValueError, TypeError): for _user_id, user_info in self.user_infos.items(): results.append(f"✗ Ошибка ID чата: {chat['id']} ({user_info})") processed += 1 self._emit_progress(processed) continue for user_id, user_info in self.user_infos.items(): try: if self.action_type == "remove": self.vk_call_with_retry(self.vk.messages.removeChatUser, chat_id=chat["id"], member_id=user_id) results.append(f"✓ '{user_info}' исключен из '{chat['title']}'.") elif self.action_type == "add": params = {"chat_id": chat["id"], "user_id": user_id} if self.visible_messages: params["visible_messages_count"] = 250 self.vk_call_with_retry(self.vk.messages.addChatUser, **params) results.append(f"✓ '{user_info}' приглашен в '{chat['title']}'.") elif self.action_type == "admin": self.vk_call_with_retry( self.vk.messages.setMemberRole, peer_id=peer_id, member_id=user_id, role="admin", ) results.append(f"✓ '{user_info}' назначен админом в '{chat['title']}'.") else: raise RuntimeError(f"Unknown action: {self.action_type}") except VkApiError as exc: if self._is_auth_error(exc): context = "set_user_admin" if self.action_type == "admin" else "execute_user_action" action_name = "назначения администраторов" if self.action_type == "admin" else "выполнения операций с пользователями" self.auth_error.emit(context, exc, action_name) return results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {exc}") finally: processed += 1 self._emit_progress(processed) self.finished.emit({"results": results, "processed": processed, "total": self.total}) except Exception as exc: self.failed.emit(str(exc)) finally: self.done.emit() class VkChatManager(QMainWindow): def __init__(self): super().__init__() self.setWindowTitle("Anabasis Chat Manager") self.setGeometry(300, 300, 600, 800) self.token = None self.token_expiration_time = None self.chats = [] self.office_chat_checkboxes = [] self.retail_chat_checkboxes = [] self.warehouse_chat_checkboxes = [] self.coffee_chat_checkboxes = [] self.other_chat_checkboxes = [] self.vk_service = VkService() self.vk_session = None self.vk = None self.user_ids_to_process = [] self._busy = False self.suppress_resolve = False self.auth_process = None self.auth_output_path = None self._auth_process_error_text = None self._auth_ui_busy = False self._auth_relogin_in_progress = False self._last_auth_relogin_ts = 0.0 self._active_action_button = None self._active_action_button_text = "" self.update_repository_url = detect_update_repository_url(UPDATE_REPOSITORY_URL, UPDATE_REPOSITORY) self.update_channel = UPDATE_CHANNEL_DEFAULT self.update_checker = None self.update_thread = None self._update_check_silent = False self._bulk_worker_thread = None self._bulk_worker = None self._bulk_action_context = None self._bulk_action_success_message_title = "" self._bulk_clear_inputs_on_success = True self.resolve_timer = QTimer(self) self.resolve_timer.setSingleShot(True) self.resolve_timer.setInterval(750) self.resolve_timer.timeout.connect(self.resolve_single_user_id_from_input) self._cleanup_cache_if_needed() self._ensure_log_dir() self._load_settings() self.init_ui() self.load_saved_token_on_startup() self.setup_token_timer() QTimer.singleShot(1800, lambda: self.check_for_updates(silent_no_updates=True)) def init_ui(self): central_widget = QWidget() self.setCentralWidget(central_widget) layout = QVBoxLayout(central_widget) layout.setContentsMargins(10, 10, 10, 10) layout.setSpacing(5) self.instructions = QTextBrowser() self.instructions.setFixedHeight(120) self.instructions.setPlainText(instructions_text()) layout.addWidget(self.instructions) layout.addWidget(QLabel("Access Token VK:")) self.token_input = QLineEdit() self.token_input.setPlaceholderText("Токен появится здесь после авторизации...") self.token_input.setReadOnly(True) layout.addWidget(self.token_input) self.token_timer_label = QLabel("Срок действия токена: Н/Д") self.token_timer_label.setAlignment(Qt.AlignmentFlag.AlignRight) layout.addWidget(self.token_timer_label) self.auth_btn = QPushButton("Авторизоваться через VK") self.auth_btn.clicked.connect(self.start_auth) layout.addWidget(self.auth_btn) self.auth_progress = QProgressBar() self.auth_progress.setRange(0, 0) self.auth_progress.setTextVisible(False) self.auth_progress.hide() layout.addWidget(self.auth_progress) self.chat_tabs = QTabWidget() self.chat_tabs.hide() self.office_tab = self.create_chat_tab() self.chat_tabs.addTab(self.office_tab, "AG Офис") self.retail_tab = self.create_chat_tab() self.chat_tabs.addTab(self.retail_tab, "AG Розница") self.warehouse_tab = self.create_chat_tab() self.chat_tabs.addTab(self.warehouse_tab, "AG Склад") self.coffee_tab = self.create_chat_tab() self.chat_tabs.addTab(self.coffee_tab, "AG Кофейни") self.other_tab = self.create_chat_tab() self.chat_tabs.addTab(self.other_tab, "Прочие") layout.addWidget(QLabel("Выберите чаты:")) select_buttons_layout = QHBoxLayout() self.select_all_btn = QPushButton("Выбрать все на вкладке") self.select_all_btn.clicked.connect(lambda: self.set_all_checkboxes_on_current_tab(True)) self.deselect_all_btn = QPushButton("Снять выбор на вкладке") self.deselect_all_btn.clicked.connect(lambda: self.set_all_checkboxes_on_current_tab(False)) self.refresh_chats_btn = QPushButton("Обновить чаты") self.refresh_chats_btn.clicked.connect(self.load_chats) select_buttons_layout.addWidget(self.select_all_btn) select_buttons_layout.addWidget(self.deselect_all_btn) select_buttons_layout.addWidget(self.refresh_chats_btn) layout.addLayout(select_buttons_layout) layout.addWidget(self.chat_tabs) layout.addWidget(QLabel("Ссылка на страницу VK (ID определится автоматически):")) link_input_layout = QHBoxLayout() self.vk_url_input = QLineEdit() self.vk_url_input.setPlaceholderText("https://vk.com/id1") self.vk_url_input.textChanged.connect(self.on_vk_url_input_changed) link_input_layout.addWidget(self.vk_url_input) self.multi_link_btn = QPushButton("Список") self.multi_link_btn.setToolTip("Ввести несколько ссылок списком") self.multi_link_btn.clicked.connect(self.open_multi_link_dialog) link_input_layout.addWidget(self.multi_link_btn) layout.addLayout(link_input_layout) self.remove_user_btn = QPushButton("ИСКЛЮЧИТЬ ПОЛЬЗОВАТЕЛЕЙ") self.remove_user_btn.setMinimumHeight(50) self.remove_user_btn.clicked.connect(self.remove_user) layout.addWidget(self.remove_user_btn) self.visible_messages_checkbox = QCheckBox("Показать 250 последних сообщений при добавлении") layout.addWidget(self.visible_messages_checkbox) self.add_user_btn = QPushButton("ПРИГЛАСИТЬ ПОЛЬЗОВАТЕЛЕЙ") self.add_user_btn.setMinimumHeight(50) self.add_user_btn.clicked.connect(self.add_user_to_chat) layout.addWidget(self.add_user_btn) self.operation_progress = QProgressBar() self.operation_progress.setRange(0, 100) self.operation_progress.setValue(0) self.operation_progress.setTextVisible(True) self.operation_progress.hide() layout.addWidget(self.operation_progress) self.status_label = QLabel("Статус: не авторизован") self.status_label.setAlignment(Qt.AlignmentFlag.AlignCenter) layout.addWidget(self.status_label) layout.addStretch(1) self.create_menu() self.set_ui_state(False) def on_vk_url_input_changed(self, text): if self.suppress_resolve: return if self.vk_url_input.hasFocus(): self.resolve_timer.start() def open_multi_link_dialog(self): dialog = UIMultiLinkDialog(self) if dialog.exec(): links = dialog.get_links() if links: self._set_vk_url_input_text("") self._process_links_list(links) else: QMessageBox.information(self, "Информация", "Список ссылок пуст.") def resolve_single_user_id_from_input(self): url = self.vk_url_input.text().strip() if not url: self.user_ids_to_process.clear() self.status_label.setText("Статус: Введите ссылку или откройте список.") self.set_ui_state(self.token is not None) return self._process_links_list([url]) def create_menu(self): """Создает верхнее меню.""" menu_bar = self.menuBar() # Меню "Инструменты" tools_menu = menu_bar.addMenu("Инструменты") # Действие "Назначить администратором" make_admin_action = QAction("Назначить администратором", self) make_admin_action.setStatusTip("Назначить выбранных пользователей администраторами в выбранных чатах") make_admin_action.triggered.connect(self.set_user_admin) tools_menu.addAction(make_admin_action) self.make_admin_action = make_admin_action check_updates_action = QAction("Проверить обновления", self) check_updates_action.setStatusTip("Проверить наличие новой версии приложения") check_updates_action.triggered.connect(self.check_for_updates) tools_menu.addAction(check_updates_action) self.check_updates_action = check_updates_action channel_menu = tools_menu.addMenu("Канал обновлений") self.update_channel_group = QActionGroup(self) self.update_channel_group.setExclusive(True) stable_channel_action = QAction("Релизы (stable)", self) stable_channel_action.setCheckable(True) stable_channel_action.setChecked(self.update_channel == "stable") stable_channel_action.triggered.connect(lambda checked: checked and self.set_update_channel("stable")) channel_menu.addAction(stable_channel_action) self.update_channel_group.addAction(stable_channel_action) self.update_channel_stable_action = stable_channel_action beta_channel_action = QAction("Бета (pre-release)", self) beta_channel_action.setCheckable(True) beta_channel_action.setChecked(self.update_channel == "beta") beta_channel_action.triggered.connect(lambda checked: checked and self.set_update_channel("beta")) channel_menu.addAction(beta_channel_action) self.update_channel_group.addAction(beta_channel_action) self.update_channel_beta_action = beta_channel_action logout_action = QAction("Выйти и очистить", self) logout_action.setStatusTip("Выйти, удалить токен и кэш") logout_action.triggered.connect(self.logout_and_clear) tools_menu.addAction(logout_action) self.logout_action = logout_action help_menu = menu_bar.addMenu("Справка") about_action = QAction("О приложении", self) about_action.setStatusTip("Показать информацию о приложении") about_action.triggered.connect(self.show_about_dialog) help_menu.addAction(about_action) self.about_action = about_action def show_about_dialog(self): dialog = QDialog(self) dialog.setWindowTitle("О приложении") dialog.setMinimumWidth(460) repo_url = self.update_repository_url if repo_url: repo_html = f'{repo_url}' else: repo_html = "не указан" content = QLabel( f"Anabasis Chat Manager
" f"Версия: {APP_VERSION}

" "Инструмент для массового управления пользователями в чатах VK.
" "Поддерживается проверка обновлений и автообновление Windows-сборки.

" f"Репозиторий: {repo_html}" ) content.setTextFormat(Qt.TextFormat.RichText) content.setTextInteractionFlags(Qt.TextInteractionFlag.TextBrowserInteraction) content.setOpenExternalLinks(True) content.setWordWrap(True) button_box = QDialogButtonBox(QDialogButtonBox.StandardButton.Ok, parent=dialog) button_box.accepted.connect(dialog.accept) layout = QVBoxLayout(dialog) layout.addWidget(content) layout.addWidget(button_box) dialog.exec() def create_chat_tab(self): # This implementation correctly creates a scrollable area for chat lists. tab_content_widget = QWidget() tab_layout = QVBoxLayout(tab_content_widget) tab_layout.setContentsMargins(0, 0, 0, 0) tab_layout.setSpacing(0) scroll_area = QScrollArea() scroll_area.setWidgetResizable(True) scroll_area.setHorizontalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAlwaysOff) scroll_area.setVerticalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAsNeeded) tab_layout.addWidget(scroll_area) checkbox_container_widget = QWidget() checkbox_layout = QVBoxLayout(checkbox_container_widget) checkbox_layout.setContentsMargins(5, 5, 5, 5) checkbox_layout.setSpacing(2) checkbox_layout.addStretch() scroll_area.setWidget(checkbox_container_widget) return tab_content_widget def _set_update_action_state(self, in_progress): if hasattr(self, "check_updates_action"): self.check_updates_action.setEnabled(not in_progress) def _normalize_update_channel(self, value): channel = (value or "").strip().lower() if channel in ("beta", "betas", "pre", "prerelease", "pre-release"): return "beta" return "stable" def _load_settings(self): self.update_channel = UPDATE_CHANNEL_DEFAULT if not os.path.exists(SETTINGS_FILE): return try: with open(SETTINGS_FILE, "r", encoding="utf-8") as f: settings = json.load(f) self.update_channel = self._normalize_update_channel(settings.get("update_channel")) except Exception as e: self._log_event("settings_load", f"Ошибка загрузки настроек: {e}", level="WARN") def _save_settings(self): try: os.makedirs(APP_DATA_DIR, exist_ok=True) settings = { "update_channel": self.update_channel, } with open(SETTINGS_FILE, "w", encoding="utf-8") as f: json.dump(settings, f, ensure_ascii=False, indent=2) except Exception as e: self._log_event("settings_save", f"Ошибка сохранения настроек: {e}", level="WARN") def set_update_channel(self, channel): normalized = self._normalize_update_channel(channel) if normalized == self.update_channel: return self.update_channel = normalized self._save_settings() self.status_label.setText( f"Статус: канал обновлений переключен на {'бета' if normalized == 'beta' else 'релизы'}." ) self._log_event("update_channel", f"update_channel={self.update_channel}") def check_for_updates(self, silent_no_updates=False): if self.update_thread and self.update_thread.isRunning(): return self._update_check_silent = silent_no_updates self._set_update_action_state(True) channel_label = "бета" if self.update_channel == "beta" else "релизы" self.status_label.setText(f"Статус: проверка обновлений ({channel_label})...") self.update_checker = UpdateChecker( self.update_repository_url, APP_VERSION, request_timeout=UPDATE_REQUEST_TIMEOUT, channel=self.update_channel, ) self.update_thread = QThread(self) self.update_checker.moveToThread(self.update_thread) self.update_thread.started.connect(self.update_checker.run) self.update_checker.check_finished.connect(self._on_update_check_finished) self.update_checker.check_failed.connect(self._on_update_check_failed) self.update_checker.check_finished.connect(self.update_thread.quit) self.update_checker.check_failed.connect(self.update_thread.quit) self.update_checker.check_finished.connect(self.update_checker.deleteLater) self.update_checker.check_failed.connect(self.update_checker.deleteLater) self.update_thread.finished.connect(self.update_thread.deleteLater) self.update_thread.start() def _on_update_check_finished(self, result): self._set_update_action_state(False) self.update_checker = None self.update_thread = None if result.get("has_update"): latest_version = result.get("latest_version") or result.get("latest_tag") or "unknown" self.status_label.setText(f"Статус: доступно обновление {latest_version}") message_box = QMessageBox(self) message_box.setIcon(QMessageBox.Icon.Information) message_box.setWindowTitle("Доступно обновление") message_box.setText( f"Текущая версия: {result.get('current_version')}\n" f"Доступная версия: {latest_version}\n\n" "Открыть страницу загрузки?" ) release_notes = (result.get("release_notes") or "").strip() if release_notes: preview_lines = [line.strip() for line in release_notes.splitlines() if line.strip()] preview_text = "\n".join(preview_lines[:8]) if len(preview_lines) > 8: preview_text += "\n..." message_box.setInformativeText(f"Что нового:\n{preview_text}") message_box.setDetailedText(release_notes) update_now_button = message_box.addButton("Обновить сейчас", QMessageBox.ButtonRole.AcceptRole) download_button = message_box.addButton("Скачать", QMessageBox.ButtonRole.AcceptRole) setup_button = None installer_url = result.get("installer_url") if installer_url: setup_button = message_box.addButton("Скачать и установить (setup)", QMessageBox.ButtonRole.AcceptRole) releases_button = message_box.addButton("Релизы", QMessageBox.ButtonRole.ActionRole) cancel_button = message_box.addButton("Позже", QMessageBox.ButtonRole.RejectRole) message_box.setDefaultButton(update_now_button) message_box.exec() clicked = message_box.clickedButton() download_url = result.get("download_url") checksum_url = result.get("checksum_url") download_name = result.get("download_name") release_url = result.get("release_url") if clicked is update_now_button and download_url: if not self._start_auto_update(download_url, latest_version, checksum_url, download_name): if release_url: QDesktopServices.openUrl(QUrl(release_url)) return if setup_button is not None and clicked is setup_button and installer_url: QDesktopServices.openUrl(QUrl(installer_url)) return if clicked is download_button and download_url: QDesktopServices.openUrl(QUrl(download_url)) elif clicked in (download_button, releases_button) and release_url: QDesktopServices.openUrl(QUrl(release_url)) elif clicked not in (cancel_button,): self._log_event("update_check", "Диалог обновления закрыт без действия.", level="WARN") return channel_label = "бета" if self.update_channel == "beta" else "релизы" self.status_label.setText(f"Статус: используется актуальная версия ({APP_VERSION}, канал: {channel_label}).") if not self._update_check_silent: QMessageBox.information(self, "Обновления", f"Установлена актуальная версия в канале {channel_label}.") def _on_update_check_failed(self, error_text): self._set_update_action_state(False) self.update_checker = None self.update_thread = None self._log_event("update_check_failed", error_text, level="WARN") if not self.update_repository_url: self.status_label.setText("Статус: обновления не настроены (URL репозитория не задан).") if not self._update_check_silent: QMessageBox.warning( self, "Обновления не настроены", "Не задан URL репозитория для обновлений.\n" "Укажите UPDATE_REPOSITORY_URL в main.py или переменную окружения " "ANABASIS_UPDATE_URL (например: https://git.daemonlord.ru/owner/repo).", ) return self.status_label.setText("Статус: не удалось проверить обновления.") if not self._update_check_silent: QMessageBox.warning(self, "Проверка обновлений", error_text) def setup_token_timer(self): self.token_countdown_timer = QTimer(self) self.token_countdown_timer.timeout.connect(self.update_token_timer_display) self.token_countdown_timer.start(1000) def update_token_timer_display(self): if self.token_expiration_time is None: self.token_timer_label.setText("Срок действия токена: Н/Д") return # ИСПРАВЛЕНИЕ: обрабатываем бессрочный токен if self.token_expiration_time == 0: self.token_timer_label.setText("Срок действия: Бессрочно") return remaining_seconds = int(self.token_expiration_time - time.time()) if remaining_seconds <= 0: self.token_timer_label.setText("Срок действия истек!") if self.token_countdown_timer.isActive(): self.token_countdown_timer.stop() self.set_ui_state(False) self.status_label.setText("Статус: Срок действия истек, авторизуйтесь заново.") self.token, self.token_expiration_time = None, None self.token_input.clear() return minutes, seconds = divmod(remaining_seconds, 60) hours, minutes = divmod(minutes, 60) self.token_timer_label.setText(f"Срок: {hours:02d}ч {minutes:02d}м {seconds:02d}с") def set_ui_state(self, authorized): self.auth_btn.setEnabled((not authorized) and (not self._auth_ui_busy)) for btn in [self.select_all_btn, self.deselect_all_btn, self.refresh_chats_btn, self.vk_url_input, self.multi_link_btn, self.visible_messages_checkbox]: btn.setEnabled(authorized) has_ids = authorized and bool(self.user_ids_to_process) self.remove_user_btn.setEnabled(has_ids) self.add_user_btn.setEnabled(has_ids) self.chat_tabs.setVisible(authorized) if authorized: # Когда авторизованы, задаем минимальную высоту, достаточную для ~10-12 чатов self.chat_tabs.setMinimumHeight(300) else: # Когда не авторизованы, сбрасываем минимальную высоту self.chat_tabs.setMinimumHeight(0) if not authorized: self.user_ids_to_process.clear() self._set_vk_url_input_text("") self._clear_chat_tabs() if hasattr(self, "make_admin_action"): self.make_admin_action.setEnabled(authorized and (not self._auth_ui_busy)) if hasattr(self, "logout_action"): self.logout_action.setEnabled(not self._auth_ui_busy) def _set_auth_ui_state(self, in_progress): self._auth_ui_busy = in_progress self.auth_progress.setVisible(in_progress) if hasattr(self, "logout_action"): self.logout_action.setEnabled(not in_progress) if hasattr(self, "make_admin_action"): self.make_admin_action.setEnabled(not in_progress and self.token is not None) self.auth_btn.setEnabled((self.token is None) and (not in_progress)) def _set_busy(self, busy, status_text=None): if status_text: self.status_label.setText(status_text) if busy: self._busy = True QApplication.setOverrideCursor(Qt.CursorShape.WaitCursor) for widget in [ self.refresh_chats_btn, self.select_all_btn, self.deselect_all_btn, self.vk_url_input, self.multi_link_btn, self.visible_messages_checkbox, self.remove_user_btn, self.add_user_btn ]: widget.setEnabled(False) else: self._busy = False QApplication.restoreOverrideCursor() if self.token is None: self.set_ui_state(False) else: self.set_ui_state(True) def _start_operation_progress(self, total, label_text, action_button=None): total = max(1, int(total)) self.operation_progress.setRange(0, total) self.operation_progress.setValue(0) self.operation_progress.setFormat(f"{label_text}: %v/%m") self.operation_progress.show() self._active_action_button = action_button self._active_action_button_text = action_button.text() if action_button else "" if action_button is not None: action_button.setText(f"{label_text} (0/{total})") action_button.setEnabled(False) def _update_operation_progress(self, processed, total, label_text): total = max(1, int(total)) processed = max(0, min(int(processed), total)) self.operation_progress.setRange(0, total) self.operation_progress.setValue(processed) self.operation_progress.setFormat(f"{label_text}: {processed}/{total}") if self._active_action_button is not None: self._active_action_button.setText(f"{label_text} ({processed}/{total})") def _finish_operation_progress(self): self.operation_progress.hide() self.operation_progress.setValue(0) self.operation_progress.setFormat("%p%") if self._active_action_button is not None: self._active_action_button.setText(self._active_action_button_text or self._active_action_button.text()) self._active_action_button = None self._active_action_button_text = "" def _start_bulk_action_worker( self, action_type, selected_chats, user_infos, action_label, action_button=None, success_message_title="Результаты", clear_inputs_on_success=True, ): total = max(1, len(selected_chats) * len(user_infos)) self._bulk_action_context = "set_user_admin" if action_type == "admin" else "execute_user_action" self._bulk_action_success_message_title = success_message_title self._bulk_clear_inputs_on_success = clear_inputs_on_success self._set_busy(True, f"Статус: выполняется {action_label} (0/{total})...") self._start_operation_progress(total, action_label, action_button=action_button) worker = BulkActionWorker( vk_call_with_retry=self._vk_call_with_retry, vk_api=self.vk, action_type=action_type, selected_chats=selected_chats, user_infos=user_infos, visible_messages=self.visible_messages_checkbox.isChecked(), ) thread = QThread(self) worker.moveToThread(thread) thread.started.connect(worker.run) worker.progress.connect(self._on_bulk_action_progress) worker.finished.connect(self._on_bulk_action_finished) worker.auth_error.connect(self._on_bulk_action_auth_error) worker.failed.connect(self._on_bulk_action_failed) worker.done.connect(self._on_bulk_action_done) worker.done.connect(thread.quit) worker.done.connect(worker.deleteLater) thread.finished.connect(thread.deleteLater) self._bulk_worker = worker self._bulk_worker_thread = thread thread.start() def _on_bulk_action_progress(self, processed, total, label): label_text = {"remove": "исключение", "add": "приглашение", "admin": "назначение админов"}.get(label, label) self._update_operation_progress(processed, total, label_text) self.status_label.setText(f"Статус: выполняется {label_text} ({processed}/{max(1, total)})...") def _on_bulk_action_finished(self, payload): results = payload.get("results", []) if isinstance(payload, dict) else [] QMessageBox.information(self, self._bulk_action_success_message_title, "\n".join(results)) if self._bulk_clear_inputs_on_success: self.vk_url_input.clear() self.user_ids_to_process.clear() self.set_ui_state(self.token is not None) def _on_bulk_action_auth_error(self, context, exc, action_name): self._handle_vk_api_error(context or self._bulk_action_context or "bulk_action", exc, action_name=action_name) def _on_bulk_action_failed(self, error_text): QMessageBox.warning(self, "Ошибка", f"Не удалось выполнить операцию: {error_text}") def _on_bulk_action_done(self): self._finish_operation_progress() self._set_busy(False) self._bulk_worker = None self._bulk_worker_thread = None def _ensure_log_dir(self): os.makedirs(APP_DATA_DIR, exist_ok=True) def _log(self, level, context, message): try: os.makedirs(APP_DATA_DIR, exist_ok=True) self._rotate_log_if_needed() timestamp = QDateTime.currentDateTime().toString("yyyy-MM-dd HH:mm:ss") with open(LOG_FILE, "a", encoding="utf-8") as f: f.write(f"[{timestamp}] [{level}] {context}: {message}\n") except Exception: pass def _log_error(self, context, exc): self._log("ERROR", context, self._format_vk_error(exc)) def _log_event(self, context, message, level="INFO"): self._log(level, context, message) def _rotate_log_if_needed(self): try: if not os.path.exists(LOG_FILE): return if os.path.getsize(LOG_FILE) < LOG_MAX_BYTES: return if os.path.exists(LOG_BACKUP_FILE): os.remove(LOG_BACKUP_FILE) os.replace(LOG_FILE, LOG_BACKUP_FILE) except Exception: pass def _format_vk_error(self, exc): error = getattr(exc, "error", None) code = None message = str(exc) if isinstance(error, dict): code = error.get("error_code") message = error.get("error_msg") or message hints = { 5: "Ошибка авторизации. Проверьте токен.", 6: "Слишком много запросов. Подождите и повторите.", 7: "Недостаточно прав.", 9: "Слишком много однотипных действий.", 10: "Внутренняя ошибка VK. Повторите позже.", 15: "Доступ запрещен.", 100: "Некорректный параметр запроса.", 113: "Неверный идентификатор пользователя.", 200: "Доступ к чату запрещен.", } if code in hints: message = f"{message} ({hints[code]})" if code is not None: return f"[{code}] {message}" return message def _set_vk_url_input_text(self, text): self.suppress_resolve = True try: self.vk_url_input.blockSignals(True) self.vk_url_input.setText(text) finally: self.vk_url_input.blockSignals(False) self.suppress_resolve = False def logout_and_clear(self): confirm = QMessageBox.question( self, "Подтверждение выхода", "Вы уверены, что хотите выйти и удалить сохраненный токен и кэш?", QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No ) if confirm != QMessageBox.StandardButton.Yes: return self._clear_auth_state(stop_timer=True, remove_token_file=True) try: self._try_remove_web_cache() except Exception as e: self._log_event("logout_and_clear", f"Cache cleanup failed: {e}", level="WARN") def _cleanup_cache_if_needed(self): if os.path.exists(CACHE_CLEANUP_MARKER): try: self._try_remove_web_cache() if os.path.exists(CACHE_CLEANUP_MARKER): os.remove(CACHE_CLEANUP_MARKER) except Exception as e: print(f"Ошибка отложенной очистки кэша: {e}") def _try_remove_web_cache(self): if not os.path.exists(WEB_ENGINE_CACHE_DIR): return attempts = 5 last_error = None for _ in range(attempts): try: shutil.rmtree(WEB_ENGINE_CACHE_DIR) last_error = None break except Exception as e: last_error = e time.sleep(0.2) if last_error: os.makedirs(APP_DATA_DIR, exist_ok=True) with open(CACHE_CLEANUP_MARKER, "w") as f: f.write("pending") raise last_error def load_saved_token_on_startup(self): loaded_token, expiration_time = token_store_load_token(TOKEN_FILE) if loaded_token: self.handle_auth_token_on_load(loaded_token, expiration_time) else: self.set_ui_state(False) def set_all_checkboxes_on_current_tab(self, checked): current_index = self.chat_tabs.currentIndex() checkbox_lists = [self.office_chat_checkboxes, self.retail_chat_checkboxes, self.warehouse_chat_checkboxes, self.coffee_chat_checkboxes, self.other_chat_checkboxes] if 0 <= current_index < len(checkbox_lists): for checkbox in checkbox_lists[current_index]: checkbox.setChecked(checked) def _build_auth_command(self, auth_url, output_path): return self.vk_service.build_auth_command(auth_url, output_path, entry_script_path=os.path.abspath(__file__)) def _on_auth_process_error(self, process_error): self._auth_process_error_text = f"Ошибка запуска авторизации: {process_error}" # For failed starts Qt may not emit finished(), so release UI here. if self.auth_process and self.auth_process.state() == QProcess.ProcessState.NotRunning: output_path = self.auth_output_path self.auth_output_path = None self.auth_process = None self._set_auth_ui_state(False) self.status_label.setText(f"Статус: {self._auth_process_error_text}") self._log_event("auth_process_error", self._auth_process_error_text, level="ERROR") self._auth_process_error_text = None self._auth_relogin_in_progress = False self.set_ui_state(self.token is not None) try: if output_path and os.path.exists(output_path): os.remove(output_path) except Exception: pass def _on_auth_process_finished(self, exit_code, _exit_status): output_path = self.auth_output_path self.auth_output_path = None self.auth_process = None self._set_auth_ui_state(False) if self._auth_process_error_text: self.status_label.setText(f"Статус: {self._auth_process_error_text}") self._log_event("auth_process_error", self._auth_process_error_text, level="ERROR") self._auth_process_error_text = None self._auth_relogin_in_progress = False self.set_ui_state(self.token is not None) return if exit_code != 0: self.status_label.setText(f"Статус: авторизация не удалась (код {exit_code}).") self._log_event("auth_process_finished", f"exit_code={exit_code}", level="WARN") self._auth_relogin_in_progress = False self.set_ui_state(self.token is not None) return token = None expires_in = 0 if output_path and os.path.exists(output_path): try: with open(output_path, "r", encoding="utf-8") as f: data = json.load(f) token = data.get("token") expires_in = data.get("expires_in", 0) except Exception as e: self._log_event("auth_result_parse", f"Ошибка чтения результата авторизации: {e}", level="ERROR") finally: try: if os.path.exists(output_path): os.remove(output_path) except Exception: pass else: self._log_event("auth_result", "Файл результата авторизации не найден.", level="WARN") self._auth_relogin_in_progress = False self.handle_new_auth_token(token, expires_in) def start_auth(self, keep_status_text=False): if self.auth_process and self.auth_process.state() != QProcess.ProcessState.NotRunning: self._log_event("auth_process", "Авторизация уже запущена, повторный запуск пропущен.") return if keep_status_text and hasattr(self, "_relogin_status_text"): status_text = self._relogin_status_text self._relogin_status_text = None else: status_text = "Статус: ожидание авторизации..." self.status_label.setText(status_text) auth_url = ( "https://oauth.vk.com/authorize?" "client_id=2685278&" "display=page&" "redirect_uri=https://oauth.vk.com/blank.html&" "scope=1073737727&" "response_type=token&" "v=5.131" ) output_path = os.path.join(APP_DATA_DIR, "auth_result.json") try: if os.path.exists(output_path): os.remove(output_path) except Exception: pass program, args = self._build_auth_command(auth_url, output_path) self.auth_output_path = output_path self._auth_process_error_text = None process = QProcess(self) process.finished.connect(self._on_auth_process_finished) process.errorOccurred.connect(self._on_auth_process_error) self.auth_process = process self._set_auth_ui_state(True) process.start(program, args) def handle_new_auth_token(self, token, expires_in): if not token: self.status_label.setText("Статус: Авторизация не удалась") self.set_ui_state(False) self._auth_relogin_in_progress = False return self.token = token # Сохраняем и получаем корректный expiration_time (0 или будущее время) try: self.token_expiration_time = token_store_save_token( self.token, TOKEN_FILE, APP_DATA_DIR, expires_in=expires_in, ) except Exception as e: try: expires_value = int(expires_in) except (TypeError, ValueError): expires_value = 0 self.token_expiration_time = (time.time() + expires_value) if expires_value > 0 else 0 self._log_event("token_store_save", str(e), level="WARN") QMessageBox.warning( self, "Предупреждение", "Не удалось безопасно сохранить токен на диске. " "Текущая сессия активна, но после перезапуска потребуется повторная авторизация.", ) self.token_input.setText(self.token[:50] + "...") self.status_label.setText("Статус: авторизован") self.vk_service.set_token(self.token) self.vk_session = self.vk_service.session self.vk = self.vk_service.api self.set_ui_state(True) self._auth_relogin_in_progress = False self.load_chats() def handle_auth_token_on_load(self, token, expiration_time): self.token = token self.token_expiration_time = expiration_time self.token_input.setText(self.token[:50] + "...") self.status_label.setText("Статус: авторизован (токен загружен)") self.vk_service.set_token(self.token) self.vk_session = self.vk_service.session self.vk = self.vk_service.api self.set_ui_state(True) self._auth_relogin_in_progress = False self.load_chats() def _clear_chat_tabs(self): self.chats.clear() for chk_list in [self.office_chat_checkboxes, self.retail_chat_checkboxes, self.warehouse_chat_checkboxes, self.coffee_chat_checkboxes, self.other_chat_checkboxes]: for checkbox in chk_list: checkbox.setParent(None) checkbox.deleteLater() chk_list.clear() def _vk_error_code(self, exc): return self.vk_service.vk_error_code(exc) def _is_auth_token_error(self, exc): message = self._format_vk_error(exc).lower() return self.vk_service.is_auth_error(exc, message) def _clear_auth_state(self, stop_timer=False, remove_token_file=True): self.token = None self.token_expiration_time = None self.vk_service.clear() self.vk_session = None self.vk = None self.user_ids_to_process.clear() self._set_vk_url_input_text("") self.token_input.clear() if stop_timer and self.token_countdown_timer.isActive(): self.token_countdown_timer.stop() self.token_timer_label.setText("Срок действия токена: Н/Д") self.status_label.setText("Статус: не авторизован") self._clear_chat_tabs() self.set_ui_state(False) if remove_token_file: try: if os.path.exists(TOKEN_FILE): os.remove(TOKEN_FILE) except Exception: pass def _force_relogin(self, exc, action_name): now = time.monotonic() if self._auth_relogin_in_progress: self._log_event("force_relogin_skip", f"already_in_progress action={action_name}", level="WARN") return elapsed = now - self._last_auth_relogin_ts if elapsed < AUTH_RELOGIN_BACKOFF_SECONDS: wait_seconds = int(AUTH_RELOGIN_BACKOFF_SECONDS - elapsed) + 1 self.status_label.setText(f"Статус: повторная авторизация через {wait_seconds} сек.") self._log_event("force_relogin_backoff", f"action={action_name}; wait={wait_seconds}s", level="WARN") return self._auth_relogin_in_progress = True self._last_auth_relogin_ts = now error_code = self._vk_error_code(exc) self._log_event( "force_relogin", f"action={action_name}; code={error_code}; message={self._format_vk_error(exc)}", level="WARN", ) self._clear_auth_state() self._relogin_status_text = "Статус: Токен отозван VK, выполните повторный вход." QMessageBox.warning( self, "Требуется повторная авторизация", f"Во время {action_name} получена ошибка авторизации:\n" f"{self._format_vk_error(exc)}\n\n" "Сейчас откроется окно авторизации VK." ) self.start_auth(keep_status_text=True) def _handle_vk_api_error(self, context, exc, action_name=None, ui_message_prefix=None, disable_ui=False): self._log_error(context, exc) if self._is_auth_token_error(exc): self._force_relogin(exc, action_name or context) return True if ui_message_prefix: QMessageBox.critical(self, "Ошибка", f"{ui_message_prefix}: {self._format_vk_error(exc)}") if disable_ui: self.set_ui_state(False) return False def _vk_call_with_retry(self, func, *args, **kwargs): return self.vk_service.call_with_retry(func, *args, **kwargs) def get_user_info_by_id(self, user_id): try: user = self.vk.users.get(user_ids=user_id)[0] return f"{user.get('first_name', '')} {user.get('last_name', '')}" except Exception: return f"Пользователь {user_id}" def _get_selected_chats(self): selected = [] for chk in self.office_chat_checkboxes + self.retail_chat_checkboxes + self.warehouse_chat_checkboxes + self.coffee_chat_checkboxes + self.other_chat_checkboxes: if chk.isChecked(): chat_id = chk.property("chat_id") title = next((c['title'] for c in self.chats if c['id'] == chat_id), "") selected.append({'id': chat_id, 'title': title}) return selected def _execute_user_action(self, action_type, action_button=None): if not self.user_ids_to_process: QMessageBox.warning(self, "Ошибка", f"Нет ID пользователей для операции.") return selected_chats = self._get_selected_chats() if not selected_chats: QMessageBox.warning(self, "Ошибка", "Выберите хотя бы один чат.") return user_infos = {uid: self.get_user_info_by_id(uid) for uid in self.user_ids_to_process} action_verb = "исключить" if action_type == "remove" else "пригласить" preposition = "из" if action_type == "remove" else "в" user_names_list = list(user_infos.values()) user_names_str = "\n".join([f"• {name}" for name in user_names_list]) chat_count = len(selected_chats) chat_str = "" # Финальная логика склонения с учетом падежа (для "из" и "в") if chat_count % 10 == 1 and chat_count % 100 != 11: if action_type == 'remove': # из 1 выбранного чата (Родительный падеж, ед.ч.) chat_str = f"{chat_count} выбранного чата" else: # в 1 выбранный чат (Винительный падеж, ед.ч.) chat_str = f"{chat_count} выбранный чат" elif 2 <= chat_count % 10 <= 4 and (chat_count % 100 < 10 or chat_count % 100 >= 20): if action_type == 'remove': # из 3 выбранных чатов (Родительный падеж, мн.ч.) chat_str = f"{chat_count} выбранных чатов" else: # в 3 выбранных чата (Родительный падеж, ед.ч.) chat_str = f"{chat_count} выбранных чата" else: # из 5 выбранных чатов / в 5 выбранных чатов (Родительный падеж, мн.ч.) chat_str = f"{chat_count} выбранных чатов" msg = ( f"Вы уверены, что хотите {action_verb} следующих пользователей:\n\n" f"{user_names_str}\n\n" f"{preposition} {chat_str}?" ) confirm_dialog = QMessageBox(self) confirm_dialog.setWindowTitle("Подтверждение действия") confirm_dialog.setText(msg) confirm_dialog.setIcon(QMessageBox.Icon.Question) yes_button = confirm_dialog.addButton("Да", QMessageBox.ButtonRole.YesRole) no_button = confirm_dialog.addButton("Нет", QMessageBox.ButtonRole.NoRole) confirm_dialog.setDefaultButton(no_button) # "Нет" - кнопка по умолчанию confirm_dialog.exec() if confirm_dialog.clickedButton() != yes_button: return action_label = "исключение" if action_type == "remove" else "приглашение" self._start_bulk_action_worker( action_type=action_type, selected_chats=selected_chats, user_infos=user_infos, action_label=action_label, action_button=action_button, success_message_title="Результаты", clear_inputs_on_success=True, ) return def remove_user(self): self._execute_user_action("remove", action_button=self.remove_user_btn) def add_user_to_chat(self): self._execute_user_action("add", action_button=self.add_user_btn) def set_user_admin(self): """Назначает пользователя администратором чата.""" # 1. Проверки на наличие выбранных пользователей и чатов if not self.user_ids_to_process: QMessageBox.warning(self, "Ошибка", "Нет ID пользователей для операции.") return selected_chats = self._get_selected_chats() if not selected_chats: QMessageBox.warning(self, "Ошибка", "Выберите хотя бы один чат.") return # 2. Подготовка данных для подтверждения user_infos = {uid: self.get_user_info_by_id(uid) for uid in self.user_ids_to_process} user_names_str = "\n".join([f"• {name}" for name in user_infos.values()]) msg = ( f"Вы уверены, что хотите назначить АДМИНИСТРАТОРАМИ следующих пользователей:\n\n" f"{user_names_str}\n\n" f"в {len(selected_chats)} выбранных чатах?" ) # 3. Диалог подтверждения confirm_dialog = QMessageBox(self) confirm_dialog.setWindowTitle("Подтверждение прав") confirm_dialog.setText(msg) confirm_dialog.setIcon(QMessageBox.Icon.Question) yes_button = confirm_dialog.addButton("Да", QMessageBox.ButtonRole.YesRole) no_button = confirm_dialog.addButton("Нет", QMessageBox.ButtonRole.NoRole) confirm_dialog.setDefaultButton(no_button) confirm_dialog.exec() if confirm_dialog.clickedButton() != yes_button: return self._start_bulk_action_worker( action_type="admin", selected_chats=selected_chats, user_infos=user_infos, action_label="назначение админов", action_button=None, success_message_title="Результаты назначения", clear_inputs_on_success=True, ) return # Refactor overrides: keep logic in service modules and thin UI orchestration here. def _process_links_list(self, links_list): if not self.vk: QMessageBox.warning(self, "Ошибка", "Сначала авторизуйтесь.") return self.user_ids_to_process.clear() resolved_ids = [] failed_links = [] self._set_busy(True, "Статус: Определяю ID...") try: resolved_ids, failed_items = chat_resolve_user_ids( self._vk_call_with_retry, self.vk, links_list, ) for failed_link, failed_exc in failed_items: if isinstance(failed_exc, VkApiError): if self._handle_vk_api_error("resolveScreenName", failed_exc, action_name="получения ID пользователей"): return failed_links.append(f"{failed_link} ({self._format_vk_error(failed_exc)})") else: failed_links.append(failed_link) finally: self._set_busy(False) self.user_ids_to_process = resolved_ids status_message = f"Статус: Готово к работе с {len(resolved_ids)} пользователем(ем/ями)." if len(links_list) > 1: self._set_vk_url_input_text(f"Загружено {len(resolved_ids)}/{len(links_list)} из списка") if failed_links: QMessageBox.warning( self, "Ошибка получения ID", "Не удалось получить ID для следующих ссылок:\n" + "\n".join(failed_links), ) self.status_label.setText(status_message) self.set_ui_state(self.token is not None) def load_chats(self): self._clear_chat_tabs() layouts = [ self.office_tab.findChild(QWidget).findChild(QVBoxLayout), self.retail_tab.findChild(QWidget).findChild(QVBoxLayout), self.warehouse_tab.findChild(QWidget).findChild(QVBoxLayout), self.coffee_tab.findChild(QWidget).findChild(QVBoxLayout), self.other_tab.findChild(QWidget).findChild(QVBoxLayout), ] try: self._set_busy(True, "Статус: загрузка чатов...") conversations = load_chat_conversations(self._vk_call_with_retry, self.vk) for conv in conversations: if conv["conversation"]["peer"]["type"] != "chat": continue chat_id = conv["conversation"]["peer"]["local_id"] title = conv["conversation"]["chat_settings"]["title"] self.chats.append({"id": chat_id, "title": title}) checkbox = QCheckBox(f"{title} (id: {chat_id})") checkbox.setProperty("chat_id", chat_id) if "AG офис" in title: layouts[0].insertWidget(layouts[0].count() - 1, checkbox) self.office_chat_checkboxes.append(checkbox) elif "AG розница" in title: layouts[1].insertWidget(layouts[1].count() - 1, checkbox) self.retail_chat_checkboxes.append(checkbox) elif "AG склад" in title: layouts[2].insertWidget(layouts[2].count() - 1, checkbox) self.warehouse_chat_checkboxes.append(checkbox) elif "AG кофейни" in title: layouts[3].insertWidget(layouts[3].count() - 1, checkbox) self.coffee_chat_checkboxes.append(checkbox) else: layouts[4].insertWidget(layouts[4].count() - 1, checkbox) self.other_chat_checkboxes.append(checkbox) self.chat_tabs.setTabText(0, f"AG Офис ({len(self.office_chat_checkboxes)})") self.chat_tabs.setTabText(1, f"AG Розница ({len(self.retail_chat_checkboxes)})") self.chat_tabs.setTabText(2, f"AG Склад ({len(self.warehouse_chat_checkboxes)})") self.chat_tabs.setTabText(3, f"AG Кофейни ({len(self.coffee_chat_checkboxes)})") self.chat_tabs.setTabText(4, f"Прочие ({len(self.other_chat_checkboxes)})") except VkApiError as e: if self._handle_vk_api_error("load_chats", e, action_name="загрузки чатов"): return QMessageBox.critical(self, "Ошибка", f"Не удалось загрузить чаты: {self._format_vk_error(e)}") self.set_ui_state(False) finally: self._set_busy(False) def _start_auto_update(self, download_url, latest_version, checksum_url="", download_name=""): if os.name != "nt": QMessageBox.information( self, "Автообновление", "Автообновление пока поддерживается только в Windows-сборке.", ) return False if not getattr(sys, "frozen", False): QMessageBox.information( self, "Автообновление", "Автообновление доступно в собранной версии приложения (.exe).", ) return False if not download_url: QMessageBox.warning(self, "Автообновление", "В релизе нет ссылки на файл для обновления.") return False self.status_label.setText(f"Статус: загрузка обновления {latest_version}...") self._set_busy(True) try: work_dir, source_dir = AutoUpdateService.prepare_update( download_url=download_url, checksum_url=checksum_url, download_name=download_name, ) app_exe = sys.executable AutoUpdateService.launch_gui_updater( app_exe=app_exe, source_dir=source_dir, work_dir=work_dir, target_pid=os.getpid(), version=latest_version, ) self._log_event("auto_update", f"Update {latest_version} started from {download_url}") self.status_label.setText("Статус: обновление запущено, закрываю приложение...") self.close() QTimer.singleShot(0, QApplication.instance().quit) return True except Exception as e: self._log_event("auto_update_failed", str(e), level="ERROR") QMessageBox.warning(self, "Автообновление", f"Не удалось выполнить автообновление: {e}") return False finally: self._set_busy(False) if __name__ == "__main__": if "--auth" in sys.argv: try: idx = sys.argv.index("--auth") auth_url = sys.argv[idx + 1] output_path = sys.argv[idx + 2] except Exception: sys.exit(1) auth_webview.main_auth(auth_url, output_path) sys.exit(0) app = QApplication(sys.argv) app.setStyle("Fusion") app.setPalette(app.style().standardPalette()) # Установка иконки для ВСЕХ окон приложения icon_path = get_resource_path("icon.ico") if os.path.exists(icon_path): app.setWindowIcon(QIcon(icon_path)) window = VkChatManager() window.show() sys.exit(app.exec())