10 Commits

Author SHA1 Message Date
c77ca4652b fix(ci): update syntax validation test file list
All checks were successful
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Successful in 3m54s
2026-02-16 00:37:26 +03:00
4ec24c6d0f chore(version): bump to 2.2.3
Some checks failed
Desktop CI / tests (push) Failing after 12s
Desktop Release / release (push) Has been cancelled
2026-02-16 00:35:38 +03:00
72edfffd9e fix(update): harden reentry state and add runtime regression test 2026-02-16 00:35:03 +03:00
db5d901435 test(main): replace brittle smoke checks with AST contracts 2026-02-16 00:31:38 +03:00
cd5e6e1f6b fix(update): prevent repeated check crash and bump to 2.2.2
All checks were successful
Desktop CI / tests (push) Successful in 15s
Desktop Release / release (push) Successful in 3m30s
2026-02-15 23:59:57 +03:00
4b3347a069 fix(update,security): add release notes in updater and bump to 2.2.1
All checks were successful
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Successful in 3m30s
2026-02-15 23:51:44 +03:00
e0628b1792 chore(version): bump to 2.2.0
Some checks are pending
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Has started running
2026-02-15 23:49:04 +03:00
201184700f docs(readme): update install steps and feature list 2026-02-15 23:48:15 +03:00
5253c942e8 fix(core,security): safe update extraction and async bulk vk actions 2026-02-15 23:42:51 +03:00
c645d964bf fix(auth,ui): add auth webview cli entrypoint and bulk action progress bar
All checks were successful
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Successful in 17s
2026-02-15 23:31:01 +03:00
11 changed files with 572 additions and 175 deletions

View File

@@ -28,7 +28,7 @@ jobs:
- name: Validate syntax
run: |
python -m py_compile app_version.py main.py build.py updater_gui.py tests/test_auth_relogin_smoke.py tests/test_auto_update_service.py tests/test_chat_actions.py tests/test_token_store.py
python -m py_compile app_version.py main.py build.py updater_gui.py tests/test_auto_update_service.py tests/test_chat_actions.py tests/test_main_contracts.py tests/test_token_store.py tests/test_update_reentry_runtime.py tests/test_update_service.py tests/test_updater_gui.py
- name: Run tests
run: |

View File

@@ -13,9 +13,12 @@
* Моментальная загрузка всех доступных чатов пользователя.
* Групповой выбор чатов («Выбрать все» / «Снять выбор»).
* Быстрое обновление списка бесед.
* Выполнение массовых действий в фоновом потоке без подвисания интерфейса.
* Визуальный прогресс-бар по ходу операции.
* **👤 Интеллектуальный поиск ID:** Автоматическое распознавание ID пользователя из ссылок любого формата (например, `vk.com/id123`, `vk.com/durov` или просто `durov`).
* **🛠 Управление в один клик:** Кнопки для мгновенного исключения или приглашения пользователя во все выбранные чаты одновременно.
* **🛡 Стабильность:** Улучшенная обработка ошибок VK API и автоматическая реакция на смену IP-адреса.
* **🔄 Безопасные обновления:** Проверка SHA256 и защищенная распаковка архива обновления.
* **🛡 Стабильность и безопасность:** Улучшенная обработка ошибок VK API, автоматическая реакция на смену IP-адреса и безопасное хранение токена с шифрованием DPAPI в Windows.
---
@@ -47,7 +50,7 @@
3. **Установите зависимости:**
```bash
pip install PySide6 vk_api
pip install -r requirements.txt
```
4. **Запустите приложение:**
@@ -68,6 +71,12 @@
## 📂 Техническая информация
### Последние обновления
- Массовые операции VK (`remove/add/admin`) выполняются в фоновом потоке, чтобы интерфейс не зависал; добавлен визуальный прогресс-бар.
- Распаковка архива автообновления теперь валидирует пути перед извлечением для защиты от path traversal.
- Проверка обновлений переведена на `QThread` (модель потоков Qt) вместо Python `threading.Thread`.
- В Windows сохранение токена требует успешного шифрования через DPAPI; при ошибке шифрования сессия продолжается, но токен не сохраняется на диск.
### Сборка проекта (для разработчиков)
Проект использует кастомный скрипт автоматизации `build.py`, который оптимизирует зависимости `PySide6` и корректно упаковывает `QtWebEngineCore`.

View File

@@ -1 +1 @@
APP_VERSION = "2.1.2"
APP_VERSION = "2.2.3"

View File

@@ -74,5 +74,21 @@ def main_auth(auth_url, output_path):
webview.start(private_mode=False, storage_path=storage_path)
def main():
# Supports both: `python auth_webview.py <auth_url> <output_path>`
# and: `python auth_webview.py --auth <auth_url> <output_path>`
args = sys.argv[1:]
if len(args) == 3 and args[0] == "--auth":
auth_url, output_path = args[1], args[2]
elif len(args) == 2:
auth_url, output_path = args[0], args[1]
else:
print("Usage: auth_webview.py [--auth] <auth_url> <output_path>")
return 1
main_auth(auth_url, output_path)
return 0
if __name__ == "__main__":
main()
sys.exit(main())

