5 Commits

Author SHA1 Message Date
f78a130926 Add drive analysis and offset tools to GUI 2026-04-18 23:53:27 +03:00
5baadd9dbb Fix window icon and remove duplicate log 2026-04-18 23:39:20 +03:00
997bfef893 Reduce GUI vertical footprint 2026-04-18 23:33:42 +03:00
290d650f80 Respect system GTK theme 2026-04-18 23:27:22 +03:00
05838116d0 Make GUI more EAC-like 2026-04-18 23:22:04 +03:00
2 changed files with 612 additions and 109 deletions

View File

@@ -6,6 +6,7 @@ import io
import logging
import contextlib
import importlib.util
import tempfile
from pathlib import Path
_GUI_IMPORT_ERROR = None
@@ -30,12 +31,13 @@ except (ImportError, ValueError) as exc:
from whipper.common import accurip, common, config, drive, mbngs, task as whipper_task
from whipper.common.program import Program
from whipper.command import cd as cd_command
from whipper.program import cdrdao, cdparanoia, utils
from whipper.command import cd as cd_command, offset as offset_command
from whipper.program import arc, cdrdao, cdparanoia, utils
from whipper.result import result
logger = logging.getLogger(__name__)
APP_ID = "com.github.whipper_team.Whipper"
class RipCancelledError(Exception):
@@ -62,6 +64,30 @@ def require_gui_runtime():
raise RuntimeError(gui_runtime_error_message()) from _GUI_IMPORT_ERROR
def _iter_icon_candidates():
local_icon = Path(__file__).resolve().parent.parent / "data" / ("%s.svg" % APP_ID)
yield local_icon
data_dirs = []
xdg_data_home = os.environ.get("XDG_DATA_HOME")
if xdg_data_home:
data_dirs.append(Path(xdg_data_home))
data_dirs.extend(Path(path) for path in os.environ.get("XDG_DATA_DIRS", "/usr/local/share:/usr/share").split(":") if path)
for base in data_dirs:
yield base / "icons" / "hicolor" / "scalable" / "apps" / ("%s.svg" % APP_ID)
yield base / "icons" / "hicolor" / "256x256" / "apps" / ("%s.png" % APP_ID)
yield base / "pixmaps" / ("%s.png" % APP_ID)
yield base / "pixmaps" / ("%s.svg" % APP_ID)
def _resolve_icon_path():
for candidate in _iter_icon_candidates():
if candidate.is_file():
return str(candidate)
return None
def _format_duration_ms(duration_ms):
if duration_ms is None:
return ""
@@ -86,7 +112,13 @@ def _release_duration_distance(metadata, duration_ms):
class WhipperGui(Gtk.Application if Gtk is not None else object):
def __init__(self):
require_gui_runtime()
super().__init__(application_id="com.github.whipper_team.WhipperGui")
GLib.set_prgname(APP_ID)
super().__init__(application_id=APP_ID)
GLib.set_application_name("Whipper")
Gtk.Window.set_default_icon_name(APP_ID)
icon_path = _resolve_icon_path()
if icon_path:
Gtk.Window.set_default_icon_from_file(icon_path)
self.window = None
self.main_pane = None
@@ -96,8 +128,10 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
self.scan_data = None
self.scan_runner = None
self.rip_runner = None
self.drive_runner = None
self.scan_cancel_requested = False
self.rip_cancel_requested = False
self.drive_cancel_requested = False
self.current_release = None
self.current_track_number = 0
self.current_track_total = 0
@@ -108,6 +142,8 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
self.country_entry = None
self.release_id_entry = None
self.refresh_button = None
self.analyze_button = None
self.find_offset_button = None
self.read_button = None
self.rip_button = None
self.stop_button = None
@@ -122,6 +158,7 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
self.logger_combo = None
self.track_template_entry = None
self.disc_template_entry = None
self.compact_mode_label = None
self.release_store = None
self.release_view = None
@@ -129,6 +166,7 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
self.track_view = None
self.log_buffer = None
self.release_details = None
self.track_columns = None
self.info_labels = {}
self.status_label = None
@@ -148,36 +186,61 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
def _build_window(self):
window = Gtk.ApplicationWindow(application=self)
window.set_title("Whipper")
window.set_icon_name("com.github.whipper_team.Whipper")
window.set_default_size(1120, 760)
window.set_border_width(12)
window.set_icon_name(APP_ID)
icon_path = _resolve_icon_path()
if icon_path:
window.set_icon_from_file(icon_path)
window.set_wmclass(APP_ID, APP_ID)
window.set_default_size(1380, 820)
window.set_border_width(6)
root = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=12)
root = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6)
window.add(root)
root.pack_start(self._build_controls(), False, False, 0)
root.pack_start(self._build_actions(), False, False, 0)
root.pack_start(self._build_transport_strip(), False, False, 0)
root.pack_start(self._build_progress(), False, False, 0)
root.pack_start(self._build_main_content(), True, True, 0)
root.pack_start(self._build_log(), True, True, 0)
self._refresh_devices()
self._load_gui_settings()
window.show_all()
return window
def _build_transport_strip(self):
frame = Gtk.Frame(label="Extraction Control Center")
box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6, margin=6)
frame.add(box)
box.pack_start(self._build_controls(), False, False, 0)
box.pack_start(self._build_actions(), False, False, 0)
return frame
def _build_controls(self):
grid = Gtk.Grid(column_spacing=10, row_spacing=10)
grid = Gtk.Grid(column_spacing=8, row_spacing=6)
grid.attach(Gtk.Label(label="Drive", xalign=0), 0, 0, 1, 1)
self.device_combo = Gtk.ComboBoxText()
self.device_combo.connect("changed", self._on_device_changed)
grid.attach(self.device_combo, 1, 0, 2, 1)
grid.attach(self.device_combo, 1, 0, 1, 1)
self.refresh_button = Gtk.Button(label="Refresh Drives")
self.refresh_button = Gtk.Button(label="Detect Drives")
self.refresh_button.connect("clicked", self._on_refresh_clicked)
self.refresh_button.set_tooltip_text("Refresh the list of detected optical drives")
grid.attach(self.refresh_button, 3, 0, 1, 1)
grid.attach(self.refresh_button, 2, 0, 1, 1)
self.analyze_button = Gtk.Button(label="Analyze Drive")
self.analyze_button.connect("clicked", self._on_analyze_drive_clicked)
self.analyze_button.set_tooltip_text("Probe whether cdparanoia can defeat this drive's audio cache")
grid.attach(self.analyze_button, 3, 0, 1, 1)
self.find_offset_button = Gtk.Button(label="Find Offset")
self.find_offset_button.connect("clicked", self._on_find_offset_clicked)
self.find_offset_button.set_tooltip_text("Detect the configured read offset using an AccurateRip-known disc")
grid.attach(self.find_offset_button, 4, 0, 1, 1)
grid.attach(Gtk.Label(label="Status", xalign=0), 5, 0, 1, 1)
self.status_label = Gtk.Label(label="Idle", xalign=0)
self.status_label.set_selectable(True)
grid.attach(self.status_label, 6, 0, 1, 1)
grid.attach(Gtk.Label(label="Output", xalign=0), 0, 1, 1, 1)
self.output_button = Gtk.FileChooserButton(
@@ -186,55 +249,110 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
)
self.output_button.set_filename(os.path.expanduser("~"))
self.output_button.connect("file-set", self._on_settings_changed)
grid.attach(self.output_button, 1, 1, 3, 1)
grid.attach(self.output_button, 1, 1, 2, 1)
grid.attach(Gtk.Label(label="Logger", xalign=0), 3, 1, 1, 1)
self.logger_combo = Gtk.ComboBoxText()
for logger_name in sorted(result.getLoggers()):
self.logger_combo.append(logger_name, logger_name)
self.logger_combo.set_active_id("whipper")
self.logger_combo.connect("changed", self._on_settings_changed)
grid.attach(self.logger_combo, 4, 1, 2, 1)
grid.attach(Gtk.Label(label="Cover Art", xalign=0), 0, 2, 1, 1)
self.cover_art_combo = Gtk.ComboBoxText()
self.cover_art_combo.append("", "Disabled")
self.cover_art_combo.append("file", "Save file")
self.cover_art_combo.append("embed", "Embed only")
self.cover_art_combo.append("complete", "File + embed")
self.cover_art_combo.set_active(0)
self.cover_art_combo.connect("changed", self._on_settings_changed)
grid.attach(self.cover_art_combo, 1, 2, 1, 1)
grid.attach(Gtk.Label(label="Offset", xalign=0), 2, 2, 1, 1)
self.offset_spin = Gtk.SpinButton.new_with_range(-5000, 5000, 1)
self.offset_spin.set_value(0)
self.offset_spin.connect("value-changed", self._on_settings_changed)
grid.attach(self.offset_spin, 3, 2, 1, 1)
grid.attach(Gtk.Label(label="Retries", xalign=0), 4, 2, 1, 1)
self.max_retries_spin = Gtk.SpinButton.new_with_range(0, 20, 1)
self.max_retries_spin.set_value(5)
self.max_retries_spin.connect("value-changed", self._on_settings_changed)
grid.attach(self.max_retries_spin, 5, 2, 1, 1)
grid.attach(Gtk.Label(label="Working Dir", xalign=0), 0, 2, 1, 1)
self.working_directory_entry = Gtk.Entry()
self.working_directory_entry.set_placeholder_text("Optional working directory")
self.working_directory_entry.connect("changed", self._on_settings_changed)
grid.attach(self.working_directory_entry, 1, 2, 3, 1)
grid.attach(self.working_directory_entry, 1, 3, 2, 1)
grid.attach(Gtk.Label(label="Country", xalign=0), 0, 3, 1, 1)
grid.attach(Gtk.Label(label="Country", xalign=0), 0, 4, 1, 1)
self.country_entry = Gtk.Entry()
self.country_entry.set_placeholder_text("Optional MusicBrainz country filter")
self.country_entry.connect("changed", self._on_settings_changed)
grid.attach(self.country_entry, 1, 3, 1, 1)
grid.attach(self.country_entry, 1, 4, 1, 1)
grid.attach(Gtk.Label(label="Release ID", xalign=0), 2, 3, 1, 1)
grid.attach(Gtk.Label(label="Release ID", xalign=0), 2, 4, 1, 1)
self.release_id_entry = Gtk.Entry()
self.release_id_entry.set_placeholder_text("Optional release override")
self.release_id_entry.connect("changed", self._on_settings_changed)
grid.attach(self.release_id_entry, 3, 3, 1, 1)
grid.attach(self.release_id_entry, 3, 4, 3, 1)
return grid
def _build_actions(self):
box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=10)
box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8)
self.read_button = Gtk.Button(label="Read Disc")
self.read_button = Gtk.Button(label="Detect TOC")
self.read_button.connect("clicked", self._on_read_clicked)
self.read_button.set_tooltip_text("Read TOC and fetch matching MusicBrainz releases")
box.pack_start(self.read_button, False, False, 0)
self.rip_button = Gtk.Button(label="Rip Selected Release")
self.rip_button = Gtk.Button(label="Secure Rip")
self.rip_button.connect("clicked", self._on_rip_clicked)
self.rip_button.set_tooltip_text("Rip the current disc using the selected release metadata")
self.rip_button.set_sensitive(False)
box.pack_start(self.rip_button, False, False, 0)
self.stop_button = Gtk.Button(label="Stop")
self.stop_button = Gtk.Button(label="Abort")
self.stop_button.connect("clicked", self._on_stop_clicked)
self.stop_button.set_tooltip_text("Cancel the current scan or rip")
self.stop_button.set_sensitive(False)
box.pack_start(self.stop_button, False, False, 0)
self.status_label = Gtk.Label(label="Idle", xalign=0)
box.pack_start(self.status_label, True, True, 0)
box.pack_start(Gtk.Separator(orientation=Gtk.Orientation.VERTICAL), False, False, 6)
self.unknown_check = Gtk.CheckButton(label="Unknown")
self.unknown_check.set_active(True)
self.unknown_check.connect("toggled", self._on_unknown_toggled)
box.pack_start(self.unknown_check, False, False, 0)
self.cdr_check = Gtk.CheckButton(label="CD-R")
self.cdr_check.connect("toggled", self._on_settings_changed)
box.pack_start(self.cdr_check, False, False, 0)
self.keep_going_check = Gtk.CheckButton(label="Keep Going")
self.keep_going_check.set_active(True)
self.keep_going_check.connect("toggled", self._on_settings_changed)
box.pack_start(self.keep_going_check, False, False, 0)
self.overread_check = Gtk.CheckButton(label="Overread")
self.overread_check.connect("toggled", self._on_settings_changed)
box.pack_start(self.overread_check, False, False, 0)
box.pack_start(Gtk.Separator(orientation=Gtk.Orientation.VERTICAL), False, False, 6)
self.compact_mode_label = Gtk.Label(
label="Compact EAC-style flow: Detect TOC -> pick release -> Secure Rip",
xalign=0,
)
box.pack_start(self.compact_mode_label, True, True, 0)
return box
def _build_progress(self):
frame = Gtk.Frame(label="Progress")
box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8, margin=10)
frame = Gtk.Frame(label="Extraction Progress")
box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=4, margin=6)
frame.add(box)
self.progress_label = Gtk.Label(label="No task running", xalign=0)
@@ -251,22 +369,36 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
return frame
def _build_main_content(self):
pane = Gtk.Paned.new(Gtk.Orientation.HORIZONTAL)
pane = Gtk.Paned.new(Gtk.Orientation.VERTICAL)
pane.set_wide_handle(True)
pane.pack1(self._build_left_panel(), resize=True, shrink=False)
pane.pack2(self._build_right_panel(), resize=True, shrink=False)
pane.pack1(self._build_workspace(), resize=True, shrink=False)
pane.pack2(self._build_log(), resize=False, shrink=False)
pane.set_position(560)
self.main_pane = pane
return pane
def _build_workspace(self):
pane = Gtk.Paned.new(Gtk.Orientation.HORIZONTAL)
pane.set_wide_handle(True)
pane.pack1(self._build_left_panel(), resize=False, shrink=False)
pane.pack2(self._build_right_panel(), resize=True, shrink=False)
pane.set_position(360)
return pane
def _build_left_panel(self):
box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=12)
box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8)
info_frame = Gtk.Frame(label="Disc")
info_grid = Gtk.Grid(column_spacing=10, row_spacing=6, margin=10)
info_grid = Gtk.Grid(column_spacing=8, row_spacing=4, margin=6)
info_frame.add(info_grid)
rows = [
("Device", "device"),
("Vendor", "vendor"),
("Model", "model"),
("Release", "release"),
("Read offset", "read_offset"),
("Cache defeat", "cache_defeat"),
("Status", "disc_status"),
("CDDB", "cddb"),
("Disc ID", "mbid"),
@@ -281,22 +413,28 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
box.pack_start(info_frame, False, False, 0)
release_frame = Gtk.Frame(label="Matching Releases")
release_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8, margin=8)
release_frame = Gtk.Frame(label="Metadata / Releases")
release_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6, margin=6)
release_frame.add(release_box)
self.release_store = Gtk.ListStore(str, str, str, str, str, object)
self.release_view = Gtk.TreeView(model=self.release_store)
self.release_view.set_headers_clickable(False)
self.release_view.set_enable_search(False)
self.release_view.set_grid_lines(Gtk.TreeViewGridLines.BOTH)
self.release_view.get_selection().connect("changed", self._on_release_selected)
for index, title in enumerate(["Artist", "Title", "Year", "Type", "Country"]):
renderer = Gtk.CellRendererText()
column = Gtk.TreeViewColumn(title, renderer, text=index)
column.set_resizable(True)
if title in {"Artist", "Title"}:
column.set_expand(True)
self.release_view.append_column(column)
scroll = Gtk.ScrolledWindow()
scroll.set_hexpand(True)
scroll.set_vexpand(True)
scroll.set_min_content_height(180)
scroll.add(self.release_view)
release_box.pack_start(scroll, True, True, 0)
@@ -304,7 +442,7 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
details_scroll = Gtk.ScrolledWindow()
details_scroll.set_hexpand(True)
details_scroll.set_vexpand(False)
details_scroll.set_min_content_height(120)
details_scroll.set_min_content_height(90)
details_frame.add(details_scroll)
self.release_details = Gtk.Label(label="Select a release to inspect it.", xalign=0, yalign=0)
@@ -314,21 +452,39 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
release_box.pack_start(details_frame, False, False, 0)
box.pack_start(release_frame, True, True, 0)
box.set_size_request(420, -1)
return box
def _build_right_panel(self):
box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=12)
box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6)
tracks_frame = Gtk.Frame(label="Tracks")
tracks_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8, margin=8)
tracks_frame = Gtk.Frame(label="Track Extraction Matrix")
tracks_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6, margin=6)
tracks_frame.add(tracks_box)
self.track_store = Gtk.ListStore(str, str, str, str)
self.track_columns = {
"number": 0,
"status": 1,
"artist": 2,
"title": 3,
"length": 4,
"test_crc": 5,
"copy_crc": 6,
"accuraterip": 7,
}
self.track_store = Gtk.ListStore(str, str, str, str, str, str, str, str)
self.track_view = Gtk.TreeView(model=self.track_store)
for index, title in enumerate(["#", "Artist", "Title", "Length"]):
self.track_view.set_headers_clickable(False)
self.track_view.set_enable_search(False)
self.track_view.set_grid_lines(Gtk.TreeViewGridLines.BOTH)
for index, title in enumerate(["#", "Status", "Artist", "Title", "Length", "Test CRC", "Copy CRC", "AR"]):
renderer = Gtk.CellRendererText()
column = Gtk.TreeViewColumn(title, renderer, text=index)
column.set_resizable(True)
if title in {"Artist", "Title"}:
column.set_expand(True)
if title == "Status":
column.set_min_width(140)
self.track_view.append_column(column)
track_scroll = Gtk.ScrolledWindow()
@@ -337,89 +493,42 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
track_scroll.add(self.track_view)
tracks_box.pack_start(track_scroll, True, True, 0)
tracks_box.pack_start(self._build_rip_options(), False, False, 0)
box.pack_start(tracks_frame, True, True, 0)
box.pack_start(self._build_rip_options(), False, False, 0)
return box
def _build_rip_options(self):
frame = Gtk.Frame(label="Rip Options")
grid = Gtk.Grid(column_spacing=10, row_spacing=8, margin=10)
frame = Gtk.Frame(label="Extraction Setup")
grid = Gtk.Grid(column_spacing=8, row_spacing=4, margin=6)
frame.add(grid)
self.unknown_check = Gtk.CheckButton(label="Allow ripping without metadata")
self.unknown_check.set_active(True)
self.unknown_check.connect("toggled", self._on_unknown_toggled)
grid.attach(self.unknown_check, 0, 0, 2, 1)
self.cdr_check = Gtk.CheckButton(label="Allow CD-R")
self.cdr_check.connect("toggled", self._on_settings_changed)
grid.attach(self.cdr_check, 2, 0, 1, 1)
self.keep_going_check = Gtk.CheckButton(label="Keep going on failed tracks")
self.keep_going_check.set_active(True)
self.keep_going_check.connect("toggled", self._on_settings_changed)
grid.attach(self.keep_going_check, 0, 1, 2, 1)
self.overread_check = Gtk.CheckButton(label="Force overread")
self.overread_check.connect("toggled", self._on_settings_changed)
grid.attach(self.overread_check, 2, 1, 1, 1)
grid.attach(Gtk.Label(label="Cover Art", xalign=0), 0, 2, 1, 1)
self.cover_art_combo = Gtk.ComboBoxText()
self.cover_art_combo.append("", "Disabled")
self.cover_art_combo.append("file", "Save file")
self.cover_art_combo.append("embed", "Embed only")
self.cover_art_combo.append("complete", "File + embed")
self.cover_art_combo.set_active(0)
self.cover_art_combo.connect("changed", self._on_settings_changed)
grid.attach(self.cover_art_combo, 1, 2, 1, 1)
grid.attach(Gtk.Label(label="Max Retries", xalign=0), 2, 2, 1, 1)
self.max_retries_spin = Gtk.SpinButton.new_with_range(0, 20, 1)
self.max_retries_spin.set_value(5)
self.max_retries_spin.connect("value-changed", self._on_settings_changed)
grid.attach(self.max_retries_spin, 3, 2, 1, 1)
grid.attach(Gtk.Label(label="Offset", xalign=0), 0, 3, 1, 1)
self.offset_spin = Gtk.SpinButton.new_with_range(-5000, 5000, 1)
self.offset_spin.set_value(0)
self.offset_spin.connect("value-changed", self._on_settings_changed)
grid.attach(self.offset_spin, 1, 3, 1, 1)
grid.attach(Gtk.Label(label="Logger", xalign=0), 2, 3, 1, 1)
self.logger_combo = Gtk.ComboBoxText()
for logger_name in sorted(result.getLoggers()):
self.logger_combo.append(logger_name, logger_name)
self.logger_combo.set_active_id("whipper")
self.logger_combo.connect("changed", self._on_settings_changed)
grid.attach(self.logger_combo, 3, 3, 1, 1)
grid.attach(Gtk.Label(label="Track Tpl", xalign=0), 0, 4, 1, 1)
grid.attach(Gtk.Label(label="Track Tpl", xalign=0), 0, 0, 1, 1)
self.track_template_entry = Gtk.Entry()
self.track_template_entry.set_text(cd_command.DEFAULT_TRACK_TEMPLATE)
self.track_template_entry.connect("changed", self._on_settings_changed)
grid.attach(self.track_template_entry, 1, 4, 3, 1)
grid.attach(self.track_template_entry, 1, 0, 3, 1)
grid.attach(Gtk.Label(label="Disc Tpl", xalign=0), 0, 5, 1, 1)
grid.attach(Gtk.Label(label="Disc Tpl", xalign=0), 0, 1, 1, 1)
self.disc_template_entry = Gtk.Entry()
self.disc_template_entry.set_text(cd_command.DEFAULT_DISC_TEMPLATE)
self.disc_template_entry.connect("changed", self._on_settings_changed)
grid.attach(self.disc_template_entry, 1, 5, 3, 1)
grid.attach(self.disc_template_entry, 1, 1, 3, 1)
note = Gtk.Label(
label="The configured drive offset is loaded automatically when whipper knows this drive.",
label="Advanced templates stay here; fast extraction switches moved to the control center to reduce vertical space.",
xalign=0,
)
note.set_line_wrap(True)
grid.attach(note, 0, 6, 4, 1)
grid.attach(note, 0, 2, 4, 1)
return frame
def _build_log(self):
frame = Gtk.Frame(label="Log")
frame = Gtk.Frame(label="Extraction Log")
scroll = Gtk.ScrolledWindow()
scroll.set_hexpand(True)
scroll.set_vexpand(True)
scroll.set_vexpand(False)
scroll.set_min_content_height(150)
frame.add(scroll)
text_view = Gtk.TextView()
@@ -551,6 +660,8 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
def _set_running_state(self, running, status):
self.read_button.set_sensitive(not running)
self.refresh_button.set_sensitive(not running)
self.analyze_button.set_sensitive(not running)
self.find_offset_button.set_sensitive(not running)
self.rip_button.set_sensitive((not running) and self._can_rip())
self.stop_button.set_sensitive(running)
self.status_label.set_text(status)
@@ -562,7 +673,7 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
self.pulse_id = 0
def _pulse_progress(self):
if self.scan_runner is None and self.rip_runner is None:
if self.scan_runner is None and self.rip_runner is None and self.drive_runner is None:
return False
self.track_bar.pulse()
return True
@@ -579,9 +690,14 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
self.device_combo.set_active(0)
self.read_button.set_sensitive(False)
self.rip_button.set_sensitive(False)
self.analyze_button.set_sensitive(False)
self.find_offset_button.set_sensitive(False)
logger.warning("no optical drives detected")
self._update_drive_info(None)
else:
self.read_button.set_sensitive(True)
self.analyze_button.set_sensitive(True)
self.find_offset_button.set_sensitive(True)
if current in devices:
self.device_combo.set_active(devices.index(current))
else:
@@ -589,6 +705,7 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
logger.info("detected %d optical drive(s); active device: %s",
len(devices), self.device_combo.get_active_text())
self._apply_configured_offset()
self._update_drive_info(self.device_combo.get_active_text())
def _selected_device(self):
device = self.device_combo.get_active_text()
@@ -596,20 +713,64 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
return None
return device
def _update_drive_info(self, device):
self._set_label("device", device)
for key in ["vendor", "model", "release", "read_offset", "cache_defeat"]:
self._set_label(key, None)
if not device:
return
try:
info = drive.getDeviceInfo(device)
except Exception:
logger.exception("failed to read drive info for %s", device)
return
if not info:
return
vendor, model, release = info
self._set_label("vendor", vendor)
self._set_label("model", model)
self._set_label("release", release)
conf = config.Config()
try:
self._set_label("read_offset", str(conf.getReadOffset(vendor, model, release)))
except KeyError:
self._set_label("read_offset", "Unknown")
try:
defeats_cache = conf.getDefeatsCache(vendor, model, release)
self._set_label("cache_defeat", "Yes" if defeats_cache else "No")
except KeyError:
self._set_label("cache_defeat", "Unknown")
@staticmethod
def _parse_offset_candidates(offsets_text):
offsets = []
for block in offsets_text.split(","):
block = block.strip()
if not block:
continue
if ":" in block:
start, end = block.split(":")
offsets.extend(range(int(start), int(end) + 1))
else:
offsets.append(int(block))
return offsets
def _update_release_store(self, releases):
self.release_store.clear()
self.current_release = None
for metadata in releases:
self.release_store.append([
metadata.artist or "",
metadata.releaseTitle or metadata.title or "",
metadata.artist or "Unknown Artist",
metadata.releaseTitle or metadata.title or "Unknown Release",
_release_year(metadata),
metadata.releaseType or "",
_release_country(metadata),
metadata.releaseType or "Unknown",
_release_country(metadata) or "",
metadata,
])
if releases:
self.release_view.get_selection().select_path(0)
self.release_view.set_cursor(Gtk.TreePath.new_first())
self.release_view.scroll_to_cell(Gtk.TreePath.new_first(), None, False, 0.0, 0.0)
else:
self._update_track_store(None)
self._update_release_details(None)
@@ -621,11 +782,24 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
for index, track in enumerate(metadata.tracks, start=1):
self.track_store.append([
str(index),
"Ready",
track.artist or "",
track.title or "",
_format_duration_ms(track.duration),
"",
"",
"",
])
def _set_track_field(self, track_number, field, value):
if track_number <= 0 or self.track_store is None or self.track_columns is None:
return False
row_index = track_number - 1
if row_index < 0 or row_index >= len(self.track_store):
return False
self.track_store[row_index][self.track_columns[field]] = value
return False
def _update_release_details(self, metadata):
if metadata is None:
self.release_details.set_text("Select a release to inspect it.")
@@ -669,6 +843,178 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
self.offset_spin.set_value(int(offset))
logger.info("loaded configured read offset %d for %s", offset, device)
def _analyze_drive_worker(self, device):
try:
self.drive_cancel_requested = False
GLib.idle_add(self._log_action, "Drive analysis requested for %s", device)
GLib.idle_add(self._set_running_state, True, "Analyzing drive")
GLib.idle_add(self.progress_label.set_text, "Analyzing drive cache behaviour")
runner = CancellableSyncRunner()
self.drive_runner = runner
analyze_task = cdparanoia.AnalyzeTask(device)
runner.run(analyze_task)
if self.drive_cancel_requested:
raise RuntimeError("Drive analysis cancelled")
if analyze_task.defeatsCache is None:
raise RuntimeError("Cannot analyze the drive; insert an audio CD and retry.")
info = drive.getDeviceInfo(device)
if info:
config.Config().setDefeatsCache(info[0], info[1], info[2], analyze_task.defeatsCache)
def apply_results():
self._log_action(
"Drive cache analysis result: cdparanoia %s defeat the audio cache",
"can" if analyze_task.defeatsCache else "cannot",
)
self._update_drive_info(device)
self._set_running_state(False, "Drive analyzed")
self.progress_label.set_text("Drive analysis complete")
return False
GLib.idle_add(apply_results)
except Exception as exc:
def apply_error():
if self.drive_cancel_requested:
self._log_action("Drive analysis cancelled", level=logging.WARNING)
self._set_running_state(False, "Cancelled")
self.progress_label.set_text("Drive analysis cancelled")
else:
self._log_action("%s", exc, level=logging.ERROR)
self._set_running_state(False, "Drive analysis failed")
self.progress_label.set_text("Drive analysis failed")
return False
GLib.idle_add(apply_error)
finally:
self.drive_runner = None
def _find_offset_worker(self, device):
try:
self.drive_cancel_requested = False
GLib.idle_add(self._log_action, "Offset detection requested for %s", device)
GLib.idle_add(self._set_running_state, True, "Finding offset")
GLib.idle_add(self.progress_label.set_text, "Finding drive read offset")
runner = CancellableSyncRunner()
self.drive_runner = runner
utils.load_device(device)
utils.unmount_device(device)
toc_task = cdrdao.ReadTOCTask(device)
runner.run(toc_task)
table = toc_task.toc.table
if len(table.tracks) < 3:
raise RuntimeError("Offset detection needs a CD with at least 3 tracks.")
responses = accurip.get_db_entry(table.accuraterip_path())
offsets = self._parse_offset_candidates(offset_command.OFFSETS)
def match(archecksums, track_number):
for index, response in enumerate(responses):
for checksum in archecksums:
if checksum == response.checksums[track_number - 1]:
return checksum, index
return None, None
def rip_arcs(track_number, offset_value):
fd, path = tempfile.mkstemp(
suffix=".track%02d.offset%d.whipper.wav" % (track_number, offset_value)
)
os.close(fd)
try:
read_task = cdparanoia.ReadTrackTask(
path,
table,
table.getTrackStart(track_number),
table.getTrackEnd(track_number),
overread=False,
offset=offset_value,
device=device,
)
read_task.description = "Ripping track %d with read offset %d" % (
track_number,
offset_value,
)
runner.run(read_task)
v1, v2 = arc.accuraterip_checksum(path, track_number, len(table.tracks))
return "%08x" % v1, "%08x" % v2
finally:
if os.path.exists(path):
os.unlink(path)
found_offset = None
for offset_value in offsets:
if self.drive_cancel_requested:
raise RuntimeError("Offset detection cancelled")
GLib.idle_add(self._log_action, "Trying read offset %d", offset_value)
try:
checksums = rip_arcs(1, offset_value)
except Exception as exc:
GLib.idle_add(self._log_action, "Cannot rip with offset %d: %s", offset_value, exc, level=logging.WARNING)
continue
checksum, response_index = match(checksums, 1)
if not checksum:
continue
GLib.idle_add(self._log_action, "Potential offset %d matched response %d; confirming", offset_value, response_index)
matched_tracks = 1
for track_number in range(2, len(table.tracks)):
if self.drive_cancel_requested:
raise RuntimeError("Offset detection cancelled")
try:
checksums = rip_arcs(track_number, offset_value)
except Exception as exc:
GLib.idle_add(self._log_action, "Track %d failed for offset %d: %s", track_number, offset_value, exc, level=logging.WARNING)
continue
checksum, _ = match(checksums, track_number)
if checksum:
matched_tracks += 1
if matched_tracks == len(table.tracks) - 1:
found_offset = offset_value
break
if found_offset is None:
raise RuntimeError("No matching offset found. Try another AccurateRip-enabled disc.")
info = drive.getDeviceInfo(device)
if info:
config.Config().setReadOffset(info[0], info[1], info[2], found_offset)
def apply_results():
self.offset_spin.set_value(found_offset)
self._log_action("Read offset of device is %d", found_offset)
self._update_drive_info(device)
self._set_running_state(False, "Offset found")
self.progress_label.set_text("Drive offset detection complete")
return False
GLib.idle_add(apply_results)
except accurip.EntryNotFound:
def apply_not_found():
self._log_action("AccurateRip entry not found; try another disc for offset detection", level=logging.WARNING)
self._set_running_state(False, "Offset unavailable")
self.progress_label.set_text("No AccurateRip entry for this disc")
return False
GLib.idle_add(apply_not_found)
except Exception as exc:
def apply_error():
if self.drive_cancel_requested:
self._log_action("Offset detection cancelled", level=logging.WARNING)
self._set_running_state(False, "Cancelled")
self.progress_label.set_text("Offset detection cancelled")
else:
self._log_action("%s", exc, level=logging.ERROR)
self._set_running_state(False, "Offset detection failed")
self.progress_label.set_text("Offset detection failed")
return False
GLib.idle_add(apply_error)
finally:
self.drive_runner = None
def _read_disc_worker(self, device, country):
try:
self.scan_cancel_requested = False
@@ -721,7 +1067,7 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
"tracks": track_count,
"releases": releases,
}
self._set_label("device", device)
self._update_drive_info(device)
self._set_label("disc_status", "Ready")
self._set_label("cddb", cddb)
self._set_label("mbid", mbid)
@@ -746,7 +1092,7 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
self.release_store.clear()
self.track_store.clear()
self._update_release_details(None)
self._set_label("device", device)
self._update_drive_info(device)
if self.scan_cancel_requested:
self._set_label("disc_status", "Cancelled")
self._log_action("Disc scan cancelled", level=logging.WARNING)
@@ -836,6 +1182,14 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
self.track_bar.set_fraction(value)
self.track_bar.set_text(description)
self.progress_label.set_text("%s: %s" % (item_label, description))
if item_label.startswith("Track "):
try:
track_number = int(item_label.split()[1])
except (IndexError, ValueError):
track_number = None
if track_number is not None:
percent = int(value * 100)
self._set_track_field(track_number, "status", "%s %d%%" % (description, percent))
return False
def _mark_track_finished(self, item_label, item_index, item_total, skipped=False):
@@ -846,6 +1200,13 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
self.track_bar.set_fraction(1.0)
self.track_bar.set_text("%s %s" % (item_label, "skipped" if skipped else "done"))
self.progress_label.set_text("%s %s" % (item_label, "failed" if skipped else "verified"))
if item_label.startswith("Track "):
try:
track_number = int(item_label.split()[1])
except (IndexError, ValueError):
track_number = None
if track_number is not None:
self._set_track_field(track_number, "status", "Skipped" if skipped else "Verified")
return False
def _finish_rip(self, returncode, status="Done"):
@@ -891,6 +1252,8 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
if os.path.exists(path):
GLib.idle_add(self._append_log, "%s already exists, verifying\n" % item_label)
if track_number > 0:
GLib.idle_add(self._set_track_field, track_number, "status", "Verifying existing")
if not program.verifyTrack(runner, track_result):
GLib.idle_add(self._append_log, "%s verification failed, reripping\n" % item_label)
os.unlink(path)
@@ -904,6 +1267,13 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
raise RipCancelledError("Rip cancelled")
extra = "" if tries == 1 else " (try %d)" % tries
GLib.idle_add(self._append_log, "%s%s: %s\n" % (item_label, extra, os.path.basename(path)))
if track_number > 0:
GLib.idle_add(
self._set_track_field,
track_number,
"status",
"Test & Copy%s" % extra,
)
tag_list = program.getTagList(track_number, mbdiscid)
if track_number > 0 and itable.tracks[track_number - 1].isrc is not None:
@@ -940,6 +1310,19 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
track_result.copyduration += rip_task.copyduration
if track_result.filename != rip_task.path:
track_result.filename = rip_task.path
if track_number > 0:
GLib.idle_add(
self._set_track_field,
track_number,
"test_crc",
"%08X" % track_result.testcrc if track_result.testcrc is not None else "",
)
GLib.idle_add(
self._set_track_field,
track_number,
"copy_crc",
"%08X" % track_result.copycrc if track_result.copycrc is not None else "",
)
break
except Exception as exc:
if self.rip_cancel_requested:
@@ -953,6 +1336,8 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
if settings["keep_going"]:
track_result.skipped = True
skipped_tracks.append(track_result)
if track_number > 0:
GLib.idle_add(self._set_track_field, track_number, "accuraterip", "Skipped")
GLib.idle_add(self._mark_track_finished, item_label, item_index, item_total, True)
return
raise RuntimeError("%s can't be ripped" % item_label)
@@ -962,6 +1347,8 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
return
if track_result.testcrc != track_result.copycrc:
if track_number > 0:
GLib.idle_add(self._set_track_field, track_number, "accuraterip", "CRC mismatch")
raise RuntimeError("CRCs did not match for %s" % item_label)
if track_number == 0:
@@ -985,6 +1372,7 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
program.result.table.getTrackLength(track_number),
track_number,
)
GLib.idle_add(self._set_track_field, track_number, "accuraterip", "Pending AR")
GLib.idle_add(self._mark_track_finished, item_label, item_index, item_total, False)
@@ -1239,7 +1627,34 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
self._save_gui_settings()
device = self._selected_device()
if device:
self._update_drive_info(device)
self._log_action("Selected device %s", device)
else:
self._update_drive_info(None)
def _on_analyze_drive_clicked(self, _button):
device = self._selected_device()
if not device:
self._log_action("Drive analysis requested without an active device", level=logging.WARNING)
return
self.worker = threading.Thread(
target=self._analyze_drive_worker,
args=(device,),
daemon=True,
)
self.worker.start()
def _on_find_offset_clicked(self, _button):
device = self._selected_device()
if not device:
self._log_action("Offset detection requested without an active device", level=logging.WARNING)
return
self.worker = threading.Thread(
target=self._find_offset_worker,
args=(device,),
daemon=True,
)
self.worker.start()
def _on_read_clicked(self, _button):
device = self._selected_device()
@@ -1295,6 +1710,10 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
self.rip_cancel_requested = True
self._log_action("Stopping rip", level=logging.WARNING)
self.rip_runner.cancel()
elif self.drive_runner is not None:
self.drive_cancel_requested = True
self._log_action("Stopping drive operation", level=logging.WARNING)
self.drive_runner.cancel()
class GuiLogHandler(logging.Handler):

