diff --git a/app_version.py b/app_version.py
index ea1631a..186d4cf 100644
--- a/app_version.py
+++ b/app_version.py
@@ -1 +1 @@
-APP_VERSION = "1.7.0"
+APP_VERSION = "1.7.1"
diff --git a/main.py b/main.py
index 8ab6f33..8425bdb 100644
--- a/main.py
+++ b/main.py
@@ -1,18 +1,10 @@
import sys
-import base64
-import ctypes
-import shutil
import json
import time
+import shutil
import auth_webview
import os
-import re
-import hashlib
-import subprocess
import threading
-import tempfile
-import urllib.request
-import zipfile
from app_version import APP_VERSION
from services import (
AutoUpdateService,
@@ -29,15 +21,13 @@ from ui.main_window import instructions_text
from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit,
QPushButton, QVBoxLayout, QWidget, QMessageBox,
QTextBrowser, QScrollArea, QCheckBox, QHBoxLayout,
- QSizePolicy, QDialog, QTextEdit, QTabWidget, QDialogButtonBox,
+ QSizePolicy, QTabWidget,
QProgressBar)
from PySide6.QtCore import Qt, QUrl, QDateTime, QTimer
from PySide6.QtGui import QIcon, QAction, QDesktopServices
-from urllib.parse import parse_qs, unquote
from vk_api.exceptions import VkApiError
from PySide6.QtCore import QStandardPaths
from PySide6.QtCore import QProcess
-from ctypes import wintypes
# --- Управление токенами и настройками ---
APP_DATA_DIR = os.path.join(QStandardPaths.writableLocation(QStandardPaths.AppDataLocation), "AnabasisVKChatManager")
@@ -55,75 +45,6 @@ UPDATE_REPOSITORY_URL = "https://git.daemonlord.ru/benya/AnabasisChatRemove"
UPDATE_REQUEST_TIMEOUT = 8
-class _DataBlob(ctypes.Structure):
- _fields_ = [("cbData", wintypes.DWORD), ("pbData", ctypes.POINTER(ctypes.c_byte))]
-
-
-_crypt32 = None
-_kernel32 = None
-if os.name == "nt":
- _crypt32 = ctypes.WinDLL("crypt32", use_last_error=True)
- _kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
- _crypt32.CryptProtectData.argtypes = [
- ctypes.POINTER(_DataBlob),
- wintypes.LPCWSTR,
- ctypes.POINTER(_DataBlob),
- ctypes.c_void_p,
- ctypes.c_void_p,
- wintypes.DWORD,
- ctypes.POINTER(_DataBlob),
- ]
- _crypt32.CryptProtectData.restype = wintypes.BOOL
- _crypt32.CryptUnprotectData.argtypes = [
- ctypes.POINTER(_DataBlob),
- ctypes.POINTER(wintypes.LPWSTR),
- ctypes.POINTER(_DataBlob),
- ctypes.c_void_p,
- ctypes.c_void_p,
- wintypes.DWORD,
- ctypes.POINTER(_DataBlob),
- ]
- _crypt32.CryptUnprotectData.restype = wintypes.BOOL
-
-
-def _crypt_protect_data(data, description=""):
- buffer = ctypes.create_string_buffer(data)
- data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte)))
- data_out = _DataBlob()
- if not _crypt32.CryptProtectData(ctypes.byref(data_in), description, None, None, None, 0, ctypes.byref(data_out)):
- raise ctypes.WinError(ctypes.get_last_error())
- try:
- return ctypes.string_at(data_out.pbData, data_out.cbData)
- finally:
- _kernel32.LocalFree(data_out.pbData)
-
-
-def _crypt_unprotect_data(data):
- buffer = ctypes.create_string_buffer(data)
- data_in = _DataBlob(len(data), ctypes.cast(buffer, ctypes.POINTER(ctypes.c_byte)))
- data_out = _DataBlob()
- if not _crypt32.CryptUnprotectData(ctypes.byref(data_in), None, None, None, None, 0, ctypes.byref(data_out)):
- raise ctypes.WinError(ctypes.get_last_error())
- try:
- return ctypes.string_at(data_out.pbData, data_out.cbData)
- finally:
- _kernel32.LocalFree(data_out.pbData)
-
-
-def _encrypt_token(token):
- if os.name != "nt":
- raise RuntimeError("DPAPI is available only on Windows.")
- encrypted_bytes = _crypt_protect_data(token.encode("utf-8"))
- return base64.b64encode(encrypted_bytes).decode("ascii")
-
-
-def _decrypt_token(token_data):
- if os.name != "nt":
- raise RuntimeError("DPAPI is available only on Windows.")
- encrypted_bytes = base64.b64decode(token_data.encode("ascii"))
- decrypted_bytes = _crypt_unprotect_data(encrypted_bytes)
- return decrypted_bytes.decode("utf-8")
-
def get_resource_path(relative_path):
""" Получает абсолютный путь к ресурсу, работает для скрипта и для .exe """
if hasattr(sys, '_MEIPASS'): # Для PyInstaller (на всякий случай)
@@ -131,98 +52,6 @@ def get_resource_path(relative_path):
# Для cx_Freeze и обычного запуска
return os.path.join(os.path.abspath("."), relative_path)
-def save_token(token, expires_in=0):
- """Сохраняет токен. Если expires_in=0, токен считается бессрочным."""
- try:
- expires_in = int(expires_in)
- except (ValueError, TypeError):
- expires_in = 0
-
- os.makedirs(APP_DATA_DIR, exist_ok=True)
-
- # ИСПРАВЛЕНИЕ: если expires_in равно 0, то и время истечения ставим 0
- expiration_time = (time.time() + expires_in) if expires_in > 0 else 0
-
- stored_token = token
- encrypted = False
- if os.name == "nt":
- try:
- stored_token = _encrypt_token(token)
- encrypted = True
- except Exception as e:
- print(f"Ошибка шифрования токена: {e}")
-
- data = {
- "token": stored_token,
- "expiration_time": expiration_time,
- "encrypted": encrypted
- }
-
- try:
- with open(TOKEN_FILE, "w") as f:
- json.dump(data, f)
-
- status = "Бессрочно" if expiration_time == 0 else QDateTime.fromSecsSinceEpoch(int(expiration_time)).toString()
- print(f"Токен сохранен. Срок действия: {status}")
- return expiration_time
- except IOError as e:
- print(f"Ошибка сохранения токена: {e}")
- return None
-
-
-def load_token():
- """Загружает токен и проверяет его валидность."""
- try:
- if not os.path.exists(TOKEN_FILE):
- return None, None
-
- with open(TOKEN_FILE, "r") as f:
- data = json.load(f)
-
- token = data.get("token")
- encrypted = data.get("encrypted", False)
- if token and encrypted:
- try:
- token = _decrypt_token(token)
- except Exception as e:
- print(f"Ошибка расшифровки токена: {e}")
- if os.path.exists(TOKEN_FILE):
- os.remove(TOKEN_FILE)
- return None, None
- expiration_time = data.get("expiration_time")
-
- # ИСПРАВЛЕНИЕ: токен валиден, если время истечения 0 ИЛИ оно больше текущего
- if token and (expiration_time == 0 or expiration_time > time.time()):
- return token, expiration_time
- else:
- if os.path.exists(TOKEN_FILE):
- os.remove(TOKEN_FILE)
- return None, None
- except Exception as e:
- print(f"Ошибка загрузки: {e}")
- return None, None
-
-class MultiLinkDialog(QDialog):
- def __init__(self, parent=None):
- super().__init__(parent)
- self.setWindowTitle("Ввод нескольких ссылок")
- self.setMinimumSize(400, 300)
- layout = QVBoxLayout(self)
- label = QLabel("Вставьте ссылки на страницы VK, каждая с новой строки:")
- layout.addWidget(label)
- self.links_text_edit = QTextEdit()
- layout.addWidget(self.links_text_edit)
- button_box = QDialogButtonBox()
- button_box.addButton("ОК", QDialogButtonBox.AcceptRole)
- button_box.addButton("Отмена", QDialogButtonBox.RejectRole)
- button_box.accepted.connect(self.accept)
- button_box.rejected.connect(self.reject)
- layout.addWidget(button_box)
-
- def get_links(self):
- return [line.strip() for line in self.links_text_edit.toPlainText().strip().split('\n') if line.strip()]
-
-
class VkChatManager(QMainWindow):
def __init__(self):
super().__init__()
@@ -389,55 +218,6 @@ class VkChatManager(QMainWindow):
return
self._process_links_list([url])
- def _process_links_list(self, links_list):
- if not self.vk:
- QMessageBox.warning(self, "Ошибка", "Сначала авторизуйтесь.")
- return
-
- self.user_ids_to_process.clear()
- resolved_ids = []
- failed_links = []
-
- self._set_busy(True, "Статус: Определяю ID...")
- try:
- for link in links_list:
- try:
- path = urlparse(link).path
- screen_name = path.split('/')[-1] if path else ''
- if not screen_name and len(path.split('/')) > 1:
- screen_name = path.split('/')[-2]
-
- if not screen_name:
- failed_links.append(link)
- continue
-
- resolved_object = self._vk_call_with_retry(self.vk.utils.resolveScreenName, screen_name=screen_name)
- if resolved_object and resolved_object.get('type') == 'user':
- resolved_ids.append(resolved_object['object_id'])
- else:
- failed_links.append(link)
- except VkApiError as e:
- if self._handle_vk_api_error("resolveScreenName", e, action_name="получения ID пользователей"):
- return
- failed_links.append(f"{link} ({self._format_vk_error(e)})")
- except Exception:
- failed_links.append(link)
- finally:
- self._set_busy(False)
-
- self.user_ids_to_process = resolved_ids
-
- status_message = f"Статус: Готово к работе с {len(resolved_ids)} пользовател(ем/ями)."
- if len(links_list) > 1:
- self._set_vk_url_input_text(f"Загружено {len(resolved_ids)}/{len(links_list)} из списка")
-
- if failed_links:
- QMessageBox.warning(self, "Ошибка получения ID",
- f"Не удалось получить ID для следующих ссылок:\n" + "\n".join(failed_links))
-
- self.status_label.setText(status_message)
- self.set_ui_state(self.token is not None)
-
def create_menu(self):
"""Создает верхнее меню."""
menu_bar = self.menuBar()
@@ -464,6 +244,44 @@ class VkChatManager(QMainWindow):
tools_menu.addAction(logout_action)
self.logout_action = logout_action
+ help_menu = menu_bar.addMenu("Справка")
+ about_action = QAction("О приложении", self)
+ about_action.setStatusTip("Показать информацию о приложении")
+ about_action.triggered.connect(self.show_about_dialog)
+ help_menu.addAction(about_action)
+ self.about_action = about_action
+
+ def show_about_dialog(self):
+ message_box = QMessageBox(self)
+ message_box.setWindowTitle("О приложении")
+ message_box.setIcon(QMessageBox.Information)
+ message_box.setTextFormat(Qt.RichText)
+
+ repo_url = self.update_repository_url
+ if repo_url:
+ repo_html = f'{repo_url}'
+ else:
+ repo_html = "не указан"
+
+ message_box.setText(
+ (
+ f"Anabasis Chat Manager
"
+ f"Версия: {APP_VERSION}
"
+ "Инструмент для массового управления пользователями в чатах VK.
"
+ "Поддерживается проверка обновлений и автообновление Windows-сборки.
"
+ f"Репозиторий: {repo_html}"
+ )
+ )
+
+ # QMessageBox не имеет setOpenExternalLinks, настраиваем его внутренний QLabel.
+ for label in message_box.findChildren(QLabel):
+ if "href=" in label.text():
+ label.setTextInteractionFlags(Qt.TextBrowserInteraction)
+ label.setOpenExternalLinks(True)
+ break
+
+ message_box.exec()
+
def create_chat_tab(self):
# This implementation correctly creates a scrollable area for chat lists.
tab_content_widget = QWidget()
@@ -572,180 +390,6 @@ class VkChatManager(QMainWindow):
if not self._update_check_silent:
QMessageBox.warning(self, "Проверка обновлений", error_text)
- def _download_update_archive(self, download_url, destination_path):
- request = urllib.request.Request(
- download_url,
- headers={"User-Agent": "AnabasisManager-Updater"},
- )
- with urllib.request.urlopen(request, timeout=60) as response:
- with open(destination_path, "wb") as f:
- shutil.copyfileobj(response, f)
-
- def _download_update_text(self, url):
- request = urllib.request.Request(
- url,
- headers={"User-Agent": "AnabasisManager-Updater"},
- )
- with urllib.request.urlopen(request, timeout=30) as response:
- return response.read().decode("utf-8", errors="replace")
-
- @staticmethod
- def _sha256_file(path):
- digest = hashlib.sha256()
- with open(path, "rb") as f:
- for chunk in iter(lambda: f.read(1024 * 1024), b""):
- digest.update(chunk)
- return digest.hexdigest().lower()
-
- @staticmethod
- def _extract_sha256_from_text(checksum_text, target_file_name):
- target = (target_file_name or "").strip().lower()
- for raw_line in checksum_text.splitlines():
- line = raw_line.strip()
- if not line:
- continue
- match = re.search(r"\b([A-Fa-f0-9]{64})\b", line)
- if not match:
- continue
- checksum = match.group(1).lower()
- if not target:
- return checksum
- line_lower = line.lower()
- if target in line_lower:
- return checksum
- if os.path.basename(target) in line_lower:
- return checksum
- return ""
-
- def _verify_update_checksum(self, zip_path, checksum_url, download_name):
- if not checksum_url:
- raise RuntimeError("В релизе нет файла SHA256. Автообновление остановлено.")
- checksum_text = self._download_update_text(checksum_url)
- expected_hash = self._extract_sha256_from_text(checksum_text, download_name or os.path.basename(zip_path))
- if not expected_hash:
- raise RuntimeError("Не удалось найти SHA256 для архива обновления.")
- actual_hash = self._sha256_file(zip_path)
- if actual_hash != expected_hash:
- raise RuntimeError("SHA256 не совпадает, обновление отменено.")
-
- def _locate_extracted_root(self, extracted_dir):
- entries = []
- for name in os.listdir(extracted_dir):
- full_path = os.path.join(extracted_dir, name)
- if os.path.isdir(full_path):
- entries.append(full_path)
- if len(entries) == 1:
- candidate = entries[0]
- if os.path.exists(os.path.join(candidate, "AnabasisManager.exe")):
- return candidate
- return extracted_dir
-
- def _build_update_script(self, app_dir, source_dir, exe_name, target_pid):
- script_path = os.path.join(tempfile.gettempdir(), "anabasis_apply_update.cmd")
- script_lines = [
- "@echo off",
- "setlocal",
- f"set APP_DIR={app_dir}",
- f"set SRC_DIR={source_dir}",
- f"set EXE_NAME={exe_name}",
- f"set TARGET_PID={target_pid}",
- "set BACKUP_DIR=%TEMP%\\anabasis_backup_%RANDOM%%RANDOM%",
- ":wait_for_exit",
- "tasklist /FI \"PID eq %TARGET_PID%\" | find \"%TARGET_PID%\" >nul",
- "if %ERRORLEVEL% EQU 0 (",
- " timeout /t 1 /nobreak >nul",
- " goto :wait_for_exit",
- ")",
- "timeout /t 1 /nobreak >nul",
- "mkdir \"%BACKUP_DIR%\" >nul 2>&1",
- "robocopy \"%APP_DIR%\" \"%BACKUP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:6 /W:2 >nul",
- "set RC=%ERRORLEVEL%",
- "if %RC% GEQ 8 goto :backup_error",
- "robocopy \"%SRC_DIR%\" \"%APP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:12 /W:2 >nul",
- "set RC=%ERRORLEVEL%",
- "if %RC% GEQ 8 goto :rollback",
- "start \"\" \"%APP_DIR%\\%EXE_NAME%\"",
- "timeout /t 2 /nobreak >nul",
- "tasklist /FI \"IMAGENAME eq %EXE_NAME%\" | find /I \"%EXE_NAME%\" >nul",
- "if %ERRORLEVEL% NEQ 0 goto :rollback",
- "rmdir /S /Q \"%BACKUP_DIR%\" >nul 2>&1",
- "exit /b 0",
- ":rollback",
- "robocopy \"%BACKUP_DIR%\" \"%APP_DIR%\" /E /NFL /NDL /NJH /NJS /NP /R:6 /W:2 >nul",
- "start \"\" \"%APP_DIR%\\%EXE_NAME%\"",
- "echo Auto-update failed. Rollback executed. > \"%APP_DIR%\\update_error.log\"",
- "exit /b 2",
- ":backup_error",
- "echo Auto-update failed during backup. Code %RC% > \"%APP_DIR%\\update_error.log\"",
- "exit /b %RC%",
- ]
- with open(script_path, "w", encoding="utf-8", newline="\r\n") as f:
- f.write("\r\n".join(script_lines) + "\r\n")
- return script_path
-
- def _start_auto_update(self, download_url, latest_version, checksum_url="", download_name=""):
- if os.name != "nt":
- QMessageBox.information(
- self,
- "Автообновление",
- "Автообновление пока поддерживается только в Windows-сборке.",
- )
- return False
- if not getattr(sys, "frozen", False):
- QMessageBox.information(
- self,
- "Автообновление",
- "Автообновление доступно в собранной версии приложения (.exe).",
- )
- return False
- if not download_url:
- QMessageBox.warning(self, "Автообновление", "В релизе нет ссылки на файл для обновления.")
- return False
-
- self.status_label.setText(f"Статус: загрузка обновления {latest_version}...")
- self._set_busy(True)
- work_dir = tempfile.mkdtemp(prefix="anabasis_update_")
- zip_path = os.path.join(work_dir, "update.zip")
- unpack_dir = os.path.join(work_dir, "extracted")
- try:
- self._download_update_archive(download_url, zip_path)
- self._verify_update_checksum(zip_path, checksum_url, download_name)
- os.makedirs(unpack_dir, exist_ok=True)
- with zipfile.ZipFile(zip_path, "r") as archive:
- archive.extractall(unpack_dir)
-
- source_dir = self._locate_extracted_root(unpack_dir)
- app_exe = sys.executable
- app_dir = os.path.dirname(app_exe)
- exe_name = os.path.basename(app_exe)
- script_path = self._build_update_script(app_dir, source_dir, exe_name, os.getpid())
-
- creation_flags = 0
- if hasattr(subprocess, "CREATE_NEW_PROCESS_GROUP"):
- creation_flags |= subprocess.CREATE_NEW_PROCESS_GROUP
- if hasattr(subprocess, "DETACHED_PROCESS"):
- creation_flags |= subprocess.DETACHED_PROCESS
-
- subprocess.Popen(
- ["cmd.exe", "/c", script_path],
- cwd=work_dir,
- creationflags=creation_flags,
- )
- self._log_event("auto_update", f"Update {latest_version} started from {download_url}")
- QMessageBox.information(
- self,
- "Обновление запущено",
- "Обновление скачано. Приложение будет перезапущено.",
- )
- QApplication.instance().quit()
- return True
- except Exception as e:
- self._log_event("auto_update_failed", str(e), level="ERROR")
- QMessageBox.warning(self, "Автообновление", f"Не удалось выполнить автообновление: {e}")
- return False
- finally:
- self._set_busy(False)
-
def setup_token_timer(self):
self.token_countdown_timer = QTimer(self)
self.token_countdown_timer.timeout.connect(self.update_token_timer_display)
@@ -1182,83 +826,6 @@ class VkChatManager(QMainWindow):
def _vk_call_with_retry(self, func, *args, **kwargs):
return self.vk_service.call_with_retry(func, *args, **kwargs)
- def load_chats(self):
- self._clear_chat_tabs()
-
- # Get the checkbox layouts from each tab
- layouts = [
- self.office_tab.findChild(QWidget).findChild(QVBoxLayout),
- self.retail_tab.findChild(QWidget).findChild(QVBoxLayout),
- self.warehouse_tab.findChild(QWidget).findChild(QVBoxLayout),
- self.coffee_tab.findChild(QWidget).findChild(QVBoxLayout),
- self.other_tab.findChild(QWidget).findChild(QVBoxLayout)
- ]
-
- try:
- self._set_busy(True, "Статус: загрузка чатов...")
- conversations = []
- start_from = None
- seen_start_tokens = set()
- while True:
- params = {"count": 200, "filter": "all"}
- if start_from:
- if start_from in seen_start_tokens:
- break
- params["start_from"] = start_from
- seen_start_tokens.add(start_from)
-
- response = self._vk_call_with_retry(self.vk.messages.getConversations, **params)
- page_items = response.get("items", [])
- if not page_items:
- break
- conversations.extend(page_items)
-
- start_from = response.get("next_from")
- if not start_from:
- break
- for conv in conversations:
- if conv['conversation']['peer']['type'] == 'chat':
- chat_id = conv['conversation']['peer']['local_id']
- title = conv['conversation']['chat_settings']['title']
- self.chats.append({'id': chat_id, 'title': title})
-
- checkbox = QCheckBox(f"{title} (id: {chat_id})")
- checkbox.setProperty("chat_id", chat_id)
-
- # Insert checkbox at the top of the layout (before the stretch)
- if "AG офис" in title:
- layouts[0].insertWidget(layouts[0].count() - 1, checkbox)
- self.office_chat_checkboxes.append(checkbox)
- elif "AG розница" in title:
- layouts[1].insertWidget(layouts[1].count() - 1, checkbox)
- self.retail_chat_checkboxes.append(checkbox)
- elif "AG склад" in title:
- layouts[2].insertWidget(layouts[2].count() - 1, checkbox)
- self.warehouse_chat_checkboxes.append(checkbox)
- elif "AG кофейни" in title:
- layouts[3].insertWidget(layouts[3].count() - 1, checkbox)
- self.coffee_chat_checkboxes.append(checkbox)
- else:
- layouts[4].insertWidget(layouts[4].count() - 1, checkbox)
- self.other_chat_checkboxes.append(checkbox)
-
- self.chat_tabs.setTabText(0, f"AG Офис ({len(self.office_chat_checkboxes)})")
- self.chat_tabs.setTabText(1, f"AG Розница ({len(self.retail_chat_checkboxes)})")
- self.chat_tabs.setTabText(2, f"AG Склад ({len(self.warehouse_chat_checkboxes)})")
- self.chat_tabs.setTabText(3, f"AG Кофейни ({len(self.coffee_chat_checkboxes)})")
- self.chat_tabs.setTabText(4, f"Прочие ({len(self.other_chat_checkboxes)})")
- except VkApiError as e:
- if self._handle_vk_api_error(
- "load_chats",
- e,
- action_name="загрузки чатов",
- ):
- return
- QMessageBox.critical(self, "Ошибка", f"Не удалось загрузить чаты: {self._format_vk_error(e)}")
- self.set_ui_state(False)
- finally:
- self._set_busy(False)
-
def get_user_info_by_id(self, user_id):
try:
user = self.vk.users.get(user_ids=user_id)[0]