5 Commits

Author SHA1 Message Date
c77ca4652b fix(ci): update syntax validation test file list
All checks were successful
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Successful in 3m54s
2026-02-16 00:37:26 +03:00
4ec24c6d0f chore(version): bump to 2.2.3
Some checks failed
Desktop CI / tests (push) Failing after 12s
Desktop Release / release (push) Has been cancelled
2026-02-16 00:35:38 +03:00
72edfffd9e fix(update): harden reentry state and add runtime regression test 2026-02-16 00:35:03 +03:00
db5d901435 test(main): replace brittle smoke checks with AST contracts 2026-02-16 00:31:38 +03:00
cd5e6e1f6b fix(update): prevent repeated check crash and bump to 2.2.2
All checks were successful
Desktop CI / tests (push) Successful in 15s
Desktop Release / release (push) Successful in 3m30s
2026-02-15 23:59:57 +03:00
6 changed files with 239 additions and 83 deletions

View File

@@ -28,7 +28,7 @@ jobs:
- name: Validate syntax
run: |
python -m py_compile app_version.py main.py build.py updater_gui.py tests/test_auth_relogin_smoke.py tests/test_auto_update_service.py tests/test_chat_actions.py tests/test_token_store.py
python -m py_compile app_version.py main.py build.py updater_gui.py tests/test_auto_update_service.py tests/test_chat_actions.py tests/test_main_contracts.py tests/test_token_store.py tests/test_update_reentry_runtime.py tests/test_update_service.py tests/test_updater_gui.py
- name: Run tests
run: |

View File

@@ -1 +1 @@
APP_VERSION = "2.2.1"
APP_VERSION = "2.2.3"

47
main.py
View File

