7 Commits

Author SHA1 Message Date
aad6e8c5af feat(auth): pywebview вместо webengine
- добавлен auth_webview.py и режим --auth

- build.py обновлён, WebEngine исключён

- pywebview добавлен в requirements
2026-02-04 00:02:32 +03:00
4629037890 feat(app): улучшения UX, логирование и безопасность 2026-02-03 22:26:41 +03:00
ad24cb6fca feat(chat): добавлена функция назначения администраторов чатов
- Добавлено верхнее меню "Инструменты".
- Реализован метод set_user_admin с вызовом API messages.setMemberRole.
- Добавлена конвертация локального chat_id в peer_id (2000000000+id) для корректной работы метода.
- Добавлены диалоги подтверждения и отчет о результатах выполнения.
2026-02-01 05:33:08 +03:00
9b263dd85f feat(build): автоматизация сборки и поддержка бессрочных токенов
- Исправлена ошибка ImportError: QtWebEngineCore путем перехода на PyInstaller.
- Добавлен скрипт build.py для автоматической сборки, очистки DLL и создания ZIP-архива.
- Реализована поддержка бессрочного доступа (offline_access) для VK Access Token.
- Обновлен README.md: добавлены разделы для разработчиков и описание структуры данных.
- Оптимизирован размер билда за счет удаления неиспользуемых библиотек Qt и папок локализации.
2026-02-01 05:32:51 +03:00
d7eaec4ba4 fix(chat): исправление логина и загрузки списков чатов
Signed-off-by: benya <benya@daemonlord.ru>
2026-02-01 05:32:40 +03:00
Alex
30fc78e89b feat(build): Добавлена кросс-платформенная поддержка в setup.py
Модифицирован скрипт сборки на основе cx_Freeze для обеспечения совместимости с основными операционными системами (Windows, macOS, Linux). Ранее скрипт был настроен преимущественно для Windows.

Ключевые изменения:
- **Динамическое имя файла:** Исполняемый файл получает расширение `.exe` только при сборке на Windows.
- **Разделение сборок:** Для каждой целевой ОС создается своя папка (например, `build_linux`), что позволяет хранить сборки для разных систем одновременно.
- **Платформо-зависимые опции:** Учтены особенности сборки для каждой ОС, включая `base="Win32GUI"` для Windows и `base=None` для Linux.

edit: changed .gitignore
2026-02-01 05:32:32 +03:00
Alex
6225fb15d4 feat(ui): массовые операции, вкладки и улучшение UX 2026-02-01 05:31:56 +03:00
4 changed files with 538 additions and 216 deletions

78
auth_webview.py Normal file
View File

@@ -0,0 +1,78 @@
import os
import sys
import time
import json
import threading
from urllib.parse import urlparse, parse_qs, unquote
import webview
def extract_token(url_string):
token = None
expires_in = 3600
parsed = urlparse(url_string)
if parsed.fragment:
params = parse_qs(parsed.fragment)
else:
params = parse_qs(parsed.query)
if 'access_token' in params:
token = params['access_token'][0]
if 'expires_in' in params:
try:
expires_in = int(params['expires_in'][0])
except ValueError:
pass
if not token:
start_marker = "access_token%253D"
end_marker = "%25"
start_index = url_string.find(start_marker)
if start_index != -1:
token_start_index = start_index + len(start_marker)
remaining_url = url_string[token_start_index:]
end_index = remaining_url.find(end_marker)
if end_index != -1:
raw_token = remaining_url[:end_index]
else:
amp_index = remaining_url.find('&')
if amp_index != -1:
raw_token = remaining_url[:amp_index]
else:
raw_token = remaining_url
token = unquote(raw_token)
return token, expires_in
def main_auth(auth_url, output_path):
def poll_url():
try:
url = window.get_current_url()
except Exception:
url = None
if url:
token, expires_in = extract_token(url)
if token:
data = {"token": token, "expires_in": expires_in}
with open(output_path, "w", encoding="utf-8") as f:
json.dump(data, f)
window.destroy()
return
threading.Timer(0.5, poll_url).start()
def on_loaded():
threading.Timer(0.5, poll_url).start()
window = webview.create_window("VK Авторизация", auth_url)
window.events.loaded += on_loaded
storage_path = os.path.join(os.path.dirname(output_path), "webview_profile")
webview.start(private_mode=False, storage_path=storage_path)
if __name__ == "__main__":
main()

View File

