Add GTK GUI frontend for whipper

This commit is contained in:
2026-04-18 17:24:56 +03:00
parent 71251a0b86
commit 992923bdc4
6 changed files with 871 additions and 12 deletions

View File

@@ -0,0 +1,12 @@
[Desktop Entry]
Type=Application
Version=1.0
Name=Whipper
GenericName=Audio CD Ripper
Comment=Rip audio CDs with accuracy-focused whipper workflows
Exec=whipper-gui
Icon=com.github.whipper_team.Whipper
Terminal=false
Categories=AudioVideo;Audio;Music;GTK;
Keywords=cd;ripper;accuraterip;musicbrainz;flac;
StartupNotify=true

View File

@@ -1,33 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<!-- Copyright 2018 Frederik “Freso” S. Olesen --> <component type="desktop-application">
<component type="console-application">
<id>com.github.whipper_team.Whipper</id> <id>com.github.whipper_team.Whipper</id>
<metadata_license>CC0-1.0</metadata_license> <metadata_license>CC0-1.0</metadata_license>
<name>whipper</name>
<project_license>GPL-3.0-or-later</project_license> <project_license>GPL-3.0-or-later</project_license>
<name>Whipper</name>
<developer_name>The Whipper Team</developer_name> <developer_name>The Whipper Team</developer_name>
<summary>A CD-DA ripper prioritising accuracy over speed</summary> <summary>Accurate audio CD ripping with MusicBrainz metadata lookup</summary>
<description> <description>
<p> <p>
whipper is a command-line CD-DA ripper that focuses on making accurate Whipper is an accuracy-focused audio CD ripper. The desktop frontend can
rips over fast ones. inspect the inserted disc, browse matching MusicBrainz releases, and run
secure ripping workflows backed by the existing whipper command-line
engine.
</p> </p>
</description> </description>
<launchable type="desktop-id">com.github.whipper_team.Whipper.desktop</launchable>
<url type="homepage">https://github.com/whipper-team/whipper</url> <url type="homepage">https://github.com/whipper-team/whipper</url>
<url type="bugtracker">https://github.com/whipper-team/whipper/issues</url> <url type="bugtracker">https://github.com/whipper-team/whipper/issues</url>
<url type="help">https://github.com/whipper-team/whipper/blob/master/README.md</url> <url type="help">https://github.com/whipper-team/whipper/blob/master/README.md</url>
<categories> <categories>
<category>AudioVideo</category> <category>AudioVideo</category>
<category>Audio</category> <category>Audio</category>
<category>Music</category> <category>Music</category>
<category>ConsoleOnly</category>
</categories> </categories>
<provides> <provides>
<binary>whipper</binary> <binary>whipper</binary>
<python2>whipper</python2> <binary>whipper-gui</binary>
</provides> </provides>
</component> </component>

View File

@@ -0,0 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<svg xmlns="http://www.w3.org/2000/svg" width="128" height="128" viewBox="0 0 128 128">
<rect width="128" height="128" rx="18" fill="#1f1f1f"/>
<rect x="10" y="10" width="108" height="108" rx="14" fill="#303030" stroke="#505050" stroke-width="4"/>
<circle cx="64" cy="64" r="38" fill="#404040" stroke="#8fb3ff" stroke-width="6"/>
<circle cx="64" cy="64" r="11" fill="#151515" stroke="#7f7f7f" stroke-width="4"/>
<path d="M84 30a40 40 0 0 1 14 10" fill="none" stroke="#d8e6ff" stroke-width="4" stroke-linecap="round"/>
<path d="M31 78h26" stroke="#8fb3ff" stroke-width="8" stroke-linecap="square"/>
<path d="M31 92h18" stroke="#d8e6ff" stroke-width="8" stroke-linecap="square"/>
</svg>

After

Width:  |  Height:  |  Size: 738 B

View File

