Files
AnabasisChatRemove/main.py

1122 lines
48 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import sys
import base64
import ctypes
import shutil
from vk_api import VkApi
import json
import time
import os
from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit,
QPushButton, QVBoxLayout, QWidget, QMessageBox,
QTextBrowser, QScrollArea, QCheckBox, QHBoxLayout,
QSizePolicy, QDialog, QTextEdit, QTabWidget, QDialogButtonBox)
from PySide6.QtCore import Qt, QUrl, QDateTime, Signal, QTimer
from PySide6.QtGui import QIcon, QAction
from PySide6.QtWebEngineWidgets import QWebEngineView
from PySide6.QtWebEngineCore import QWebEnginePage, QWebEngineProfile
from urllib.parse import urlparse, parse_qs, unquote
from vk_api.exceptions import VkApiError
from PySide6.QtCore import QStandardPaths
from ctypes import wintypes
# --- Управление токенами и настройками ---
APP_DATA_DIR = os.path.join(QStandardPaths.writableLocation(QStandardPaths.AppDataLocation), "AnabasisVKChatManager")
TOKEN_FILE = os.path.join(APP_DATA_DIR, "token.json")
WEB_ENGINE_CACHE_DIR = os.path.join(APP_DATA_DIR, "web_engine_cache")
CACHE_CLEANUP_MARKER = os.path.join(APP_DATA_DIR, "pending_cache_cleanup")
LOG_FILE = os.path.join(APP_DATA_DIR, "app.log")
LOG_MAX_BYTES = 1024 * 1024 # 1 MB
LOG_BACKUP_FILE = os.path.join(APP_DATA_DIR, "app.log.1")
class _DataBlob(ctypes.Structure):
_fields_ = [("cbData", wintypes.DWORD), ("pbData", ctypes.POINTER(ctypes.c_byte))]
_crypt32 = None
_kernel32 = None
if os.name == "nt":
_crypt32 = ctypes.WinDLL("crypt32", use_last_error=True)
_kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
_crypt32.CryptProtectData.argtypes = [
ctypes.POINTER(_DataBlob),
wintypes.LPCWSTR,
ctypes.POINTER(_DataBlob),
ctypes.c_void_p,
ctypes.c_void_p,
wintypes.DWORD,
ctypes.POINTER(_DataBlob),
]
_crypt32.CryptProtectData.restype = wintypes.BOOL
_crypt32.CryptUnprotectData.argtypes = [
ctypes.POINTER(_DataBlob),
ctypes.POINTER(wintypes.LPWSTR),
ctypes.POINTER(_DataBlob),
ctypes.c_void_p,
ctypes.c_void_p,
wintypes.DWORD,
ctypes.POINTER(_DataBlob),
]
_crypt32.CryptUnprotectData.restype = wintypes.BOOL
def _crypt_protect_data(data, description=""):
buffer = ctypes.create_string_buffer(data)
data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte)))
data_out = _DataBlob()
if not _crypt32.CryptProtectData(ctypes.byref(data_in), description, None, None, None, 0, ctypes.byref(data_out)):
raise ctypes.WinError(ctypes.get_last_error())
try:
return ctypes.string_at(data_out.pbData, data_out.cbData)
finally:
_kernel32.LocalFree(data_out.pbData)
def _crypt_unprotect_data(data):
buffer = ctypes.create_string_buffer(data)
data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte)))
data_out = _DataBlob()
if not _crypt32.CryptUnprotectData(ctypes.byref(data_in), None, None, None, None, 0, ctypes.byref(data_out)):
raise ctypes.WinError(ctypes.get_last_error())
try:
return ctypes.string_at(data_out.pbData, data_out.cbData)
finally:
_kernel32.LocalFree(data_out.pbData)
def _encrypt_token(token):
if os.name != "nt":
raise RuntimeError("DPAPI is available only on Windows.")
encrypted_bytes = _crypt_protect_data(token.encode("utf-8"))
return base64.b64encode(encrypted_bytes).decode("ascii")
def _decrypt_token(token_data):
if os.name != "nt":
raise RuntimeError("DPAPI is available only on Windows.")
encrypted_bytes = base64.b64decode(token_data.encode("ascii"))
decrypted_bytes = _crypt_unprotect_data(encrypted_bytes)
return decrypted_bytes.decode("utf-8")
def get_resource_path(relative_path):
""" Получает абсолютный путь к ресурсу, работает для скрипта и для .exe """
if hasattr(sys, '_MEIPASS'): # Для PyInstaller (на всякий случай)
return os.path.join(sys._MEIPASS, relative_path)
# Для cx_Freeze и обычного запуска
return os.path.join(os.path.abspath("."), relative_path)
def save_token(token, expires_in=0):
"""Сохраняет токен. Если expires_in=0, токен считается бессрочным."""
try:
expires_in = int(expires_in)
except (ValueError, TypeError):
expires_in = 0
os.makedirs(APP_DATA_DIR, exist_ok=True)
# ИСПРАВЛЕНИЕ: если expires_in равно 0, то и время истечения ставим 0
expiration_time = (time.time() + expires_in) if expires_in > 0 else 0
stored_token = token
encrypted = False
if os.name == "nt":
try:
stored_token = _encrypt_token(token)
encrypted = True
except Exception as e:
print(f"Ошибка шифрования токена: {e}")
data = {
"token": stored_token,
"expiration_time": expiration_time,
"encrypted": encrypted
}
try:
with open(TOKEN_FILE, "w") as f:
json.dump(data, f)
status = "Бессрочно" if expiration_time == 0 else QDateTime.fromSecsSinceEpoch(int(expiration_time)).toString()
print(f"Токен сохранен. Срок действия: {status}")
return expiration_time
except IOError as e:
print(f"Ошибка сохранения токена: {e}")
return None
def load_token():
"""Загружает токен и проверяет его валидность."""
try:
if not os.path.exists(TOKEN_FILE):
return None, None
with open(TOKEN_FILE, "r") as f:
data = json.load(f)
token = data.get("token")
encrypted = data.get("encrypted", False)
if token and encrypted:
try:
token = _decrypt_token(token)
except Exception as e:
print(f"Ошибка расшифровки токена: {e}")
if os.path.exists(TOKEN_FILE):
os.remove(TOKEN_FILE)
return None, None
expiration_time = data.get("expiration_time")
# ИСПРАВЛЕНИЕ: токен валиден, если время истечения 0 ИЛИ оно больше текущего
if token and (expiration_time == 0 or expiration_time > time.time()):
return token, expiration_time
else:
if os.path.exists(TOKEN_FILE):
os.remove(TOKEN_FILE)
return None, None
except Exception as e:
print(f"Ошибка загрузки: {e}")
return None, None
class WebEnginePage(QWebEnginePage):
"""
Класс для обработки навигационных запросов в QWebEngineView,
специально для извлечения токена авторизации VK.
"""
# Добавлен параметр profile в конструктор, чтобы использовать пользовательский профиль
def __init__(self, profile=None, parent=None, browser_window_instance=None):
super().__init__(profile, parent) # Передаем profile в базовый класс
self.parent_browser_window = browser_window_instance
self.token_extracted = False
def acceptNavigationRequest(self, url, _type, isMainFrame):
"""
Переопределенный метод для перехвата URL-адреса, содержащего токен доступа.
"""
url_string = url.toString()
if "access_token" in url_string and not self.token_extracted:
self.token_extracted = True
if self.parent_browser_window:
self.parent_browser_window.process_auth_url(url_string)
return False
return super().acceptNavigationRequest(url, _type, isMainFrame)
class AuthBrowserWindow(QDialog):
"""
Отдельное окно-диалог для проведения OAuth авторизации VK.
"""
token_extracted_signal = Signal(str, int)
def __init__(self, parent=None):
super().__init__(parent)
self.setWindowTitle("Авторизация VK")
self.setGeometry(350, 350, 800, 600)
layout = QVBoxLayout(self)
self.browser = QWebEngineView()
# Настройка QWebEngineProfile для сохранения куки и других данных
os.makedirs(WEB_ENGINE_CACHE_DIR, exist_ok=True)
# ИСПРАВЛЕНО: Сделать profile атрибутом экземпляра и дать ему self как родителя
self.profile = QWebEngineProfile("AnabasisVKWebProfile", self)
self.profile.setPersistentCookiesPolicy(QWebEngineProfile.AllowPersistentCookies)
self.profile.setPersistentStoragePath(WEB_ENGINE_CACHE_DIR)
# Создаем страницу с настроенным профилем
self.browser.page = WebEnginePage(profile=self.profile, parent=self.browser, browser_window_instance=self)
self.browser.setPage(self.browser.page)
layout.addWidget(self.browser)
self.status_label = QLabel("Ожидание авторизации...")
self.status_label.setAlignment(Qt.AlignCenter)
layout.addWidget(self.status_label)
def start_auth_flow(self):
"""Запускает OAuth авторизацию VK в этом окне."""
auth_url = (
"https://oauth.vk.com/authorize?"
"client_id=2685278&"
"display=page&"
"redirect_uri=https://oauth.vk.com/blank.html&"
"scope=1073737727&"
"response_type=token&"
"v=5.131"
)
self.browser.setUrl(QUrl(auth_url))
self.status_label.setText("Пожалуйста, войдите в VK и разрешите доступ...")
def process_auth_url(self, url_string):
"""
Извлекает токен доступа из URL перенаправления и испускает сигнал.
"""
token = None
expires_in = 3600
parsed = urlparse(url_string)
if parsed.fragment:
params = parse_qs(parsed.fragment)
else:
params = parse_qs(parsed.query)
if 'access_token' in params:
token = params['access_token'][0]
if 'expires_in' in params:
try:
expires_in = int(params['expires_in'][0])
except ValueError:
pass
if not token:
start_marker = "access_token%253D"
end_marker = "%25"
start_index = url_string.find(start_marker)
if start_index != -1:
token_start_index = start_index + len(start_marker)
remaining_url = url_string[token_start_index:]
end_index = remaining_url.find(end_marker)
if end_index != -1:
raw_token = remaining_url[:end_index]
else:
amp_index = remaining_url.find('&')
if amp_index != -1:
raw_token = remaining_url[:amp_index]
else:
raw_token = remaining_url
token = unquote(raw_token)
if token:
self.token_extracted_signal.emit(token, expires_in)
self.accept()
else:
QMessageBox.warning(self, "Ошибка Авторизации",
"Не удалось получить токен. Проверьте URL или попробуйте еще раз.")
self.reject()
def closeEvent(self, event):
"""
Переопределенный метод для обработки закрытия окна.
Выполняет очистку ресурсов QWebEngine, чтобы избежать предупреждения
"Release of profile requested but WebEnginePage still not deleted. Expect troubles!".
"""
current_page = self.browser.page()
if current_page:
# Снимаем страницу с QWebEngineView, чтобы View перестала ей владеть
self.browser.setPage(None)
# Планируем удаление объекта страницы
current_page.deleteLater()
# Вызываем метод базового класса для корректного закрытия диалога
super().closeEvent(event)
class MultiLinkDialog(QDialog):
def __init__(self, parent=None):
super().__init__(parent)
self.setWindowTitle("Ввод нескольких ссылок")
self.setMinimumSize(400, 300)
layout = QVBoxLayout(self)
label = QLabel("Вставьте ссылки на страницы VK, каждая с новой строки:")
layout.addWidget(label)
self.links_text_edit = QTextEdit()
layout.addWidget(self.links_text_edit)
button_box = QDialogButtonBox()
button_box.addButton("ОК", QDialogButtonBox.AcceptRole)
button_box.addButton("Отмена", QDialogButtonBox.RejectRole)
button_box.accepted.connect(self.accept)
button_box.rejected.connect(self.reject)
layout.addWidget(button_box)
def get_links(self):
return [line.strip() for line in self.links_text_edit.toPlainText().strip().split('\n') if line.strip()]
class VkChatManager(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("Anabasis Chat Manager")
self.setGeometry(300, 300, 600, 800)
self.token = None
self.token_expiration_time = None
self.chats = []
self.office_chat_checkboxes = []
self.retail_chat_checkboxes = []
self.warehouse_chat_checkboxes = []
self.coffee_chat_checkboxes = []
self.other_chat_checkboxes = []
self.vk_session = None
self.vk = None
self.user_ids_to_process = []
self._busy = False
self.suppress_resolve = False
self.resolve_timer = QTimer(self)
self.resolve_timer.setSingleShot(True)
self.resolve_timer.setInterval(750)
self.resolve_timer.timeout.connect(self.resolve_single_user_id_from_input)
self._cleanup_cache_if_needed()
self._ensure_log_dir()
self.init_ui()
self.load_saved_token_on_startup()
self.setup_token_timer()
def init_ui(self):
central_widget = QWidget()
self.setCentralWidget(central_widget)
layout = QVBoxLayout(central_widget)
layout.setContentsMargins(10, 10, 10, 10)
layout.setSpacing(5)
self.instructions = QTextBrowser()
self.instructions.setPlainText(
"Инструкция:\n"
"1. Авторизуйтесь через VK.\n"
"2. Выберите чаты.\n"
"3. Вставьте ссылку на пользователя в поле ниже. ID определится автоматически.\n"
"4. Для массовых операций, нажмите кнопку 'Список' и вставьте ссылки в окне.\n"
"5. Нажмите 'ИСКЛЮЧИТЬ' или 'ПРИГЛАСИТЬ'."
)
self.instructions.setFixedHeight(120)
layout.addWidget(self.instructions)
layout.addWidget(QLabel("Access Token VK:"))
self.token_input = QLineEdit()
self.token_input.setPlaceholderText("Токен появится здесь после авторизации...")
self.token_input.setReadOnly(True)
layout.addWidget(self.token_input)
self.token_timer_label = QLabel("Срок действия токена: Н")
self.token_timer_label.setAlignment(Qt.AlignRight)
layout.addWidget(self.token_timer_label)
self.auth_btn = QPushButton("Авторизоваться через VK")
self.auth_btn.clicked.connect(self.start_auth)
layout.addWidget(self.auth_btn)
self.chat_tabs = QTabWidget()
self.chat_tabs.hide()
self.office_tab = self.create_chat_tab()
self.chat_tabs.addTab(self.office_tab, "AG Офис")
self.retail_tab = self.create_chat_tab()
self.chat_tabs.addTab(self.retail_tab, "AG Розница")
self.warehouse_tab = self.create_chat_tab()
self.chat_tabs.addTab(self.warehouse_tab, "AG Склад")
self.coffee_tab = self.create_chat_tab()
self.chat_tabs.addTab(self.coffee_tab, "AG Кофейни")
self.other_tab = self.create_chat_tab()
self.chat_tabs.addTab(self.other_tab, "Прочие")
layout.addWidget(QLabel("Выберите чаты:"))
select_buttons_layout = QHBoxLayout()
self.select_all_btn = QPushButton("Выбрать все на вкладке")
self.select_all_btn.clicked.connect(lambda: self.set_all_checkboxes_on_current_tab(True))
self.deselect_all_btn = QPushButton("Снять выбор на вкладке")
self.deselect_all_btn.clicked.connect(lambda: self.set_all_checkboxes_on_current_tab(False))
self.refresh_chats_btn = QPushButton("Обновить чаты")
self.refresh_chats_btn.clicked.connect(self.load_chats)
select_buttons_layout.addWidget(self.select_all_btn)
select_buttons_layout.addWidget(self.deselect_all_btn)
select_buttons_layout.addWidget(self.refresh_chats_btn)
layout.addLayout(select_buttons_layout)
layout.addWidget(self.chat_tabs)
layout.addWidget(QLabel("Ссылка на страницу VK (ID определится автоматически):"))
link_input_layout = QHBoxLayout()
self.vk_url_input = QLineEdit()
self.vk_url_input.setPlaceholderText("https://vk.com/id1")
self.vk_url_input.textChanged.connect(self.on_vk_url_input_changed)
link_input_layout.addWidget(self.vk_url_input)
self.multi_link_btn = QPushButton("Список")
self.multi_link_btn.setToolTip("Ввести несколько ссылок списком")
self.multi_link_btn.clicked.connect(self.open_multi_link_dialog)
link_input_layout.addWidget(self.multi_link_btn)
layout.addLayout(link_input_layout)
self.remove_user_btn = QPushButton("ИСКЛЮЧИТЬ ПОЛЬЗОВАТЕЛЕЙ")
self.remove_user_btn.setMinimumHeight(50)
self.remove_user_btn.clicked.connect(self.remove_user)
layout.addWidget(self.remove_user_btn)
self.visible_messages_checkbox = QCheckBox("Показать 250 последних сообщений при добавлении")
layout.addWidget(self.visible_messages_checkbox)
self.add_user_btn = QPushButton("ПРИГЛАСИТЬ ПОЛЬЗОВАТЕЛЕЙ")
self.add_user_btn.setMinimumHeight(50)
self.add_user_btn.clicked.connect(self.add_user_to_chat)
layout.addWidget(self.add_user_btn)
self.status_label = QLabel("Статус: не авторизован")
self.status_label.setAlignment(Qt.AlignCenter)
layout.addWidget(self.status_label)
layout.addStretch(1)
self.create_menu()
self.set_ui_state(False)
def on_vk_url_input_changed(self, text):
if self.suppress_resolve:
return
if self.vk_url_input.hasFocus():
self.resolve_timer.start()
def open_multi_link_dialog(self):
dialog = MultiLinkDialog(self)
if dialog.exec():
links = dialog.get_links()
if links:
self._set_vk_url_input_text("")
self._process_links_list(links)
else:
QMessageBox.information(self, "Информация", "Список ссылок пуст.")
def resolve_single_user_id_from_input(self):
url = self.vk_url_input.text().strip()
if not url:
self.user_ids_to_process.clear()
self.status_label.setText("Статус: Введите ссылку или откройте список.")
self.set_ui_state(self.token is not None)
return
self._process_links_list([url])
def _process_links_list(self, links_list):
if not self.vk:
QMessageBox.warning(self, "Ошибка", "Сначала авторизуйтесь.")
return
self.user_ids_to_process.clear()
resolved_ids = []
failed_links = []
self._set_busy(True, "Статус: Определяю ID...")
try:
for link in links_list:
try:
path = urlparse(link).path
screen_name = path.split('/')[-1] if path else ''
if not screen_name and len(path.split('/')) > 1:
screen_name = path.split('/')[-2]
if not screen_name:
failed_links.append(link)
continue
resolved_object = self._vk_call_with_retry(self.vk.utils.resolveScreenName, screen_name=screen_name)
if resolved_object and resolved_object.get('type') == 'user':
resolved_ids.append(resolved_object['object_id'])
else:
failed_links.append(link)
except VkApiError as e:
self._log_error("resolveScreenName", e)
failed_links.append(f"{link} ({self._format_vk_error(e)})")
except Exception:
failed_links.append(link)
finally:
self._set_busy(False)
self.user_ids_to_process = resolved_ids
status_message = f"Статус: Готово к работе с {len(resolved_ids)} пользовател(ем/ями)."
if len(links_list) > 1:
self._set_vk_url_input_text(f"Загружено {len(resolved_ids)}/{len(links_list)} из списка")
if failed_links:
QMessageBox.warning(self, "Ошибка получения ID",
f"Не удалось получить ID для следующих ссылок:\n" + "\n".join(failed_links))
self.status_label.setText(status_message)
self.set_ui_state(self.token is not None)
def create_menu(self):
"""Создает верхнее меню."""
menu_bar = self.menuBar()
# Меню "Инструменты"
tools_menu = menu_bar.addMenu("Инструменты")
# Действие "Назначить администратором"
make_admin_action = QAction("Назначить администратором", self)
make_admin_action.setStatusTip("Назначить выбранных пользователей администраторами в выбранных чатах")
make_admin_action.triggered.connect(self.set_user_admin)
tools_menu.addAction(make_admin_action)
logout_action = QAction("Выйти и очистить", self)
logout_action.setStatusTip("Выйти, удалить токен и кэш")
logout_action.triggered.connect(self.logout_and_clear)
tools_menu.addAction(logout_action)
def create_chat_tab(self):
# This implementation correctly creates a scrollable area for chat lists.
tab_content_widget = QWidget()
tab_layout = QVBoxLayout(tab_content_widget)
tab_layout.setContentsMargins(0, 0, 0, 0)
tab_layout.setSpacing(0)
scroll_area = QScrollArea()
scroll_area.setWidgetResizable(True)
scroll_area.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOff)
scroll_area.setVerticalScrollBarPolicy(Qt.ScrollBarAsNeeded)
tab_layout.addWidget(scroll_area)
checkbox_container_widget = QWidget()
checkbox_layout = QVBoxLayout(checkbox_container_widget)
checkbox_layout.setContentsMargins(5, 5, 5, 5)
checkbox_layout.setSpacing(2)
checkbox_layout.addStretch()
scroll_area.setWidget(checkbox_container_widget)
return tab_content_widget
def setup_token_timer(self):
self.token_countdown_timer = QTimer(self)
self.token_countdown_timer.timeout.connect(self.update_token_timer_display)
self.token_countdown_timer.start(1000)
def update_token_timer_display(self):
if self.token_expiration_time is None:
self.token_timer_label.setText("Срок действия токена: Н")
return
# ИСПРАВЛЕНИЕ: обрабатываем бессрочный токен
if self.token_expiration_time == 0:
self.token_timer_label.setText("Срок действия: Бессрочно")
return
remaining_seconds = int(self.token_expiration_time - time.time())
if remaining_seconds <= 0:
self.token_timer_label.setText("Срок действия истек!")
if self.token_countdown_timer.isActive():
self.token_countdown_timer.stop()
self.set_ui_state(False)
self.status_label.setText("Статус: Срок действия истек, авторизуйтесь заново.")
self.token, self.token_expiration_time = None, None
self.token_input.clear()
return
minutes, seconds = divmod(remaining_seconds, 60)
hours, minutes = divmod(minutes, 60)
self.token_timer_label.setText(f"Срок: {hours:02d}ч {minutes:02d}м {seconds:02d}с")
def set_ui_state(self, authorized):
self.auth_btn.setEnabled(not authorized)
for btn in [self.select_all_btn, self.deselect_all_btn, self.refresh_chats_btn,
self.vk_url_input, self.multi_link_btn,
self.visible_messages_checkbox]:
btn.setEnabled(authorized)
has_ids = authorized and bool(self.user_ids_to_process)
self.remove_user_btn.setEnabled(has_ids)
self.add_user_btn.setEnabled(has_ids)
self.chat_tabs.setVisible(authorized)
if authorized:
# Когда авторизованы, задаем минимальную высоту, достаточную для ~10-12 чатов
self.chat_tabs.setMinimumHeight(300)
else:
# Когда не авторизованы, сбрасываем минимальную высоту
self.chat_tabs.setMinimumHeight(0)
if not authorized:
self.user_ids_to_process.clear()
self._set_vk_url_input_text("")
self._clear_chat_tabs()
def _set_busy(self, busy, status_text=None):
if status_text:
self.status_label.setText(status_text)
if busy:
self._busy = True
QApplication.setOverrideCursor(Qt.WaitCursor)
for widget in [
self.refresh_chats_btn, self.select_all_btn, self.deselect_all_btn,
self.vk_url_input, self.multi_link_btn, self.visible_messages_checkbox,
self.remove_user_btn, self.add_user_btn
]:
widget.setEnabled(False)
else:
self._busy = False
QApplication.restoreOverrideCursor()
if self.token is None:
self.set_ui_state(False)
else:
self.set_ui_state(True)
def _ensure_log_dir(self):
os.makedirs(APP_DATA_DIR, exist_ok=True)
def _log_error(self, context, exc):
try:
os.makedirs(APP_DATA_DIR, exist_ok=True)
self._rotate_log_if_needed()
timestamp = QDateTime.currentDateTime().toString("yyyy-MM-dd HH:mm:ss")
message = self._format_vk_error(exc)
with open(LOG_FILE, "a", encoding="utf-8") as f:
f.write(f"[{timestamp}] {context}: {message}\n")
except Exception:
pass
def _rotate_log_if_needed(self):
try:
if not os.path.exists(LOG_FILE):
return
if os.path.getsize(LOG_FILE) < LOG_MAX_BYTES:
return
if os.path.exists(LOG_BACKUP_FILE):
os.remove(LOG_BACKUP_FILE)
os.replace(LOG_FILE, LOG_BACKUP_FILE)
except Exception:
pass
def _format_vk_error(self, exc):
error = getattr(exc, "error", None)
code = None
message = str(exc)
if isinstance(error, dict):
code = error.get("error_code")
message = error.get("error_msg") or message
hints = {
5: "Ошибка авторизации. Проверьте токен.",
6: "Слишком много запросов. Подождите и повторите.",
7: "Недостаточно прав.",
9: "Слишком много однотипных действий.",
10: "Внутренняя ошибка VK. Повторите позже.",
15: "Доступ запрещен.",
100: "Некорректный параметр запроса.",
113: "Неверный идентификатор пользователя.",
200: "Доступ к чату запрещен.",
}
if code in hints:
message = f"{message} ({hints[code]})"
if code is not None:
return f"[{code}] {message}"
return message
def _set_vk_url_input_text(self, text):
self.suppress_resolve = True
try:
self.vk_url_input.blockSignals(True)
self.vk_url_input.setText(text)
finally:
self.vk_url_input.blockSignals(False)
self.suppress_resolve = False
def logout_and_clear(self):
confirm = QMessageBox.question(
self,
"Подтверждение выхода",
"Вы уверены, что хотите выйти и удалить сохраненный токен и кэш?",
QMessageBox.Yes | QMessageBox.No
)
if confirm != QMessageBox.Yes:
return
self.token = None
self.token_expiration_time = None
self.vk_session = None
self.vk = None
self.user_ids_to_process.clear()
self._set_vk_url_input_text("")
self.token_input.clear()
self.token_timer_label.setText("Срок действия токена: Н")
self.status_label.setText("Статус: не авторизован")
if self.token_countdown_timer.isActive():
self.token_countdown_timer.stop()
self._clear_chat_tabs()
self.set_ui_state(False)
try:
if os.path.exists(TOKEN_FILE):
os.remove(TOKEN_FILE)
except Exception as e:
print(f"Ошибка удаления токена: {e}")
try:
self._try_remove_web_cache()
except Exception as e:
print(f"Ошибка удаления кэша: {e}")
def _cleanup_cache_if_needed(self):
if os.path.exists(CACHE_CLEANUP_MARKER):
try:
self._try_remove_web_cache()
if os.path.exists(CACHE_CLEANUP_MARKER):
os.remove(CACHE_CLEANUP_MARKER)
except Exception as e:
print(f"Ошибка отложенной очистки кэша: {e}")
def _try_remove_web_cache(self):
if not os.path.exists(WEB_ENGINE_CACHE_DIR):
return
attempts = 5
last_error = None
for _ in range(attempts):
try:
shutil.rmtree(WEB_ENGINE_CACHE_DIR)
last_error = None
break
except Exception as e:
last_error = e
time.sleep(0.2)
if last_error:
os.makedirs(APP_DATA_DIR, exist_ok=True)
with open(CACHE_CLEANUP_MARKER, "w") as f:
f.write("pending")
raise last_error
def load_saved_token_on_startup(self):
loaded_token, expiration_time = load_token()
if loaded_token:
self.handle_auth_token_on_load(loaded_token, expiration_time)
else:
self.set_ui_state(False)
def set_all_checkboxes_on_current_tab(self, checked):
current_index = self.chat_tabs.currentIndex()
checkbox_lists = [self.office_chat_checkboxes, self.retail_chat_checkboxes, self.retail_warehouse_checkboxes, self.retail_coffee_checkboxes, self.other_chat_checkboxes]
if 0 <= current_index < len(checkbox_lists):
for checkbox in checkbox_lists[current_index]:
checkbox.setChecked(checked)
def start_auth(self):
self.status_label.setText("Статус: ожидание авторизации...")
auth_window = AuthBrowserWindow(self)
auth_window.token_extracted_signal.connect(self.handle_new_auth_token)
auth_window.start_auth_flow()
auth_window.exec()
def handle_new_auth_token(self, token, expires_in):
if not token:
self.status_label.setText("Статус: Авторизация не удалась")
self.set_ui_state(False)
return
self.token = token
# Сохраняем и получаем корректный expiration_time (0 или будущее время)
self.token_expiration_time = save_token(self.token, expires_in)
self.token_input.setText(self.token[:50] + "...")
self.status_label.setText("Статус: авторизован")
self.vk_session = VkApi(token=self.token)
self.vk = self.vk_session.get_api()
self.set_ui_state(True)
self.load_chats()
def handle_auth_token_on_load(self, token, expiration_time):
self.token = token
self.token_expiration_time = expiration_time
self.token_input.setText(self.token[:50] + "...")
self.status_label.setText("Статус: авторизован (токен загружен)")
self.vk_session = VkApi(token=self.token)
self.vk = self.vk_session.get_api()
self.set_ui_state(True)
self.load_chats()
def _clear_chat_tabs(self):
self.chats.clear()
for chk_list in [self.office_chat_checkboxes, self.retail_chat_checkboxes, self.warehouse_chat_checkboxes, self.coffee_chat_checkboxes, self.other_chat_checkboxes]:
for checkbox in chk_list:
checkbox.setParent(None)
checkbox.deleteLater()
chk_list.clear()
def _vk_error_code(self, exc):
error = getattr(exc, "error", None)
if isinstance(error, dict):
return error.get("error_code")
return getattr(exc, "code", None)
def _vk_call_with_retry(self, func, *args, **kwargs):
max_attempts = 5
for attempt in range(1, max_attempts + 1):
try:
return func(*args, **kwargs)
except VkApiError as e:
code = self._vk_error_code(e)
if code not in (6, 9, 10) or attempt == max_attempts:
raise
delay = min(2.0, 0.35 * (2 ** (attempt - 1)))
if code == 9:
delay = max(delay, 1.0)
time.sleep(delay)
def load_chats(self):
self._clear_chat_tabs()
# Get the checkbox layouts from each tab
layouts = [
self.office_tab.findChild(QWidget).findChild(QVBoxLayout),
self.retail_tab.findChild(QWidget).findChild(QVBoxLayout),
self.warehouse_tab.findChild(QWidget).findChild(QVBoxLayout),
self.coffee_tab.findChild(QWidget).findChild(QVBoxLayout),
self.other_tab.findChild(QWidget).findChild(QVBoxLayout)
]
try:
self._set_busy(True, "Статус: загрузка чатов...")
conversations = []
start_from = None
seen_start_tokens = set()
while True:
params = {"count": 200, "filter": "all"}
if start_from:
if start_from in seen_start_tokens:
break
params["start_from"] = start_from
seen_start_tokens.add(start_from)
response = self._vk_call_with_retry(self.vk.messages.getConversations, **params)
page_items = response.get("items", [])
if not page_items:
break
conversations.extend(page_items)
start_from = response.get("next_from")
if not start_from:
break
for conv in conversations:
if conv['conversation']['peer']['type'] == 'chat':
chat_id = conv['conversation']['peer']['local_id']
title = conv['conversation']['chat_settings']['title']
self.chats.append({'id': chat_id, 'title': title})
checkbox = QCheckBox(f"{title} (id: {chat_id})")
checkbox.setProperty("chat_id", chat_id)
# Insert checkbox at the top of the layout (before the stretch)
if "AG офис" in title:
layouts[0].insertWidget(layouts[0].count() - 1, checkbox)
self.office_chat_checkboxes.append(checkbox)
elif "AG розница" in title:
layouts[1].insertWidget(layouts[1].count() - 1, checkbox)
self.retail_chat_checkboxes.append(checkbox)
elif "AG склад" in title:
layouts[2].insertWidget(layouts[2].count() - 1, checkbox)
self.warehouse_chat_checkboxes.append(checkbox)
elif "AG кофейни" in title:
layouts[3].insertWidget(layouts[3].count() - 1, checkbox)
self.coffee_chat_checkboxes.append(checkbox)
else:
layouts[4].insertWidget(layouts[4].count() - 1, checkbox)
self.other_chat_checkboxes.append(checkbox)
self.chat_tabs.setTabText(0, f"AG Офис ({len(self.office_chat_checkboxes)})")
self.chat_tabs.setTabText(1, f"AG Розница ({len(self.retail_chat_checkboxes)})")
self.chat_tabs.setTabText(2, f"AG Склад ({len(self.warehouse_chat_checkboxes)})")
self.chat_tabs.setTabText(3, f"AG Кофейни ({len(self.coffee_chat_checkboxes)})")
self.chat_tabs.setTabText(4, f"Прочие ({len(self.other_chat_checkboxes)})")
except VkApiError as e:
self._log_error("load_chats", e)
QMessageBox.critical(self, "Ошибка", f"Не удалось загрузить чаты: {self._format_vk_error(e)}")
self.set_ui_state(False)
finally:
self._set_busy(False)
def get_user_info_by_id(self, user_id):
try:
user = self.vk.users.get(user_ids=user_id)[0]
return f"{user.get('first_name', '')} {user.get('last_name', '')}"
except Exception:
return f"Пользователь {user_id}"
def _get_selected_chats(self):
selected = []
for chk in self.office_chat_checkboxes + self.retail_chat_checkboxes + self.warehouse_chat_checkboxes + self.coffee_chat_checkboxes + self.other_chat_checkboxes:
if chk.isChecked():
chat_id = chk.property("chat_id")
title = next((c['title'] for c in self.chats if c['id'] == chat_id), "")
selected.append({'id': chat_id, 'title': title})
return selected
def _execute_user_action(self, action_type):
if not self.user_ids_to_process:
QMessageBox.warning(self, "Ошибка", f"Нет ID пользователей для операции.")
return
selected_chats = self._get_selected_chats()
if not selected_chats:
QMessageBox.warning(self, "Ошибка", "Выберите хотя бы один чат.")
return
user_infos = {uid: self.get_user_info_by_id(uid) for uid in self.user_ids_to_process}
action_verb = "исключить" if action_type == "remove" else "пригласить"
preposition = "из" if action_type == "remove" else "в"
user_names_list = list(user_infos.values())
user_names_str = "\n".join([f"{name}" for name in user_names_list])
chat_count = len(selected_chats)
chat_str = ""
# Финальная логика склонения с учетом падежа (для "из" и "в")
if chat_count % 10 == 1 and chat_count % 100 != 11:
if action_type == 'remove':
# из 1 выбранного чата (Родительный падеж, ед.ч.)
chat_str = f"{chat_count} выбранного чата"
else:
# в 1 выбранный чат (Винительный падеж, ед.ч.)
chat_str = f"{chat_count} выбранный чат"
elif 2 <= chat_count % 10 <= 4 and (chat_count % 100 < 10 or chat_count % 100 >= 20):
if action_type == 'remove':
# из 3 выбранных чатов (Родительный падеж, мн.ч.)
chat_str = f"{chat_count} выбранных чатов"
else:
# в 3 выбранных чата (Родительный падеж, ед.ч.)
chat_str = f"{chat_count} выбранных чата"
else:
# из 5 выбранных чатов / в 5 выбранных чатов (Родительный падеж, мн.ч.)
chat_str = f"{chat_count} выбранных чатов"
msg = (
f"Вы уверены, что хотите {action_verb} следующих пользователей:\n\n"
f"{user_names_str}\n\n"
f"{preposition} {chat_str}?"
)
confirm_dialog = QMessageBox(self)
confirm_dialog.setWindowTitle("Подтверждение действия")
confirm_dialog.setText(msg)
confirm_dialog.setIcon(QMessageBox.Question)
yes_button = confirm_dialog.addButton("Да", QMessageBox.YesRole)
no_button = confirm_dialog.addButton("Нет", QMessageBox.NoRole)
confirm_dialog.setDefaultButton(no_button) # "Нет" - кнопка по умолчанию
confirm_dialog.exec()
if confirm_dialog.clickedButton() != yes_button:
return
results = []
total = len(selected_chats) * len(user_infos)
processed = 0
try:
action_label = "исключение" if action_type == "remove" else "приглашение"
self._set_busy(True, f"Статус: выполняется {action_label} (0/{total})...")
for chat in selected_chats:
for user_id, user_info in user_infos.items():
try:
if action_type == "remove":
self._vk_call_with_retry(self.vk.messages.removeChatUser, chat_id=chat['id'], member_id=user_id)
results.append(f"'{user_info}' исключен из '{chat['title']}'.")
else:
params = {'chat_id': chat['id'], 'user_id': user_id}
if self.visible_messages_checkbox.isChecked():
params['visible_messages_count'] = 250
self._vk_call_with_retry(self.vk.messages.addChatUser, **params)
results.append(f"'{user_info}' приглашен в '{chat['title']}'.")
except VkApiError as e:
self._log_error("execute_user_action", e)
results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {self._format_vk_error(e)}")
finally:
processed += 1
self.status_label.setText(f"Статус: выполняется {action_label} ({processed}/{total})...")
finally:
self._set_busy(False)
QMessageBox.information(self, "Результаты", "\n".join(results))
self.vk_url_input.clear()
self.user_ids_to_process.clear()
self.set_ui_state(self.token is not None)
def remove_user(self):
self._execute_user_action("remove")
def add_user_to_chat(self):
self._execute_user_action("add")
def set_user_admin(self):
"""Назначает пользователя администратором чата."""
# 1. Проверки на наличие выбранных пользователей и чатов
if not self.user_ids_to_process:
QMessageBox.warning(self, "Ошибка", "Нет ID пользователей для операции.")
return
selected_chats = self._get_selected_chats()
if not selected_chats:
QMessageBox.warning(self, "Ошибка", "Выберите хотя бы один чат.")
return
# 2. Подготовка данных для подтверждения
user_infos = {uid: self.get_user_info_by_id(uid) for uid in self.user_ids_to_process}
user_names_str = "\n".join([f"{name}" for name in user_infos.values()])
msg = (
f"Вы уверены, что хотите назначить АДМИНИСТРАТОРАМИ следующих пользователей:\n\n"
f"{user_names_str}\n\n"
f"в {len(selected_chats)} выбранных чатах?"
)
# 3. Диалог подтверждения
confirm_dialog = QMessageBox(self)
confirm_dialog.setWindowTitle("Подтверждение прав")
confirm_dialog.setText(msg)
confirm_dialog.setIcon(QMessageBox.Question)
yes_button = confirm_dialog.addButton("Да", QMessageBox.YesRole)
no_button = confirm_dialog.addButton("Нет", QMessageBox.NoRole)
confirm_dialog.setDefaultButton(no_button)
confirm_dialog.exec()
if confirm_dialog.clickedButton() != yes_button:
return
# 4. Выполнение API запросов
results = []
total = len(selected_chats) * len(user_infos)
processed = 0
try:
self._set_busy(True, f"Статус: назначение админов (0/{total})...")
for chat in selected_chats:
# VK API требует peer_id. Для чатов это 2000000000 + local_id
try:
peer_id = 2000000000 + int(chat['id'])
except ValueError:
results.append(f"✗ Ошибка ID чата: {chat['id']}")
continue
for user_id, user_info in user_infos.items():
try:
self._vk_call_with_retry(
self.vk.messages.setMemberRole,
peer_id=peer_id,
member_id=user_id,
role="admin"
)
results.append(f"'{user_info}' назначен админом в '{chat['title']}'.")
except VkApiError as e:
self._log_error("set_user_admin", e)
results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {self._format_vk_error(e)}")
finally:
processed += 1
self.status_label.setText(f"Статус: назначение админов ({processed}/{total})...")
finally:
self._set_busy(False)
# 5. Вывод результата
QMessageBox.information(self, "Результаты назначения", "\n".join(results))
# Очистка полей (по желанию, можно убрать эти две строки, если хотите оставить ввод)
self.vk_url_input.clear()
self.user_ids_to_process.clear()
self.set_ui_state(self.token is not None)
if __name__ == "__main__":
app = QApplication(sys.argv)
app.setStyle("Fusion")
app.setPalette(app.style().standardPalette())
# Установка иконки для ВСЕХ окон приложения
icon_path = get_resource_path("icon.ico")
if os.path.exists(icon_path):
app.setWindowIcon(QIcon(icon_path))
window = VkChatManager()
window.show()
sys.exit(app.exec())