11 Commits

Author SHA1 Message Date
Денисов Александр Андреевич
ac2013bcca Add logging for chat load and bump version to 2.2.4
Some checks failed
Desktop CI / tests (push) Successful in 17s
Desktop Release / release (push) Failing after 3m10s
2026-02-17 17:58:17 +03:00
c77ca4652b fix(ci): update syntax validation test file list
All checks were successful
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Successful in 3m54s
2026-02-16 00:37:26 +03:00
4ec24c6d0f chore(version): bump to 2.2.3
Some checks failed
Desktop CI / tests (push) Failing after 12s
Desktop Release / release (push) Has been cancelled
2026-02-16 00:35:38 +03:00
72edfffd9e fix(update): harden reentry state and add runtime regression test 2026-02-16 00:35:03 +03:00
db5d901435 test(main): replace brittle smoke checks with AST contracts 2026-02-16 00:31:38 +03:00
cd5e6e1f6b fix(update): prevent repeated check crash and bump to 2.2.2
All checks were successful
Desktop CI / tests (push) Successful in 15s
Desktop Release / release (push) Successful in 3m30s
2026-02-15 23:59:57 +03:00
4b3347a069 fix(update,security): add release notes in updater and bump to 2.2.1
All checks were successful
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Successful in 3m30s
2026-02-15 23:51:44 +03:00
e0628b1792 chore(version): bump to 2.2.0
Some checks are pending
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Has started running
2026-02-15 23:49:04 +03:00
201184700f docs(readme): update install steps and feature list 2026-02-15 23:48:15 +03:00
5253c942e8 fix(core,security): safe update extraction and async bulk vk actions 2026-02-15 23:42:51 +03:00
c645d964bf fix(auth,ui): add auth webview cli entrypoint and bulk action progress bar
All checks were successful
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Successful in 17s
2026-02-15 23:31:01 +03:00
11 changed files with 624 additions and 178 deletions

View File

@@ -28,7 +28,7 @@ jobs:
- name: Validate syntax - name: Validate syntax
run: | run: |
python -m py_compile app_version.py main.py build.py updater_gui.py tests/test_auth_relogin_smoke.py tests/test_auto_update_service.py tests/test_chat_actions.py tests/test_token_store.py python -m py_compile app_version.py main.py build.py updater_gui.py tests/test_auto_update_service.py tests/test_chat_actions.py tests/test_main_contracts.py tests/test_token_store.py tests/test_update_reentry_runtime.py tests/test_update_service.py tests/test_updater_gui.py
- name: Run tests - name: Run tests
run: | run: |

View File

@@ -13,9 +13,12 @@
* Моментальная загрузка всех доступных чатов пользователя. * Моментальная загрузка всех доступных чатов пользователя.
* Групповой выбор чатов («Выбрать все» / «Снять выбор»). * Групповой выбор чатов («Выбрать все» / «Снять выбор»).
* Быстрое обновление списка бесед. * Быстрое обновление списка бесед.
* Выполнение массовых действий в фоновом потоке без подвисания интерфейса.
* Визуальный прогресс-бар по ходу операции.
* **👤 Интеллектуальный поиск ID:** Автоматическое распознавание ID пользователя из ссылок любого формата (например, `vk.com/id123`, `vk.com/durov` или просто `durov`). * **👤 Интеллектуальный поиск ID:** Автоматическое распознавание ID пользователя из ссылок любого формата (например, `vk.com/id123`, `vk.com/durov` или просто `durov`).
* **🛠 Управление в один клик:** Кнопки для мгновенного исключения или приглашения пользователя во все выбранные чаты одновременно. * **🛠 Управление в один клик:** Кнопки для мгновенного исключения или приглашения пользователя во все выбранные чаты одновременно.
* **🛡 Стабильность:** Улучшенная обработка ошибок VK API и автоматическая реакция на смену IP-адреса. * **🔄 Безопасные обновления:** Проверка SHA256 и защищенная распаковка архива обновления.
* **🛡 Стабильность и безопасность:** Улучшенная обработка ошибок VK API, автоматическая реакция на смену IP-адреса и безопасное хранение токена с шифрованием DPAPI в Windows.
--- ---
@@ -47,7 +50,7 @@
3. **Установите зависимости:** 3. **Установите зависимости:**
```bash ```bash
pip install PySide6 vk_api pip install -r requirements.txt
``` ```
4. **Запустите приложение:** 4. **Запустите приложение:**
@@ -68,6 +71,12 @@
## 📂 Техническая информация ## 📂 Техническая информация
### Последние обновления
- Массовые операции VK (`remove/add/admin`) выполняются в фоновом потоке, чтобы интерфейс не зависал; добавлен визуальный прогресс-бар.
- Распаковка архива автообновления теперь валидирует пути перед извлечением для защиты от path traversal.
- Проверка обновлений переведена на `QThread` (модель потоков Qt) вместо Python `threading.Thread`.
- В Windows сохранение токена требует успешного шифрования через DPAPI; при ошибке шифрования сессия продолжается, но токен не сохраняется на диск.
### Сборка проекта (для разработчиков) ### Сборка проекта (для разработчиков)
Проект использует кастомный скрипт автоматизации `build.py`, который оптимизирует зависимости `PySide6` и корректно упаковывает `QtWebEngineCore`. Проект использует кастомный скрипт автоматизации `build.py`, который оптимизирует зависимости `PySide6` и корректно упаковывает `QtWebEngineCore`.
@@ -101,4 +110,4 @@ python build.py
--- ---
Проект распространяется под лицензией MIT. Проект распространяется под лицензией MIT.
Сэкономьте часы ручного труда с Anabasis VK Chat Manager. Сэкономьте часы ручного труда с Anabasis VK Chat Manager.

View File

@@ -1 +1 @@
APP_VERSION = "2.1.2" APP_VERSION = "2.2.4"

View File

@@ -74,5 +74,21 @@ def main_auth(auth_url, output_path):
webview.start(private_mode=False, storage_path=storage_path) webview.start(private_mode=False, storage_path=storage_path)
def main():
# Supports both: `python auth_webview.py <auth_url> <output_path>`
# and: `python auth_webview.py --auth <auth_url> <output_path>`
args = sys.argv[1:]
if len(args) == 3 and args[0] == "--auth":
auth_url, output_path = args[1], args[2]
elif len(args) == 2:
auth_url, output_path = args[0], args[1]
else:
print("Usage: auth_webview.py [--auth] <auth_url> <output_path>")
return 1
main_auth(auth_url, output_path)
return 0
if __name__ == "__main__": if __name__ == "__main__":
main() sys.exit(main())