@@ -5,11 +5,28 @@ import sys
# --- Конфигурация --- # --- Конфигурация ---
APP_NAME = "AnabasisManager" APP_NAME = "AnabasisManager"
VERSION = "1.3" # Ваша версия VERSION = "1.5" # Ваша версия
MAIN_SCRIPT = "main.py" MAIN_SCRIPT = "main.py"
ICON_PATH = "icon.ico" ICON_PATH = "icon.ico"
DIST_DIR = os.path.join("dist", APP_NAME) DIST_DIR = os.path.join("dist", APP_NAME)
ARCHIVE_NAME = f"{APP_NAME}-{VERSION}" # Формат Название-Версия ARCHIVE_NAME = f"{APP_NAME}-{VERSION}" # Формат Название-Версия
SAFE_CLEAN_ROOT_FILES = {"main.py", "requirements.txt", "build.py"}
REMOVE_LIST = [
"Qt6Pdf.dll", "Qt6PdfQuick.dll", "Qt6PdfWidgets.dll",
"Qt6VirtualKeyboard.dll", "Qt6Positioning.dll",
"Qt6PrintSupport.dll", "Qt6Svg.dll", "Qt6Sql.dll",
"Qt6Charts.dll", "Qt6Multimedia.dll", "Qt63DCore.dll",
"translations",
"Qt6QuickTemplates2.dll"
]
def ensure_project_root():
missing = [name for name in SAFE_CLEAN_ROOT_FILES if not os.path.exists(name)]
if missing:
print("[ERROR] Скрипт нужно запускать из корня проекта.")
print(f"[ERROR] Не найдены: {', '.join(missing)}")
sys.exit(1)
def run_build(): def run_build():
@@ -20,12 +37,14 @@ def run_build():
"--noconfirm", "--noconfirm",
"--onedir", "--onedir",
"--windowed", "--windowed",
"--exclude-module", "PySide6.QtWebEngineCore",
"--exclude-module", "PySide6.QtWebEngineWidgets",
"--exclude-module", "PySide6.QtWebEngineQuick",
f"--name={APP_NAME}", f"--name={APP_NAME}",
f"--icon={ICON_PATH}" if os.path.exists(ICON_PATH) else "", f"--icon={ICON_PATH}" if os.path.exists(ICON_PATH) else "",
f"--add-data={ICON_PATH}{os.pathsep}." if os.path.exists(ICON_PATH) else "", f"--add-data={ICON_PATH}{os.pathsep}." if os.path.exists(ICON_PATH) else "",
"--collect-all", "PySide6.QtWebEngineCore", f"--add-data=auth_webview.py{os.pathsep}.",
"--collect-all", "PySide6.QtWebEngineWidgets", MAIN_SCRIPT
MAIN_SCRIPT
] ]
command = [arg for arg in command if arg] command = [arg for arg in command if arg]
@@ -46,16 +65,7 @@ def run_cleanup():
if not os.path.exists(pyside_path): if not os.path.exists(pyside_path):
pyside_path = DIST_DIR pyside_path = DIST_DIR
to_remove = [ for item in REMOVE_LIST:
"Qt6Pdf.dll", "Qt6PdfQuick.dll", "Qt6PdfWidgets.dll",
"Qt6VirtualKeyboard.dll", "Qt6Positioning.dll",
"Qt6PrintSupport.dll", "Qt6Svg.dll", "Qt6Sql.dll",
"Qt6Charts.dll", "Qt6Multimedia.dll", "Qt63DCore.dll",
"translations",
"Qt6QuickTemplates2.dll"
]
for item in to_remove:
path = os.path.join(pyside_path, item) path = os.path.join(pyside_path, item)
if os.path.exists(path): if os.path.exists(path):
try: try:
@@ -81,6 +91,7 @@ def create_archive():
if __name__ == "__main__": if __name__ == "__main__":
ensure_project_root()
# Предварительная очистка # Предварительная очистка
for folder in ["build", "dist"]: for folder in ["build", "dist"]:
if os.path.exists(folder): if os.path.exists(folder):
@@ -91,6 +102,6 @@ if __name__ == "__main__":
create_archive() create_archive()
print("\n" + "=" * 30) print("\n" + "=" * 30)
print(f"ПРОЦЕСС ЗАВЕРШЕН") print("ПРОЦЕСС ЗАВЕРШЕН")
print(f"Файл для отправки: dist/{ARCHIVE_NAME}.zip") print(f"Файл для отправки: dist/{ARCHIVE_NAME}.zip")
print("=" * 30) print("=" * 30)

630
main.py
View File

