5 Commits

Author SHA1 Message Date
4b3347a069 fix(update,security): add release notes in updater and bump to 2.2.1
All checks were successful
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Successful in 3m30s
2026-02-15 23:51:44 +03:00
e0628b1792 chore(version): bump to 2.2.0
Some checks are pending
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Has started running
2026-02-15 23:49:04 +03:00
201184700f docs(readme): update install steps and feature list 2026-02-15 23:48:15 +03:00
5253c942e8 fix(core,security): safe update extraction and async bulk vk actions 2026-02-15 23:42:51 +03:00
c645d964bf fix(auth,ui): add auth webview cli entrypoint and bulk action progress bar
All checks were successful
Desktop CI / tests (push) Successful in 13s
Desktop Release / release (push) Successful in 17s
2026-02-15 23:31:01 +03:00
7 changed files with 336 additions and 95 deletions

View File

@@ -13,9 +13,12 @@
* Моментальная загрузка всех доступных чатов пользователя. * Моментальная загрузка всех доступных чатов пользователя.
* Групповой выбор чатов («Выбрать все» / «Снять выбор»). * Групповой выбор чатов («Выбрать все» / «Снять выбор»).
* Быстрое обновление списка бесед. * Быстрое обновление списка бесед.
* Выполнение массовых действий в фоновом потоке без подвисания интерфейса.
* Визуальный прогресс-бар по ходу операции.
* **👤 Интеллектуальный поиск ID:** Автоматическое распознавание ID пользователя из ссылок любого формата (например, `vk.com/id123`, `vk.com/durov` или просто `durov`). * **👤 Интеллектуальный поиск ID:** Автоматическое распознавание ID пользователя из ссылок любого формата (например, `vk.com/id123`, `vk.com/durov` или просто `durov`).
* **🛠 Управление в один клик:** Кнопки для мгновенного исключения или приглашения пользователя во все выбранные чаты одновременно. * **🛠 Управление в один клик:** Кнопки для мгновенного исключения или приглашения пользователя во все выбранные чаты одновременно.
* **🛡 Стабильность:** Улучшенная обработка ошибок VK API и автоматическая реакция на смену IP-адреса. * **🔄 Безопасные обновления:** Проверка SHA256 и защищенная распаковка архива обновления.
* **🛡 Стабильность и безопасность:** Улучшенная обработка ошибок VK API, автоматическая реакция на смену IP-адреса и безопасное хранение токена с шифрованием DPAPI в Windows.
--- ---
@@ -47,7 +50,7 @@
3. **Установите зависимости:** 3. **Установите зависимости:**
```bash ```bash
pip install PySide6 vk_api pip install -r requirements.txt
``` ```
4. **Запустите приложение:** 4. **Запустите приложение:**
@@ -68,6 +71,12 @@
## 📂 Техническая информация ## 📂 Техническая информация
### Последние обновления
- Массовые операции VK (`remove/add/admin`) выполняются в фоновом потоке, чтобы интерфейс не зависал; добавлен визуальный прогресс-бар.
- Распаковка архива автообновления теперь валидирует пути перед извлечением для защиты от path traversal.
- Проверка обновлений переведена на `QThread` (модель потоков Qt) вместо Python `threading.Thread`.
- В Windows сохранение токена требует успешного шифрования через DPAPI; при ошибке шифрования сессия продолжается, но токен не сохраняется на диск.
### Сборка проекта (для разработчиков) ### Сборка проекта (для разработчиков)
Проект использует кастомный скрипт автоматизации `build.py`, который оптимизирует зависимости `PySide6` и корректно упаковывает `QtWebEngineCore`. Проект использует кастомный скрипт автоматизации `build.py`, который оптимизирует зависимости `PySide6` и корректно упаковывает `QtWebEngineCore`.

View File

@@ -1 +1 @@
APP_VERSION = "2.1.2" APP_VERSION = "2.2.1"

View File

