import sys import base64 import ctypes import shutil import subprocess from vk_api import VkApi import json import time import auth_webview import os from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit, QPushButton, QVBoxLayout, QWidget, QMessageBox, QTextBrowser, QScrollArea, QCheckBox, QHBoxLayout, QSizePolicy, QDialog, QTextEdit, QTabWidget, QDialogButtonBox) from PySide6.QtCore import Qt, QUrl, QDateTime, Signal, QTimer from PySide6.QtGui import QIcon, QAction from urllib.parse import urlparse, parse_qs, unquote from vk_api.exceptions import VkApiError from PySide6.QtCore import QStandardPaths from ctypes import wintypes # --- Управление токенами и настройками --- APP_DATA_DIR = os.path.join(QStandardPaths.writableLocation(QStandardPaths.AppDataLocation), "AnabasisVKChatManager") TOKEN_FILE = os.path.join(APP_DATA_DIR, "token.json") WEB_ENGINE_CACHE_DIR = os.path.join(APP_DATA_DIR, "web_engine_cache") CACHE_CLEANUP_MARKER = os.path.join(APP_DATA_DIR, "pending_cache_cleanup") LOG_FILE = os.path.join(APP_DATA_DIR, "app.log") LOG_MAX_BYTES = 1024 * 1024 # 1 MB LOG_BACKUP_FILE = os.path.join(APP_DATA_DIR, "app.log.1") class _DataBlob(ctypes.Structure): _fields_ = [("cbData", wintypes.DWORD), ("pbData", ctypes.POINTER(ctypes.c_byte))] _crypt32 = None _kernel32 = None if os.name == "nt": _crypt32 = ctypes.WinDLL("crypt32", use_last_error=True) _kernel32 = ctypes.WinDLL("kernel32", use_last_error=True) _crypt32.CryptProtectData.argtypes = [ ctypes.POINTER(_DataBlob), wintypes.LPCWSTR, ctypes.POINTER(_DataBlob), ctypes.c_void_p, ctypes.c_void_p, wintypes.DWORD, ctypes.POINTER(_DataBlob), ] _crypt32.CryptProtectData.restype = wintypes.BOOL _crypt32.CryptUnprotectData.argtypes = [ ctypes.POINTER(_DataBlob), ctypes.POINTER(wintypes.LPWSTR), ctypes.POINTER(_DataBlob), ctypes.c_void_p, ctypes.c_void_p, wintypes.DWORD, ctypes.POINTER(_DataBlob), ] _crypt32.CryptUnprotectData.restype = wintypes.BOOL def _crypt_protect_data(data, description=""): buffer = ctypes.create_string_buffer(data) data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte))) data_out = _DataBlob() if not _crypt32.CryptProtectData(ctypes.byref(data_in), description, None, None, None, 0, ctypes.byref(data_out)): raise ctypes.WinError(ctypes.get_last_error()) try: return ctypes.string_at(data_out.pbData, data_out.cbData) finally: _kernel32.LocalFree(data_out.pbData) def _crypt_unprotect_data(data): buffer = ctypes.create_string_buffer(data) data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte))) data_out = _DataBlob() if not _crypt32.CryptUnprotectData(ctypes.byref(data_in), None, None, None, None, 0, ctypes.byref(data_out)): raise ctypes.WinError(ctypes.get_last_error()) try: return ctypes.string_at(data_out.pbData, data_out.cbData) finally: _kernel32.LocalFree(data_out.pbData) def _encrypt_token(token): if os.name != "nt": raise RuntimeError("DPAPI is available only on Windows.") encrypted_bytes = _crypt_protect_data(token.encode("utf-8")) return base64.b64encode(encrypted_bytes).decode("ascii") def _decrypt_token(token_data): if os.name != "nt": raise RuntimeError("DPAPI is available only on Windows.") encrypted_bytes = base64.b64decode(token_data.encode("ascii")) decrypted_bytes = _crypt_unprotect_data(encrypted_bytes) return decrypted_bytes.decode("utf-8") def get_resource_path(relative_path): """ Получает абсолютный путь к ресурсу, работает для скрипта и для .exe """ if hasattr(sys, '_MEIPASS'): # Для PyInstaller (на всякий случай) return os.path.join(sys._MEIPASS, relative_path) # Для cx_Freeze и обычного запуска return os.path.join(os.path.abspath("."), relative_path) def save_token(token, expires_in=0): """Сохраняет токен. Если expires_in=0, токен считается бессрочным.""" try: expires_in = int(expires_in) except (ValueError, TypeError): expires_in = 0 os.makedirs(APP_DATA_DIR, exist_ok=True) # ИСПРАВЛЕНИЕ: если expires_in равно 0, то и время истечения ставим 0 expiration_time = (time.time() + expires_in) if expires_in > 0 else 0 stored_token = token encrypted = False if os.name == "nt": try: stored_token = _encrypt_token(token) encrypted = True except Exception as e: print(f"Ошибка шифрования токена: {e}") data = { "token": stored_token, "expiration_time": expiration_time, "encrypted": encrypted } try: with open(TOKEN_FILE, "w") as f: json.dump(data, f) status = "Бессрочно" if expiration_time == 0 else QDateTime.fromSecsSinceEpoch(int(expiration_time)).toString() print(f"Токен сохранен. Срок действия: {status}") return expiration_time except IOError as e: print(f"Ошибка сохранения токена: {e}") return None def load_token(): """Загружает токен и проверяет его валидность.""" try: if not os.path.exists(TOKEN_FILE): return None, None with open(TOKEN_FILE, "r") as f: data = json.load(f) token = data.get("token") encrypted = data.get("encrypted", False) if token and encrypted: try: token = _decrypt_token(token) except Exception as e: print(f"Ошибка расшифровки токена: {e}") if os.path.exists(TOKEN_FILE): os.remove(TOKEN_FILE) return None, None expiration_time = data.get("expiration_time") # ИСПРАВЛЕНИЕ: токен валиден, если время истечения 0 ИЛИ оно больше текущего if token and (expiration_time == 0 or expiration_time > time.time()): return token, expiration_time else: if os.path.exists(TOKEN_FILE): os.remove(TOKEN_FILE) return None, None except Exception as e: print(f"Ошибка загрузки: {e}") return None, None class MultiLinkDialog(QDialog): def __init__(self, parent=None): super().__init__(parent) self.setWindowTitle("Ввод нескольких ссылок") self.setMinimumSize(400, 300) layout = QVBoxLayout(self) label = QLabel("Вставьте ссылки на страницы VK, каждая с новой строки:") layout.addWidget(label) self.links_text_edit = QTextEdit() layout.addWidget(self.links_text_edit) button_box = QDialogButtonBox() button_box.addButton("ОК", QDialogButtonBox.AcceptRole) button_box.addButton("Отмена", QDialogButtonBox.RejectRole) button_box.accepted.connect(self.accept) button_box.rejected.connect(self.reject) layout.addWidget(button_box) def get_links(self): return [line.strip() for line in self.links_text_edit.toPlainText().strip().split('\n') if line.strip()] class VkChatManager(QMainWindow): def __init__(self): super().__init__() self.setWindowTitle("Anabasis Chat Manager") self.setGeometry(300, 300, 600, 800) self.token = None self.token_expiration_time = None self.chats = [] self.office_chat_checkboxes = [] self.retail_chat_checkboxes = [] self.warehouse_chat_checkboxes = [] self.coffee_chat_checkboxes = [] self.other_chat_checkboxes = [] self.vk_session = None self.vk = None self.user_ids_to_process = [] self._busy = False self.suppress_resolve = False self.resolve_timer = QTimer(self) self.resolve_timer.setSingleShot(True) self.resolve_timer.setInterval(750) self.resolve_timer.timeout.connect(self.resolve_single_user_id_from_input) self._cleanup_cache_if_needed() self._ensure_log_dir() self.init_ui() self.load_saved_token_on_startup() self.setup_token_timer() def init_ui(self): central_widget = QWidget() self.setCentralWidget(central_widget) layout = QVBoxLayout(central_widget) layout.setContentsMargins(10, 10, 10, 10) layout.setSpacing(5) self.instructions = QTextBrowser() self.instructions.setPlainText( "Инструкция:\n" "1. Авторизуйтесь через VK.\n" "2. Выберите чаты.\n" "3. Вставьте ссылку на пользователя в поле ниже. ID определится автоматически.\n" "4. Для массовых операций, нажмите кнопку 'Список' и вставьте ссылки в окне.\n" "5. Нажмите 'ИСКЛЮЧИТЬ' или 'ПРИГЛАСИТЬ'." ) self.instructions.setFixedHeight(120) layout.addWidget(self.instructions) layout.addWidget(QLabel("Access Token VK:")) self.token_input = QLineEdit() self.token_input.setPlaceholderText("Токен появится здесь после авторизации...") self.token_input.setReadOnly(True) layout.addWidget(self.token_input) self.token_timer_label = QLabel("Срок действия токена: Н/Д") self.token_timer_label.setAlignment(Qt.AlignRight) layout.addWidget(self.token_timer_label) self.auth_btn = QPushButton("Авторизоваться через VK") self.auth_btn.clicked.connect(self.start_auth) layout.addWidget(self.auth_btn) self.chat_tabs = QTabWidget() self.chat_tabs.hide() self.office_tab = self.create_chat_tab() self.chat_tabs.addTab(self.office_tab, "AG Офис") self.retail_tab = self.create_chat_tab() self.chat_tabs.addTab(self.retail_tab, "AG Розница") self.warehouse_tab = self.create_chat_tab() self.chat_tabs.addTab(self.warehouse_tab, "AG Склад") self.coffee_tab = self.create_chat_tab() self.chat_tabs.addTab(self.coffee_tab, "AG Кофейни") self.other_tab = self.create_chat_tab() self.chat_tabs.addTab(self.other_tab, "Прочие") layout.addWidget(QLabel("Выберите чаты:")) select_buttons_layout = QHBoxLayout() self.select_all_btn = QPushButton("Выбрать все на вкладке") self.select_all_btn.clicked.connect(lambda: self.set_all_checkboxes_on_current_tab(True)) self.deselect_all_btn = QPushButton("Снять выбор на вкладке") self.deselect_all_btn.clicked.connect(lambda: self.set_all_checkboxes_on_current_tab(False)) self.refresh_chats_btn = QPushButton("Обновить чаты") self.refresh_chats_btn.clicked.connect(self.load_chats) select_buttons_layout.addWidget(self.select_all_btn) select_buttons_layout.addWidget(self.deselect_all_btn) select_buttons_layout.addWidget(self.refresh_chats_btn) layout.addLayout(select_buttons_layout) layout.addWidget(self.chat_tabs) layout.addWidget(QLabel("Ссылка на страницу VK (ID определится автоматически):")) link_input_layout = QHBoxLayout() self.vk_url_input = QLineEdit() self.vk_url_input.setPlaceholderText("https://vk.com/id1") self.vk_url_input.textChanged.connect(self.on_vk_url_input_changed) link_input_layout.addWidget(self.vk_url_input) self.multi_link_btn = QPushButton("Список") self.multi_link_btn.setToolTip("Ввести несколько ссылок списком") self.multi_link_btn.clicked.connect(self.open_multi_link_dialog) link_input_layout.addWidget(self.multi_link_btn) layout.addLayout(link_input_layout) self.remove_user_btn = QPushButton("ИСКЛЮЧИТЬ ПОЛЬЗОВАТЕЛЕЙ") self.remove_user_btn.setMinimumHeight(50) self.remove_user_btn.clicked.connect(self.remove_user) layout.addWidget(self.remove_user_btn) self.visible_messages_checkbox = QCheckBox("Показать 250 последних сообщений при добавлении") layout.addWidget(self.visible_messages_checkbox) self.add_user_btn = QPushButton("ПРИГЛАСИТЬ ПОЛЬЗОВАТЕЛЕЙ") self.add_user_btn.setMinimumHeight(50) self.add_user_btn.clicked.connect(self.add_user_to_chat) layout.addWidget(self.add_user_btn) self.status_label = QLabel("Статус: не авторизован") self.status_label.setAlignment(Qt.AlignCenter) layout.addWidget(self.status_label) layout.addStretch(1) self.create_menu() self.set_ui_state(False) def on_vk_url_input_changed(self, text): if self.suppress_resolve: return if self.vk_url_input.hasFocus(): self.resolve_timer.start() def open_multi_link_dialog(self): dialog = MultiLinkDialog(self) if dialog.exec(): links = dialog.get_links() if links: self._set_vk_url_input_text("") self._process_links_list(links) else: QMessageBox.information(self, "Информация", "Список ссылок пуст.") def resolve_single_user_id_from_input(self): url = self.vk_url_input.text().strip() if not url: self.user_ids_to_process.clear() self.status_label.setText("Статус: Введите ссылку или откройте список.") self.set_ui_state(self.token is not None) return self._process_links_list([url]) def _process_links_list(self, links_list): if not self.vk: QMessageBox.warning(self, "Ошибка", "Сначала авторизуйтесь.") return self.user_ids_to_process.clear() resolved_ids = [] failed_links = [] self._set_busy(True, "Статус: Определяю ID...") try: for link in links_list: try: path = urlparse(link).path screen_name = path.split('/')[-1] if path else '' if not screen_name and len(path.split('/')) > 1: screen_name = path.split('/')[-2] if not screen_name: failed_links.append(link) continue resolved_object = self._vk_call_with_retry(self.vk.utils.resolveScreenName, screen_name=screen_name) if resolved_object and resolved_object.get('type') == 'user': resolved_ids.append(resolved_object['object_id']) else: failed_links.append(link) except VkApiError as e: self._log_error("resolveScreenName", e) failed_links.append(f"{link} ({self._format_vk_error(e)})") except Exception: failed_links.append(link) finally: self._set_busy(False) self.user_ids_to_process = resolved_ids status_message = f"Статус: Готово к работе с {len(resolved_ids)} пользовател(ем/ями)." if len(links_list) > 1: self._set_vk_url_input_text(f"Загружено {len(resolved_ids)}/{len(links_list)} из списка") if failed_links: QMessageBox.warning(self, "Ошибка получения ID", f"Не удалось получить ID для следующих ссылок:\n" + "\n".join(failed_links)) self.status_label.setText(status_message) self.set_ui_state(self.token is not None) def create_menu(self): """Создает верхнее меню.""" menu_bar = self.menuBar() # Меню "Инструменты" tools_menu = menu_bar.addMenu("Инструменты") # Действие "Назначить администратором" make_admin_action = QAction("Назначить администратором", self) make_admin_action.setStatusTip("Назначить выбранных пользователей администраторами в выбранных чатах") make_admin_action.triggered.connect(self.set_user_admin) tools_menu.addAction(make_admin_action) logout_action = QAction("Выйти и очистить", self) logout_action.setStatusTip("Выйти, удалить токен и кэш") logout_action.triggered.connect(self.logout_and_clear) tools_menu.addAction(logout_action) def create_chat_tab(self): # This implementation correctly creates a scrollable area for chat lists. tab_content_widget = QWidget() tab_layout = QVBoxLayout(tab_content_widget) tab_layout.setContentsMargins(0, 0, 0, 0) tab_layout.setSpacing(0) scroll_area = QScrollArea() scroll_area.setWidgetResizable(True) scroll_area.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOff) scroll_area.setVerticalScrollBarPolicy(Qt.ScrollBarAsNeeded) tab_layout.addWidget(scroll_area) checkbox_container_widget = QWidget() checkbox_layout = QVBoxLayout(checkbox_container_widget) checkbox_layout.setContentsMargins(5, 5, 5, 5) checkbox_layout.setSpacing(2) checkbox_layout.addStretch() scroll_area.setWidget(checkbox_container_widget) return tab_content_widget def setup_token_timer(self): self.token_countdown_timer = QTimer(self) self.token_countdown_timer.timeout.connect(self.update_token_timer_display) self.token_countdown_timer.start(1000) def update_token_timer_display(self): if self.token_expiration_time is None: self.token_timer_label.setText("Срок действия токена: Н/Д") return # ИСПРАВЛЕНИЕ: обрабатываем бессрочный токен if self.token_expiration_time == 0: self.token_timer_label.setText("Срок действия: Бессрочно") return remaining_seconds = int(self.token_expiration_time - time.time()) if remaining_seconds <= 0: self.token_timer_label.setText("Срок действия истек!") if self.token_countdown_timer.isActive(): self.token_countdown_timer.stop() self.set_ui_state(False) self.status_label.setText("Статус: Срок действия истек, авторизуйтесь заново.") self.token, self.token_expiration_time = None, None self.token_input.clear() return minutes, seconds = divmod(remaining_seconds, 60) hours, minutes = divmod(minutes, 60) self.token_timer_label.setText(f"Срок: {hours:02d}ч {minutes:02d}м {seconds:02d}с") def set_ui_state(self, authorized): self.auth_btn.setEnabled(not authorized) for btn in [self.select_all_btn, self.deselect_all_btn, self.refresh_chats_btn, self.vk_url_input, self.multi_link_btn, self.visible_messages_checkbox]: btn.setEnabled(authorized) has_ids = authorized and bool(self.user_ids_to_process) self.remove_user_btn.setEnabled(has_ids) self.add_user_btn.setEnabled(has_ids) self.chat_tabs.setVisible(authorized) if authorized: # Когда авторизованы, задаем минимальную высоту, достаточную для ~10-12 чатов self.chat_tabs.setMinimumHeight(300) else: # Когда не авторизованы, сбрасываем минимальную высоту self.chat_tabs.setMinimumHeight(0) if not authorized: self.user_ids_to_process.clear() self._set_vk_url_input_text("") self._clear_chat_tabs() def _set_busy(self, busy, status_text=None): if status_text: self.status_label.setText(status_text) if busy: self._busy = True QApplication.setOverrideCursor(Qt.WaitCursor) for widget in [ self.refresh_chats_btn, self.select_all_btn, self.deselect_all_btn, self.vk_url_input, self.multi_link_btn, self.visible_messages_checkbox, self.remove_user_btn, self.add_user_btn ]: widget.setEnabled(False) else: self._busy = False QApplication.restoreOverrideCursor() if self.token is None: self.set_ui_state(False) else: self.set_ui_state(True) def _ensure_log_dir(self): os.makedirs(APP_DATA_DIR, exist_ok=True) def _log_error(self, context, exc): try: os.makedirs(APP_DATA_DIR, exist_ok=True) self._rotate_log_if_needed() timestamp = QDateTime.currentDateTime().toString("yyyy-MM-dd HH:mm:ss") message = self._format_vk_error(exc) with open(LOG_FILE, "a", encoding="utf-8") as f: f.write(f"[{timestamp}] {context}: {message}\n") except Exception: pass def _rotate_log_if_needed(self): try: if not os.path.exists(LOG_FILE): return if os.path.getsize(LOG_FILE) < LOG_MAX_BYTES: return if os.path.exists(LOG_BACKUP_FILE): os.remove(LOG_BACKUP_FILE) os.replace(LOG_FILE, LOG_BACKUP_FILE) except Exception: pass def _format_vk_error(self, exc): error = getattr(exc, "error", None) code = None message = str(exc) if isinstance(error, dict): code = error.get("error_code") message = error.get("error_msg") or message hints = { 5: "Ошибка авторизации. Проверьте токен.", 6: "Слишком много запросов. Подождите и повторите.", 7: "Недостаточно прав.", 9: "Слишком много однотипных действий.", 10: "Внутренняя ошибка VK. Повторите позже.", 15: "Доступ запрещен.", 100: "Некорректный параметр запроса.", 113: "Неверный идентификатор пользователя.", 200: "Доступ к чату запрещен.", } if code in hints: message = f"{message} ({hints[code]})" if code is not None: return f"[{code}] {message}" return message def _set_vk_url_input_text(self, text): self.suppress_resolve = True try: self.vk_url_input.blockSignals(True) self.vk_url_input.setText(text) finally: self.vk_url_input.blockSignals(False) self.suppress_resolve = False def logout_and_clear(self): confirm = QMessageBox.question( self, "Подтверждение выхода", "Вы уверены, что хотите выйти и удалить сохраненный токен и кэш?", QMessageBox.Yes | QMessageBox.No ) if confirm != QMessageBox.Yes: return self.token = None self.token_expiration_time = None self.vk_session = None self.vk = None self.user_ids_to_process.clear() self._set_vk_url_input_text("") self.token_input.clear() self.token_timer_label.setText("Срок действия токена: Н/Д") self.status_label.setText("Статус: не авторизован") if self.token_countdown_timer.isActive(): self.token_countdown_timer.stop() self._clear_chat_tabs() self.set_ui_state(False) try: if os.path.exists(TOKEN_FILE): os.remove(TOKEN_FILE) except Exception as e: print(f"Ошибка удаления токена: {e}") try: self._try_remove_web_cache() except Exception as e: print(f"Ошибка удаления кэша: {e}") def _cleanup_cache_if_needed(self): if os.path.exists(CACHE_CLEANUP_MARKER): try: self._try_remove_web_cache() if os.path.exists(CACHE_CLEANUP_MARKER): os.remove(CACHE_CLEANUP_MARKER) except Exception as e: print(f"Ошибка отложенной очистки кэша: {e}") def _try_remove_web_cache(self): if not os.path.exists(WEB_ENGINE_CACHE_DIR): return attempts = 5 last_error = None for _ in range(attempts): try: shutil.rmtree(WEB_ENGINE_CACHE_DIR) last_error = None break except Exception as e: last_error = e time.sleep(0.2) if last_error: os.makedirs(APP_DATA_DIR, exist_ok=True) with open(CACHE_CLEANUP_MARKER, "w") as f: f.write("pending") raise last_error def load_saved_token_on_startup(self): loaded_token, expiration_time = load_token() if loaded_token: self.handle_auth_token_on_load(loaded_token, expiration_time) else: self.set_ui_state(False) def set_all_checkboxes_on_current_tab(self, checked): current_index = self.chat_tabs.currentIndex() checkbox_lists = [self.office_chat_checkboxes, self.retail_chat_checkboxes, self.retail_warehouse_checkboxes, self.retail_coffee_checkboxes, self.other_chat_checkboxes] if 0 <= current_index < len(checkbox_lists): for checkbox in checkbox_lists[current_index]: checkbox.setChecked(checked) def start_auth(self): self.status_label.setText("Статус: ожидание авторизации...") auth_url = ( "https://oauth.vk.com/authorize?" "client_id=2685278&" "display=page&" "redirect_uri=https://oauth.vk.com/blank.html&" "scope=1073737727&" "response_type=token&" "v=5.131" ) output_path = os.path.join(APP_DATA_DIR, "auth_result.json") try: if os.path.exists(output_path): os.remove(output_path) except Exception: pass cmd = [sys.executable, "--auth", auth_url, output_path] try: subprocess.check_call(cmd) except Exception as e: self.status_label.setText(f"Статус: ошибка запуска авторизации: {e}") return if not os.path.exists(output_path): self.status_label.setText("Статус: авторизация не удалась") return try: with open(output_path, "r", encoding="utf-8") as f: data = json.load(f) token = data.get("token") expires_in = data.get("expires_in", 0) except Exception: token = None expires_in = 0 try: if os.path.exists(output_path): os.remove(output_path) except Exception: pass self.handle_new_auth_token(token, expires_in) def handle_new_auth_token(self, token, expires_in): if not token: self.status_label.setText("Статус: Авторизация не удалась") self.set_ui_state(False) return self.token = token # Сохраняем и получаем корректный expiration_time (0 или будущее время) self.token_expiration_time = save_token(self.token, expires_in) self.token_input.setText(self.token[:50] + "...") self.status_label.setText("Статус: авторизован") self.vk_session = VkApi(token=self.token) self.vk = self.vk_session.get_api() self.set_ui_state(True) self.load_chats() def handle_auth_token_on_load(self, token, expiration_time): self.token = token self.token_expiration_time = expiration_time self.token_input.setText(self.token[:50] + "...") self.status_label.setText("Статус: авторизован (токен загружен)") self.vk_session = VkApi(token=self.token) self.vk = self.vk_session.get_api() self.set_ui_state(True) self.load_chats() def _clear_chat_tabs(self): self.chats.clear() for chk_list in [self.office_chat_checkboxes, self.retail_chat_checkboxes, self.warehouse_chat_checkboxes, self.coffee_chat_checkboxes, self.other_chat_checkboxes]: for checkbox in chk_list: checkbox.setParent(None) checkbox.deleteLater() chk_list.clear() def _vk_error_code(self, exc): error = getattr(exc, "error", None) if isinstance(error, dict): return error.get("error_code") return getattr(exc, "code", None) def _vk_call_with_retry(self, func, *args, **kwargs): max_attempts = 5 for attempt in range(1, max_attempts + 1): try: return func(*args, **kwargs) except VkApiError as e: code = self._vk_error_code(e) if code not in (6, 9, 10) or attempt == max_attempts: raise delay = min(2.0, 0.35 * (2 ** (attempt - 1))) if code == 9: delay = max(delay, 1.0) time.sleep(delay) def load_chats(self): self._clear_chat_tabs() # Get the checkbox layouts from each tab layouts = [ self.office_tab.findChild(QWidget).findChild(QVBoxLayout), self.retail_tab.findChild(QWidget).findChild(QVBoxLayout), self.warehouse_tab.findChild(QWidget).findChild(QVBoxLayout), self.coffee_tab.findChild(QWidget).findChild(QVBoxLayout), self.other_tab.findChild(QWidget).findChild(QVBoxLayout) ] try: self._set_busy(True, "Статус: загрузка чатов...") conversations = [] start_from = None seen_start_tokens = set() while True: params = {"count": 200, "filter": "all"} if start_from: if start_from in seen_start_tokens: break params["start_from"] = start_from seen_start_tokens.add(start_from) response = self._vk_call_with_retry(self.vk.messages.getConversations, **params) page_items = response.get("items", []) if not page_items: break conversations.extend(page_items) start_from = response.get("next_from") if not start_from: break for conv in conversations: if conv['conversation']['peer']['type'] == 'chat': chat_id = conv['conversation']['peer']['local_id'] title = conv['conversation']['chat_settings']['title'] self.chats.append({'id': chat_id, 'title': title}) checkbox = QCheckBox(f"{title} (id: {chat_id})") checkbox.setProperty("chat_id", chat_id) # Insert checkbox at the top of the layout (before the stretch) if "AG офис" in title: layouts[0].insertWidget(layouts[0].count() - 1, checkbox) self.office_chat_checkboxes.append(checkbox) elif "AG розница" in title: layouts[1].insertWidget(layouts[1].count() - 1, checkbox) self.retail_chat_checkboxes.append(checkbox) elif "AG склад" in title: layouts[2].insertWidget(layouts[2].count() - 1, checkbox) self.warehouse_chat_checkboxes.append(checkbox) elif "AG кофейни" in title: layouts[3].insertWidget(layouts[3].count() - 1, checkbox) self.coffee_chat_checkboxes.append(checkbox) else: layouts[4].insertWidget(layouts[4].count() - 1, checkbox) self.other_chat_checkboxes.append(checkbox) self.chat_tabs.setTabText(0, f"AG Офис ({len(self.office_chat_checkboxes)})") self.chat_tabs.setTabText(1, f"AG Розница ({len(self.retail_chat_checkboxes)})") self.chat_tabs.setTabText(2, f"AG Склад ({len(self.warehouse_chat_checkboxes)})") self.chat_tabs.setTabText(3, f"AG Кофейни ({len(self.coffee_chat_checkboxes)})") self.chat_tabs.setTabText(4, f"Прочие ({len(self.other_chat_checkboxes)})") except VkApiError as e: self._log_error("load_chats", e) QMessageBox.critical(self, "Ошибка", f"Не удалось загрузить чаты: {self._format_vk_error(e)}") self.set_ui_state(False) finally: self._set_busy(False) def get_user_info_by_id(self, user_id): try: user = self.vk.users.get(user_ids=user_id)[0] return f"{user.get('first_name', '')} {user.get('last_name', '')}" except Exception: return f"Пользователь {user_id}" def _get_selected_chats(self): selected = [] for chk in self.office_chat_checkboxes + self.retail_chat_checkboxes + self.warehouse_chat_checkboxes + self.coffee_chat_checkboxes + self.other_chat_checkboxes: if chk.isChecked(): chat_id = chk.property("chat_id") title = next((c['title'] for c in self.chats if c['id'] == chat_id), "") selected.append({'id': chat_id, 'title': title}) return selected def _execute_user_action(self, action_type): if not self.user_ids_to_process: QMessageBox.warning(self, "Ошибка", f"Нет ID пользователей для операции.") return selected_chats = self._get_selected_chats() if not selected_chats: QMessageBox.warning(self, "Ошибка", "Выберите хотя бы один чат.") return user_infos = {uid: self.get_user_info_by_id(uid) for uid in self.user_ids_to_process} action_verb = "исключить" if action_type == "remove" else "пригласить" preposition = "из" if action_type == "remove" else "в" user_names_list = list(user_infos.values()) user_names_str = "\n".join([f"• {name}" for name in user_names_list]) chat_count = len(selected_chats) chat_str = "" # Финальная логика склонения с учетом падежа (для "из" и "в") if chat_count % 10 == 1 and chat_count % 100 != 11: if action_type == 'remove': # из 1 выбранного чата (Родительный падеж, ед.ч.) chat_str = f"{chat_count} выбранного чата" else: # в 1 выбранный чат (Винительный падеж, ед.ч.) chat_str = f"{chat_count} выбранный чат" elif 2 <= chat_count % 10 <= 4 and (chat_count % 100 < 10 or chat_count % 100 >= 20): if action_type == 'remove': # из 3 выбранных чатов (Родительный падеж, мн.ч.) chat_str = f"{chat_count} выбранных чатов" else: # в 3 выбранных чата (Родительный падеж, ед.ч.) chat_str = f"{chat_count} выбранных чата" else: # из 5 выбранных чатов / в 5 выбранных чатов (Родительный падеж, мн.ч.) chat_str = f"{chat_count} выбранных чатов" msg = ( f"Вы уверены, что хотите {action_verb} следующих пользователей:\n\n" f"{user_names_str}\n\n" f"{preposition} {chat_str}?" ) confirm_dialog = QMessageBox(self) confirm_dialog.setWindowTitle("Подтверждение действия") confirm_dialog.setText(msg) confirm_dialog.setIcon(QMessageBox.Question) yes_button = confirm_dialog.addButton("Да", QMessageBox.YesRole) no_button = confirm_dialog.addButton("Нет", QMessageBox.NoRole) confirm_dialog.setDefaultButton(no_button) # "Нет" - кнопка по умолчанию confirm_dialog.exec() if confirm_dialog.clickedButton() != yes_button: return results = [] total = len(selected_chats) * len(user_infos) processed = 0 try: action_label = "исключение" if action_type == "remove" else "приглашение" self._set_busy(True, f"Статус: выполняется {action_label} (0/{total})...") for chat in selected_chats: for user_id, user_info in user_infos.items(): try: if action_type == "remove": self._vk_call_with_retry(self.vk.messages.removeChatUser, chat_id=chat['id'], member_id=user_id) results.append(f"✓ '{user_info}' исключен из '{chat['title']}'.") else: params = {'chat_id': chat['id'], 'user_id': user_id} if self.visible_messages_checkbox.isChecked(): params['visible_messages_count'] = 250 self._vk_call_with_retry(self.vk.messages.addChatUser, **params) results.append(f"✓ '{user_info}' приглашен в '{chat['title']}'.") except VkApiError as e: self._log_error("execute_user_action", e) results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {self._format_vk_error(e)}") finally: processed += 1 self.status_label.setText(f"Статус: выполняется {action_label} ({processed}/{total})...") finally: self._set_busy(False) QMessageBox.information(self, "Результаты", "\n".join(results)) self.vk_url_input.clear() self.user_ids_to_process.clear() self.set_ui_state(self.token is not None) def remove_user(self): self._execute_user_action("remove") def add_user_to_chat(self): self._execute_user_action("add") def set_user_admin(self): """Назначает пользователя администратором чата.""" # 1. Проверки на наличие выбранных пользователей и чатов if not self.user_ids_to_process: QMessageBox.warning(self, "Ошибка", "Нет ID пользователей для операции.") return selected_chats = self._get_selected_chats() if not selected_chats: QMessageBox.warning(self, "Ошибка", "Выберите хотя бы один чат.") return # 2. Подготовка данных для подтверждения user_infos = {uid: self.get_user_info_by_id(uid) for uid in self.user_ids_to_process} user_names_str = "\n".join([f"• {name}" for name in user_infos.values()]) msg = ( f"Вы уверены, что хотите назначить АДМИНИСТРАТОРАМИ следующих пользователей:\n\n" f"{user_names_str}\n\n" f"в {len(selected_chats)} выбранных чатах?" ) # 3. Диалог подтверждения confirm_dialog = QMessageBox(self) confirm_dialog.setWindowTitle("Подтверждение прав") confirm_dialog.setText(msg) confirm_dialog.setIcon(QMessageBox.Question) yes_button = confirm_dialog.addButton("Да", QMessageBox.YesRole) no_button = confirm_dialog.addButton("Нет", QMessageBox.NoRole) confirm_dialog.setDefaultButton(no_button) confirm_dialog.exec() if confirm_dialog.clickedButton() != yes_button: return # 4. Выполнение API запросов results = [] total = len(selected_chats) * len(user_infos) processed = 0 try: self._set_busy(True, f"Статус: назначение админов (0/{total})...") for chat in selected_chats: # VK API требует peer_id. Для чатов это 2000000000 + local_id try: peer_id = 2000000000 + int(chat['id']) except ValueError: results.append(f"✗ Ошибка ID чата: {chat['id']}") continue for user_id, user_info in user_infos.items(): try: self._vk_call_with_retry( self.vk.messages.setMemberRole, peer_id=peer_id, member_id=user_id, role="admin" ) results.append(f"✓ '{user_info}' назначен админом в '{chat['title']}'.") except VkApiError as e: self._log_error("set_user_admin", e) results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {self._format_vk_error(e)}") finally: processed += 1 self.status_label.setText(f"Статус: назначение админов ({processed}/{total})...") finally: self._set_busy(False) # 5. Вывод результата QMessageBox.information(self, "Результаты назначения", "\n".join(results)) # Очистка полей (по желанию, можно убрать эти две строки, если хотите оставить ввод) self.vk_url_input.clear() self.user_ids_to_process.clear() self.set_ui_state(self.token is not None) if __name__ == "__main__": if "--auth" in sys.argv: try: idx = sys.argv.index("--auth") auth_url = sys.argv[idx + 1] output_path = sys.argv[idx + 2] except Exception: sys.exit(1) auth_webview.main_auth(auth_url, output_path) sys.exit(0) app = QApplication(sys.argv) app.setStyle("Fusion") app.setPalette(app.style().standardPalette()) # Установка иконки для ВСЕХ окон приложения icon_path = get_resource_path("icon.ico") if os.path.exists(icon_path): app.setWindowIcon(QIcon(icon_path)) window = VkChatManager() window.show() sys.exit(app.exec())