2 Commits

Author SHA1 Message Date
aad6e8c5af feat(auth): pywebview вместо webengine
- добавлен auth_webview.py и режим --auth

- build.py обновлён, WebEngine исключён

- pywebview добавлен в requirements
2026-02-04 00:02:32 +03:00
4629037890 feat(app): улучшения UX, логирование и безопасность 2026-02-03 22:26:41 +03:00
4 changed files with 538 additions and 216 deletions

78
auth_webview.py Normal file
View File

@@ -0,0 +1,78 @@
import os
import sys
import time
import json
import threading
from urllib.parse import urlparse, parse_qs, unquote
import webview
def extract_token(url_string):
token = None
expires_in = 3600
parsed = urlparse(url_string)
if parsed.fragment:
params = parse_qs(parsed.fragment)
else:
params = parse_qs(parsed.query)
if 'access_token' in params:
token = params['access_token'][0]
if 'expires_in' in params:
try:
expires_in = int(params['expires_in'][0])
except ValueError:
pass
if not token:
start_marker = "access_token%253D"
end_marker = "%25"
start_index = url_string.find(start_marker)
if start_index != -1:
token_start_index = start_index + len(start_marker)
remaining_url = url_string[token_start_index:]
end_index = remaining_url.find(end_marker)
if end_index != -1:
raw_token = remaining_url[:end_index]
else:
amp_index = remaining_url.find('&')
if amp_index != -1:
raw_token = remaining_url[:amp_index]
else:
raw_token = remaining_url
token = unquote(raw_token)
return token, expires_in
def main_auth(auth_url, output_path):
def poll_url():
try:
url = window.get_current_url()
except Exception:
url = None
if url:
token, expires_in = extract_token(url)
if token:
data = {"token": token, "expires_in": expires_in}
with open(output_path, "w", encoding="utf-8") as f:
json.dump(data, f)
window.destroy()
return
threading.Timer(0.5, poll_url).start()
def on_loaded():
threading.Timer(0.5, poll_url).start()
window = webview.create_window("VK Авторизация", auth_url)
window.events.loaded += on_loaded
storage_path = os.path.join(os.path.dirname(output_path), "webview_profile")
webview.start(private_mode=False, storage_path=storage_path)
if __name__ == "__main__":
main()

View File

@@ -5,11 +5,28 @@ import sys
# --- Конфигурация ---
APP_NAME = "AnabasisManager"
VERSION = "1.3" # Ваша версия
VERSION = "1.5" # Ваша версия
MAIN_SCRIPT = "main.py"
ICON_PATH = "icon.ico"
DIST_DIR = os.path.join("dist", APP_NAME)
ARCHIVE_NAME = f"{APP_NAME}-{VERSION}" # Формат Название-Версия
SAFE_CLEAN_ROOT_FILES = {"main.py", "requirements.txt", "build.py"}
REMOVE_LIST = [
"Qt6Pdf.dll", "Qt6PdfQuick.dll", "Qt6PdfWidgets.dll",
"Qt6VirtualKeyboard.dll", "Qt6Positioning.dll",
"Qt6PrintSupport.dll", "Qt6Svg.dll", "Qt6Sql.dll",
"Qt6Charts.dll", "Qt6Multimedia.dll", "Qt63DCore.dll",
"translations",
"Qt6QuickTemplates2.dll"
]
def ensure_project_root():
missing = [name for name in SAFE_CLEAN_ROOT_FILES if not os.path.exists(name)]
if missing:
print("[ERROR] Скрипт нужно запускать из корня проекта.")
print(f"[ERROR] Не найдены: {', '.join(missing)}")
sys.exit(1)
def run_build():
@@ -20,12 +37,14 @@ def run_build():
"--noconfirm",
"--onedir",
"--windowed",
"--exclude-module", "PySide6.QtWebEngineCore",
"--exclude-module", "PySide6.QtWebEngineWidgets",
"--exclude-module", "PySide6.QtWebEngineQuick",
f"--name={APP_NAME}",
f"--icon={ICON_PATH}" if os.path.exists(ICON_PATH) else "",
f"--add-data={ICON_PATH}{os.pathsep}." if os.path.exists(ICON_PATH) else "",
"--collect-all", "PySide6.QtWebEngineCore",
"--collect-all", "PySide6.QtWebEngineWidgets",
MAIN_SCRIPT
f"--add-data=auth_webview.py{os.pathsep}.",
MAIN_SCRIPT
]
command = [arg for arg in command if arg]
@@ -46,16 +65,7 @@ def run_cleanup():
if not os.path.exists(pyside_path):
pyside_path = DIST_DIR
to_remove = [
"Qt6Pdf.dll", "Qt6PdfQuick.dll", "Qt6PdfWidgets.dll",
"Qt6VirtualKeyboard.dll", "Qt6Positioning.dll",
"Qt6PrintSupport.dll", "Qt6Svg.dll", "Qt6Sql.dll",
"Qt6Charts.dll", "Qt6Multimedia.dll", "Qt63DCore.dll",
"translations",
"Qt6QuickTemplates2.dll"
]
for item in to_remove:
for item in REMOVE_LIST:
path = os.path.join(pyside_path, item)
if os.path.exists(path):
try:
@@ -81,6 +91,7 @@ def create_archive():
if __name__ == "__main__":
ensure_project_root()
# Предварительная очистка
for folder in ["build", "dist"]:
if os.path.exists(folder):
@@ -91,6 +102,6 @@ if __name__ == "__main__":
create_archive()
print("\n" + "=" * 30)
print(f"ПРОЦЕСС ЗАВЕРШЕН")
print("ПРОЦЕСС ЗАВЕРШЕН")
print(f"Файл для отправки: dist/{ARCHIVE_NAME}.zip")
print("=" * 30)
print("=" * 30)