@@ -49,7 +49,7 @@ UPDATE_REPOSITORY = ""
UPDATE_REPOSITORY_URL = "https://git.daemonlord.ru/benya/AnabasisChatRemove"
UPDATE_CHANNEL_DEFAULT = "stable"
UPDATE_REQUEST_TIMEOUT = 8
AUTH_ERROR_CONTEXTS = ("load_chats", "execute_user_action", "set_user_admin", "_unused")
AUTH_ERROR_CONTEXTS = ("load_chats", "execute_user_action", "set_user_admin")
def get_resource_path(relative_path):
@@ -171,6 +171,7 @@ class VkChatManager(QMainWindow):
self.update_channel = UPDATE_CHANNEL_DEFAULT
self.update_checker = None
self.update_thread = None
self._update_in_progress = False
self._update_check_silent = False
self._bulk_worker_thread = None
self._bulk_worker = None
@@ -462,10 +463,12 @@ class VkChatManager(QMainWindow):
self._log_event("update_channel", f"update_channel={self.update_channel}")
def check_for_updates(self, silent_no_updates=False):
if self.update_thread and self.update_thread.isRunning():
if self._update_in_progress:
self.status_label.setText("Статус: проверка обновлений уже выполняется...")
return
self._update_check_silent = silent_no_updates
self._update_in_progress = True
self._set_update_action_state(True)
channel_label = "бета" if self.update_channel == "beta" else "релизы"
self.status_label.setText(f"Статус: проверка обновлений ({channel_label})...")
@@ -485,14 +488,11 @@ class VkChatManager(QMainWindow):
self.update_checker.check_failed.connect(self.update_thread.quit)
self.update_checker.check_finished.connect(self.update_checker.deleteLater)
self.update_checker.check_failed.connect(self.update_checker.deleteLater)
self.update_thread.finished.connect(self._on_update_thread_finished)
self.update_thread.finished.connect(self.update_thread.deleteLater)
self.update_thread.start()
def _on_update_check_finished(self, result):
self._set_update_action_state(False)
self.update_checker = None
self.update_thread = None
if result.get("has_update"):
latest_version = result.get("latest_version") or result.get("latest_tag") or "unknown"
self.status_label.setText(f"Статус: доступно обновление {latest_version}")
@@ -551,9 +551,6 @@ class VkChatManager(QMainWindow):
QMessageBox.information(self, "Обновления", f"Установлена актуальная версия в канале {channel_label}.")
def _on_update_check_failed(self, error_text):
self._set_update_action_state(False)
self.update_checker = None
self.update_thread = None
self._log_event("update_check_failed", error_text, level="WARN")
if not self.update_repository_url:
self.status_label.setText("Статус: обновления не настроены (URL репозитория не задан).")
@@ -570,6 +567,12 @@ class VkChatManager(QMainWindow):
if not self._update_check_silent:
QMessageBox.warning(self, "Проверка обновлений", error_text)
def _on_update_thread_finished(self):
self._set_update_action_state(False)
self._update_in_progress = False
self.update_checker = None
self.update_thread = None
def setup_token_timer(self):
self.token_countdown_timer = QTimer(self)
self.token_countdown_timer.timeout.connect(self.update_token_timer_display)
@@ -764,8 +767,8 @@ class VkChatManager(QMainWindow):
timestamp = QDateTime.currentDateTime().toString("yyyy-MM-dd HH:mm:ss")
with open(LOG_FILE, "a", encoding="utf-8") as f:
f.write(f"[{timestamp}] [{level}] {context}: {message}\n")
except Exception:
pass
except Exception as exc:
sys.stderr.write(f"[WARN] log_write_failed: {exc}\n")
def _log_error(self, context, exc):
self._log("ERROR", context, self._format_vk_error(exc))
@@ -782,8 +785,8 @@ class VkChatManager(QMainWindow):
if os.path.exists(LOG_BACKUP_FILE):
os.remove(LOG_BACKUP_FILE)
os.replace(LOG_FILE, LOG_BACKUP_FILE)
except Exception:
pass
except Exception as exc:
sys.stderr.write(f"[WARN] log_rotate_failed: {exc}\n")
def _format_vk_error(self, exc):
error = getattr(exc, "error", None)
@@ -895,8 +898,8 @@ class VkChatManager(QMainWindow):
try:
if output_path and os.path.exists(output_path):
os.remove(output_path)
except Exception:
pass
except Exception as exc:
self._log_event("auth_result_cleanup", f"Не удалось удалить файл результата авторизации: {exc}", level="WARN")
def _on_auth_process_finished(self, exit_code, _exit_status):
output_path = self.auth_output_path
@@ -933,8 +936,8 @@ class VkChatManager(QMainWindow):
try:
if os.path.exists(output_path):
os.remove(output_path)
except Exception:
pass
except Exception as exc:
self._log_event("auth_result_cleanup", f"Не удалось удалить файл результата авторизации: {exc}", level="WARN")
else:
self._log_event("auth_result", "Файл результата авторизации не найден.", level="WARN")
@@ -966,8 +969,8 @@ class VkChatManager(QMainWindow):
try:
if os.path.exists(output_path):
os.remove(output_path)
except Exception:
pass
except Exception as exc:
self._log_event("auth_result_cleanup", f"Не удалось удалить старый файл результата авторизации: {exc}", level="WARN")
program, args = self._build_auth_command(auth_url, output_path)
self.auth_output_path = output_path
@@ -1120,7 +1123,8 @@ class VkChatManager(QMainWindow):
try:
user = self.vk.users.get(user_ids=user_id)[0]
return f"{user.get('first_name', '')} {user.get('last_name', '')}"
except Exception:
except Exception as exc:
self._log_event("get_user_info", f"Не удалось получить имя пользователя {user_id}: {exc}", level="WARN")
return f"Пользователь {user_id}"
def _get_selected_chats(self):
@@ -1404,7 +1408,8 @@ if __name__ == "__main__":
idx = sys.argv.index("--auth")
auth_url = sys.argv[idx + 1]
output_path = sys.argv[idx + 2]
except Exception:
except Exception as exc:
sys.stderr.write(f"[ERROR] auth_cli_args_invalid: {exc}\n")
sys.exit(1)
auth_webview.main_auth(auth_url, output_path)
sys.exit(0)

View File