@@ -74,5 +74,21 @@ def main_auth(auth_url, output_path):
webview.start(private_mode=False, storage_path=storage_path) webview.start(private_mode=False, storage_path=storage_path)
def main():
# Supports both: `python auth_webview.py <auth_url> <output_path>`
# and: `python auth_webview.py --auth <auth_url> <output_path>`
args = sys.argv[1:]
if len(args) == 3 and args[0] == "--auth":
auth_url, output_path = args[1], args[2]
elif len(args) == 2:
auth_url, output_path = args[0], args[1]
else:
print("Usage: auth_webview.py [--auth] <auth_url> <output_path>")
return 1
main_auth(auth_url, output_path)
return 0
if __name__ == "__main__": if __name__ == "__main__":
main() sys.exit(main())

339
main.py
View File

@@ -1,13 +1,12 @@
import json import json
import os import os
import shutil import shutil
import sys import sys
import threading
import time import time
from PySide6.QtCore import QProcess from PySide6.QtCore import QProcess
from PySide6.QtCore import QStandardPaths from PySide6.QtCore import QStandardPaths
from PySide6.QtCore import Qt, QUrl, QDateTime, QTimer from PySide6.QtCore import QObject, QThread, Signal, Qt, QUrl, QDateTime, QTimer
from PySide6.QtGui import QIcon, QAction, QActionGroup, QDesktopServices from PySide6.QtGui import QIcon, QAction, QActionGroup, QDesktopServices
from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit, from PySide6.QtWidgets import (QApplication, QMainWindow, QLabel, QLineEdit,
QPushButton, QVBoxLayout, QWidget, QMessageBox, QPushButton, QVBoxLayout, QWidget, QMessageBox,
@@ -50,6 +49,7 @@ UPDATE_REPOSITORY = ""
UPDATE_REPOSITORY_URL = "https://git.daemonlord.ru/benya/AnabasisChatRemove" UPDATE_REPOSITORY_URL = "https://git.daemonlord.ru/benya/AnabasisChatRemove"
UPDATE_CHANNEL_DEFAULT = "stable" UPDATE_CHANNEL_DEFAULT = "stable"
UPDATE_REQUEST_TIMEOUT = 8 UPDATE_REQUEST_TIMEOUT = 8
AUTH_ERROR_CONTEXTS = ("load_chats", "execute_user_action", "set_user_admin", "_unused")
def get_resource_path(relative_path): def get_resource_path(relative_path):
@@ -59,6 +59,86 @@ def get_resource_path(relative_path):
# Для cx_Freeze и обычного запуска # Для cx_Freeze и обычного запуска
return os.path.join(os.path.abspath("."), relative_path) return os.path.join(os.path.abspath("."), relative_path)
class BulkActionWorker(QObject):
progress = Signal(int, int, str)
finished = Signal(dict)
auth_error = Signal(str, object, str)
failed = Signal(str)
done = Signal()
def __init__(self, vk_call_with_retry, vk_api, action_type, selected_chats, user_infos, visible_messages):
super().__init__()
self.vk_call_with_retry = vk_call_with_retry
self.vk = vk_api
self.action_type = action_type
self.selected_chats = selected_chats
self.user_infos = user_infos
self.visible_messages = visible_messages
self.total = len(self.selected_chats) * len(self.user_infos)
@staticmethod
def _is_auth_error(exc):
return VkService.is_auth_error(exc, str(exc).lower())
def _emit_progress(self, processed):
label = "admin" if self.action_type == "admin" else ("remove" if self.action_type == "remove" else "add")
self.progress.emit(processed, self.total, label)
def run(self):
results = []
processed = 0
try:
for chat in self.selected_chats:
peer_id = None
if self.action_type == "admin":
try:
peer_id = 2000000000 + int(chat["id"])
except (ValueError, TypeError):
for _user_id, user_info in self.user_infos.items():
results.append(f"✗ Ошибка ID чата: {chat['id']} ({user_info})")
processed += 1
self._emit_progress(processed)
continue
for user_id, user_info in self.user_infos.items():
try:
if self.action_type == "remove":
self.vk_call_with_retry(self.vk.messages.removeChatUser, chat_id=chat["id"], member_id=user_id)
results.append(f"'{user_info}' исключен из '{chat['title']}'.")
elif self.action_type == "add":
params = {"chat_id": chat["id"], "user_id": user_id}
if self.visible_messages:
params["visible_messages_count"] = 250
self.vk_call_with_retry(self.vk.messages.addChatUser, **params)
results.append(f"'{user_info}' приглашен в '{chat['title']}'.")
elif self.action_type == "admin":
self.vk_call_with_retry(
self.vk.messages.setMemberRole,
peer_id=peer_id,
member_id=user_id,
role="admin",
)
results.append(f"'{user_info}' назначен админом в '{chat['title']}'.")
else:
raise RuntimeError(f"Unknown action: {self.action_type}")
except VkApiError as exc:
if self._is_auth_error(exc):
context = "set_user_admin" if self.action_type == "admin" else "execute_user_action"
action_name = "назначения администраторов" if self.action_type == "admin" else "выполнения операций с пользователями"
self.auth_error.emit(context, exc, action_name)
return
results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {exc}")
finally:
processed += 1
self._emit_progress(processed)
self.finished.emit({"results": results, "processed": processed, "total": self.total})
except Exception as exc:
self.failed.emit(str(exc))
finally:
self.done.emit()
class VkChatManager(QMainWindow): class VkChatManager(QMainWindow):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
@@ -85,11 +165,18 @@ class VkChatManager(QMainWindow):
self._auth_ui_busy = False self._auth_ui_busy = False
self._auth_relogin_in_progress = False self._auth_relogin_in_progress = False
self._last_auth_relogin_ts = 0.0 self._last_auth_relogin_ts = 0.0
self._active_action_button = None
self._active_action_button_text = ""
self.update_repository_url = detect_update_repository_url(UPDATE_REPOSITORY_URL, UPDATE_REPOSITORY) self.update_repository_url = detect_update_repository_url(UPDATE_REPOSITORY_URL, UPDATE_REPOSITORY)
self.update_channel = UPDATE_CHANNEL_DEFAULT self.update_channel = UPDATE_CHANNEL_DEFAULT
self.update_checker = None self.update_checker = None
self.update_thread = None self.update_thread = None
self._update_check_silent = False self._update_check_silent = False
self._bulk_worker_thread = None
self._bulk_worker = None
self._bulk_action_context = None
self._bulk_action_success_message_title = ""
self._bulk_clear_inputs_on_success = True
self.resolve_timer = QTimer(self) self.resolve_timer = QTimer(self)
self.resolve_timer.setSingleShot(True) self.resolve_timer.setSingleShot(True)
@@ -184,6 +271,12 @@ class VkChatManager(QMainWindow):
self.add_user_btn.setMinimumHeight(50) self.add_user_btn.setMinimumHeight(50)
self.add_user_btn.clicked.connect(self.add_user_to_chat) self.add_user_btn.clicked.connect(self.add_user_to_chat)
layout.addWidget(self.add_user_btn) layout.addWidget(self.add_user_btn)
self.operation_progress = QProgressBar()
self.operation_progress.setRange(0, 100)
self.operation_progress.setValue(0)
self.operation_progress.setTextVisible(True)
self.operation_progress.hide()
layout.addWidget(self.operation_progress)
self.status_label = QLabel("Статус: не авторизован") self.status_label = QLabel("Статус: не авторизован")
self.status_label.setAlignment(Qt.AlignmentFlag.AlignCenter) self.status_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
layout.addWidget(self.status_label) layout.addWidget(self.status_label)
@@ -369,7 +462,7 @@ class VkChatManager(QMainWindow):
self._log_event("update_channel", f"update_channel={self.update_channel}") self._log_event("update_channel", f"update_channel={self.update_channel}")
def check_for_updates(self, silent_no_updates=False): def check_for_updates(self, silent_no_updates=False):
if self.update_thread and self.update_thread.is_alive(): if self.update_thread and self.update_thread.isRunning():
return return
self._update_check_silent = silent_no_updates self._update_check_silent = silent_no_updates
@@ -383,9 +476,16 @@ class VkChatManager(QMainWindow):
request_timeout=UPDATE_REQUEST_TIMEOUT, request_timeout=UPDATE_REQUEST_TIMEOUT,
channel=self.update_channel, channel=self.update_channel,
) )
self.update_thread = QThread(self)
self.update_checker.moveToThread(self.update_thread)
self.update_thread.started.connect(self.update_checker.run)
self.update_checker.check_finished.connect(self._on_update_check_finished) self.update_checker.check_finished.connect(self._on_update_check_finished)
self.update_checker.check_failed.connect(self._on_update_check_failed) self.update_checker.check_failed.connect(self._on_update_check_failed)
self.update_thread = threading.Thread(target=self.update_checker.run, daemon=True) self.update_checker.check_finished.connect(self.update_thread.quit)
self.update_checker.check_failed.connect(self.update_thread.quit)
self.update_checker.check_finished.connect(self.update_checker.deleteLater)
self.update_checker.check_failed.connect(self.update_checker.deleteLater)
self.update_thread.finished.connect(self.update_thread.deleteLater)
self.update_thread.start() self.update_thread.start()
def _on_update_check_finished(self, result): def _on_update_check_finished(self, result):
@@ -405,6 +505,14 @@ class VkChatManager(QMainWindow):
f"Доступная версия: {latest_version}\n\n" f"Доступная версия: {latest_version}\n\n"
"Открыть страницу загрузки?" "Открыть страницу загрузки?"
) )
release_notes = (result.get("release_notes") or "").strip()
if release_notes:
preview_lines = [line.strip() for line in release_notes.splitlines() if line.strip()]
preview_text = "\n".join(preview_lines[:8])
if len(preview_lines) > 8:
preview_text += "\n..."
message_box.setInformativeText(f"Что нового:\n{preview_text}")
message_box.setDetailedText(release_notes)
update_now_button = message_box.addButton("Обновить сейчас", QMessageBox.ButtonRole.AcceptRole) update_now_button = message_box.addButton("Обновить сейчас", QMessageBox.ButtonRole.AcceptRole)
download_button = message_box.addButton("Скачать", QMessageBox.ButtonRole.AcceptRole) download_button = message_box.addButton("Скачать", QMessageBox.ButtonRole.AcceptRole)
setup_button = None setup_button = None
@@ -550,6 +658,102 @@ class VkChatManager(QMainWindow):
else: else:
self.set_ui_state(True) self.set_ui_state(True)
def _start_operation_progress(self, total, label_text, action_button=None):
total = max(1, int(total))
self.operation_progress.setRange(0, total)
self.operation_progress.setValue(0)
self.operation_progress.setFormat(f"{label_text}: %v/%m")
self.operation_progress.show()
self._active_action_button = action_button
self._active_action_button_text = action_button.text() if action_button else ""
if action_button is not None:
action_button.setText(f"{label_text} (0/{total})")
action_button.setEnabled(False)
def _update_operation_progress(self, processed, total, label_text):
total = max(1, int(total))
processed = max(0, min(int(processed), total))
self.operation_progress.setRange(0, total)
self.operation_progress.setValue(processed)
self.operation_progress.setFormat(f"{label_text}: {processed}/{total}")
if self._active_action_button is not None:
self._active_action_button.setText(f"{label_text} ({processed}/{total})")
def _finish_operation_progress(self):
self.operation_progress.hide()
self.operation_progress.setValue(0)
self.operation_progress.setFormat("%p%")
if self._active_action_button is not None:
self._active_action_button.setText(self._active_action_button_text or self._active_action_button.text())
self._active_action_button = None
self._active_action_button_text = ""
def _start_bulk_action_worker(
self,
action_type,
selected_chats,
user_infos,
action_label,
action_button=None,
success_message_title="Результаты",
clear_inputs_on_success=True,
):
total = max(1, len(selected_chats) * len(user_infos))
self._bulk_action_context = "set_user_admin" if action_type == "admin" else "execute_user_action"
self._bulk_action_success_message_title = success_message_title
self._bulk_clear_inputs_on_success = clear_inputs_on_success
self._set_busy(True, f"Статус: выполняется {action_label} (0/{total})...")
self._start_operation_progress(total, action_label, action_button=action_button)
worker = BulkActionWorker(
vk_call_with_retry=self._vk_call_with_retry,
vk_api=self.vk,
action_type=action_type,
selected_chats=selected_chats,
user_infos=user_infos,
visible_messages=self.visible_messages_checkbox.isChecked(),
)
thread = QThread(self)
worker.moveToThread(thread)
thread.started.connect(worker.run)
worker.progress.connect(self._on_bulk_action_progress)
worker.finished.connect(self._on_bulk_action_finished)
worker.auth_error.connect(self._on_bulk_action_auth_error)
worker.failed.connect(self._on_bulk_action_failed)
worker.done.connect(self._on_bulk_action_done)
worker.done.connect(thread.quit)
worker.done.connect(worker.deleteLater)
thread.finished.connect(thread.deleteLater)
self._bulk_worker = worker
self._bulk_worker_thread = thread
thread.start()
def _on_bulk_action_progress(self, processed, total, label):
label_text = {"remove": "исключение", "add": "приглашение", "admin": "назначение админов"}.get(label, label)
self._update_operation_progress(processed, total, label_text)
self.status_label.setText(f"Статус: выполняется {label_text} ({processed}/{max(1, total)})...")
def _on_bulk_action_finished(self, payload):
results = payload.get("results", []) if isinstance(payload, dict) else []
QMessageBox.information(self, self._bulk_action_success_message_title, "\n".join(results))
if self._bulk_clear_inputs_on_success:
self.vk_url_input.clear()
self.user_ids_to_process.clear()
self.set_ui_state(self.token is not None)
def _on_bulk_action_auth_error(self, context, exc, action_name):
self._handle_vk_api_error(context or self._bulk_action_context or "bulk_action", exc, action_name=action_name)
def _on_bulk_action_failed(self, error_text):
QMessageBox.warning(self, "Ошибка", f"Не удалось выполнить операцию: {error_text}")
def _on_bulk_action_done(self):
self._finish_operation_progress()
self._set_busy(False)
self._bulk_worker = None
self._bulk_worker_thread = None
def _ensure_log_dir(self): def _ensure_log_dir(self):
os.makedirs(APP_DATA_DIR, exist_ok=True) os.makedirs(APP_DATA_DIR, exist_ok=True)
@@ -785,12 +989,26 @@ class VkChatManager(QMainWindow):
self.token = token self.token = token
# Сохраняем и получаем корректный expiration_time (0 или будущее время) # Сохраняем и получаем корректный expiration_time (0 или будущее время)
self.token_expiration_time = token_store_save_token( try:
self.token, self.token_expiration_time = token_store_save_token(
TOKEN_FILE, self.token,
APP_DATA_DIR, TOKEN_FILE,
expires_in=expires_in, APP_DATA_DIR,
) expires_in=expires_in,
)
except Exception as e:
try:
expires_value = int(expires_in)
except (TypeError, ValueError):
expires_value = 0
self.token_expiration_time = (time.time() + expires_value) if expires_value > 0 else 0
self._log_event("token_store_save", str(e), level="WARN")
QMessageBox.warning(
self,
"Предупреждение",
"Не удалось безопасно сохранить токен на диске. "
"Текущая сессия активна, но после перезапуска потребуется повторная авторизация.",
)
self.token_input.setText(self.token[:50] + "...") self.token_input.setText(self.token[:50] + "...")
self.status_label.setText("Статус: авторизован") self.status_label.setText("Статус: авторизован")
@@ -914,7 +1132,7 @@ class VkChatManager(QMainWindow):
selected.append({'id': chat_id, 'title': title}) selected.append({'id': chat_id, 'title': title})
return selected return selected
def _execute_user_action(self, action_type): def _execute_user_action(self, action_type, action_button=None):
if not self.user_ids_to_process: if not self.user_ids_to_process:
QMessageBox.warning(self, "Ошибка", f"Нет ID пользователей для операции.") QMessageBox.warning(self, "Ошибка", f"Нет ID пользователей для операции.")
return return
@@ -972,44 +1190,23 @@ class VkChatManager(QMainWindow):
if confirm_dialog.clickedButton() != yes_button: if confirm_dialog.clickedButton() != yes_button:
return return
results = [] action_label = "исключение" if action_type == "remove" else "приглашение"
total = len(selected_chats) * len(user_infos) self._start_bulk_action_worker(
processed = 0 action_type=action_type,
try: selected_chats=selected_chats,
action_label = "исключение" if action_type == "remove" else "приглашение" user_infos=user_infos,
self._set_busy(True, f"Статус: выполняется {action_label} (0/{total})...") action_label=action_label,
for chat in selected_chats: action_button=action_button,
for user_id, user_info in user_infos.items(): success_message_title="Результаты",
try: clear_inputs_on_success=True,
if action_type == "remove": )
self._vk_call_with_retry(self.vk.messages.removeChatUser, chat_id=chat['id'], member_id=user_id) return
results.append(f"'{user_info}' исключен из '{chat['title']}'.")
else:
params = {'chat_id': chat['id'], 'user_id': user_id}
if self.visible_messages_checkbox.isChecked():
params['visible_messages_count'] = 250
self._vk_call_with_retry(self.vk.messages.addChatUser, **params)
results.append(f"'{user_info}' приглашен в '{chat['title']}'.")
except VkApiError as e:
if self._handle_vk_api_error("execute_user_action", e, action_name="выполнения операций с пользователями"):
return
results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {self._format_vk_error(e)}")
finally:
processed += 1
self.status_label.setText(f"Статус: выполняется {action_label} ({processed}/{total})...")
finally:
self._set_busy(False)
QMessageBox.information(self, "Результаты", "\n".join(results))
self.vk_url_input.clear()
self.user_ids_to_process.clear()
self.set_ui_state(self.token is not None)
def remove_user(self): def remove_user(self):
self._execute_user_action("remove") self._execute_user_action("remove", action_button=self.remove_user_btn)
def add_user_to_chat(self): def add_user_to_chat(self):
self._execute_user_action("add") self._execute_user_action("add", action_button=self.add_user_btn)
def set_user_admin(self): def set_user_admin(self):
"""Назначает пользователя администратором чата.""" """Назначает пользователя администратором чата."""
@@ -1047,46 +1244,16 @@ class VkChatManager(QMainWindow):
if confirm_dialog.clickedButton() != yes_button: if confirm_dialog.clickedButton() != yes_button:
return return
# 4. Выполнение API запросов self._start_bulk_action_worker(
results = [] action_type="admin",
total = len(selected_chats) * len(user_infos) selected_chats=selected_chats,
processed = 0 user_infos=user_infos,
try: action_label="назначение админов",
self._set_busy(True, f"Статус: назначение админов (0/{total})...") action_button=None,
for chat in selected_chats: success_message_title="Результаты назначения",
# VK API требует peer_id. Для чатов это 2000000000 + local_id clear_inputs_on_success=True,
try: )
peer_id = 2000000000 + int(chat['id']) return
except ValueError:
results.append(f"✗ Ошибка ID чата: {chat['id']}")
continue
for user_id, user_info in user_infos.items():
try:
self._vk_call_with_retry(
self.vk.messages.setMemberRole,
peer_id=peer_id,
member_id=user_id,
role="admin"
)
results.append(f"'{user_info}' назначен админом в '{chat['title']}'.")
except VkApiError as e:
if self._handle_vk_api_error("set_user_admin", e, action_name="назначения администраторов"):
return
results.append(f"✗ Ошибка для '{user_info}' в '{chat['title']}': {self._format_vk_error(e)}")
finally:
processed += 1
self.status_label.setText(f"Статус: назначение админов ({processed}/{total})...")
finally:
self._set_busy(False)
# 5. Вывод результата
QMessageBox.information(self, "Результаты назначения", "\n".join(results))
# Очистка полей (по желанию, можно убрать эти две строки, если хотите оставить ввод)
self.vk_url_input.clear()
self.user_ids_to_process.clear()
self.set_ui_state(self.token is not None)
# Refactor overrides: keep logic in service modules and thin UI orchestration here. # Refactor overrides: keep logic in service modules and thin UI orchestration here.
def _process_links_list(self, links_list): def _process_links_list(self, links_list):