630
main.py
View File

@@ -1,7 +1,12 @@
import sys
import base64
import ctypes
import shutil
import subprocess
from vk_api import VkApi
import json
import time
import auth_webview
import os
from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit,
QPushButton, QVBoxLayout, QWidget, QMessageBox,
@@ -9,16 +14,89 @@ from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit,
QSizePolicy, QDialog, QTextEdit, QTabWidget, QDialogButtonBox)
from PySide6.QtCore import Qt, QUrl, QDateTime, Signal, QTimer
from PySide6.QtGui import QIcon, QAction
from PySide6.QtWebEngineWidgets import QWebEngineView
from PySide6.QtWebEngineCore import QWebEnginePage, QWebEngineProfile
from urllib.parse import urlparse, parse_qs, unquote
from vk_api.exceptions import VkApiError
from PySide6.QtCore import QStandardPaths
from ctypes import wintypes
# --- Управление токенами и настройками ---
APP_DATA_DIR = os.path.join(QStandardPaths.writableLocation(QStandardPaths.AppDataLocation), "AnabasisVKChatManager")
TOKEN_FILE = os.path.join(APP_DATA_DIR, "token.json")
WEB_ENGINE_CACHE_DIR = os.path.join(APP_DATA_DIR, "web_engine_cache")
CACHE_CLEANUP_MARKER = os.path.join(APP_DATA_DIR, "pending_cache_cleanup")
LOG_FILE = os.path.join(APP_DATA_DIR, "app.log")
LOG_MAX_BYTES = 1024 * 1024 # 1 MB
LOG_BACKUP_FILE = os.path.join(APP_DATA_DIR, "app.log.1")
class _DataBlob(ctypes.Structure):
_fields_ = [("cbData", wintypes.DWORD), ("pbData", ctypes.POINTER(ctypes.c_byte))]
_crypt32 = None
_kernel32 = None
if os.name == "nt":
_crypt32 = ctypes.WinDLL("crypt32", use_last_error=True)
_kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
_crypt32.CryptProtectData.argtypes = [
ctypes.POINTER(_DataBlob),
wintypes.LPCWSTR,
ctypes.POINTER(_DataBlob),
ctypes.c_void_p,
ctypes.c_void_p,
wintypes.DWORD,
ctypes.POINTER(_DataBlob),
]
_crypt32.CryptProtectData.restype = wintypes.BOOL
_crypt32.CryptUnprotectData.argtypes = [
ctypes.POINTER(_DataBlob),
ctypes.POINTER(wintypes.LPWSTR),
ctypes.POINTER(_DataBlob),
ctypes.c_void_p,
ctypes.c_void_p,
wintypes.DWORD,
ctypes.POINTER(_DataBlob),
]
_crypt32.CryptUnprotectData.restype = wintypes.BOOL
def _crypt_protect_data(data, description=""):
buffer = ctypes.create_string_buffer(data)
data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte)))
data_out = _DataBlob()
if not _crypt32.CryptProtectData(ctypes.byref(data_in), description, None, None, None, 0, ctypes.byref(data_out)):
raise ctypes.WinError(ctypes.get_last_error())
try:
return ctypes.string_at(data_out.pbData, data_out.cbData)
finally:
_kernel32.LocalFree(data_out.pbData)
def _crypt_unprotect_data(data):
buffer = ctypes.create_string_buffer(data)
data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte)))
data_out = _DataBlob()
if not _crypt32.CryptUnprotectData(ctypes.byref(data_in), None, None, None, None, 0, ctypes.byref(data_out)):
raise ctypes.WinError(ctypes.get_last_error())
try:
return ctypes.string_at(data_out.pbData, data_out.cbData)
finally:
_kernel32.LocalFree(data_out.pbData)
def _encrypt_token(token):
if os.name != "nt":
raise RuntimeError("DPAPI is available only on Windows.")
encrypted_bytes = _crypt_protect_data(token.encode("utf-8"))
return base64.b64encode(encrypted_bytes).decode("ascii")
def _decrypt_token(token_data):
if os.name != "nt":
raise RuntimeError("DPAPI is available only on Windows.")
encrypted_bytes = base64.b64decode(token_data.encode("ascii"))
decrypted_bytes = _crypt_unprotect_data(encrypted_bytes)
return decrypted_bytes.decode("utf-8")
def get_resource_path(relative_path):
""" Получает абсолютный путь к ресурсу, работает для скрипта и для .exe """
@@ -39,9 +117,19 @@ def save_token(token, expires_in=0):
# ИСПРАВЛЕНИЕ: если expires_in равно 0, то и время истечения ставим 0
expiration_time = (time.time() + expires_in) if expires_in > 0 else 0
stored_token = token
encrypted = False
if os.name == "nt":
try:
stored_token = _encrypt_token(token)
encrypted = True
except Exception as e:
print(f"Ошибка шифрования токена: {e}")
data = {
"token": token,
"expiration_time": expiration_time
"token": stored_token,
"expiration_time": expiration_time,
"encrypted": encrypted
}
try:
@@ -66,6 +154,15 @@ def load_token():
data = json.load(f)
token = data.get("token")
encrypted = data.get("encrypted", False)
if token and encrypted:
try:
token = _decrypt_token(token)
except Exception as e:
print(f"Ошибка расшифровки токена: {e}")
if os.path.exists(TOKEN_FILE):
os.remove(TOKEN_FILE)
return None, None
expiration_time = data.get("expiration_time")
# ИСПРАВЛЕНИЕ: токен валиден, если время истечения 0 ИЛИ оно больше текущего
@@ -79,142 +176,6 @@ def load_token():
print(f"Ошибка загрузки: {e}")
return None, None
class WebEnginePage(QWebEnginePage):
"""
Класс для обработки навигационных запросов в QWebEngineView,
специально для извлечения токена авторизации VK.
"""
# Добавлен параметр profile в конструктор, чтобы использовать пользовательский профиль
def __init__(self, profile=None, parent=None, browser_window_instance=None):
super().__init__(profile, parent) # Передаем profile в базовый класс
self.parent_browser_window = browser_window_instance
self.token_extracted = False
def acceptNavigationRequest(self, url, _type, isMainFrame):
"""
Переопределенный метод для перехвата URL-адреса, содержащего токен доступа.
"""
url_string = url.toString()
if "access_token" in url_string and not self.token_extracted:
self.token_extracted = True
if self.parent_browser_window:
self.parent_browser_window.process_auth_url(url_string)
return False
return super().acceptNavigationRequest(url, _type, isMainFrame)
class AuthBrowserWindow(QDialog):
"""
Отдельное окно-диалог для проведения OAuth авторизации VK.
"""
token_extracted_signal = Signal(str, int)
def __init__(self, parent=None):
super().__init__(parent)
self.setWindowTitle("Авторизация VK")
self.setGeometry(350, 350, 800, 600)
layout = QVBoxLayout(self)
self.browser = QWebEngineView()
# Настройка QWebEngineProfile для сохранения куки и других данных
os.makedirs(WEB_ENGINE_CACHE_DIR, exist_ok=True)
# ИСПРАВЛЕНО: Сделать profile атрибутом экземпляра и дать ему self как родителя
self.profile = QWebEngineProfile("AnabasisVKWebProfile", self)
self.profile.setPersistentCookiesPolicy(QWebEngineProfile.AllowPersistentCookies)
self.profile.setPersistentStoragePath(WEB_ENGINE_CACHE_DIR)
# Создаем страницу с настроенным профилем
self.browser.page = WebEnginePage(profile=self.profile, parent=self.browser, browser_window_instance=self)
self.browser.setPage(self.browser.page)
layout.addWidget(self.browser)
self.status_label = QLabel("Ожидание авторизации...")
self.status_label.setAlignment(Qt.AlignCenter)
layout.addWidget(self.status_label)
def start_auth_flow(self):
"""Запускает OAuth авторизацию VK в этом окне."""
auth_url = (
"https://oauth.vk.com/authorize?"
"client_id=2685278&"
"display=page&"
"redirect_uri=https://oauth.vk.com/blank.html&"
"scope=1073737727&"
"response_type=token&"
"v=5.131"
)
self.browser.setUrl(QUrl(auth_url))
self.status_label.setText("Пожалуйста, войдите в VK и разрешите доступ...")
def process_auth_url(self, url_string):
"""
Извлекает токен доступа из URL перенаправления и испускает сигнал.
"""
token = None
expires_in = 3600
parsed = urlparse(url_string)
if parsed.fragment:
params = parse_qs(parsed.fragment)
else:
params = parse_qs(parsed.query)
if 'access_token' in params:
token = params['access_token'][0]
if 'expires_in' in params:
try:
expires_in = int(params['expires_in'][0])
except ValueError:
pass
if not token:
start_marker = "access_token%253D"
end_marker = "%25"
start_index = url_string.find(start_marker)
if start_index != -1:
token_start_index = start_index + len(start_marker)
remaining_url = url_string[token_start_index:]
end_index = remaining_url.find(end_marker)
if end_index != -1:
raw_token = remaining_url[:end_index]
else:
amp_index = remaining_url.find('&')
if amp_index != -1:
raw_token = remaining_url[:amp_index]
else:
raw_token = remaining_url
token = unquote(raw_token)
if token:
self.token_extracted_signal.emit(token, expires_in)
self.accept()
else:
QMessageBox.warning(self, "Ошибка Авторизации",
"Не удалось получить токен. Проверьте URL или попробуйте еще раз.")
self.reject()
def closeEvent(self, event):
"""
Переопределенный метод для обработки закрытия окна.
Выполняет очистку ресурсов QWebEngine, чтобы избежать предупреждения
"Release of profile requested but WebEnginePage still not deleted. Expect troubles!".
"""
current_page = self.browser.page()
if current_page:
# Снимаем страницу с QWebEngineView, чтобы View перестала ей владеть
self.browser.setPage(None)
# Планируем удаление объекта страницы
current_page.deleteLater()
# Вызываем метод базового класса для корректного закрытия диалога
super().closeEvent(event)
class MultiLinkDialog(QDialog):
def __init__(self, parent=None):
super().__init__(parent)
@@ -253,12 +214,16 @@ class VkChatManager(QMainWindow):
self.vk_session = None
self.vk = None
self.user_ids_to_process = []
self._busy = False
self.suppress_resolve = False
self.resolve_timer = QTimer(self)
self.resolve_timer.setSingleShot(True)
self.resolve_timer.setInterval(750)
self.resolve_timer.timeout.connect(self.resolve_single_user_id_from_input)
self._cleanup_cache_if_needed()
self._ensure_log_dir()
self.init_ui()
self.load_saved_token_on_startup()
self.setup_token_timer()
@@ -356,6 +321,8 @@ class VkChatManager(QMainWindow):
self.set_ui_state(False)
def on_vk_url_input_changed(self, text):
if self.suppress_resolve:
return
if self.vk_url_input.hasFocus():
self.resolve_timer.start()
@@ -364,7 +331,7 @@ class VkChatManager(QMainWindow):
if dialog.exec():
links = dialog.get_links()
if links:
self.vk_url_input.clear()
self._set_vk_url_input_text("")
self._process_links_list(links)
else:
QMessageBox.information(self, "Информация", "Список ссылок пуст.")
@@ -387,33 +354,37 @@ class VkChatManager(QMainWindow):
resolved_ids = []
failed_links = []
self.status_label.setText("Статус: Определяю ID...")
QApplication.processEvents()
self._set_busy(True, "Статус: Определяю ID...")
try:
for link in links_list:
try:
path = urlparse(link).path
screen_name = path.split('/')[-1] if path else ''
if not screen_name and len(path.split('/')) > 1:
screen_name = path.split('/')[-2]
for link in links_list:
try:
path = urlparse(link).path
screen_name = path.split('/')[-1] if path else ''
if not screen_name and len(path.split('/')) > 1:
screen_name = path.split('/')[-2]
if not screen_name:
failed_links.append(link)
continue
if not screen_name:
resolved_object = self._vk_call_with_retry(self.vk.utils.resolveScreenName, screen_name=screen_name)
if resolved_object and resolved_object.get('type') == 'user':
resolved_ids.append(resolved_object['object_id'])
else:
failed_links.append(link)
except VkApiError as e:
self._log_error("resolveScreenName", e)
failed_links.append(f"{link} ({self._format_vk_error(e)})")
except Exception:
failed_links.append(link)
continue
resolved_object = self.vk.utils.resolveScreenName(screen_name=screen_name)
if resolved_object and resolved_object.get('type') == 'user':
resolved_ids.append(resolved_object['object_id'])
else:
failed_links.append(link)
except Exception:
failed_links.append(link)
finally:
self._set_busy(False)
self.user_ids_to_process = resolved_ids
status_message = f"Статус: Готово к работе с {len(resolved_ids)} пользовател(ем/ями)."
if len(links_list) > 1:
self.vk_url_input.setText(f"Загружено {len(resolved_ids)}/{len(links_list)} из списка")
self._set_vk_url_input_text(f"Загружено {len(resolved_ids)}/{len(links_list)} из списка")
if failed_links:
QMessageBox.warning(self, "Ошибка получения ID",
@@ -435,6 +406,11 @@ class VkChatManager(QMainWindow):
make_admin_action.triggered.connect(self.set_user_admin)
tools_menu.addAction(make_admin_action)
logout_action = QAction("Выйти и очистить", self)
logout_action.setStatusTip("Выйти, удалить токен и кэш")
logout_action.triggered.connect(self.logout_and_clear)
tools_menu.addAction(logout_action)
def create_chat_tab(self):
# This implementation correctly creates a scrollable area for chat lists.
tab_content_widget = QWidget()
@@ -511,7 +487,150 @@ class VkChatManager(QMainWindow):
if not authorized:
self.user_ids_to_process.clear()
self.vk_url_input.clear()
self._set_vk_url_input_text("")
self._clear_chat_tabs()
def _set_busy(self, busy, status_text=None):
if status_text:
self.status_label.setText(status_text)
if busy:
self._busy = True
QApplication.setOverrideCursor(Qt.WaitCursor)
for widget in [
self.refresh_chats_btn, self.select_all_btn, self.deselect_all_btn,
self.vk_url_input, self.multi_link_btn, self.visible_messages_checkbox,
self.remove_user_btn, self.add_user_btn
]:
widget.setEnabled(False)
else:
self._busy = False
QApplication.restoreOverrideCursor()
if self.token is None:
self.set_ui_state(False)
else:
self.set_ui_state(True)
def _ensure_log_dir(self):
os.makedirs(APP_DATA_DIR, exist_ok=True)
def _log_error(self, context, exc):
try:
os.makedirs(APP_DATA_DIR, exist_ok=True)
self._rotate_log_if_needed()
timestamp = QDateTime.currentDateTime().toString("yyyy-MM-dd HH:mm:ss")
message = self._format_vk_error(exc)
with open(LOG_FILE, "a", encoding="utf-8") as f:
f.write(f"[{timestamp}] {context}: {message}\n")
except Exception:
pass
def _rotate_log_if_needed(self):
try:
if not os.path.exists(LOG_FILE):
return
if os.path.getsize(LOG_FILE) < LOG_MAX_BYTES:
return
if os.path.exists(LOG_BACKUP_FILE):
os.remove(LOG_BACKUP_FILE)
os.replace(LOG_FILE, LOG_BACKUP_FILE)
except Exception:
pass
def _format_vk_error(self, exc):
error = getattr(exc, "error", None)
code = None
message = str(exc)
if isinstance(error, dict):
code = error.get("error_code")
message = error.get("error_msg") or message
hints = {
5: "Ошибка авторизации. Проверьте токен.",
6: "Слишком много запросов. Подождите и повторите.",
7: "Недостаточно прав.",
9: "Слишком много однотипных действий.",
10: "Внутренняя ошибка VK. Повторите позже.",
15: "Доступ запрещен.",
100: "Некорректный параметр запроса.",
113: "Неверный идентификатор пользователя.",
200: "Доступ к чату запрещен.",
}
if code in hints:
message = f"{message} ({hints[code]})"
if code is not None:
return f"[{code}] {message}"
return message
def _set_vk_url_input_text(self, text):
self.suppress_resolve = True
try:
self.vk_url_input.blockSignals(True)
self.vk_url_input.setText(text)
finally:
self.vk_url_input.blockSignals(False)
self.suppress_resolve = False
def logout_and_clear(self):
confirm = QMessageBox.question(
self,
"Подтверждение выхода",
"Вы уверены, что хотите выйти и удалить сохраненный токен и кэш?",
QMessageBox.Yes | QMessageBox.No
)
if confirm != QMessageBox.Yes:
return
self.token = None
self.token_expiration_time = None
self.vk_session = None
self.vk = None
self.user_ids_to_process.clear()
self._set_vk_url_input_text("")
self.token_input.clear()
self.token_timer_label.setText("Срок действия токена: Н")
self.status_label.setText("Статус: не авторизован")
if self.token_countdown_timer.isActive():
self.token_countdown_timer.stop()
self._clear_chat_tabs()
self.set_ui_state(False)
try:
if os.path.exists(TOKEN_FILE):
os.remove(TOKEN_FILE)
except Exception as e:
print(f"Ошибка удаления токена: {e}")
try:
self._try_remove_web_cache()
except Exception as e:
print(f"Ошибка удаления кэша: {e}")
def _cleanup_cache_if_needed(self):
if os.path.exists(CACHE_CLEANUP_MARKER):
try:
self._try_remove_web_cache()
if os.path.exists(CACHE_CLEANUP_MARKER):
os.remove(CACHE_CLEANUP_MARKER)
except Exception as e:
print(f"Ошибка отложенной очистки кэша: {e}")
def _try_remove_web_cache(self):
if not os.path.exists(WEB_ENGINE_CACHE_DIR):
return
attempts = 5
last_error = None
for _ in range(attempts):
try:
shutil.rmtree(WEB_ENGINE_CACHE_DIR)
last_error = None
break
except Exception as e:
last_error = e
time.sleep(0.2)
if last_error:
os.makedirs(APP_DATA_DIR, exist_ok=True)
with open(CACHE_CLEANUP_MARKER, "w") as f:
f.write("pending")
raise last_error
def load_saved_token_on_startup(self):
loaded_token, expiration_time = load_token()
@@ -529,10 +648,49 @@ class VkChatManager(QMainWindow):
def start_auth(self):
self.status_label.setText("Статус: ожидание авторизации...")
auth_window = AuthBrowserWindow(self)
auth_window.token_extracted_signal.connect(self.handle_new_auth_token)
auth_window.start_auth_flow()
auth_window.exec()
auth_url = (
"https://oauth.vk.com/authorize?"
"client_id=2685278&"
"display=page&"
"redirect_uri=https://oauth.vk.com/blank.html&"
"scope=1073737727&"
"response_type=token&"
"v=5.131"
)
output_path = os.path.join(APP_DATA_DIR, "auth_result.json")
try:
if os.path.exists(output_path):
os.remove(output_path)
except Exception:
pass
cmd = [sys.executable, "--auth", auth_url, output_path]
try:
subprocess.check_call(cmd)
except Exception as e:
self.status_label.setText(f"Статус: ошибка запуска авторизации: {e}")
return
if not os.path.exists(output_path):
self.status_label.setText("Статус: авторизация не удалась")
return
try:
with open(output_path, "r", encoding="utf-8") as f:
data = json.load(f)
token = data.get("token")
expires_in = data.get("expires_in", 0)
except Exception:
token = None
expires_in = 0
try:
if os.path.exists(output_path):
os.remove(output_path)
except Exception:
pass
self.handle_new_auth_token(token, expires_in)
def handle_new_auth_token(self, token, expires_in):
if not token:
@@ -570,6 +728,26 @@ class VkChatManager(QMainWindow):
checkbox.deleteLater()
chk_list.clear()
def _vk_error_code(self, exc):
error = getattr(exc, "error", None)
if isinstance(error, dict):
return error.get("error_code")
return getattr(exc, "code", None)
def _vk_call_with_retry(self, func, *args, **kwargs):
max_attempts = 5
for attempt in range(1, max_attempts + 1):
try:
return func(*args, **kwargs)
except VkApiError as e:
code = self._vk_error_code(e)
if code not in (6, 9, 10) or attempt == max_attempts:
raise
delay = min(2.0, 0.35 * (2 ** (attempt - 1)))
if code == 9:
delay = max(delay, 1.0)
time.sleep(delay)
def load_chats(self):
self._clear_chat_tabs()
@@ -583,7 +761,27 @@ class VkChatManager(QMainWindow):
]
try:
conversations = self.vk.messages.getConversations(count=200, filter="all")['items']
self._set_busy(True, "Статус: загрузка чатов...")
conversations = []
start_from = None
seen_start_tokens = set()
while True:
params = {"count": 200, "filter": "all"}
if start_from:
if start_from in seen_start_tokens:
break
params["start_from"] = start_from
seen_start_tokens.add(start_from)
response = self._vk_call_with_retry(self.vk.messages.getConversations, **params)
page_items = response.get("items", [])
if not page_items:
break
conversations.extend(page_items)
start_from = response.get("next_from")
if not start_from:
break
for conv in conversations:
if conv['conversation']['peer']['type'] == 'chat':
chat_id = conv['conversation']['peer']['local_id']
@@ -616,8 +814,11 @@ class VkChatManager(QMainWindow):
self.chat_tabs.setTabText(3, f"AG Кофейни ({len(self.coffee_chat_checkboxes)})")
self.chat_tabs.setTabText(4, f"Прочие ({len(self.other_chat_checkboxes)})")
except VkApiError as e:
QMessageBox.critical(self, "Ошибка", f"Не удалось загрузить чаты: {e}")
self._log_error("load_chats", e)
QMessageBox.critical(self, "Ошибка", f"Не удалось загрузить чаты: {self._format_vk_error(e)}")
self.set_ui_state(False)
finally:
self._set_busy(False)
def get_user_info_by_id(self, user_id):
try:
@@ -694,20 +895,31 @@ class VkChatManager(QMainWindow):
return
results = []
for chat in selected_chats:
for user_id, user_info in user_infos.items():
try:
if action_type == "remove":
self.vk.messages.removeChatUser(chat_id=chat['id'], member_id=user_id)
results.append(f"'{user_info}' исключен из '{chat['title']}'.")
else:
params = {'chat_id': chat['id'], 'user_id': user_id}
if self.visible_messages_checkbox.isChecked():
params['visible_messages_count'] = 250
self.vk.messages.addChatUser(**params)
results.append(f"'{user_info}' приглашен в '{chat['title']}'.")
except VkApiError as e:
results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {e}")
total = len(selected_chats) * len(user_infos)
processed = 0
try:
action_label = "исключение" if action_type == "remove" else "приглашение"
self._set_busy(True, f"Статус: выполняется {action_label} (0/{total})...")
for chat in selected_chats:
for user_id, user_info in user_infos.items():
try:
if action_type == "remove":
self._vk_call_with_retry(self.vk.messages.removeChatUser, chat_id=chat['id'], member_id=user_id)
results.append(f"'{user_info}' исключен из '{chat['title']}'.")
else:
params = {'chat_id': chat['id'], 'user_id': user_id}
if self.visible_messages_checkbox.isChecked():
params['visible_messages_count'] = 250
self._vk_call_with_retry(self.vk.messages.addChatUser, **params)
results.append(f"'{user_info}' приглашен в '{chat['title']}'.")
except VkApiError as e:
self._log_error("execute_user_action", e)
results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {self._format_vk_error(e)}")
finally:
processed += 1
self.status_label.setText(f"Статус: выполняется {action_label} ({processed}/{total})...")
finally:
self._set_busy(False)
QMessageBox.information(self, "Результаты", "\n".join(results))
self.vk_url_input.clear()
@@ -758,24 +970,35 @@ class VkChatManager(QMainWindow):
# 4. Выполнение API запросов
results = []
for chat in selected_chats:
# VK API требует peer_id. Для чатов это 2000000000 + local_id
try:
peer_id = 2000000000 + int(chat['id'])
except ValueError:
results.append(f"✗ Ошибка ID чата: {chat['id']}")
continue
for user_id, user_info in user_infos.items():
total = len(selected_chats) * len(user_infos)
processed = 0
try:
self._set_busy(True, f"Статус: назначение админов (0/{total})...")
for chat in selected_chats:
# VK API требует peer_id. Для чатов это 2000000000 + local_id
try:
self.vk.messages.setMemberRole(
peer_id=peer_id,
member_id=user_id,
role="admin"
)
results.append(f"'{user_info}' назначен админом в '{chat['title']}'.")
except VkApiError as e:
results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {e}")
peer_id = 2000000000 + int(chat['id'])
except ValueError:
results.append(f"✗ Ошибка ID чата: {chat['id']}")
continue
for user_id, user_info in user_infos.items():
try:
self._vk_call_with_retry(
self.vk.messages.setMemberRole,
peer_id=peer_id,
member_id=user_id,
role="admin"
)
results.append(f"'{user_info}' назначен админом в '{chat['title']}'.")
except VkApiError as e:
self._log_error("set_user_admin", e)
results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {self._format_vk_error(e)}")
finally:
processed += 1
self.status_label.setText(f"Статус: назначение админов ({processed}/{total})...")
finally:
self._set_busy(False)
# 5. Вывод результата
QMessageBox.information(self, "Результаты назначения", "\n".join(results))
@@ -787,6 +1010,15 @@ class VkChatManager(QMainWindow):
if __name__ == "__main__":
if "--auth" in sys.argv:
try:
idx = sys.argv.index("--auth")
auth_url = sys.argv[idx + 1]
output_path = sys.argv[idx + 2]
except Exception:
sys.exit(1)
auth_webview.main_auth(auth_url, output_path)
sys.exit(0)
app = QApplication(sys.argv)
app.setStyle("Fusion")
app.setPalette(app.style().standardPalette())
@@ -798,4 +1030,4 @@ if __name__ == "__main__":
window = VkChatManager()
window.show()
sys.exit(app.exec())
sys.exit(app.exec())

View File

@@ -1,2 +1,3 @@
PySide6~=6.9.1
PySide6~=6.10.2
vk-api~=11.9.9
pywebview