@@ -1,7 +1,12 @@
import sys import sys
import base64
import ctypes
import shutil
import subprocess
from vk_api import VkApi from vk_api import VkApi
import json import json
import time import time
import auth_webview
import os import os
from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit, from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit,
QPushButton, QVBoxLayout, QWidget, QMessageBox, QPushButton, QVBoxLayout, QWidget, QMessageBox,
@@ -9,16 +14,89 @@ from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit,
QSizePolicy, QDialog, QTextEdit, QTabWidget, QDialogButtonBox) QSizePolicy, QDialog, QTextEdit, QTabWidget, QDialogButtonBox)
from PySide6.QtCore import Qt, QUrl, QDateTime, Signal, QTimer from PySide6.QtCore import Qt, QUrl, QDateTime, Signal, QTimer
from PySide6.QtGui import QIcon, QAction from PySide6.QtGui import QIcon, QAction
from PySide6.QtWebEngineWidgets import QWebEngineView
from PySide6.QtWebEngineCore import QWebEnginePage, QWebEngineProfile
from urllib.parse import urlparse, parse_qs, unquote from urllib.parse import urlparse, parse_qs, unquote
from vk_api.exceptions import VkApiError from vk_api.exceptions import VkApiError
from PySide6.QtCore import QStandardPaths from PySide6.QtCore import QStandardPaths
from ctypes import wintypes
# --- Управление токенами и настройками --- # --- Управление токенами и настройками ---
APP_DATA_DIR = os.path.join(QStandardPaths.writableLocation(QStandardPaths.AppDataLocation), "AnabasisVKChatManager") APP_DATA_DIR = os.path.join(QStandardPaths.writableLocation(QStandardPaths.AppDataLocation), "AnabasisVKChatManager")
TOKEN_FILE = os.path.join(APP_DATA_DIR, "token.json") TOKEN_FILE = os.path.join(APP_DATA_DIR, "token.json")
WEB_ENGINE_CACHE_DIR = os.path.join(APP_DATA_DIR, "web_engine_cache") WEB_ENGINE_CACHE_DIR = os.path.join(APP_DATA_DIR, "web_engine_cache")
CACHE_CLEANUP_MARKER = os.path.join(APP_DATA_DIR, "pending_cache_cleanup")
LOG_FILE = os.path.join(APP_DATA_DIR, "app.log")
LOG_MAX_BYTES = 1024 * 1024 # 1 MB
LOG_BACKUP_FILE = os.path.join(APP_DATA_DIR, "app.log.1")
class _DataBlob(ctypes.Structure):
_fields_ = [("cbData", wintypes.DWORD), ("pbData", ctypes.POINTER(ctypes.c_byte))]
_crypt32 = None
_kernel32 = None
if os.name == "nt":
_crypt32 = ctypes.WinDLL("crypt32", use_last_error=True)
_kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
_crypt32.CryptProtectData.argtypes = [
ctypes.POINTER(_DataBlob),
wintypes.LPCWSTR,
ctypes.POINTER(_DataBlob),
ctypes.c_void_p,
ctypes.c_void_p,
wintypes.DWORD,
ctypes.POINTER(_DataBlob),
]
_crypt32.CryptProtectData.restype = wintypes.BOOL
_crypt32.CryptUnprotectData.argtypes = [
ctypes.POINTER(_DataBlob),
ctypes.POINTER(wintypes.LPWSTR),
ctypes.POINTER(_DataBlob),
ctypes.c_void_p,
ctypes.c_void_p,
wintypes.DWORD,
ctypes.POINTER(_DataBlob),
]
_crypt32.CryptUnprotectData.restype = wintypes.BOOL
def _crypt_protect_data(data, description=""):
buffer = ctypes.create_string_buffer(data)
data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte)))
data_out = _DataBlob()
if not _crypt32.CryptProtectData(ctypes.byref(data_in), description, None, None, None, 0, ctypes.byref(data_out)):
raise ctypes.WinError(ctypes.get_last_error())
try:
return ctypes.string_at(data_out.pbData, data_out.cbData)
finally:
_kernel32.LocalFree(data_out.pbData)
def _crypt_unprotect_data(data):
buffer = ctypes.create_string_buffer(data)
data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte)))
data_out = _DataBlob()
if not _crypt32.CryptUnprotectData(ctypes.byref(data_in), None, None, None, None, 0, ctypes.byref(data_out)):
raise ctypes.WinError(ctypes.get_last_error())
try:
return ctypes.string_at(data_out.pbData, data_out.cbData)
finally:
_kernel32.LocalFree(data_out.pbData)
def _encrypt_token(token):
if os.name != "nt":
raise RuntimeError("DPAPI is available only on Windows.")
encrypted_bytes = _crypt_protect_data(token.encode("utf-8"))
return base64.b64encode(encrypted_bytes).decode("ascii")
def _decrypt_token(token_data):
if os.name != "nt":
raise RuntimeError("DPAPI is available only on Windows.")
encrypted_bytes = base64.b64decode(token_data.encode("ascii"))
decrypted_bytes = _crypt_unprotect_data(encrypted_bytes)
return decrypted_bytes.decode("utf-8")
def get_resource_path(relative_path): def get_resource_path(relative_path):
""" Получает абсолютный путь к ресурсу, работает для скрипта и для .exe """ """ Получает абсолютный путь к ресурсу, работает для скрипта и для .exe """
@@ -39,9 +117,19 @@ def save_token(token, expires_in=0):
# ИСПРАВЛЕНИЕ: если expires_in равно 0, то и время истечения ставим 0 # ИСПРАВЛЕНИЕ: если expires_in равно 0, то и время истечения ставим 0
expiration_time = (time.time() + expires_in) if expires_in > 0 else 0 expiration_time = (time.time() + expires_in) if expires_in > 0 else 0
stored_token = token
encrypted = False
if os.name == "nt":
try:
stored_token = _encrypt_token(token)
encrypted = True
except Exception as e:
print(f"Ошибка шифрования токена: {e}")
data = { data = {
"token": token, "token": stored_token,
"expiration_time": expiration_time "expiration_time": expiration_time,
"encrypted": encrypted
} }
try: try:
@@ -66,6 +154,15 @@ def load_token():
data = json.load(f) data = json.load(f)
token = data.get("token") token = data.get("token")
encrypted = data.get("encrypted", False)
if token and encrypted:
try:
token = _decrypt_token(token)
except Exception as e:
print(f"Ошибка расшифровки токена: {e}")
if os.path.exists(TOKEN_FILE):
os.remove(TOKEN_FILE)
return None, None
expiration_time = data.get("expiration_time") expiration_time = data.get("expiration_time")
# ИСПРАВЛЕНИЕ: токен валиден, если время истечения 0 ИЛИ оно больше текущего # ИСПРАВЛЕНИЕ: токен валиден, если время истечения 0 ИЛИ оно больше текущего
@@ -79,142 +176,6 @@ def load_token():
print(f"Ошибка загрузки: {e}") print(f"Ошибка загрузки: {e}")
return None, None return None, None
class WebEnginePage(QWebEnginePage):
"""
Класс для обработки навигационных запросов в QWebEngineView,
специально для извлечения токена авторизации VK.
"""
# Добавлен параметр profile в конструктор, чтобы использовать пользовательский профиль
def __init__(self, profile=None, parent=None, browser_window_instance=None):
super().__init__(profile, parent) # Передаем profile в базовый класс
self.parent_browser_window = browser_window_instance
self.token_extracted = False
def acceptNavigationRequest(self, url, _type, isMainFrame):
"""
Переопределенный метод для перехвата URL-адреса, содержащего токен доступа.
"""
url_string = url.toString()
if "access_token" in url_string and not self.token_extracted:
self.token_extracted = True
if self.parent_browser_window:
self.parent_browser_window.process_auth_url(url_string)
return False
return super().acceptNavigationRequest(url, _type, isMainFrame)
class AuthBrowserWindow(QDialog):
"""
Отдельное окно-диалог для проведения OAuth авторизации VK.
"""
token_extracted_signal = Signal(str, int)
def __init__(self, parent=None):
super().__init__(parent)
self.setWindowTitle("Авторизация VK")
self.setGeometry(350, 350, 800, 600)
layout = QVBoxLayout(self)
self.browser = QWebEngineView()
# Настройка QWebEngineProfile для сохранения куки и других данных
os.makedirs(WEB_ENGINE_CACHE_DIR, exist_ok=True)
# ИСПРАВЛЕНО: Сделать profile атрибутом экземпляра и дать ему self как родителя
self.profile = QWebEngineProfile("AnabasisVKWebProfile", self)
self.profile.setPersistentCookiesPolicy(QWebEngineProfile.AllowPersistentCookies)
self.profile.setPersistentStoragePath(WEB_ENGINE_CACHE_DIR)
# Создаем страницу с настроенным профилем
self.browser.page = WebEnginePage(profile=self.profile, parent=self.browser, browser_window_instance=self)
self.browser.setPage(self.browser.page)
layout.addWidget(self.browser)
self.status_label = QLabel("Ожидание авторизации...")
self.status_label.setAlignment(Qt.AlignCenter)
layout.addWidget(self.status_label)
def start_auth_flow(self):
"""Запускает OAuth авторизацию VK в этом окне."""
auth_url = (
"https://oauth.vk.com/authorize?"
"client_id=2685278&"
"display=page&"
"redirect_uri=https://oauth.vk.com/blank.html&"
"scope=1073737727&"
"response_type=token&"
"v=5.131"
)
self.browser.setUrl(QUrl(auth_url))
self.status_label.setText("Пожалуйста, войдите в VK и разрешите доступ...")
def process_auth_url(self, url_string):
"""
Извлекает токен доступа из URL перенаправления и испускает сигнал.
"""
token = None
expires_in = 3600
parsed = urlparse(url_string)
if parsed.fragment:
params = parse_qs(parsed.fragment)
else:
params = parse_qs(parsed.query)
if 'access_token' in params:
token = params['access_token'][0]
if 'expires_in' in params:
try:
expires_in = int(params['expires_in'][0])
except ValueError:
pass
if not token:
start_marker = "access_token%253D"
end_marker = "%25"
start_index = url_string.find(start_marker)
if start_index != -1:
token_start_index = start_index + len(start_marker)
remaining_url = url_string[token_start_index:]
end_index = remaining_url.find(end_marker)
if end_index != -1:
raw_token = remaining_url[:end_index]
else:
amp_index = remaining_url.find('&')
if amp_index != -1:
raw_token = remaining_url[:amp_index]
else:
raw_token = remaining_url
token = unquote(raw_token)
if token:
self.token_extracted_signal.emit(token, expires_in)
self.accept()
else:
QMessageBox.warning(self, "Ошибка Авторизации",
"Не удалось получить токен. Проверьте URL или попробуйте еще раз.")
self.reject()
def closeEvent(self, event):
"""
Переопределенный метод для обработки закрытия окна.
Выполняет очистку ресурсов QWebEngine, чтобы избежать предупреждения
"Release of profile requested but WebEnginePage still not deleted. Expect troubles!".
"""
current_page = self.browser.page()
if current_page:
# Снимаем страницу с QWebEngineView, чтобы View перестала ей владеть
self.browser.setPage(None)
# Планируем удаление объекта страницы
current_page.deleteLater()
# Вызываем метод базового класса для корректного закрытия диалога
super().closeEvent(event)
class MultiLinkDialog(QDialog): class MultiLinkDialog(QDialog):
def __init__(self, parent=None): def __init__(self, parent=None):
super().__init__(parent) super().__init__(parent)
@@ -253,12 +214,16 @@ class VkChatManager(QMainWindow):
self.vk_session = None self.vk_session = None
self.vk = None self.vk = None
self.user_ids_to_process = [] self.user_ids_to_process = []
self._busy = False
self.suppress_resolve = False
self.resolve_timer = QTimer(self) self.resolve_timer = QTimer(self)
self.resolve_timer.setSingleShot(True) self.resolve_timer.setSingleShot(True)
self.resolve_timer.setInterval(750) self.resolve_timer.setInterval(750)
self.resolve_timer.timeout.connect(self.resolve_single_user_id_from_input) self.resolve_timer.timeout.connect(self.resolve_single_user_id_from_input)
self._cleanup_cache_if_needed()
self._ensure_log_dir()
self.init_ui() self.init_ui()
self.load_saved_token_on_startup() self.load_saved_token_on_startup()
self.setup_token_timer() self.setup_token_timer()
@@ -356,6 +321,8 @@ class VkChatManager(QMainWindow):
self.set_ui_state(False) self.set_ui_state(False)
def on_vk_url_input_changed(self, text): def on_vk_url_input_changed(self, text):
if self.suppress_resolve:
return
if self.vk_url_input.hasFocus(): if self.vk_url_input.hasFocus():
self.resolve_timer.start() self.resolve_timer.start()
@@ -364,7 +331,7 @@ class VkChatManager(QMainWindow):
if dialog.exec(): if dialog.exec():
links = dialog.get_links() links = dialog.get_links()
if links: if links:
self.vk_url_input.clear() self._set_vk_url_input_text("")
self._process_links_list(links) self._process_links_list(links)
else: else:
QMessageBox.information(self, "Информация", "Список ссылок пуст.") QMessageBox.information(self, "Информация", "Список ссылок пуст.")
@@ -387,33 +354,37 @@ class VkChatManager(QMainWindow):
resolved_ids = [] resolved_ids = []
failed_links = [] failed_links = []
self.status_label.setText("Статус: Определяю ID...") self._set_busy(True, "Статус: Определяю ID...")
QApplication.processEvents() try:
for link in links_list:
try:
path = urlparse(link).path
screen_name = path.split('/')[-1] if path else ''
if not screen_name and len(path.split('/')) > 1:
screen_name = path.split('/')[-2]
for link in links_list: if not screen_name:
try: failed_links.append(link)
path = urlparse(link).path continue
screen_name = path.split('/')[-1] if path else ''
if not screen_name and len(path.split('/')) > 1:
screen_name = path.split('/')[-2]
if not screen_name: resolved_object = self._vk_call_with_retry(self.vk.utils.resolveScreenName, screen_name=screen_name)
if resolved_object and resolved_object.get('type') == 'user':
resolved_ids.append(resolved_object['object_id'])
else:
failed_links.append(link)
except VkApiError as e:
self._log_error("resolveScreenName", e)
failed_links.append(f"{link} ({self._format_vk_error(e)})")
except Exception:
failed_links.append(link) failed_links.append(link)
continue finally:
self._set_busy(False)
resolved_object = self.vk.utils.resolveScreenName(screen_name=screen_name)
if resolved_object and resolved_object.get('type') == 'user':
resolved_ids.append(resolved_object['object_id'])
else:
failed_links.append(link)
except Exception:
failed_links.append(link)
self.user_ids_to_process = resolved_ids self.user_ids_to_process = resolved_ids
status_message = f"Статус: Готово к работе с {len(resolved_ids)} пользовател(ем/ями)." status_message = f"Статус: Готово к работе с {len(resolved_ids)} пользовател(ем/ями)."
if len(links_list) > 1: if len(links_list) > 1:
self.vk_url_input.setText(f"Загружено {len(resolved_ids)}/{len(links_list)} из списка") self._set_vk_url_input_text(f"Загружено {len(resolved_ids)}/{len(links_list)} из списка")
if failed_links: if failed_links:
QMessageBox.warning(self, "Ошибка получения ID", QMessageBox.warning(self, "Ошибка получения ID",
@@ -435,6 +406,11 @@ class VkChatManager(QMainWindow):
make_admin_action.triggered.connect(self.set_user_admin) make_admin_action.triggered.connect(self.set_user_admin)
tools_menu.addAction(make_admin_action) tools_menu.addAction(make_admin_action)
logout_action = QAction("Выйти и очистить", self)
logout_action.setStatusTip("Выйти, удалить токен и кэш")
logout_action.triggered.connect(self.logout_and_clear)
tools_menu.addAction(logout_action)
def create_chat_tab(self): def create_chat_tab(self):
# This implementation correctly creates a scrollable area for chat lists. # This implementation correctly creates a scrollable area for chat lists.
tab_content_widget = QWidget() tab_content_widget = QWidget()
@@ -511,7 +487,150 @@ class VkChatManager(QMainWindow):
if not authorized: if not authorized:
self.user_ids_to_process.clear() self.user_ids_to_process.clear()
self.vk_url_input.clear() self._set_vk_url_input_text("")
self._clear_chat_tabs()
def _set_busy(self, busy, status_text=None):
if status_text:
self.status_label.setText(status_text)
if busy:
self._busy = True
QApplication.setOverrideCursor(Qt.WaitCursor)
for widget in [
self.refresh_chats_btn, self.select_all_btn, self.deselect_all_btn,
self.vk_url_input, self.multi_link_btn, self.visible_messages_checkbox,
self.remove_user_btn, self.add_user_btn
]:
widget.setEnabled(False)
else:
self._busy = False
QApplication.restoreOverrideCursor()
if self.token is None:
self.set_ui_state(False)
else:
self.set_ui_state(True)
def _ensure_log_dir(self):
os.makedirs(APP_DATA_DIR, exist_ok=True)
def _log_error(self, context, exc):
try:
os.makedirs(APP_DATA_DIR, exist_ok=True)
self._rotate_log_if_needed()
timestamp = QDateTime.currentDateTime().toString("yyyy-MM-dd HH:mm:ss")
message = self._format_vk_error(exc)
with open(LOG_FILE, "a", encoding="utf-8") as f:
f.write(f"[{timestamp}] {context}: {message}\n")
except Exception:
pass
def _rotate_log_if_needed(self):
try:
if not os.path.exists(LOG_FILE):
return
if os.path.getsize(LOG_FILE) < LOG_MAX_BYTES:
return
if os.path.exists(LOG_BACKUP_FILE):
os.remove(LOG_BACKUP_FILE)
os.replace(LOG_FILE, LOG_BACKUP_FILE)
except Exception:
pass
def _format_vk_error(self, exc):
error = getattr(exc, "error", None)
code = None
message = str(exc)
if isinstance(error, dict):
code = error.get("error_code")
message = error.get("error_msg") or message
hints = {
5: "Ошибка авторизации. Проверьте токен.",
6: "Слишком много запросов. Подождите и повторите.",
7: "Недостаточно прав.",
9: "Слишком много однотипных действий.",
10: "Внутренняя ошибка VK. Повторите позже.",
15: "Доступ запрещен.",
100: "Некорректный параметр запроса.",
113: "Неверный идентификатор пользователя.",
200: "Доступ к чату запрещен.",
}
if code in hints:
message = f"{message} ({hints[code]})"
if code is not None:
return f"[{code}] {message}"
return message
def _set_vk_url_input_text(self, text):
self.suppress_resolve = True
try:
self.vk_url_input.blockSignals(True)
self.vk_url_input.setText(text)
finally:
self.vk_url_input.blockSignals(False)
self.suppress_resolve = False
def logout_and_clear(self):
confirm = QMessageBox.question(
self,
"Подтверждение выхода",
"Вы уверены, что хотите выйти и удалить сохраненный токен и кэш?",
QMessageBox.Yes | QMessageBox.No
)
if confirm != QMessageBox.Yes:
return
self.token = None
self.token_expiration_time = None
self.vk_session = None
self.vk = None
self.user_ids_to_process.clear()
self._set_vk_url_input_text("")
self.token_input.clear()
self.token_timer_label.setText("Срок действия токена: Н")
self.status_label.setText("Статус: не авторизован")
if self.token_countdown_timer.isActive():
self.token_countdown_timer.stop()
self._clear_chat_tabs()
self.set_ui_state(False)
try:
if os.path.exists(TOKEN_FILE):
os.remove(TOKEN_FILE)
except Exception as e:
print(f"Ошибка удаления токена: {e}")
try:
self._try_remove_web_cache()
except Exception as e:
print(f"Ошибка удаления кэша: {e}")
def _cleanup_cache_if_needed(self):
if os.path.exists(CACHE_CLEANUP_MARKER):
try:
self._try_remove_web_cache()
if os.path.exists(CACHE_CLEANUP_MARKER):
os.remove(CACHE_CLEANUP_MARKER)
except Exception as e:
print(f"Ошибка отложенной очистки кэша: {e}")
def _try_remove_web_cache(self):
if not os.path.exists(WEB_ENGINE_CACHE_DIR):
return
attempts = 5
last_error = None
for _ in range(attempts):
try:
shutil.rmtree(WEB_ENGINE_CACHE_DIR)
last_error = None
break
except Exception as e:
last_error = e
time.sleep(0.2)
if last_error:
os.makedirs(APP_DATA_DIR, exist_ok=True)
with open(CACHE_CLEANUP_MARKER, "w") as f:
f.write("pending")
raise last_error
def load_saved_token_on_startup(self): def load_saved_token_on_startup(self):
loaded_token, expiration_time = load_token() loaded_token, expiration_time = load_token()
@@ -529,10 +648,49 @@ class VkChatManager(QMainWindow):
def start_auth(self): def start_auth(self):
self.status_label.setText("Статус: ожидание авторизации...") self.status_label.setText("Статус: ожидание авторизации...")
auth_window = AuthBrowserWindow(self) auth_url = (
auth_window.token_extracted_signal.connect(self.handle_new_auth_token) "https://oauth.vk.com/authorize?"
auth_window.start_auth_flow() "client_id=2685278&"
auth_window.exec() "display=page&"
"redirect_uri=https://oauth.vk.com/blank.html&"
"scope=1073737727&"
"response_type=token&"
"v=5.131"
)
output_path = os.path.join(APP_DATA_DIR, "auth_result.json")
try:
if os.path.exists(output_path):
os.remove(output_path)
except Exception:
pass
cmd = [sys.executable, "--auth", auth_url, output_path]
try:
subprocess.check_call(cmd)
except Exception as e:
self.status_label.setText(f"Статус: ошибка запуска авторизации: {e}")
return
if not os.path.exists(output_path):
self.status_label.setText("Статус: авторизация не удалась")
return
try:
with open(output_path, "r", encoding="utf-8") as f:
data = json.load(f)
token = data.get("token")
expires_in = data.get("expires_in", 0)
except Exception:
token = None
expires_in = 0
try:
if os.path.exists(output_path):
os.remove(output_path)
except Exception:
pass
self.handle_new_auth_token(token, expires_in)
def handle_new_auth_token(self, token, expires_in): def handle_new_auth_token(self, token, expires_in):
if not token: if not token:
@@ -570,6 +728,26 @@ class VkChatManager(QMainWindow):
checkbox.deleteLater() checkbox.deleteLater()
chk_list.clear() chk_list.clear()
def _vk_error_code(self, exc):
error = getattr(exc, "error", None)
if isinstance(error, dict):
return error.get("error_code")
return getattr(exc, "code", None)
def _vk_call_with_retry(self, func, *args, **kwargs):
max_attempts = 5
for attempt in range(1, max_attempts + 1):
try:
return func(*args, **kwargs)
except VkApiError as e:
code = self._vk_error_code(e)
if code not in (6, 9, 10) or attempt == max_attempts:
raise
delay = min(2.0, 0.35 * (2 ** (attempt - 1)))
if code == 9:
delay = max(delay, 1.0)
time.sleep(delay)
def load_chats(self): def load_chats(self):
self._clear_chat_tabs() self._clear_chat_tabs()
@@ -583,7 +761,27 @@ class VkChatManager(QMainWindow):
] ]
try: try:
conversations = self.vk.messages.getConversations(count=200, filter="all")['items'] self._set_busy(True, "Статус: загрузка чатов...")
conversations = []
start_from = None
seen_start_tokens = set()
while True:
params = {"count": 200, "filter": "all"}
if start_from:
if start_from in seen_start_tokens:
break
params["start_from"] = start_from
seen_start_tokens.add(start_from)
response = self._vk_call_with_retry(self.vk.messages.getConversations, **params)
page_items = response.get("items", [])
if not page_items:
break
conversations.extend(page_items)
start_from = response.get("next_from")
if not start_from:
break
for conv in conversations: for conv in conversations:
if conv['conversation']['peer']['type'] == 'chat': if conv['conversation']['peer']['type'] == 'chat':
chat_id = conv['conversation']['peer']['local_id'] chat_id = conv['conversation']['peer']['local_id']
@@ -616,8 +814,11 @@ class VkChatManager(QMainWindow):
self.chat_tabs.setTabText(3, f"AG Кофейни ({len(self.coffee_chat_checkboxes)})") self.chat_tabs.setTabText(3, f"AG Кофейни ({len(self.coffee_chat_checkboxes)})")
self.chat_tabs.setTabText(4, f"Прочие ({len(self.other_chat_checkboxes)})") self.chat_tabs.setTabText(4, f"Прочие ({len(self.other_chat_checkboxes)})")
except VkApiError as e: except VkApiError as e:
QMessageBox.critical(self, "Ошибка", f"Не удалось загрузить чаты: {e}") self._log_error("load_chats", e)
QMessageBox.critical(self, "Ошибка", f"Не удалось загрузить чаты: {self._format_vk_error(e)}")
self.set_ui_state(False) self.set_ui_state(False)
finally:
self._set_busy(False)
def get_user_info_by_id(self, user_id): def get_user_info_by_id(self, user_id):
try: try:
@@ -694,20 +895,31 @@ class VkChatManager(QMainWindow):
return return
results = [] results = []
for chat in selected_chats: total = len(selected_chats) * len(user_infos)
for user_id, user_info in user_infos.items(): processed = 0
try: try:
if action_type == "remove": action_label = "исключение" if action_type == "remove" else "приглашение"
self.vk.messages.removeChatUser(chat_id=chat['id'], member_id=user_id) self._set_busy(True, f"Статус: выполняется {action_label} (0/{total})...")
results.append(f"'{user_info}' исключен из '{chat['title']}'.") for chat in selected_chats:
else: for user_id, user_info in user_infos.items():
params = {'chat_id': chat['id'], 'user_id': user_id} try:
if self.visible_messages_checkbox.isChecked(): if action_type == "remove":
params['visible_messages_count'] = 250 self._vk_call_with_retry(self.vk.messages.removeChatUser, chat_id=chat['id'], member_id=user_id)
self.vk.messages.addChatUser(**params) results.append(f"'{user_info}' исключен из '{chat['title']}'.")
results.append(f"'{user_info}' приглашен в '{chat['title']}'.") else:
except VkApiError as e: params = {'chat_id': chat['id'], 'user_id': user_id}
results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {e}") if self.visible_messages_checkbox.isChecked():
params['visible_messages_count'] = 250
self._vk_call_with_retry(self.vk.messages.addChatUser, **params)
results.append(f"'{user_info}' приглашен в '{chat['title']}'.")
except VkApiError as e:
self._log_error("execute_user_action", e)
results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {self._format_vk_error(e)}")
finally:
processed += 1
self.status_label.setText(f"Статус: выполняется {action_label} ({processed}/{total})...")
finally:
self._set_busy(False)
QMessageBox.information(self, "Результаты", "\n".join(results)) QMessageBox.information(self, "Результаты", "\n".join(results))
self.vk_url_input.clear() self.vk_url_input.clear()
@@ -758,24 +970,35 @@ class VkChatManager(QMainWindow):
# 4. Выполнение API запросов # 4. Выполнение API запросов
results = [] results = []
for chat in selected_chats: total = len(selected_chats) * len(user_infos)
# VK API требует peer_id. Для чатов это 2000000000 + local_id processed = 0
try: try:
peer_id = 2000000000 + int(chat['id']) self._set_busy(True, f"Статус: назначение админов (0/{total})...")
except ValueError: for chat in selected_chats:
results.append(f"✗ Ошибка ID чата: {chat['id']}") # VK API требует peer_id. Для чатов это 2000000000 + local_id
continue
for user_id, user_info in user_infos.items():
try: try:
self.vk.messages.setMemberRole( peer_id = 2000000000 + int(chat['id'])
peer_id=peer_id, except ValueError:
member_id=user_id, results.append(f"✗ Ошибка ID чата: {chat['id']}")
role="admin" continue
)
results.append(f"'{user_info}' назначен админом в '{chat['title']}'.") for user_id, user_info in user_infos.items():
except VkApiError as e: try:
results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {e}") self._vk_call_with_retry(
self.vk.messages.setMemberRole,
peer_id=peer_id,
member_id=user_id,
role="admin"
)
results.append(f"'{user_info}' назначен админом в '{chat['title']}'.")
except VkApiError as e:
self._log_error("set_user_admin", e)
results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {self._format_vk_error(e)}")
finally:
processed += 1
self.status_label.setText(f"Статус: назначение админов ({processed}/{total})...")
finally:
self._set_busy(False)
# 5. Вывод результата # 5. Вывод результата
QMessageBox.information(self, "Результаты назначения", "\n".join(results)) QMessageBox.information(self, "Результаты назначения", "\n".join(results))
@@ -787,6 +1010,15 @@ class VkChatManager(QMainWindow):
if __name__ == "__main__": if __name__ == "__main__":
if "--auth" in sys.argv:
try:
idx = sys.argv.index("--auth")
auth_url = sys.argv[idx + 1]
output_path = sys.argv[idx + 2]
except Exception:
sys.exit(1)
auth_webview.main_auth(auth_url, output_path)
sys.exit(0)
app = QApplication(sys.argv) app = QApplication(sys.argv)
app.setStyle("Fusion") app.setStyle("Fusion")
app.setPalette(app.style().standardPalette()) app.setPalette(app.style().standardPalette())
@@ -798,4 +1030,4 @@ if __name__ == "__main__":
window = VkChatManager() window = VkChatManager()
window.show() window.show()
sys.exit(app.exec()) sys.exit(app.exec())

View File

@@ -1,2 +1,3 @@
PySide6~=6.9.1 PySide6~=6.10.2
vk-api~=11.9.9 vk-api~=11.9.9
pywebview