@@ -38,6 +38,7 @@ docs = ["docutils"]
[project.scripts] [project.scripts]
whipper = "whipper.command.main:main" whipper = "whipper.command.main:main"
whipper-gui = "whipper.gui:main"
# This is necessary, since whipper uses a flat-layout, but has as a 'src' # This is necessary, since whipper uses a flat-layout, but has as a 'src'
# directory and setuptools gets confused otherwise. # directory and setuptools gets confused otherwise.
@@ -47,5 +48,10 @@ namespaces = false
[tool.setuptools.package-data] [tool.setuptools.package-data]
"whipper" = ["test/**"] "whipper" = ["test/**"]
[tool.setuptools.data-files]
"share/applications" = ["data/com.github.whipper_team.Whipper.desktop"]
"share/metainfo" = ["data/com.github.whipper_team.Whipper.metainfo.xml"]
"share/icons/hicolor/scalable/apps" = ["data/com.github.whipper_team.Whipper.svg"]
[tool.setuptools_scm] [tool.setuptools_scm]
# Empty, but needed to enable SCM # Empty, but needed to enable SCM

822
whipper/gui.py Normal file
View File

@@ -0,0 +1,822 @@
import os
import re
import subprocess
import sys
import threading
import json
from pathlib import Path
import gi
gi.require_version("Gtk", "3.0")
from gi.repository import GLib, Gtk
from whipper.common import common, config, drive, mbngs, task as whipper_task
from whipper.common.program import Program
from whipper.program import cdrdao, utils
RIP_START_RE = re.compile(r"ripping track (\d+) of (\d+)(?: \(try (\d+)\))?: (.+)")
RIP_DONE_RE = re.compile(r"CRCs match for track (\d+)")
RIP_SKIP_RE = re.compile(r"giving up on track (\d+) after (\d+) times")
def _format_duration_ms(duration_ms):
if duration_ms is None:
return ""
return common.formatTime(duration_ms / 1000.0)
def _release_country(metadata):
return ", ".join(metadata.countries) if metadata.countries else ""
def _release_year(metadata):
return metadata.release[:4] if metadata.release else ""
class WhipperGui(Gtk.Application):
def __init__(self):
super().__init__(application_id="com.github.whipper_team.WhipperGui")
self.window = None
self.process = None
self.worker = None
self.pulse_id = 0
self.scan_data = None
self.scan_runner = None
self.scan_cancel_requested = False
self.current_release = None
self.current_track_number = 0
self.current_track_total = 0
self.device_combo = None
self.output_button = None
self.country_entry = None
self.release_id_entry = None
self.refresh_button = None
self.read_button = None
self.rip_button = None
self.stop_button = None
self.unknown_check = None
self.cdr_check = None
self.keep_going_check = None
self.overread_check = None
self.cover_art_combo = None
self.max_retries_spin = None
self.offset_spin = None
self.release_store = None
self.release_view = None
self.track_store = None
self.track_view = None
self.log_buffer = None
self.release_details = None
self.info_labels = {}
self.status_label = None
self.progress_label = None
self.overall_bar = None
self.track_bar = None
def do_activate(self):
if self.window is None:
self.window = self._build_window()
self.window.present()
def do_shutdown(self):
self._save_gui_settings()
Gtk.Application.do_shutdown(self)
def _build_window(self):
window = Gtk.ApplicationWindow(application=self)
window.set_title("Whipper")
window.set_icon_name("com.github.whipper_team.Whipper")
window.set_default_size(1120, 760)
window.set_border_width(12)
root = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=12)
window.add(root)
root.pack_start(self._build_controls(), False, False, 0)
root.pack_start(self._build_actions(), False, False, 0)
root.pack_start(self._build_progress(), False, False, 0)
root.pack_start(self._build_main_content(), True, True, 0)
root.pack_start(self._build_log(), True, True, 0)
self._refresh_devices()
self._load_gui_settings()
return window
def _build_controls(self):
grid = Gtk.Grid(column_spacing=10, row_spacing=10)
grid.attach(Gtk.Label(label="Drive", xalign=0), 0, 0, 1, 1)
self.device_combo = Gtk.ComboBoxText()
self.device_combo.connect("changed", self._on_device_changed)
grid.attach(self.device_combo, 1, 0, 2, 1)
self.refresh_button = Gtk.Button(label="Refresh Drives")
self.refresh_button.connect("clicked", self._on_refresh_clicked)
self.refresh_button.set_tooltip_text("Refresh the list of detected optical drives")
grid.attach(self.refresh_button, 3, 0, 1, 1)
grid.attach(Gtk.Label(label="Output", xalign=0), 0, 1, 1, 1)
self.output_button = Gtk.FileChooserButton(
title="Select output folder",
action=Gtk.FileChooserAction.SELECT_FOLDER,
)
self.output_button.set_filename(os.path.expanduser("~"))
grid.attach(self.output_button, 1, 1, 3, 1)
grid.attach(Gtk.Label(label="Country", xalign=0), 0, 2, 1, 1)
self.country_entry = Gtk.Entry()
self.country_entry.set_placeholder_text("Optional MusicBrainz country filter")
grid.attach(self.country_entry, 1, 2, 1, 1)
grid.attach(Gtk.Label(label="Release ID", xalign=0), 2, 2, 1, 1)
self.release_id_entry = Gtk.Entry()
self.release_id_entry.set_placeholder_text("Optional release override")
grid.attach(self.release_id_entry, 3, 2, 1, 1)
return grid
def _build_actions(self):
box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=10)
self.read_button = Gtk.Button(label="Read Disc")
self.read_button.connect("clicked", self._on_read_clicked)
self.read_button.set_tooltip_text("Read TOC and fetch matching MusicBrainz releases")
box.pack_start(self.read_button, False, False, 0)
self.rip_button = Gtk.Button(label="Rip Selected Release")
self.rip_button.connect("clicked", self._on_rip_clicked)
self.rip_button.set_tooltip_text("Rip the current disc using the selected release metadata")
self.rip_button.set_sensitive(False)
box.pack_start(self.rip_button, False, False, 0)
self.stop_button = Gtk.Button(label="Stop")
self.stop_button.connect("clicked", self._on_stop_clicked)
self.stop_button.set_tooltip_text("Cancel the current scan or rip")
self.stop_button.set_sensitive(False)
box.pack_start(self.stop_button, False, False, 0)
self.status_label = Gtk.Label(label="Idle", xalign=0)
box.pack_start(self.status_label, True, True, 0)
return box
def _build_progress(self):
frame = Gtk.Frame(label="Progress")
box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8, margin=10)
frame.add(box)
self.progress_label = Gtk.Label(label="No task running", xalign=0)
box.pack_start(self.progress_label, False, False, 0)
self.overall_bar = Gtk.ProgressBar(show_text=True)
self.overall_bar.set_text("Overall")
box.pack_start(self.overall_bar, False, False, 0)
self.track_bar = Gtk.ProgressBar(show_text=True)
self.track_bar.set_text("Current track")
box.pack_start(self.track_bar, False, False, 0)
return frame
def _build_main_content(self):
pane = Gtk.Paned.new(Gtk.Orientation.HORIZONTAL)
pane.set_wide_handle(True)
pane.pack1(self._build_left_panel(), resize=True, shrink=False)
pane.pack2(self._build_right_panel(), resize=True, shrink=False)
return pane
def _build_left_panel(self):
box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=12)
info_frame = Gtk.Frame(label="Disc")
info_grid = Gtk.Grid(column_spacing=10, row_spacing=6, margin=10)
info_frame.add(info_grid)
rows = [
("Device", "device"),
("Status", "disc_status"),
("CDDB", "cddb"),
("Disc ID", "mbid"),
("Duration", "duration"),
("Tracks", "tracks"),
]
for row, (label_text, key) in enumerate(rows):
info_grid.attach(Gtk.Label(label=label_text, xalign=0), 0, row, 1, 1)
value = Gtk.Label(label="", xalign=0, selectable=True)
info_grid.attach(value, 1, row, 1, 1)
self.info_labels[key] = value
box.pack_start(info_frame, False, False, 0)
release_frame = Gtk.Frame(label="Matching Releases")
release_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8, margin=8)
release_frame.add(release_box)
self.release_store = Gtk.ListStore(str, str, str, str, str, object)
self.release_view = Gtk.TreeView(model=self.release_store)
self.release_view.get_selection().connect("changed", self._on_release_selected)
for index, title in enumerate(["Artist", "Title", "Year", "Type", "Country"]):
renderer = Gtk.CellRendererText()
column = Gtk.TreeViewColumn(title, renderer, text=index)
column.set_resizable(True)
self.release_view.append_column(column)
scroll = Gtk.ScrolledWindow()
scroll.set_hexpand(True)
scroll.set_vexpand(True)
scroll.add(self.release_view)
release_box.pack_start(scroll, True, True, 0)
details_frame = Gtk.Frame(label="Selected Release")
details_scroll = Gtk.ScrolledWindow()
details_scroll.set_hexpand(True)
details_scroll.set_vexpand(False)
details_scroll.set_min_content_height(120)
details_frame.add(details_scroll)
self.release_details = Gtk.Label(label="Select a release to inspect it.", xalign=0, yalign=0)
self.release_details.set_line_wrap(True)
self.release_details.set_selectable(True)
details_scroll.add(self.release_details)
release_box.pack_start(details_frame, False, False, 0)
box.pack_start(release_frame, True, True, 0)
return box
def _build_right_panel(self):
box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=12)
tracks_frame = Gtk.Frame(label="Tracks")
tracks_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8, margin=8)
tracks_frame.add(tracks_box)
self.track_store = Gtk.ListStore(str, str, str, str)
self.track_view = Gtk.TreeView(model=self.track_store)
for index, title in enumerate(["#", "Artist", "Title", "Length"]):
renderer = Gtk.CellRendererText()
column = Gtk.TreeViewColumn(title, renderer, text=index)
column.set_resizable(True)
self.track_view.append_column(column)
track_scroll = Gtk.ScrolledWindow()
track_scroll.set_hexpand(True)
track_scroll.set_vexpand(True)
track_scroll.add(self.track_view)
tracks_box.pack_start(track_scroll, True, True, 0)
box.pack_start(tracks_frame, True, True, 0)
box.pack_start(self._build_rip_options(), False, False, 0)
return box
def _build_rip_options(self):
frame = Gtk.Frame(label="Rip Options")
grid = Gtk.Grid(column_spacing=10, row_spacing=8, margin=10)
frame.add(grid)
self.unknown_check = Gtk.CheckButton(label="Allow ripping without metadata")
self.unknown_check.set_active(True)
self.unknown_check.connect("toggled", self._on_unknown_toggled)
grid.attach(self.unknown_check, 0, 0, 2, 1)
self.cdr_check = Gtk.CheckButton(label="Allow CD-R")
grid.attach(self.cdr_check, 2, 0, 1, 1)
self.keep_going_check = Gtk.CheckButton(label="Keep going on failed tracks")
self.keep_going_check.set_active(True)
grid.attach(self.keep_going_check, 0, 1, 2, 1)
self.overread_check = Gtk.CheckButton(label="Force overread")
grid.attach(self.overread_check, 2, 1, 1, 1)
grid.attach(Gtk.Label(label="Cover Art", xalign=0), 0, 2, 1, 1)
self.cover_art_combo = Gtk.ComboBoxText()
self.cover_art_combo.append("", "Disabled")
self.cover_art_combo.append("file", "Save file")
self.cover_art_combo.append("embed", "Embed only")
self.cover_art_combo.append("complete", "File + embed")
self.cover_art_combo.set_active(0)
grid.attach(self.cover_art_combo, 1, 2, 1, 1)
grid.attach(Gtk.Label(label="Max Retries", xalign=0), 2, 2, 1, 1)
self.max_retries_spin = Gtk.SpinButton.new_with_range(0, 20, 1)
self.max_retries_spin.set_value(5)
grid.attach(self.max_retries_spin, 3, 2, 1, 1)
grid.attach(Gtk.Label(label="Offset", xalign=0), 0, 3, 1, 1)
self.offset_spin = Gtk.SpinButton.new_with_range(-5000, 5000, 1)
self.offset_spin.set_value(0)
grid.attach(self.offset_spin, 1, 3, 1, 1)
note = Gtk.Label(
label="If your drive offset is configured in whipper, leave Offset at 0 and use the config value.",
xalign=0,
)
note.set_line_wrap(True)
grid.attach(note, 2, 3, 2, 1)
return frame
def _build_log(self):
frame = Gtk.Frame(label="Log")
scroll = Gtk.ScrolledWindow()
scroll.set_hexpand(True)
scroll.set_vexpand(True)
frame.add(scroll)
text_view = Gtk.TextView()
text_view.set_editable(False)
text_view.set_cursor_visible(False)
text_view.set_monospace(True)
self.log_buffer = text_view.get_buffer()
scroll.add(text_view)
return frame
def _append_log(self, text):
end_iter = self.log_buffer.get_end_iter()
self.log_buffer.insert(end_iter, text)
def _clear_log(self):
self.log_buffer.set_text("")
def _config_path(self):
xdg_config = os.environ.get("XDG_CONFIG_HOME")
base = Path(xdg_config) if xdg_config else Path.home() / ".config"
return base / "whipper" / "gui.json"
def _collect_gui_settings(self):
return {
"output_directory": self.output_button.get_filename(),
"country": self.country_entry.get_text(),
"release_id": self.release_id_entry.get_text(),
"unknown": self.unknown_check.get_active(),
"cdr": self.cdr_check.get_active(),
"keep_going": self.keep_going_check.get_active(),
"overread": self.overread_check.get_active(),
"cover_art": self.cover_art_combo.get_active_id() or "",
"max_retries": int(self.max_retries_spin.get_value()),
}
def _save_gui_settings(self):
if self.output_button is None:
return
path = self._config_path()
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(self._collect_gui_settings(), indent=2), encoding="utf-8")
def _load_gui_settings(self):
path = self._config_path()
if not path.exists():
return
try:
data = json.loads(path.read_text(encoding="utf-8"))
except Exception:
return
output_directory = data.get("output_directory")
if output_directory and os.path.isdir(output_directory):
self.output_button.set_filename(output_directory)
self.country_entry.set_text(data.get("country", ""))
self.release_id_entry.set_text(data.get("release_id", ""))
self.unknown_check.set_active(bool(data.get("unknown", True)))
self.cdr_check.set_active(bool(data.get("cdr", False)))
self.keep_going_check.set_active(bool(data.get("keep_going", True)))
self.overread_check.set_active(bool(data.get("overread", False)))
cover_art = data.get("cover_art", "")
if cover_art:
self.cover_art_combo.set_active_id(cover_art)
else:
self.cover_art_combo.set_active(0)
retries = data.get("max_retries", 5)
if isinstance(retries, int):
self.max_retries_spin.set_value(retries)
def _set_label(self, key, value):
self.info_labels[key].set_text(value or "")
def _reset_progress(self):
self.current_track_number = 0
self.current_track_total = 0
self.overall_bar.set_fraction(0.0)
self.overall_bar.set_text("Overall")
self.track_bar.set_fraction(0.0)
self.track_bar.set_text("Current track")
self.progress_label.set_text("No task running")
def _set_running_state(self, running, status):
self.read_button.set_sensitive(not running)
self.refresh_button.set_sensitive(not running)
self.rip_button.set_sensitive((not running) and (self.current_release is not None or self.unknown_check.get_active()))
self.stop_button.set_sensitive(running)
self.status_label.set_text(status)
if running:
if self.pulse_id == 0:
self.pulse_id = GLib.timeout_add(150, self._pulse_track_bar)
elif self.pulse_id:
GLib.source_remove(self.pulse_id)
self.pulse_id = 0
def _pulse_track_bar(self):
if self.process is None:
return False
self.track_bar.pulse()
return True
def _refresh_devices(self):
current = self.device_combo.get_active_text()
self.device_combo.remove_all()
devices = drive.getAllDevicePaths()
for path in devices:
self.device_combo.append_text(path)
if not devices:
self.device_combo.append_text("No drives detected")
self.device_combo.set_active(0)
self.read_button.set_sensitive(False)
self.rip_button.set_sensitive(False)
else:
self.read_button.set_sensitive(True)
if current in devices:
self.device_combo.set_active(devices.index(current))
else:
self.device_combo.set_active(0)
self._apply_configured_offset()
def _selected_device(self):
device = self.device_combo.get_active_text()
if not device or device == "No drives detected":
return None
return device
def _update_release_store(self, releases):
self.release_store.clear()
for metadata in releases:
self.release_store.append([
metadata.artist or "",
metadata.releaseTitle or metadata.title or "",
_release_year(metadata),
metadata.releaseType or "",
_release_country(metadata),
metadata,
])
if releases:
self.release_view.get_selection().select_path(0)
def _update_track_store(self, metadata):
self.track_store.clear()
if metadata is None:
return
for index, track in enumerate(metadata.tracks, start=1):
self.track_store.append([
str(index),
track.artist or "",
track.title or "",
_format_duration_ms(track.duration),
])
def _update_release_details(self, metadata):
if metadata is None:
self.release_details.set_text("Select a release to inspect it.")
return
lines = [
"Artist: %s" % (metadata.artist or "Unknown"),
"Title: %s" % (metadata.releaseTitle or metadata.title or "Unknown"),
"Year: %s" % (_release_year(metadata) or "Unknown"),
"Type: %s" % (metadata.releaseType or "Unknown"),
"Country: %s" % (_release_country(metadata) or "Unknown"),
"Disc: %s/%s" % (
metadata.discNumber if metadata.discNumber is not None else "?",
metadata.discTotal if metadata.discTotal is not None else "?",
),
"Tracks: %d" % len(metadata.tracks),
"Duration: %s" % _format_duration_ms(metadata.duration),
"Release MBID: %s" % (metadata.mbid or "Unknown"),
"URL: %s" % (metadata.url or "Unknown"),
]
if metadata.barcode:
lines.append("Barcode: %s" % metadata.barcode)
if metadata.catalogNumbers:
lines.append("Catalog: %s" % ", ".join(metadata.catalogNumbers))
self.release_details.set_text("\n".join(lines))
def _apply_configured_offset(self):
device = self._selected_device()
if not device:
return
info = drive.getDeviceInfo(device)
if not info:
return
try:
offset = config.Config().getReadOffset(*info)
except KeyError:
return
if offset is not None:
self.offset_spin.set_value(int(offset))
def _selected_release(self):
selection = self.release_view.get_selection()
model, tree_iter = selection.get_selected()
if tree_iter is None:
return None
return model.get_value(tree_iter, 5)
def _read_disc_worker(self, device, country):
try:
self.scan_cancel_requested = False
GLib.idle_add(self._clear_log)
GLib.idle_add(self._reset_progress)
GLib.idle_add(self._append_log, "Reading disc from %s\n\n" % device)
GLib.idle_add(self._set_running_state, True, "Reading disc")
GLib.idle_add(self.progress_label.set_text, "Scanning disc and looking up MusicBrainz")
conf = config.Config()
runner = CancellableSyncRunner()
self.scan_runner = runner
prog = Program(conf, record=False)
utils.load_device(device)
utils.unmount_device(device)
if drive.get_cdrom_drive_status(device) == 1:
raise OSError("No CD detected, please insert one and retry")
toc_task = cdrdao.ReadTOCTask(device, fast_toc=True)
runner.run(toc_task, verbose=False)
ittoc = toc_task.toc.table
cddb = ittoc.getCDDBDiscId()
mbid = ittoc.getMusicBrainzDiscId()
duration = common.formatTime(ittoc.duration() / 1000.0)
track_count = str(ittoc.getAudioTracks())
if self.scan_cancel_requested:
raise RuntimeError("Disc scan cancelled")
try:
releases = mbngs.musicbrainz(mbid, country=country or None, record=False)
except mbngs.NotFoundException:
releases = []
if self.scan_cancel_requested:
raise RuntimeError("Disc scan cancelled")
releases = sorted(releases, key=lambda md: abs(md.duration - ittoc.duration()))
def apply_results():
self.scan_data = {
"device": device,
"cddb": cddb,
"mbid": mbid,
"duration": duration,
"tracks": track_count,
"releases": releases,
}
self._set_label("device", device)
self._set_label("disc_status", "Ready")
self._set_label("cddb", cddb)
self._set_label("mbid", mbid)
self._set_label("duration", duration)
self._set_label("tracks", track_count)
self._append_log("CDDB disc id: %s\n" % cddb)
self._append_log("MusicBrainz disc id: %s\n" % mbid)
self._append_log("Disc duration: %s, %s audio tracks\n" % (duration, track_count))
self._append_log("\nFound %d matching release(s)\n" % len(releases))
self._update_release_store(releases)
self._update_release_details(None)
self._set_running_state(False, "Disc ready")
self.progress_label.set_text("Disc metadata loaded")
if not releases:
self._append_log("No MusicBrainz matches found\n")
return False
GLib.idle_add(apply_results)
except Exception as exc:
def apply_error():
self.scan_data = None
self.current_release = None
self.release_store.clear()
self.track_store.clear()
self._update_release_details(None)
self._set_label("device", device)
if self.scan_cancel_requested:
self._set_label("disc_status", "Cancelled")
self._append_log("Disc scan cancelled\n")
self._set_running_state(False, "Cancelled")
self.progress_label.set_text("Disc scan cancelled")
else:
self._set_label("disc_status", "Error")
self._append_log("%s\n" % exc)
self._set_running_state(False, "Failed")
self.progress_label.set_text("Disc scan failed")
return False
GLib.idle_add(apply_error)
finally:
self.scan_runner = None
def _build_cd_args(self):
args = [sys.executable, "-m", "whipper", "cd"]
device = self._selected_device()
if device:
args.extend(["-d", device])
country = self.country_entry.get_text().strip()
if country:
args.extend(["-c", country])
release_override = self.release_id_entry.get_text().strip()
if release_override:
args.extend(["-R", release_override])
elif self.current_release is not None:
args.extend(["-R", self.current_release.mbid])
return args
def _run_rip_command(self):
args = self._build_cd_args()
args.append("rip")
output_dir = self.output_button.get_filename()
if output_dir:
args.extend(["-O", output_dir])
cover_art = self.cover_art_combo.get_active_id()
if cover_art:
args.extend(["-C", cover_art])
retries = int(self.max_retries_spin.get_value())
args.extend(["-r", str(retries)])
offset = int(self.offset_spin.get_value())
if offset:
args.extend(["-o", str(offset)])
if self.unknown_check.get_active():
args.append("-U")
if self.cdr_check.get_active():
args.append("--cdr")
if self.keep_going_check.get_active():
args.append("-k")
if self.overread_check.get_active():
args.append("-x")
self._clear_log()
self._reset_progress()
self._append_log("$ %s\n\n" % " ".join(args))
self._set_running_state(True, "Ripping disc")
self.progress_label.set_text("Preparing rip")
def worker():
try:
self.process = subprocess.Popen(
args,
cwd=os.getcwd(),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
)
assert self.process.stdout is not None
for line in self.process.stdout:
GLib.idle_add(self._handle_rip_output_line, line)
returncode = self.process.wait()
GLib.idle_add(self._append_log, "\nProcess finished with exit code %d\n" % returncode)
GLib.idle_add(self._finish_rip, returncode)
except Exception as exc:
GLib.idle_add(self._append_log, "\n%s\n" % exc)
GLib.idle_add(self._finish_rip, 1)
finally:
self.process = None
self.worker = threading.Thread(target=worker, daemon=True)
self.worker.start()
def _handle_rip_output_line(self, line):
self._append_log(line)
match = RIP_START_RE.search(line)
if match:
self.current_track_number = int(match.group(1))
self.current_track_total = int(match.group(2))
track_name = match.group(4)
overall_fraction = (self.current_track_number - 1) / max(self.current_track_total, 1)
self.overall_bar.set_fraction(overall_fraction)
self.overall_bar.set_text(
"Track %d/%d" % (self.current_track_number, self.current_track_total)
)
self.track_bar.set_fraction(0.0)
self.track_bar.set_text(track_name)
self.progress_label.set_text("Ripping %s" % track_name)
return False
match = RIP_DONE_RE.search(line)
if match:
finished = int(match.group(1))
if self.current_track_total:
self.overall_bar.set_fraction(finished / self.current_track_total)
self.overall_bar.set_text("Track %d/%d" % (finished, self.current_track_total))
self.track_bar.set_fraction(1.0)
self.track_bar.set_text("Track %d done" % finished)
self.progress_label.set_text("Track %d verified" % finished)
return False
match = RIP_SKIP_RE.search(line)
if match:
skipped = int(match.group(1))
self.track_bar.set_fraction(1.0)
self.track_bar.set_text("Track %d skipped" % skipped)
self.progress_label.set_text("Track %d failed" % skipped)
return False
return False
def _finish_rip(self, returncode):
if returncode == 0:
self.overall_bar.set_fraction(1.0)
self.overall_bar.set_text("Complete")
self.track_bar.set_fraction(1.0)
self.progress_label.set_text("Rip complete")
self._set_running_state(False, "Done")
else:
self.progress_label.set_text("Rip failed")
self._set_running_state(False, "Failed")
return False
def _on_release_selected(self, selection):
model, tree_iter = selection.get_selected()
if tree_iter is None:
self.current_release = None
self._update_track_store(None)
self._update_release_details(None)
else:
self.current_release = model.get_value(tree_iter, 5)
self._update_track_store(self.current_release)
self._update_release_details(self.current_release)
self.rip_button.set_sensitive(self.current_release is not None or self.unknown_check.get_active())
def _on_refresh_clicked(self, _button):
self._refresh_devices()
def _on_unknown_toggled(self, _button):
self._save_gui_settings()
self.rip_button.set_sensitive(
(self.process is None) and
(self.current_release is not None or self.unknown_check.get_active())
)
def _on_device_changed(self, _combo):
self._apply_configured_offset()
def _on_read_clicked(self, _button):
device = self._selected_device()
if not device:
return
country = self.country_entry.get_text().strip()
self.worker = threading.Thread(
target=self._read_disc_worker,
args=(device, country),
daemon=True,
)
self.worker.start()
def _on_rip_clicked(self, _button):
if self.current_release is None and not self.unknown_check.get_active():
self._append_log("Select a release or enable ripping without metadata.\n")
self.status_label.set_text("Missing release")
return
self._run_rip_command()
def _on_stop_clicked(self, _button):
if self.process is not None:
self.process.terminate()
self._append_log("\nStopping process...\n")
elif self.scan_runner is not None:
self.scan_cancel_requested = True
self._append_log("\nStopping scan...\n")
self.scan_runner.cancel()
class CancellableSyncRunner(whipper_task.SyncRunner):
def cancel(self):
loop = getattr(self, "_loop", None)
current_task = getattr(self, "_task", None)
if loop is None or current_task is None:
return
def _cancel():
try:
abort = getattr(current_task, "abort", None)
if callable(abort):
abort()
else:
current_task.stop()
except Exception as exc:
current_task.setException(exc)
self.stopped(current_task)
loop.call_soon_threadsafe(_cancel)
def main():
app = WhipperGui()
return app.run(sys.argv)

