"""Helpers around a ``vk_api`` session: token handling, error classification, retries."""

import os
import sys
import time
from typing import Any, Callable, List, Optional, Tuple

from vk_api import VkApi
from vk_api.exceptions import VkApiError


class VkService:
    """Hold a ``vk_api`` session and offer error-classification and retry helpers.

    The numeric policy used by the helpers is exposed as class attributes so a
    subclass (or a single instance) can tune it without touching the methods.
    The defaults reproduce the values the methods previously hard-coded.
    """

    # VK error code treated as "authorization failed" by ``is_auth_error``
    # (code 5 in VK's published error list: "User authorization failed").
    AUTH_ERROR_CODE = 5
    # VK error code that gets a longer minimum back-off in ``call_with_retry``
    # (code 9 in VK's published error list: "Flood control").
    FLOOD_CONTROL_ERROR_CODE = 9
    # Error codes worth retrying. In VK's published error list these are
    # 6 "Too many requests per second", 9 "Flood control" and
    # 10 "Internal server error". Kept as a tuple so that membership tests
    # never raise on an unhashable code value.
    RETRYABLE_ERROR_CODES = (6, 9, 10)

    # Total number of calls ``call_with_retry`` makes before giving up.
    RETRY_MAX_ATTEMPTS = 5
    # Delay before the first retry, in seconds; doubled on every further retry.
    RETRY_BASE_DELAY = 0.35
    # Upper bound for the exponential back-off delay, in seconds.
    RETRY_MAX_DELAY = 2.0
    # Lower bound for the delay after a flood-control error, in seconds.
    FLOOD_CONTROL_MIN_DELAY = 1.0

    def __init__(self) -> None:
        """Create a service with no active session."""
        self.session = None
        self.api = None

    def set_token(self, token: str) -> None:
        """Open a new ``VkApi`` session for *token* and cache its API accessor."""
        self.session = VkApi(token=token)
        self.api = self.session.get_api()

    def clear(self) -> None:
        """Drop the current session and API accessor."""
        self.session = None
        self.api = None

    @staticmethod
    def build_auth_command(
        auth_url: str,
        output_path: str,
        entry_script_path: Optional[str] = None,
    ) -> Tuple[str, List[str]]:
        """Return the ``(program, arguments)`` pair for the ``--auth`` invocation.

        Args:
            auth_url: Passed through as the argument following ``--auth``.
            output_path: Passed through as the last argument.
            entry_script_path: Script to hand to the interpreter. When omitted
                (or empty), the absolute path of this module is used.

        Returns:
            ``sys.executable`` and the argument list. When ``sys.frozen`` is
            truthy (the attribute set by bundlers such as PyInstaller) the
            executable is launched directly and no script path is prepended.
        """
        if getattr(sys, "frozen", False):
            return sys.executable, ["--auth", auth_url, output_path]
        script_path = entry_script_path or os.path.abspath(__file__)
        return sys.executable, [script_path, "--auth", auth_url, output_path]

    @staticmethod
    def vk_error_code(exc: BaseException) -> Any:
        """Extract the VK error code carried by *exc*, if any.

        A dict-valued ``exc.error`` takes precedence: its ``"error_code"``
        entry is returned (``None`` when the key is absent). Otherwise the
        ``exc.code`` attribute is returned, or ``None`` when that is missing.
        """
        payload = getattr(exc, "error", None)
        if isinstance(payload, dict):
            return payload.get("error_code")
        return getattr(exc, "code", None)

    @classmethod
    def is_auth_error(
        cls,
        exc: BaseException,
        formatted_message: Optional[str] = None,
    ) -> bool:
        """Tell whether *exc* indicates an invalid or expired authorization.

        Args:
            exc: The exception to classify.
            formatted_message: Optional pre-formatted text to scan instead of
                ``str(exc)``; ignored when empty.

        Returns:
            ``True`` when the error code equals ``AUTH_ERROR_CODE`` or when the
            lower-cased message mentions ``invalid_access_token`` or
            ``user authorization failed``.
        """
        if cls.vk_error_code(exc) == cls.AUTH_ERROR_CODE:
            return True
        message = (formatted_message or str(exc)).lower()
        return (
            "invalid_access_token" in message
            or "user authorization failed" in message
        )

    @classmethod
    def is_retryable_error(cls, exc: BaseException) -> bool:
        """Tell whether the error code of *exc* is in ``RETRYABLE_ERROR_CODES``."""
        return cls.vk_error_code(exc) in cls.RETRYABLE_ERROR_CODES

    def call_with_retry(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        """Call ``func(*args, **kwargs)``, retrying on retryable VK errors.

        The wait before retry *k* (1-based) is ``RETRY_BASE_DELAY * 2**(k - 1)``
        seconds, capped at ``RETRY_MAX_DELAY``; after a flood-control error it
        is at least ``FLOOD_CONTROL_MIN_DELAY``.

        Returns:
            Whatever *func* returns on the first successful call.

        Raises:
            VkApiError: Re-raised unchanged when the error is not retryable or
                when the final attempt fails. Exceptions of other types are
                never caught here.
        """
        # Always invoke ``func`` at least once, even if the limit is
        # overridden with a non-positive value.
        max_attempts = max(1, self.RETRY_MAX_ATTEMPTS)
        for attempt in range(1, max_attempts + 1):
            try:
                return func(*args, **kwargs)
            except VkApiError as e:
                if not self.is_retryable_error(e) or attempt == max_attempts:
                    raise
                delay = min(
                    self.RETRY_MAX_DELAY,
                    self.RETRY_BASE_DELAY * (2 ** (attempt - 1)),
                )
                if self.vk_error_code(e) == self.FLOOD_CONTROL_ERROR_CODE:
                    delay = max(delay, self.FLOOD_CONTROL_MIN_DELAY)
                time.sleep(delay)