"""Release-update discovery helpers and the ``UpdateChecker`` worker object.

The module queries a release API (GitHub's REST API for github.com hosts,
a Gitea-style ``/api/v1`` endpoint otherwise) and turns the answer into a
plain ``dict`` describing the newest release and its downloadable assets.
"""

import json
import os
import re
import urllib.error
import urllib.request
from urllib.parse import urlparse

try:
    from PySide6.QtCore import QObject, Signal
except Exception:
    # PySide6 is missing or failed to load: provide a minimal, synchronous
    # stand-in so the checker can still be used without Qt.

    class _FallbackBoundSignal:
        """Per-instance signal that calls its callbacks synchronously."""

        def __init__(self):
            self._callbacks = []

        def connect(self, callback):
            """Register *callback*; ``None`` is silently ignored."""
            if callback is not None:
                self._callbacks.append(callback)

        def emit(self, *args, **kwargs):
            """Invoke every registered callback with the given arguments."""
            # Iterate over a copy so a callback may connect more callbacks.
            for callback in list(self._callbacks):
                callback(*args, **kwargs)

    class _FallbackSignalDescriptor:
        """Class-level descriptor that lazily creates one signal per instance."""

        def __init__(self):
            self._storage_name = ""

        def __set_name__(self, owner, name):
            self._storage_name = f"__fallback_signal_{name}"

        def __get__(self, instance, owner):
            if instance is None:
                return self
            signal = instance.__dict__.get(self._storage_name)
            if signal is None:
                signal = _FallbackBoundSignal()
                instance.__dict__[self._storage_name] = signal
            return signal

    class QObject:
        """Empty base class standing in for ``PySide6.QtCore.QObject``."""

    def Signal(*_args, **_kwargs):
        """Return a fallback signal descriptor; type arguments are ignored."""
        return _FallbackSignalDescriptor()


# scp-style git remote, e.g. ``git@host:owner/repo.git``.
_SSH_REMOTE_RE = re.compile(r"git@([^:]+):(.+?)(?:\.git)?$")

# Spellings accepted for the pre-release channel.
_BETA_CHANNEL_ALIASES = frozenset(
    {"beta", "betas", "pre", "prerelease", "pre-release"}
)


def _text(value):
    """Return *value* when it is a string, otherwise an empty string."""
    return value if isinstance(value, str) else ""


def _version_key(version_text):
    """Return a comparable ``(major, minor, patch)`` tuple for a version.

    Every run of digits in *version_text* is taken as a component; the
    result is padded with zeros or truncated to exactly three components.
    Text without any digits yields ``(0, 0, 0)``.
    """
    parts = [int(x) for x in re.findall(r"\d+", str(version_text))]
    if not parts:
        return (0, 0, 0)
    while len(parts) < 3:
        parts.append(0)
    return tuple(parts[:3])


def _is_newer_version(latest_version, current_version):
    """Return ``True`` when *latest_version* sorts after *current_version*."""
    return _version_key(latest_version) > _version_key(current_version)


def _sanitize_repo_url(value):
    """Normalise a repository reference to ``scheme://host/path``.

    Accepted forms:

    * ``owner/repo`` shorthand (expanded to a github.com URL);
    * scp-style remotes such as ``git@host:owner/repo.git``;
    * full URLs, from which trailing slashes, a ``.git`` suffix and any
      ``user:password@`` prefix are removed.

    Returns an empty string for empty, malformed or scheme-less input.
    """
    value = (value or "").strip()
    if not value:
        return ""

    # Must run before the shorthand test: an scp-style remote also contains
    # exactly one "/" and no "://", and would otherwise be misread.
    ssh_match = _SSH_REMOTE_RE.match(value)
    if ssh_match:
        value = f"https://{ssh_match.group(1)}/{ssh_match.group(2)}"

    if "://" not in value and value.count("/") == 1:
        return f"https://github.com/{value}"

    try:
        parsed = urlparse(value)
    except ValueError:
        # urlparse rejects malformed bracketed (IPv6) hosts.
        return ""

    # Drop credentials so they never end up in links shown to the user.
    netloc = parsed.netloc.rpartition("@")[2]
    if not parsed.scheme or not netloc:
        return ""

    clean_path = parsed.path.rstrip("/")
    if clean_path.endswith(".git"):
        clean_path = clean_path[:-4]
    return f"{parsed.scheme}://{netloc}{clean_path}"