View File

@@ -169,10 +169,18 @@ class FakeSelection:
class FakeReleaseView:
def __init__(self, store):
self.selection = FakeSelection(store)
self.cursor = None
self.scrolled_path = None
def get_selection(self):
return self.selection
def set_cursor(self, path):
self.cursor = path
def scroll_to_cell(self, path, *_args):
self.scrolled_path = path
class FakeRunner:
def __init__(self):
@@ -269,6 +277,8 @@ def _make_ui_app(tmp_path):
app.disc_template_entry = FakeEntry("%d")
app.read_button = FakeButton()
app.refresh_button = FakeButton()
app.analyze_button = FakeButton()
app.find_offset_button = FakeButton()
app.rip_button = FakeButton()
app.stop_button = FakeButton()
app.status_label = FakeLabel()
@@ -277,10 +287,21 @@ def _make_ui_app(tmp_path):
app.track_bar = FakeProgressBar()
app.release_store = FakeListStore()
app.track_store = FakeListStore()
app.track_columns = {
"number": 0,
"status": 1,
"artist": 2,
"title": 3,
"length": 4,
"test_crc": 5,
"copy_crc": 6,
"accuraterip": 7,
}
app.release_view = FakeReleaseView(app.release_store)
app.release_details = FakeLabel()
app.info_labels = {key: FakeLabel() for key in [
"device", "disc_status", "cddb", "mbid", "duration", "tracks"
"device", "vendor", "model", "release", "read_offset", "cache_defeat",
"disc_status", "cddb", "mbid", "duration", "tracks"
]}
app.scan_runner = None
app.rip_runner = None
@@ -303,8 +324,10 @@ def _make_ui_app(tmp_path):
"_can_rip",
"_set_label",
"_set_running_state",
"_update_drive_info",
"_update_release_store",
"_update_track_store",
"_set_track_field",
"_update_release_details",
"_update_rip_task_progress",
"_finish_rip",
@@ -312,6 +335,8 @@ def _make_ui_app(tmp_path):
"_pulse_progress",
"_resolve_release_metadata",
"_on_release_selected",
"_on_analyze_drive_clicked",
"_on_find_offset_clicked",
"_on_stop_clicked",
)
app._remove_gui_log_handler = gui.WhipperGui._remove_gui_log_handler
@@ -390,13 +415,72 @@ def test_release_selection_and_progress_updates(monkeypatch, tmp_path):
gui.WhipperGui._finish_rip(app, 0, "Done")
assert app.current_release is metadata
assert app.track_store[0][1] == "Track Artist"
assert app.track_store[0][2] == "Track Artist"
assert app.track_store[0][1].startswith("Encoding")
assert app.release_details.get_text().startswith("Artist: Artist")
assert app.overall_bar.fraction == 1.0
assert app.track_bar.fraction == 1.0
assert app.progress_label.get_text() == "Rip complete"
def test_update_release_store_uses_visible_fallbacks(tmp_path):
app = _make_ui_app(tmp_path)
metadata = SimpleNamespace(
artist=None,
releaseTitle=None,
title=None,
release=None,
releaseType=None,
countries=[],
duration=1000,
tracks=[],
discNumber=1,
discTotal=1,
mbid="mbid-empty",
url="https://example.invalid/mbid-empty",
barcode=None,
catalogNumbers=[],
)
gui.WhipperGui._update_release_store(app, [metadata])
assert app.release_store[0][0] == "Unknown Artist"
assert app.release_store[0][1] == "Unknown Release"
assert app.release_store[0][3] == "Unknown"
assert app.release_store[0][4] == ""
assert app.release_view.cursor is not None
assert app.release_view.scrolled_path is not None
def test_update_drive_info_reads_config(monkeypatch, tmp_path):
app = _make_ui_app(tmp_path)
class FakeConfig:
def getReadOffset(self, vendor, model, release):
assert (vendor, model, release) == ("Vendor", "Model", "1.0")
return 667
def getDefeatsCache(self, vendor, model, release):
assert (vendor, model, release) == ("Vendor", "Model", "1.0")
return True
monkeypatch.setattr(gui.drive, "getDeviceInfo", lambda device: ("Vendor", "Model", "1.0"))
monkeypatch.setattr(gui.config, "Config", lambda: FakeConfig())
gui.WhipperGui._update_drive_info(app, "/dev/cdrom")
assert app.info_labels["device"].get_text() == "/dev/cdrom"
assert app.info_labels["vendor"].get_text() == "Vendor"
assert app.info_labels["model"].get_text() == "Model"
assert app.info_labels["release"].get_text() == "1.0"
assert app.info_labels["read_offset"].get_text() == "667"
assert app.info_labels["cache_defeat"].get_text() == "Yes"
def test_parse_offset_candidates_supports_ranges(tmp_path):
assert gui.WhipperGui._parse_offset_candidates("-1, 0, 2:4") == [-1, 0, 2, 3, 4]
def test_main_reports_missing_runtime(monkeypatch, capsys):
monkeypatch.setattr(gui, "_GUI_IMPORT_ERROR", ImportError("missing"))
monkeypatch.setattr(gui, "gi", None)