Files
AnabasisChatRemove/services/vk_service.py

60 lines
1.8 KiB
Python

import os
import sys
import time
from vk_api import VkApi
from vk_api.exceptions import VkApiError
class VkService:
    """Thin wrapper around a ``vk_api`` session with retry and error helpers.

    The instance holds the current ``VkApi`` session and its API accessor
    (both ``None`` until ``set_token`` is called).  The class-level constants
    below tune error classification and the retry back-off; subclasses may
    override them.
    """

    # VK error code treated as "authorization failed" by is_auth_error().
    AUTH_ERROR_CODE = 5
    # VK error codes considered transient by is_retryable_error()
    # (per the VK API error list: 6 = too many requests per second,
    # 9 = flood control, 10 = internal server error).
    RETRYABLE_CODES = (6, 9, 10)
    # Code that gets a longer minimum pause between attempts.
    FLOOD_CONTROL_CODE = 9

    # Retry policy used by call_with_retry().
    MAX_ATTEMPTS = 5
    BASE_DELAY = 0.35  # seconds; doubled after every failed attempt
    MAX_DELAY = 2.0  # seconds; upper bound for the exponential back-off
    FLOOD_CONTROL_MIN_DELAY = 1.0  # seconds; floor applied for FLOOD_CONTROL_CODE

    def __init__(self):
        """Create a service with no active session."""
        self.session = None
        self.api = None

    def set_token(self, token):
        """Open a new ``VkApi`` session for *token* and cache its API object."""
        self.session = VkApi(token=token)
        self.api = self.session.get_api()

    def clear(self):
        """Drop the current session and API object."""
        self.session = None
        self.api = None

    @staticmethod
    def build_auth_command(auth_url, output_path, entry_script_path=None):
        """Build the ``(program, arguments)`` pair that launches the auth helper.

        When ``sys.frozen`` is truthy the running executable is re-invoked
        directly with ``--auth``; otherwise the interpreter is asked to run a
        script with the same arguments.

        Args:
            auth_url: URL passed through after the ``--auth`` flag.
            output_path: Path passed through as the last argument.
            entry_script_path: Script to run in the non-frozen case.  Falls
                back to the absolute path of this module when omitted.
                NOTE(review): this module is presumably not a runnable entry
                point, so callers likely should always pass this -- confirm.

        Returns:
            A tuple ``(sys.executable, argument_list)``.
        """
        auth_args = ["--auth", auth_url, output_path]
        if getattr(sys, "frozen", False):
            return sys.executable, auth_args
        script_path = entry_script_path or os.path.abspath(__file__)
        return sys.executable, [script_path, *auth_args]

    @staticmethod
    def vk_error_code(exc):
        """Extract the VK error code carried by *exc*, or ``None``.

        The code is looked up first in ``exc.error["error_code"]`` (when
        ``exc.error`` is a dict) and then in ``exc.code``.
        """
        error = getattr(exc, "error", None)
        if isinstance(error, dict):
            code = error.get("error_code")
            if code is not None:
                return code
        # Fall through so a payload without "error_code" does not hide a code
        # that is still available on the exception object itself.
        return getattr(exc, "code", None)

    @classmethod
    def is_auth_error(cls, exc, formatted_message=None):
        """Return ``True`` when *exc* indicates an invalid or expired token.

        Args:
            exc: The exception to classify.
            formatted_message: Optional pre-rendered message to inspect
                instead of ``str(exc)``.
        """
        if cls.vk_error_code(exc) == cls.AUTH_ERROR_CODE:
            return True
        message = (formatted_message or str(exc)).lower()
        return (
            "invalid_access_token" in message
            or "user authorization failed" in message
        )

    @classmethod
    def is_retryable_error(cls, exc):
        """Return ``True`` when *exc* carries one of ``RETRYABLE_CODES``."""
        return cls.vk_error_code(exc) in cls.RETRYABLE_CODES

    def call_with_retry(self, func, *args, **kwargs):
        """Call ``func(*args, **kwargs)``, retrying on transient VK errors.

        Up to ``MAX_ATTEMPTS`` calls are made.  After a retryable
        ``VkApiError`` the method sleeps for an exponentially growing delay
        (``BASE_DELAY`` doubled per attempt, capped at ``MAX_DELAY``) before
        trying again.

        Returns:
            Whatever *func* returns on the first successful attempt.

        Raises:
            VkApiError: Immediately for non-retryable errors, or after the
                final attempt for retryable ones.
        """
        max_attempts = self.MAX_ATTEMPTS
        for attempt in range(1, max_attempts + 1):
            try:
                return func(*args, **kwargs)
            except VkApiError as e:
                if not self.is_retryable_error(e) or attempt == max_attempts:
                    raise
                delay = min(
                    self.MAX_DELAY,
                    self.BASE_DELAY * (2 ** (attempt - 1)),
                )
                if self.vk_error_code(e) == self.FLOOD_CONTROL_CODE:
                    # Flood control needs a longer pause than the early
                    # back-off steps provide.
                    delay = max(delay, self.FLOOD_CONTROL_MIN_DELAY)
                time.sleep(delay)