diff --git a/.gitea/workflows/release.yml b/.gitea/workflows/release.yml index b87a692..45da9c3 100644 --- a/.gitea/workflows/release.yml +++ b/.gitea/workflows/release.yml @@ -89,6 +89,18 @@ jobs: throw "Archive not found: $archivePath" } + - name: Generate SHA256 checksum + if: env.CONTINUE == 'true' + shell: powershell + run: | + $version = "${{ steps.extract_version.outputs.version }}" + $archiveName = "AnabasisManager-$version.zip" + $archivePath = "dist/$archiveName" + $checksumPath = "dist/$archiveName.sha256" + $hash = (Get-FileHash -Path $archivePath -Algorithm SHA256).Hash.ToLower() + "$hash $archiveName" | Set-Content -Path $checksumPath -Encoding UTF8 + Write-Host "Checksum created: $checksumPath" + - name: Configure git identity if: env.CONTINUE == 'true' shell: powershell @@ -118,3 +130,4 @@ jobs: Desktop release v${{ steps.extract_version.outputs.version }} files: | dist/AnabasisManager-${{ steps.extract_version.outputs.version }}.zip + dist/AnabasisManager-${{ steps.extract_version.outputs.version }}.zip.sha256 diff --git a/main.py b/main.py index 33a6d81..af955ca 100644 --- a/main.py +++ b/main.py @@ -8,6 +8,7 @@ import time import auth_webview import os import re +import hashlib import subprocess import threading import tempfile @@ -357,13 +358,30 @@ class UpdateChecker(QObject): html_url = release_data.get("html_url") or releases_url assets = release_data.get("assets") or [] download_url = "" + download_name = "" + checksum_url = "" for asset in assets: url = asset.get("browser_download_url", "") if url.lower().endswith(".zip"): download_url = url + download_name = asset.get("name", "") break if not download_url and assets: download_url = assets[0].get("browser_download_url", "") + download_name = assets[0].get("name", "") + + for asset in assets: + name = asset.get("name", "").lower() + if not name: + continue + is_checksum_asset = name.endswith(".sha256") or name.endswith(".sha256.txt") or name in ("checksums.txt", "sha256sums.txt") + if not is_checksum_asset: + continue + if download_name and (download_name.lower() in name or name in (f"{download_name.lower()}.sha256", f"{download_name.lower()}.sha256.txt")): + checksum_url = asset.get("browser_download_url", "") + break + if not checksum_url: + checksum_url = asset.get("browser_download_url", "") self.check_finished.emit( { @@ -373,6 +391,8 @@ class UpdateChecker(QObject): "latest_tag": latest_tag, "release_url": html_url, "download_url": download_url, + "download_name": download_name, + "checksum_url": checksum_url, "has_update": _is_newer_version(latest_version, self.current_version), } ) @@ -707,9 +727,11 @@ class VkChatManager(QMainWindow): clicked = message_box.clickedButton() download_url = result.get("download_url") + checksum_url = result.get("checksum_url") + download_name = result.get("download_name") release_url = result.get("release_url") if clicked is update_now_button and download_url: - if not self._start_auto_update(download_url, latest_version): + if not self._start_auto_update(download_url, latest_version, checksum_url, download_name): if release_url: QDesktopServices.openUrl(QUrl(release_url)) return @@ -754,6 +776,53 @@ class VkChatManager(QMainWindow): with open(destination_path, "wb") as f: shutil.copyfileobj(response, f) + def _download_update_text(self, url): + request = urllib.request.Request( + url, + headers={"User-Agent": "AnabasisManager-Updater"}, + ) + with urllib.request.urlopen(request, timeout=30) as response: + return response.read().decode("utf-8", errors="replace") + + @staticmethod + def _sha256_file(path): + digest = hashlib.sha256() + with open(path, "rb") as f: + for chunk in iter(lambda: f.read(1024 * 1024), b""): + digest.update(chunk) + return digest.hexdigest().lower() + + @staticmethod + def _extract_sha256_from_text(checksum_text, target_file_name): + target = (target_file_name or "").strip().lower() + for raw_line in checksum_text.splitlines(): + line = raw_line.strip() + if not line: + continue + match = re.search(r"\b([A-Fa-f0-9]{64})\b", line) + if not match: + continue + checksum = match.group(1).lower() + if not target: + return checksum + line_lower = line.lower() + if target in line_lower: + return checksum + if os.path.basename(target) in line_lower: + return checksum + return "" + + def _verify_update_checksum(self, zip_path, checksum_url, download_name): + if not checksum_url: + raise RuntimeError("В релизе нет файла SHA256. Автообновление остановлено.") + checksum_text = self._download_update_text(checksum_url) + expected_hash = self._extract_sha256_from_text(checksum_text, download_name or os.path.basename(zip_path)) + if not expected_hash: + raise RuntimeError("Не удалось найти SHA256 для архива обновления.") + actual_hash = self._sha256_file(zip_path) + if actual_hash != expected_hash: + raise RuntimeError("SHA256 не совпадает, обновление отменено.") + def _locate_extracted_root(self, extracted_dir): entries = [] for name in os.listdir(extracted_dir): @@ -788,7 +857,7 @@ class VkChatManager(QMainWindow): f.write("\r\n".join(script_lines) + "\r\n") return script_path - def _start_auto_update(self, download_url, latest_version): + def _start_auto_update(self, download_url, latest_version, checksum_url="", download_name=""): if os.name != "nt": QMessageBox.information( self, @@ -814,6 +883,7 @@ class VkChatManager(QMainWindow): unpack_dir = os.path.join(work_dir, "extracted") try: self._download_update_archive(download_url, zip_path) + self._verify_update_checksum(zip_path, checksum_url, download_name) os.makedirs(unpack_dir, exist_ok=True) with zipfile.ZipFile(zip_path, "r") as archive: archive.extractall(unpack_dir) diff --git a/tests/test_auth_relogin_smoke.py b/tests/test_auth_relogin_smoke.py index f5090b8..6562190 100644 --- a/tests/test_auth_relogin_smoke.py +++ b/tests/test_auth_relogin_smoke.py @@ -44,7 +44,8 @@ class AuthReloginSmokeTests(unittest.TestCase): self.assertIn("def check_for_updates(self, silent_no_updates=False):", self.source) self.assertIn("class UpdateChecker(QObject):", self.source) self.assertIn('message_box.addButton("Обновить сейчас", QMessageBox.AcceptRole)', self.source) - self.assertIn("def _start_auto_update(self, download_url, latest_version):", self.source) + self.assertIn("def _start_auto_update(self, download_url, latest_version, checksum_url=\"\", download_name=\"\"):", self.source) + self.assertIn("def _verify_update_checksum(self, zip_path, checksum_url, download_name):", self.source) if __name__ == "__main__":