Files
AnabasisChatRemove/services/update_service.py
benya 4b3347a069
All checks were successful
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Successful in 3m30s
fix(update,security): add release notes in updater and bump to 2.2.1
2026-02-15 23:51:44 +03:00

265 lines
9.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import os
import re
import urllib.error
import urllib.request
from urllib.parse import urlparse
# Prefer the real Qt classes. When PySide6 cannot be imported, fall back to a
# tiny in-process replacement exposing the same `connect` / `emit` surface so
# the rest of this module keeps working without Qt.
try:
    from PySide6.QtCore import QObject, Signal
except Exception:

    class _FallbackBoundSignal:
        """Signal bound to one object; emitting calls the listeners directly."""

        def __init__(self):
            self._callbacks = []

        def connect(self, callback):
            """Register *callback*; a ``None`` callback is ignored."""
            if callback is None:
                return
            self._callbacks.append(callback)

        def emit(self, *args, **kwargs):
            """Invoke every registered callback in connection order."""
            # Work on a snapshot so a callback may connect further listeners
            # without affecting the emission that is currently in progress.
            for listener in tuple(self._callbacks):
                listener(*args, **kwargs)

    class _FallbackSignalDescriptor:
        """Class-level placeholder that lazily creates one signal per instance."""

        def __init__(self):
            self._storage_name = ""

        def __set_name__(self, owner, name):
            # Key under which the bound signal is cached in the instance dict.
            self._storage_name = f"__fallback_signal_{name}"

        def __get__(self, instance, owner):
            # Access through the class returns the descriptor itself.
            if instance is None:
                return self
            attributes = instance.__dict__
            bound = attributes.get(self._storage_name)
            if bound is None:
                bound = attributes[self._storage_name] = _FallbackBoundSignal()
            return bound

    class QObject:
        """Empty stand-in base class used when Qt is unavailable."""

    def Signal(*_args, **_kwargs):
        """Return a fallback descriptor; the declared argument types are ignored."""
        return _FallbackSignalDescriptor()
def _version_key(version_text):
parts = [int(x) for x in re.findall(r"\d+", str(version_text))]
if not parts:
return (0, 0, 0)
while len(parts) < 3:
parts.append(0)
return tuple(parts[:3])
def _is_newer_version(latest_version, current_version):
    """Return ``True`` when *latest_version* sorts strictly after *current_version*.

    Both values are reduced to numeric tuples by ``_version_key`` before the
    comparison, so equal versions (after normalisation) are not "newer".
    """
    return _version_key(latest_version) > _version_key(current_version)
def _sanitize_repo_url(value):
value = (value or "").strip()
if not value:
return ""
if "://" not in value and value.count("/") == 1:
return f"https://github.com/{value}"
parsed = urlparse(value)
if not parsed.scheme or not parsed.netloc:
return ""
clean_path = parsed.path.rstrip("/")
if clean_path.endswith(".git"):
clean_path = clean_path[:-4]
return f"{parsed.scheme}://{parsed.netloc}{clean_path}"
def _normalize_update_channel(value):
channel = (value or "").strip().lower()
if channel in ("beta", "betas", "pre", "prerelease", "pre-release"):
return "beta"
return "stable"
def _select_release_from_list(releases):
for item in releases:
if not isinstance(item, dict):
continue
if item.get("draft"):
continue
tag_name = (item.get("tag_name") or item.get("name") or "").strip()
if not tag_name:
continue
return item
return None
def _asset_text(asset, key):
    """Return ``asset[key]`` when it is a string, otherwise ``""``."""
    if not isinstance(asset, dict):
        return ""
    value = asset.get(key)
    return value if isinstance(value, str) else ""


def _pick_download_asset(assets):
    """Return ``(url, name)`` of the first ``.zip`` asset, else of the first asset."""
    for asset in assets:
        url = _asset_text(asset, "browser_download_url")
        if url.lower().endswith(".zip"):
            return url, _asset_text(asset, "name")
    if assets:
        # No archive published: fall back to whatever asset is listed first.
        first = assets[0]
        return _asset_text(first, "browser_download_url"), _asset_text(first, "name")
    return "", ""


def _pick_installer_asset(assets):
    """Return ``(url, name)`` of the first ``.exe`` named like a setup/installer."""
    for asset in assets:
        url = _asset_text(asset, "browser_download_url")
        name = _asset_text(asset, "name")
        lowered = name.lower()
        if url.lower().endswith(".exe") and ("setup" in lowered or "installer" in lowered):
            return url, name
    return "", ""