View File

@@ -3,6 +3,7 @@ import re
import shutil import shutil
import tempfile import tempfile
import subprocess import subprocess
import signal
from subprocess import Popen, PIPE from subprocess import Popen, PIPE
from whipper.common.common import truncate_filename from whipper.common.common import truncate_filename
@@ -86,6 +87,7 @@ class ReadTOCTask(task.Task):
self.toc_path = toc_path self.toc_path = toc_path
self._buffer = "" # accumulate characters self._buffer = "" # accumulate characters
self._parser = ProgressParser() self._parser = ProgressParser()
self._aborted = False
self.fd, self.tocfile = tempfile.mkstemp( self.fd, self.tocfile = tempfile.mkstemp(
suffix='.cdrdao.read-toc.whipper.task') suffix='.cdrdao.read-toc.whipper.task')
@@ -148,6 +150,11 @@ class ReadTOCTask(task.Task):
self._done() self._done()
def _done(self): def _done(self):
if self._aborted:
if os.path.exists(self.tocfile):
os.unlink(self.tocfile)
self.stop()
return
self.setProgress(1.0) self.setProgress(1.0)
self.toc = TocFile(self.tocfile) self.toc = TocFile(self.tocfile)
self.toc.parse() self.toc.parse()
@@ -167,6 +174,11 @@ class ReadTOCTask(task.Task):
self.stop() self.stop()
return return
def abort(self):
self._aborted = True
if getattr(self, "_popen", None) is not None and self._popen.poll() is None:
os.kill(self._popen.pid, signal.SIGTERM)
def DetectCdr(device): def DetectCdr(device):
"""Whether cdrdao detects a CD-R for ``device``.""" """Whether cdrdao detects a CD-R for ``device``."""