437
main.py
View File

@@ -1,13 +1,12 @@
import json import json
import os import os
import shutil import shutil
import sys import sys
import threading
import time import time
from PySide6.QtCore import QProcess from PySide6.QtCore import QProcess
from PySide6.QtCore import QStandardPaths from PySide6.QtCore import QStandardPaths
from PySide6.QtCore import Qt, QUrl, QDateTime, QTimer from PySide6.QtCore import QObject, QThread, Signal, Qt, QUrl, QDateTime, QTimer
from PySide6.QtGui import QIcon, QAction, QActionGroup, QDesktopServices from PySide6.QtGui import QIcon, QAction, QActionGroup, QDesktopServices
from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit, from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit,
QPushButton, QVBoxLayout, QWidget, QMessageBox, QPushButton, QVBoxLayout, QWidget, QMessageBox,
@@ -50,6 +49,7 @@ UPDATE_REPOSITORY = ""
UPDATE_REPOSITORY_URL = "https://git.daemonlord.ru/benya/AnabasisChatRemove" UPDATE_REPOSITORY_URL = "https://git.daemonlord.ru/benya/AnabasisChatRemove"
UPDATE_CHANNEL_DEFAULT = "stable" UPDATE_CHANNEL_DEFAULT = "stable"
UPDATE_REQUEST_TIMEOUT = 8 UPDATE_REQUEST_TIMEOUT = 8
AUTH_ERROR_CONTEXTS = ("load_chats", "execute_user_action", "set_user_admin")
def get_resource_path(relative_path): def get_resource_path(relative_path):
@@ -59,6 +59,86 @@ def get_resource_path(relative_path):
# Для cx_Freeze и обычного запуска # Для cx_Freeze и обычного запуска
return os.path.join(os.path.abspath("."), relative_path) return os.path.join(os.path.abspath("."), relative_path)
class BulkActionWorker(QObject):
progress = Signal(int, int, str)
finished = Signal(dict)
auth_error = Signal(str, object, str)
failed = Signal(str)
done = Signal()
def __init__(self, vk_call_with_retry, vk_api, action_type, selected_chats, user_infos, visible_messages):
super().__init__()
self.vk_call_with_retry = vk_call_with_retry
self.vk = vk_api
self.action_type = action_type
self.selected_chats = selected_chats
self.user_infos = user_infos
self.visible_messages = visible_messages
self.total = len(self.selected_chats) * len(self.user_infos)
@staticmethod
def _is_auth_error(exc):
return VkService.is_auth_error(exc, str(exc).lower())
def _emit_progress(self, processed):
label = "admin" if self.action_type == "admin" else ("remove" if self.action_type == "remove" else "add")
self.progress.emit(processed, self.total, label)
def run(self):
results = []
processed = 0
try:
for chat in self.selected_chats:
peer_id = None
if self.action_type == "admin":
try:
peer_id = 2000000000 + int(chat["id"])
except (ValueError, TypeError):
for _user_id, user_info in self.user_infos.items():
results.append(f"✗ Ошибка ID чата: {chat['id']} ({user_info})")
processed += 1
self._emit_progress(processed)
continue
for user_id, user_info in self.user_infos.items():
try:
if self.action_type == "remove":
self.vk_call_with_retry(self.vk.messages.removeChatUser, chat_id=chat["id"], member_id=user_id)
results.append(f"'{user_info}' исключен из '{chat['title']}'.")
elif self.action_type == "add":
params = {"chat_id": chat["id"], "user_id": user_id}
if self.visible_messages:
params["visible_messages_count"] = 250
self.vk_call_with_retry(self.vk.messages.addChatUser, **params)
results.append(f"'{user_info}' приглашен в '{chat['title']}'.")
elif self.action_type == "admin":
self.vk_call_with_retry(
self.vk.messages.setMemberRole,
peer_id=peer_id,
member_id=user_id,
role="admin",
)
results.append(f"'{user_info}' назначен админом в '{chat['title']}'.")
else:
raise RuntimeError(f"Unknown action: {self.action_type}")
except VkApiError as exc:
if self._is_auth_error(exc):
context = "set_user_admin" if self.action_type == "admin" else "execute_user_action"
action_name = "назначения администраторов" if self.action_type == "admin" else "выполнения операций с пользователями"
self.auth_error.emit(context, exc, action_name)
return
results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {exc}")
finally:
processed += 1
self._emit_progress(processed)
self.finished.emit({"results": results, "processed": processed, "total": self.total})
except Exception as exc:
self.failed.emit(str(exc))
finally:
self.done.emit()
class VkChatManager(QMainWindow): class VkChatManager(QMainWindow):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
@@ -85,11 +165,19 @@ class VkChatManager(QMainWindow):
self._auth_ui_busy = False self._auth_ui_busy = False
self._auth_relogin_in_progress = False self._auth_relogin_in_progress = False
self._last_auth_relogin_ts = 0.0 self._last_auth_relogin_ts = 0.0
self._active_action_button = None
self._active_action_button_text = ""
self.update_repository_url = detect_update_repository_url(UPDATE_REPOSITORY_URL, UPDATE_REPOSITORY) self.update_repository_url = detect_update_repository_url(UPDATE_REPOSITORY_URL, UPDATE_REPOSITORY)
self.update_channel = UPDATE_CHANNEL_DEFAULT self.update_channel = UPDATE_CHANNEL_DEFAULT
self.update_checker = None self.update_checker = None
self.update_thread = None self.update_thread = None
self._update_in_progress = False
self._update_check_silent = False self._update_check_silent = False
self._bulk_worker_thread = None
self._bulk_worker = None
self._bulk_action_context = None
self._bulk_action_success_message_title = ""
self._bulk_clear_inputs_on_success = True
self.resolve_timer = QTimer(self) self.resolve_timer = QTimer(self)
self.resolve_timer.setSingleShot(True) self.resolve_timer.setSingleShot(True)
@@ -184,6 +272,12 @@ class VkChatManager(QMainWindow):
self.add_user_btn.setMinimumHeight(50) self.add_user_btn.setMinimumHeight(50)
self.add_user_btn.clicked.connect(self.add_user_to_chat) self.add_user_btn.clicked.connect(self.add_user_to_chat)
layout.addWidget(self.add_user_btn) layout.addWidget(self.add_user_btn)
self.operation_progress = QProgressBar()
self.operation_progress.setRange(0, 100)
self.operation_progress.setValue(0)
self.operation_progress.setTextVisible(True)
self.operation_progress.hide()
layout.addWidget(self.operation_progress)
self.status_label = QLabel("Статус: не авторизован") self.status_label = QLabel("Статус: не авторизован")
self.status_label.setAlignment(Qt.AlignmentFlag.AlignCenter) self.status_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
layout.addWidget(self.status_label) layout.addWidget(self.status_label)
@@ -369,10 +463,12 @@ class VkChatManager(QMainWindow):
self._log_event("update_channel", f"update_channel={self.update_channel}") self._log_event("update_channel", f"update_channel={self.update_channel}")
def check_for_updates(self, silent_no_updates=False): def check_for_updates(self, silent_no_updates=False):
if self.update_thread and self.update_thread.is_alive(): if self._update_in_progress:
self.status_label.setText("Статус: проверка обновлений уже выполняется...")
return return
self._update_check_silent = silent_no_updates self._update_check_silent = silent_no_updates
self._update_in_progress = True
self._set_update_action_state(True) self._set_update_action_state(True)
channel_label = "бета" if self.update_channel == "beta" else "релизы" channel_label = "бета" if self.update_channel == "beta" else "релизы"
self.status_label.setText(f"Статус: проверка обновлений ({channel_label})...") self.status_label.setText(f"Статус: проверка обновлений ({channel_label})...")
@@ -383,16 +479,20 @@ class VkChatManager(QMainWindow):
request_timeout=UPDATE_REQUEST_TIMEOUT, request_timeout=UPDATE_REQUEST_TIMEOUT,
channel=self.update_channel, channel=self.update_channel,
) )
self.update_thread = QThread(self)
self.update_checker.moveToThread(self.update_thread)
self.update_thread.started.connect(self.update_checker.run)
self.update_checker.check_finished.connect(self._on_update_check_finished) self.update_checker.check_finished.connect(self._on_update_check_finished)
self.update_checker.check_failed.connect(self._on_update_check_failed) self.update_checker.check_failed.connect(self._on_update_check_failed)
self.update_thread = threading.Thread(target=self.update_checker.run, daemon=True) self.update_checker.check_finished.connect(self.update_thread.quit)
self.update_checker.check_failed.connect(self.update_thread.quit)
self.update_checker.check_finished.connect(self.update_checker.deleteLater)
self.update_checker.check_failed.connect(self.update_checker.deleteLater)
self.update_thread.finished.connect(self._on_update_thread_finished)
self.update_thread.finished.connect(self.update_thread.deleteLater)
self.update_thread.start() self.update_thread.start()
def _on_update_check_finished(self, result): def _on_update_check_finished(self, result):
self._set_update_action_state(False)
self.update_checker = None
self.update_thread = None
if result.get("has_update"): if result.get("has_update"):
latest_version = result.get("latest_version") or result.get("latest_tag") or "unknown" latest_version = result.get("latest_version") or result.get("latest_tag") or "unknown"
self.status_label.setText(f"Статус: доступно обновление {latest_version}") self.status_label.setText(f"Статус: доступно обновление {latest_version}")
@@ -405,6 +505,14 @@ class VkChatManager(QMainWindow):
f"Доступная версия: {latest_version}\n\n" f"Доступная версия: {latest_version}\n\n"
"Открыть страницу загрузки?" "Открыть страницу загрузки?"
) )
release_notes = (result.get("release_notes") or "").strip()
if release_notes:
preview_lines = [line.strip() for line in release_notes.splitlines() if line.strip()]
preview_text = "\n".join(preview_lines[:8])
if len(preview_lines) > 8:
preview_text += "\n..."
message_box.setInformativeText(f"Что нового:\n{preview_text}")
message_box.setDetailedText(release_notes)
update_now_button = message_box.addButton("Обновить сейчас", QMessageBox.ButtonRole.AcceptRole) update_now_button = message_box.addButton("Обновить сейчас", QMessageBox.ButtonRole.AcceptRole)
download_button = message_box.addButton("Скачать", QMessageBox.ButtonRole.AcceptRole) download_button = message_box.addButton("Скачать", QMessageBox.ButtonRole.AcceptRole)
setup_button = None setup_button = None
@@ -443,9 +551,6 @@ class VkChatManager(QMainWindow):
QMessageBox.information(self, "Обновления", f"Установлена актуальная версия в канале {channel_label}.") QMessageBox.information(self, "Обновления", f"Установлена актуальная версия в канале {channel_label}.")
def _on_update_check_failed(self, error_text): def _on_update_check_failed(self, error_text):
self._set_update_action_state(False)
self.update_checker = None
self.update_thread = None
self._log_event("update_check_failed", error_text, level="WARN") self._log_event("update_check_failed", error_text, level="WARN")
if not self.update_repository_url: if not self.update_repository_url:
self.status_label.setText("Статус: обновления не настроены (URL репозитория не задан).") self.status_label.setText("Статус: обновления не настроены (URL репозитория не задан).")
@@ -462,6 +567,12 @@ class VkChatManager(QMainWindow):
if not self._update_check_silent: if not self._update_check_silent:
QMessageBox.warning(self, "Проверка обновлений", error_text) QMessageBox.warning(self, "Проверка обновлений", error_text)
def _on_update_thread_finished(self):
self._set_update_action_state(False)
self._update_in_progress = False
self.update_checker = None
self.update_thread = None
def setup_token_timer(self): def setup_token_timer(self):
self.token_countdown_timer = QTimer(self) self.token_countdown_timer = QTimer(self)
self.token_countdown_timer.timeout.connect(self.update_token_timer_display) self.token_countdown_timer.timeout.connect(self.update_token_timer_display)
@@ -550,6 +661,110 @@ class VkChatManager(QMainWindow):
else: else:
self.set_ui_state(True) self.set_ui_state(True)
def _start_operation_progress(self, total, label_text, action_button=None):
total = max(1, int(total))
self.operation_progress.setRange(0, total)
self.operation_progress.setValue(0)
self.operation_progress.setFormat(f"{label_text}: %v/%m")
self.operation_progress.show()
self._active_action_button = action_button
self._active_action_button_text = action_button.text() if action_button else ""
if action_button is not None:
action_button.setText(f"{label_text} (0/{total})")
action_button.setEnabled(False)
def _update_operation_progress(self, processed, total, label_text):
total = max(1, int(total))
processed = max(0, min(int(processed), total))
self.operation_progress.setRange(0, total)
self.operation_progress.setValue(processed)
self.operation_progress.setFormat(f"{label_text}: {processed}/{total}")
if self._active_action_button is not None:
self._active_action_button.setText(f"{label_text} ({processed}/{total})")
def _finish_operation_progress(self):
self.operation_progress.hide()
self.operation_progress.setValue(0)
self.operation_progress.setFormat("%p%")
if self._active_action_button is not None:
self._active_action_button.setText(self._active_action_button_text or self._active_action_button.text())
self._active_action_button = None
self._active_action_button_text = ""
def _start_bulk_action_worker(
self,
action_type,
selected_chats,
user_infos,
action_label,
action_button=None,
success_message_title="Результаты",
clear_inputs_on_success=True,
):
total = max(1, len(selected_chats) * len(user_infos))
self._bulk_action_context = "set_user_admin" if action_type == "admin" else "execute_user_action"
self._bulk_action_success_message_title = success_message_title
self._bulk_clear_inputs_on_success = clear_inputs_on_success
self._log_event(
"bulk_action",
f"start action={action_type} chats={len(selected_chats)} users={len(user_infos)}",
)
self._set_busy(True, f"Статус: выполняется {action_label} (0/{total})...")
self._start_operation_progress(total, action_label, action_button=action_button)
worker = BulkActionWorker(
vk_call_with_retry=self._vk_call_with_retry,
vk_api=self.vk,
action_type=action_type,
selected_chats=selected_chats,
user_infos=user_infos,
visible_messages=self.visible_messages_checkbox.isChecked(),
)
thread = QThread(self)
worker.moveToThread(thread)
thread.started.connect(worker.run)
worker.progress.connect(self._on_bulk_action_progress)
worker.finished.connect(self._on_bulk_action_finished)
worker.auth_error.connect(self._on_bulk_action_auth_error)
worker.failed.connect(self._on_bulk_action_failed)
worker.done.connect(self._on_bulk_action_done)
worker.done.connect(thread.quit)
worker.done.connect(worker.deleteLater)
thread.finished.connect(thread.deleteLater)
self._bulk_worker = worker
self._bulk_worker_thread = thread
thread.start()
def _on_bulk_action_progress(self, processed, total, label):
label_text = {"remove": "исключение", "add": "приглашение", "admin": "назначение админов"}.get(label, label)
self._update_operation_progress(processed, total, label_text)
self.status_label.setText(f"Статус: выполняется {label_text} ({processed}/{max(1, total)})...")
def _on_bulk_action_finished(self, payload):
results = payload.get("results", []) if isinstance(payload, dict) else []
processed = payload.get("processed") if isinstance(payload, dict) else None
total = payload.get("total") if isinstance(payload, dict) else None
if processed is not None and total is not None:
self._log_event("bulk_action", f"done processed={processed} total={total}")
QMessageBox.information(self, self._bulk_action_success_message_title, "\n".join(results))
if self._bulk_clear_inputs_on_success:
self.vk_url_input.clear()
self.user_ids_to_process.clear()
self.set_ui_state(self.token is not None)
def _on_bulk_action_auth_error(self, context, exc, action_name):
self._handle_vk_api_error(context or self._bulk_action_context or "bulk_action", exc, action_name=action_name)
def _on_bulk_action_failed(self, error_text):
QMessageBox.warning(self, "Ошибка", f"Не удалось выполнить операцию: {error_text}")
def _on_bulk_action_done(self):
self._finish_operation_progress()
self._set_busy(False)
self._bulk_worker = None
self._bulk_worker_thread = None
def _ensure_log_dir(self): def _ensure_log_dir(self):
os.makedirs(APP_DATA_DIR, exist_ok=True) os.makedirs(APP_DATA_DIR, exist_ok=True)
@@ -560,8 +775,8 @@ class VkChatManager(QMainWindow):
timestamp = QDateTime.currentDateTime().toString("yyyy-MM-dd HH:mm:ss") timestamp = QDateTime.currentDateTime().toString("yyyy-MM-dd HH:mm:ss")
with open(LOG_FILE, "a", encoding="utf-8") as f: with open(LOG_FILE, "a", encoding="utf-8") as f:
f.write(f"[{timestamp}] [{level}] {context}: {message}\n") f.write(f"[{timestamp}] [{level}] {context}: {message}\n")
except Exception: except Exception as exc:
pass sys.stderr.write(f"[WARN] log_write_failed: {exc}\n")
def _log_error(self, context, exc): def _log_error(self, context, exc):
self._log("ERROR", context, self._format_vk_error(exc)) self._log("ERROR", context, self._format_vk_error(exc))
@@ -578,8 +793,8 @@ class VkChatManager(QMainWindow):
if os.path.exists(LOG_BACKUP_FILE): if os.path.exists(LOG_BACKUP_FILE):
os.remove(LOG_BACKUP_FILE) os.remove(LOG_BACKUP_FILE)
os.replace(LOG_FILE, LOG_BACKUP_FILE) os.replace(LOG_FILE, LOG_BACKUP_FILE)
except Exception: except Exception as exc:
pass sys.stderr.write(f"[WARN] log_rotate_failed: {exc}\n")
def _format_vk_error(self, exc): def _format_vk_error(self, exc):
error = getattr(exc, "error", None) error = getattr(exc, "error", None)
@@ -691,8 +906,8 @@ class VkChatManager(QMainWindow):
try: try:
if output_path and os.path.exists(output_path): if output_path and os.path.exists(output_path):
os.remove(output_path) os.remove(output_path)
except Exception: except Exception as exc:
pass self._log_event("auth_result_cleanup", f"Не удалось удалить файл результата авторизации: {exc}", level="WARN")
def _on_auth_process_finished(self, exit_code, _exit_status): def _on_auth_process_finished(self, exit_code, _exit_status):
output_path = self.auth_output_path output_path = self.auth_output_path
@@ -729,8 +944,8 @@ class VkChatManager(QMainWindow):
try: try:
if os.path.exists(output_path): if os.path.exists(output_path):
os.remove(output_path) os.remove(output_path)
except Exception: except Exception as exc:
pass self._log_event("auth_result_cleanup", f"Не удалось удалить файл результата авторизации: {exc}", level="WARN")
else: else:
self._log_event("auth_result", "Файл результата авторизации не найден.", level="WARN") self._log_event("auth_result", "Файл результата авторизации не найден.", level="WARN")
@@ -742,6 +957,7 @@ class VkChatManager(QMainWindow):
self._log_event("auth_process", "Авторизация уже запущена, повторный запуск пропущен.") self._log_event("auth_process", "Авторизация уже запущена, повторный запуск пропущен.")
return return
self._log_event("auth_process", "start")
if keep_status_text and hasattr(self, "_relogin_status_text"): if keep_status_text and hasattr(self, "_relogin_status_text"):
status_text = self._relogin_status_text status_text = self._relogin_status_text
self._relogin_status_text = None self._relogin_status_text = None
@@ -762,8 +978,8 @@ class VkChatManager(QMainWindow):
try: try:
if os.path.exists(output_path): if os.path.exists(output_path):
os.remove(output_path) os.remove(output_path)
except Exception: except Exception as exc:
pass self._log_event("auth_result_cleanup", f"Не удалось удалить старый файл результата авторизации: {exc}", level="WARN")
program, args = self._build_auth_command(auth_url, output_path) program, args = self._build_auth_command(auth_url, output_path)
self.auth_output_path = output_path self.auth_output_path = output_path
@@ -783,14 +999,29 @@ class VkChatManager(QMainWindow):
self._auth_relogin_in_progress = False self._auth_relogin_in_progress = False
return return
self._log_event("auth_process", f"success expires_in={expires_in}")
self.token = token self.token = token
# Сохраняем и получаем корректный expiration_time (0 или будущее время) # Сохраняем и получаем корректный expiration_time (0 или будущее время)
self.token_expiration_time = token_store_save_token( try:
self.token, self.token_expiration_time = token_store_save_token(
TOKEN_FILE, self.token,
APP_DATA_DIR, TOKEN_FILE,
expires_in=expires_in, APP_DATA_DIR,
) expires_in=expires_in,
)
except Exception as e:
try:
expires_value = int(expires_in)
except (TypeError, ValueError):
expires_value = 0
self.token_expiration_time = (time.time() + expires_value) if expires_value > 0 else 0
self._log_event("token_store_save", str(e), level="WARN")
QMessageBox.warning(
self,
"Предупреждение",
"Не удалось безопасно сохранить токен на диске. "
"Текущая сессия активна, но после перезапуска потребуется повторная авторизация.",
)
self.token_input.setText(self.token[:50] + "...") self.token_input.setText(self.token[:50] + "...")
self.status_label.setText("Статус: авторизован") self.status_label.setText("Статус: авторизован")
@@ -902,7 +1133,8 @@ class VkChatManager(QMainWindow):
try: try:
user = self.vk.users.get(user_ids=user_id)[0] user = self.vk.users.get(user_ids=user_id)[0]
return f"{user.get('first_name', '')} {user.get('last_name', '')}" return f"{user.get('first_name', '')} {user.get('last_name', '')}"
except Exception: except Exception as exc:
self._log_event("get_user_info", f"Не удалось получить имя пользователя {user_id}: {exc}", level="WARN")
return f"Пользователь {user_id}" return f"Пользователь {user_id}"
def _get_selected_chats(self): def _get_selected_chats(self):
@@ -914,7 +1146,7 @@ class VkChatManager(QMainWindow):
selected.append({'id': chat_id, 'title': title}) selected.append({'id': chat_id, 'title': title})
return selected return selected
def _execute_user_action(self, action_type): def _execute_user_action(self, action_type, action_button=None):
if not self.user_ids_to_process: if not self.user_ids_to_process:
QMessageBox.warning(self, "Ошибка", f"Нет ID пользователей для операции.") QMessageBox.warning(self, "Ошибка", f"Нет ID пользователей для операции.")
return return
@@ -972,44 +1204,23 @@ class VkChatManager(QMainWindow):
if confirm_dialog.clickedButton() != yes_button: if confirm_dialog.clickedButton() != yes_button:
return return
results = [] action_label = "исключение" if action_type == "remove" else "приглашение"
total = len(selected_chats) * len(user_infos) self._start_bulk_action_worker(
processed = 0 action_type=action_type,
try: selected_chats=selected_chats,
action_label = "исключение" if action_type == "remove" else "приглашение" user_infos=user_infos,
self._set_busy(True, f"Статус: выполняется {action_label} (0/{total})...") action_label=action_label,
for chat in selected_chats: action_button=action_button,
for user_id, user_info in user_infos.items(): success_message_title="Результаты",
try: clear_inputs_on_success=True,
if action_type == "remove": )
self._vk_call_with_retry(self.vk.messages.removeChatUser, chat_id=chat['id'], member_id=user_id) return
results.append(f"'{user_info}' исключен из '{chat['title']}'.")
else:
params = {'chat_id': chat['id'], 'user_id': user_id}
if self.visible_messages_checkbox.isChecked():
params['visible_messages_count'] = 250
self._vk_call_with_retry(self.vk.messages.addChatUser, **params)
results.append(f"'{user_info}' приглашен в '{chat['title']}'.")
except VkApiError as e:
if self._handle_vk_api_error("execute_user_action", e, action_name="выполнения операций с пользователями"):
return
results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {self._format_vk_error(e)}")
finally:
processed += 1
self.status_label.setText(f"Статус: выполняется {action_label} ({processed}/{total})...")
finally:
self._set_busy(False)
QMessageBox.information(self, "Результаты", "\n".join(results))
self.vk_url_input.clear()
self.user_ids_to_process.clear()
self.set_ui_state(self.token is not None)
def remove_user(self): def remove_user(self):
self._execute_user_action("remove") self._execute_user_action("remove", action_button=self.remove_user_btn)
def add_user_to_chat(self): def add_user_to_chat(self):
self._execute_user_action("add") self._execute_user_action("add", action_button=self.add_user_btn)
def set_user_admin(self): def set_user_admin(self):
"""Назначает пользователя администратором чата.""" """Назначает пользователя администратором чата."""
@@ -1047,46 +1258,16 @@ class VkChatManager(QMainWindow):
if confirm_dialog.clickedButton() != yes_button: if confirm_dialog.clickedButton() != yes_button:
return return
# 4. Выполнение API запросов self._start_bulk_action_worker(
results = [] action_type="admin",
total = len(selected_chats) * len(user_infos) selected_chats=selected_chats,
processed = 0 user_infos=user_infos,
try: action_label="назначение админов",
self._set_busy(True, f"Статус: назначение админов (0/{total})...") action_button=None,
for chat in selected_chats: success_message_title="Результаты назначения",
# VK API требует peer_id. Для чатов это 2000000000 + local_id clear_inputs_on_success=True,
try: )
peer_id = 2000000000 + int(chat['id']) return
except ValueError:
results.append(f"✗ Ошибка ID чата: {chat['id']}")
continue
for user_id, user_info in user_infos.items():
try:
self._vk_call_with_retry(
self.vk.messages.setMemberRole,
peer_id=peer_id,
member_id=user_id,
role="admin"
)
results.append(f"'{user_info}' назначен админом в '{chat['title']}'.")
except VkApiError as e:
if self._handle_vk_api_error("set_user_admin", e, action_name="назначения администраторов"):
return
results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {self._format_vk_error(e)}")
finally:
processed += 1
self.status_label.setText(f"Статус: назначение админов ({processed}/{total})...")
finally:
self._set_busy(False)
# 5. Вывод результата
QMessageBox.information(self, "Результаты назначения", "\n".join(results))
# Очистка полей (по желанию, можно убрать эти две строки, если хотите оставить ввод)
self.vk_url_input.clear()
self.user_ids_to_process.clear()
self.set_ui_state(self.token is not None)
# Refactor overrides: keep logic in service modules and thin UI orchestration here. # Refactor overrides: keep logic in service modules and thin UI orchestration here.
def _process_links_list(self, links_list): def _process_links_list(self, links_list):
@@ -1094,6 +1275,7 @@ class VkChatManager(QMainWindow):
QMessageBox.warning(self, "Ошибка", "Сначала авторизуйтесь.") QMessageBox.warning(self, "Ошибка", "Сначала авторизуйтесь.")
return return
self._log_event("resolve_ids", f"start count={len(links_list)}")
self.user_ids_to_process.clear() self.user_ids_to_process.clear()
resolved_ids = [] resolved_ids = []
failed_links = [] failed_links = []
@@ -1117,6 +1299,10 @@ class VkChatManager(QMainWindow):
self.user_ids_to_process = resolved_ids self.user_ids_to_process = resolved_ids
status_message = f"Статус: Готово к работе с {len(resolved_ids)} пользователем(ем/ями)." status_message = f"Статус: Готово к работе с {len(resolved_ids)} пользователем(ем/ями)."
self._log_event(
"resolve_ids",
f"done resolved={len(resolved_ids)} failed={len(failed_links)}",
)
if len(links_list) > 1: if len(links_list) > 1:
self._set_vk_url_input_text(f"Загружено {len(resolved_ids)}/{len(links_list)} из списка") self._set_vk_url_input_text(f"Загружено {len(resolved_ids)}/{len(links_list)} из списка")
@@ -1143,17 +1329,40 @@ class VkChatManager(QMainWindow):
try: try:
self._set_busy(True, "Статус: загрузка чатов...") self._set_busy(True, "Статус: загрузка чатов...")
self._log_event("load_chats", "start")
conversations = load_chat_conversations(self._vk_call_with_retry, self.vk) conversations = load_chat_conversations(self._vk_call_with_retry, self.vk)
type_counts = {}
non_chat_samples = []
missing_title_count = 0
for conv in conversations: for conv in conversations:
if conv["conversation"]["peer"]["type"] != "chat": conv_info = conv.get("conversation", {})
peer = conv_info.get("peer", {})
peer_type = peer.get("type", "unknown")
type_counts[peer_type] = type_counts.get(peer_type, 0) + 1
if peer_type != "chat":
if len(non_chat_samples) < 30:
non_chat_samples.append(
{
"type": peer_type,
"peer_id": peer.get("id"),
"local_id": peer.get("local_id"),
"title": (conv_info.get("chat_settings") or {}).get("title", ""),
}
)
continue continue
chat_id = conv["conversation"]["peer"]["local_id"] chat_id = peer.get("local_id")
title = conv["conversation"]["chat_settings"]["title"] chat_settings = conv_info.get("chat_settings") or {}
title = chat_settings.get("title", "")
if not title:
missing_title_count += 1
self.chats.append({"id": chat_id, "title": title}) self.chats.append({"id": chat_id, "title": title})
checkbox = QCheckBox(f"{title} (id: {chat_id})") checkbox = QCheckBox(f"{title} (id: {chat_id})")
checkbox.setProperty("chat_id", chat_id) checkbox.setProperty("chat_id", chat_id)
if "группа магазинов" in title.casefold():
self._log_event("load_chats", f"chat_match title='{title}' id={chat_id}")
if "AG офис" in title: if "AG офис" in title:
layouts[0].insertWidget(layouts[0].count() - 1, checkbox) layouts[0].insertWidget(layouts[0].count() - 1, checkbox)
self.office_chat_checkboxes.append(checkbox) self.office_chat_checkboxes.append(checkbox)
@@ -1175,6 +1384,17 @@ class VkChatManager(QMainWindow):
self.chat_tabs.setTabText(2, f"AG Склад ({len(self.warehouse_chat_checkboxes)})") self.chat_tabs.setTabText(2, f"AG Склад ({len(self.warehouse_chat_checkboxes)})")
self.chat_tabs.setTabText(3, f"AG Кофейни ({len(self.coffee_chat_checkboxes)})") self.chat_tabs.setTabText(3, f"AG Кофейни ({len(self.coffee_chat_checkboxes)})")
self.chat_tabs.setTabText(4, f"Прочие ({len(self.other_chat_checkboxes)})") self.chat_tabs.setTabText(4, f"Прочие ({len(self.other_chat_checkboxes)})")
self._log_event(
"load_chats",
(
f"done total={len(conversations)} "
f"chats={len(self.chats)} "
f"type_counts={type_counts} "
f"missing_titles={missing_title_count}"
),
)
if non_chat_samples:
self._log_event("load_chats", f"non_chat_samples={non_chat_samples}")
except VkApiError as e: except VkApiError as e:
if self._handle_vk_api_error("load_chats", e, action_name="загрузки чатов"): if self._handle_vk_api_error("load_chats", e, action_name="загрузки чатов"):
return return
@@ -1237,7 +1457,8 @@ if __name__ == "__main__":
idx = sys.argv.index("--auth") idx = sys.argv.index("--auth")
auth_url = sys.argv[idx + 1] auth_url = sys.argv[idx + 1]
output_path = sys.argv[idx + 2] output_path = sys.argv[idx + 2]
except Exception: except Exception as exc:
sys.stderr.write(f"[ERROR] auth_cli_args_invalid: {exc}\n")
sys.exit(1) sys.exit(1)
auth_webview.main_auth(auth_url, output_path) auth_webview.main_auth(auth_url, output_path)
sys.exit(0) sys.exit(0)