def _pick_checksum_url(assets, download_name):
    """Return the URL of the checksum asset that best matches *download_name*.

    A checksum asset whose name contains the download's file name wins
    immediately; otherwise the first checksum asset with a non-empty URL is
    used. Returns ``""`` when the release has no checksum asset.
    """
    target = download_name.lower()
    fallback = ""
    for asset in assets:
        name = _asset_text(asset, "name").lower()
        if not name:
            continue
        is_checksum_asset = name.endswith((".sha256", ".sha256.txt")) or name in (
            "checksums.txt",
            "sha256sums.txt",
        )
        if not is_checksum_asset:
            continue
        url = _asset_text(asset, "browser_download_url")
        # "<download>.sha256" and "<download>.sha256.txt" both contain the
        # download name, so the substring test covers the exact-name cases.
        if target and target in name:
            return url
        if not fallback:
            fallback = url
    return fallback


def _extract_release_payload(release_data, repository_url, current_version):
    """Build the update-check result dict from one release API object.

    Args:
        release_data: Release dict as returned by the releases API; the keys
            read are ``tag_name``, ``name``, ``html_url``, ``body`` and
            ``assets`` (each asset: ``name``, ``browser_download_url``).
        repository_url: Normalised repository URL; used to derive the
            releases page that serves as a fallback for ``release_url``.
        current_version: Version of the running application.

    Returns:
        Dict with the keys ``repository_url``, ``latest_version``,
        ``current_version``, ``latest_tag``, ``release_url``,
        ``release_notes``, ``download_url``, ``download_name``,
        ``installer_url``, ``installer_name``, ``checksum_url`` and
        ``has_update``. Missing pieces are empty strings.
    """
    parsed = urlparse(repository_url)
    repo_path = parsed.path.strip("/")
    releases_url = f"{parsed.scheme}://{parsed.netloc}/{repo_path}/releases"

    latest_tag = release_data.get("tag_name") or release_data.get("name") or ""
    # Trim whitespace first: otherwise a tag such as " v1.2.0" keeps its "v"
    # prefix because lstrip() stops at the leading blank.
    latest_version = latest_tag.strip().lstrip("vV").strip()
    html_url = release_data.get("html_url") or releases_url
    release_notes = (release_data.get("body") or "").strip()

    assets = release_data.get("assets") or []
    if not isinstance(assets, list):
        # Anything other than a list cannot be scanned or indexed safely.
        assets = []

    download_url, download_name = _pick_download_asset(assets)
    installer_url, installer_name = _pick_installer_asset(assets)
    checksum_url = _pick_checksum_url(assets, download_name)

    return {
        "repository_url": repository_url,
        "latest_version": latest_version,
        "current_version": current_version,
        "latest_tag": latest_tag,
        "release_url": html_url,
        "release_notes": release_notes,
        "download_url": download_url,
        "download_name": download_name,
        "installer_url": installer_url,
        "installer_name": installer_name,
        "checksum_url": checksum_url,
        "has_update": _is_newer_version(latest_version, current_version),
    }
def detect_update_repository_url(configured_url="", configured_repo=""):
    """Work out which repository should be queried for updates.

    Sources are tried in this order and the first one that normalises to a
    non-empty URL wins:

    1. environment variable ``ANABASIS_UPDATE_URL``;
    2. environment variable ``ANABASIS_UPDATE_REPOSITORY``;
    3. *configured_url*;
    4. *configured_repo*;
    5. the first ``url = ...`` entry (HTTP(S) or ``git@`` form) found in
       ``.git/config`` under the current working directory.

    Returns:
        The normalised repository URL, or ``""`` when nothing usable is found
        or reading the git config fails.
    """
    candidates = (
        os.getenv("ANABASIS_UPDATE_URL", ""),
        os.getenv("ANABASIS_UPDATE_REPOSITORY", ""),
        configured_url,
        configured_repo,
    )
    for candidate in candidates:
        resolved = _sanitize_repo_url(candidate)
        if resolved:
            return resolved

    config_file = os.path.join(os.path.abspath("."), ".git", "config")
    if not os.path.exists(config_file):
        return ""

    # Best effort only: any failure while reading or parsing means "unknown".
    try:
        with open(config_file, "r", encoding="utf-8") as handle:
            text = handle.read()
        found = re.search(r"url\s*=\s*((?:https?://|git@)[^\s]+)", text)
        if found is None:
            return ""
        remote = found.group(1).strip()
        ssh_parts = None
        if remote.startswith("git@"):
            # Rewrite "git@host:owner/repo(.git)" as an HTTPS URL.
            ssh_parts = re.match(r"git@([^:]+):(.+?)(?:\.git)?$", remote)
        if ssh_parts:
            host, path = ssh_parts.groups()
            return _sanitize_repo_url(f"https://{host}/{path}")
        return _sanitize_repo_url(remote)
    except Exception:
        return ""
