Add new features

This commit is contained in:
2026-04-18 17:55:52 +03:00
parent 992923bdc4
commit 56fd3a10c7
7 changed files with 1486 additions and 153 deletions

66
PKGBUILD Normal file
View File

@@ -0,0 +1,66 @@
pkgname=whipper-git
pkgver=0.10.0.r66.g992923b
pkgrel=1
pkgdesc='CD-DA ripper prioritising accuracy over speed'
arch=('x86_64')
url='https://github.com/whipper-team/whipper'
license=('GPL-3.0-or-later')
depends=(
'python'
'python-discid'
'python-musicbrainzngs'
'python-mutagen'
'python-pycdio'
'python-pygobject'
'python-ruamel-yaml'
'cdrdao'
'cdparanoia'
'flac'
'gtk3'
'libdiscid'
'libsndfile'
'sox'
)
makedepends=(
'git'
'gcc'
'python-build'
'python-installer'
'python-setuptools'
'python-setuptools-scm'
'python-wheel'
)
checkdepends=(
'python-pytest'
'python-twisted'
)
optdepends=(
'python-pillow: cover art embedding support'
'python-docutils: man page generation'
)
provides=('whipper')
conflicts=('whipper')
source=('git+https://git.daemonlord.ru/benya/whipper-gui.git')
sha256sums=('SKIP')
pkgver() {
cd "$srcdir/whipper"
git describe --long --tags --abbrev=7 | sed 's/^v//; s/\([^-]*-g\)/r\1/; s/-/./g'
}
build() {
cd "$srcdir/whipper"
python -m build --wheel --no-isolation
}
check() {
cd "$srcdir/whipper"
export HOME="$srcdir/home"
mkdir -p "$HOME"
pytest -q
}
package() {
cd "$srcdir/whipper"
python -m installer --destdir="$pkgdir" dist/*.whl
}

View File

@@ -28,6 +28,7 @@ In order to track whipper's latest changes it's advised to check its commit hist
3. [Fetching the source code](#fetching-the-source-code)
4. [Finalizing the build](#finalizing-the-build)
- [Usage](#usage)
- [GUI frontend](#gui-frontend)
- [Getting started](#getting-started)
- [Configuration file documentation](#configuration-file-documentation)
- [Running uninstalled](#running-uninstalled)
@@ -136,6 +137,7 @@ Whipper relies on the following packages in order to run correctly and provide a
- [pycdio](https://pypi.python.org/pypi/pycdio/), for drive identification (required for drive offset and caching behavior to be stored in the configuration file).
- To avoid bugs it's advised to use the most recent `pycdio` version with the corresponding `libcdio` release or, if stuck on old pycdio versions, **0.20**/**0.21** with `libcdio`**0.90****0.94**. All other combinations won't probably work.
- [discid](https://pypi.org/project/discid/), for calculating Musicbrainz disc id.
- [PyGObject](https://pypi.org/project/PyGObject/), for the GTK frontend (`whipper-gui`)
- [ruamel.yaml](https://pypi.org/project/ruamel.yaml/), for generating well formed YAML report logfiles
- [libsndfile](http://www.mega-nerd.com/libsndfile/), for reading wav files
- [flac](https://xiph.org/flac/), for reading flac files
@@ -147,6 +149,7 @@ Some dependencies aren't available in the PyPI. They can be probably installed u
- [cd-paranoia](https://github.com/rocky/libcdio-paranoia)
- [cdrdao](http://cdrdao.sourceforge.net/)
- GTK 3 / PyGObject runtime packages (distribution specific, required by `whipper-gui`)
- [libsndfile](http://www.mega-nerd.com/libsndfile/)
- [flac](https://xiph.org/flac/)
- [sox](http://sox.sourceforge.net/)
@@ -176,7 +179,7 @@ cd whipper
### Finalizing the build
Install whipper: `python3 setup.py install`
Install whipper: `python3 -m pip install .`
Note that, depending on the chosen installation path, this command may require elevated rights.
@@ -184,7 +187,12 @@ To build the man pages, follow the instructions in the relevant [README](https:/
## Usage
Whipper currently only has a command-line interface called `whipper` which is self-documenting: `whipper -h` gives you the basic instructions.
Whipper provides:
- a command-line interface called `whipper`
- a GTK frontend called `whipper-gui`
The CLI is still the most complete interface. `whipper -h` gives you the basic instructions.
Whipper implements a tree of commands: for example, the top-level `whipper` command has a number of sub-commands.
@@ -200,6 +208,35 @@ is not, because the `-d` argument applies to the `cd` command.
A more complete set of usage instructions can be found in the `whipper` [man pages](https://github.com/whipper-team/whipper/blob/develop/man/README.md).
## GUI frontend
`whipper-gui` is a GTK 3 frontend for the common `whipper cd` workflow.
Launch it with:
```bash
whipper-gui
```
The current GUI can:
- detect and select optical drives
- read the current disc TOC
- look up matching MusicBrainz releases
- inspect release and track metadata
- rip discs through whipper's native task pipeline
- keep a live log of the rip process
- cancel an in-progress TOC read or rip
- configure the logger, working directory, and disc/track templates
- remember its GUI settings in `$XDG_CONFIG_HOME/whipper/gui.json`
The current GUI does not yet expose every CLI feature. In particular, these options are still CLI-only:
- interactive release prompting (`whipper cd -p`)
- a few lower-level CLI workflows outside `whipper cd rip` such as `drive analyze` and `offset find`
The GUI uses whipper's internal API both for disc scanning and for the rip pipeline itself. It still lacks dedicated GUI workflows for some of the more specialized CLI tools.
## Getting started
The simplest way to get started making accurate rips is:
@@ -286,6 +323,7 @@ source checkout:
```bash
python3 -m whipper -h
python3 -m whipper.gui
```
## Logger plugins

View File

@@ -1,24 +1,65 @@
import os
import re
import subprocess
import sys
import threading
import json
import io
import logging
import contextlib
import importlib.util
from pathlib import Path
_GUI_IMPORT_ERROR = None
try:
import cdio
except ImportError as exc:
cdio = None
_GUI_IMPORT_ERROR = exc
try:
import gi
gi.require_version("Gtk", "3.0")
from gi.repository import GLib, Gtk
except (ImportError, ValueError) as exc:
gi = None
GLib = None
Gtk = None
if _GUI_IMPORT_ERROR is None:
_GUI_IMPORT_ERROR = exc
from whipper.common import common, config, drive, mbngs, task as whipper_task
from whipper.common import accurip, common, config, drive, mbngs, task as whipper_task
from whipper.common.program import Program
from whipper.program import cdrdao, utils
from whipper.command import cd as cd_command
from whipper.program import cdrdao, cdparanoia, utils
from whipper.result import result
RIP_START_RE = re.compile(r"ripping track (\d+) of (\d+)(?: \(try (\d+)\))?: (.+)")
RIP_DONE_RE = re.compile(r"CRCs match for track (\d+)")
RIP_SKIP_RE = re.compile(r"giving up on track (\d+) after (\d+) times")
logger = logging.getLogger(__name__)
class RipCancelledError(Exception):
pass
def gui_runtime_available():
return _GUI_IMPORT_ERROR is None
def gui_runtime_error_message():
missing = []
if gi is None:
missing.append("PyGObject / GTK 3")
if cdio is None:
missing.append("pycdio")
if not missing:
missing.append("GUI runtime dependencies")
return "whipper-gui requires %s" % " and ".join(missing)
def require_gui_runtime():
if not gui_runtime_available():
raise RuntimeError(gui_runtime_error_message()) from _GUI_IMPORT_ERROR
def _format_duration_ms(duration_ms):
@@ -35,24 +76,35 @@ def _release_year(metadata):
return metadata.release[:4] if metadata.release else ""
class WhipperGui(Gtk.Application):
def _release_duration_distance(metadata, duration_ms):
metadata_duration = getattr(metadata, "duration", None)
if metadata_duration is None:
return float("inf")
return abs(metadata_duration - duration_ms)
class WhipperGui(Gtk.Application if Gtk is not None else object):
def __init__(self):
require_gui_runtime()
super().__init__(application_id="com.github.whipper_team.WhipperGui")
self.window = None
self.main_pane = None
self.process = None
self.worker = None
self.pulse_id = 0
self.scan_data = None
self.scan_runner = None
self.rip_runner = None
self.scan_cancel_requested = False
self.rip_cancel_requested = False
self.current_release = None
self.current_track_number = 0
self.current_track_total = 0
self.device_combo = None
self.output_button = None
self.working_directory_entry = None
self.country_entry = None
self.release_id_entry = None
self.refresh_button = None
@@ -67,6 +119,9 @@ class WhipperGui(Gtk.Application):
self.cover_art_combo = None
self.max_retries_spin = None
self.offset_spin = None
self.logger_combo = None
self.track_template_entry = None
self.disc_template_entry = None
self.release_store = None
self.release_view = None
@@ -129,17 +184,26 @@ class WhipperGui(Gtk.Application):
action=Gtk.FileChooserAction.SELECT_FOLDER,
)
self.output_button.set_filename(os.path.expanduser("~"))
self.output_button.connect("file-set", self._on_settings_changed)
grid.attach(self.output_button, 1, 1, 3, 1)
grid.attach(Gtk.Label(label="Country", xalign=0), 0, 2, 1, 1)
grid.attach(Gtk.Label(label="Working Dir", xalign=0), 0, 2, 1, 1)
self.working_directory_entry = Gtk.Entry()
self.working_directory_entry.set_placeholder_text("Optional working directory")
self.working_directory_entry.connect("changed", self._on_settings_changed)
grid.attach(self.working_directory_entry, 1, 2, 3, 1)
grid.attach(Gtk.Label(label="Country", xalign=0), 0, 3, 1, 1)
self.country_entry = Gtk.Entry()
self.country_entry.set_placeholder_text("Optional MusicBrainz country filter")
grid.attach(self.country_entry, 1, 2, 1, 1)
self.country_entry.connect("changed", self._on_settings_changed)
grid.attach(self.country_entry, 1, 3, 1, 1)
grid.attach(Gtk.Label(label="Release ID", xalign=0), 2, 2, 1, 1)
grid.attach(Gtk.Label(label="Release ID", xalign=0), 2, 3, 1, 1)
self.release_id_entry = Gtk.Entry()
self.release_id_entry.set_placeholder_text("Optional release override")
grid.attach(self.release_id_entry, 3, 2, 1, 1)
self.release_id_entry.connect("changed", self._on_settings_changed)
grid.attach(self.release_id_entry, 3, 3, 1, 1)
return grid
@@ -190,6 +254,7 @@ class WhipperGui(Gtk.Application):
pane.set_wide_handle(True)
pane.pack1(self._build_left_panel(), resize=True, shrink=False)
pane.pack2(self._build_right_panel(), resize=True, shrink=False)
self.main_pane = pane
return pane
def _build_left_panel(self):
@@ -286,13 +351,16 @@ class WhipperGui(Gtk.Application):
grid.attach(self.unknown_check, 0, 0, 2, 1)
self.cdr_check = Gtk.CheckButton(label="Allow CD-R")
self.cdr_check.connect("toggled", self._on_settings_changed)
grid.attach(self.cdr_check, 2, 0, 1, 1)
self.keep_going_check = Gtk.CheckButton(label="Keep going on failed tracks")
self.keep_going_check.set_active(True)
self.keep_going_check.connect("toggled", self._on_settings_changed)
grid.attach(self.keep_going_check, 0, 1, 2, 1)
self.overread_check = Gtk.CheckButton(label="Force overread")
self.overread_check.connect("toggled", self._on_settings_changed)
grid.attach(self.overread_check, 2, 1, 1, 1)
grid.attach(Gtk.Label(label="Cover Art", xalign=0), 0, 2, 1, 1)
@@ -302,24 +370,47 @@ class WhipperGui(Gtk.Application):
self.cover_art_combo.append("embed", "Embed only")
self.cover_art_combo.append("complete", "File + embed")
self.cover_art_combo.set_active(0)
self.cover_art_combo.connect("changed", self._on_settings_changed)
grid.attach(self.cover_art_combo, 1, 2, 1, 1)
grid.attach(Gtk.Label(label="Max Retries", xalign=0), 2, 2, 1, 1)
self.max_retries_spin = Gtk.SpinButton.new_with_range(0, 20, 1)
self.max_retries_spin.set_value(5)
self.max_retries_spin.connect("value-changed", self._on_settings_changed)
grid.attach(self.max_retries_spin, 3, 2, 1, 1)
grid.attach(Gtk.Label(label="Offset", xalign=0), 0, 3, 1, 1)
self.offset_spin = Gtk.SpinButton.new_with_range(-5000, 5000, 1)
self.offset_spin.set_value(0)
self.offset_spin.connect("value-changed", self._on_settings_changed)
grid.attach(self.offset_spin, 1, 3, 1, 1)
grid.attach(Gtk.Label(label="Logger", xalign=0), 2, 3, 1, 1)
self.logger_combo = Gtk.ComboBoxText()
for logger_name in sorted(result.getLoggers()):
self.logger_combo.append(logger_name, logger_name)
self.logger_combo.set_active_id("whipper")
self.logger_combo.connect("changed", self._on_settings_changed)
grid.attach(self.logger_combo, 3, 3, 1, 1)
grid.attach(Gtk.Label(label="Track Tpl", xalign=0), 0, 4, 1, 1)
self.track_template_entry = Gtk.Entry()
self.track_template_entry.set_text(cd_command.DEFAULT_TRACK_TEMPLATE)
self.track_template_entry.connect("changed", self._on_settings_changed)
grid.attach(self.track_template_entry, 1, 4, 3, 1)
grid.attach(Gtk.Label(label="Disc Tpl", xalign=0), 0, 5, 1, 1)
self.disc_template_entry = Gtk.Entry()
self.disc_template_entry.set_text(cd_command.DEFAULT_DISC_TEMPLATE)
self.disc_template_entry.connect("changed", self._on_settings_changed)
grid.attach(self.disc_template_entry, 1, 5, 3, 1)
note = Gtk.Label(
label="If your drive offset is configured in whipper, leave Offset at 0 and use the config value.",
label="The configured drive offset is loaded automatically when whipper knows this drive.",
xalign=0,
)
note.set_line_wrap(True)
grid.attach(note, 2, 3, 2, 1)
grid.attach(note, 0, 6, 4, 1)
return frame
@@ -353,6 +444,7 @@ class WhipperGui(Gtk.Application):
def _collect_gui_settings(self):
return {
"output_directory": self.output_button.get_filename(),
"working_directory": self.working_directory_entry.get_text(),
"country": self.country_entry.get_text(),
"release_id": self.release_id_entry.get_text(),
"unknown": self.unknown_check.get_active(),
@@ -361,6 +453,13 @@ class WhipperGui(Gtk.Application):
"overread": self.overread_check.get_active(),
"cover_art": self.cover_art_combo.get_active_id() or "",
"max_retries": int(self.max_retries_spin.get_value()),
"offset": int(self.offset_spin.get_value()),
"logger": self.logger_combo.get_active_id() or "whipper",
"track_template": self.track_template_entry.get_text(),
"disc_template": self.disc_template_entry.get_text(),
"window_width": self.window.get_size()[0] if self.window is not None else None,
"window_height": self.window.get_size()[1] if self.window is not None else None,
"pane_position": self.main_pane.get_position() if self.main_pane is not None else None,
}
def _save_gui_settings(self):
@@ -382,6 +481,7 @@ class WhipperGui(Gtk.Application):
output_directory = data.get("output_directory")
if output_directory and os.path.isdir(output_directory):
self.output_button.set_filename(output_directory)
self.working_directory_entry.set_text(data.get("working_directory", ""))
self.country_entry.set_text(data.get("country", ""))
self.release_id_entry.set_text(data.get("release_id", ""))
self.unknown_check.set_active(bool(data.get("unknown", True)))
@@ -399,9 +499,37 @@ class WhipperGui(Gtk.Application):
if isinstance(retries, int):
self.max_retries_spin.set_value(retries)
offset = data.get("offset")
if isinstance(offset, int):
self.offset_spin.set_value(offset)
logger_name = data.get("logger", "whipper")
if logger_name in result.getLoggers():
self.logger_combo.set_active_id(logger_name)
self.track_template_entry.set_text(
data.get("track_template", cd_command.DEFAULT_TRACK_TEMPLATE)
)
self.disc_template_entry.set_text(
data.get("disc_template", cd_command.DEFAULT_DISC_TEMPLATE)
)
width = data.get("window_width")
height = data.get("window_height")
if isinstance(width, int) and isinstance(height, int):
self.window.resize(width, height)
pane_position = data.get("pane_position")
if isinstance(pane_position, int):
GLib.idle_add(self.main_pane.set_position, pane_position)
def _set_label(self, key, value):
self.info_labels[key].set_text(value or "")
def _can_rip(self):
has_release = self.current_release is not None or bool(self.release_id_entry.get_text().strip())
return has_release or self.unknown_check.get_active()
def _reset_progress(self):
self.current_track_number = 0
self.current_track_total = 0
@@ -414,18 +542,18 @@ class WhipperGui(Gtk.Application):
def _set_running_state(self, running, status):
self.read_button.set_sensitive(not running)
self.refresh_button.set_sensitive(not running)
self.rip_button.set_sensitive((not running) and (self.current_release is not None or self.unknown_check.get_active()))
self.rip_button.set_sensitive((not running) and self._can_rip())
self.stop_button.set_sensitive(running)
self.status_label.set_text(status)
if running:
if self.pulse_id == 0:
self.pulse_id = GLib.timeout_add(150, self._pulse_track_bar)
self.pulse_id = GLib.timeout_add(150, self._pulse_progress)
elif self.pulse_id:
GLib.source_remove(self.pulse_id)
self.pulse_id = 0
def _pulse_track_bar(self):
if self.process is None:
def _pulse_progress(self):
if self.scan_runner is None and self.rip_runner is None:
return False
self.track_bar.pulse()
return True
@@ -457,6 +585,7 @@ class WhipperGui(Gtk.Application):
def _update_release_store(self, releases):
self.release_store.clear()
self.current_release = None
for metadata in releases:
self.release_store.append([
metadata.artist or "",
@@ -468,6 +597,9 @@ class WhipperGui(Gtk.Application):
])
if releases:
self.release_view.get_selection().select_path(0)
else:
self._update_track_store(None)
self._update_release_details(None)
def _update_track_store(self, metadata):
self.track_store.clear()
@@ -521,13 +653,6 @@ class WhipperGui(Gtk.Application):
if offset is not None:
self.offset_spin.set_value(int(offset))
def _selected_release(self):
selection = self.release_view.get_selection()
model, tree_iter = selection.get_selected()
if tree_iter is None:
return None
return model.get_value(tree_iter, 5)
def _read_disc_worker(self, device, country):
try:
self.scan_cancel_requested = False
@@ -537,10 +662,8 @@ class WhipperGui(Gtk.Application):
GLib.idle_add(self._set_running_state, True, "Reading disc")
GLib.idle_add(self.progress_label.set_text, "Scanning disc and looking up MusicBrainz")
conf = config.Config()
runner = CancellableSyncRunner()
self.scan_runner = runner
prog = Program(conf, record=False)
utils.load_device(device)
utils.unmount_device(device)
@@ -563,7 +686,7 @@ class WhipperGui(Gtk.Application):
releases = []
if self.scan_cancel_requested:
raise RuntimeError("Disc scan cancelled")
releases = sorted(releases, key=lambda md: abs(md.duration - ittoc.duration()))
releases = sorted(releases, key=lambda md: _release_duration_distance(md, ittoc.duration()))
def apply_results():
self.scan_data = {
@@ -585,7 +708,6 @@ class WhipperGui(Gtk.Application):
self._append_log("Disc duration: %s, %s audio tracks\n" % (duration, track_count))
self._append_log("\nFound %d matching release(s)\n" % len(releases))
self._update_release_store(releases)
self._update_release_details(None)
self._set_running_state(False, "Disc ready")
self.progress_label.set_text("Disc metadata loaded")
if not releases:
@@ -617,131 +739,399 @@ class WhipperGui(Gtk.Application):
finally:
self.scan_runner = None
def _build_cd_args(self):
args = [sys.executable, "-m", "whipper", "cd"]
device = self._selected_device()
if device:
args.extend(["-d", device])
country = self.country_entry.get_text().strip()
if country:
args.extend(["-c", country])
release_override = self.release_id_entry.get_text().strip()
if release_override:
args.extend(["-R", release_override])
elif self.current_release is not None:
args.extend(["-R", self.current_release.mbid])
return args
def _run_rip_command(self):
args = self._build_cd_args()
args.append("rip")
output_dir = self.output_button.get_filename()
if output_dir:
args.extend(["-O", output_dir])
cover_art = self.cover_art_combo.get_active_id()
if cover_art:
args.extend(["-C", cover_art])
def _collect_rip_settings(self):
retries = int(self.max_retries_spin.get_value())
args.extend(["-r", str(retries)])
return {
"device": self._selected_device(),
"output_directory": self.output_button.get_filename() or os.curdir,
"working_directory": self.working_directory_entry.get_text().strip() or None,
"country": self.country_entry.get_text().strip() or None,
"release_id": self.release_id_entry.get_text().strip() or None,
"unknown": self.unknown_check.get_active(),
"cdr": self.cdr_check.get_active(),
"keep_going": self.keep_going_check.get_active(),
"overread": self.overread_check.get_active(),
"cover_art": self.cover_art_combo.get_active_id() or None,
"max_retries": float("inf") if retries == 0 else retries,
"offset": int(self.offset_spin.get_value()),
"logger": self.logger_combo.get_active_id() or "whipper",
"track_template": self.track_template_entry.get_text() or cd_command.DEFAULT_TRACK_TEMPLATE,
"disc_template": self.disc_template_entry.get_text() or cd_command.DEFAULT_DISC_TEMPLATE,
}
offset = int(self.offset_spin.get_value())
if offset:
args.extend(["-o", str(offset)])
def _validate_rip_settings(self, settings):
if not settings["device"]:
raise ValueError("No optical drive selected")
if not settings["output_directory"]:
raise ValueError("Output directory is required")
if not os.path.isdir(os.path.expanduser(settings["output_directory"])):
raise ValueError("Output directory does not exist")
cd_command.validate_template(settings["track_template"], "track")
cd_command.validate_template(settings["disc_template"], "disc")
if settings["working_directory"] and not os.path.isdir(os.path.expanduser(settings["working_directory"])):
raise ValueError("Working directory does not exist")
if settings["logger"] not in result.getLoggers():
raise ValueError("Unknown logger '%s'" % settings["logger"])
if self.unknown_check.get_active():
args.append("-U")
if self.cdr_check.get_active():
args.append("--cdr")
if self.keep_going_check.get_active():
args.append("-k")
if self.overread_check.get_active():
args.append("-x")
def _install_gui_log_handler(self):
handler = GuiLogHandler(self)
whipper_logger = logging.getLogger("whipper")
previous_level = whipper_logger.level
if previous_level == logging.NOTSET or previous_level > logging.INFO:
whipper_logger.setLevel(logging.INFO)
whipper_logger.addHandler(handler)
return whipper_logger, handler, previous_level
self._clear_log()
self._reset_progress()
self._append_log("$ %s\n\n" % " ".join(args))
self._set_running_state(True, "Ripping disc")
self.progress_label.set_text("Preparing rip")
@staticmethod
def _remove_gui_log_handler(whipper_logger, handler, previous_level):
whipper_logger.removeHandler(handler)
whipper_logger.setLevel(previous_level)
def worker():
try:
self.process = subprocess.Popen(
args,
cwd=os.getcwd(),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
def _resolve_release_metadata(self, mbdiscid, settings):
if settings["release_id"]:
return mbngs.getReleaseMetadata(
settings["release_id"],
discid=mbdiscid,
country=settings["country"],
record=False,
)
assert self.process.stdout is not None
for line in self.process.stdout:
GLib.idle_add(self._handle_rip_output_line, line)
returncode = self.process.wait()
GLib.idle_add(self._append_log, "\nProcess finished with exit code %d\n" % returncode)
GLib.idle_add(self._finish_rip, returncode)
except Exception as exc:
GLib.idle_add(self._append_log, "\n%s\n" % exc)
GLib.idle_add(self._finish_rip, 1)
finally:
self.process = None
if self.current_release is not None:
return self.current_release
if self.scan_data and self.scan_data.get("mbid") == mbdiscid:
releases = self.scan_data.get("releases") or []
if releases:
return releases[0]
return None
self.worker = threading.Thread(target=worker, daemon=True)
self.worker.start()
def _handle_rip_output_line(self, line):
self._append_log(line)
match = RIP_START_RE.search(line)
if match:
self.current_track_number = int(match.group(1))
self.current_track_total = int(match.group(2))
track_name = match.group(4)
overall_fraction = (self.current_track_number - 1) / max(self.current_track_total, 1)
self.overall_bar.set_fraction(overall_fraction)
self.overall_bar.set_text(
"Track %d/%d" % (self.current_track_number, self.current_track_total)
)
self.track_bar.set_fraction(0.0)
self.track_bar.set_text(track_name)
self.progress_label.set_text("Ripping %s" % track_name)
def _update_rip_task_progress(self, description, item_label, item_index, item_total, value):
value = min(max(value, 0.0), 1.0)
self.current_track_number = item_index
self.current_track_total = item_total
self.overall_bar.set_fraction(((item_index - 1) + value) / max(item_total, 1))
self.overall_bar.set_text("Track %d/%d" % (item_index, item_total))
self.track_bar.set_fraction(value)
self.track_bar.set_text(description)
self.progress_label.set_text("%s: %s" % (item_label, description))
return False
match = RIP_DONE_RE.search(line)
if match:
finished = int(match.group(1))
if self.current_track_total:
self.overall_bar.set_fraction(finished / self.current_track_total)
self.overall_bar.set_text("Track %d/%d" % (finished, self.current_track_total))
def _mark_track_finished(self, item_label, item_index, item_total, skipped=False):
self.current_track_number = item_index
self.current_track_total = item_total
self.overall_bar.set_fraction(item_index / max(item_total, 1))
self.overall_bar.set_text("Track %d/%d" % (item_index, item_total))
self.track_bar.set_fraction(1.0)
self.track_bar.set_text("Track %d done" % finished)
self.progress_label.set_text("Track %d verified" % finished)
self.track_bar.set_text("%s %s" % (item_label, "skipped" if skipped else "done"))
self.progress_label.set_text("%s %s" % (item_label, "failed" if skipped else "verified"))
return False
match = RIP_SKIP_RE.search(line)
if match:
skipped = int(match.group(1))
self.track_bar.set_fraction(1.0)
self.track_bar.set_text("Track %d skipped" % skipped)
self.progress_label.set_text("Track %d failed" % skipped)
return False
return False
def _finish_rip(self, returncode):
if returncode == 0:
def _finish_rip(self, returncode, status="Done"):
if returncode in (0, 5):
self.overall_bar.set_fraction(1.0)
self.overall_bar.set_text("Complete")
self.track_bar.set_fraction(1.0)
self.progress_label.set_text("Rip complete")
self._set_running_state(False, "Done")
self._set_running_state(False, status)
elif self.rip_cancel_requested:
self.progress_label.set_text("Rip cancelled")
self._set_running_state(False, "Cancelled")
else:
self.progress_label.set_text("Rip failed")
self._set_running_state(False, "Failed")
return False
def _rip_track(self, runner, program, itable, settings, track_number, item_index, item_total,
cover_art_path, skipped_tracks, mbdiscid):
if self.rip_cancel_requested:
raise RipCancelledError("Rip cancelled")
track_result = program.result.getTrackResult(track_number)
if not track_result:
track_result = result.TrackResult()
program.result.tracks.append(track_result)
path = program.getPath(
program.outdir,
settings["track_template"],
mbdiscid,
program.metadata,
track_number=track_number,
) + ".flac"
track_result.number = track_number
track_result.filename = path
if track_number > 0:
track_result.pregap = itable.tracks[track_number - 1].getPregap()
track_result.pre_emphasis = itable.tracks[track_number - 1].pre_emphasis
item_label = "HTOA" if track_number == 0 else "Track %d" % track_number
if os.path.exists(path):
GLib.idle_add(self._append_log, "%s already exists, verifying\n" % item_label)
if not program.verifyTrack(runner, track_result):
GLib.idle_add(self._append_log, "%s verification failed, reripping\n" % item_label)
os.unlink(path)
if not os.path.exists(path):
track_result.testduration = 0.0
track_result.copyduration = 0.0
tries = 1
while tries <= settings["max_retries"]:
if self.rip_cancel_requested:
raise RipCancelledError("Rip cancelled")
extra = "" if tries == 1 else " (try %d)" % tries
GLib.idle_add(self._append_log, "%s%s: %s\n" % (item_label, extra, os.path.basename(path)))
tag_list = program.getTagList(track_number, mbdiscid)
if track_number > 0 and itable.tracks[track_number - 1].isrc is not None:
tag_list["ISRC"] = itable.tracks[track_number - 1].isrc
rip_task = cdparanoia.ReadVerifyTrackTask(
track_result.filename,
program.result.table,
program.getHTOA()[0] if track_number == 0 else program.result.table.getTrackStart(track_number),
program.getHTOA()[1] if track_number == 0 else program.result.table.getTrackEnd(track_number),
settings["overread"],
offset=settings["offset"],
device=settings["device"],
taglist=tag_list,
what="%s%s" % (item_label.lower(), extra),
coverArtPath=cover_art_path,
)
listener = GuiTaskListener(self, item_label, item_index, item_total)
rip_task.addListener(listener)
for child_task in rip_task.tasks:
child_task.addListener(listener)
try:
runner.run(rip_task, verbose=False)
if self.rip_cancel_requested:
raise RipCancelledError("Rip cancelled")
track_result.testcrc = rip_task.testchecksum
track_result.copycrc = rip_task.copychecksum
track_result.peak = rip_task.peak
track_result.quality = rip_task.quality
track_result.testspeed = rip_task.testspeed
track_result.copyspeed = rip_task.copyspeed
track_result.testduration += rip_task.testduration
track_result.copyduration += rip_task.copyduration
if track_result.filename != rip_task.path:
track_result.filename = rip_task.path
break
except Exception as exc:
if self.rip_cancel_requested:
raise RipCancelledError("Rip cancelled") from exc
logger.debug("track %s try %d failed: %r", item_label, tries, exc)
tries += 1
if tries > settings["max_retries"]:
tries -= 1
GLib.idle_add(self._append_log, "%s giving up after %d tries\n" % (item_label, tries))
if settings["keep_going"]:
track_result.skipped = True
skipped_tracks.append(track_result)
GLib.idle_add(self._mark_track_finished, item_label, item_index, item_total, True)
return
raise RuntimeError("%s can't be ripped" % item_label)
if track_result in skipped_tracks:
GLib.idle_add(self._mark_track_finished, item_label, item_index, item_total, True)
return
if track_result.testcrc != track_result.copycrc:
raise RuntimeError("CRCs did not match for %s" % item_label)
if track_number == 0:
if track_result.peak == cd_command.SILENT:
program.result.table.setFile(
1, 0, None, program.result.table.getTrackStart(1), track_number
)
if os.path.exists(track_result.filename):
os.unlink(track_result.filename)
track_result.filename = None
GLib.idle_add(self._append_log, "HTOA discarded, contains digital silence\n")
else:
program.result.table.setFile(
1, 0, track_result.filename, program.result.table.getTrackStart(1), track_number
)
else:
program.result.table.setFile(
track_number,
1,
track_result.filename,
program.result.table.getTrackLength(track_number),
track_number,
)
GLib.idle_add(self._mark_track_finished, item_label, item_index, item_total, False)
def _rip_disc_worker(self, settings):
whipper_logger, log_handler, previous_level = self._install_gui_log_handler()
original_cwd = os.getcwd()
try:
self.rip_cancel_requested = False
GLib.idle_add(self._clear_log)
GLib.idle_add(self._reset_progress)
GLib.idle_add(self._set_running_state, True, "Ripping disc")
GLib.idle_add(self.progress_label.set_text, "Preparing rip")
GLib.idle_add(self._append_log, "Starting native whipper rip\n\n")
runner = CancellableSyncRunner()
self.rip_runner = runner
conf = config.Config()
program = Program(conf, record=False)
if settings["working_directory"]:
os.chdir(os.path.expanduser(settings["working_directory"]))
utils.load_device(settings["device"])
utils.unmount_device(settings["device"])
if drive.get_cdrom_drive_status(settings["device"]) == 1:
raise OSError("No CD detected, please insert one and retry")
ittoc = program.getFastToc(runner, settings["device"])
program.getRipResult()
cddb = ittoc.getCDDBDiscId()
mbdiscid = ittoc.getMusicBrainzDiscId()
GLib.idle_add(self._append_log, "CDDB disc id: %s\n" % cddb)
GLib.idle_add(self._append_log, "MusicBrainz disc id: %s\n" % mbdiscid)
program.metadata = self._resolve_release_metadata(mbdiscid, settings)
if program.metadata is None and not settings["unknown"]:
raise RuntimeError("Unable to resolve disc metadata. Select a release or enable ripping without metadata.")
if program.metadata is not None:
program.metadata.discid = mbdiscid
GLib.idle_add(
self._append_log,
"Using release: %s - %s\n" % (
program.metadata.artist or "Unknown Artist",
program.metadata.releaseTitle or program.metadata.title or "Unknown Title",
),
)
program.result.isCdr = cdrdao.DetectCdr(settings["device"])
if program.result.isCdr and not settings["cdr"]:
raise RuntimeError("Inserted disc appears to be a CD-R. Enable 'Allow CD-R' to continue.")
out_fpath = program.getPath(
settings["output_directory"],
settings["disc_template"],
mbdiscid,
program.metadata,
)
itable = program.getTable(
runner,
ittoc.getCDDBDiscId(),
ittoc.getMusicBrainzDiscId(),
settings["device"],
settings["offset"],
out_fpath,
)
program.result.cdrdaoVersion = cdrdao.version()
program.result.cdparanoiaVersion = cdparanoia.getCdParanoiaVersion()
info = drive.getDeviceInfo(settings["device"])
if info:
try:
program.result.cdparanoiaDefeatsCache = conf.getDefeatsCache(*info)
except KeyError:
pass
program.result.artist = program.metadata.artist if program.metadata else "Unknown Artist"
program.result.title = program.metadata.releaseTitle if program.metadata else "Unknown Title"
_, program.result.vendor, program.result.model, program.result.release = \
cdio.Device(settings["device"]).get_hwinfo()
program.result.metadata = program.metadata
program.result.offset = settings["offset"]
program.result.overread = settings["overread"]
program.result.logger = settings["logger"]
program.outdir = settings["output_directory"]
disc_name = program.getPath(program.outdir, settings["disc_template"], mbdiscid, program.metadata)
dirname = os.path.dirname(disc_name)
if os.path.exists(dirname):
log_file = disc_name + ".log"
if os.path.exists(log_file):
raise RuntimeError("output directory %s is a finished rip" % dirname)
else:
os.makedirs(dirname)
cover_art_path = None
if settings["cover_art"] in {"embed", "complete"} and importlib.util.find_spec("PIL") is None:
GLib.idle_add(self._append_log, "Cover art embedding requires Pillow; continuing without embedded art\n")
elif settings["cover_art"] in {"file", "embed", "complete"}:
if getattr(program.metadata, "mbid", None):
cover_art_path = program.getCoverArt(dirname, program.metadata.mbid)
else:
GLib.idle_add(self._append_log, "Cover art requested but disc metadata is unavailable\n")
if settings["cover_art"] == "file":
embed_cover_art_path = None
else:
embed_cover_art_path = cover_art_path
skipped_tracks = []
rip_numbers = []
if program.getHTOA():
rip_numbers.append(0)
for index, track in enumerate(itable.tracks, start=1):
if track.audio:
rip_numbers.append(index)
else:
GLib.idle_add(self._append_log, "Skipping data track %d, not implemented\n" % index)
track.indexes[1].relative = 0
total_items = len(rip_numbers)
for item_index, track_number in enumerate(rip_numbers, start=1):
if self.rip_cancel_requested:
raise RipCancelledError("Rip cancelled")
self._rip_track(
runner,
program,
itable,
settings,
track_number,
item_index,
total_items,
embed_cover_art_path,
skipped_tracks,
mbdiscid,
)
if settings["cover_art"] == "embed" and cover_art_path is not None:
os.remove(cover_art_path)
if self.rip_cancel_requested:
raise RipCancelledError("Rip cancelled")
program.skipped_tracks = skipped_tracks or None
program.writeCue(disc_name)
program.write_m3u(disc_name)
try:
with contextlib.redirect_stdout(io.StringIO()) as stdout_buffer:
program.verifyImage(runner, itable)
accurip.print_report(program.result)
report_output = stdout_buffer.getvalue()
if report_output:
GLib.idle_add(self._append_log, "\n" + report_output + "\n")
except accurip.EntryNotFound:
GLib.idle_add(self._append_log, "AccurateRip entry not found\n")
txt_logger = result.getLoggers()[settings["logger"]]()
program.writeLog(disc_name, txt_logger)
if skipped_tracks:
GLib.idle_add(self._append_log, "%d track(s) were skipped during this rip\n" % len(skipped_tracks))
GLib.idle_add(self._finish_rip, 5, "Done with skipped tracks")
else:
GLib.idle_add(self._append_log, "Rip finished successfully\n")
GLib.idle_add(self._finish_rip, 0, "Done")
except Exception as exc:
if not isinstance(exc, RipCancelledError):
GLib.idle_add(self._append_log, "%s\n" % exc)
GLib.idle_add(self._finish_rip, 1, "Cancelled" if self.rip_cancel_requested else "Failed")
finally:
self.rip_runner = None
self._remove_gui_log_handler(whipper_logger, log_handler, previous_level)
os.chdir(original_cwd)
def _on_release_selected(self, selection):
model, tree_iter = selection.get_selected()
if tree_iter is None:
@@ -752,7 +1142,7 @@ class WhipperGui(Gtk.Application):
self.current_release = model.get_value(tree_iter, 5)
self._update_track_store(self.current_release)
self._update_release_details(self.current_release)
self.rip_button.set_sensitive(self.current_release is not None or self.unknown_check.get_active())
self.rip_button.set_sensitive(self._can_rip())
def _on_refresh_clicked(self, _button):
self._refresh_devices()
@@ -760,12 +1150,22 @@ class WhipperGui(Gtk.Application):
def _on_unknown_toggled(self, _button):
self._save_gui_settings()
self.rip_button.set_sensitive(
(self.process is None) and
(self.current_release is not None or self.unknown_check.get_active())
(self.scan_runner is None) and
(self.rip_runner is None) and
self._can_rip()
)
def _on_settings_changed(self, *_args):
self._save_gui_settings()
self.rip_button.set_sensitive(
(self.scan_runner is None) and
(self.rip_runner is None) and
self._can_rip()
)
def _on_device_changed(self, _combo):
self._apply_configured_offset()
self._save_gui_settings()
def _on_read_clicked(self, _button):
device = self._selected_device()
@@ -780,20 +1180,88 @@ class WhipperGui(Gtk.Application):
self.worker.start()
def _on_rip_clicked(self, _button):
if self.current_release is None and not self.unknown_check.get_active():
if not self._can_rip():
self._append_log("Select a release or enable ripping without metadata.\n")
self.status_label.set_text("Missing release")
return
self._run_rip_command()
settings = self._collect_rip_settings()
try:
self._validate_rip_settings(settings)
except Exception as exc:
self._append_log("%s\n" % exc)
self.status_label.set_text("Invalid settings")
return
self.worker = threading.Thread(
target=self._rip_disc_worker,
args=(settings,),
daemon=True,
)
self.worker.start()
def _on_stop_clicked(self, _button):
if self.process is not None:
self.process.terminate()
self._append_log("\nStopping process...\n")
elif self.scan_runner is not None:
if self.scan_runner is not None:
self.scan_cancel_requested = True
self._append_log("\nStopping scan...\n")
self.scan_runner.cancel()
elif self.rip_runner is not None:
self.rip_cancel_requested = True
self._append_log("\nStopping rip...\n")
self.rip_runner.cancel()
class GuiLogHandler(logging.Handler):
def __init__(self, gui):
super().__init__(level=logging.INFO)
self.gui = gui
self.setFormatter(logging.Formatter("%(message)s"))
def emit(self, record):
try:
message = self.format(record)
except Exception:
message = record.getMessage()
GLib.idle_add(self.gui._append_log, message + "\n")
class GuiTaskListener:
def __init__(self, gui, item_label, item_index, item_total):
self.gui = gui
self.item_label = item_label
self.item_index = item_index
self.item_total = item_total
def started(self, task):
GLib.idle_add(
self.gui._update_rip_task_progress,
task.description,
self.item_label,
self.item_index,
self.item_total,
task.progress,
)
def progressed(self, task, value):
GLib.idle_add(
self.gui._update_rip_task_progress,
task.description,
self.item_label,
self.item_index,
self.item_total,
value,
)
def described(self, task, description):
GLib.idle_add(
self.gui._update_rip_task_progress,
description,
self.item_label,
self.item_index,
self.item_total,
task.progress,
)
def stopped(self, task):
return None
class CancellableSyncRunner(whipper_task.SyncRunner):
@@ -818,5 +1286,10 @@ class CancellableSyncRunner(whipper_task.SyncRunner):
def main():
try:
require_gui_runtime()
except RuntimeError as exc:
print(str(exc), file=sys.stderr)
return 1
app = WhipperGui()
return app.run(sys.argv)

View File

@@ -404,6 +404,10 @@ class ReadTrackTask(task.Task):
self.stop()
return
def abort(self):
if getattr(self, "_popen", None) is not None and self._popen.poll() is None:
self._popen.terminate()
class ReadVerifyTrackTask(task.MultiSeparateTask):
"""
@@ -566,6 +570,15 @@ class ReadVerifyTrackTask(task.MultiSeparateTask):
task.MultiSeparateTask.stop(self)
def abort(self):
if 0 < self._task <= len(self.tasks):
current_task = self.tasks[self._task - 1]
abort = getattr(current_task, "abort", None)
if callable(abort):
abort()
else:
current_task.stop()
_VERSION_RE = re.compile(
"^cdparanoia (?P<version>.+) release (?P<release>.+)")

View File

@@ -45,6 +45,12 @@ class PathTestCase(unittest.TestCase):
# TODO: Test cover art embedding too.
class CoverArtTestCase(unittest.TestCase):
def setUp(self):
self.cover_art_path = None
def tearDown(self):
if self.cover_art_path and os.path.exists(self.cover_art_path):
os.unlink(self.cover_art_path)
@staticmethod
def _mock_get_front_image(release_id):
@@ -90,5 +96,6 @@ class CoverArtTestCase(unittest.TestCase):
# https://musicbrainz.org/release/76df3287-6cda-33eb-8e9a-044b5e15ffdd
path = os.path.dirname(__file__)
release_id = "76df3287-6cda-33eb-8e9a-044b5e15ffdd"
coverArtPath = self._mock_getCoverArt(path, release_id)
self.assertTrue(os.path.isfile(coverArtPath))
self.cover_art_path = self._mock_getCoverArt(path, release_id)
self.assertEqual(self.cover_art_path, os.path.join(path, 'cover.jpg'))
self.assertTrue(os.path.isfile(self.cover_art_path))

679
whipper/test/test_gui.py Normal file
View File

@@ -0,0 +1,679 @@
import os
import threading
import time
from types import MethodType, SimpleNamespace
import pytest
from whipper import gui
def _bind_methods(app, *names):
for name in names:
setattr(app, name, MethodType(getattr(gui.WhipperGui, name), app))
return app
def _drain_glib_until(predicate, timeout=2.0):
context = gui.GLib.MainContext.default()
deadline = time.time() + timeout
while time.time() < deadline:
while context.pending():
context.iteration(False)
if predicate():
return
time.sleep(0.01)
while context.pending():
context.iteration(False)
assert predicate()
class FakeEntry:
def __init__(self, text=""):
self.text = text
def get_text(self):
return self.text
def set_text(self, value):
self.text = value
class FakeToggle:
def __init__(self, active=False):
self.active = active
def get_active(self):
return self.active
def set_active(self, value):
self.active = bool(value)
class FakeCombo:
def __init__(self, active_id=None):
self.active_id = active_id
def get_active_id(self):
return self.active_id
def set_active_id(self, value):
self.active_id = value
def set_active(self, index):
self.active_id = "" if index == 0 else self.active_id
class FakeSpin:
def __init__(self, value=0):
self.value = value
def get_value(self):
return self.value
def set_value(self, value):
self.value = value
class FakeButton:
def __init__(self):
self.sensitive = None
def set_sensitive(self, value):
self.sensitive = bool(value)
class FakeLabel:
def __init__(self, text=""):
self.text = text
def set_text(self, value):
self.text = value
def get_text(self):
return self.text
class FakeProgressBar:
def __init__(self):
self.fraction = 0.0
self.text = ""
self.pulses = 0
def set_fraction(self, value):
self.fraction = value
def set_text(self, value):
self.text = value
def pulse(self):
self.pulses += 1
class FakeFileChooser:
def __init__(self, filename=None):
self.filename = filename
def get_filename(self):
return self.filename
def set_filename(self, filename):
self.filename = filename
class FakeWindow:
def __init__(self, width=640, height=480):
self.size = (width, height)
def get_size(self):
return self.size
def resize(self, width, height):
self.size = (width, height)
class FakePane:
def __init__(self, position=0):
self.position = position
def get_position(self):
return self.position
def set_position(self, value):
self.position = value
class FakeListStore(list):
def clear(self):
del self[:]
def append(self, row):
super().append(row)
def get_value(self, tree_iter, index):
return self[tree_iter][index]
class FakeSelection:
def __init__(self, model):
self.model = model
self.selected = None
def get_selected(self):
return self.model, self.selected
def select_path(self, path):
self.selected = path
class FakeReleaseView:
def __init__(self, store):
self.selection = FakeSelection(store)
def get_selection(self):
return self.selection
class FakeRunner:
def __init__(self):
self.cancelled = False
def run(self, task, verbose=False):
self.task = task
def cancel(self):
self.cancelled = True
class FakeTrack:
def __init__(self, audio=True):
self.audio = audio
self.isrc = None
self.pre_emphasis = False
self.indexes = {
1: SimpleNamespace(relative=1),
}
def getPregap(self):
return 0
class FakeITable:
def __init__(self):
self.tracks = [FakeTrack(audio=True)]
class FakeResultTable:
def getTrackStart(self, _track_number):
return 0
def getTrackEnd(self, _track_number):
return 9
def getTrackLength(self, _track_number):
return 10
def setFile(self, *_args):
return None
class FakeRipResult:
def __init__(self):
self.tracks = []
self.table = FakeResultTable()
self.isCdr = False
def getTrackResult(self, track_number):
for track in self.tracks:
if track.number == track_number:
return track
return None
def _make_metadata(title, duration, mbid):
return SimpleNamespace(
artist="Artist",
releaseTitle=title,
title=title,
release="2024-01-01",
releaseType="Album",
countries=["US"],
duration=duration,
tracks=[],
discNumber=1,
discTotal=1,
mbid=mbid,
url="https://example.invalid/%s" % mbid,
barcode=None,
catalogNumbers=[],
)
def _make_ui_app(tmp_path):
app = SimpleNamespace()
app.window = FakeWindow(111, 222)
app.main_pane = FakePane(77)
app.output_button = FakeFileChooser(str(tmp_path))
app.working_directory_entry = FakeEntry("")
app.country_entry = FakeEntry("")
app.release_id_entry = FakeEntry("")
app.unknown_check = FakeToggle(True)
app.cdr_check = FakeToggle(False)
app.keep_going_check = FakeToggle(True)
app.overread_check = FakeToggle(False)
app.cover_art_combo = FakeCombo("")
app.max_retries_spin = FakeSpin(5)
app.offset_spin = FakeSpin(0)
app.logger_combo = FakeCombo("whipper")
app.track_template_entry = FakeEntry("%t")
app.disc_template_entry = FakeEntry("%d")
app.read_button = FakeButton()
app.refresh_button = FakeButton()
app.rip_button = FakeButton()
app.stop_button = FakeButton()
app.status_label = FakeLabel()
app.progress_label = FakeLabel()
app.overall_bar = FakeProgressBar()
app.track_bar = FakeProgressBar()
app.release_store = FakeListStore()
app.track_store = FakeListStore()
app.release_view = FakeReleaseView(app.release_store)
app.release_details = FakeLabel()
app.info_labels = {key: FakeLabel() for key in [
"device", "disc_status", "cddb", "mbid", "duration", "tracks"
]}
app.scan_runner = None
app.rip_runner = None
app.scan_cancel_requested = False
app.rip_cancel_requested = False
app.current_release = None
app.current_track_number = 0
app.current_track_total = 0
app.pulse_id = 0
app.logs = []
app._append_log = lambda text: app.logs.append(text)
app._clear_log = lambda: app.logs.clear()
app._config_path = lambda: tmp_path / "gui.json"
_bind_methods(
app,
"_collect_gui_settings",
"_save_gui_settings",
"_load_gui_settings",
"_can_rip",
"_set_label",
"_set_running_state",
"_update_release_store",
"_update_track_store",
"_update_release_details",
"_update_rip_task_progress",
"_finish_rip",
"_reset_progress",
"_pulse_progress",
"_resolve_release_metadata",
"_on_release_selected",
"_on_stop_clicked",
)
app._remove_gui_log_handler = gui.WhipperGui._remove_gui_log_handler
app._install_gui_log_handler = MethodType(gui.WhipperGui._install_gui_log_handler, app)
return app
def test_gui_settings_roundtrip(tmp_path, monkeypatch):
app = _make_ui_app(tmp_path)
app.output_button.set_filename(str(tmp_path / "output"))
os.mkdir(app.output_button.get_filename())
app.working_directory_entry.set_text(str(tmp_path))
app.country_entry.set_text("JP")
app.release_id_entry.set_text("release-id")
app.cdr_check.set_active(True)
app.cover_art_combo.set_active_id("embed")
app.max_retries_spin.set_value(7)
app.offset_spin.set_value(123)
app.track_template_entry.set_text("%A/%t")
app.disc_template_entry.set_text("%A/%d")
monkeypatch.setattr(gui.GLib, "idle_add", lambda func, *args: func(*args))
gui.WhipperGui._save_gui_settings(app)
app.output_button.set_filename(None)
app.working_directory_entry.set_text("")
app.country_entry.set_text("")
app.release_id_entry.set_text("")
app.unknown_check.set_active(False)
app.cdr_check.set_active(False)
app.keep_going_check.set_active(False)
app.overread_check.set_active(True)
app.cover_art_combo.set_active_id(None)
app.max_retries_spin.set_value(0)
app.offset_spin.set_value(0)
app.logger_combo.set_active_id(None)
app.track_template_entry.set_text("")
app.disc_template_entry.set_text("")
gui.WhipperGui._load_gui_settings(app)
assert app.output_button.get_filename() == str(tmp_path / "output")
assert app.working_directory_entry.get_text() == str(tmp_path)
assert app.country_entry.get_text() == "JP"
assert app.release_id_entry.get_text() == "release-id"
assert app.unknown_check.get_active() is True
assert app.cdr_check.get_active() is True
assert app.keep_going_check.get_active() is True
assert app.overread_check.get_active() is False
assert app.cover_art_combo.get_active_id() == "embed"
assert app.max_retries_spin.get_value() == 7
assert app.offset_spin.get_value() == 123
assert app.logger_combo.get_active_id() == "whipper"
assert app.track_template_entry.get_text() == "%A/%t"
assert app.disc_template_entry.get_text() == "%A/%d"
assert app.window.get_size() == (111, 222)
assert app.main_pane.get_position() == 77
def test_release_selection_and_progress_updates(monkeypatch, tmp_path):
app = _make_ui_app(tmp_path)
selection = app.release_view.get_selection()
metadata = _make_metadata("Chosen", 1000, "mbid-1")
metadata.tracks = [SimpleNamespace(artist="Track Artist", title="Track Title", duration=210000)]
app.release_store.append(["Artist", "Chosen", "2024", "Album", "US", metadata])
selection.select_path(0)
monkeypatch.setattr(gui.GLib, "timeout_add", lambda *_args: 99)
monkeypatch.setattr(gui.GLib, "source_remove", lambda *_args: None)
gui.WhipperGui._on_release_selected(app, selection)
gui.WhipperGui._set_running_state(app, True, "Busy")
assert app.read_button.sensitive is False
assert app.refresh_button.sensitive is False
assert app.stop_button.sensitive is True
gui.WhipperGui._update_rip_task_progress(app, "Encoding", "Track 1", 1, 2, 0.25)
gui.WhipperGui._finish_rip(app, 0, "Done")
assert app.current_release is metadata
assert app.track_store[0][1] == "Track Artist"
assert app.release_details.get_text().startswith("Artist: Artist")
assert app.overall_bar.fraction == 1.0
assert app.track_bar.fraction == 1.0
assert app.progress_label.get_text() == "Rip complete"
def test_main_reports_missing_runtime(monkeypatch, capsys):
monkeypatch.setattr(gui, "_GUI_IMPORT_ERROR", ImportError("missing"))
monkeypatch.setattr(gui, "gi", None)
monkeypatch.setattr(gui, "cdio", None)
assert gui.main() == 1
assert "whipper-gui requires" in capsys.readouterr().err
def test_read_disc_worker_processes_idle_callbacks(monkeypatch, tmp_path):
app = _make_ui_app(tmp_path)
calls = []
class FakeReadTOCTask:
def __init__(self, _device, fast_toc=True):
assert fast_toc is True
self.toc = SimpleNamespace(table=SimpleNamespace(
getCDDBDiscId=lambda: "cddb-id",
getMusicBrainzDiscId=lambda: "mbid-disc",
duration=lambda: 200000,
getAudioTracks=lambda: 10,
))
runner = FakeRunner()
monkeypatch.setattr(gui, "CancellableSyncRunner", lambda: runner)
monkeypatch.setattr(gui.cdrdao, "ReadTOCTask", FakeReadTOCTask)
monkeypatch.setattr(gui.utils, "load_device", lambda device: calls.append(("load", device)))
monkeypatch.setattr(gui.utils, "unmount_device", lambda device: calls.append(("unmount", device)))
monkeypatch.setattr(gui.drive, "get_cdrom_drive_status", lambda _device: 0)
monkeypatch.setattr(
gui.mbngs,
"musicbrainz",
lambda *_args, **_kwargs: [
_make_metadata("Far", 260000, "r2"),
_make_metadata("Near", 205000, "r1"),
],
)
monkeypatch.setattr(gui.GLib, "timeout_add", lambda *_args: 1)
monkeypatch.setattr(gui.GLib, "source_remove", lambda *_args: None)
worker = threading.Thread(
target=gui.WhipperGui._read_disc_worker,
args=(app, "/dev/cdrom", "US"),
)
worker.start()
_drain_glib_until(lambda: not worker.is_alive())
worker.join()
assert calls == [("load", "/dev/cdrom"), ("unmount", "/dev/cdrom")]
assert app.scan_data["mbid"] == "mbid-disc"
assert app.scan_data["releases"][0].releaseTitle == "Near"
assert app.info_labels["disc_status"].get_text() == "Ready"
assert "Found 2 matching release(s)" in "".join(app.logs)
def test_rip_disc_worker_success(monkeypatch, tmp_path):
app = _make_ui_app(tmp_path)
app.current_release = _make_metadata("Selected", 200000, "release-mbid")
app.release_id_entry.set_text("")
app.unknown_check.set_active(False)
app.scan_data = {"mbid": "mbid-disc", "releases": [app.current_release]}
program_holder = {}
class FakeProgram:
def __init__(self, _conf, record=False):
assert record is False
self.metadata = None
self.outdir = None
self.result = FakeRipResult()
self.cover_art_paths = []
self.write_cue = False
self.write_m3u_called = False
self.write_log_called = False
program_holder["program"] = self
def getFastToc(self, _runner, _device):
return SimpleNamespace(
getCDDBDiscId=lambda: "cddb-id",
getMusicBrainzDiscId=lambda: "mbid-disc",
)
def getRipResult(self):
return self.result
def getPath(self, base, _template, _mbdiscid, _metadata, track_number=None):
if track_number is None:
return os.path.join(base, "Artist - Selected", "Artist - Selected")
return os.path.join(base, "Artist - Selected", "track-%02d" % track_number)
def getTable(self, *_args):
return FakeITable()
def getHTOA(self):
return None
def getTagList(self, _track_number, _mbdiscid):
return {}
def getCoverArt(self, dirname, _mbid):
path = os.path.join(dirname, "cover.jpg")
with open(path, "wb") as handle:
handle.write(b"cover")
self.cover_art_paths.append(path)
return path
def verifyImage(self, _runner, _itable):
return None
def writeCue(self, _disc_name):
self.write_cue = True
def write_m3u(self, _disc_name):
self.write_m3u_called = True
def writeLog(self, _disc_name, _logger):
self.write_log_called = True
def fake_rip_track(_self, _runner, program, _itable, _settings, track_number, _item_index, _item_total,
_cover_art_path, _skipped_tracks, _mbdiscid):
track_result = SimpleNamespace(number=track_number, filename="track.flac")
program.result.tracks.append(track_result)
runner = FakeRunner()
monkeypatch.setattr(gui, "Program", FakeProgram)
monkeypatch.setattr(gui, "CancellableSyncRunner", lambda: runner)
monkeypatch.setattr(gui.config, "Config", lambda: SimpleNamespace(
getDefeatsCache=lambda *_args: True,
))
monkeypatch.setattr(gui.utils, "load_device", lambda _device: None)
monkeypatch.setattr(gui.utils, "unmount_device", lambda _device: None)
monkeypatch.setattr(gui.drive, "get_cdrom_drive_status", lambda _device: 0)
monkeypatch.setattr(gui.drive, "getDeviceInfo", lambda _device: ("vendor", "model", "release"))
monkeypatch.setattr(gui.cdrdao, "DetectCdr", lambda _device: False)
monkeypatch.setattr(gui.cdrdao, "version", lambda: "1.0")
monkeypatch.setattr(gui.cdparanoia, "getCdParanoiaVersion", lambda: "10.2")
monkeypatch.setattr(gui.cdio, "Device", lambda _device: SimpleNamespace(
get_hwinfo=lambda: (None, "vendor", "model", "release")
))
monkeypatch.setattr(gui.importlib.util, "find_spec", lambda name: object() if name == "PIL" else None)
monkeypatch.setattr(gui.result, "getLoggers", lambda: {"whipper": lambda: object()})
monkeypatch.setattr(gui.accurip, "print_report", lambda _result: None)
monkeypatch.setattr(gui.GLib, "idle_add", lambda func, *args: func(*args))
app._rip_track = MethodType(fake_rip_track, app)
settings = {
"device": "/dev/cdrom",
"output_directory": str(tmp_path),
"working_directory": None,
"country": None,
"release_id": None,
"unknown": False,
"cdr": False,
"keep_going": True,
"overread": False,
"cover_art": "complete",
"max_retries": 1,
"offset": 0,
"logger": "whipper",
"track_template": "%t",
"disc_template": "%d",
}
gui.WhipperGui._rip_disc_worker(app, settings)
program = program_holder["program"]
assert program.write_cue is True
assert program.write_m3u_called is True
assert program.write_log_called is True
assert program.cover_art_paths
assert app.status_label.get_text() == "Done"
assert "Rip finished successfully" in "".join(app.logs)
def test_rip_disc_worker_cancel_during_verify(monkeypatch, tmp_path):
app = _make_ui_app(tmp_path)
app.current_release = _make_metadata("Selected", 200000, "release-mbid")
app.unknown_check.set_active(False)
app.scan_data = {"mbid": "mbid-disc", "releases": [app.current_release]}
verify_started = threading.Event()
class FakeProgram:
def __init__(self, _conf, record=False):
assert record is False
self.metadata = None
self.outdir = None
self.result = FakeRipResult()
def getFastToc(self, _runner, _device):
return SimpleNamespace(
getCDDBDiscId=lambda: "cddb-id",
getMusicBrainzDiscId=lambda: "mbid-disc",
)
def getRipResult(self):
return self.result
def getPath(self, base, _template, _mbdiscid, _metadata, track_number=None):
if track_number is None:
return os.path.join(base, "Artist - Selected", "Artist - Selected")
return os.path.join(base, "Artist - Selected", "track-%02d" % track_number)
def getTable(self, *_args):
return FakeITable()
def getHTOA(self):
return None
def verifyImage(self, _runner, _itable):
verify_started.set()
while not app.rip_cancel_requested:
time.sleep(0.01)
raise gui.RipCancelledError("Rip cancelled")
def writeCue(self, _disc_name):
return None
def write_m3u(self, _disc_name):
return None
def writeLog(self, _disc_name, _logger):
return None
def fake_rip_track(_self, _runner, program, _itable, _settings, track_number, _item_index, _item_total,
_cover_art_path, _skipped_tracks, _mbdiscid):
program.result.tracks.append(SimpleNamespace(number=track_number, filename="track.flac"))
runner = FakeRunner()
monkeypatch.setattr(gui, "Program", FakeProgram)
monkeypatch.setattr(gui, "CancellableSyncRunner", lambda: runner)
monkeypatch.setattr(gui.config, "Config", lambda: SimpleNamespace(
getDefeatsCache=lambda *_args: True,
))
monkeypatch.setattr(gui.utils, "load_device", lambda _device: None)
monkeypatch.setattr(gui.utils, "unmount_device", lambda _device: None)
monkeypatch.setattr(gui.drive, "get_cdrom_drive_status", lambda _device: 0)
monkeypatch.setattr(gui.drive, "getDeviceInfo", lambda _device: ("vendor", "model", "release"))
monkeypatch.setattr(gui.cdrdao, "DetectCdr", lambda _device: False)
monkeypatch.setattr(gui.cdrdao, "version", lambda: "1.0")
monkeypatch.setattr(gui.cdparanoia, "getCdParanoiaVersion", lambda: "10.2")
monkeypatch.setattr(gui.cdio, "Device", lambda _device: SimpleNamespace(
get_hwinfo=lambda: (None, "vendor", "model", "release")
))
monkeypatch.setattr(gui.importlib.util, "find_spec", lambda _name: None)
monkeypatch.setattr(gui.result, "getLoggers", lambda: {"whipper": lambda: object()})
monkeypatch.setattr(gui.accurip, "print_report", lambda _result: None)
monkeypatch.setattr(gui.GLib, "timeout_add", lambda *_args: 1)
monkeypatch.setattr(gui.GLib, "source_remove", lambda *_args: None)
app._rip_track = MethodType(fake_rip_track, app)
settings = {
"device": "/dev/cdrom",
"output_directory": str(tmp_path),
"working_directory": None,
"country": None,
"release_id": None,
"unknown": False,
"cdr": False,
"keep_going": True,
"overread": False,
"cover_art": None,
"max_retries": 1,
"offset": 0,
"logger": "whipper",
"track_template": "%t",
"disc_template": "%d",
}
worker = threading.Thread(
target=gui.WhipperGui._rip_disc_worker,
args=(app, settings),
)
worker.start()
assert verify_started.wait(timeout=2.0)
gui.WhipperGui._on_stop_clicked(app, None)
_drain_glib_until(lambda: not worker.is_alive())
worker.join()
assert app.rip_runner is None
assert app.rip_cancel_requested is True
assert runner.cancelled is True
assert app.status_label.get_text() == "Cancelled"

View File

@@ -2,6 +2,7 @@
# vi:si:et:sw=4:sts=4:ts=4
import os
from unittest import mock
from whipper.extern.task import task
@@ -89,3 +90,59 @@ class CacheTestCase(common.TestCase):
t = AnalyzeFileTask(path)
self.runner.run(t)
self.assertTrue(t.defeatsCache)
class ReadTrackAbortTestCase(common.TestCase):
def testAbortTerminatesRunningProcess(self):
popen = mock.Mock()
popen.poll.return_value = None
rip_task = cdparanoia.ReadTrackTask(
'/tmp/track.wav', mock.Mock(), 0, 0, False
)
rip_task._popen = popen
rip_task.abort()
popen.terminate.assert_called_once_with()
def testAbortIgnoresFinishedProcess(self):
popen = mock.Mock()
popen.poll.return_value = 0
rip_task = cdparanoia.ReadTrackTask(
'/tmp/track.wav', mock.Mock(), 0, 0, False
)
rip_task._popen = popen
rip_task.abort()
popen.terminate.assert_not_called()
class ReadVerifyAbortTestCase(common.TestCase):
def testAbortDelegatesToCurrentTaskAbort(self):
current_task = mock.Mock()
current_task.abort = mock.Mock()
verify_task = cdparanoia.ReadVerifyTrackTask.__new__(
cdparanoia.ReadVerifyTrackTask
)
verify_task.tasks = [mock.Mock(), current_task]
verify_task._task = 2
verify_task.abort()
current_task.abort.assert_called_once_with()
def testAbortFallsBackToStop(self):
current_task = mock.Mock()
del current_task.abort
verify_task = cdparanoia.ReadVerifyTrackTask.__new__(
cdparanoia.ReadVerifyTrackTask
)
verify_task.tasks = [current_task]
verify_task._task = 1
verify_task.abort()
current_task.stop.assert_called_once_with()