View File

@@ -9,6 +9,18 @@ import zipfile
class AutoUpdateService: class AutoUpdateService:
@staticmethod
def _safe_extract_zip(archive, destination_dir):
destination_real = os.path.realpath(destination_dir)
for member in archive.infolist():
member_name = member.filename or ""
if not member_name:
continue
target_path = os.path.realpath(os.path.join(destination_dir, member_name))
if target_path != destination_real and not target_path.startswith(destination_real + os.sep):
raise RuntimeError(f"Unsafe path in update archive: {member_name}")
archive.extractall(destination_dir)
@staticmethod @staticmethod
def download_update_archive(download_url, destination_path): def download_update_archive(download_url, destination_path):
request = urllib.request.Request( request = urllib.request.Request(
@@ -215,6 +227,6 @@ class AutoUpdateService:
cls.verify_update_checksum(zip_path, checksum_url, download_name) cls.verify_update_checksum(zip_path, checksum_url, download_name)
os.makedirs(unpack_dir, exist_ok=True) os.makedirs(unpack_dir, exist_ok=True)
with zipfile.ZipFile(zip_path, "r") as archive: with zipfile.ZipFile(zip_path, "r") as archive:
archive.extractall(unpack_dir) cls._safe_extract_zip(archive, unpack_dir)
source_dir = cls.locate_extracted_root(unpack_dir) source_dir = cls.locate_extracted_root(unpack_dir)
return work_dir, source_dir return work_dir, source_dir

View File

@@ -91,8 +91,8 @@ def save_token(token, token_file, app_data_dir, expires_in=0):
try: try:
stored_token = _encrypt_token(token) stored_token = _encrypt_token(token)
encrypted = True encrypted = True
except Exception: except Exception as exc:
pass raise RuntimeError("Failed to securely store token with DPAPI.") from exc
data = { data = {
"token": stored_token, "token": stored_token,

View File

@@ -5,7 +5,42 @@ import urllib.error
import urllib.request import urllib.request
from urllib.parse import urlparse from urllib.parse import urlparse
from PySide6.QtCore import QObject, Signal try:
from PySide6.QtCore import QObject, Signal
except Exception:
class _FallbackBoundSignal:
def __init__(self):
self._callbacks = []
def connect(self, callback):
if callback is not None:
self._callbacks.append(callback)
def emit(self, *args, **kwargs):
for callback in list(self._callbacks):
callback(*args, **kwargs)
class _FallbackSignalDescriptor:
def __init__(self):
self._storage_name = ""
def __set_name__(self, owner, name):
self._storage_name = f"__fallback_signal_{name}"
def __get__(self, instance, owner):
if instance is None:
return self
signal = instance.__dict__.get(self._storage_name)
if signal is None:
signal = _FallbackBoundSignal()
instance.__dict__[self._storage_name] = signal
return signal
class QObject:
pass
def Signal(*_args, **_kwargs):
return _FallbackSignalDescriptor()
def _version_key(version_text): def _version_key(version_text):
@@ -67,6 +102,7 @@ def _extract_release_payload(release_data, repository_url, current_version):
latest_tag = release_data.get("tag_name") or release_data.get("name") or "" latest_tag = release_data.get("tag_name") or release_data.get("name") or ""
latest_version = latest_tag.lstrip("vV").strip() latest_version = latest_tag.lstrip("vV").strip()
html_url = release_data.get("html_url") or releases_url html_url = release_data.get("html_url") or releases_url
release_notes = (release_data.get("body") or "").strip()
assets = release_data.get("assets") or [] assets = release_data.get("assets") or []
download_url = "" download_url = ""
download_name = "" download_name = ""
@@ -112,6 +148,7 @@ def _extract_release_payload(release_data, repository_url, current_version):
"current_version": current_version, "current_version": current_version,
"latest_tag": latest_tag, "latest_tag": latest_tag,
"release_url": html_url, "release_url": html_url,
"release_notes": release_notes,
"download_url": download_url, "download_url": download_url,
"download_name": download_name, "download_name": download_name,
"installer_url": installer_url, "installer_url": installer_url,

View File

@@ -1,60 +0,0 @@
import unittest
from pathlib import Path
class AuthReloginSmokeTests(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.main_source = Path("main.py").read_text(encoding="utf-8")
cls.vk_source = Path("services/vk_service.py").read_text(encoding="utf-8")
cls.update_source = Path("services/update_service.py").read_text(encoding="utf-8")
def test_auth_command_builder_handles_frozen_and_source(self):
self.assertIn("def _build_auth_command(self, auth_url, output_path):", self.main_source)
self.assertIn("entry_script_path=os.path.abspath(__file__)", self.main_source)
self.assertIn('return sys.executable, ["--auth", auth_url, output_path]', self.vk_source)
self.assertIn("script_path = entry_script_path or os.path.abspath(__file__)", self.vk_source)
def test_auth_runs_via_qprocess(self):
self.assertIn("process = QProcess(self)", self.main_source)
self.assertIn("process.start(program, args)", self.main_source)
self.assertIn("def _on_auth_process_finished(self, exit_code, _exit_status):", self.main_source)
self.assertIn("if self.auth_process and self.auth_process.state() == QProcess.ProcessState.NotRunning:", self.main_source)
def test_force_relogin_has_backoff_and_event_log(self):
self.assertIn("AUTH_RELOGIN_BACKOFF_SECONDS = 5.0", self.main_source)
self.assertIn("if self._auth_relogin_in_progress:", self.main_source)
self.assertIn("force_relogin_backoff", self.main_source)
self.assertIn("force_relogin", self.main_source)
def test_auth_error_paths_trigger_force_relogin(self):
self.assertIn(
"def _handle_vk_api_error(self, context, exc, action_name=None, ui_message_prefix=None, disable_ui=False):",
self.main_source,
)
self.assertIn("self._force_relogin(exc, action_name or context)", self.main_source)
self.assertIn('"load_chats",', self.main_source)
self.assertIn('"execute_user_action",', self.main_source)
self.assertIn('"set_user_admin",', self.main_source)
def test_tab_checkbox_lists_use_existing_attributes(self):
self.assertIn("self.warehouse_chat_checkboxes", self.main_source)
self.assertIn("self.coffee_chat_checkboxes", self.main_source)
self.assertNotIn("self.retail_warehouse_checkboxes", self.main_source)
self.assertNotIn("self.retail_coffee_checkboxes", self.main_source)
def test_update_check_actions_exist(self):
self.assertIn("from app_version import APP_VERSION", self.main_source)
self.assertIn("from services import (", self.main_source)
self.assertIn("UpdateChecker", self.main_source)
self.assertIn("detect_update_repository_url", self.main_source)
self.assertIn('QAction("Проверить обновления", self)', self.main_source)
self.assertIn("def check_for_updates(self, silent_no_updates=False):", self.main_source)
self.assertIn("class UpdateChecker(QObject):", self.update_source)
self.assertIn("def _start_auto_update(self, download_url, latest_version, checksum_url=\"\", download_name=\"\"):", self.main_source)
self.assertIn("AutoUpdateService.prepare_update", self.main_source)
self.assertIn("AutoUpdateService.launch_gui_updater", self.main_source)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,114 @@
import ast
import unittest
from pathlib import Path
class MainContractsTests(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.main_source = Path("main.py").read_text(encoding="utf-8-sig")
cls.module = ast.parse(cls.main_source)
cls.vk_chat_manager = cls._find_class("VkChatManager")
@classmethod
def _find_class(cls, class_name):
for node in cls.module.body:
if isinstance(node, ast.ClassDef) and node.name == class_name:
return node
raise AssertionError(f"Class {class_name} not found")
def _find_method(self, method_name):
for node in self.vk_chat_manager.body:
if isinstance(node, ast.FunctionDef) and node.name == method_name:
return node
self.fail(f"Method {method_name} not found")
def _iter_nodes(self, node):
return ast.walk(node)
def test_auth_error_contexts_contains_only_supported_contexts(self):
expected_contexts = {"load_chats", "execute_user_action", "set_user_admin"}
for node in self.module.body:
if isinstance(node, ast.Assign):
for target in node.targets:
if isinstance(target, ast.Name) and target.id == "AUTH_ERROR_CONTEXTS":
actual = set(ast.literal_eval(node.value))
self.assertSetEqual(actual, expected_contexts)
return
self.fail("AUTH_ERROR_CONTEXTS assignment not found")
def test_check_for_updates_has_reentry_guard(self):
method = self._find_method("check_for_updates")
has_guard = False
for node in method.body:
if not isinstance(node, ast.If):
continue
test = node.test
if (
isinstance(test, ast.Attribute)
and isinstance(test.value, ast.Name)
and test.value.id == "self"
and test.attr == "_update_in_progress"
):
has_guard = any(isinstance(stmt, ast.Return) for stmt in node.body)
if has_guard:
break
self.assertTrue(has_guard, "check_for_updates must return when update is already in progress")
def test_check_for_updates_connects_thread_finish_handler(self):
method = self._find_method("check_for_updates")
for node in self._iter_nodes(method):
if not isinstance(node, ast.Call):
continue
func = node.func
if not (isinstance(func, ast.Attribute) and func.attr == "connect"):
continue
value = func.value
if not (
isinstance(value, ast.Attribute)
and value.attr == "finished"
and isinstance(value.value, ast.Attribute)
and value.value.attr == "update_thread"
and isinstance(value.value.value, ast.Name)
and value.value.value.id == "self"
):
continue
if len(node.args) != 1:
continue
arg = node.args[0]
if (
isinstance(arg, ast.Attribute)
and arg.attr == "_on_update_thread_finished"
and isinstance(arg.value, ast.Name)
and arg.value.id == "self"
):
return
self.fail("update_thread.finished must be connected to _on_update_thread_finished")
def test_on_update_thread_finished_clears_update_state(self):
method = self._find_method("_on_update_thread_finished")
assignments = {}
for node in method.body:
if not isinstance(node, ast.Assign) or len(node.targets) != 1:
continue
target = node.targets[0]
if (
isinstance(target, ast.Attribute)
and isinstance(target.value, ast.Name)
and target.value.id == "self"
):
assignments[target.attr] = node.value
self.assertIn("_update_in_progress", assignments)
self.assertIn("update_checker", assignments)
self.assertIn("update_thread", assignments)
self.assertIsInstance(assignments["_update_in_progress"], ast.Constant)
self.assertIs(assignments["_update_in_progress"].value, False)
self.assertIsInstance(assignments["update_checker"], ast.Constant)
self.assertIsNone(assignments["update_checker"].value)
self.assertIsInstance(assignments["update_thread"], ast.Constant)
self.assertIsNone(assignments["update_thread"].value)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,97 @@
import unittest
from types import SimpleNamespace
from unittest import mock
class _DummySignal:
def __init__(self):
self._callbacks = []
def connect(self, callback):
if callback is not None:
self._callbacks.append(callback)
def emit(self, *args, **kwargs):
for callback in list(self._callbacks):
callback(*args, **kwargs)
class _DummyThread:
created = 0
def __init__(self, _parent=None):
type(self).created += 1
self.started = _DummySignal()
self.finished = _DummySignal()
def start(self):
self.started.emit()
def quit(self):
self.finished.emit()
def deleteLater(self):
return None
class _DummyChecker:
created = 0
def __init__(self, *_args, **_kwargs):
type(self).created += 1
self.check_finished = _DummySignal()
self.check_failed = _DummySignal()
def moveToThread(self, _thread):
return None
def run(self):
return None
def deleteLater(self):
return None
class UpdateReentryRuntimeTests(unittest.TestCase):
@classmethod
def setUpClass(cls):
try:
import main # noqa: PLC0415
except Exception as exc:
raise unittest.SkipTest(f"main import unavailable: {exc}") from exc
cls.main = main
def test_repeated_update_check_is_ignored_until_thread_finishes(self):
_DummyChecker.created = 0
_DummyThread.created = 0
manager = self.main.VkChatManager.__new__(self.main.VkChatManager)
manager._update_in_progress = False
manager._update_check_silent = False
manager.update_channel = "stable"
manager.update_repository_url = "https://example.com/org/repo"
manager.update_checker = None
manager.update_thread = None
manager.status_label = SimpleNamespace(setText=lambda *_args, **_kwargs: None)
manager._log_event = lambda *_args, **_kwargs: None
manager._set_update_action_state = lambda *_args, **_kwargs: None
with mock.patch.object(self.main, "UpdateChecker", _DummyChecker), mock.patch.object(self.main, "QThread", _DummyThread):
self.main.VkChatManager.check_for_updates(manager, silent_no_updates=True)
self.assertTrue(manager._update_in_progress)
self.assertEqual(_DummyChecker.created, 1)
self.assertEqual(_DummyThread.created, 1)
first_thread = manager.update_thread
self.main.VkChatManager.check_for_updates(manager, silent_no_updates=True)
self.assertEqual(_DummyChecker.created, 1)
self.assertEqual(_DummyThread.created, 1)
self.assertIs(manager.update_thread, first_thread)
manager.update_checker.check_finished.emit({"has_update": False, "current_version": self.main.APP_VERSION})
self.assertFalse(manager._update_in_progress)
self.assertIsNone(manager.update_checker)
self.assertIsNone(manager.update_thread)
if __name__ == "__main__":
unittest.main()