From 56fd3a10c73fd50e3f52317b42b84700310b7e05 Mon Sep 17 00:00:00 2001 From: benya Date: Sat, 18 Apr 2026 17:55:52 +0300 Subject: [PATCH] Add new features --- PKGBUILD | 66 ++ README.md | 42 +- whipper/gui.py | 771 +++++++++++++++++++----- whipper/program/cdparanoia.py | 13 + whipper/test/test_common_program.py | 11 +- whipper/test/test_gui.py | 679 +++++++++++++++++++++ whipper/test/test_program_cdparanoia.py | 57 ++ 7 files changed, 1486 insertions(+), 153 deletions(-) create mode 100644 PKGBUILD create mode 100644 whipper/test/test_gui.py diff --git a/PKGBUILD b/PKGBUILD new file mode 100644 index 0000000..c675e36 --- /dev/null +++ b/PKGBUILD @@ -0,0 +1,66 @@ +pkgname=whipper-git +pkgver=0.10.0.r66.g992923b +pkgrel=1 +pkgdesc='CD-DA ripper prioritising accuracy over speed' +arch=('x86_64') +url='https://github.com/whipper-team/whipper' +license=('GPL-3.0-or-later') +depends=( + 'python' + 'python-discid' + 'python-musicbrainzngs' + 'python-mutagen' + 'python-pycdio' + 'python-pygobject' + 'python-ruamel-yaml' + 'cdrdao' + 'cdparanoia' + 'flac' + 'gtk3' + 'libdiscid' + 'libsndfile' + 'sox' +) +makedepends=( + 'git' + 'gcc' + 'python-build' + 'python-installer' + 'python-setuptools' + 'python-setuptools-scm' + 'python-wheel' +) +checkdepends=( + 'python-pytest' + 'python-twisted' +) +optdepends=( + 'python-pillow: cover art embedding support' + 'python-docutils: man page generation' +) +provides=('whipper') +conflicts=('whipper') +source=('git+https://git.daemonlord.ru/benya/whipper-gui.git') +sha256sums=('SKIP') + +pkgver() { + cd "$srcdir/whipper" + git describe --long --tags --abbrev=7 | sed 's/^v//; s/\([^-]*-g\)/r\1/; s/-/./g' +} + +build() { + cd "$srcdir/whipper" + python -m build --wheel --no-isolation +} + +check() { + cd "$srcdir/whipper" + export HOME="$srcdir/home" + mkdir -p "$HOME" + pytest -q +} + +package() { + cd "$srcdir/whipper" + python -m installer --destdir="$pkgdir" dist/*.whl +} diff --git a/README.md b/README.md index b0cf8c8..7de7cf2 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,7 @@ In order to track whipper's latest changes it's advised to check its commit hist 3. [Fetching the source code](#fetching-the-source-code) 4. [Finalizing the build](#finalizing-the-build) - [Usage](#usage) +- [GUI frontend](#gui-frontend) - [Getting started](#getting-started) - [Configuration file documentation](#configuration-file-documentation) - [Running uninstalled](#running-uninstalled) @@ -136,6 +137,7 @@ Whipper relies on the following packages in order to run correctly and provide a - [pycdio](https://pypi.python.org/pypi/pycdio/), for drive identification (required for drive offset and caching behavior to be stored in the configuration file). - To avoid bugs it's advised to use the most recent `pycdio` version with the corresponding `libcdio` release or, if stuck on old pycdio versions, **0.20**/**0.21** with `libcdio` ≥ **0.90** ≤ **0.94**. All other combinations won't probably work. - [discid](https://pypi.org/project/discid/), for calculating Musicbrainz disc id. +- [PyGObject](https://pypi.org/project/PyGObject/), for the GTK frontend (`whipper-gui`) - [ruamel.yaml](https://pypi.org/project/ruamel.yaml/), for generating well formed YAML report logfiles - [libsndfile](http://www.mega-nerd.com/libsndfile/), for reading wav files - [flac](https://xiph.org/flac/), for reading flac files @@ -147,6 +149,7 @@ Some dependencies aren't available in the PyPI. They can be probably installed u - [cd-paranoia](https://github.com/rocky/libcdio-paranoia) - [cdrdao](http://cdrdao.sourceforge.net/) +- GTK 3 / PyGObject runtime packages (distribution specific, required by `whipper-gui`) - [libsndfile](http://www.mega-nerd.com/libsndfile/) - [flac](https://xiph.org/flac/) - [sox](http://sox.sourceforge.net/) @@ -176,7 +179,7 @@ cd whipper ### Finalizing the build -Install whipper: `python3 setup.py install` +Install whipper: `python3 -m pip install .` Note that, depending on the chosen installation path, this command may require elevated rights. @@ -184,7 +187,12 @@ To build the man pages, follow the instructions in the relevant [README](https:/ ## Usage -Whipper currently only has a command-line interface called `whipper` which is self-documenting: `whipper -h` gives you the basic instructions. +Whipper provides: + +- a command-line interface called `whipper` +- a GTK frontend called `whipper-gui` + +The CLI is still the most complete interface. `whipper -h` gives you the basic instructions. Whipper implements a tree of commands: for example, the top-level `whipper` command has a number of sub-commands. @@ -200,6 +208,35 @@ is not, because the `-d` argument applies to the `cd` command. A more complete set of usage instructions can be found in the `whipper` [man pages](https://github.com/whipper-team/whipper/blob/develop/man/README.md). +## GUI frontend + +`whipper-gui` is a GTK 3 frontend for the common `whipper cd` workflow. + +Launch it with: + +```bash +whipper-gui +``` + +The current GUI can: + +- detect and select optical drives +- read the current disc TOC +- look up matching MusicBrainz releases +- inspect release and track metadata +- rip discs through whipper's native task pipeline +- keep a live log of the rip process +- cancel an in-progress TOC read or rip +- configure the logger, working directory, and disc/track templates +- remember its GUI settings in `$XDG_CONFIG_HOME/whipper/gui.json` + +The current GUI does not yet expose every CLI feature. In particular, these options are still CLI-only: + +- interactive release prompting (`whipper cd -p`) +- a few lower-level CLI workflows outside `whipper cd rip` such as `drive analyze` and `offset find` + +The GUI uses whipper's internal API both for disc scanning and for the rip pipeline itself. It still lacks dedicated GUI workflows for some of the more specialized CLI tools. + ## Getting started The simplest way to get started making accurate rips is: @@ -286,6 +323,7 @@ source checkout: ```bash python3 -m whipper -h +python3 -m whipper.gui ``` ## Logger plugins diff --git a/whipper/gui.py b/whipper/gui.py index 4153aab..56487b8 100644 --- a/whipper/gui.py +++ b/whipper/gui.py @@ -1,24 +1,65 @@ import os -import re -import subprocess import sys import threading import json +import io +import logging +import contextlib +import importlib.util from pathlib import Path -import gi +_GUI_IMPORT_ERROR = None -gi.require_version("Gtk", "3.0") -from gi.repository import GLib, Gtk +try: + import cdio +except ImportError as exc: + cdio = None + _GUI_IMPORT_ERROR = exc -from whipper.common import common, config, drive, mbngs, task as whipper_task +try: + import gi + + gi.require_version("Gtk", "3.0") + from gi.repository import GLib, Gtk +except (ImportError, ValueError) as exc: + gi = None + GLib = None + Gtk = None + if _GUI_IMPORT_ERROR is None: + _GUI_IMPORT_ERROR = exc + +from whipper.common import accurip, common, config, drive, mbngs, task as whipper_task from whipper.common.program import Program -from whipper.program import cdrdao, utils +from whipper.command import cd as cd_command +from whipper.program import cdrdao, cdparanoia, utils +from whipper.result import result -RIP_START_RE = re.compile(r"ripping track (\d+) of (\d+)(?: \(try (\d+)\))?: (.+)") -RIP_DONE_RE = re.compile(r"CRCs match for track (\d+)") -RIP_SKIP_RE = re.compile(r"giving up on track (\d+) after (\d+) times") +logger = logging.getLogger(__name__) + + +class RipCancelledError(Exception): + pass + + +def gui_runtime_available(): + return _GUI_IMPORT_ERROR is None + + +def gui_runtime_error_message(): + missing = [] + if gi is None: + missing.append("PyGObject / GTK 3") + if cdio is None: + missing.append("pycdio") + if not missing: + missing.append("GUI runtime dependencies") + return "whipper-gui requires %s" % " and ".join(missing) + + +def require_gui_runtime(): + if not gui_runtime_available(): + raise RuntimeError(gui_runtime_error_message()) from _GUI_IMPORT_ERROR def _format_duration_ms(duration_ms): @@ -35,24 +76,35 @@ def _release_year(metadata): return metadata.release[:4] if metadata.release else "" -class WhipperGui(Gtk.Application): +def _release_duration_distance(metadata, duration_ms): + metadata_duration = getattr(metadata, "duration", None) + if metadata_duration is None: + return float("inf") + return abs(metadata_duration - duration_ms) + + +class WhipperGui(Gtk.Application if Gtk is not None else object): def __init__(self): + require_gui_runtime() super().__init__(application_id="com.github.whipper_team.WhipperGui") self.window = None + self.main_pane = None - self.process = None self.worker = None self.pulse_id = 0 self.scan_data = None self.scan_runner = None + self.rip_runner = None self.scan_cancel_requested = False + self.rip_cancel_requested = False self.current_release = None self.current_track_number = 0 self.current_track_total = 0 self.device_combo = None self.output_button = None + self.working_directory_entry = None self.country_entry = None self.release_id_entry = None self.refresh_button = None @@ -67,6 +119,9 @@ class WhipperGui(Gtk.Application): self.cover_art_combo = None self.max_retries_spin = None self.offset_spin = None + self.logger_combo = None + self.track_template_entry = None + self.disc_template_entry = None self.release_store = None self.release_view = None @@ -129,17 +184,26 @@ class WhipperGui(Gtk.Application): action=Gtk.FileChooserAction.SELECT_FOLDER, ) self.output_button.set_filename(os.path.expanduser("~")) + self.output_button.connect("file-set", self._on_settings_changed) grid.attach(self.output_button, 1, 1, 3, 1) - grid.attach(Gtk.Label(label="Country", xalign=0), 0, 2, 1, 1) + grid.attach(Gtk.Label(label="Working Dir", xalign=0), 0, 2, 1, 1) + self.working_directory_entry = Gtk.Entry() + self.working_directory_entry.set_placeholder_text("Optional working directory") + self.working_directory_entry.connect("changed", self._on_settings_changed) + grid.attach(self.working_directory_entry, 1, 2, 3, 1) + + grid.attach(Gtk.Label(label="Country", xalign=0), 0, 3, 1, 1) self.country_entry = Gtk.Entry() self.country_entry.set_placeholder_text("Optional MusicBrainz country filter") - grid.attach(self.country_entry, 1, 2, 1, 1) + self.country_entry.connect("changed", self._on_settings_changed) + grid.attach(self.country_entry, 1, 3, 1, 1) - grid.attach(Gtk.Label(label="Release ID", xalign=0), 2, 2, 1, 1) + grid.attach(Gtk.Label(label="Release ID", xalign=0), 2, 3, 1, 1) self.release_id_entry = Gtk.Entry() self.release_id_entry.set_placeholder_text("Optional release override") - grid.attach(self.release_id_entry, 3, 2, 1, 1) + self.release_id_entry.connect("changed", self._on_settings_changed) + grid.attach(self.release_id_entry, 3, 3, 1, 1) return grid @@ -190,6 +254,7 @@ class WhipperGui(Gtk.Application): pane.set_wide_handle(True) pane.pack1(self._build_left_panel(), resize=True, shrink=False) pane.pack2(self._build_right_panel(), resize=True, shrink=False) + self.main_pane = pane return pane def _build_left_panel(self): @@ -286,13 +351,16 @@ class WhipperGui(Gtk.Application): grid.attach(self.unknown_check, 0, 0, 2, 1) self.cdr_check = Gtk.CheckButton(label="Allow CD-R") + self.cdr_check.connect("toggled", self._on_settings_changed) grid.attach(self.cdr_check, 2, 0, 1, 1) self.keep_going_check = Gtk.CheckButton(label="Keep going on failed tracks") self.keep_going_check.set_active(True) + self.keep_going_check.connect("toggled", self._on_settings_changed) grid.attach(self.keep_going_check, 0, 1, 2, 1) self.overread_check = Gtk.CheckButton(label="Force overread") + self.overread_check.connect("toggled", self._on_settings_changed) grid.attach(self.overread_check, 2, 1, 1, 1) grid.attach(Gtk.Label(label="Cover Art", xalign=0), 0, 2, 1, 1) @@ -302,24 +370,47 @@ class WhipperGui(Gtk.Application): self.cover_art_combo.append("embed", "Embed only") self.cover_art_combo.append("complete", "File + embed") self.cover_art_combo.set_active(0) + self.cover_art_combo.connect("changed", self._on_settings_changed) grid.attach(self.cover_art_combo, 1, 2, 1, 1) grid.attach(Gtk.Label(label="Max Retries", xalign=0), 2, 2, 1, 1) self.max_retries_spin = Gtk.SpinButton.new_with_range(0, 20, 1) self.max_retries_spin.set_value(5) + self.max_retries_spin.connect("value-changed", self._on_settings_changed) grid.attach(self.max_retries_spin, 3, 2, 1, 1) grid.attach(Gtk.Label(label="Offset", xalign=0), 0, 3, 1, 1) self.offset_spin = Gtk.SpinButton.new_with_range(-5000, 5000, 1) self.offset_spin.set_value(0) + self.offset_spin.connect("value-changed", self._on_settings_changed) grid.attach(self.offset_spin, 1, 3, 1, 1) + grid.attach(Gtk.Label(label="Logger", xalign=0), 2, 3, 1, 1) + self.logger_combo = Gtk.ComboBoxText() + for logger_name in sorted(result.getLoggers()): + self.logger_combo.append(logger_name, logger_name) + self.logger_combo.set_active_id("whipper") + self.logger_combo.connect("changed", self._on_settings_changed) + grid.attach(self.logger_combo, 3, 3, 1, 1) + + grid.attach(Gtk.Label(label="Track Tpl", xalign=0), 0, 4, 1, 1) + self.track_template_entry = Gtk.Entry() + self.track_template_entry.set_text(cd_command.DEFAULT_TRACK_TEMPLATE) + self.track_template_entry.connect("changed", self._on_settings_changed) + grid.attach(self.track_template_entry, 1, 4, 3, 1) + + grid.attach(Gtk.Label(label="Disc Tpl", xalign=0), 0, 5, 1, 1) + self.disc_template_entry = Gtk.Entry() + self.disc_template_entry.set_text(cd_command.DEFAULT_DISC_TEMPLATE) + self.disc_template_entry.connect("changed", self._on_settings_changed) + grid.attach(self.disc_template_entry, 1, 5, 3, 1) + note = Gtk.Label( - label="If your drive offset is configured in whipper, leave Offset at 0 and use the config value.", + label="The configured drive offset is loaded automatically when whipper knows this drive.", xalign=0, ) note.set_line_wrap(True) - grid.attach(note, 2, 3, 2, 1) + grid.attach(note, 0, 6, 4, 1) return frame @@ -353,6 +444,7 @@ class WhipperGui(Gtk.Application): def _collect_gui_settings(self): return { "output_directory": self.output_button.get_filename(), + "working_directory": self.working_directory_entry.get_text(), "country": self.country_entry.get_text(), "release_id": self.release_id_entry.get_text(), "unknown": self.unknown_check.get_active(), @@ -361,6 +453,13 @@ class WhipperGui(Gtk.Application): "overread": self.overread_check.get_active(), "cover_art": self.cover_art_combo.get_active_id() or "", "max_retries": int(self.max_retries_spin.get_value()), + "offset": int(self.offset_spin.get_value()), + "logger": self.logger_combo.get_active_id() or "whipper", + "track_template": self.track_template_entry.get_text(), + "disc_template": self.disc_template_entry.get_text(), + "window_width": self.window.get_size()[0] if self.window is not None else None, + "window_height": self.window.get_size()[1] if self.window is not None else None, + "pane_position": self.main_pane.get_position() if self.main_pane is not None else None, } def _save_gui_settings(self): @@ -382,6 +481,7 @@ class WhipperGui(Gtk.Application): output_directory = data.get("output_directory") if output_directory and os.path.isdir(output_directory): self.output_button.set_filename(output_directory) + self.working_directory_entry.set_text(data.get("working_directory", "")) self.country_entry.set_text(data.get("country", "")) self.release_id_entry.set_text(data.get("release_id", "")) self.unknown_check.set_active(bool(data.get("unknown", True))) @@ -399,9 +499,37 @@ class WhipperGui(Gtk.Application): if isinstance(retries, int): self.max_retries_spin.set_value(retries) + offset = data.get("offset") + if isinstance(offset, int): + self.offset_spin.set_value(offset) + + logger_name = data.get("logger", "whipper") + if logger_name in result.getLoggers(): + self.logger_combo.set_active_id(logger_name) + + self.track_template_entry.set_text( + data.get("track_template", cd_command.DEFAULT_TRACK_TEMPLATE) + ) + self.disc_template_entry.set_text( + data.get("disc_template", cd_command.DEFAULT_DISC_TEMPLATE) + ) + + width = data.get("window_width") + height = data.get("window_height") + if isinstance(width, int) and isinstance(height, int): + self.window.resize(width, height) + + pane_position = data.get("pane_position") + if isinstance(pane_position, int): + GLib.idle_add(self.main_pane.set_position, pane_position) + def _set_label(self, key, value): self.info_labels[key].set_text(value or "—") + def _can_rip(self): + has_release = self.current_release is not None or bool(self.release_id_entry.get_text().strip()) + return has_release or self.unknown_check.get_active() + def _reset_progress(self): self.current_track_number = 0 self.current_track_total = 0 @@ -414,18 +542,18 @@ class WhipperGui(Gtk.Application): def _set_running_state(self, running, status): self.read_button.set_sensitive(not running) self.refresh_button.set_sensitive(not running) - self.rip_button.set_sensitive((not running) and (self.current_release is not None or self.unknown_check.get_active())) + self.rip_button.set_sensitive((not running) and self._can_rip()) self.stop_button.set_sensitive(running) self.status_label.set_text(status) if running: if self.pulse_id == 0: - self.pulse_id = GLib.timeout_add(150, self._pulse_track_bar) + self.pulse_id = GLib.timeout_add(150, self._pulse_progress) elif self.pulse_id: GLib.source_remove(self.pulse_id) self.pulse_id = 0 - def _pulse_track_bar(self): - if self.process is None: + def _pulse_progress(self): + if self.scan_runner is None and self.rip_runner is None: return False self.track_bar.pulse() return True @@ -457,6 +585,7 @@ class WhipperGui(Gtk.Application): def _update_release_store(self, releases): self.release_store.clear() + self.current_release = None for metadata in releases: self.release_store.append([ metadata.artist or "", @@ -468,6 +597,9 @@ class WhipperGui(Gtk.Application): ]) if releases: self.release_view.get_selection().select_path(0) + else: + self._update_track_store(None) + self._update_release_details(None) def _update_track_store(self, metadata): self.track_store.clear() @@ -521,13 +653,6 @@ class WhipperGui(Gtk.Application): if offset is not None: self.offset_spin.set_value(int(offset)) - def _selected_release(self): - selection = self.release_view.get_selection() - model, tree_iter = selection.get_selected() - if tree_iter is None: - return None - return model.get_value(tree_iter, 5) - def _read_disc_worker(self, device, country): try: self.scan_cancel_requested = False @@ -537,10 +662,8 @@ class WhipperGui(Gtk.Application): GLib.idle_add(self._set_running_state, True, "Reading disc") GLib.idle_add(self.progress_label.set_text, "Scanning disc and looking up MusicBrainz") - conf = config.Config() runner = CancellableSyncRunner() self.scan_runner = runner - prog = Program(conf, record=False) utils.load_device(device) utils.unmount_device(device) @@ -563,7 +686,7 @@ class WhipperGui(Gtk.Application): releases = [] if self.scan_cancel_requested: raise RuntimeError("Disc scan cancelled") - releases = sorted(releases, key=lambda md: abs(md.duration - ittoc.duration())) + releases = sorted(releases, key=lambda md: _release_duration_distance(md, ittoc.duration())) def apply_results(): self.scan_data = { @@ -585,7 +708,6 @@ class WhipperGui(Gtk.Application): self._append_log("Disc duration: %s, %s audio tracks\n" % (duration, track_count)) self._append_log("\nFound %d matching release(s)\n" % len(releases)) self._update_release_store(releases) - self._update_release_details(None) self._set_running_state(False, "Disc ready") self.progress_label.set_text("Disc metadata loaded") if not releases: @@ -617,131 +739,399 @@ class WhipperGui(Gtk.Application): finally: self.scan_runner = None - def _build_cd_args(self): - args = [sys.executable, "-m", "whipper", "cd"] - device = self._selected_device() - if device: - args.extend(["-d", device]) - country = self.country_entry.get_text().strip() - if country: - args.extend(["-c", country]) - release_override = self.release_id_entry.get_text().strip() - if release_override: - args.extend(["-R", release_override]) - elif self.current_release is not None: - args.extend(["-R", self.current_release.mbid]) - return args - - def _run_rip_command(self): - args = self._build_cd_args() - args.append("rip") - - output_dir = self.output_button.get_filename() - if output_dir: - args.extend(["-O", output_dir]) - - cover_art = self.cover_art_combo.get_active_id() - if cover_art: - args.extend(["-C", cover_art]) - + def _collect_rip_settings(self): retries = int(self.max_retries_spin.get_value()) - args.extend(["-r", str(retries)]) + return { + "device": self._selected_device(), + "output_directory": self.output_button.get_filename() or os.curdir, + "working_directory": self.working_directory_entry.get_text().strip() or None, + "country": self.country_entry.get_text().strip() or None, + "release_id": self.release_id_entry.get_text().strip() or None, + "unknown": self.unknown_check.get_active(), + "cdr": self.cdr_check.get_active(), + "keep_going": self.keep_going_check.get_active(), + "overread": self.overread_check.get_active(), + "cover_art": self.cover_art_combo.get_active_id() or None, + "max_retries": float("inf") if retries == 0 else retries, + "offset": int(self.offset_spin.get_value()), + "logger": self.logger_combo.get_active_id() or "whipper", + "track_template": self.track_template_entry.get_text() or cd_command.DEFAULT_TRACK_TEMPLATE, + "disc_template": self.disc_template_entry.get_text() or cd_command.DEFAULT_DISC_TEMPLATE, + } - offset = int(self.offset_spin.get_value()) - if offset: - args.extend(["-o", str(offset)]) + def _validate_rip_settings(self, settings): + if not settings["device"]: + raise ValueError("No optical drive selected") + if not settings["output_directory"]: + raise ValueError("Output directory is required") + if not os.path.isdir(os.path.expanduser(settings["output_directory"])): + raise ValueError("Output directory does not exist") + cd_command.validate_template(settings["track_template"], "track") + cd_command.validate_template(settings["disc_template"], "disc") + if settings["working_directory"] and not os.path.isdir(os.path.expanduser(settings["working_directory"])): + raise ValueError("Working directory does not exist") + if settings["logger"] not in result.getLoggers(): + raise ValueError("Unknown logger '%s'" % settings["logger"]) - if self.unknown_check.get_active(): - args.append("-U") - if self.cdr_check.get_active(): - args.append("--cdr") - if self.keep_going_check.get_active(): - args.append("-k") - if self.overread_check.get_active(): - args.append("-x") + def _install_gui_log_handler(self): + handler = GuiLogHandler(self) + whipper_logger = logging.getLogger("whipper") + previous_level = whipper_logger.level + if previous_level == logging.NOTSET or previous_level > logging.INFO: + whipper_logger.setLevel(logging.INFO) + whipper_logger.addHandler(handler) + return whipper_logger, handler, previous_level - self._clear_log() - self._reset_progress() - self._append_log("$ %s\n\n" % " ".join(args)) - self._set_running_state(True, "Ripping disc") - self.progress_label.set_text("Preparing rip") + @staticmethod + def _remove_gui_log_handler(whipper_logger, handler, previous_level): + whipper_logger.removeHandler(handler) + whipper_logger.setLevel(previous_level) - def worker(): - try: - self.process = subprocess.Popen( - args, - cwd=os.getcwd(), - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - text=True, - bufsize=1, - ) - assert self.process.stdout is not None - for line in self.process.stdout: - GLib.idle_add(self._handle_rip_output_line, line) - returncode = self.process.wait() - GLib.idle_add(self._append_log, "\nProcess finished with exit code %d\n" % returncode) - GLib.idle_add(self._finish_rip, returncode) - except Exception as exc: - GLib.idle_add(self._append_log, "\n%s\n" % exc) - GLib.idle_add(self._finish_rip, 1) - finally: - self.process = None - - self.worker = threading.Thread(target=worker, daemon=True) - self.worker.start() - - def _handle_rip_output_line(self, line): - self._append_log(line) - - match = RIP_START_RE.search(line) - if match: - self.current_track_number = int(match.group(1)) - self.current_track_total = int(match.group(2)) - track_name = match.group(4) - overall_fraction = (self.current_track_number - 1) / max(self.current_track_total, 1) - self.overall_bar.set_fraction(overall_fraction) - self.overall_bar.set_text( - "Track %d/%d" % (self.current_track_number, self.current_track_total) + def _resolve_release_metadata(self, mbdiscid, settings): + if settings["release_id"]: + return mbngs.getReleaseMetadata( + settings["release_id"], + discid=mbdiscid, + country=settings["country"], + record=False, ) - self.track_bar.set_fraction(0.0) - self.track_bar.set_text(track_name) - self.progress_label.set_text("Ripping %s" % track_name) - return False - - match = RIP_DONE_RE.search(line) - if match: - finished = int(match.group(1)) - if self.current_track_total: - self.overall_bar.set_fraction(finished / self.current_track_total) - self.overall_bar.set_text("Track %d/%d" % (finished, self.current_track_total)) - self.track_bar.set_fraction(1.0) - self.track_bar.set_text("Track %d done" % finished) - self.progress_label.set_text("Track %d verified" % finished) - return False - - match = RIP_SKIP_RE.search(line) - if match: - skipped = int(match.group(1)) - self.track_bar.set_fraction(1.0) - self.track_bar.set_text("Track %d skipped" % skipped) - self.progress_label.set_text("Track %d failed" % skipped) - return False + if self.current_release is not None: + return self.current_release + if self.scan_data and self.scan_data.get("mbid") == mbdiscid: + releases = self.scan_data.get("releases") or [] + if releases: + return releases[0] + return None + def _update_rip_task_progress(self, description, item_label, item_index, item_total, value): + value = min(max(value, 0.0), 1.0) + self.current_track_number = item_index + self.current_track_total = item_total + self.overall_bar.set_fraction(((item_index - 1) + value) / max(item_total, 1)) + self.overall_bar.set_text("Track %d/%d" % (item_index, item_total)) + self.track_bar.set_fraction(value) + self.track_bar.set_text(description) + self.progress_label.set_text("%s: %s" % (item_label, description)) return False - def _finish_rip(self, returncode): - if returncode == 0: + def _mark_track_finished(self, item_label, item_index, item_total, skipped=False): + self.current_track_number = item_index + self.current_track_total = item_total + self.overall_bar.set_fraction(item_index / max(item_total, 1)) + self.overall_bar.set_text("Track %d/%d" % (item_index, item_total)) + self.track_bar.set_fraction(1.0) + self.track_bar.set_text("%s %s" % (item_label, "skipped" if skipped else "done")) + self.progress_label.set_text("%s %s" % (item_label, "failed" if skipped else "verified")) + return False + + def _finish_rip(self, returncode, status="Done"): + if returncode in (0, 5): self.overall_bar.set_fraction(1.0) self.overall_bar.set_text("Complete") self.track_bar.set_fraction(1.0) self.progress_label.set_text("Rip complete") - self._set_running_state(False, "Done") + self._set_running_state(False, status) + elif self.rip_cancel_requested: + self.progress_label.set_text("Rip cancelled") + self._set_running_state(False, "Cancelled") else: self.progress_label.set_text("Rip failed") self._set_running_state(False, "Failed") return False + def _rip_track(self, runner, program, itable, settings, track_number, item_index, item_total, + cover_art_path, skipped_tracks, mbdiscid): + if self.rip_cancel_requested: + raise RipCancelledError("Rip cancelled") + + track_result = program.result.getTrackResult(track_number) + if not track_result: + track_result = result.TrackResult() + program.result.tracks.append(track_result) + + path = program.getPath( + program.outdir, + settings["track_template"], + mbdiscid, + program.metadata, + track_number=track_number, + ) + ".flac" + track_result.number = track_number + track_result.filename = path + + if track_number > 0: + track_result.pregap = itable.tracks[track_number - 1].getPregap() + track_result.pre_emphasis = itable.tracks[track_number - 1].pre_emphasis + + item_label = "HTOA" if track_number == 0 else "Track %d" % track_number + + if os.path.exists(path): + GLib.idle_add(self._append_log, "%s already exists, verifying\n" % item_label) + if not program.verifyTrack(runner, track_result): + GLib.idle_add(self._append_log, "%s verification failed, reripping\n" % item_label) + os.unlink(path) + + if not os.path.exists(path): + track_result.testduration = 0.0 + track_result.copyduration = 0.0 + tries = 1 + while tries <= settings["max_retries"]: + if self.rip_cancel_requested: + raise RipCancelledError("Rip cancelled") + extra = "" if tries == 1 else " (try %d)" % tries + GLib.idle_add(self._append_log, "%s%s: %s\n" % (item_label, extra, os.path.basename(path))) + + tag_list = program.getTagList(track_number, mbdiscid) + if track_number > 0 and itable.tracks[track_number - 1].isrc is not None: + tag_list["ISRC"] = itable.tracks[track_number - 1].isrc + + rip_task = cdparanoia.ReadVerifyTrackTask( + track_result.filename, + program.result.table, + program.getHTOA()[0] if track_number == 0 else program.result.table.getTrackStart(track_number), + program.getHTOA()[1] if track_number == 0 else program.result.table.getTrackEnd(track_number), + settings["overread"], + offset=settings["offset"], + device=settings["device"], + taglist=tag_list, + what="%s%s" % (item_label.lower(), extra), + coverArtPath=cover_art_path, + ) + listener = GuiTaskListener(self, item_label, item_index, item_total) + rip_task.addListener(listener) + for child_task in rip_task.tasks: + child_task.addListener(listener) + + try: + runner.run(rip_task, verbose=False) + if self.rip_cancel_requested: + raise RipCancelledError("Rip cancelled") + track_result.testcrc = rip_task.testchecksum + track_result.copycrc = rip_task.copychecksum + track_result.peak = rip_task.peak + track_result.quality = rip_task.quality + track_result.testspeed = rip_task.testspeed + track_result.copyspeed = rip_task.copyspeed + track_result.testduration += rip_task.testduration + track_result.copyduration += rip_task.copyduration + if track_result.filename != rip_task.path: + track_result.filename = rip_task.path + break + except Exception as exc: + if self.rip_cancel_requested: + raise RipCancelledError("Rip cancelled") from exc + logger.debug("track %s try %d failed: %r", item_label, tries, exc) + tries += 1 + + if tries > settings["max_retries"]: + tries -= 1 + GLib.idle_add(self._append_log, "%s giving up after %d tries\n" % (item_label, tries)) + if settings["keep_going"]: + track_result.skipped = True + skipped_tracks.append(track_result) + GLib.idle_add(self._mark_track_finished, item_label, item_index, item_total, True) + return + raise RuntimeError("%s can't be ripped" % item_label) + + if track_result in skipped_tracks: + GLib.idle_add(self._mark_track_finished, item_label, item_index, item_total, True) + return + + if track_result.testcrc != track_result.copycrc: + raise RuntimeError("CRCs did not match for %s" % item_label) + + if track_number == 0: + if track_result.peak == cd_command.SILENT: + program.result.table.setFile( + 1, 0, None, program.result.table.getTrackStart(1), track_number + ) + if os.path.exists(track_result.filename): + os.unlink(track_result.filename) + track_result.filename = None + GLib.idle_add(self._append_log, "HTOA discarded, contains digital silence\n") + else: + program.result.table.setFile( + 1, 0, track_result.filename, program.result.table.getTrackStart(1), track_number + ) + else: + program.result.table.setFile( + track_number, + 1, + track_result.filename, + program.result.table.getTrackLength(track_number), + track_number, + ) + + GLib.idle_add(self._mark_track_finished, item_label, item_index, item_total, False) + + def _rip_disc_worker(self, settings): + whipper_logger, log_handler, previous_level = self._install_gui_log_handler() + original_cwd = os.getcwd() + try: + self.rip_cancel_requested = False + GLib.idle_add(self._clear_log) + GLib.idle_add(self._reset_progress) + GLib.idle_add(self._set_running_state, True, "Ripping disc") + GLib.idle_add(self.progress_label.set_text, "Preparing rip") + GLib.idle_add(self._append_log, "Starting native whipper rip\n\n") + + runner = CancellableSyncRunner() + self.rip_runner = runner + conf = config.Config() + program = Program(conf, record=False) + + if settings["working_directory"]: + os.chdir(os.path.expanduser(settings["working_directory"])) + + utils.load_device(settings["device"]) + utils.unmount_device(settings["device"]) + if drive.get_cdrom_drive_status(settings["device"]) == 1: + raise OSError("No CD detected, please insert one and retry") + + ittoc = program.getFastToc(runner, settings["device"]) + program.getRipResult() + cddb = ittoc.getCDDBDiscId() + mbdiscid = ittoc.getMusicBrainzDiscId() + GLib.idle_add(self._append_log, "CDDB disc id: %s\n" % cddb) + GLib.idle_add(self._append_log, "MusicBrainz disc id: %s\n" % mbdiscid) + + program.metadata = self._resolve_release_metadata(mbdiscid, settings) + if program.metadata is None and not settings["unknown"]: + raise RuntimeError("Unable to resolve disc metadata. Select a release or enable ripping without metadata.") + if program.metadata is not None: + program.metadata.discid = mbdiscid + GLib.idle_add( + self._append_log, + "Using release: %s - %s\n" % ( + program.metadata.artist or "Unknown Artist", + program.metadata.releaseTitle or program.metadata.title or "Unknown Title", + ), + ) + + program.result.isCdr = cdrdao.DetectCdr(settings["device"]) + if program.result.isCdr and not settings["cdr"]: + raise RuntimeError("Inserted disc appears to be a CD-R. Enable 'Allow CD-R' to continue.") + + out_fpath = program.getPath( + settings["output_directory"], + settings["disc_template"], + mbdiscid, + program.metadata, + ) + itable = program.getTable( + runner, + ittoc.getCDDBDiscId(), + ittoc.getMusicBrainzDiscId(), + settings["device"], + settings["offset"], + out_fpath, + ) + + program.result.cdrdaoVersion = cdrdao.version() + program.result.cdparanoiaVersion = cdparanoia.getCdParanoiaVersion() + info = drive.getDeviceInfo(settings["device"]) + if info: + try: + program.result.cdparanoiaDefeatsCache = conf.getDefeatsCache(*info) + except KeyError: + pass + program.result.artist = program.metadata.artist if program.metadata else "Unknown Artist" + program.result.title = program.metadata.releaseTitle if program.metadata else "Unknown Title" + _, program.result.vendor, program.result.model, program.result.release = \ + cdio.Device(settings["device"]).get_hwinfo() + program.result.metadata = program.metadata + program.result.offset = settings["offset"] + program.result.overread = settings["overread"] + program.result.logger = settings["logger"] + program.outdir = settings["output_directory"] + + disc_name = program.getPath(program.outdir, settings["disc_template"], mbdiscid, program.metadata) + dirname = os.path.dirname(disc_name) + if os.path.exists(dirname): + log_file = disc_name + ".log" + if os.path.exists(log_file): + raise RuntimeError("output directory %s is a finished rip" % dirname) + else: + os.makedirs(dirname) + + cover_art_path = None + if settings["cover_art"] in {"embed", "complete"} and importlib.util.find_spec("PIL") is None: + GLib.idle_add(self._append_log, "Cover art embedding requires Pillow; continuing without embedded art\n") + elif settings["cover_art"] in {"file", "embed", "complete"}: + if getattr(program.metadata, "mbid", None): + cover_art_path = program.getCoverArt(dirname, program.metadata.mbid) + else: + GLib.idle_add(self._append_log, "Cover art requested but disc metadata is unavailable\n") + if settings["cover_art"] == "file": + embed_cover_art_path = None + else: + embed_cover_art_path = cover_art_path + + skipped_tracks = [] + rip_numbers = [] + if program.getHTOA(): + rip_numbers.append(0) + for index, track in enumerate(itable.tracks, start=1): + if track.audio: + rip_numbers.append(index) + else: + GLib.idle_add(self._append_log, "Skipping data track %d, not implemented\n" % index) + track.indexes[1].relative = 0 + + total_items = len(rip_numbers) + for item_index, track_number in enumerate(rip_numbers, start=1): + if self.rip_cancel_requested: + raise RipCancelledError("Rip cancelled") + self._rip_track( + runner, + program, + itable, + settings, + track_number, + item_index, + total_items, + embed_cover_art_path, + skipped_tracks, + mbdiscid, + ) + + if settings["cover_art"] == "embed" and cover_art_path is not None: + os.remove(cover_art_path) + + if self.rip_cancel_requested: + raise RipCancelledError("Rip cancelled") + + program.skipped_tracks = skipped_tracks or None + program.writeCue(disc_name) + program.write_m3u(disc_name) + + try: + with contextlib.redirect_stdout(io.StringIO()) as stdout_buffer: + program.verifyImage(runner, itable) + accurip.print_report(program.result) + report_output = stdout_buffer.getvalue() + if report_output: + GLib.idle_add(self._append_log, "\n" + report_output + "\n") + except accurip.EntryNotFound: + GLib.idle_add(self._append_log, "AccurateRip entry not found\n") + + txt_logger = result.getLoggers()[settings["logger"]]() + program.writeLog(disc_name, txt_logger) + if skipped_tracks: + GLib.idle_add(self._append_log, "%d track(s) were skipped during this rip\n" % len(skipped_tracks)) + GLib.idle_add(self._finish_rip, 5, "Done with skipped tracks") + else: + GLib.idle_add(self._append_log, "Rip finished successfully\n") + GLib.idle_add(self._finish_rip, 0, "Done") + except Exception as exc: + if not isinstance(exc, RipCancelledError): + GLib.idle_add(self._append_log, "%s\n" % exc) + GLib.idle_add(self._finish_rip, 1, "Cancelled" if self.rip_cancel_requested else "Failed") + finally: + self.rip_runner = None + self._remove_gui_log_handler(whipper_logger, log_handler, previous_level) + os.chdir(original_cwd) + def _on_release_selected(self, selection): model, tree_iter = selection.get_selected() if tree_iter is None: @@ -752,7 +1142,7 @@ class WhipperGui(Gtk.Application): self.current_release = model.get_value(tree_iter, 5) self._update_track_store(self.current_release) self._update_release_details(self.current_release) - self.rip_button.set_sensitive(self.current_release is not None or self.unknown_check.get_active()) + self.rip_button.set_sensitive(self._can_rip()) def _on_refresh_clicked(self, _button): self._refresh_devices() @@ -760,12 +1150,22 @@ class WhipperGui(Gtk.Application): def _on_unknown_toggled(self, _button): self._save_gui_settings() self.rip_button.set_sensitive( - (self.process is None) and - (self.current_release is not None or self.unknown_check.get_active()) + (self.scan_runner is None) and + (self.rip_runner is None) and + self._can_rip() + ) + + def _on_settings_changed(self, *_args): + self._save_gui_settings() + self.rip_button.set_sensitive( + (self.scan_runner is None) and + (self.rip_runner is None) and + self._can_rip() ) def _on_device_changed(self, _combo): self._apply_configured_offset() + self._save_gui_settings() def _on_read_clicked(self, _button): device = self._selected_device() @@ -780,20 +1180,88 @@ class WhipperGui(Gtk.Application): self.worker.start() def _on_rip_clicked(self, _button): - if self.current_release is None and not self.unknown_check.get_active(): + if not self._can_rip(): self._append_log("Select a release or enable ripping without metadata.\n") self.status_label.set_text("Missing release") return - self._run_rip_command() + settings = self._collect_rip_settings() + try: + self._validate_rip_settings(settings) + except Exception as exc: + self._append_log("%s\n" % exc) + self.status_label.set_text("Invalid settings") + return + self.worker = threading.Thread( + target=self._rip_disc_worker, + args=(settings,), + daemon=True, + ) + self.worker.start() def _on_stop_clicked(self, _button): - if self.process is not None: - self.process.terminate() - self._append_log("\nStopping process...\n") - elif self.scan_runner is not None: + if self.scan_runner is not None: self.scan_cancel_requested = True self._append_log("\nStopping scan...\n") self.scan_runner.cancel() + elif self.rip_runner is not None: + self.rip_cancel_requested = True + self._append_log("\nStopping rip...\n") + self.rip_runner.cancel() + + +class GuiLogHandler(logging.Handler): + def __init__(self, gui): + super().__init__(level=logging.INFO) + self.gui = gui + self.setFormatter(logging.Formatter("%(message)s")) + + def emit(self, record): + try: + message = self.format(record) + except Exception: + message = record.getMessage() + GLib.idle_add(self.gui._append_log, message + "\n") + + +class GuiTaskListener: + def __init__(self, gui, item_label, item_index, item_total): + self.gui = gui + self.item_label = item_label + self.item_index = item_index + self.item_total = item_total + + def started(self, task): + GLib.idle_add( + self.gui._update_rip_task_progress, + task.description, + self.item_label, + self.item_index, + self.item_total, + task.progress, + ) + + def progressed(self, task, value): + GLib.idle_add( + self.gui._update_rip_task_progress, + task.description, + self.item_label, + self.item_index, + self.item_total, + value, + ) + + def described(self, task, description): + GLib.idle_add( + self.gui._update_rip_task_progress, + description, + self.item_label, + self.item_index, + self.item_total, + task.progress, + ) + + def stopped(self, task): + return None class CancellableSyncRunner(whipper_task.SyncRunner): @@ -818,5 +1286,10 @@ class CancellableSyncRunner(whipper_task.SyncRunner): def main(): + try: + require_gui_runtime() + except RuntimeError as exc: + print(str(exc), file=sys.stderr) + return 1 app = WhipperGui() return app.run(sys.argv) diff --git a/whipper/program/cdparanoia.py b/whipper/program/cdparanoia.py index 5c4b779..b8a135f 100644 --- a/whipper/program/cdparanoia.py +++ b/whipper/program/cdparanoia.py @@ -404,6 +404,10 @@ class ReadTrackTask(task.Task): self.stop() return + def abort(self): + if getattr(self, "_popen", None) is not None and self._popen.poll() is None: + self._popen.terminate() + class ReadVerifyTrackTask(task.MultiSeparateTask): """ @@ -566,6 +570,15 @@ class ReadVerifyTrackTask(task.MultiSeparateTask): task.MultiSeparateTask.stop(self) + def abort(self): + if 0 < self._task <= len(self.tasks): + current_task = self.tasks[self._task - 1] + abort = getattr(current_task, "abort", None) + if callable(abort): + abort() + else: + current_task.stop() + _VERSION_RE = re.compile( "^cdparanoia (?P.+) release (?P.+)") diff --git a/whipper/test/test_common_program.py b/whipper/test/test_common_program.py index 1bc07a2..9d4116a 100644 --- a/whipper/test/test_common_program.py +++ b/whipper/test/test_common_program.py @@ -45,6 +45,12 @@ class PathTestCase(unittest.TestCase): # TODO: Test cover art embedding too. class CoverArtTestCase(unittest.TestCase): + def setUp(self): + self.cover_art_path = None + + def tearDown(self): + if self.cover_art_path and os.path.exists(self.cover_art_path): + os.unlink(self.cover_art_path) @staticmethod def _mock_get_front_image(release_id): @@ -90,5 +96,6 @@ class CoverArtTestCase(unittest.TestCase): # https://musicbrainz.org/release/76df3287-6cda-33eb-8e9a-044b5e15ffdd path = os.path.dirname(__file__) release_id = "76df3287-6cda-33eb-8e9a-044b5e15ffdd" - coverArtPath = self._mock_getCoverArt(path, release_id) - self.assertTrue(os.path.isfile(coverArtPath)) + self.cover_art_path = self._mock_getCoverArt(path, release_id) + self.assertEqual(self.cover_art_path, os.path.join(path, 'cover.jpg')) + self.assertTrue(os.path.isfile(self.cover_art_path)) diff --git a/whipper/test/test_gui.py b/whipper/test/test_gui.py new file mode 100644 index 0000000..4c140d9 --- /dev/null +++ b/whipper/test/test_gui.py @@ -0,0 +1,679 @@ +import os +import threading +import time +from types import MethodType, SimpleNamespace + +import pytest + +from whipper import gui + + +def _bind_methods(app, *names): + for name in names: + setattr(app, name, MethodType(getattr(gui.WhipperGui, name), app)) + return app + + +def _drain_glib_until(predicate, timeout=2.0): + context = gui.GLib.MainContext.default() + deadline = time.time() + timeout + while time.time() < deadline: + while context.pending(): + context.iteration(False) + if predicate(): + return + time.sleep(0.01) + while context.pending(): + context.iteration(False) + assert predicate() + + +class FakeEntry: + def __init__(self, text=""): + self.text = text + + def get_text(self): + return self.text + + def set_text(self, value): + self.text = value + + +class FakeToggle: + def __init__(self, active=False): + self.active = active + + def get_active(self): + return self.active + + def set_active(self, value): + self.active = bool(value) + + +class FakeCombo: + def __init__(self, active_id=None): + self.active_id = active_id + + def get_active_id(self): + return self.active_id + + def set_active_id(self, value): + self.active_id = value + + def set_active(self, index): + self.active_id = "" if index == 0 else self.active_id + + +class FakeSpin: + def __init__(self, value=0): + self.value = value + + def get_value(self): + return self.value + + def set_value(self, value): + self.value = value + + +class FakeButton: + def __init__(self): + self.sensitive = None + + def set_sensitive(self, value): + self.sensitive = bool(value) + + +class FakeLabel: + def __init__(self, text=""): + self.text = text + + def set_text(self, value): + self.text = value + + def get_text(self): + return self.text + + +class FakeProgressBar: + def __init__(self): + self.fraction = 0.0 + self.text = "" + self.pulses = 0 + + def set_fraction(self, value): + self.fraction = value + + def set_text(self, value): + self.text = value + + def pulse(self): + self.pulses += 1 + + +class FakeFileChooser: + def __init__(self, filename=None): + self.filename = filename + + def get_filename(self): + return self.filename + + def set_filename(self, filename): + self.filename = filename + + +class FakeWindow: + def __init__(self, width=640, height=480): + self.size = (width, height) + + def get_size(self): + return self.size + + def resize(self, width, height): + self.size = (width, height) + + +class FakePane: + def __init__(self, position=0): + self.position = position + + def get_position(self): + return self.position + + def set_position(self, value): + self.position = value + + +class FakeListStore(list): + def clear(self): + del self[:] + + def append(self, row): + super().append(row) + + def get_value(self, tree_iter, index): + return self[tree_iter][index] + + +class FakeSelection: + def __init__(self, model): + self.model = model + self.selected = None + + def get_selected(self): + return self.model, self.selected + + def select_path(self, path): + self.selected = path + + +class FakeReleaseView: + def __init__(self, store): + self.selection = FakeSelection(store) + + def get_selection(self): + return self.selection + + +class FakeRunner: + def __init__(self): + self.cancelled = False + + def run(self, task, verbose=False): + self.task = task + + def cancel(self): + self.cancelled = True + + +class FakeTrack: + def __init__(self, audio=True): + self.audio = audio + self.isrc = None + self.pre_emphasis = False + self.indexes = { + 1: SimpleNamespace(relative=1), + } + + def getPregap(self): + return 0 + + +class FakeITable: + def __init__(self): + self.tracks = [FakeTrack(audio=True)] + + +class FakeResultTable: + def getTrackStart(self, _track_number): + return 0 + + def getTrackEnd(self, _track_number): + return 9 + + def getTrackLength(self, _track_number): + return 10 + + def setFile(self, *_args): + return None + + +class FakeRipResult: + def __init__(self): + self.tracks = [] + self.table = FakeResultTable() + self.isCdr = False + + def getTrackResult(self, track_number): + for track in self.tracks: + if track.number == track_number: + return track + return None + + +def _make_metadata(title, duration, mbid): + return SimpleNamespace( + artist="Artist", + releaseTitle=title, + title=title, + release="2024-01-01", + releaseType="Album", + countries=["US"], + duration=duration, + tracks=[], + discNumber=1, + discTotal=1, + mbid=mbid, + url="https://example.invalid/%s" % mbid, + barcode=None, + catalogNumbers=[], + ) + + +def _make_ui_app(tmp_path): + app = SimpleNamespace() + app.window = FakeWindow(111, 222) + app.main_pane = FakePane(77) + app.output_button = FakeFileChooser(str(tmp_path)) + app.working_directory_entry = FakeEntry("") + app.country_entry = FakeEntry("") + app.release_id_entry = FakeEntry("") + app.unknown_check = FakeToggle(True) + app.cdr_check = FakeToggle(False) + app.keep_going_check = FakeToggle(True) + app.overread_check = FakeToggle(False) + app.cover_art_combo = FakeCombo("") + app.max_retries_spin = FakeSpin(5) + app.offset_spin = FakeSpin(0) + app.logger_combo = FakeCombo("whipper") + app.track_template_entry = FakeEntry("%t") + app.disc_template_entry = FakeEntry("%d") + app.read_button = FakeButton() + app.refresh_button = FakeButton() + app.rip_button = FakeButton() + app.stop_button = FakeButton() + app.status_label = FakeLabel() + app.progress_label = FakeLabel() + app.overall_bar = FakeProgressBar() + app.track_bar = FakeProgressBar() + app.release_store = FakeListStore() + app.track_store = FakeListStore() + app.release_view = FakeReleaseView(app.release_store) + app.release_details = FakeLabel() + app.info_labels = {key: FakeLabel() for key in [ + "device", "disc_status", "cddb", "mbid", "duration", "tracks" + ]} + app.scan_runner = None + app.rip_runner = None + app.scan_cancel_requested = False + app.rip_cancel_requested = False + app.current_release = None + app.current_track_number = 0 + app.current_track_total = 0 + app.pulse_id = 0 + app.logs = [] + app._append_log = lambda text: app.logs.append(text) + app._clear_log = lambda: app.logs.clear() + app._config_path = lambda: tmp_path / "gui.json" + _bind_methods( + app, + "_collect_gui_settings", + "_save_gui_settings", + "_load_gui_settings", + "_can_rip", + "_set_label", + "_set_running_state", + "_update_release_store", + "_update_track_store", + "_update_release_details", + "_update_rip_task_progress", + "_finish_rip", + "_reset_progress", + "_pulse_progress", + "_resolve_release_metadata", + "_on_release_selected", + "_on_stop_clicked", + ) + app._remove_gui_log_handler = gui.WhipperGui._remove_gui_log_handler + app._install_gui_log_handler = MethodType(gui.WhipperGui._install_gui_log_handler, app) + return app + + +def test_gui_settings_roundtrip(tmp_path, monkeypatch): + app = _make_ui_app(tmp_path) + app.output_button.set_filename(str(tmp_path / "output")) + os.mkdir(app.output_button.get_filename()) + app.working_directory_entry.set_text(str(tmp_path)) + app.country_entry.set_text("JP") + app.release_id_entry.set_text("release-id") + app.cdr_check.set_active(True) + app.cover_art_combo.set_active_id("embed") + app.max_retries_spin.set_value(7) + app.offset_spin.set_value(123) + app.track_template_entry.set_text("%A/%t") + app.disc_template_entry.set_text("%A/%d") + monkeypatch.setattr(gui.GLib, "idle_add", lambda func, *args: func(*args)) + + gui.WhipperGui._save_gui_settings(app) + + app.output_button.set_filename(None) + app.working_directory_entry.set_text("") + app.country_entry.set_text("") + app.release_id_entry.set_text("") + app.unknown_check.set_active(False) + app.cdr_check.set_active(False) + app.keep_going_check.set_active(False) + app.overread_check.set_active(True) + app.cover_art_combo.set_active_id(None) + app.max_retries_spin.set_value(0) + app.offset_spin.set_value(0) + app.logger_combo.set_active_id(None) + app.track_template_entry.set_text("") + app.disc_template_entry.set_text("") + + gui.WhipperGui._load_gui_settings(app) + + assert app.output_button.get_filename() == str(tmp_path / "output") + assert app.working_directory_entry.get_text() == str(tmp_path) + assert app.country_entry.get_text() == "JP" + assert app.release_id_entry.get_text() == "release-id" + assert app.unknown_check.get_active() is True + assert app.cdr_check.get_active() is True + assert app.keep_going_check.get_active() is True + assert app.overread_check.get_active() is False + assert app.cover_art_combo.get_active_id() == "embed" + assert app.max_retries_spin.get_value() == 7 + assert app.offset_spin.get_value() == 123 + assert app.logger_combo.get_active_id() == "whipper" + assert app.track_template_entry.get_text() == "%A/%t" + assert app.disc_template_entry.get_text() == "%A/%d" + assert app.window.get_size() == (111, 222) + assert app.main_pane.get_position() == 77 + + +def test_release_selection_and_progress_updates(monkeypatch, tmp_path): + app = _make_ui_app(tmp_path) + selection = app.release_view.get_selection() + metadata = _make_metadata("Chosen", 1000, "mbid-1") + metadata.tracks = [SimpleNamespace(artist="Track Artist", title="Track Title", duration=210000)] + app.release_store.append(["Artist", "Chosen", "2024", "Album", "US", metadata]) + selection.select_path(0) + monkeypatch.setattr(gui.GLib, "timeout_add", lambda *_args: 99) + monkeypatch.setattr(gui.GLib, "source_remove", lambda *_args: None) + + gui.WhipperGui._on_release_selected(app, selection) + gui.WhipperGui._set_running_state(app, True, "Busy") + assert app.read_button.sensitive is False + assert app.refresh_button.sensitive is False + assert app.stop_button.sensitive is True + gui.WhipperGui._update_rip_task_progress(app, "Encoding", "Track 1", 1, 2, 0.25) + gui.WhipperGui._finish_rip(app, 0, "Done") + + assert app.current_release is metadata + assert app.track_store[0][1] == "Track Artist" + assert app.release_details.get_text().startswith("Artist: Artist") + assert app.overall_bar.fraction == 1.0 + assert app.track_bar.fraction == 1.0 + assert app.progress_label.get_text() == "Rip complete" + + +def test_main_reports_missing_runtime(monkeypatch, capsys): + monkeypatch.setattr(gui, "_GUI_IMPORT_ERROR", ImportError("missing")) + monkeypatch.setattr(gui, "gi", None) + monkeypatch.setattr(gui, "cdio", None) + + assert gui.main() == 1 + assert "whipper-gui requires" in capsys.readouterr().err + + +def test_read_disc_worker_processes_idle_callbacks(monkeypatch, tmp_path): + app = _make_ui_app(tmp_path) + calls = [] + + class FakeReadTOCTask: + def __init__(self, _device, fast_toc=True): + assert fast_toc is True + self.toc = SimpleNamespace(table=SimpleNamespace( + getCDDBDiscId=lambda: "cddb-id", + getMusicBrainzDiscId=lambda: "mbid-disc", + duration=lambda: 200000, + getAudioTracks=lambda: 10, + )) + + runner = FakeRunner() + monkeypatch.setattr(gui, "CancellableSyncRunner", lambda: runner) + monkeypatch.setattr(gui.cdrdao, "ReadTOCTask", FakeReadTOCTask) + monkeypatch.setattr(gui.utils, "load_device", lambda device: calls.append(("load", device))) + monkeypatch.setattr(gui.utils, "unmount_device", lambda device: calls.append(("unmount", device))) + monkeypatch.setattr(gui.drive, "get_cdrom_drive_status", lambda _device: 0) + monkeypatch.setattr( + gui.mbngs, + "musicbrainz", + lambda *_args, **_kwargs: [ + _make_metadata("Far", 260000, "r2"), + _make_metadata("Near", 205000, "r1"), + ], + ) + monkeypatch.setattr(gui.GLib, "timeout_add", lambda *_args: 1) + monkeypatch.setattr(gui.GLib, "source_remove", lambda *_args: None) + + worker = threading.Thread( + target=gui.WhipperGui._read_disc_worker, + args=(app, "/dev/cdrom", "US"), + ) + worker.start() + _drain_glib_until(lambda: not worker.is_alive()) + worker.join() + + assert calls == [("load", "/dev/cdrom"), ("unmount", "/dev/cdrom")] + assert app.scan_data["mbid"] == "mbid-disc" + assert app.scan_data["releases"][0].releaseTitle == "Near" + assert app.info_labels["disc_status"].get_text() == "Ready" + assert "Found 2 matching release(s)" in "".join(app.logs) + + +def test_rip_disc_worker_success(monkeypatch, tmp_path): + app = _make_ui_app(tmp_path) + app.current_release = _make_metadata("Selected", 200000, "release-mbid") + app.release_id_entry.set_text("") + app.unknown_check.set_active(False) + app.scan_data = {"mbid": "mbid-disc", "releases": [app.current_release]} + program_holder = {} + + class FakeProgram: + def __init__(self, _conf, record=False): + assert record is False + self.metadata = None + self.outdir = None + self.result = FakeRipResult() + self.cover_art_paths = [] + self.write_cue = False + self.write_m3u_called = False + self.write_log_called = False + program_holder["program"] = self + + def getFastToc(self, _runner, _device): + return SimpleNamespace( + getCDDBDiscId=lambda: "cddb-id", + getMusicBrainzDiscId=lambda: "mbid-disc", + ) + + def getRipResult(self): + return self.result + + def getPath(self, base, _template, _mbdiscid, _metadata, track_number=None): + if track_number is None: + return os.path.join(base, "Artist - Selected", "Artist - Selected") + return os.path.join(base, "Artist - Selected", "track-%02d" % track_number) + + def getTable(self, *_args): + return FakeITable() + + def getHTOA(self): + return None + + def getTagList(self, _track_number, _mbdiscid): + return {} + + def getCoverArt(self, dirname, _mbid): + path = os.path.join(dirname, "cover.jpg") + with open(path, "wb") as handle: + handle.write(b"cover") + self.cover_art_paths.append(path) + return path + + def verifyImage(self, _runner, _itable): + return None + + def writeCue(self, _disc_name): + self.write_cue = True + + def write_m3u(self, _disc_name): + self.write_m3u_called = True + + def writeLog(self, _disc_name, _logger): + self.write_log_called = True + + def fake_rip_track(_self, _runner, program, _itable, _settings, track_number, _item_index, _item_total, + _cover_art_path, _skipped_tracks, _mbdiscid): + track_result = SimpleNamespace(number=track_number, filename="track.flac") + program.result.tracks.append(track_result) + + runner = FakeRunner() + monkeypatch.setattr(gui, "Program", FakeProgram) + monkeypatch.setattr(gui, "CancellableSyncRunner", lambda: runner) + monkeypatch.setattr(gui.config, "Config", lambda: SimpleNamespace( + getDefeatsCache=lambda *_args: True, + )) + monkeypatch.setattr(gui.utils, "load_device", lambda _device: None) + monkeypatch.setattr(gui.utils, "unmount_device", lambda _device: None) + monkeypatch.setattr(gui.drive, "get_cdrom_drive_status", lambda _device: 0) + monkeypatch.setattr(gui.drive, "getDeviceInfo", lambda _device: ("vendor", "model", "release")) + monkeypatch.setattr(gui.cdrdao, "DetectCdr", lambda _device: False) + monkeypatch.setattr(gui.cdrdao, "version", lambda: "1.0") + monkeypatch.setattr(gui.cdparanoia, "getCdParanoiaVersion", lambda: "10.2") + monkeypatch.setattr(gui.cdio, "Device", lambda _device: SimpleNamespace( + get_hwinfo=lambda: (None, "vendor", "model", "release") + )) + monkeypatch.setattr(gui.importlib.util, "find_spec", lambda name: object() if name == "PIL" else None) + monkeypatch.setattr(gui.result, "getLoggers", lambda: {"whipper": lambda: object()}) + monkeypatch.setattr(gui.accurip, "print_report", lambda _result: None) + monkeypatch.setattr(gui.GLib, "idle_add", lambda func, *args: func(*args)) + app._rip_track = MethodType(fake_rip_track, app) + + settings = { + "device": "/dev/cdrom", + "output_directory": str(tmp_path), + "working_directory": None, + "country": None, + "release_id": None, + "unknown": False, + "cdr": False, + "keep_going": True, + "overread": False, + "cover_art": "complete", + "max_retries": 1, + "offset": 0, + "logger": "whipper", + "track_template": "%t", + "disc_template": "%d", + } + + gui.WhipperGui._rip_disc_worker(app, settings) + + program = program_holder["program"] + assert program.write_cue is True + assert program.write_m3u_called is True + assert program.write_log_called is True + assert program.cover_art_paths + assert app.status_label.get_text() == "Done" + assert "Rip finished successfully" in "".join(app.logs) + + +def test_rip_disc_worker_cancel_during_verify(monkeypatch, tmp_path): + app = _make_ui_app(tmp_path) + app.current_release = _make_metadata("Selected", 200000, "release-mbid") + app.unknown_check.set_active(False) + app.scan_data = {"mbid": "mbid-disc", "releases": [app.current_release]} + verify_started = threading.Event() + + class FakeProgram: + def __init__(self, _conf, record=False): + assert record is False + self.metadata = None + self.outdir = None + self.result = FakeRipResult() + + def getFastToc(self, _runner, _device): + return SimpleNamespace( + getCDDBDiscId=lambda: "cddb-id", + getMusicBrainzDiscId=lambda: "mbid-disc", + ) + + def getRipResult(self): + return self.result + + def getPath(self, base, _template, _mbdiscid, _metadata, track_number=None): + if track_number is None: + return os.path.join(base, "Artist - Selected", "Artist - Selected") + return os.path.join(base, "Artist - Selected", "track-%02d" % track_number) + + def getTable(self, *_args): + return FakeITable() + + def getHTOA(self): + return None + + def verifyImage(self, _runner, _itable): + verify_started.set() + while not app.rip_cancel_requested: + time.sleep(0.01) + raise gui.RipCancelledError("Rip cancelled") + + def writeCue(self, _disc_name): + return None + + def write_m3u(self, _disc_name): + return None + + def writeLog(self, _disc_name, _logger): + return None + + def fake_rip_track(_self, _runner, program, _itable, _settings, track_number, _item_index, _item_total, + _cover_art_path, _skipped_tracks, _mbdiscid): + program.result.tracks.append(SimpleNamespace(number=track_number, filename="track.flac")) + + runner = FakeRunner() + monkeypatch.setattr(gui, "Program", FakeProgram) + monkeypatch.setattr(gui, "CancellableSyncRunner", lambda: runner) + monkeypatch.setattr(gui.config, "Config", lambda: SimpleNamespace( + getDefeatsCache=lambda *_args: True, + )) + monkeypatch.setattr(gui.utils, "load_device", lambda _device: None) + monkeypatch.setattr(gui.utils, "unmount_device", lambda _device: None) + monkeypatch.setattr(gui.drive, "get_cdrom_drive_status", lambda _device: 0) + monkeypatch.setattr(gui.drive, "getDeviceInfo", lambda _device: ("vendor", "model", "release")) + monkeypatch.setattr(gui.cdrdao, "DetectCdr", lambda _device: False) + monkeypatch.setattr(gui.cdrdao, "version", lambda: "1.0") + monkeypatch.setattr(gui.cdparanoia, "getCdParanoiaVersion", lambda: "10.2") + monkeypatch.setattr(gui.cdio, "Device", lambda _device: SimpleNamespace( + get_hwinfo=lambda: (None, "vendor", "model", "release") + )) + monkeypatch.setattr(gui.importlib.util, "find_spec", lambda _name: None) + monkeypatch.setattr(gui.result, "getLoggers", lambda: {"whipper": lambda: object()}) + monkeypatch.setattr(gui.accurip, "print_report", lambda _result: None) + monkeypatch.setattr(gui.GLib, "timeout_add", lambda *_args: 1) + monkeypatch.setattr(gui.GLib, "source_remove", lambda *_args: None) + app._rip_track = MethodType(fake_rip_track, app) + + settings = { + "device": "/dev/cdrom", + "output_directory": str(tmp_path), + "working_directory": None, + "country": None, + "release_id": None, + "unknown": False, + "cdr": False, + "keep_going": True, + "overread": False, + "cover_art": None, + "max_retries": 1, + "offset": 0, + "logger": "whipper", + "track_template": "%t", + "disc_template": "%d", + } + + worker = threading.Thread( + target=gui.WhipperGui._rip_disc_worker, + args=(app, settings), + ) + worker.start() + assert verify_started.wait(timeout=2.0) + gui.WhipperGui._on_stop_clicked(app, None) + _drain_glib_until(lambda: not worker.is_alive()) + worker.join() + + assert app.rip_runner is None + assert app.rip_cancel_requested is True + assert runner.cancelled is True + assert app.status_label.get_text() == "Cancelled" diff --git a/whipper/test/test_program_cdparanoia.py b/whipper/test/test_program_cdparanoia.py index 108b9e4..5b4395e 100644 --- a/whipper/test/test_program_cdparanoia.py +++ b/whipper/test/test_program_cdparanoia.py @@ -2,6 +2,7 @@ # vi:si:et:sw=4:sts=4:ts=4 import os +from unittest import mock from whipper.extern.task import task @@ -89,3 +90,59 @@ class CacheTestCase(common.TestCase): t = AnalyzeFileTask(path) self.runner.run(t) self.assertTrue(t.defeatsCache) + + +class ReadTrackAbortTestCase(common.TestCase): + def testAbortTerminatesRunningProcess(self): + popen = mock.Mock() + popen.poll.return_value = None + + rip_task = cdparanoia.ReadTrackTask( + '/tmp/track.wav', mock.Mock(), 0, 0, False + ) + rip_task._popen = popen + + rip_task.abort() + + popen.terminate.assert_called_once_with() + + def testAbortIgnoresFinishedProcess(self): + popen = mock.Mock() + popen.poll.return_value = 0 + + rip_task = cdparanoia.ReadTrackTask( + '/tmp/track.wav', mock.Mock(), 0, 0, False + ) + rip_task._popen = popen + + rip_task.abort() + + popen.terminate.assert_not_called() + + +class ReadVerifyAbortTestCase(common.TestCase): + def testAbortDelegatesToCurrentTaskAbort(self): + current_task = mock.Mock() + current_task.abort = mock.Mock() + verify_task = cdparanoia.ReadVerifyTrackTask.__new__( + cdparanoia.ReadVerifyTrackTask + ) + verify_task.tasks = [mock.Mock(), current_task] + verify_task._task = 2 + + verify_task.abort() + + current_task.abort.assert_called_once_with() + + def testAbortFallsBackToStop(self): + current_task = mock.Mock() + del current_task.abort + verify_task = cdparanoia.ReadVerifyTrackTask.__new__( + cdparanoia.ReadVerifyTrackTask + ) + verify_task.tasks = [current_task] + verify_task._task = 1 + + verify_task.abort() + + current_task.stop.assert_called_once_with()