diff --git a/whipper/gui.py b/whipper/gui.py index 438b322..60ca845 100644 --- a/whipper/gui.py +++ b/whipper/gui.py @@ -6,6 +6,7 @@ import io import logging import contextlib import importlib.util +import tempfile from pathlib import Path _GUI_IMPORT_ERROR = None @@ -30,8 +31,8 @@ except (ImportError, ValueError) as exc: from whipper.common import accurip, common, config, drive, mbngs, task as whipper_task from whipper.common.program import Program -from whipper.command import cd as cd_command -from whipper.program import cdrdao, cdparanoia, utils +from whipper.command import cd as cd_command, offset as offset_command +from whipper.program import arc, cdrdao, cdparanoia, utils from whipper.result import result @@ -63,6 +64,30 @@ def require_gui_runtime(): raise RuntimeError(gui_runtime_error_message()) from _GUI_IMPORT_ERROR +def _iter_icon_candidates(): + local_icon = Path(__file__).resolve().parent.parent / "data" / ("%s.svg" % APP_ID) + yield local_icon + + data_dirs = [] + xdg_data_home = os.environ.get("XDG_DATA_HOME") + if xdg_data_home: + data_dirs.append(Path(xdg_data_home)) + data_dirs.extend(Path(path) for path in os.environ.get("XDG_DATA_DIRS", "/usr/local/share:/usr/share").split(":") if path) + + for base in data_dirs: + yield base / "icons" / "hicolor" / "scalable" / "apps" / ("%s.svg" % APP_ID) + yield base / "icons" / "hicolor" / "256x256" / "apps" / ("%s.png" % APP_ID) + yield base / "pixmaps" / ("%s.png" % APP_ID) + yield base / "pixmaps" / ("%s.svg" % APP_ID) + + +def _resolve_icon_path(): + for candidate in _iter_icon_candidates(): + if candidate.is_file(): + return str(candidate) + return None + + def _format_duration_ms(duration_ms): if duration_ms is None: return "" @@ -87,9 +112,13 @@ def _release_duration_distance(metadata, duration_ms): class WhipperGui(Gtk.Application if Gtk is not None else object): def __init__(self): require_gui_runtime() + GLib.set_prgname(APP_ID) super().__init__(application_id=APP_ID) GLib.set_application_name("Whipper") Gtk.Window.set_default_icon_name(APP_ID) + icon_path = _resolve_icon_path() + if icon_path: + Gtk.Window.set_default_icon_from_file(icon_path) self.window = None self.main_pane = None @@ -99,8 +128,10 @@ class WhipperGui(Gtk.Application if Gtk is not None else object): self.scan_data = None self.scan_runner = None self.rip_runner = None + self.drive_runner = None self.scan_cancel_requested = False self.rip_cancel_requested = False + self.drive_cancel_requested = False self.current_release = None self.current_track_number = 0 self.current_track_total = 0 @@ -111,6 +142,8 @@ class WhipperGui(Gtk.Application if Gtk is not None else object): self.country_entry = None self.release_id_entry = None self.refresh_button = None + self.analyze_button = None + self.find_offset_button = None self.read_button = None self.rip_button = None self.stop_button = None @@ -154,6 +187,10 @@ class WhipperGui(Gtk.Application if Gtk is not None else object): window = Gtk.ApplicationWindow(application=self) window.set_title("Whipper") window.set_icon_name(APP_ID) + icon_path = _resolve_icon_path() + if icon_path: + window.set_icon_from_file(icon_path) + window.set_wmclass(APP_ID, APP_ID) window.set_default_size(1380, 820) window.set_border_width(6) @@ -190,10 +227,20 @@ class WhipperGui(Gtk.Application if Gtk is not None else object): self.refresh_button.set_tooltip_text("Refresh the list of detected optical drives") grid.attach(self.refresh_button, 2, 0, 1, 1) - grid.attach(Gtk.Label(label="Status", xalign=0), 3, 0, 1, 1) + self.analyze_button = Gtk.Button(label="Analyze Drive") + self.analyze_button.connect("clicked", self._on_analyze_drive_clicked) + self.analyze_button.set_tooltip_text("Probe whether cdparanoia can defeat this drive's audio cache") + grid.attach(self.analyze_button, 3, 0, 1, 1) + + self.find_offset_button = Gtk.Button(label="Find Offset") + self.find_offset_button.connect("clicked", self._on_find_offset_clicked) + self.find_offset_button.set_tooltip_text("Detect the configured read offset using an AccurateRip-known disc") + grid.attach(self.find_offset_button, 4, 0, 1, 1) + + grid.attach(Gtk.Label(label="Status", xalign=0), 5, 0, 1, 1) self.status_label = Gtk.Label(label="Idle", xalign=0) self.status_label.set_selectable(True) - grid.attach(self.status_label, 4, 0, 2, 1) + grid.attach(self.status_label, 6, 0, 1, 1) grid.attach(Gtk.Label(label="Output", xalign=0), 0, 1, 1, 1) self.output_button = Gtk.FileChooserButton( @@ -347,6 +394,11 @@ class WhipperGui(Gtk.Application if Gtk is not None else object): rows = [ ("Device", "device"), + ("Vendor", "vendor"), + ("Model", "model"), + ("Release", "release"), + ("Read offset", "read_offset"), + ("Cache defeat", "cache_defeat"), ("Status", "disc_status"), ("CDDB", "cddb"), ("Disc ID", "mbid"), @@ -369,7 +421,6 @@ class WhipperGui(Gtk.Application if Gtk is not None else object): self.release_view = Gtk.TreeView(model=self.release_store) self.release_view.set_headers_clickable(False) self.release_view.set_enable_search(False) - self.release_view.set_fixed_height_mode(True) self.release_view.set_grid_lines(Gtk.TreeViewGridLines.BOTH) self.release_view.get_selection().connect("changed", self._on_release_selected) for index, title in enumerate(["Artist", "Title", "Year", "Type", "Country"]): @@ -383,6 +434,7 @@ class WhipperGui(Gtk.Application if Gtk is not None else object): scroll = Gtk.ScrolledWindow() scroll.set_hexpand(True) scroll.set_vexpand(True) + scroll.set_min_content_height(180) scroll.add(self.release_view) release_box.pack_start(scroll, True, True, 0) @@ -424,7 +476,6 @@ class WhipperGui(Gtk.Application if Gtk is not None else object): self.track_view = Gtk.TreeView(model=self.track_store) self.track_view.set_headers_clickable(False) self.track_view.set_enable_search(False) - self.track_view.set_fixed_height_mode(True) self.track_view.set_grid_lines(Gtk.TreeViewGridLines.BOTH) for index, title in enumerate(["#", "Status", "Artist", "Title", "Length", "Test CRC", "Copy CRC", "AR"]): renderer = Gtk.CellRendererText() @@ -609,6 +660,8 @@ class WhipperGui(Gtk.Application if Gtk is not None else object): def _set_running_state(self, running, status): self.read_button.set_sensitive(not running) self.refresh_button.set_sensitive(not running) + self.analyze_button.set_sensitive(not running) + self.find_offset_button.set_sensitive(not running) self.rip_button.set_sensitive((not running) and self._can_rip()) self.stop_button.set_sensitive(running) self.status_label.set_text(status) @@ -620,7 +673,7 @@ class WhipperGui(Gtk.Application if Gtk is not None else object): self.pulse_id = 0 def _pulse_progress(self): - if self.scan_runner is None and self.rip_runner is None: + if self.scan_runner is None and self.rip_runner is None and self.drive_runner is None: return False self.track_bar.pulse() return True @@ -637,9 +690,14 @@ class WhipperGui(Gtk.Application if Gtk is not None else object): self.device_combo.set_active(0) self.read_button.set_sensitive(False) self.rip_button.set_sensitive(False) + self.analyze_button.set_sensitive(False) + self.find_offset_button.set_sensitive(False) logger.warning("no optical drives detected") + self._update_drive_info(None) else: self.read_button.set_sensitive(True) + self.analyze_button.set_sensitive(True) + self.find_offset_button.set_sensitive(True) if current in devices: self.device_combo.set_active(devices.index(current)) else: @@ -647,6 +705,7 @@ class WhipperGui(Gtk.Application if Gtk is not None else object): logger.info("detected %d optical drive(s); active device: %s", len(devices), self.device_combo.get_active_text()) self._apply_configured_offset() + self._update_drive_info(self.device_combo.get_active_text()) def _selected_device(self): device = self.device_combo.get_active_text() @@ -654,20 +713,64 @@ class WhipperGui(Gtk.Application if Gtk is not None else object): return None return device + def _update_drive_info(self, device): + self._set_label("device", device) + for key in ["vendor", "model", "release", "read_offset", "cache_defeat"]: + self._set_label(key, None) + if not device: + return + try: + info = drive.getDeviceInfo(device) + except Exception: + logger.exception("failed to read drive info for %s", device) + return + if not info: + return + vendor, model, release = info + self._set_label("vendor", vendor) + self._set_label("model", model) + self._set_label("release", release) + conf = config.Config() + try: + self._set_label("read_offset", str(conf.getReadOffset(vendor, model, release))) + except KeyError: + self._set_label("read_offset", "Unknown") + try: + defeats_cache = conf.getDefeatsCache(vendor, model, release) + self._set_label("cache_defeat", "Yes" if defeats_cache else "No") + except KeyError: + self._set_label("cache_defeat", "Unknown") + + @staticmethod + def _parse_offset_candidates(offsets_text): + offsets = [] + for block in offsets_text.split(","): + block = block.strip() + if not block: + continue + if ":" in block: + start, end = block.split(":") + offsets.extend(range(int(start), int(end) + 1)) + else: + offsets.append(int(block)) + return offsets + def _update_release_store(self, releases): self.release_store.clear() self.current_release = None for metadata in releases: self.release_store.append([ - metadata.artist or "", - metadata.releaseTitle or metadata.title or "", + metadata.artist or "Unknown Artist", + metadata.releaseTitle or metadata.title or "Unknown Release", _release_year(metadata), - metadata.releaseType or "", - _release_country(metadata), + metadata.releaseType or "Unknown", + _release_country(metadata) or "—", metadata, ]) if releases: self.release_view.get_selection().select_path(0) + self.release_view.set_cursor(Gtk.TreePath.new_first()) + self.release_view.scroll_to_cell(Gtk.TreePath.new_first(), None, False, 0.0, 0.0) else: self._update_track_store(None) self._update_release_details(None) @@ -740,6 +843,178 @@ class WhipperGui(Gtk.Application if Gtk is not None else object): self.offset_spin.set_value(int(offset)) logger.info("loaded configured read offset %d for %s", offset, device) + def _analyze_drive_worker(self, device): + try: + self.drive_cancel_requested = False + GLib.idle_add(self._log_action, "Drive analysis requested for %s", device) + GLib.idle_add(self._set_running_state, True, "Analyzing drive") + GLib.idle_add(self.progress_label.set_text, "Analyzing drive cache behaviour") + + runner = CancellableSyncRunner() + self.drive_runner = runner + analyze_task = cdparanoia.AnalyzeTask(device) + runner.run(analyze_task) + if self.drive_cancel_requested: + raise RuntimeError("Drive analysis cancelled") + if analyze_task.defeatsCache is None: + raise RuntimeError("Cannot analyze the drive; insert an audio CD and retry.") + + info = drive.getDeviceInfo(device) + if info: + config.Config().setDefeatsCache(info[0], info[1], info[2], analyze_task.defeatsCache) + + def apply_results(): + self._log_action( + "Drive cache analysis result: cdparanoia %s defeat the audio cache", + "can" if analyze_task.defeatsCache else "cannot", + ) + self._update_drive_info(device) + self._set_running_state(False, "Drive analyzed") + self.progress_label.set_text("Drive analysis complete") + return False + + GLib.idle_add(apply_results) + except Exception as exc: + def apply_error(): + if self.drive_cancel_requested: + self._log_action("Drive analysis cancelled", level=logging.WARNING) + self._set_running_state(False, "Cancelled") + self.progress_label.set_text("Drive analysis cancelled") + else: + self._log_action("%s", exc, level=logging.ERROR) + self._set_running_state(False, "Drive analysis failed") + self.progress_label.set_text("Drive analysis failed") + return False + + GLib.idle_add(apply_error) + finally: + self.drive_runner = None + + def _find_offset_worker(self, device): + try: + self.drive_cancel_requested = False + GLib.idle_add(self._log_action, "Offset detection requested for %s", device) + GLib.idle_add(self._set_running_state, True, "Finding offset") + GLib.idle_add(self.progress_label.set_text, "Finding drive read offset") + + runner = CancellableSyncRunner() + self.drive_runner = runner + + utils.load_device(device) + utils.unmount_device(device) + toc_task = cdrdao.ReadTOCTask(device) + runner.run(toc_task) + table = toc_task.toc.table + if len(table.tracks) < 3: + raise RuntimeError("Offset detection needs a CD with at least 3 tracks.") + + responses = accurip.get_db_entry(table.accuraterip_path()) + offsets = self._parse_offset_candidates(offset_command.OFFSETS) + + def match(archecksums, track_number): + for index, response in enumerate(responses): + for checksum in archecksums: + if checksum == response.checksums[track_number - 1]: + return checksum, index + return None, None + + def rip_arcs(track_number, offset_value): + fd, path = tempfile.mkstemp( + suffix=".track%02d.offset%d.whipper.wav" % (track_number, offset_value) + ) + os.close(fd) + try: + read_task = cdparanoia.ReadTrackTask( + path, + table, + table.getTrackStart(track_number), + table.getTrackEnd(track_number), + overread=False, + offset=offset_value, + device=device, + ) + read_task.description = "Ripping track %d with read offset %d" % ( + track_number, + offset_value, + ) + runner.run(read_task) + v1, v2 = arc.accuraterip_checksum(path, track_number, len(table.tracks)) + return "%08x" % v1, "%08x" % v2 + finally: + if os.path.exists(path): + os.unlink(path) + + found_offset = None + for offset_value in offsets: + if self.drive_cancel_requested: + raise RuntimeError("Offset detection cancelled") + GLib.idle_add(self._log_action, "Trying read offset %d", offset_value) + try: + checksums = rip_arcs(1, offset_value) + except Exception as exc: + GLib.idle_add(self._log_action, "Cannot rip with offset %d: %s", offset_value, exc, level=logging.WARNING) + continue + checksum, response_index = match(checksums, 1) + if not checksum: + continue + + GLib.idle_add(self._log_action, "Potential offset %d matched response %d; confirming", offset_value, response_index) + matched_tracks = 1 + for track_number in range(2, len(table.tracks)): + if self.drive_cancel_requested: + raise RuntimeError("Offset detection cancelled") + try: + checksums = rip_arcs(track_number, offset_value) + except Exception as exc: + GLib.idle_add(self._log_action, "Track %d failed for offset %d: %s", track_number, offset_value, exc, level=logging.WARNING) + continue + checksum, _ = match(checksums, track_number) + if checksum: + matched_tracks += 1 + if matched_tracks == len(table.tracks) - 1: + found_offset = offset_value + break + + if found_offset is None: + raise RuntimeError("No matching offset found. Try another AccurateRip-enabled disc.") + + info = drive.getDeviceInfo(device) + if info: + config.Config().setReadOffset(info[0], info[1], info[2], found_offset) + + def apply_results(): + self.offset_spin.set_value(found_offset) + self._log_action("Read offset of device is %d", found_offset) + self._update_drive_info(device) + self._set_running_state(False, "Offset found") + self.progress_label.set_text("Drive offset detection complete") + return False + + GLib.idle_add(apply_results) + except accurip.EntryNotFound: + def apply_not_found(): + self._log_action("AccurateRip entry not found; try another disc for offset detection", level=logging.WARNING) + self._set_running_state(False, "Offset unavailable") + self.progress_label.set_text("No AccurateRip entry for this disc") + return False + + GLib.idle_add(apply_not_found) + except Exception as exc: + def apply_error(): + if self.drive_cancel_requested: + self._log_action("Offset detection cancelled", level=logging.WARNING) + self._set_running_state(False, "Cancelled") + self.progress_label.set_text("Offset detection cancelled") + else: + self._log_action("%s", exc, level=logging.ERROR) + self._set_running_state(False, "Offset detection failed") + self.progress_label.set_text("Offset detection failed") + return False + + GLib.idle_add(apply_error) + finally: + self.drive_runner = None + def _read_disc_worker(self, device, country): try: self.scan_cancel_requested = False @@ -792,7 +1067,7 @@ class WhipperGui(Gtk.Application if Gtk is not None else object): "tracks": track_count, "releases": releases, } - self._set_label("device", device) + self._update_drive_info(device) self._set_label("disc_status", "Ready") self._set_label("cddb", cddb) self._set_label("mbid", mbid) @@ -817,7 +1092,7 @@ class WhipperGui(Gtk.Application if Gtk is not None else object): self.release_store.clear() self.track_store.clear() self._update_release_details(None) - self._set_label("device", device) + self._update_drive_info(device) if self.scan_cancel_requested: self._set_label("disc_status", "Cancelled") self._log_action("Disc scan cancelled", level=logging.WARNING) @@ -1352,7 +1627,34 @@ class WhipperGui(Gtk.Application if Gtk is not None else object): self._save_gui_settings() device = self._selected_device() if device: + self._update_drive_info(device) self._log_action("Selected device %s", device) + else: + self._update_drive_info(None) + + def _on_analyze_drive_clicked(self, _button): + device = self._selected_device() + if not device: + self._log_action("Drive analysis requested without an active device", level=logging.WARNING) + return + self.worker = threading.Thread( + target=self._analyze_drive_worker, + args=(device,), + daemon=True, + ) + self.worker.start() + + def _on_find_offset_clicked(self, _button): + device = self._selected_device() + if not device: + self._log_action("Offset detection requested without an active device", level=logging.WARNING) + return + self.worker = threading.Thread( + target=self._find_offset_worker, + args=(device,), + daemon=True, + ) + self.worker.start() def _on_read_clicked(self, _button): device = self._selected_device() @@ -1408,6 +1710,10 @@ class WhipperGui(Gtk.Application if Gtk is not None else object): self.rip_cancel_requested = True self._log_action("Stopping rip", level=logging.WARNING) self.rip_runner.cancel() + elif self.drive_runner is not None: + self.drive_cancel_requested = True + self._log_action("Stopping drive operation", level=logging.WARNING) + self.drive_runner.cancel() class GuiLogHandler(logging.Handler): diff --git a/whipper/test/test_gui.py b/whipper/test/test_gui.py index 9b70f34..9c34907 100644 --- a/whipper/test/test_gui.py +++ b/whipper/test/test_gui.py @@ -169,10 +169,18 @@ class FakeSelection: class FakeReleaseView: def __init__(self, store): self.selection = FakeSelection(store) + self.cursor = None + self.scrolled_path = None def get_selection(self): return self.selection + def set_cursor(self, path): + self.cursor = path + + def scroll_to_cell(self, path, *_args): + self.scrolled_path = path + class FakeRunner: def __init__(self): @@ -269,6 +277,8 @@ def _make_ui_app(tmp_path): app.disc_template_entry = FakeEntry("%d") app.read_button = FakeButton() app.refresh_button = FakeButton() + app.analyze_button = FakeButton() + app.find_offset_button = FakeButton() app.rip_button = FakeButton() app.stop_button = FakeButton() app.status_label = FakeLabel() @@ -290,7 +300,8 @@ def _make_ui_app(tmp_path): app.release_view = FakeReleaseView(app.release_store) app.release_details = FakeLabel() app.info_labels = {key: FakeLabel() for key in [ - "device", "disc_status", "cddb", "mbid", "duration", "tracks" + "device", "vendor", "model", "release", "read_offset", "cache_defeat", + "disc_status", "cddb", "mbid", "duration", "tracks" ]} app.scan_runner = None app.rip_runner = None @@ -313,6 +324,7 @@ def _make_ui_app(tmp_path): "_can_rip", "_set_label", "_set_running_state", + "_update_drive_info", "_update_release_store", "_update_track_store", "_set_track_field", @@ -323,6 +335,8 @@ def _make_ui_app(tmp_path): "_pulse_progress", "_resolve_release_metadata", "_on_release_selected", + "_on_analyze_drive_clicked", + "_on_find_offset_clicked", "_on_stop_clicked", ) app._remove_gui_log_handler = gui.WhipperGui._remove_gui_log_handler @@ -409,6 +423,64 @@ def test_release_selection_and_progress_updates(monkeypatch, tmp_path): assert app.progress_label.get_text() == "Rip complete" +def test_update_release_store_uses_visible_fallbacks(tmp_path): + app = _make_ui_app(tmp_path) + metadata = SimpleNamespace( + artist=None, + releaseTitle=None, + title=None, + release=None, + releaseType=None, + countries=[], + duration=1000, + tracks=[], + discNumber=1, + discTotal=1, + mbid="mbid-empty", + url="https://example.invalid/mbid-empty", + barcode=None, + catalogNumbers=[], + ) + + gui.WhipperGui._update_release_store(app, [metadata]) + + assert app.release_store[0][0] == "Unknown Artist" + assert app.release_store[0][1] == "Unknown Release" + assert app.release_store[0][3] == "Unknown" + assert app.release_store[0][4] == "—" + assert app.release_view.cursor is not None + assert app.release_view.scrolled_path is not None + + +def test_update_drive_info_reads_config(monkeypatch, tmp_path): + app = _make_ui_app(tmp_path) + + class FakeConfig: + def getReadOffset(self, vendor, model, release): + assert (vendor, model, release) == ("Vendor", "Model", "1.0") + return 667 + + def getDefeatsCache(self, vendor, model, release): + assert (vendor, model, release) == ("Vendor", "Model", "1.0") + return True + + monkeypatch.setattr(gui.drive, "getDeviceInfo", lambda device: ("Vendor", "Model", "1.0")) + monkeypatch.setattr(gui.config, "Config", lambda: FakeConfig()) + + gui.WhipperGui._update_drive_info(app, "/dev/cdrom") + + assert app.info_labels["device"].get_text() == "/dev/cdrom" + assert app.info_labels["vendor"].get_text() == "Vendor" + assert app.info_labels["model"].get_text() == "Model" + assert app.info_labels["release"].get_text() == "1.0" + assert app.info_labels["read_offset"].get_text() == "667" + assert app.info_labels["cache_defeat"].get_text() == "Yes" + + +def test_parse_offset_candidates_supports_ranges(tmp_path): + assert gui.WhipperGui._parse_offset_candidates("-1, 0, 2:4") == [-1, 0, 2, 3, 4] + + def test_main_reports_missing_runtime(monkeypatch, capsys): monkeypatch.setattr(gui, "_GUI_IMPORT_ERROR", ImportError("missing")) monkeypatch.setattr(gui, "gi", None)