362
main.py
View File

@@ -1,13 +1,12 @@
import json
import json
import os
import shutil
import sys
import threading
import time
from PySide6.QtCore import QProcess
from PySide6.QtCore import QStandardPaths
from PySide6.QtCore import Qt, QUrl, QDateTime, QTimer
from PySide6.QtCore import QObject, QThread, Signal, Qt, QUrl, QDateTime, QTimer
from PySide6.QtGui import QIcon, QAction, QActionGroup, QDesktopServices
from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit,
QPushButton, QVBoxLayout, QWidget, QMessageBox,
@@ -50,6 +49,7 @@ UPDATE_REPOSITORY = ""
UPDATE_REPOSITORY_URL = "https://git.daemonlord.ru/benya/AnabasisChatRemove"
UPDATE_CHANNEL_DEFAULT = "stable"
UPDATE_REQUEST_TIMEOUT = 8
AUTH_ERROR_CONTEXTS = ("load_chats", "execute_user_action", "set_user_admin")
def get_resource_path(relative_path):
@@ -59,6 +59,86 @@ def get_resource_path(relative_path):
# Для cx_Freeze и обычного запуска
return os.path.join(os.path.abspath("."), relative_path)
class BulkActionWorker(QObject):
progress = Signal(int, int, str)
finished = Signal(dict)
auth_error = Signal(str, object, str)
failed = Signal(str)
done = Signal()
def __init__(self, vk_call_with_retry, vk_api, action_type, selected_chats, user_infos, visible_messages):
super().__init__()
self.vk_call_with_retry = vk_call_with_retry
self.vk = vk_api
self.action_type = action_type
self.selected_chats = selected_chats
self.user_infos = user_infos
self.visible_messages = visible_messages
self.total = len(self.selected_chats) * len(self.user_infos)
@staticmethod
def _is_auth_error(exc):
return VkService.is_auth_error(exc, str(exc).lower())
def _emit_progress(self, processed):
label = "admin" if self.action_type == "admin" else ("remove" if self.action_type == "remove" else "add")
self.progress.emit(processed, self.total, label)
def run(self):
results = []
processed = 0
try:
for chat in self.selected_chats:
peer_id = None
if self.action_type == "admin":
try:
peer_id = 2000000000 + int(chat["id"])
except (ValueError, TypeError):
for _user_id, user_info in self.user_infos.items():
results.append(f"✗ Ошибка ID чата: {chat['id']} ({user_info})")
processed += 1
self._emit_progress(processed)
continue
for user_id, user_info in self.user_infos.items():
try:
if self.action_type == "remove":
self.vk_call_with_retry(self.vk.messages.removeChatUser, chat_id=chat["id"], member_id=user_id)
results.append(f"'{user_info}' исключен из '{chat['title']}'.")
elif self.action_type == "add":
params = {"chat_id": chat["id"], "user_id": user_id}
if self.visible_messages:
params["visible_messages_count"] = 250
self.vk_call_with_retry(self.vk.messages.addChatUser, **params)
results.append(f"'{user_info}' приглашен в '{chat['title']}'.")
elif self.action_type == "admin":
self.vk_call_with_retry(
self.vk.messages.setMemberRole,
peer_id=peer_id,
member_id=user_id,
role="admin",
)
results.append(f"'{user_info}' назначен админом в '{chat['title']}'.")
else:
raise RuntimeError(f"Unknown action: {self.action_type}")
except VkApiError as exc:
if self._is_auth_error(exc):
context = "set_user_admin" if self.action_type == "admin" else "execute_user_action"
action_name = "назначения администраторов" if self.action_type == "admin" else "выполнения операций с пользователями"
self.auth_error.emit(context, exc, action_name)
return
results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {exc}")
finally:
processed += 1
self._emit_progress(processed)
self.finished.emit({"results": results, "processed": processed, "total": self.total})
except Exception as exc:
self.failed.emit(str(exc))
finally:
self.done.emit()
class VkChatManager(QMainWindow):
def __init__(self):
super().__init__()
@@ -85,11 +165,19 @@ class VkChatManager(QMainWindow):
self._auth_ui_busy = False
self._auth_relogin_in_progress = False
self._last_auth_relogin_ts = 0.0
self._active_action_button = None
self._active_action_button_text = ""
self.update_repository_url = detect_update_repository_url(UPDATE_REPOSITORY_URL, UPDATE_REPOSITORY)
self.update_channel = UPDATE_CHANNEL_DEFAULT
self.update_checker = None
self.update_thread = None
self._update_in_progress = False
self._update_check_silent = False
self._bulk_worker_thread = None
self._bulk_worker = None
self._bulk_action_context = None
self._bulk_action_success_message_title = ""
self._bulk_clear_inputs_on_success = True
self.resolve_timer = QTimer(self)
self.resolve_timer.setSingleShot(True)
@@ -184,6 +272,12 @@ class VkChatManager(QMainWindow):
self.add_user_btn.setMinimumHeight(50)
self.add_user_btn.clicked.connect(self.add_user_to_chat)
layout.addWidget(self.add_user_btn)
self.operation_progress = QProgressBar()
self.operation_progress.setRange(0, 100)
self.operation_progress.setValue(0)
self.operation_progress.setTextVisible(True)
self.operation_progress.hide()
layout.addWidget(self.operation_progress)
self.status_label = QLabel("Статус: не авторизован")
self.status_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
layout.addWidget(self.status_label)
@@ -369,10 +463,12 @@ class VkChatManager(QMainWindow):
self._log_event("update_channel", f"update_channel={self.update_channel}")
def check_for_updates(self, silent_no_updates=False):
if self.update_thread and self.update_thread.is_alive():
if self._update_in_progress:
self.status_label.setText("Статус: проверка обновлений уже выполняется...")
return
self._update_check_silent = silent_no_updates
self._update_in_progress = True
self._set_update_action_state(True)
channel_label = "бета" if self.update_channel == "beta" else "релизы"
self.status_label.setText(f"Статус: проверка обновлений ({channel_label})...")
@@ -383,16 +479,20 @@ class VkChatManager(QMainWindow):
request_timeout=UPDATE_REQUEST_TIMEOUT,
channel=self.update_channel,
)
self.update_thread = QThread(self)
self.update_checker.moveToThread(self.update_thread)
self.update_thread.started.connect(self.update_checker.run)
self.update_checker.check_finished.connect(self._on_update_check_finished)
self.update_checker.check_failed.connect(self._on_update_check_failed)
self.update_thread = threading.Thread(target=self.update_checker.run, daemon=True)
self.update_checker.check_finished.connect(self.update_thread.quit)
self.update_checker.check_failed.connect(self.update_thread.quit)
self.update_checker.check_finished.connect(self.update_checker.deleteLater)
self.update_checker.check_failed.connect(self.update_checker.deleteLater)
self.update_thread.finished.connect(self._on_update_thread_finished)
self.update_thread.finished.connect(self.update_thread.deleteLater)
self.update_thread.start()
def _on_update_check_finished(self, result):
self._set_update_action_state(False)
self.update_checker = None
self.update_thread = None
if result.get("has_update"):
latest_version = result.get("latest_version") or result.get("latest_tag") or "unknown"
self.status_label.setText(f"Статус: доступно обновление {latest_version}")
@@ -405,6 +505,14 @@ class VkChatManager(QMainWindow):
f"Доступная версия: {latest_version}\n\n"
"Открыть страницу загрузки?"
)
release_notes = (result.get("release_notes") or "").strip()
if release_notes:
preview_lines = [line.strip() for line in release_notes.splitlines() if line.strip()]
preview_text = "\n".join(preview_lines[:8])
if len(preview_lines) > 8:
preview_text += "\n..."
message_box.setInformativeText(f"Что нового:\n{preview_text}")
message_box.setDetailedText(release_notes)
update_now_button = message_box.addButton("Обновить сейчас", QMessageBox.ButtonRole.AcceptRole)
download_button = message_box.addButton("Скачать", QMessageBox.ButtonRole.AcceptRole)
setup_button = None
@@ -443,9 +551,6 @@ class VkChatManager(QMainWindow):
QMessageBox.information(self, "Обновления", f"Установлена актуальная версия в канале {channel_label}.")
def _on_update_check_failed(self, error_text):
self._set_update_action_state(False)
self.update_checker = None
self.update_thread = None
self._log_event("update_check_failed", error_text, level="WARN")
if not self.update_repository_url:
self.status_label.setText("Статус: обновления не настроены (URL репозитория не задан).")
@@ -462,6 +567,12 @@ class VkChatManager(QMainWindow):
if not self._update_check_silent:
QMessageBox.warning(self, "Проверка обновлений", error_text)
def _on_update_thread_finished(self):
self._set_update_action_state(False)
self._update_in_progress = False
self.update_checker = None
self.update_thread = None
def setup_token_timer(self):
self.token_countdown_timer = QTimer(self)
self.token_countdown_timer.timeout.connect(self.update_token_timer_display)
@@ -550,6 +661,102 @@ class VkChatManager(QMainWindow):
else:
self.set_ui_state(True)
def _start_operation_progress(self, total, label_text, action_button=None):
total = max(1, int(total))
self.operation_progress.setRange(0, total)
self.operation_progress.setValue(0)
self.operation_progress.setFormat(f"{label_text}: %v/%m")
self.operation_progress.show()
self._active_action_button = action_button
self._active_action_button_text = action_button.text() if action_button else ""
if action_button is not None:
action_button.setText(f"{label_text} (0/{total})")
action_button.setEnabled(False)
def _update_operation_progress(self, processed, total, label_text):
total = max(1, int(total))
processed = max(0, min(int(processed), total))
self.operation_progress.setRange(0, total)
self.operation_progress.setValue(processed)
self.operation_progress.setFormat(f"{label_text}: {processed}/{total}")
if self._active_action_button is not None:
self._active_action_button.setText(f"{label_text} ({processed}/{total})")
def _finish_operation_progress(self):
self.operation_progress.hide()
self.operation_progress.setValue(0)
self.operation_progress.setFormat("%p%")
if self._active_action_button is not None:
self._active_action_button.setText(self._active_action_button_text or self._active_action_button.text())
self._active_action_button = None
self._active_action_button_text = ""
def _start_bulk_action_worker(
self,
action_type,
selected_chats,
user_infos,
action_label,
action_button=None,
success_message_title="Результаты",
clear_inputs_on_success=True,
):
total = max(1, len(selected_chats) * len(user_infos))
self._bulk_action_context = "set_user_admin" if action_type == "admin" else "execute_user_action"
self._bulk_action_success_message_title = success_message_title
self._bulk_clear_inputs_on_success = clear_inputs_on_success
self._set_busy(True, f"Статус: выполняется {action_label} (0/{total})...")
self._start_operation_progress(total, action_label, action_button=action_button)
worker = BulkActionWorker(
vk_call_with_retry=self._vk_call_with_retry,
vk_api=self.vk,
action_type=action_type,
selected_chats=selected_chats,
user_infos=user_infos,
visible_messages=self.visible_messages_checkbox.isChecked(),
)
thread = QThread(self)
worker.moveToThread(thread)
thread.started.connect(worker.run)
worker.progress.connect(self._on_bulk_action_progress)
worker.finished.connect(self._on_bulk_action_finished)
worker.auth_error.connect(self._on_bulk_action_auth_error)
worker.failed.connect(self._on_bulk_action_failed)
worker.done.connect(self._on_bulk_action_done)
worker.done.connect(thread.quit)
worker.done.connect(worker.deleteLater)
thread.finished.connect(thread.deleteLater)
self._bulk_worker = worker
self._bulk_worker_thread = thread
thread.start()
def _on_bulk_action_progress(self, processed, total, label):
label_text = {"remove": "исключение", "add": "приглашение", "admin": "назначение админов"}.get(label, label)
self._update_operation_progress(processed, total, label_text)
self.status_label.setText(f"Статус: выполняется {label_text} ({processed}/{max(1, total)})...")
def _on_bulk_action_finished(self, payload):
results = payload.get("results", []) if isinstance(payload, dict) else []
QMessageBox.information(self, self._bulk_action_success_message_title, "\n".join(results))
if self._bulk_clear_inputs_on_success:
self.vk_url_input.clear()
self.user_ids_to_process.clear()
self.set_ui_state(self.token is not None)
def _on_bulk_action_auth_error(self, context, exc, action_name):
self._handle_vk_api_error(context or self._bulk_action_context or "bulk_action", exc, action_name=action_name)
def _on_bulk_action_failed(self, error_text):
QMessageBox.warning(self, "Ошибка", f"Не удалось выполнить операцию: {error_text}")
def _on_bulk_action_done(self):
self._finish_operation_progress()
self._set_busy(False)
self._bulk_worker = None
self._bulk_worker_thread = None
def _ensure_log_dir(self):
os.makedirs(APP_DATA_DIR, exist_ok=True)
@@ -560,8 +767,8 @@ class VkChatManager(QMainWindow):
timestamp = QDateTime.currentDateTime().toString("yyyy-MM-dd HH:mm:ss")
with open(LOG_FILE, "a", encoding="utf-8") as f:
f.write(f"[{timestamp}] [{level}] {context}: {message}\n")
except Exception:
pass
except Exception as exc:
sys.stderr.write(f"[WARN] log_write_failed: {exc}\n")
def _log_error(self, context, exc):
self._log("ERROR", context, self._format_vk_error(exc))
@@ -578,8 +785,8 @@ class VkChatManager(QMainWindow):
if os.path.exists(LOG_BACKUP_FILE):
os.remove(LOG_BACKUP_FILE)
os.replace(LOG_FILE, LOG_BACKUP_FILE)
except Exception:
pass
except Exception as exc:
sys.stderr.write(f"[WARN] log_rotate_failed: {exc}\n")
def _format_vk_error(self, exc):
error = getattr(exc, "error", None)
@@ -691,8 +898,8 @@ class VkChatManager(QMainWindow):
try:
if output_path and os.path.exists(output_path):
os.remove(output_path)
except Exception:
pass
except Exception as exc:
self._log_event("auth_result_cleanup", f"Не удалось удалить файл результата авторизации: {exc}", level="WARN")
def _on_auth_process_finished(self, exit_code, _exit_status):
output_path = self.auth_output_path
@@ -729,8 +936,8 @@ class VkChatManager(QMainWindow):
try:
if os.path.exists(output_path):
os.remove(output_path)
except Exception:
pass
except Exception as exc:
self._log_event("auth_result_cleanup", f"Не удалось удалить файл результата авторизации: {exc}", level="WARN")
else:
self._log_event("auth_result", "Файл результата авторизации не найден.", level="WARN")
@@ -762,8 +969,8 @@ class VkChatManager(QMainWindow):
try:
if os.path.exists(output_path):
os.remove(output_path)
except Exception:
pass
except Exception as exc:
self._log_event("auth_result_cleanup", f"Не удалось удалить старый файл результата авторизации: {exc}", level="WARN")
program, args = self._build_auth_command(auth_url, output_path)
self.auth_output_path = output_path
@@ -785,12 +992,26 @@ class VkChatManager(QMainWindow):
self.token = token
# Сохраняем и получаем корректный expiration_time (0 или будущее время)
try:
self.token_expiration_time = token_store_save_token(
self.token,
TOKEN_FILE,
APP_DATA_DIR,
expires_in=expires_in,
)
except Exception as e:
try:
expires_value = int(expires_in)
except (TypeError, ValueError):
expires_value = 0
self.token_expiration_time = (time.time() + expires_value) if expires_value > 0 else 0
self._log_event("token_store_save", str(e), level="WARN")
QMessageBox.warning(
self,
"Предупреждение",
"Не удалось безопасно сохранить токен на диске. "
"Текущая сессия активна, но после перезапуска потребуется повторная авторизация.",
)
self.token_input.setText(self.token[:50] + "...")
self.status_label.setText("Статус: авторизован")
@@ -902,7 +1123,8 @@ class VkChatManager(QMainWindow):
try:
user = self.vk.users.get(user_ids=user_id)[0]
return f"{user.get('first_name', '')} {user.get('last_name', '')}"
except Exception:
except Exception as exc:
self._log_event("get_user_info", f"Не удалось получить имя пользователя {user_id}: {exc}", level="WARN")
return f"Пользователь {user_id}"
def _get_selected_chats(self):
@@ -914,7 +1136,7 @@ class VkChatManager(QMainWindow):
selected.append({'id': chat_id, 'title': title})
return selected
def _execute_user_action(self, action_type):
def _execute_user_action(self, action_type, action_button=None):
if not self.user_ids_to_process:
QMessageBox.warning(self, "Ошибка", f"Нет ID пользователей для операции.")
return
@@ -972,44 +1194,23 @@ class VkChatManager(QMainWindow):
if confirm_dialog.clickedButton() != yes_button:
return
results = []
total = len(selected_chats) * len(user_infos)
processed = 0
try:
action_label = "исключение" if action_type == "remove" else "приглашение"
self._set_busy(True, f"Статус: выполняется {action_label} (0/{total})...")
for chat in selected_chats:
for user_id, user_info in user_infos.items():
try:
if action_type == "remove":
self._vk_call_with_retry(self.vk.messages.removeChatUser, chat_id=chat['id'], member_id=user_id)
results.append(f"'{user_info}' исключен из '{chat['title']}'.")
else:
params = {'chat_id': chat['id'], 'user_id': user_id}
if self.visible_messages_checkbox.isChecked():
params['visible_messages_count'] = 250
self._vk_call_with_retry(self.vk.messages.addChatUser, **params)
results.append(f"'{user_info}' приглашен в '{chat['title']}'.")
except VkApiError as e:
if self._handle_vk_api_error("execute_user_action", e, action_name="выполнения операций с пользователями"):
self._start_bulk_action_worker(
action_type=action_type,
selected_chats=selected_chats,
user_infos=user_infos,
action_label=action_label,
action_button=action_button,
success_message_title="Результаты",
clear_inputs_on_success=True,
)
return
results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {self._format_vk_error(e)}")
finally:
processed += 1
self.status_label.setText(f"Статус: выполняется {action_label} ({processed}/{total})...")
finally:
self._set_busy(False)
QMessageBox.information(self, "Результаты", "\n".join(results))
self.vk_url_input.clear()
self.user_ids_to_process.clear()
self.set_ui_state(self.token is not None)
def remove_user(self):
self._execute_user_action("remove")
self._execute_user_action("remove", action_button=self.remove_user_btn)
def add_user_to_chat(self):
self._execute_user_action("add")
self._execute_user_action("add", action_button=self.add_user_btn)
def set_user_admin(self):
"""Назначает пользователя администратором чата."""
@@ -1047,46 +1248,16 @@ class VkChatManager(QMainWindow):
if confirm_dialog.clickedButton() != yes_button:
return
# 4. Выполнение API запросов
results = []
total = len(selected_chats) * len(user_infos)
processed = 0
try:
self._set_busy(True, f"Статус: назначение админов (0/{total})...")
for chat in selected_chats:
# VK API требует peer_id. Для чатов это 2000000000 + local_id
try:
peer_id = 2000000000 + int(chat['id'])
except ValueError:
results.append(f"✗ Ошибка ID чата: {chat['id']}")
continue
for user_id, user_info in user_infos.items():
try:
self._vk_call_with_retry(
self.vk.messages.setMemberRole,
peer_id=peer_id,
member_id=user_id,
role="admin"
self._start_bulk_action_worker(
action_type="admin",
selected_chats=selected_chats,
user_infos=user_infos,
action_label="назначение админов",
action_button=None,
success_message_title="Результаты назначения",
clear_inputs_on_success=True,
)
results.append(f"'{user_info}' назначен админом в '{chat['title']}'.")
except VkApiError as e:
if self._handle_vk_api_error("set_user_admin", e, action_name="назначения администраторов"):
return
results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {self._format_vk_error(e)}")
finally:
processed += 1
self.status_label.setText(f"Статус: назначение админов ({processed}/{total})...")
finally:
self._set_busy(False)
# 5. Вывод результата
QMessageBox.information(self, "Результаты назначения", "\n".join(results))
# Очистка полей (по желанию, можно убрать эти две строки, если хотите оставить ввод)
self.vk_url_input.clear()
self.user_ids_to_process.clear()
self.set_ui_state(self.token is not None)
# Refactor overrides: keep logic in service modules and thin UI orchestration here.
def _process_links_list(self, links_list):
@@ -1237,7 +1408,8 @@ if __name__ == "__main__":
idx = sys.argv.index("--auth")
auth_url = sys.argv[idx + 1]
output_path = sys.argv[idx + 2]
except Exception:
except Exception as exc:
sys.stderr.write(f"[ERROR] auth_cli_args_invalid: {exc}\n")
sys.exit(1)
auth_webview.main_auth(auth_url, output_path)
sys.exit(0)

View File

@@ -9,6 +9,18 @@ import zipfile
class AutoUpdateService:
@staticmethod
def _safe_extract_zip(archive, destination_dir):
destination_real = os.path.realpath(destination_dir)
for member in archive.infolist():
member_name = member.filename or ""
if not member_name:
continue
target_path = os.path.realpath(os.path.join(destination_dir, member_name))
if target_path != destination_real and not target_path.startswith(destination_real + os.sep):
raise RuntimeError(f"Unsafe path in update archive: {member_name}")
archive.extractall(destination_dir)
@staticmethod
def download_update_archive(download_url, destination_path):
request = urllib.request.Request(
@@ -215,6 +227,6 @@ class AutoUpdateService:
cls.verify_update_checksum(zip_path, checksum_url, download_name)
os.makedirs(unpack_dir, exist_ok=True)
with zipfile.ZipFile(zip_path, "r") as archive:
archive.extractall(unpack_dir)
cls._safe_extract_zip(archive, unpack_dir)
source_dir = cls.locate_extracted_root(unpack_dir)
return work_dir, source_dir

View File

@@ -91,8 +91,8 @@ def save_token(token, token_file, app_data_dir, expires_in=0):
try:
stored_token = _encrypt_token(token)
encrypted = True
except Exception:
pass
except Exception as exc:
raise RuntimeError("Failed to securely store token with DPAPI.") from exc
data = {
"token": stored_token,

View File

@@ -5,7 +5,42 @@ import urllib.error
import urllib.request
from urllib.parse import urlparse
try:
from PySide6.QtCore import QObject, Signal
except Exception:
class _FallbackBoundSignal:
def __init__(self):
self._callbacks = []
def connect(self, callback):
if callback is not None:
self._callbacks.append(callback)
def emit(self, *args, **kwargs):
for callback in list(self._callbacks):
callback(*args, **kwargs)
class _FallbackSignalDescriptor:
def __init__(self):
self._storage_name = ""
def __set_name__(self, owner, name):
self._storage_name = f"__fallback_signal_{name}"
def __get__(self, instance, owner):
if instance is None:
return self
signal = instance.__dict__.get(self._storage_name)
if signal is None:
signal = _FallbackBoundSignal()
instance.__dict__[self._storage_name] = signal
return signal
class QObject:
pass
def Signal(*_args, **_kwargs):
return _FallbackSignalDescriptor()
def _version_key(version_text):
@@ -67,6 +102,7 @@ def _extract_release_payload(release_data, repository_url, current_version):
latest_tag = release_data.get("tag_name") or release_data.get("name") or ""
latest_version = latest_tag.lstrip("vV").strip()
html_url = release_data.get("html_url") or releases_url
release_notes = (release_data.get("body") or "").strip()
assets = release_data.get("assets") or []
download_url = ""
download_name = ""
@@ -112,6 +148,7 @@ def _extract_release_payload(release_data, repository_url, current_version):
"current_version": current_version,
"latest_tag": latest_tag,
"release_url": html_url,
"release_notes": release_notes,
"download_url": download_url,
"download_name": download_name,
"installer_url": installer_url,

View File

@@ -1,60 +0,0 @@
import unittest
from pathlib import Path
class AuthReloginSmokeTests(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.main_source = Path("main.py").read_text(encoding="utf-8")
cls.vk_source = Path("services/vk_service.py").read_text(encoding="utf-8")
cls.update_source = Path("services/update_service.py").read_text(encoding="utf-8")
def test_auth_command_builder_handles_frozen_and_source(self):
self.assertIn("def _build_auth_command(self, auth_url, output_path):", self.main_source)
self.assertIn("entry_script_path=os.path.abspath(__file__)", self.main_source)
self.assertIn('return sys.executable, ["--auth", auth_url, output_path]', self.vk_source)
self.assertIn("script_path = entry_script_path or os.path.abspath(__file__)", self.vk_source)
def test_auth_runs_via_qprocess(self):
self.assertIn("process = QProcess(self)", self.main_source)
self.assertIn("process.start(program, args)", self.main_source)
self.assertIn("def _on_auth_process_finished(self, exit_code, _exit_status):", self.main_source)
self.assertIn("if self.auth_process and self.auth_process.state() == QProcess.ProcessState.NotRunning:", self.main_source)
def test_force_relogin_has_backoff_and_event_log(self):
self.assertIn("AUTH_RELOGIN_BACKOFF_SECONDS = 5.0", self.main_source)
self.assertIn("if self._auth_relogin_in_progress:", self.main_source)
self.assertIn("force_relogin_backoff", self.main_source)
self.assertIn("force_relogin", self.main_source)
def test_auth_error_paths_trigger_force_relogin(self):
self.assertIn(
"def _handle_vk_api_error(self, context, exc, action_name=None, ui_message_prefix=None, disable_ui=False):",
self.main_source,
)
self.assertIn("self._force_relogin(exc, action_name or context)", self.main_source)
self.assertIn('"load_chats",', self.main_source)
self.assertIn('"execute_user_action",', self.main_source)
self.assertIn('"set_user_admin",', self.main_source)
def test_tab_checkbox_lists_use_existing_attributes(self):
self.assertIn("self.warehouse_chat_checkboxes", self.main_source)
self.assertIn("self.coffee_chat_checkboxes", self.main_source)
self.assertNotIn("self.retail_warehouse_checkboxes", self.main_source)
self.assertNotIn("self.retail_coffee_checkboxes", self.main_source)
def test_update_check_actions_exist(self):
self.assertIn("from app_version import APP_VERSION", self.main_source)
self.assertIn("from services import (", self.main_source)
self.assertIn("UpdateChecker", self.main_source)
self.assertIn("detect_update_repository_url", self.main_source)
self.assertIn('QAction("Проверить обновления", self)', self.main_source)
self.assertIn("def check_for_updates(self, silent_no_updates=False):", self.main_source)
self.assertIn("class UpdateChecker(QObject):", self.update_source)
self.assertIn("def _start_auto_update(self, download_url, latest_version, checksum_url=\"\", download_name=\"\"):", self.main_source)
self.assertIn("AutoUpdateService.prepare_update", self.main_source)
self.assertIn("AutoUpdateService.launch_gui_updater", self.main_source)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,114 @@
import ast
import unittest
from pathlib import Path
class MainContractsTests(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.main_source = Path("main.py").read_text(encoding="utf-8-sig")
cls.module = ast.parse(cls.main_source)
cls.vk_chat_manager = cls._find_class("VkChatManager")
@classmethod
def _find_class(cls, class_name):
for node in cls.module.body:
if isinstance(node, ast.ClassDef) and node.name == class_name:
return node
raise AssertionError(f"Class {class_name} not found")
def _find_method(self, method_name):
for node in self.vk_chat_manager.body:
if isinstance(node, ast.FunctionDef) and node.name == method_name:
return node
self.fail(f"Method {method_name} not found")
def _iter_nodes(self, node):
return ast.walk(node)
def test_auth_error_contexts_contains_only_supported_contexts(self):
expected_contexts = {"load_chats", "execute_user_action", "set_user_admin"}
for node in self.module.body:
if isinstance(node, ast.Assign):
for target in node.targets:
if isinstance(target, ast.Name) and target.id == "AUTH_ERROR_CONTEXTS":
actual = set(ast.literal_eval(node.value))
self.assertSetEqual(actual, expected_contexts)
return
self.fail("AUTH_ERROR_CONTEXTS assignment not found")
def test_check_for_updates_has_reentry_guard(self):
method = self._find_method("check_for_updates")
has_guard = False
for node in method.body:
if not isinstance(node, ast.If):
continue
test = node.test
if (
isinstance(test, ast.Attribute)
and isinstance(test.value, ast.Name)
and test.value.id == "self"
and test.attr == "_update_in_progress"
):
has_guard = any(isinstance(stmt, ast.Return) for stmt in node.body)
if has_guard:
break
self.assertTrue(has_guard, "check_for_updates must return when update is already in progress")
def test_check_for_updates_connects_thread_finish_handler(self):
method = self._find_method("check_for_updates")
for node in self._iter_nodes(method):
if not isinstance(node, ast.Call):
continue
func = node.func
if not (isinstance(func, ast.Attribute) and func.attr == "connect"):
continue
value = func.value
if not (
isinstance(value, ast.Attribute)
and value.attr == "finished"
and isinstance(value.value, ast.Attribute)
and value.value.attr == "update_thread"
and isinstance(value.value.value, ast.Name)
and value.value.value.id == "self"
):
continue
if len(node.args) != 1:
continue
arg = node.args[0]
if (
isinstance(arg, ast.Attribute)
and arg.attr == "_on_update_thread_finished"
and isinstance(arg.value, ast.Name)
and arg.value.id == "self"
):
return
self.fail("update_thread.finished must be connected to _on_update_thread_finished")
def test_on_update_thread_finished_clears_update_state(self):
method = self._find_method("_on_update_thread_finished")
assignments = {}
for node in method.body:
if not isinstance(node, ast.Assign) or len(node.targets) != 1:
continue
target = node.targets[0]
if (
isinstance(target, ast.Attribute)
and isinstance(target.value, ast.Name)
and target.value.id == "self"
):
assignments[target.attr] = node.value
self.assertIn("_update_in_progress", assignments)
self.assertIn("update_checker", assignments)
self.assertIn("update_thread", assignments)
self.assertIsInstance(assignments["_update_in_progress"], ast.Constant)
self.assertIs(assignments["_update_in_progress"].value, False)
self.assertIsInstance(assignments["update_checker"], ast.Constant)
self.assertIsNone(assignments["update_checker"].value)
self.assertIsInstance(assignments["update_thread"], ast.Constant)
self.assertIsNone(assignments["update_thread"].value)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,97 @@
import unittest
from types import SimpleNamespace
from unittest import mock
class _DummySignal:
def __init__(self):
self._callbacks = []
def connect(self, callback):
if callback is not None:
self._callbacks.append(callback)
def emit(self, *args, **kwargs):
for callback in list(self._callbacks):
callback(*args, **kwargs)
class _DummyThread:
created = 0
def __init__(self, _parent=None):
type(self).created += 1
self.started = _DummySignal()
self.finished = _DummySignal()
def start(self):
self.started.emit()
def quit(self):
self.finished.emit()
def deleteLater(self):
return None
class _DummyChecker:
created = 0
def __init__(self, *_args, **_kwargs):
type(self).created += 1
self.check_finished = _DummySignal()
self.check_failed = _DummySignal()
def moveToThread(self, _thread):
return None
def run(self):
return None
def deleteLater(self):
return None
class UpdateReentryRuntimeTests(unittest.TestCase):
@classmethod
def setUpClass(cls):
try:
import main # noqa: PLC0415
except Exception as exc:
raise unittest.SkipTest(f"main import unavailable: {exc}") from exc
cls.main = main
def test_repeated_update_check_is_ignored_until_thread_finishes(self):
_DummyChecker.created = 0
_DummyThread.created = 0
manager = self.main.VkChatManager.__new__(self.main.VkChatManager)
manager._update_in_progress = False
manager._update_check_silent = False
manager.update_channel = "stable"
manager.update_repository_url = "https://example.com/org/repo"
manager.update_checker = None
manager.update_thread = None
manager.status_label = SimpleNamespace(setText=lambda *_args, **_kwargs: None)
manager._log_event = lambda *_args, **_kwargs: None
manager._set_update_action_state = lambda *_args, **_kwargs: None
with mock.patch.object(self.main, "UpdateChecker", _DummyChecker), mock.patch.object(self.main, "QThread", _DummyThread):
self.main.VkChatManager.check_for_updates(manager, silent_no_updates=True)
self.assertTrue(manager._update_in_progress)
self.assertEqual(_DummyChecker.created, 1)
self.assertEqual(_DummyThread.created, 1)
first_thread = manager.update_thread
self.main.VkChatManager.check_for_updates(manager, silent_no_updates=True)
self.assertEqual(_DummyChecker.created, 1)
self.assertEqual(_DummyThread.created, 1)
self.assertIs(manager.update_thread, first_thread)
manager.update_checker.check_finished.emit({"has_update": False, "current_version": self.main.APP_VERSION})
self.assertFalse(manager._update_in_progress)
self.assertIsNone(manager.update_checker)
self.assertIsNone(manager.update_thread)
if __name__ == "__main__":
unittest.main()