class UpdateChecker(QObject):
    """Query a repository's releases API and report whether an update exists.

    ``run()`` performs one blocking HTTP request and reports the outcome
    through exactly one of the two signals:

    * ``check_finished(dict)`` - the release payload, extended with
      ``release_channel`` and ``releases_url``;
    * ``check_failed(str)`` - a user-facing error message.
    """

    check_finished = Signal(dict)
    check_failed = Signal(str)

    def __init__(self, repository_url, current_version, request_timeout=8, channel="stable"):
        """Store the check parameters.

        Args:
            repository_url: Normalised repository URL (``scheme://host/owner/repo``).
            current_version: Version of the running application.
            request_timeout: Timeout in seconds passed to ``urlopen``.
            channel: Requested update channel; normalised to ``"stable"`` or
                ``"beta"``.
        """
        super().__init__()
        self.repository_url = repository_url
        self.current_version = current_version
        self.request_timeout = request_timeout
        self.channel = _normalize_update_channel(channel)

    @staticmethod
    def _is_github_host(parsed):
        """Return ``True`` only for ``github.com`` itself or one of its subdomains."""
        # Compare the bare hostname: a suffix test on netloc would also accept
        # look-alike hosts such as "notgithub.com" and would be confused by a
        # port or userinfo component.
        host = (parsed.hostname or "").lower()
        return host == "github.com" or host.endswith(".github.com")

    def _build_api_url(self, parsed, repo_path, use_beta_channel):
        """Return the releases API endpoint for the repository and channel."""
        if self._is_github_host(parsed):
            api_root = f"https://api.github.com/repos/{repo_path}/releases"
        else:
            # Other hosts are addressed through an "/api/v1" prefix on the
            # same server.
            api_root = f"{parsed.scheme}://{parsed.netloc}/api/v1/repos/{repo_path}/releases"
        # The beta channel needs the full list; stable asks for the latest only.
        return api_root if use_beta_channel else f"{api_root}/latest"

    def run(self):
        """Perform the update check and emit ``check_finished`` or ``check_failed``."""
        if not self.repository_url:
            self.check_failed.emit("Не задан URL репозитория обновлений.")
            return

        parsed = urlparse(self.repository_url)
        base_url = f"{parsed.scheme}://{parsed.netloc}"
        repo_path = parsed.path.strip("/")
        # The path must contain at least "owner/repo".
        if not repo_path or repo_path.count("/") < 1:
            self.check_failed.emit("Некорректный URL репозитория обновлений.")
            return

        use_beta_channel = self.channel == "beta"
        api_url = self._build_api_url(parsed, repo_path, use_beta_channel)
        releases_url = f"{base_url}/{repo_path}/releases"

        request = urllib.request.Request(
            api_url,
            headers={
                "Accept": "application/vnd.github+json",
                "User-Agent": "AnabasisManager-Updater",
            },
        )
        try:
            with urllib.request.urlopen(request, timeout=self.request_timeout) as response:
                response_data = json.loads(response.read().decode("utf-8"))
        except urllib.error.HTTPError as e:
            # HTTPError is a subclass of URLError, so it must be handled first.
            self.check_failed.emit(f"Ошибка HTTP при проверке обновлений: {e.code}")
            return
        except urllib.error.URLError as e:
            self.check_failed.emit(f"Сетевая ошибка при проверке обновлений: {e}")
            return
        except Exception as e:
            # Covers decoding / JSON errors and anything else unexpected.
            self.check_failed.emit(f"Не удалось проверить обновления: {e}")
            return

        release_data = response_data
        if use_beta_channel:
            # The list endpoint returns an array; pick the first usable entry.
            if not isinstance(response_data, list):
                self.check_failed.emit("Сервер вернул некорректный ответ списка релизов.")
                return
            release_data = _select_release_from_list(response_data)
            if not release_data:
                self.check_failed.emit("В канале beta не найдено доступных релизов.")
                return
        elif not isinstance(response_data, dict):
            self.check_failed.emit("Сервер вернул некорректный ответ релиза.")
            return

        payload = _extract_release_payload(release_data, self.repository_url, self.current_version)
        payload["release_channel"] = self.channel
        payload["releases_url"] = releases_url
        self.check_finished.emit(payload)