263 lines
9.4 KiB
Python
263 lines
9.4 KiB
Python
import json
|
||
import os
|
||
import re
|
||
import urllib.error
|
||
import urllib.request
|
||
from urllib.parse import urlparse
|
||
|
||
try:
|
||
from PySide6.QtCore import QObject, Signal
|
||
except Exception:
|
||
class _FallbackBoundSignal:
|
||
def __init__(self):
|
||
self._callbacks = []
|
||
|
||
def connect(self, callback):
|
||
if callback is not None:
|
||
self._callbacks.append(callback)
|
||
|
||
def emit(self, *args, **kwargs):
|
||
for callback in list(self._callbacks):
|
||
callback(*args, **kwargs)
|
||
|
||
class _FallbackSignalDescriptor:
|
||
def __init__(self):
|
||
self._storage_name = ""
|
||
|
||
def __set_name__(self, owner, name):
|
||
self._storage_name = f"__fallback_signal_{name}"
|
||
|
||
def __get__(self, instance, owner):
|
||
if instance is None:
|
||
return self
|
||
signal = instance.__dict__.get(self._storage_name)
|
||
if signal is None:
|
||
signal = _FallbackBoundSignal()
|
||
instance.__dict__[self._storage_name] = signal
|
||
return signal
|
||
|
||
class QObject:
|
||
pass
|
||
|
||
def Signal(*_args, **_kwargs):
|
||
return _FallbackSignalDescriptor()
|
||
|
||
|
||
def _version_key(version_text):
|
||
parts = [int(x) for x in re.findall(r"\d+", str(version_text))]
|
||
if not parts:
|
||
return (0, 0, 0)
|
||
while len(parts) < 3:
|
||
parts.append(0)
|
||
return tuple(parts[:3])
|
||
|
||
|
||
def _is_newer_version(latest_version, current_version):
|
||
latest_key = _version_key(latest_version)
|
||
current_key = _version_key(current_version)
|
||
return latest_key > current_key
|
||
|
||
|
||
def _sanitize_repo_url(value):
|
||
value = (value or "").strip()
|
||
if not value:
|
||
return ""
|
||
if "://" not in value and value.count("/") == 1:
|
||
return f"https://github.com/{value}"
|
||
parsed = urlparse(value)
|
||
if not parsed.scheme or not parsed.netloc:
|
||
return ""
|
||
clean_path = parsed.path.rstrip("/")
|
||
if clean_path.endswith(".git"):
|
||
clean_path = clean_path[:-4]
|
||
return f"{parsed.scheme}://{parsed.netloc}{clean_path}"
|
||
|
||
|
||
def _normalize_update_channel(value):
|
||
channel = (value or "").strip().lower()
|
||
if channel in ("beta", "betas", "pre", "prerelease", "pre-release"):
|
||
return "beta"
|
||
return "stable"
|
||
|
||
|
||
def _select_release_from_list(releases):
|
||
for item in releases:
|
||
if not isinstance(item, dict):
|
||
continue
|
||
if item.get("draft"):
|
||
continue
|
||
tag_name = (item.get("tag_name") or item.get("name") or "").strip()
|
||
if not tag_name:
|
||
continue
|
||
return item
|
||
return None
|
||
|
||
|
||
def _extract_release_payload(release_data, repository_url, current_version):
|
||
parsed = urlparse(repository_url)
|
||
base_url = f"{parsed.scheme}://{parsed.netloc}"
|
||
repo_path = parsed.path.strip("/")
|
||
releases_url = f"{base_url}/{repo_path}/releases"
|
||
|
||
latest_tag = release_data.get("tag_name") or release_data.get("name") or ""
|
||
latest_version = latest_tag.lstrip("vV").strip()
|
||
html_url = release_data.get("html_url") or releases_url
|
||
assets = release_data.get("assets") or []
|
||
download_url = ""
|
||
download_name = ""
|
||
checksum_url = ""
|
||
installer_url = ""
|
||
installer_name = ""
|
||
for asset in assets:
|
||
url = asset.get("browser_download_url", "")
|
||
if url.lower().endswith(".zip"):
|
||
download_url = url
|
||
download_name = asset.get("name", "")
|
||
break
|
||
if not download_url and assets:
|
||
download_url = assets[0].get("browser_download_url", "")
|
||
download_name = assets[0].get("name", "")
|
||
|
||
for asset in assets:
|
||
url = asset.get("browser_download_url", "")
|
||
name = asset.get("name", "")
|
||
name_lower = name.lower()
|
||
if installer_url:
|
||
break
|
||
if url.lower().endswith(".exe") and ("setup" in name_lower or "installer" in name_lower):
|
||
installer_url = url
|
||
installer_name = name
|
||
|
||
for asset in assets:
|
||
name = asset.get("name", "").lower()
|
||
if not name:
|
||
continue
|
||
is_checksum_asset = name.endswith(".sha256") or name.endswith(".sha256.txt") or name in ("checksums.txt", "sha256sums.txt")
|
||
if not is_checksum_asset:
|
||
continue
|
||
if download_name and (download_name.lower() in name or name in (f"{download_name.lower()}.sha256", f"{download_name.lower()}.sha256.txt")):
|
||
checksum_url = asset.get("browser_download_url", "")
|
||
break
|
||
if not checksum_url:
|
||
checksum_url = asset.get("browser_download_url", "")
|
||
|
||
return {
|
||
"repository_url": repository_url,
|
||
"latest_version": latest_version,
|
||
"current_version": current_version,
|
||
"latest_tag": latest_tag,
|
||
"release_url": html_url,
|
||
"download_url": download_url,
|
||
"download_name": download_name,
|
||
"installer_url": installer_url,
|
||
"installer_name": installer_name,
|
||
"checksum_url": checksum_url,
|
||
"has_update": _is_newer_version(latest_version, current_version),
|
||
}
|
||
|
||
|
||
def detect_update_repository_url(configured_url="", configured_repo=""):
|
||
env_url = _sanitize_repo_url(os.getenv("ANABASIS_UPDATE_URL", ""))
|
||
if env_url:
|
||
return env_url
|
||
env_repo = _sanitize_repo_url(os.getenv("ANABASIS_UPDATE_REPOSITORY", ""))
|
||
if env_repo:
|
||
return env_repo
|
||
cfg_url = _sanitize_repo_url(configured_url)
|
||
if cfg_url:
|
||
return cfg_url
|
||
cfg_repo = _sanitize_repo_url(configured_repo)
|
||
if cfg_repo:
|
||
return cfg_repo
|
||
git_config_path = os.path.join(os.path.abspath("."), ".git", "config")
|
||
if not os.path.exists(git_config_path):
|
||
return ""
|
||
try:
|
||
with open(git_config_path, "r", encoding="utf-8") as f:
|
||
content = f.read()
|
||
match = re.search(r"url\s*=\s*((?:https?://|git@)[^\s]+)", content)
|
||
if not match:
|
||
return ""
|
||
remote = match.group(1).strip()
|
||
if remote.startswith("git@"):
|
||
ssh_match = re.match(r"git@([^:]+):(.+?)(?:\.git)?$", remote)
|
||
if ssh_match:
|
||
return _sanitize_repo_url(f"https://{ssh_match.group(1)}/{ssh_match.group(2)}")
|
||
return _sanitize_repo_url(remote)
|
||
except Exception:
|
||
return ""
|
||
|
||
|
||
class UpdateChecker(QObject):
|
||
check_finished = Signal(dict)
|
||
check_failed = Signal(str)
|
||
|
||
def __init__(self, repository_url, current_version, request_timeout=8, channel="stable"):
|
||
super().__init__()
|
||
self.repository_url = repository_url
|
||
self.current_version = current_version
|
||
self.request_timeout = request_timeout
|
||
self.channel = _normalize_update_channel(channel)
|
||
|
||
def run(self):
|
||
if not self.repository_url:
|
||
self.check_failed.emit("Не задан URL репозитория обновлений.")
|
||
return
|
||
|
||
parsed = urlparse(self.repository_url)
|
||
base_url = f"{parsed.scheme}://{parsed.netloc}"
|
||
repo_path = parsed.path.strip("/")
|
||
if not repo_path or repo_path.count("/") < 1:
|
||
self.check_failed.emit("Некорректный URL репозитория обновлений.")
|
||
return
|
||
|
||
use_beta_channel = self.channel == "beta"
|
||
if parsed.netloc.lower().endswith("github.com"):
|
||
if use_beta_channel:
|
||
api_url = f"https://api.github.com/repos/{repo_path}/releases"
|
||
else:
|
||
api_url = f"https://api.github.com/repos/{repo_path}/releases/latest"
|
||
else:
|
||
if use_beta_channel:
|
||
api_url = f"{base_url}/api/v1/repos/{repo_path}/releases"
|
||
else:
|
||
api_url = f"{base_url}/api/v1/repos/{repo_path}/releases/latest"
|
||
releases_url = f"{base_url}/{repo_path}/releases"
|
||
request = urllib.request.Request(
|
||
api_url,
|
||
headers={
|
||
"Accept": "application/vnd.github+json",
|
||
"User-Agent": "AnabasisManager-Updater",
|
||
},
|
||
)
|
||
try:
|
||
with urllib.request.urlopen(request, timeout=self.request_timeout) as response:
|
||
response_data = json.loads(response.read().decode("utf-8"))
|
||
except urllib.error.HTTPError as e:
|
||
self.check_failed.emit(f"Ошибка HTTP при проверке обновлений: {e.code}")
|
||
return
|
||
except urllib.error.URLError as e:
|
||
self.check_failed.emit(f"Сетевая ошибка при проверке обновлений: {e}")
|
||
return
|
||
except Exception as e:
|
||
self.check_failed.emit(f"Не удалось проверить обновления: {e}")
|
||
return
|
||
|
||
release_data = response_data
|
||
if use_beta_channel:
|
||
if not isinstance(response_data, list):
|
||
self.check_failed.emit("Сервер вернул некорректный ответ списка релизов.")
|
||
return
|
||
release_data = _select_release_from_list(response_data)
|
||
if not release_data:
|
||
self.check_failed.emit("В канале beta не найдено доступных релизов.")
|
||
return
|
||
elif not isinstance(response_data, dict):
|
||
self.check_failed.emit("Сервер вернул некорректный ответ релиза.")
|
||
return
|
||
|
||
payload = _extract_release_payload(release_data, self.repository_url, self.current_version)
|
||
payload["release_channel"] = self.channel
|
||
payload["releases_url"] = releases_url
|
||
self.check_finished.emit(payload)
|