View File

@@ -9,6 +9,18 @@ import zipfile
class AutoUpdateService: class AutoUpdateService:
@staticmethod
def _safe_extract_zip(archive, destination_dir):
destination_real = os.path.realpath(destination_dir)
for member in archive.infolist():
member_name = member.filename or ""
if not member_name:
continue
target_path = os.path.realpath(os.path.join(destination_dir, member_name))
if target_path != destination_real and not target_path.startswith(destination_real + os.sep):
raise RuntimeError(f"Unsafe path in update archive: {member_name}")
archive.extractall(destination_dir)
@staticmethod @staticmethod
def download_update_archive(download_url, destination_path): def download_update_archive(download_url, destination_path):
request = urllib.request.Request( request = urllib.request.Request(
@@ -215,6 +227,6 @@ class AutoUpdateService:
cls.verify_update_checksum(zip_path, checksum_url, download_name) cls.verify_update_checksum(zip_path, checksum_url, download_name)
os.makedirs(unpack_dir, exist_ok=True) os.makedirs(unpack_dir, exist_ok=True)
with zipfile.ZipFile(zip_path, "r") as archive: with zipfile.ZipFile(zip_path, "r") as archive:
archive.extractall(unpack_dir) cls._safe_extract_zip(archive, unpack_dir)
source_dir = cls.locate_extracted_root(unpack_dir) source_dir = cls.locate_extracted_root(unpack_dir)
return work_dir, source_dir return work_dir, source_dir

View File

@@ -91,8 +91,8 @@ def save_token(token, token_file, app_data_dir, expires_in=0):
try: try:
stored_token = _encrypt_token(token) stored_token = _encrypt_token(token)
encrypted = True encrypted = True
except Exception: except Exception as exc:
pass raise RuntimeError("Failed to securely store token with DPAPI.") from exc
data = { data = {
"token": stored_token, "token": stored_token,

View File

@@ -5,7 +5,42 @@ import urllib.error
import urllib.request import urllib.request
from urllib.parse import urlparse from urllib.parse import urlparse
from PySide6.QtCore import QObject, Signal try:
from PySide6.QtCore import QObject, Signal
except Exception:
class _FallbackBoundSignal:
def __init__(self):
self._callbacks = []
def connect(self, callback):
if callback is not None:
self._callbacks.append(callback)
def emit(self, *args, **kwargs):
for callback in list(self._callbacks):
callback(*args, **kwargs)
class _FallbackSignalDescriptor:
def __init__(self):
self._storage_name = ""
def __set_name__(self, owner, name):
self._storage_name = f"__fallback_signal_{name}"
def __get__(self, instance, owner):
if instance is None:
return self
signal = instance.__dict__.get(self._storage_name)
if signal is None:
signal = _FallbackBoundSignal()
instance.__dict__[self._storage_name] = signal
return signal
class QObject:
pass
def Signal(*_args, **_kwargs):
return _FallbackSignalDescriptor()
def _version_key(version_text): def _version_key(version_text):
@@ -67,6 +102,7 @@ def _extract_release_payload(release_data, repository_url, current_version):
latest_tag = release_data.get("tag_name") or release_data.get("name") or "" latest_tag = release_data.get("tag_name") or release_data.get("name") or ""
latest_version = latest_tag.lstrip("vV").strip() latest_version = latest_tag.lstrip("vV").strip()
html_url = release_data.get("html_url") or releases_url html_url = release_data.get("html_url") or releases_url
release_notes = (release_data.get("body") or "").strip()
assets = release_data.get("assets") or [] assets = release_data.get("assets") or []
download_url = "" download_url = ""
download_name = "" download_name = ""
@@ -112,6 +148,7 @@ def _extract_release_payload(release_data, repository_url, current_version):
"current_version": current_version, "current_version": current_version,
"latest_tag": latest_tag, "latest_tag": latest_tag,
"release_url": html_url, "release_url": html_url,
"release_notes": release_notes,
"download_url": download_url, "download_url": download_url,
"download_name": download_name, "download_name": download_name,
"installer_url": installer_url, "installer_url": installer_url,