@@ -1,60 +0,0 @@
import unittest
from pathlib import Path
class AuthReloginSmokeTests(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.main_source = Path("main.py").read_text(encoding="utf-8")
cls.vk_source = Path("services/vk_service.py").read_text(encoding="utf-8")
cls.update_source = Path("services/update_service.py").read_text(encoding="utf-8")
def test_auth_command_builder_handles_frozen_and_source(self):
self.assertIn("def _build_auth_command(self, auth_url, output_path):", self.main_source)
self.assertIn("entry_script_path=os.path.abspath(__file__)", self.main_source)
self.assertIn('return sys.executable, ["--auth", auth_url, output_path]', self.vk_source)
self.assertIn("script_path = entry_script_path or os.path.abspath(__file__)", self.vk_source)
def test_auth_runs_via_qprocess(self):
self.assertIn("process = QProcess(self)", self.main_source)
self.assertIn("process.start(program, args)", self.main_source)
self.assertIn("def _on_auth_process_finished(self, exit_code, _exit_status):", self.main_source)
self.assertIn("if self.auth_process and self.auth_process.state() == QProcess.ProcessState.NotRunning:", self.main_source)
def test_force_relogin_has_backoff_and_event_log(self):
self.assertIn("AUTH_RELOGIN_BACKOFF_SECONDS = 5.0", self.main_source)
self.assertIn("if self._auth_relogin_in_progress:", self.main_source)
self.assertIn("force_relogin_backoff", self.main_source)
self.assertIn("force_relogin", self.main_source)
def test_auth_error_paths_trigger_force_relogin(self):
self.assertIn(
"def _handle_vk_api_error(self, context, exc, action_name=None, ui_message_prefix=None, disable_ui=False):",
self.main_source,
)
self.assertIn("self._force_relogin(exc, action_name or context)", self.main_source)
self.assertIn('"load_chats",', self.main_source)
self.assertIn('"execute_user_action",', self.main_source)
self.assertIn('"set_user_admin",', self.main_source)
def test_tab_checkbox_lists_use_existing_attributes(self):
self.assertIn("self.warehouse_chat_checkboxes", self.main_source)
self.assertIn("self.coffee_chat_checkboxes", self.main_source)
self.assertNotIn("self.retail_warehouse_checkboxes", self.main_source)
self.assertNotIn("self.retail_coffee_checkboxes", self.main_source)
def test_update_check_actions_exist(self):
self.assertIn("from app_version import APP_VERSION", self.main_source)
self.assertIn("from services import (", self.main_source)
self.assertIn("UpdateChecker", self.main_source)
self.assertIn("detect_update_repository_url", self.main_source)
self.assertIn('QAction("Проверить обновления", self)', self.main_source)
self.assertIn("def check_for_updates(self, silent_no_updates=False):", self.main_source)
self.assertIn("class UpdateChecker(QObject):", self.update_source)
self.assertIn("def _start_auto_update(self, download_url, latest_version, checksum_url=\"\", download_name=\"\"):", self.main_source)
self.assertIn("AutoUpdateService.prepare_update", self.main_source)
self.assertIn("AutoUpdateService.launch_gui_updater", self.main_source)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,114 @@
import ast
import unittest
from pathlib import Path
class MainContractsTests(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.main_source = Path("main.py").read_text(encoding="utf-8-sig")
cls.module = ast.parse(cls.main_source)
cls.vk_chat_manager = cls._find_class("VkChatManager")
@classmethod
def _find_class(cls, class_name):
for node in cls.module.body:
if isinstance(node, ast.ClassDef) and node.name == class_name:
return node
raise AssertionError(f"Class {class_name} not found")
def _find_method(self, method_name):
for node in self.vk_chat_manager.body:
if isinstance(node, ast.FunctionDef) and node.name == method_name:
return node
self.fail(f"Method {method_name} not found")
def _iter_nodes(self, node):
return ast.walk(node)
def test_auth_error_contexts_contains_only_supported_contexts(self):
expected_contexts = {"load_chats", "execute_user_action", "set_user_admin"}
for node in self.module.body:
if isinstance(node, ast.Assign):
for target in node.targets:
if isinstance(target, ast.Name) and target.id == "AUTH_ERROR_CONTEXTS":
actual = set(ast.literal_eval(node.value))
self.assertSetEqual(actual, expected_contexts)
return
self.fail("AUTH_ERROR_CONTEXTS assignment not found")
def test_check_for_updates_has_reentry_guard(self):
method = self._find_method("check_for_updates")
has_guard = False
for node in method.body:
if not isinstance(node, ast.If):
continue
test = node.test
if (
isinstance(test, ast.Attribute)
and isinstance(test.value, ast.Name)
and test.value.id == "self"
and test.attr == "_update_in_progress"
):
has_guard = any(isinstance(stmt, ast.Return) for stmt in node.body)
if has_guard:
break
self.assertTrue(has_guard, "check_for_updates must return when update is already in progress")
def test_check_for_updates_connects_thread_finish_handler(self):
method = self._find_method("check_for_updates")
for node in self._iter_nodes(method):
if not isinstance(node, ast.Call):
continue
func = node.func
if not (isinstance(func, ast.Attribute) and func.attr == "connect"):
continue
value = func.value
if not (
isinstance(value, ast.Attribute)
and value.attr == "finished"
and isinstance(value.value, ast.Attribute)
and value.value.attr == "update_thread"
and isinstance(value.value.value, ast.Name)
and value.value.value.id == "self"
):
continue
if len(node.args) != 1:
continue
arg = node.args[0]
if (
isinstance(arg, ast.Attribute)
and arg.attr == "_on_update_thread_finished"
and isinstance(arg.value, ast.Name)
and arg.value.id == "self"
):
return
self.fail("update_thread.finished must be connected to _on_update_thread_finished")
def test_on_update_thread_finished_clears_update_state(self):
method = self._find_method("_on_update_thread_finished")
assignments = {}
for node in method.body:
if not isinstance(node, ast.Assign) or len(node.targets) != 1:
continue
target = node.targets[0]
if (
isinstance(target, ast.Attribute)
and isinstance(target.value, ast.Name)
and target.value.id == "self"
):
assignments[target.attr] = node.value
self.assertIn("_update_in_progress", assignments)
self.assertIn("update_checker", assignments)
self.assertIn("update_thread", assignments)
self.assertIsInstance(assignments["_update_in_progress"], ast.Constant)
self.assertIs(assignments["_update_in_progress"].value, False)
self.assertIsInstance(assignments["update_checker"], ast.Constant)
self.assertIsNone(assignments["update_checker"].value)
self.assertIsInstance(assignments["update_thread"], ast.Constant)
self.assertIsNone(assignments["update_thread"].value)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,97 @@
import unittest
from types import SimpleNamespace
from unittest import mock
class _DummySignal:
def __init__(self):
self._callbacks = []
def connect(self, callback):
if callback is not None:
self._callbacks.append(callback)
def emit(self, *args, **kwargs):
for callback in list(self._callbacks):
callback(*args, **kwargs)
class _DummyThread:
created = 0
def __init__(self, _parent=None):
type(self).created += 1
self.started = _DummySignal()
self.finished = _DummySignal()
def start(self):
self.started.emit()
def quit(self):
self.finished.emit()
def deleteLater(self):
return None
class _DummyChecker:
created = 0
def __init__(self, *_args, **_kwargs):
type(self).created += 1
self.check_finished = _DummySignal()
self.check_failed = _DummySignal()
def moveToThread(self, _thread):
return None
def run(self):
return None
def deleteLater(self):
return None
class UpdateReentryRuntimeTests(unittest.TestCase):
@classmethod
def setUpClass(cls):
try:
import main # noqa: PLC0415
except Exception as exc:
raise unittest.SkipTest(f"main import unavailable: {exc}") from exc
cls.main = main
def test_repeated_update_check_is_ignored_until_thread_finishes(self):
_DummyChecker.created = 0
_DummyThread.created = 0
manager = self.main.VkChatManager.__new__(self.main.VkChatManager)
manager._update_in_progress = False
manager._update_check_silent = False
manager.update_channel = "stable"
manager.update_repository_url = "https://example.com/org/repo"
manager.update_checker = None
manager.update_thread = None
manager.status_label = SimpleNamespace(setText=lambda *_args, **_kwargs: None)
manager._log_event = lambda *_args, **_kwargs: None
manager._set_update_action_state = lambda *_args, **_kwargs: None
with mock.patch.object(self.main, "UpdateChecker", _DummyChecker), mock.patch.object(self.main, "QThread", _DummyThread):
self.main.VkChatManager.check_for_updates(manager, silent_no_updates=True)
self.assertTrue(manager._update_in_progress)
self.assertEqual(_DummyChecker.created, 1)
self.assertEqual(_DummyThread.created, 1)
first_thread = manager.update_thread
self.main.VkChatManager.check_for_updates(manager, silent_no_updates=True)
self.assertEqual(_DummyChecker.created, 1)
self.assertEqual(_DummyThread.created, 1)
self.assertIs(manager.update_thread, first_thread)
manager.update_checker.check_finished.emit({"has_update": False, "current_version": self.main.APP_VERSION})
self.assertFalse(manager._update_in_progress)
self.assertIsNone(manager.update_checker)
self.assertIsNone(manager.update_thread)
if __name__ == "__main__":
unittest.main()