test(main): replace brittle smoke checks with AST contracts

This commit is contained in:
2026-02-16 00:31:38 +03:00
parent cd5e6e1f6b
commit db5d901435
3 changed files with 115 additions and 61 deletions

View File

@@ -49,7 +49,7 @@ UPDATE_REPOSITORY = ""
UPDATE_REPOSITORY_URL = "https://git.daemonlord.ru/benya/AnabasisChatRemove"
UPDATE_CHANNEL_DEFAULT = "stable"
UPDATE_REQUEST_TIMEOUT = 8
AUTH_ERROR_CONTEXTS = ("load_chats", "execute_user_action", "set_user_admin", "_unused")
AUTH_ERROR_CONTEXTS = ("load_chats", "execute_user_action", "set_user_admin")
def get_resource_path(relative_path):

View File

@@ -1,60 +0,0 @@
import unittest
from pathlib import Path
class AuthReloginSmokeTests(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.main_source = Path("main.py").read_text(encoding="utf-8")
cls.vk_source = Path("services/vk_service.py").read_text(encoding="utf-8")
cls.update_source = Path("services/update_service.py").read_text(encoding="utf-8")
def test_auth_command_builder_handles_frozen_and_source(self):
self.assertIn("def _build_auth_command(self, auth_url, output_path):", self.main_source)
self.assertIn("entry_script_path=os.path.abspath(__file__)", self.main_source)
self.assertIn('return sys.executable, ["--auth", auth_url, output_path]', self.vk_source)
self.assertIn("script_path = entry_script_path or os.path.abspath(__file__)", self.vk_source)
def test_auth_runs_via_qprocess(self):
self.assertIn("process = QProcess(self)", self.main_source)
self.assertIn("process.start(program, args)", self.main_source)
self.assertIn("def _on_auth_process_finished(self, exit_code, _exit_status):", self.main_source)
self.assertIn("if self.auth_process and self.auth_process.state() == QProcess.ProcessState.NotRunning:", self.main_source)
def test_force_relogin_has_backoff_and_event_log(self):
self.assertIn("AUTH_RELOGIN_BACKOFF_SECONDS = 5.0", self.main_source)
self.assertIn("if self._auth_relogin_in_progress:", self.main_source)
self.assertIn("force_relogin_backoff", self.main_source)
self.assertIn("force_relogin", self.main_source)
def test_auth_error_paths_trigger_force_relogin(self):
self.assertIn(
"def _handle_vk_api_error(self, context, exc, action_name=None, ui_message_prefix=None, disable_ui=False):",
self.main_source,
)
self.assertIn("self._force_relogin(exc, action_name or context)", self.main_source)
self.assertIn('"load_chats",', self.main_source)
self.assertIn('"execute_user_action",', self.main_source)
self.assertIn('"set_user_admin",', self.main_source)
def test_tab_checkbox_lists_use_existing_attributes(self):
self.assertIn("self.warehouse_chat_checkboxes", self.main_source)
self.assertIn("self.coffee_chat_checkboxes", self.main_source)
self.assertNotIn("self.retail_warehouse_checkboxes", self.main_source)
self.assertNotIn("self.retail_coffee_checkboxes", self.main_source)
def test_update_check_actions_exist(self):
self.assertIn("from app_version import APP_VERSION", self.main_source)
self.assertIn("from services import (", self.main_source)
self.assertIn("UpdateChecker", self.main_source)
self.assertIn("detect_update_repository_url", self.main_source)
self.assertIn('QAction("Проверить обновления", self)', self.main_source)
self.assertIn("def check_for_updates(self, silent_no_updates=False):", self.main_source)
self.assertIn("class UpdateChecker(QObject):", self.update_source)
self.assertIn("def _start_auto_update(self, download_url, latest_version, checksum_url=\"\", download_name=\"\"):", self.main_source)
self.assertIn("AutoUpdateService.prepare_update", self.main_source)
self.assertIn("AutoUpdateService.launch_gui_updater", self.main_source)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,114 @@
import ast
import unittest
from pathlib import Path
class MainContractsTests(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.main_source = Path("main.py").read_text(encoding="utf-8-sig")
cls.module = ast.parse(cls.main_source)
cls.vk_chat_manager = cls._find_class("VkChatManager")
@classmethod
def _find_class(cls, class_name):
for node in cls.module.body:
if isinstance(node, ast.ClassDef) and node.name == class_name:
return node
raise AssertionError(f"Class {class_name} not found")
def _find_method(self, method_name):
for node in self.vk_chat_manager.body:
if isinstance(node, ast.FunctionDef) and node.name == method_name:
return node
self.fail(f"Method {method_name} not found")
def _iter_nodes(self, node):
return ast.walk(node)
def test_auth_error_contexts_contains_only_supported_contexts(self):
expected_contexts = {"load_chats", "execute_user_action", "set_user_admin"}
for node in self.module.body:
if isinstance(node, ast.Assign):
for target in node.targets:
if isinstance(target, ast.Name) and target.id == "AUTH_ERROR_CONTEXTS":
actual = set(ast.literal_eval(node.value))
self.assertSetEqual(actual, expected_contexts)
return
self.fail("AUTH_ERROR_CONTEXTS assignment not found")
def test_check_for_updates_has_reentry_guard(self):
method = self._find_method("check_for_updates")
has_guard = False
for node in method.body:
if not isinstance(node, ast.If):
continue
test = node.test
if (
isinstance(test, ast.Attribute)
and isinstance(test.value, ast.Name)
and test.value.id == "self"
and test.attr == "_update_in_progress"
):
has_guard = any(isinstance(stmt, ast.Return) for stmt in node.body)
if has_guard:
break
self.assertTrue(has_guard, "check_for_updates must return when update is already in progress")
def test_check_for_updates_connects_thread_finish_handler(self):
method = self._find_method("check_for_updates")
for node in self._iter_nodes(method):
if not isinstance(node, ast.Call):
continue
func = node.func
if not (isinstance(func, ast.Attribute) and func.attr == "connect"):
continue
value = func.value
if not (
isinstance(value, ast.Attribute)
and value.attr == "finished"
and isinstance(value.value, ast.Attribute)
and value.value.attr == "update_thread"
and isinstance(value.value.value, ast.Name)
and value.value.value.id == "self"
):
continue
if len(node.args) != 1:
continue
arg = node.args[0]
if (
isinstance(arg, ast.Attribute)
and arg.attr == "_on_update_thread_finished"
and isinstance(arg.value, ast.Name)
and arg.value.id == "self"
):
return
self.fail("update_thread.finished must be connected to _on_update_thread_finished")
def test_on_update_thread_finished_clears_update_state(self):
method = self._find_method("_on_update_thread_finished")
assignments = {}
for node in method.body:
if not isinstance(node, ast.Assign) or len(node.targets) != 1:
continue
target = node.targets[0]
if (
isinstance(target, ast.Attribute)
and isinstance(target.value, ast.Name)
and target.value.id == "self"
):
assignments[target.attr] = node.value
self.assertIn("_update_in_progress", assignments)
self.assertIn("update_checker", assignments)
self.assertIn("update_thread", assignments)
self.assertIsInstance(assignments["_update_in_progress"], ast.Constant)
self.assertIs(assignments["_update_in_progress"].value, False)
self.assertIsInstance(assignments["update_checker"], ast.Constant)
self.assertIsNone(assignments["update_checker"].value)
self.assertIsInstance(assignments["update_thread"], ast.Constant)
self.assertIsNone(assignments["update_thread"].value)
if __name__ == "__main__":
unittest.main()