def _normalize_update_channel(value):
    """Map a channel name to ``"beta"`` or ``"stable"`` (the default)."""
    channel = (value or "").strip().lower()
    return "beta" if channel in _BETA_CHANNEL_ALIASES else "stable"


def _select_release_from_list(releases):
    """Return the first non-draft release dict that has a tag or a name.

    Returns ``None`` when no entry of *releases* qualifies.
    """
    for item in releases:
        if not isinstance(item, dict) or item.get("draft"):
            continue
        tag_name = _text(item.get("tag_name")) or _text(item.get("name"))
        if tag_name.strip():
            return item
    return None


def _extract_release_payload(release_data, repository_url, current_version):
    """Build the update payload from one release description.

    Args:
        release_data: Release ``dict`` as returned by the release API.
        repository_url: Repository URL; used for the fallback release link.
        current_version: Version to compare the release against.

    Returns:
        A ``dict`` with the keys ``repository_url``, ``latest_version``,
        ``current_version``, ``latest_tag``, ``release_url``,
        ``release_notes``, ``download_url``, ``download_name``,
        ``installer_url``, ``installer_name``, ``checksum_url`` and
        ``has_update``.
    """
    parsed = urlparse(repository_url)
    base_url = f"{parsed.scheme}://{parsed.netloc}"
    repo_path = parsed.path.strip("/")
    releases_url = f"{base_url}/{repo_path}/releases"

    latest_tag = (
        _text(release_data.get("tag_name")) or _text(release_data.get("name"))
    )
    latest_version = latest_tag.lstrip("vV").strip()
    html_url = _text(release_data.get("html_url")) or releases_url
    release_notes = _text(release_data.get("body")).strip()

    # Normalise once to (name, url) string pairs so that JSON nulls and
    # non-dict entries cannot break the string handling below.
    assets = [
        (_text(asset.get("name")), _text(asset.get("browser_download_url")))
        for asset in (release_data.get("assets") or [])
        if isinstance(asset, dict)
    ]

    # Primary download: the first .zip asset, else simply the first asset.
    download_name, download_url = next(
        ((name, url) for name, url in assets if url.lower().endswith(".zip")),
        assets[0] if assets else ("", ""),
    )

    # Installer: the first .exe whose name mentions "setup" or "installer".
    installer_name, installer_url = next(
        (
            (name, url)
            for name, url in assets
            if url.lower().endswith(".exe")
            and ("setup" in name.lower() or "installer" in name.lower())
        ),
        ("", ""),
    )

    # Checksum: prefer a checksum file named after the primary download,
    # otherwise fall back to the first checksum-looking asset.
    checksum_url = ""
    wanted = download_name.lower()
    for name, url in assets:
        name_lower = name.lower()
        if not name_lower:
            continue
        is_checksum_asset = name_lower.endswith(
            (".sha256", ".sha256.txt")
        ) or name_lower in ("checksums.txt", "sha256sums.txt")
        if not is_checksum_asset:
            continue
        if wanted and wanted in name_lower:
            checksum_url = url
            break
        if not checksum_url:
            checksum_url = url

    return {
        "repository_url": repository_url,
        "latest_version": latest_version,
        "current_version": current_version,
        "latest_tag": latest_tag,
        "release_url": html_url,
        "release_notes": release_notes,
        "download_url": download_url,
        "download_name": download_name,
        "installer_url": installer_url,
        "installer_name": installer_name,
        "checksum_url": checksum_url,
        "has_update": _is_newer_version(latest_version, current_version),
    }


def detect_update_repository_url(configured_url="", configured_repo=""):
    """Return the repository URL to check for updates, or ``""``.

    Sources are tried in this order, and the first one that yields a valid
    URL wins: the ``ANABASIS_UPDATE_URL`` environment variable, the
    ``ANABASIS_UPDATE_REPOSITORY`` environment variable, *configured_url*,
    *configured_repo*, and finally the first http(s) or ``git@`` remote URL
    found in ``.git/config`` of the current working directory.
    """
    candidates = (
        os.getenv("ANABASIS_UPDATE_URL", ""),
        os.getenv("ANABASIS_UPDATE_REPOSITORY", ""),
        configured_url,
        configured_repo,
    )
    for candidate in candidates:
        url = _sanitize_repo_url(candidate)
        if url:
            return url

    git_config_path = os.path.join(os.path.abspath("."), ".git", "config")
    try:
        with open(git_config_path, "r", encoding="utf-8") as f:
            content = f.read()
    except (OSError, ValueError):
        # Missing or unreadable file, or content that is not valid UTF-8:
        # detection is best-effort, so report "nothing found".
        return ""

    match = re.search(r"url\s*=\s*((?:https?://|git@)[^\s]+)", content)
    if not match:
        return ""
    return _sanitize_repo_url(match.group(1).strip())


