- добавлен auth_webview.py и режим --auth - build.py обновлён, WebEngine исключён - pywebview добавлен в requirements
1034 lines
44 KiB
Python
1034 lines
44 KiB
Python
import sys
|
||
import base64
|
||
import ctypes
|
||
import shutil
|
||
import subprocess
|
||
from vk_api import VkApi
|
||
import json
|
||
import time
|
||
import auth_webview
|
||
import os
|
||
from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit,
|
||
QPushButton, QVBoxLayout, QWidget, QMessageBox,
|
||
QTextBrowser, QScrollArea, QCheckBox, QHBoxLayout,
|
||
QSizePolicy, QDialog, QTextEdit, QTabWidget, QDialogButtonBox)
|
||
from PySide6.QtCore import Qt, QUrl, QDateTime, Signal, QTimer
|
||
from PySide6.QtGui import QIcon, QAction
|
||
from urllib.parse import urlparse, parse_qs, unquote
|
||
from vk_api.exceptions import VkApiError
|
||
from PySide6.QtCore import QStandardPaths
|
||
from ctypes import wintypes
|
||
|
||
# --- Управление токенами и настройками ---
|
||
APP_DATA_DIR = os.path.join(QStandardPaths.writableLocation(QStandardPaths.AppDataLocation), "AnabasisVKChatManager")
|
||
TOKEN_FILE = os.path.join(APP_DATA_DIR, "token.json")
|
||
WEB_ENGINE_CACHE_DIR = os.path.join(APP_DATA_DIR, "web_engine_cache")
|
||
CACHE_CLEANUP_MARKER = os.path.join(APP_DATA_DIR, "pending_cache_cleanup")
|
||
LOG_FILE = os.path.join(APP_DATA_DIR, "app.log")
|
||
LOG_MAX_BYTES = 1024 * 1024 # 1 MB
|
||
LOG_BACKUP_FILE = os.path.join(APP_DATA_DIR, "app.log.1")
|
||
|
||
|
||
class _DataBlob(ctypes.Structure):
|
||
_fields_ = [("cbData", wintypes.DWORD), ("pbData", ctypes.POINTER(ctypes.c_byte))]
|
||
|
||
|
||
_crypt32 = None
|
||
_kernel32 = None
|
||
if os.name == "nt":
|
||
_crypt32 = ctypes.WinDLL("crypt32", use_last_error=True)
|
||
_kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
|
||
_crypt32.CryptProtectData.argtypes = [
|
||
ctypes.POINTER(_DataBlob),
|
||
wintypes.LPCWSTR,
|
||
ctypes.POINTER(_DataBlob),
|
||
ctypes.c_void_p,
|
||
ctypes.c_void_p,
|
||
wintypes.DWORD,
|
||
ctypes.POINTER(_DataBlob),
|
||
]
|
||
_crypt32.CryptProtectData.restype = wintypes.BOOL
|
||
_crypt32.CryptUnprotectData.argtypes = [
|
||
ctypes.POINTER(_DataBlob),
|
||
ctypes.POINTER(wintypes.LPWSTR),
|
||
ctypes.POINTER(_DataBlob),
|
||
ctypes.c_void_p,
|
||
ctypes.c_void_p,
|
||
wintypes.DWORD,
|
||
ctypes.POINTER(_DataBlob),
|
||
]
|
||
_crypt32.CryptUnprotectData.restype = wintypes.BOOL
|
||
|
||
|
||
def _crypt_protect_data(data, description=""):
|
||
buffer = ctypes.create_string_buffer(data)
|
||
data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte)))
|
||
data_out = _DataBlob()
|
||
if not _crypt32.CryptProtectData(ctypes.byref(data_in), description, None, None, None, 0, ctypes.byref(data_out)):
|
||
raise ctypes.WinError(ctypes.get_last_error())
|
||
try:
|
||
return ctypes.string_at(data_out.pbData, data_out.cbData)
|
||
finally:
|
||
_kernel32.LocalFree(data_out.pbData)
|
||
|
||
|
||
def _crypt_unprotect_data(data):
|
||
buffer = ctypes.create_string_buffer(data)
|
||
data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte)))
|
||
data_out = _DataBlob()
|
||
if not _crypt32.CryptUnprotectData(ctypes.byref(data_in), None, None, None, None, 0, ctypes.byref(data_out)):
|
||
raise ctypes.WinError(ctypes.get_last_error())
|
||
try:
|
||
return ctypes.string_at(data_out.pbData, data_out.cbData)
|
||
finally:
|
||
_kernel32.LocalFree(data_out.pbData)
|
||
|
||
|
||
def _encrypt_token(token):
|
||
if os.name != "nt":
|
||
raise RuntimeError("DPAPI is available only on Windows.")
|
||
encrypted_bytes = _crypt_protect_data(token.encode("utf-8"))
|
||
return base64.b64encode(encrypted_bytes).decode("ascii")
|
||
|
||
|
||
def _decrypt_token(token_data):
|
||
if os.name != "nt":
|
||
raise RuntimeError("DPAPI is available only on Windows.")
|
||
encrypted_bytes = base64.b64decode(token_data.encode("ascii"))
|
||
decrypted_bytes = _crypt_unprotect_data(encrypted_bytes)
|
||
return decrypted_bytes.decode("utf-8")
|
||
|
||
def get_resource_path(relative_path):
|
||
""" Получает абсолютный путь к ресурсу, работает для скрипта и для .exe """
|
||
if hasattr(sys, '_MEIPASS'): # Для PyInstaller (на всякий случай)
|
||
return os.path.join(sys._MEIPASS, relative_path)
|
||
# Для cx_Freeze и обычного запуска
|
||
return os.path.join(os.path.abspath("."), relative_path)
|
||
|
||
def save_token(token, expires_in=0):
|
||
"""Сохраняет токен. Если expires_in=0, токен считается бессрочным."""
|
||
try:
|
||
expires_in = int(expires_in)
|
||
except (ValueError, TypeError):
|
||
expires_in = 0
|
||
|
||
os.makedirs(APP_DATA_DIR, exist_ok=True)
|
||
|
||
# ИСПРАВЛЕНИЕ: если expires_in равно 0, то и время истечения ставим 0
|
||
expiration_time = (time.time() + expires_in) if expires_in > 0 else 0
|
||
|
||
stored_token = token
|
||
encrypted = False
|
||
if os.name == "nt":
|
||
try:
|
||
stored_token = _encrypt_token(token)
|
||
encrypted = True
|
||
except Exception as e:
|
||
print(f"Ошибка шифрования токена: {e}")
|
||
|
||
data = {
|
||
"token": stored_token,
|
||
"expiration_time": expiration_time,
|
||
"encrypted": encrypted
|
||
}
|
||
|
||
try:
|
||
with open(TOKEN_FILE, "w") as f:
|
||
json.dump(data, f)
|
||
|
||
status = "Бессрочно" if expiration_time == 0 else QDateTime.fromSecsSinceEpoch(int(expiration_time)).toString()
|
||
print(f"Токен сохранен. Срок действия: {status}")
|
||
return expiration_time
|
||
except IOError as e:
|
||
print(f"Ошибка сохранения токена: {e}")
|
||
return None
|
||
|
||
|
||
def load_token():
|
||
"""Загружает токен и проверяет его валидность."""
|
||
try:
|
||
if not os.path.exists(TOKEN_FILE):
|
||
return None, None
|
||
|
||
with open(TOKEN_FILE, "r") as f:
|
||
data = json.load(f)
|
||
|
||
token = data.get("token")
|
||
encrypted = data.get("encrypted", False)
|
||
if token and encrypted:
|
||
try:
|
||
token = _decrypt_token(token)
|
||
except Exception as e:
|
||
print(f"Ошибка расшифровки токена: {e}")
|
||
if os.path.exists(TOKEN_FILE):
|
||
os.remove(TOKEN_FILE)
|
||
return None, None
|
||
expiration_time = data.get("expiration_time")
|
||
|
||
# ИСПРАВЛЕНИЕ: токен валиден, если время истечения 0 ИЛИ оно больше текущего
|
||
if token and (expiration_time == 0 or expiration_time > time.time()):
|
||
return token, expiration_time
|
||
else:
|
||
if os.path.exists(TOKEN_FILE):
|
||
os.remove(TOKEN_FILE)
|
||
return None, None
|
||
except Exception as e:
|
||
print(f"Ошибка загрузки: {e}")
|
||
return None, None
|
||
|
||
class MultiLinkDialog(QDialog):
|
||
def __init__(self, parent=None):
|
||
super().__init__(parent)
|
||
self.setWindowTitle("Ввод нескольких ссылок")
|
||
self.setMinimumSize(400, 300)
|
||
layout = QVBoxLayout(self)
|
||
label = QLabel("Вставьте ссылки на страницы VK, каждая с новой строки:")
|
||
layout.addWidget(label)
|
||
self.links_text_edit = QTextEdit()
|
||
layout.addWidget(self.links_text_edit)
|
||
button_box = QDialogButtonBox()
|
||
button_box.addButton("ОК", QDialogButtonBox.AcceptRole)
|
||
button_box.addButton("Отмена", QDialogButtonBox.RejectRole)
|
||
button_box.accepted.connect(self.accept)
|
||
button_box.rejected.connect(self.reject)
|
||
layout.addWidget(button_box)
|
||
|
||
def get_links(self):
|
||
return [line.strip() for line in self.links_text_edit.toPlainText().strip().split('\n') if line.strip()]
|
||
|
||
|
||
class VkChatManager(QMainWindow):
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.setWindowTitle("Anabasis Chat Manager")
|
||
self.setGeometry(300, 300, 600, 800)
|
||
|
||
self.token = None
|
||
self.token_expiration_time = None
|
||
self.chats = []
|
||
self.office_chat_checkboxes = []
|
||
self.retail_chat_checkboxes = []
|
||
self.warehouse_chat_checkboxes = []
|
||
self.coffee_chat_checkboxes = []
|
||
self.other_chat_checkboxes = []
|
||
self.vk_session = None
|
||
self.vk = None
|
||
self.user_ids_to_process = []
|
||
self._busy = False
|
||
self.suppress_resolve = False
|
||
|
||
self.resolve_timer = QTimer(self)
|
||
self.resolve_timer.setSingleShot(True)
|
||
self.resolve_timer.setInterval(750)
|
||
self.resolve_timer.timeout.connect(self.resolve_single_user_id_from_input)
|
||
|
||
self._cleanup_cache_if_needed()
|
||
self._ensure_log_dir()
|
||
self.init_ui()
|
||
self.load_saved_token_on_startup()
|
||
self.setup_token_timer()
|
||
|
||
def init_ui(self):
|
||
central_widget = QWidget()
|
||
self.setCentralWidget(central_widget)
|
||
layout = QVBoxLayout(central_widget)
|
||
layout.setContentsMargins(10, 10, 10, 10)
|
||
layout.setSpacing(5)
|
||
|
||
self.instructions = QTextBrowser()
|
||
self.instructions.setPlainText(
|
||
"Инструкция:\n"
|
||
"1. Авторизуйтесь через VK.\n"
|
||
"2. Выберите чаты.\n"
|
||
"3. Вставьте ссылку на пользователя в поле ниже. ID определится автоматически.\n"
|
||
"4. Для массовых операций, нажмите кнопку 'Список' и вставьте ссылки в окне.\n"
|
||
"5. Нажмите 'ИСКЛЮЧИТЬ' или 'ПРИГЛАСИТЬ'."
|
||
)
|
||
self.instructions.setFixedHeight(120)
|
||
layout.addWidget(self.instructions)
|
||
|
||
layout.addWidget(QLabel("Access Token VK:"))
|
||
self.token_input = QLineEdit()
|
||
self.token_input.setPlaceholderText("Токен появится здесь после авторизации...")
|
||
self.token_input.setReadOnly(True)
|
||
layout.addWidget(self.token_input)
|
||
|
||
self.token_timer_label = QLabel("Срок действия токена: Н/Д")
|
||
self.token_timer_label.setAlignment(Qt.AlignRight)
|
||
layout.addWidget(self.token_timer_label)
|
||
|
||
self.auth_btn = QPushButton("Авторизоваться через VK")
|
||
self.auth_btn.clicked.connect(self.start_auth)
|
||
layout.addWidget(self.auth_btn)
|
||
|
||
self.chat_tabs = QTabWidget()
|
||
self.chat_tabs.hide()
|
||
self.office_tab = self.create_chat_tab()
|
||
self.chat_tabs.addTab(self.office_tab, "AG Офис")
|
||
self.retail_tab = self.create_chat_tab()
|
||
self.chat_tabs.addTab(self.retail_tab, "AG Розница")
|
||
self.warehouse_tab = self.create_chat_tab()
|
||
self.chat_tabs.addTab(self.warehouse_tab, "AG Склад")
|
||
self.coffee_tab = self.create_chat_tab()
|
||
self.chat_tabs.addTab(self.coffee_tab, "AG Кофейни")
|
||
self.other_tab = self.create_chat_tab()
|
||
self.chat_tabs.addTab(self.other_tab, "Прочие")
|
||
layout.addWidget(QLabel("Выберите чаты:"))
|
||
select_buttons_layout = QHBoxLayout()
|
||
self.select_all_btn = QPushButton("Выбрать все на вкладке")
|
||
self.select_all_btn.clicked.connect(lambda: self.set_all_checkboxes_on_current_tab(True))
|
||
self.deselect_all_btn = QPushButton("Снять выбор на вкладке")
|
||
self.deselect_all_btn.clicked.connect(lambda: self.set_all_checkboxes_on_current_tab(False))
|
||
self.refresh_chats_btn = QPushButton("Обновить чаты")
|
||
self.refresh_chats_btn.clicked.connect(self.load_chats)
|
||
select_buttons_layout.addWidget(self.select_all_btn)
|
||
select_buttons_layout.addWidget(self.deselect_all_btn)
|
||
select_buttons_layout.addWidget(self.refresh_chats_btn)
|
||
layout.addLayout(select_buttons_layout)
|
||
layout.addWidget(self.chat_tabs)
|
||
|
||
layout.addWidget(QLabel("Ссылка на страницу VK (ID определится автоматически):"))
|
||
link_input_layout = QHBoxLayout()
|
||
self.vk_url_input = QLineEdit()
|
||
self.vk_url_input.setPlaceholderText("https://vk.com/id1")
|
||
self.vk_url_input.textChanged.connect(self.on_vk_url_input_changed)
|
||
link_input_layout.addWidget(self.vk_url_input)
|
||
|
||
self.multi_link_btn = QPushButton("Список")
|
||
self.multi_link_btn.setToolTip("Ввести несколько ссылок списком")
|
||
self.multi_link_btn.clicked.connect(self.open_multi_link_dialog)
|
||
link_input_layout.addWidget(self.multi_link_btn)
|
||
layout.addLayout(link_input_layout)
|
||
|
||
self.remove_user_btn = QPushButton("ИСКЛЮЧИТЬ ПОЛЬЗОВАТЕЛЕЙ")
|
||
self.remove_user_btn.setMinimumHeight(50)
|
||
self.remove_user_btn.clicked.connect(self.remove_user)
|
||
layout.addWidget(self.remove_user_btn)
|
||
self.visible_messages_checkbox = QCheckBox("Показать 250 последних сообщений при добавлении")
|
||
layout.addWidget(self.visible_messages_checkbox)
|
||
self.add_user_btn = QPushButton("ПРИГЛАСИТЬ ПОЛЬЗОВАТЕЛЕЙ")
|
||
self.add_user_btn.setMinimumHeight(50)
|
||
self.add_user_btn.clicked.connect(self.add_user_to_chat)
|
||
layout.addWidget(self.add_user_btn)
|
||
self.status_label = QLabel("Статус: не авторизован")
|
||
self.status_label.setAlignment(Qt.AlignCenter)
|
||
layout.addWidget(self.status_label)
|
||
|
||
layout.addStretch(1)
|
||
|
||
self.create_menu()
|
||
|
||
self.set_ui_state(False)
|
||
|
||
def on_vk_url_input_changed(self, text):
|
||
if self.suppress_resolve:
|
||
return
|
||
if self.vk_url_input.hasFocus():
|
||
self.resolve_timer.start()
|
||
|
||
def open_multi_link_dialog(self):
|
||
dialog = MultiLinkDialog(self)
|
||
if dialog.exec():
|
||
links = dialog.get_links()
|
||
if links:
|
||
self._set_vk_url_input_text("")
|
||
self._process_links_list(links)
|
||
else:
|
||
QMessageBox.information(self, "Информация", "Список ссылок пуст.")
|
||
|
||
def resolve_single_user_id_from_input(self):
|
||
url = self.vk_url_input.text().strip()
|
||
if not url:
|
||
self.user_ids_to_process.clear()
|
||
self.status_label.setText("Статус: Введите ссылку или откройте список.")
|
||
self.set_ui_state(self.token is not None)
|
||
return
|
||
self._process_links_list([url])
|
||
|
||
def _process_links_list(self, links_list):
|
||
if not self.vk:
|
||
QMessageBox.warning(self, "Ошибка", "Сначала авторизуйтесь.")
|
||
return
|
||
|
||
self.user_ids_to_process.clear()
|
||
resolved_ids = []
|
||
failed_links = []
|
||
|
||
self._set_busy(True, "Статус: Определяю ID...")
|
||
try:
|
||
for link in links_list:
|
||
try:
|
||
path = urlparse(link).path
|
||
screen_name = path.split('/')[-1] if path else ''
|
||
if not screen_name and len(path.split('/')) > 1:
|
||
screen_name = path.split('/')[-2]
|
||
|
||
if not screen_name:
|
||
failed_links.append(link)
|
||
continue
|
||
|
||
resolved_object = self._vk_call_with_retry(self.vk.utils.resolveScreenName, screen_name=screen_name)
|
||
if resolved_object and resolved_object.get('type') == 'user':
|
||
resolved_ids.append(resolved_object['object_id'])
|
||
else:
|
||
failed_links.append(link)
|
||
except VkApiError as e:
|
||
self._log_error("resolveScreenName", e)
|
||
failed_links.append(f"{link} ({self._format_vk_error(e)})")
|
||
except Exception:
|
||
failed_links.append(link)
|
||
finally:
|
||
self._set_busy(False)
|
||
|
||
self.user_ids_to_process = resolved_ids
|
||
|
||
status_message = f"Статус: Готово к работе с {len(resolved_ids)} пользовател(ем/ями)."
|
||
if len(links_list) > 1:
|
||
self._set_vk_url_input_text(f"Загружено {len(resolved_ids)}/{len(links_list)} из списка")
|
||
|
||
if failed_links:
|
||
QMessageBox.warning(self, "Ошибка получения ID",
|
||
f"Не удалось получить ID для следующих ссылок:\n" + "\n".join(failed_links))
|
||
|
||
self.status_label.setText(status_message)
|
||
self.set_ui_state(self.token is not None)
|
||
|
||
def create_menu(self):
|
||
"""Создает верхнее меню."""
|
||
menu_bar = self.menuBar()
|
||
|
||
# Меню "Инструменты"
|
||
tools_menu = menu_bar.addMenu("Инструменты")
|
||
|
||
# Действие "Назначить администратором"
|
||
make_admin_action = QAction("Назначить администратором", self)
|
||
make_admin_action.setStatusTip("Назначить выбранных пользователей администраторами в выбранных чатах")
|
||
make_admin_action.triggered.connect(self.set_user_admin)
|
||
tools_menu.addAction(make_admin_action)
|
||
|
||
logout_action = QAction("Выйти и очистить", self)
|
||
logout_action.setStatusTip("Выйти, удалить токен и кэш")
|
||
logout_action.triggered.connect(self.logout_and_clear)
|
||
tools_menu.addAction(logout_action)
|
||
|
||
def create_chat_tab(self):
|
||
# This implementation correctly creates a scrollable area for chat lists.
|
||
tab_content_widget = QWidget()
|
||
tab_layout = QVBoxLayout(tab_content_widget)
|
||
tab_layout.setContentsMargins(0, 0, 0, 0)
|
||
tab_layout.setSpacing(0)
|
||
|
||
scroll_area = QScrollArea()
|
||
scroll_area.setWidgetResizable(True)
|
||
scroll_area.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOff)
|
||
scroll_area.setVerticalScrollBarPolicy(Qt.ScrollBarAsNeeded)
|
||
|
||
tab_layout.addWidget(scroll_area)
|
||
|
||
checkbox_container_widget = QWidget()
|
||
checkbox_layout = QVBoxLayout(checkbox_container_widget)
|
||
checkbox_layout.setContentsMargins(5, 5, 5, 5)
|
||
checkbox_layout.setSpacing(2)
|
||
checkbox_layout.addStretch()
|
||
|
||
scroll_area.setWidget(checkbox_container_widget)
|
||
|
||
return tab_content_widget
|
||
|
||
def setup_token_timer(self):
|
||
self.token_countdown_timer = QTimer(self)
|
||
self.token_countdown_timer.timeout.connect(self.update_token_timer_display)
|
||
self.token_countdown_timer.start(1000)
|
||
|
||
def update_token_timer_display(self):
|
||
if self.token_expiration_time is None:
|
||
self.token_timer_label.setText("Срок действия токена: Н/Д")
|
||
return
|
||
|
||
# ИСПРАВЛЕНИЕ: обрабатываем бессрочный токен
|
||
if self.token_expiration_time == 0:
|
||
self.token_timer_label.setText("Срок действия: Бессрочно")
|
||
return
|
||
|
||
remaining_seconds = int(self.token_expiration_time - time.time())
|
||
|
||
if remaining_seconds <= 0:
|
||
self.token_timer_label.setText("Срок действия истек!")
|
||
if self.token_countdown_timer.isActive():
|
||
self.token_countdown_timer.stop()
|
||
self.set_ui_state(False)
|
||
self.status_label.setText("Статус: Срок действия истек, авторизуйтесь заново.")
|
||
self.token, self.token_expiration_time = None, None
|
||
self.token_input.clear()
|
||
return
|
||
|
||
minutes, seconds = divmod(remaining_seconds, 60)
|
||
hours, minutes = divmod(minutes, 60)
|
||
self.token_timer_label.setText(f"Срок: {hours:02d}ч {minutes:02d}м {seconds:02d}с")
|
||
|
||
def set_ui_state(self, authorized):
|
||
self.auth_btn.setEnabled(not authorized)
|
||
for btn in [self.select_all_btn, self.deselect_all_btn, self.refresh_chats_btn,
|
||
self.vk_url_input, self.multi_link_btn,
|
||
self.visible_messages_checkbox]:
|
||
btn.setEnabled(authorized)
|
||
|
||
has_ids = authorized and bool(self.user_ids_to_process)
|
||
self.remove_user_btn.setEnabled(has_ids)
|
||
self.add_user_btn.setEnabled(has_ids)
|
||
|
||
self.chat_tabs.setVisible(authorized)
|
||
if authorized:
|
||
# Когда авторизованы, задаем минимальную высоту, достаточную для ~10-12 чатов
|
||
self.chat_tabs.setMinimumHeight(300)
|
||
else:
|
||
# Когда не авторизованы, сбрасываем минимальную высоту
|
||
self.chat_tabs.setMinimumHeight(0)
|
||
|
||
if not authorized:
|
||
self.user_ids_to_process.clear()
|
||
self._set_vk_url_input_text("")
|
||
self._clear_chat_tabs()
|
||
|
||
def _set_busy(self, busy, status_text=None):
|
||
if status_text:
|
||
self.status_label.setText(status_text)
|
||
if busy:
|
||
self._busy = True
|
||
QApplication.setOverrideCursor(Qt.WaitCursor)
|
||
for widget in [
|
||
self.refresh_chats_btn, self.select_all_btn, self.deselect_all_btn,
|
||
self.vk_url_input, self.multi_link_btn, self.visible_messages_checkbox,
|
||
self.remove_user_btn, self.add_user_btn
|
||
]:
|
||
widget.setEnabled(False)
|
||
else:
|
||
self._busy = False
|
||
QApplication.restoreOverrideCursor()
|
||
if self.token is None:
|
||
self.set_ui_state(False)
|
||
else:
|
||
self.set_ui_state(True)
|
||
|
||
def _ensure_log_dir(self):
|
||
os.makedirs(APP_DATA_DIR, exist_ok=True)
|
||
|
||
def _log_error(self, context, exc):
|
||
try:
|
||
os.makedirs(APP_DATA_DIR, exist_ok=True)
|
||
self._rotate_log_if_needed()
|
||
timestamp = QDateTime.currentDateTime().toString("yyyy-MM-dd HH:mm:ss")
|
||
message = self._format_vk_error(exc)
|
||
with open(LOG_FILE, "a", encoding="utf-8") as f:
|
||
f.write(f"[{timestamp}] {context}: {message}\n")
|
||
except Exception:
|
||
pass
|
||
|
||
def _rotate_log_if_needed(self):
|
||
try:
|
||
if not os.path.exists(LOG_FILE):
|
||
return
|
||
if os.path.getsize(LOG_FILE) < LOG_MAX_BYTES:
|
||
return
|
||
if os.path.exists(LOG_BACKUP_FILE):
|
||
os.remove(LOG_BACKUP_FILE)
|
||
os.replace(LOG_FILE, LOG_BACKUP_FILE)
|
||
except Exception:
|
||
pass
|
||
|
||
def _format_vk_error(self, exc):
|
||
error = getattr(exc, "error", None)
|
||
code = None
|
||
message = str(exc)
|
||
if isinstance(error, dict):
|
||
code = error.get("error_code")
|
||
message = error.get("error_msg") or message
|
||
hints = {
|
||
5: "Ошибка авторизации. Проверьте токен.",
|
||
6: "Слишком много запросов. Подождите и повторите.",
|
||
7: "Недостаточно прав.",
|
||
9: "Слишком много однотипных действий.",
|
||
10: "Внутренняя ошибка VK. Повторите позже.",
|
||
15: "Доступ запрещен.",
|
||
100: "Некорректный параметр запроса.",
|
||
113: "Неверный идентификатор пользователя.",
|
||
200: "Доступ к чату запрещен.",
|
||
}
|
||
if code in hints:
|
||
message = f"{message} ({hints[code]})"
|
||
if code is not None:
|
||
return f"[{code}] {message}"
|
||
return message
|
||
|
||
def _set_vk_url_input_text(self, text):
|
||
self.suppress_resolve = True
|
||
try:
|
||
self.vk_url_input.blockSignals(True)
|
||
self.vk_url_input.setText(text)
|
||
finally:
|
||
self.vk_url_input.blockSignals(False)
|
||
self.suppress_resolve = False
|
||
|
||
def logout_and_clear(self):
|
||
confirm = QMessageBox.question(
|
||
self,
|
||
"Подтверждение выхода",
|
||
"Вы уверены, что хотите выйти и удалить сохраненный токен и кэш?",
|
||
QMessageBox.Yes | QMessageBox.No
|
||
)
|
||
if confirm != QMessageBox.Yes:
|
||
return
|
||
|
||
self.token = None
|
||
self.token_expiration_time = None
|
||
self.vk_session = None
|
||
self.vk = None
|
||
self.user_ids_to_process.clear()
|
||
self._set_vk_url_input_text("")
|
||
self.token_input.clear()
|
||
self.token_timer_label.setText("Срок действия токена: Н/Д")
|
||
self.status_label.setText("Статус: не авторизован")
|
||
if self.token_countdown_timer.isActive():
|
||
self.token_countdown_timer.stop()
|
||
self._clear_chat_tabs()
|
||
self.set_ui_state(False)
|
||
|
||
try:
|
||
if os.path.exists(TOKEN_FILE):
|
||
os.remove(TOKEN_FILE)
|
||
except Exception as e:
|
||
print(f"Ошибка удаления токена: {e}")
|
||
|
||
try:
|
||
self._try_remove_web_cache()
|
||
except Exception as e:
|
||
print(f"Ошибка удаления кэша: {e}")
|
||
|
||
def _cleanup_cache_if_needed(self):
|
||
if os.path.exists(CACHE_CLEANUP_MARKER):
|
||
try:
|
||
self._try_remove_web_cache()
|
||
if os.path.exists(CACHE_CLEANUP_MARKER):
|
||
os.remove(CACHE_CLEANUP_MARKER)
|
||
except Exception as e:
|
||
print(f"Ошибка отложенной очистки кэша: {e}")
|
||
|
||
def _try_remove_web_cache(self):
|
||
if not os.path.exists(WEB_ENGINE_CACHE_DIR):
|
||
return
|
||
attempts = 5
|
||
last_error = None
|
||
for _ in range(attempts):
|
||
try:
|
||
shutil.rmtree(WEB_ENGINE_CACHE_DIR)
|
||
last_error = None
|
||
break
|
||
except Exception as e:
|
||
last_error = e
|
||
time.sleep(0.2)
|
||
if last_error:
|
||
os.makedirs(APP_DATA_DIR, exist_ok=True)
|
||
with open(CACHE_CLEANUP_MARKER, "w") as f:
|
||
f.write("pending")
|
||
raise last_error
|
||
|
||
def load_saved_token_on_startup(self):
|
||
loaded_token, expiration_time = load_token()
|
||
if loaded_token:
|
||
self.handle_auth_token_on_load(loaded_token, expiration_time)
|
||
else:
|
||
self.set_ui_state(False)
|
||
|
||
def set_all_checkboxes_on_current_tab(self, checked):
|
||
current_index = self.chat_tabs.currentIndex()
|
||
checkbox_lists = [self.office_chat_checkboxes, self.retail_chat_checkboxes, self.retail_warehouse_checkboxes, self.retail_coffee_checkboxes, self.other_chat_checkboxes]
|
||
if 0 <= current_index < len(checkbox_lists):
|
||
for checkbox in checkbox_lists[current_index]:
|
||
checkbox.setChecked(checked)
|
||
|
||
def start_auth(self):
|
||
self.status_label.setText("Статус: ожидание авторизации...")
|
||
auth_url = (
|
||
"https://oauth.vk.com/authorize?"
|
||
"client_id=2685278&"
|
||
"display=page&"
|
||
"redirect_uri=https://oauth.vk.com/blank.html&"
|
||
"scope=1073737727&"
|
||
"response_type=token&"
|
||
"v=5.131"
|
||
)
|
||
output_path = os.path.join(APP_DATA_DIR, "auth_result.json")
|
||
try:
|
||
if os.path.exists(output_path):
|
||
os.remove(output_path)
|
||
except Exception:
|
||
pass
|
||
|
||
cmd = [sys.executable, "--auth", auth_url, output_path]
|
||
try:
|
||
subprocess.check_call(cmd)
|
||
except Exception as e:
|
||
self.status_label.setText(f"Статус: ошибка запуска авторизации: {e}")
|
||
return
|
||
|
||
if not os.path.exists(output_path):
|
||
self.status_label.setText("Статус: авторизация не удалась")
|
||
return
|
||
|
||
try:
|
||
with open(output_path, "r", encoding="utf-8") as f:
|
||
data = json.load(f)
|
||
token = data.get("token")
|
||
expires_in = data.get("expires_in", 0)
|
||
except Exception:
|
||
token = None
|
||
expires_in = 0
|
||
|
||
try:
|
||
if os.path.exists(output_path):
|
||
os.remove(output_path)
|
||
except Exception:
|
||
pass
|
||
|
||
self.handle_new_auth_token(token, expires_in)
|
||
|
||
def handle_new_auth_token(self, token, expires_in):
|
||
if not token:
|
||
self.status_label.setText("Статус: Авторизация не удалась")
|
||
self.set_ui_state(False)
|
||
return
|
||
|
||
self.token = token
|
||
# Сохраняем и получаем корректный expiration_time (0 или будущее время)
|
||
self.token_expiration_time = save_token(self.token, expires_in)
|
||
|
||
self.token_input.setText(self.token[:50] + "...")
|
||
self.status_label.setText("Статус: авторизован")
|
||
self.vk_session = VkApi(token=self.token)
|
||
self.vk = self.vk_session.get_api()
|
||
self.set_ui_state(True)
|
||
self.load_chats()
|
||
|
||
def handle_auth_token_on_load(self, token, expiration_time):
|
||
self.token = token
|
||
self.token_expiration_time = expiration_time
|
||
|
||
self.token_input.setText(self.token[:50] + "...")
|
||
self.status_label.setText("Статус: авторизован (токен загружен)")
|
||
self.vk_session = VkApi(token=self.token)
|
||
self.vk = self.vk_session.get_api()
|
||
self.set_ui_state(True)
|
||
self.load_chats()
|
||
|
||
def _clear_chat_tabs(self):
|
||
self.chats.clear()
|
||
for chk_list in [self.office_chat_checkboxes, self.retail_chat_checkboxes, self.warehouse_chat_checkboxes, self.coffee_chat_checkboxes, self.other_chat_checkboxes]:
|
||
for checkbox in chk_list:
|
||
checkbox.setParent(None)
|
||
checkbox.deleteLater()
|
||
chk_list.clear()
|
||
|
||
def _vk_error_code(self, exc):
|
||
error = getattr(exc, "error", None)
|
||
if isinstance(error, dict):
|
||
return error.get("error_code")
|
||
return getattr(exc, "code", None)
|
||
|
||
def _vk_call_with_retry(self, func, *args, **kwargs):
|
||
max_attempts = 5
|
||
for attempt in range(1, max_attempts + 1):
|
||
try:
|
||
return func(*args, **kwargs)
|
||
except VkApiError as e:
|
||
code = self._vk_error_code(e)
|
||
if code not in (6, 9, 10) or attempt == max_attempts:
|
||
raise
|
||
delay = min(2.0, 0.35 * (2 ** (attempt - 1)))
|
||
if code == 9:
|
||
delay = max(delay, 1.0)
|
||
time.sleep(delay)
|
||
|
||
def load_chats(self):
|
||
self._clear_chat_tabs()
|
||
|
||
# Get the checkbox layouts from each tab
|
||
layouts = [
|
||
self.office_tab.findChild(QWidget).findChild(QVBoxLayout),
|
||
self.retail_tab.findChild(QWidget).findChild(QVBoxLayout),
|
||
self.warehouse_tab.findChild(QWidget).findChild(QVBoxLayout),
|
||
self.coffee_tab.findChild(QWidget).findChild(QVBoxLayout),
|
||
self.other_tab.findChild(QWidget).findChild(QVBoxLayout)
|
||
]
|
||
|
||
try:
|
||
self._set_busy(True, "Статус: загрузка чатов...")
|
||
conversations = []
|
||
start_from = None
|
||
seen_start_tokens = set()
|
||
while True:
|
||
params = {"count": 200, "filter": "all"}
|
||
if start_from:
|
||
if start_from in seen_start_tokens:
|
||
break
|
||
params["start_from"] = start_from
|
||
seen_start_tokens.add(start_from)
|
||
|
||
response = self._vk_call_with_retry(self.vk.messages.getConversations, **params)
|
||
page_items = response.get("items", [])
|
||
if not page_items:
|
||
break
|
||
conversations.extend(page_items)
|
||
|
||
start_from = response.get("next_from")
|
||
if not start_from:
|
||
break
|
||
for conv in conversations:
|
||
if conv['conversation']['peer']['type'] == 'chat':
|
||
chat_id = conv['conversation']['peer']['local_id']
|
||
title = conv['conversation']['chat_settings']['title']
|
||
self.chats.append({'id': chat_id, 'title': title})
|
||
|
||
checkbox = QCheckBox(f"{title} (id: {chat_id})")
|
||
checkbox.setProperty("chat_id", chat_id)
|
||
|
||
# Insert checkbox at the top of the layout (before the stretch)
|
||
if "AG офис" in title:
|
||
layouts[0].insertWidget(layouts[0].count() - 1, checkbox)
|
||
self.office_chat_checkboxes.append(checkbox)
|
||
elif "AG розница" in title:
|
||
layouts[1].insertWidget(layouts[1].count() - 1, checkbox)
|
||
self.retail_chat_checkboxes.append(checkbox)
|
||
elif "AG склад" in title:
|
||
layouts[2].insertWidget(layouts[2].count() - 1, checkbox)
|
||
self.warehouse_chat_checkboxes.append(checkbox)
|
||
elif "AG кофейни" in title:
|
||
layouts[3].insertWidget(layouts[3].count() - 1, checkbox)
|
||
self.coffee_chat_checkboxes.append(checkbox)
|
||
else:
|
||
layouts[4].insertWidget(layouts[4].count() - 1, checkbox)
|
||
self.other_chat_checkboxes.append(checkbox)
|
||
|
||
self.chat_tabs.setTabText(0, f"AG Офис ({len(self.office_chat_checkboxes)})")
|
||
self.chat_tabs.setTabText(1, f"AG Розница ({len(self.retail_chat_checkboxes)})")
|
||
self.chat_tabs.setTabText(2, f"AG Склад ({len(self.warehouse_chat_checkboxes)})")
|
||
self.chat_tabs.setTabText(3, f"AG Кофейни ({len(self.coffee_chat_checkboxes)})")
|
||
self.chat_tabs.setTabText(4, f"Прочие ({len(self.other_chat_checkboxes)})")
|
||
except VkApiError as e:
|
||
self._log_error("load_chats", e)
|
||
QMessageBox.critical(self, "Ошибка", f"Не удалось загрузить чаты: {self._format_vk_error(e)}")
|
||
self.set_ui_state(False)
|
||
finally:
|
||
self._set_busy(False)
|
||
|
||
def get_user_info_by_id(self, user_id):
|
||
try:
|
||
user = self.vk.users.get(user_ids=user_id)[0]
|
||
return f"{user.get('first_name', '')} {user.get('last_name', '')}"
|
||
except Exception:
|
||
return f"Пользователь {user_id}"
|
||
|
||
def _get_selected_chats(self):
|
||
selected = []
|
||
for chk in self.office_chat_checkboxes + self.retail_chat_checkboxes + self.warehouse_chat_checkboxes + self.coffee_chat_checkboxes + self.other_chat_checkboxes:
|
||
if chk.isChecked():
|
||
chat_id = chk.property("chat_id")
|
||
title = next((c['title'] for c in self.chats if c['id'] == chat_id), "")
|
||
selected.append({'id': chat_id, 'title': title})
|
||
return selected
|
||
|
||
def _execute_user_action(self, action_type):
|
||
if not self.user_ids_to_process:
|
||
QMessageBox.warning(self, "Ошибка", f"Нет ID пользователей для операции.")
|
||
return
|
||
selected_chats = self._get_selected_chats()
|
||
if not selected_chats:
|
||
QMessageBox.warning(self, "Ошибка", "Выберите хотя бы один чат.")
|
||
return
|
||
|
||
user_infos = {uid: self.get_user_info_by_id(uid) for uid in self.user_ids_to_process}
|
||
|
||
action_verb = "исключить" if action_type == "remove" else "пригласить"
|
||
preposition = "из" if action_type == "remove" else "в"
|
||
|
||
user_names_list = list(user_infos.values())
|
||
user_names_str = "\n".join([f"• {name}" for name in user_names_list])
|
||
|
||
chat_count = len(selected_chats)
|
||
chat_str = ""
|
||
|
||
# Финальная логика склонения с учетом падежа (для "из" и "в")
|
||
if chat_count % 10 == 1 and chat_count % 100 != 11:
|
||
if action_type == 'remove':
|
||
# из 1 выбранного чата (Родительный падеж, ед.ч.)
|
||
chat_str = f"{chat_count} выбранного чата"
|
||
else:
|
||
# в 1 выбранный чат (Винительный падеж, ед.ч.)
|
||
chat_str = f"{chat_count} выбранный чат"
|
||
elif 2 <= chat_count % 10 <= 4 and (chat_count % 100 < 10 or chat_count % 100 >= 20):
|
||
if action_type == 'remove':
|
||
# из 3 выбранных чатов (Родительный падеж, мн.ч.)
|
||
chat_str = f"{chat_count} выбранных чатов"
|
||
else:
|
||
# в 3 выбранных чата (Родительный падеж, ед.ч.)
|
||
chat_str = f"{chat_count} выбранных чата"
|
||
else:
|
||
# из 5 выбранных чатов / в 5 выбранных чатов (Родительный падеж, мн.ч.)
|
||
chat_str = f"{chat_count} выбранных чатов"
|
||
|
||
msg = (
|
||
f"Вы уверены, что хотите {action_verb} следующих пользователей:\n\n"
|
||
f"{user_names_str}\n\n"
|
||
f"{preposition} {chat_str}?"
|
||
)
|
||
|
||
confirm_dialog = QMessageBox(self)
|
||
confirm_dialog.setWindowTitle("Подтверждение действия")
|
||
confirm_dialog.setText(msg)
|
||
confirm_dialog.setIcon(QMessageBox.Question)
|
||
yes_button = confirm_dialog.addButton("Да", QMessageBox.YesRole)
|
||
no_button = confirm_dialog.addButton("Нет", QMessageBox.NoRole)
|
||
confirm_dialog.setDefaultButton(no_button) # "Нет" - кнопка по умолчанию
|
||
|
||
confirm_dialog.exec()
|
||
|
||
if confirm_dialog.clickedButton() != yes_button:
|
||
return
|
||
|
||
results = []
|
||
total = len(selected_chats) * len(user_infos)
|
||
processed = 0
|
||
try:
|
||
action_label = "исключение" if action_type == "remove" else "приглашение"
|
||
self._set_busy(True, f"Статус: выполняется {action_label} (0/{total})...")
|
||
for chat in selected_chats:
|
||
for user_id, user_info in user_infos.items():
|
||
try:
|
||
if action_type == "remove":
|
||
self._vk_call_with_retry(self.vk.messages.removeChatUser, chat_id=chat['id'], member_id=user_id)
|
||
results.append(f"✓ '{user_info}' исключен из '{chat['title']}'.")
|
||
else:
|
||
params = {'chat_id': chat['id'], 'user_id': user_id}
|
||
if self.visible_messages_checkbox.isChecked():
|
||
params['visible_messages_count'] = 250
|
||
self._vk_call_with_retry(self.vk.messages.addChatUser, **params)
|
||
results.append(f"✓ '{user_info}' приглашен в '{chat['title']}'.")
|
||
except VkApiError as e:
|
||
self._log_error("execute_user_action", e)
|
||
results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {self._format_vk_error(e)}")
|
||
finally:
|
||
processed += 1
|
||
self.status_label.setText(f"Статус: выполняется {action_label} ({processed}/{total})...")
|
||
finally:
|
||
self._set_busy(False)
|
||
|
||
QMessageBox.information(self, "Результаты", "\n".join(results))
|
||
self.vk_url_input.clear()
|
||
self.user_ids_to_process.clear()
|
||
self.set_ui_state(self.token is not None)
|
||
|
||
def remove_user(self):
|
||
self._execute_user_action("remove")
|
||
|
||
def add_user_to_chat(self):
|
||
self._execute_user_action("add")
|
||
|
||
def set_user_admin(self):
|
||
"""Назначает пользователя администратором чата."""
|
||
# 1. Проверки на наличие выбранных пользователей и чатов
|
||
if not self.user_ids_to_process:
|
||
QMessageBox.warning(self, "Ошибка", "Нет ID пользователей для операции.")
|
||
return
|
||
|
||
selected_chats = self._get_selected_chats()
|
||
if not selected_chats:
|
||
QMessageBox.warning(self, "Ошибка", "Выберите хотя бы один чат.")
|
||
return
|
||
|
||
# 2. Подготовка данных для подтверждения
|
||
user_infos = {uid: self.get_user_info_by_id(uid) for uid in self.user_ids_to_process}
|
||
user_names_str = "\n".join([f"• {name}" for name in user_infos.values()])
|
||
|
||
msg = (
|
||
f"Вы уверены, что хотите назначить АДМИНИСТРАТОРАМИ следующих пользователей:\n\n"
|
||
f"{user_names_str}\n\n"
|
||
f"в {len(selected_chats)} выбранных чатах?"
|
||
)
|
||
|
||
# 3. Диалог подтверждения
|
||
confirm_dialog = QMessageBox(self)
|
||
confirm_dialog.setWindowTitle("Подтверждение прав")
|
||
confirm_dialog.setText(msg)
|
||
confirm_dialog.setIcon(QMessageBox.Question)
|
||
yes_button = confirm_dialog.addButton("Да", QMessageBox.YesRole)
|
||
no_button = confirm_dialog.addButton("Нет", QMessageBox.NoRole)
|
||
confirm_dialog.setDefaultButton(no_button)
|
||
|
||
confirm_dialog.exec()
|
||
|
||
if confirm_dialog.clickedButton() != yes_button:
|
||
return
|
||
|
||
# 4. Выполнение API запросов
|
||
results = []
|
||
total = len(selected_chats) * len(user_infos)
|
||
processed = 0
|
||
try:
|
||
self._set_busy(True, f"Статус: назначение админов (0/{total})...")
|
||
for chat in selected_chats:
|
||
# VK API требует peer_id. Для чатов это 2000000000 + local_id
|
||
try:
|
||
peer_id = 2000000000 + int(chat['id'])
|
||
except ValueError:
|
||
results.append(f"✗ Ошибка ID чата: {chat['id']}")
|
||
continue
|
||
|
||
for user_id, user_info in user_infos.items():
|
||
try:
|
||
self._vk_call_with_retry(
|
||
self.vk.messages.setMemberRole,
|
||
peer_id=peer_id,
|
||
member_id=user_id,
|
||
role="admin"
|
||
)
|
||
results.append(f"✓ '{user_info}' назначен админом в '{chat['title']}'.")
|
||
except VkApiError as e:
|
||
self._log_error("set_user_admin", e)
|
||
results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {self._format_vk_error(e)}")
|
||
finally:
|
||
processed += 1
|
||
self.status_label.setText(f"Статус: назначение админов ({processed}/{total})...")
|
||
finally:
|
||
self._set_busy(False)
|
||
|
||
# 5. Вывод результата
|
||
QMessageBox.information(self, "Результаты назначения", "\n".join(results))
|
||
|
||
# Очистка полей (по желанию, можно убрать эти две строки, если хотите оставить ввод)
|
||
self.vk_url_input.clear()
|
||
self.user_ids_to_process.clear()
|
||
self.set_ui_state(self.token is not None)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
if "--auth" in sys.argv:
|
||
try:
|
||
idx = sys.argv.index("--auth")
|
||
auth_url = sys.argv[idx + 1]
|
||
output_path = sys.argv[idx + 2]
|
||
except Exception:
|
||
sys.exit(1)
|
||
auth_webview.main_auth(auth_url, output_path)
|
||
sys.exit(0)
|
||
app = QApplication(sys.argv)
|
||
app.setStyle("Fusion")
|
||
app.setPalette(app.style().standardPalette())
|
||
|
||
# Установка иконки для ВСЕХ окон приложения
|
||
icon_path = get_resource_path("icon.ico")
|
||
if os.path.exists(icon_path):
|
||
app.setWindowIcon(QIcon(icon_path))
|
||
|
||
window = VkChatManager()
|
||
window.show()
|
||
sys.exit(app.exec())
|