class UpdateChecker(QObject):
    """Fetch the newest release of a repository and report it via signals.

    Signals:
        check_finished(dict): emitted with the release payload on success.
        check_failed(str): emitted with a user-facing message on failure.
    """

    check_finished = Signal(dict)
    check_failed = Signal(str)

    def __init__(self, repository_url, current_version, request_timeout=8, channel="stable"):
        """Store the check parameters.

        Args:
            repository_url: Repository URL (``scheme://host/owner/repo``).
            current_version: Version the release is compared against.
            request_timeout: Timeout passed to ``urlopen`` for the request.
            channel: Update channel name; normalised to "stable" or "beta".
        """
        super().__init__()
        self.repository_url = repository_url
        self.current_version = current_version
        self.request_timeout = request_timeout
        self.channel = _normalize_update_channel(channel)

    def _resolve_endpoints(self):
        """Return ``(api_url, releases_url)``, or ``None`` for a bad URL.

        The URL must be http(s), have a host, and a path of at least
        ``owner/repo``. github.com hosts are served by api.github.com;
        every other host is queried through ``/api/v1`` on the same host.
        The beta channel requests the release list, the stable channel
        requests ``releases/latest``.
        """
        try:
            parsed = urlparse(self.repository_url)
        except ValueError:
            return None

        # Only http(s) may reach urlopen; e.g. file:// would read local files.
        if parsed.scheme.lower() not in ("http", "https") or not parsed.netloc:
            return None

        repo_path = parsed.path.strip("/")
        if not repo_path or repo_path.count("/") < 1:
            return None

        base_url = f"{parsed.scheme}://{parsed.netloc}"
        # Compare the bare host name: a plain suffix test on netloc would
        # also accept unrelated hosts such as "notgithub.com".
        host = (parsed.hostname or "").lower()
        if host == "github.com" or host.endswith(".github.com"):
            api_root = f"https://api.github.com/repos/{repo_path}"
        else:
            api_root = f"{base_url}/api/v1/repos/{repo_path}"

        if self.channel == "beta":
            api_url = f"{api_root}/releases"
        else:
            api_url = f"{api_root}/releases/latest"
        return api_url, f"{base_url}/{repo_path}/releases"

    def run(self):
        """Perform the check and emit ``check_finished`` or ``check_failed``."""
        if not self.repository_url:
            self.check_failed.emit("Не задан URL репозитория обновлений.")
            return

        endpoints = self._resolve_endpoints()
        if endpoints is None:
            self.check_failed.emit("Некорректный URL репозитория обновлений.")
            return
        api_url, releases_url = endpoints

        request = urllib.request.Request(
            api_url,
            headers={
                "Accept": "application/vnd.github+json",
                "User-Agent": "AnabasisManager-Updater",
            },
        )
        try:
            with urllib.request.urlopen(request, timeout=self.request_timeout) as response:
                response_data = json.loads(response.read().decode("utf-8"))
        except urllib.error.HTTPError as e:
            # HTTPError subclasses URLError, so it has to be handled first.
            self.check_failed.emit(f"Ошибка HTTP при проверке обновлений: {e.code}")
            return
        except urllib.error.URLError as e:
            self.check_failed.emit(f"Сетевая ошибка при проверке обновлений: {e}")
            return
        except Exception as e:
            # Covers timeouts while reading, bad encodings and invalid JSON.
            self.check_failed.emit(f"Не удалось проверить обновления: {e}")
            return

        release_data = response_data
        if self.channel == "beta":
            if not isinstance(response_data, list):
                self.check_failed.emit("Сервер вернул некорректный ответ списка релизов.")
                return
            release_data = _select_release_from_list(response_data)
            if not release_data:
                self.check_failed.emit("В канале beta не найдено доступных релизов.")
                return
        elif not isinstance(response_data, dict):
            self.check_failed.emit("Сервер вернул некорректный ответ релиза.")
            return

        payload = _extract_release_payload(release_data, self.repository_url, self.current_version)
        payload["release_channel"] = self.channel
        payload["releases_url"] = releases_url
        self.check_finished.emit(payload)