import os import sys import threading import json import io import logging import contextlib import importlib.util from pathlib import Path _GUI_IMPORT_ERROR = None try: import cdio except ImportError as exc: cdio = None _GUI_IMPORT_ERROR = exc try: import gi gi.require_version("Gtk", "3.0") from gi.repository import GLib, Gtk except (ImportError, ValueError) as exc: gi = None GLib = None Gtk = None if _GUI_IMPORT_ERROR is None: _GUI_IMPORT_ERROR = exc from whipper.common import accurip, common, config, drive, mbngs, task as whipper_task from whipper.common.program import Program from whipper.command import cd as cd_command from whipper.program import cdrdao, cdparanoia, utils from whipper.result import result logger = logging.getLogger(__name__) class RipCancelledError(Exception): pass def gui_runtime_available(): return _GUI_IMPORT_ERROR is None def gui_runtime_error_message(): missing = [] if gi is None: missing.append("PyGObject / GTK 3") if cdio is None: missing.append("pycdio") if not missing: missing.append("GUI runtime dependencies") return "whipper-gui requires %s" % " and ".join(missing) def require_gui_runtime(): if not gui_runtime_available(): raise RuntimeError(gui_runtime_error_message()) from _GUI_IMPORT_ERROR def _format_duration_ms(duration_ms): if duration_ms is None: return "" return common.formatTime(duration_ms / 1000.0) def _release_country(metadata): return ", ".join(metadata.countries) if metadata.countries else "" def _release_year(metadata): return metadata.release[:4] if metadata.release else "" def _release_duration_distance(metadata, duration_ms): metadata_duration = getattr(metadata, "duration", None) if metadata_duration is None: return float("inf") return abs(metadata_duration - duration_ms) class WhipperGui(Gtk.Application if Gtk is not None else object): def __init__(self): require_gui_runtime() super().__init__(application_id="com.github.whipper_team.WhipperGui") self.window = None self.main_pane = None self.worker = None self.pulse_id = 0 self.scan_data = None self.scan_runner = None self.rip_runner = None self.scan_cancel_requested = False self.rip_cancel_requested = False self.current_release = None self.current_track_number = 0 self.current_track_total = 0 self.device_combo = None self.output_button = None self.working_directory_entry = None self.country_entry = None self.release_id_entry = None self.refresh_button = None self.read_button = None self.rip_button = None self.stop_button = None self.unknown_check = None self.cdr_check = None self.keep_going_check = None self.overread_check = None self.cover_art_combo = None self.max_retries_spin = None self.offset_spin = None self.logger_combo = None self.track_template_entry = None self.disc_template_entry = None self.release_store = None self.release_view = None self.track_store = None self.track_view = None self.log_buffer = None self.release_details = None self.info_labels = {} self.status_label = None self.progress_label = None self.overall_bar = None self.track_bar = None def do_activate(self): if self.window is None: self.window = self._build_window() self.window.present() def do_shutdown(self): self._save_gui_settings() Gtk.Application.do_shutdown(self) def _build_window(self): window = Gtk.ApplicationWindow(application=self) window.set_title("Whipper") window.set_icon_name("com.github.whipper_team.Whipper") window.set_default_size(1120, 760) window.set_border_width(12) root = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=12) window.add(root) root.pack_start(self._build_controls(), False, False, 0) root.pack_start(self._build_actions(), False, False, 0) root.pack_start(self._build_progress(), False, False, 0) root.pack_start(self._build_main_content(), True, True, 0) root.pack_start(self._build_log(), True, True, 0) self._refresh_devices() self._load_gui_settings() window.show_all() return window def _build_controls(self): grid = Gtk.Grid(column_spacing=10, row_spacing=10) grid.attach(Gtk.Label(label="Drive", xalign=0), 0, 0, 1, 1) self.device_combo = Gtk.ComboBoxText() self.device_combo.connect("changed", self._on_device_changed) grid.attach(self.device_combo, 1, 0, 2, 1) self.refresh_button = Gtk.Button(label="Refresh Drives") self.refresh_button.connect("clicked", self._on_refresh_clicked) self.refresh_button.set_tooltip_text("Refresh the list of detected optical drives") grid.attach(self.refresh_button, 3, 0, 1, 1) grid.attach(Gtk.Label(label="Output", xalign=0), 0, 1, 1, 1) self.output_button = Gtk.FileChooserButton( title="Select output folder", action=Gtk.FileChooserAction.SELECT_FOLDER, ) self.output_button.set_filename(os.path.expanduser("~")) self.output_button.connect("file-set", self._on_settings_changed) grid.attach(self.output_button, 1, 1, 3, 1) grid.attach(Gtk.Label(label="Working Dir", xalign=0), 0, 2, 1, 1) self.working_directory_entry = Gtk.Entry() self.working_directory_entry.set_placeholder_text("Optional working directory") self.working_directory_entry.connect("changed", self._on_settings_changed) grid.attach(self.working_directory_entry, 1, 2, 3, 1) grid.attach(Gtk.Label(label="Country", xalign=0), 0, 3, 1, 1) self.country_entry = Gtk.Entry() self.country_entry.set_placeholder_text("Optional MusicBrainz country filter") self.country_entry.connect("changed", self._on_settings_changed) grid.attach(self.country_entry, 1, 3, 1, 1) grid.attach(Gtk.Label(label="Release ID", xalign=0), 2, 3, 1, 1) self.release_id_entry = Gtk.Entry() self.release_id_entry.set_placeholder_text("Optional release override") self.release_id_entry.connect("changed", self._on_settings_changed) grid.attach(self.release_id_entry, 3, 3, 1, 1) return grid def _build_actions(self): box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=10) self.read_button = Gtk.Button(label="Read Disc") self.read_button.connect("clicked", self._on_read_clicked) self.read_button.set_tooltip_text("Read TOC and fetch matching MusicBrainz releases") box.pack_start(self.read_button, False, False, 0) self.rip_button = Gtk.Button(label="Rip Selected Release") self.rip_button.connect("clicked", self._on_rip_clicked) self.rip_button.set_tooltip_text("Rip the current disc using the selected release metadata") self.rip_button.set_sensitive(False) box.pack_start(self.rip_button, False, False, 0) self.stop_button = Gtk.Button(label="Stop") self.stop_button.connect("clicked", self._on_stop_clicked) self.stop_button.set_tooltip_text("Cancel the current scan or rip") self.stop_button.set_sensitive(False) box.pack_start(self.stop_button, False, False, 0) self.status_label = Gtk.Label(label="Idle", xalign=0) box.pack_start(self.status_label, True, True, 0) return box def _build_progress(self): frame = Gtk.Frame(label="Progress") box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8, margin=10) frame.add(box) self.progress_label = Gtk.Label(label="No task running", xalign=0) box.pack_start(self.progress_label, False, False, 0) self.overall_bar = Gtk.ProgressBar(show_text=True) self.overall_bar.set_text("Overall") box.pack_start(self.overall_bar, False, False, 0) self.track_bar = Gtk.ProgressBar(show_text=True) self.track_bar.set_text("Current track") box.pack_start(self.track_bar, False, False, 0) return frame def _build_main_content(self): pane = Gtk.Paned.new(Gtk.Orientation.HORIZONTAL) pane.set_wide_handle(True) pane.pack1(self._build_left_panel(), resize=True, shrink=False) pane.pack2(self._build_right_panel(), resize=True, shrink=False) self.main_pane = pane return pane def _build_left_panel(self): box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=12) info_frame = Gtk.Frame(label="Disc") info_grid = Gtk.Grid(column_spacing=10, row_spacing=6, margin=10) info_frame.add(info_grid) rows = [ ("Device", "device"), ("Status", "disc_status"), ("CDDB", "cddb"), ("Disc ID", "mbid"), ("Duration", "duration"), ("Tracks", "tracks"), ] for row, (label_text, key) in enumerate(rows): info_grid.attach(Gtk.Label(label=label_text, xalign=0), 0, row, 1, 1) value = Gtk.Label(label="—", xalign=0, selectable=True) info_grid.attach(value, 1, row, 1, 1) self.info_labels[key] = value box.pack_start(info_frame, False, False, 0) release_frame = Gtk.Frame(label="Matching Releases") release_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8, margin=8) release_frame.add(release_box) self.release_store = Gtk.ListStore(str, str, str, str, str, object) self.release_view = Gtk.TreeView(model=self.release_store) self.release_view.get_selection().connect("changed", self._on_release_selected) for index, title in enumerate(["Artist", "Title", "Year", "Type", "Country"]): renderer = Gtk.CellRendererText() column = Gtk.TreeViewColumn(title, renderer, text=index) column.set_resizable(True) self.release_view.append_column(column) scroll = Gtk.ScrolledWindow() scroll.set_hexpand(True) scroll.set_vexpand(True) scroll.add(self.release_view) release_box.pack_start(scroll, True, True, 0) details_frame = Gtk.Frame(label="Selected Release") details_scroll = Gtk.ScrolledWindow() details_scroll.set_hexpand(True) details_scroll.set_vexpand(False) details_scroll.set_min_content_height(120) details_frame.add(details_scroll) self.release_details = Gtk.Label(label="Select a release to inspect it.", xalign=0, yalign=0) self.release_details.set_line_wrap(True) self.release_details.set_selectable(True) details_scroll.add(self.release_details) release_box.pack_start(details_frame, False, False, 0) box.pack_start(release_frame, True, True, 0) return box def _build_right_panel(self): box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=12) tracks_frame = Gtk.Frame(label="Tracks") tracks_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8, margin=8) tracks_frame.add(tracks_box) self.track_store = Gtk.ListStore(str, str, str, str) self.track_view = Gtk.TreeView(model=self.track_store) for index, title in enumerate(["#", "Artist", "Title", "Length"]): renderer = Gtk.CellRendererText() column = Gtk.TreeViewColumn(title, renderer, text=index) column.set_resizable(True) self.track_view.append_column(column) track_scroll = Gtk.ScrolledWindow() track_scroll.set_hexpand(True) track_scroll.set_vexpand(True) track_scroll.add(self.track_view) tracks_box.pack_start(track_scroll, True, True, 0) box.pack_start(tracks_frame, True, True, 0) box.pack_start(self._build_rip_options(), False, False, 0) return box def _build_rip_options(self): frame = Gtk.Frame(label="Rip Options") grid = Gtk.Grid(column_spacing=10, row_spacing=8, margin=10) frame.add(grid) self.unknown_check = Gtk.CheckButton(label="Allow ripping without metadata") self.unknown_check.set_active(True) self.unknown_check.connect("toggled", self._on_unknown_toggled) grid.attach(self.unknown_check, 0, 0, 2, 1) self.cdr_check = Gtk.CheckButton(label="Allow CD-R") self.cdr_check.connect("toggled", self._on_settings_changed) grid.attach(self.cdr_check, 2, 0, 1, 1) self.keep_going_check = Gtk.CheckButton(label="Keep going on failed tracks") self.keep_going_check.set_active(True) self.keep_going_check.connect("toggled", self._on_settings_changed) grid.attach(self.keep_going_check, 0, 1, 2, 1) self.overread_check = Gtk.CheckButton(label="Force overread") self.overread_check.connect("toggled", self._on_settings_changed) grid.attach(self.overread_check, 2, 1, 1, 1) grid.attach(Gtk.Label(label="Cover Art", xalign=0), 0, 2, 1, 1) self.cover_art_combo = Gtk.ComboBoxText() self.cover_art_combo.append("", "Disabled") self.cover_art_combo.append("file", "Save file") self.cover_art_combo.append("embed", "Embed only") self.cover_art_combo.append("complete", "File + embed") self.cover_art_combo.set_active(0) self.cover_art_combo.connect("changed", self._on_settings_changed) grid.attach(self.cover_art_combo, 1, 2, 1, 1) grid.attach(Gtk.Label(label="Max Retries", xalign=0), 2, 2, 1, 1) self.max_retries_spin = Gtk.SpinButton.new_with_range(0, 20, 1) self.max_retries_spin.set_value(5) self.max_retries_spin.connect("value-changed", self._on_settings_changed) grid.attach(self.max_retries_spin, 3, 2, 1, 1) grid.attach(Gtk.Label(label="Offset", xalign=0), 0, 3, 1, 1) self.offset_spin = Gtk.SpinButton.new_with_range(-5000, 5000, 1) self.offset_spin.set_value(0) self.offset_spin.connect("value-changed", self._on_settings_changed) grid.attach(self.offset_spin, 1, 3, 1, 1) grid.attach(Gtk.Label(label="Logger", xalign=0), 2, 3, 1, 1) self.logger_combo = Gtk.ComboBoxText() for logger_name in sorted(result.getLoggers()): self.logger_combo.append(logger_name, logger_name) self.logger_combo.set_active_id("whipper") self.logger_combo.connect("changed", self._on_settings_changed) grid.attach(self.logger_combo, 3, 3, 1, 1) grid.attach(Gtk.Label(label="Track Tpl", xalign=0), 0, 4, 1, 1) self.track_template_entry = Gtk.Entry() self.track_template_entry.set_text(cd_command.DEFAULT_TRACK_TEMPLATE) self.track_template_entry.connect("changed", self._on_settings_changed) grid.attach(self.track_template_entry, 1, 4, 3, 1) grid.attach(Gtk.Label(label="Disc Tpl", xalign=0), 0, 5, 1, 1) self.disc_template_entry = Gtk.Entry() self.disc_template_entry.set_text(cd_command.DEFAULT_DISC_TEMPLATE) self.disc_template_entry.connect("changed", self._on_settings_changed) grid.attach(self.disc_template_entry, 1, 5, 3, 1) note = Gtk.Label( label="The configured drive offset is loaded automatically when whipper knows this drive.", xalign=0, ) note.set_line_wrap(True) grid.attach(note, 0, 6, 4, 1) return frame def _build_log(self): frame = Gtk.Frame(label="Log") scroll = Gtk.ScrolledWindow() scroll.set_hexpand(True) scroll.set_vexpand(True) frame.add(scroll) text_view = Gtk.TextView() text_view.set_editable(False) text_view.set_cursor_visible(False) text_view.set_monospace(True) self.log_buffer = text_view.get_buffer() scroll.add(text_view) return frame def _append_log(self, text): end_iter = self.log_buffer.get_end_iter() self.log_buffer.insert(end_iter, text) def _log_action(self, message, *args, level=logging.INFO): if args: message = message % args logger.log(level, message) if not message.endswith("\n"): message += "\n" self._append_log(message) def _clear_log(self): self.log_buffer.set_text("") def _config_path(self): xdg_config = os.environ.get("XDG_CONFIG_HOME") base = Path(xdg_config) if xdg_config else Path.home() / ".config" return base / "whipper" / "gui.json" def _collect_gui_settings(self): return { "output_directory": self.output_button.get_filename(), "working_directory": self.working_directory_entry.get_text(), "country": self.country_entry.get_text(), "release_id": self.release_id_entry.get_text(), "unknown": self.unknown_check.get_active(), "cdr": self.cdr_check.get_active(), "keep_going": self.keep_going_check.get_active(), "overread": self.overread_check.get_active(), "cover_art": self.cover_art_combo.get_active_id() or "", "max_retries": int(self.max_retries_spin.get_value()), "offset": int(self.offset_spin.get_value()), "logger": self.logger_combo.get_active_id() or "whipper", "track_template": self.track_template_entry.get_text(), "disc_template": self.disc_template_entry.get_text(), "window_width": self.window.get_size()[0] if self.window is not None else None, "window_height": self.window.get_size()[1] if self.window is not None else None, "pane_position": self.main_pane.get_position() if self.main_pane is not None else None, } def _save_gui_settings(self): if self.output_button is None: return path = self._config_path() path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(self._collect_gui_settings(), indent=2), encoding="utf-8") def _load_gui_settings(self): path = self._config_path() if not path.exists(): return try: data = json.loads(path.read_text(encoding="utf-8")) except Exception: return output_directory = data.get("output_directory") if output_directory and os.path.isdir(output_directory): self.output_button.set_filename(output_directory) self.working_directory_entry.set_text(data.get("working_directory", "")) self.country_entry.set_text(data.get("country", "")) self.release_id_entry.set_text(data.get("release_id", "")) self.unknown_check.set_active(bool(data.get("unknown", True))) self.cdr_check.set_active(bool(data.get("cdr", False))) self.keep_going_check.set_active(bool(data.get("keep_going", True))) self.overread_check.set_active(bool(data.get("overread", False))) cover_art = data.get("cover_art", "") if cover_art: self.cover_art_combo.set_active_id(cover_art) else: self.cover_art_combo.set_active(0) retries = data.get("max_retries", 5) if isinstance(retries, int): self.max_retries_spin.set_value(retries) offset = data.get("offset") if isinstance(offset, int): self.offset_spin.set_value(offset) logger_name = data.get("logger", "whipper") if logger_name in result.getLoggers(): self.logger_combo.set_active_id(logger_name) self.track_template_entry.set_text( data.get("track_template", cd_command.DEFAULT_TRACK_TEMPLATE) ) self.disc_template_entry.set_text( data.get("disc_template", cd_command.DEFAULT_DISC_TEMPLATE) ) width = data.get("window_width") height = data.get("window_height") if isinstance(width, int) and isinstance(height, int): self.window.resize(width, height) pane_position = data.get("pane_position") if isinstance(pane_position, int): GLib.idle_add(self.main_pane.set_position, pane_position) def _set_label(self, key, value): self.info_labels[key].set_text(value or "—") def _can_rip(self): has_release = self.current_release is not None or bool(self.release_id_entry.get_text().strip()) return has_release or self.unknown_check.get_active() def _reset_progress(self): self.current_track_number = 0 self.current_track_total = 0 self.overall_bar.set_fraction(0.0) self.overall_bar.set_text("Overall") self.track_bar.set_fraction(0.0) self.track_bar.set_text("Current track") self.progress_label.set_text("No task running") def _set_running_state(self, running, status): self.read_button.set_sensitive(not running) self.refresh_button.set_sensitive(not running) self.rip_button.set_sensitive((not running) and self._can_rip()) self.stop_button.set_sensitive(running) self.status_label.set_text(status) if running: if self.pulse_id == 0: self.pulse_id = GLib.timeout_add(150, self._pulse_progress) elif self.pulse_id: GLib.source_remove(self.pulse_id) self.pulse_id = 0 def _pulse_progress(self): if self.scan_runner is None and self.rip_runner is None: return False self.track_bar.pulse() return True def _refresh_devices(self): current = self.device_combo.get_active_text() self.device_combo.remove_all() devices = drive.getAllDevicePaths() logger.info("refreshing optical drives") for path in devices: self.device_combo.append_text(path) if not devices: self.device_combo.append_text("No drives detected") self.device_combo.set_active(0) self.read_button.set_sensitive(False) self.rip_button.set_sensitive(False) logger.warning("no optical drives detected") else: self.read_button.set_sensitive(True) if current in devices: self.device_combo.set_active(devices.index(current)) else: self.device_combo.set_active(0) logger.info("detected %d optical drive(s); active device: %s", len(devices), self.device_combo.get_active_text()) self._apply_configured_offset() def _selected_device(self): device = self.device_combo.get_active_text() if not device or device == "No drives detected": return None return device def _update_release_store(self, releases): self.release_store.clear() self.current_release = None for metadata in releases: self.release_store.append([ metadata.artist or "", metadata.releaseTitle or metadata.title or "", _release_year(metadata), metadata.releaseType or "", _release_country(metadata), metadata, ]) if releases: self.release_view.get_selection().select_path(0) else: self._update_track_store(None) self._update_release_details(None) def _update_track_store(self, metadata): self.track_store.clear() if metadata is None: return for index, track in enumerate(metadata.tracks, start=1): self.track_store.append([ str(index), track.artist or "", track.title or "", _format_duration_ms(track.duration), ]) def _update_release_details(self, metadata): if metadata is None: self.release_details.set_text("Select a release to inspect it.") return lines = [ "Artist: %s" % (metadata.artist or "Unknown"), "Title: %s" % (metadata.releaseTitle or metadata.title or "Unknown"), "Year: %s" % (_release_year(metadata) or "Unknown"), "Type: %s" % (metadata.releaseType or "Unknown"), "Country: %s" % (_release_country(metadata) or "Unknown"), "Disc: %s/%s" % ( metadata.discNumber if metadata.discNumber is not None else "?", metadata.discTotal if metadata.discTotal is not None else "?", ), "Tracks: %d" % len(metadata.tracks), "Duration: %s" % _format_duration_ms(metadata.duration), "Release MBID: %s" % (metadata.mbid or "Unknown"), "URL: %s" % (metadata.url or "Unknown"), ] if metadata.barcode: lines.append("Barcode: %s" % metadata.barcode) if metadata.catalogNumbers: lines.append("Catalog: %s" % ", ".join(metadata.catalogNumbers)) self.release_details.set_text("\n".join(lines)) def _apply_configured_offset(self): device = self._selected_device() if not device: return info = drive.getDeviceInfo(device) if not info: logger.info("no configured offset information for %s", device) return try: offset = config.Config().getReadOffset(*info) except KeyError: logger.info("configured read offset not found for %s", device) return if offset is not None: self.offset_spin.set_value(int(offset)) logger.info("loaded configured read offset %d for %s", offset, device) def _read_disc_worker(self, device, country): try: self.scan_cancel_requested = False GLib.idle_add(self._clear_log) GLib.idle_add(self._reset_progress) GLib.idle_add(self._log_action, "Reading disc from %s", device) GLib.idle_add(self._set_running_state, True, "Reading disc") GLib.idle_add(self.progress_label.set_text, "Scanning disc and looking up MusicBrainz") runner = CancellableSyncRunner() self.scan_runner = runner GLib.idle_add(self._log_action, "Preparing drive %s", device) utils.load_device(device) utils.unmount_device(device) if drive.get_cdrom_drive_status(device) == 1: raise OSError("No CD detected, please insert one and retry") GLib.idle_add(self._log_action, "Reading disc TOC") toc_task = cdrdao.ReadTOCTask(device, fast_toc=True) runner.run(toc_task, verbose=False) ittoc = toc_task.toc.table cddb = ittoc.getCDDBDiscId() mbid = ittoc.getMusicBrainzDiscId() duration = common.formatTime(ittoc.duration() / 1000.0) track_count = str(ittoc.getAudioTracks()) if self.scan_cancel_requested: raise RuntimeError("Disc scan cancelled") GLib.idle_add( self._log_action, "Querying MusicBrainz for %s%s", mbid, " (country=%s)" % country if country else "", ) try: releases = mbngs.musicbrainz(mbid, country=country or None, record=False) except mbngs.NotFoundException: releases = [] if self.scan_cancel_requested: raise RuntimeError("Disc scan cancelled") releases = sorted(releases, key=lambda md: _release_duration_distance(md, ittoc.duration())) def apply_results(): self.scan_data = { "device": device, "cddb": cddb, "mbid": mbid, "duration": duration, "tracks": track_count, "releases": releases, } self._set_label("device", device) self._set_label("disc_status", "Ready") self._set_label("cddb", cddb) self._set_label("mbid", mbid) self._set_label("duration", duration) self._set_label("tracks", track_count) self._append_log("CDDB disc id: %s\n" % cddb) self._append_log("MusicBrainz disc id: %s\n" % mbid) self._append_log("Disc duration: %s, %s audio tracks\n" % (duration, track_count)) self._log_action("Found %d matching release(s)", len(releases)) self._update_release_store(releases) self._set_running_state(False, "Disc ready") self.progress_label.set_text("Disc metadata loaded") if not releases: self._log_action("No MusicBrainz matches found", level=logging.WARNING) return False GLib.idle_add(apply_results) except Exception as exc: def apply_error(): self.scan_data = None self.current_release = None self.release_store.clear() self.track_store.clear() self._update_release_details(None) self._set_label("device", device) if self.scan_cancel_requested: self._set_label("disc_status", "Cancelled") self._log_action("Disc scan cancelled", level=logging.WARNING) self._set_running_state(False, "Cancelled") self.progress_label.set_text("Disc scan cancelled") else: self._set_label("disc_status", "Error") self._log_action("%s", exc, level=logging.ERROR) self._set_running_state(False, "Failed") self.progress_label.set_text("Disc scan failed") return False GLib.idle_add(apply_error) finally: self.scan_runner = None def _collect_rip_settings(self): retries = int(self.max_retries_spin.get_value()) return { "device": self._selected_device(), "output_directory": self.output_button.get_filename() or os.curdir, "working_directory": self.working_directory_entry.get_text().strip() or None, "country": self.country_entry.get_text().strip() or None, "release_id": self.release_id_entry.get_text().strip() or None, "unknown": self.unknown_check.get_active(), "cdr": self.cdr_check.get_active(), "keep_going": self.keep_going_check.get_active(), "overread": self.overread_check.get_active(), "cover_art": self.cover_art_combo.get_active_id() or None, "max_retries": float("inf") if retries == 0 else retries, "offset": int(self.offset_spin.get_value()), "logger": self.logger_combo.get_active_id() or "whipper", "track_template": self.track_template_entry.get_text() or cd_command.DEFAULT_TRACK_TEMPLATE, "disc_template": self.disc_template_entry.get_text() or cd_command.DEFAULT_DISC_TEMPLATE, } def _validate_rip_settings(self, settings): if not settings["device"]: raise ValueError("No optical drive selected") if not settings["output_directory"]: raise ValueError("Output directory is required") if not os.path.isdir(os.path.expanduser(settings["output_directory"])): raise ValueError("Output directory does not exist") cd_command.validate_template(settings["track_template"], "track") cd_command.validate_template(settings["disc_template"], "disc") if settings["working_directory"] and not os.path.isdir(os.path.expanduser(settings["working_directory"])): raise ValueError("Working directory does not exist") if settings["logger"] not in result.getLoggers(): raise ValueError("Unknown logger '%s'" % settings["logger"]) def _install_gui_log_handler(self): handler = GuiLogHandler(self) whipper_logger = logging.getLogger("whipper") previous_level = whipper_logger.level if previous_level == logging.NOTSET or previous_level > logging.INFO: whipper_logger.setLevel(logging.INFO) whipper_logger.addHandler(handler) return whipper_logger, handler, previous_level @staticmethod def _remove_gui_log_handler(whipper_logger, handler, previous_level): whipper_logger.removeHandler(handler) whipper_logger.setLevel(previous_level) def _resolve_release_metadata(self, mbdiscid, settings): if settings["release_id"]: return mbngs.getReleaseMetadata( settings["release_id"], discid=mbdiscid, country=settings["country"], record=False, ) if self.current_release is not None: return self.current_release if self.scan_data and self.scan_data.get("mbid") == mbdiscid: releases = self.scan_data.get("releases") or [] if releases: return releases[0] return None def _update_rip_task_progress(self, description, item_label, item_index, item_total, value): value = min(max(value, 0.0), 1.0) self.current_track_number = item_index self.current_track_total = item_total self.overall_bar.set_fraction(((item_index - 1) + value) / max(item_total, 1)) self.overall_bar.set_text("Track %d/%d" % (item_index, item_total)) self.track_bar.set_fraction(value) self.track_bar.set_text(description) self.progress_label.set_text("%s: %s" % (item_label, description)) return False def _mark_track_finished(self, item_label, item_index, item_total, skipped=False): self.current_track_number = item_index self.current_track_total = item_total self.overall_bar.set_fraction(item_index / max(item_total, 1)) self.overall_bar.set_text("Track %d/%d" % (item_index, item_total)) self.track_bar.set_fraction(1.0) self.track_bar.set_text("%s %s" % (item_label, "skipped" if skipped else "done")) self.progress_label.set_text("%s %s" % (item_label, "failed" if skipped else "verified")) return False def _finish_rip(self, returncode, status="Done"): if returncode in (0, 5): self.overall_bar.set_fraction(1.0) self.overall_bar.set_text("Complete") self.track_bar.set_fraction(1.0) self.progress_label.set_text("Rip complete") self._set_running_state(False, status) elif self.rip_cancel_requested: self.progress_label.set_text("Rip cancelled") self._set_running_state(False, "Cancelled") else: self.progress_label.set_text("Rip failed") self._set_running_state(False, "Failed") return False def _rip_track(self, runner, program, itable, settings, track_number, item_index, item_total, cover_art_path, skipped_tracks, mbdiscid): if self.rip_cancel_requested: raise RipCancelledError("Rip cancelled") track_result = program.result.getTrackResult(track_number) if not track_result: track_result = result.TrackResult() program.result.tracks.append(track_result) path = program.getPath( program.outdir, settings["track_template"], mbdiscid, program.metadata, track_number=track_number, ) + ".flac" track_result.number = track_number track_result.filename = path if track_number > 0: track_result.pregap = itable.tracks[track_number - 1].getPregap() track_result.pre_emphasis = itable.tracks[track_number - 1].pre_emphasis item_label = "HTOA" if track_number == 0 else "Track %d" % track_number if os.path.exists(path): GLib.idle_add(self._append_log, "%s already exists, verifying\n" % item_label) if not program.verifyTrack(runner, track_result): GLib.idle_add(self._append_log, "%s verification failed, reripping\n" % item_label) os.unlink(path) if not os.path.exists(path): track_result.testduration = 0.0 track_result.copyduration = 0.0 tries = 1 while tries <= settings["max_retries"]: if self.rip_cancel_requested: raise RipCancelledError("Rip cancelled") extra = "" if tries == 1 else " (try %d)" % tries GLib.idle_add(self._append_log, "%s%s: %s\n" % (item_label, extra, os.path.basename(path))) tag_list = program.getTagList(track_number, mbdiscid) if track_number > 0 and itable.tracks[track_number - 1].isrc is not None: tag_list["ISRC"] = itable.tracks[track_number - 1].isrc rip_task = cdparanoia.ReadVerifyTrackTask( track_result.filename, program.result.table, program.getHTOA()[0] if track_number == 0 else program.result.table.getTrackStart(track_number), program.getHTOA()[1] if track_number == 0 else program.result.table.getTrackEnd(track_number), settings["overread"], offset=settings["offset"], device=settings["device"], taglist=tag_list, what="%s%s" % (item_label.lower(), extra), coverArtPath=cover_art_path, ) listener = GuiTaskListener(self, item_label, item_index, item_total) rip_task.addListener(listener) for child_task in rip_task.tasks: child_task.addListener(listener) try: runner.run(rip_task, verbose=False) if self.rip_cancel_requested: raise RipCancelledError("Rip cancelled") track_result.testcrc = rip_task.testchecksum track_result.copycrc = rip_task.copychecksum track_result.peak = rip_task.peak track_result.quality = rip_task.quality track_result.testspeed = rip_task.testspeed track_result.copyspeed = rip_task.copyspeed track_result.testduration += rip_task.testduration track_result.copyduration += rip_task.copyduration if track_result.filename != rip_task.path: track_result.filename = rip_task.path break except Exception as exc: if self.rip_cancel_requested: raise RipCancelledError("Rip cancelled") from exc logger.debug("track %s try %d failed: %r", item_label, tries, exc) tries += 1 if tries > settings["max_retries"]: tries -= 1 GLib.idle_add(self._append_log, "%s giving up after %d tries\n" % (item_label, tries)) if settings["keep_going"]: track_result.skipped = True skipped_tracks.append(track_result) GLib.idle_add(self._mark_track_finished, item_label, item_index, item_total, True) return raise RuntimeError("%s can't be ripped" % item_label) if track_result in skipped_tracks: GLib.idle_add(self._mark_track_finished, item_label, item_index, item_total, True) return if track_result.testcrc != track_result.copycrc: raise RuntimeError("CRCs did not match for %s" % item_label) if track_number == 0: if track_result.peak == cd_command.SILENT: program.result.table.setFile( 1, 0, None, program.result.table.getTrackStart(1), track_number ) if os.path.exists(track_result.filename): os.unlink(track_result.filename) track_result.filename = None GLib.idle_add(self._append_log, "HTOA discarded, contains digital silence\n") else: program.result.table.setFile( 1, 0, track_result.filename, program.result.table.getTrackStart(1), track_number ) else: program.result.table.setFile( track_number, 1, track_result.filename, program.result.table.getTrackLength(track_number), track_number, ) GLib.idle_add(self._mark_track_finished, item_label, item_index, item_total, False) def _rip_disc_worker(self, settings): whipper_logger, log_handler, previous_level = self._install_gui_log_handler() original_cwd = os.getcwd() try: self.rip_cancel_requested = False GLib.idle_add(self._clear_log) GLib.idle_add(self._reset_progress) GLib.idle_add(self._set_running_state, True, "Ripping disc") GLib.idle_add(self.progress_label.set_text, "Preparing rip") GLib.idle_add(self._log_action, "Starting native whipper rip") runner = CancellableSyncRunner() self.rip_runner = runner conf = config.Config() program = Program(conf, record=False) GLib.idle_add( self._log_action, "Rip settings: device=%s output=%s working_dir=%s logger=%s offset=%d overread=%s cover_art=%s retries=%s keep_going=%s cdr=%s unknown=%s", settings["device"], settings["output_directory"], settings["working_directory"] or "-", settings["logger"], settings["offset"], settings["overread"], settings["cover_art"] or "disabled", settings["max_retries"], settings["keep_going"], settings["cdr"], settings["unknown"], ) if settings["working_directory"]: GLib.idle_add(self._log_action, "Changing working directory to %s", settings["working_directory"]) os.chdir(os.path.expanduser(settings["working_directory"])) GLib.idle_add(self._log_action, "Preparing drive %s", settings["device"]) utils.load_device(settings["device"]) utils.unmount_device(settings["device"]) if drive.get_cdrom_drive_status(settings["device"]) == 1: raise OSError("No CD detected, please insert one and retry") GLib.idle_add(self._log_action, "Reading fast TOC") ittoc = program.getFastToc(runner, settings["device"]) program.getRipResult() cddb = ittoc.getCDDBDiscId() mbdiscid = ittoc.getMusicBrainzDiscId() GLib.idle_add(self._append_log, "CDDB disc id: %s\n" % cddb) GLib.idle_add(self._append_log, "MusicBrainz disc id: %s\n" % mbdiscid) GLib.idle_add(self._log_action, "Resolving release metadata") program.metadata = self._resolve_release_metadata(mbdiscid, settings) if program.metadata is None and not settings["unknown"]: raise RuntimeError("Unable to resolve disc metadata. Select a release or enable ripping without metadata.") if program.metadata is not None: program.metadata.discid = mbdiscid GLib.idle_add( self._log_action, "Using release: %s - %s\n" % ( program.metadata.artist or "Unknown Artist", program.metadata.releaseTitle or program.metadata.title or "Unknown Title", ), ) else: GLib.idle_add(self._log_action, "Continuing without release metadata", level=logging.WARNING) program.result.isCdr = cdrdao.DetectCdr(settings["device"]) if program.result.isCdr and not settings["cdr"]: raise RuntimeError("Inserted disc appears to be a CD-R. Enable 'Allow CD-R' to continue.") if program.result.isCdr: GLib.idle_add(self._log_action, "Detected CD-R media", level=logging.WARNING) out_fpath = program.getPath( settings["output_directory"], settings["disc_template"], mbdiscid, program.metadata, ) GLib.idle_add(self._log_action, "Reading full disc table") itable = program.getTable( runner, ittoc.getCDDBDiscId(), ittoc.getMusicBrainzDiscId(), settings["device"], settings["offset"], out_fpath, ) program.result.cdrdaoVersion = cdrdao.version() program.result.cdparanoiaVersion = cdparanoia.getCdParanoiaVersion() info = drive.getDeviceInfo(settings["device"]) if info: try: program.result.cdparanoiaDefeatsCache = conf.getDefeatsCache(*info) except KeyError: pass program.result.artist = program.metadata.artist if program.metadata else "Unknown Artist" program.result.title = program.metadata.releaseTitle if program.metadata else "Unknown Title" _, program.result.vendor, program.result.model, program.result.release = \ cdio.Device(settings["device"]).get_hwinfo() program.result.metadata = program.metadata program.result.offset = settings["offset"] program.result.overread = settings["overread"] program.result.logger = settings["logger"] program.outdir = settings["output_directory"] GLib.idle_add( self._log_action, "Drive info: vendor=%s model=%s release=%s offset=%s overread=%s", program.result.vendor, program.result.model, program.result.release, program.result.offset, program.result.overread, ) disc_name = program.getPath(program.outdir, settings["disc_template"], mbdiscid, program.metadata) dirname = os.path.dirname(disc_name) if os.path.exists(dirname): log_file = disc_name + ".log" if os.path.exists(log_file): raise RuntimeError("output directory %s is a finished rip" % dirname) GLib.idle_add(self._log_action, "Using existing output directory %s", dirname) else: os.makedirs(dirname) GLib.idle_add(self._log_action, "Created output directory %s", dirname) cover_art_path = None if settings["cover_art"] in {"embed", "complete"} and importlib.util.find_spec("PIL") is None: GLib.idle_add(self._log_action, "Cover art embedding requires Pillow; continuing without embedded art", level=logging.WARNING) elif settings["cover_art"] in {"file", "embed", "complete"}: if getattr(program.metadata, "mbid", None): GLib.idle_add(self._log_action, "Fetching cover art with mode %s", settings["cover_art"]) cover_art_path = program.getCoverArt(dirname, program.metadata.mbid) if cover_art_path is not None: GLib.idle_add(self._log_action, "Cover art saved to %s", cover_art_path) else: GLib.idle_add(self._log_action, "Cover art requested but disc metadata is unavailable", level=logging.WARNING) if settings["cover_art"] == "file": embed_cover_art_path = None else: embed_cover_art_path = cover_art_path skipped_tracks = [] rip_numbers = [] if program.getHTOA(): GLib.idle_add(self._log_action, "Hidden Track One Audio detected") rip_numbers.append(0) for index, track in enumerate(itable.tracks, start=1): if track.audio: rip_numbers.append(index) else: GLib.idle_add(self._log_action, "Skipping data track %d, not implemented", index, level=logging.WARNING) track.indexes[1].relative = 0 GLib.idle_add(self._log_action, "Planned rip items: %d", len(rip_numbers)) total_items = len(rip_numbers) for item_index, track_number in enumerate(rip_numbers, start=1): if self.rip_cancel_requested: raise RipCancelledError("Rip cancelled") self._rip_track( runner, program, itable, settings, track_number, item_index, total_items, embed_cover_art_path, skipped_tracks, mbdiscid, ) if settings["cover_art"] == "embed" and cover_art_path is not None: os.remove(cover_art_path) GLib.idle_add(self._log_action, "Removed temporary cover art file %s", cover_art_path) if self.rip_cancel_requested: raise RipCancelledError("Rip cancelled") program.skipped_tracks = skipped_tracks or None GLib.idle_add(self._log_action, "Writing CUE sheet") program.writeCue(disc_name) GLib.idle_add(self._log_action, "Writing M3U playlist") program.write_m3u(disc_name) try: GLib.idle_add(self._log_action, "Running AccurateRip image verification") with contextlib.redirect_stdout(io.StringIO()) as stdout_buffer: program.verifyImage(runner, itable) accurip.print_report(program.result) report_output = stdout_buffer.getvalue() if report_output: GLib.idle_add(self._append_log, "\n" + report_output + "\n") except accurip.EntryNotFound: GLib.idle_add(self._log_action, "AccurateRip entry not found", level=logging.WARNING) txt_logger = result.getLoggers()[settings["logger"]]() GLib.idle_add(self._log_action, "Writing rip log with logger '%s'", settings["logger"]) program.writeLog(disc_name, txt_logger) if skipped_tracks: GLib.idle_add(self._log_action, "%d track(s) were skipped during this rip", len(skipped_tracks), level=logging.WARNING) GLib.idle_add(self._finish_rip, 5, "Done with skipped tracks") else: GLib.idle_add(self._log_action, "Rip finished successfully") GLib.idle_add(self._finish_rip, 0, "Done") except Exception as exc: if not isinstance(exc, RipCancelledError): GLib.idle_add(self._log_action, "%s", exc, level=logging.ERROR) GLib.idle_add(self._finish_rip, 1, "Cancelled" if self.rip_cancel_requested else "Failed") finally: self.rip_runner = None self._remove_gui_log_handler(whipper_logger, log_handler, previous_level) os.chdir(original_cwd) def _on_release_selected(self, selection): model, tree_iter = selection.get_selected() if tree_iter is None: self.current_release = None self._update_track_store(None) self._update_release_details(None) else: self.current_release = model.get_value(tree_iter, 5) self._update_track_store(self.current_release) self._update_release_details(self.current_release) self.rip_button.set_sensitive(self._can_rip()) def _on_refresh_clicked(self, _button): self._log_action("Refreshing drive list") self._refresh_devices() def _on_unknown_toggled(self, _button): self._save_gui_settings() self.rip_button.set_sensitive( (self.scan_runner is None) and (self.rip_runner is None) and self._can_rip() ) def _on_settings_changed(self, *_args): self._save_gui_settings() self.rip_button.set_sensitive( (self.scan_runner is None) and (self.rip_runner is None) and self._can_rip() ) def _on_device_changed(self, _combo): self._apply_configured_offset() self._save_gui_settings() device = self._selected_device() if device: self._log_action("Selected device %s", device) def _on_read_clicked(self, _button): device = self._selected_device() if not device: self._log_action("Read requested without an active device", level=logging.WARNING) return country = self.country_entry.get_text().strip() self._log_action( "Read requested for %s%s", device, " with country filter %s" % country if country else "", ) self.worker = threading.Thread( target=self._read_disc_worker, args=(device, country), daemon=True, ) self.worker.start() def _on_rip_clicked(self, _button): if not self._can_rip(): self._log_action("Rip requested without a selected release", level=logging.WARNING) self.status_label.set_text("Missing release") return settings = self._collect_rip_settings() try: self._validate_rip_settings(settings) except Exception as exc: self._log_action("%s", exc, level=logging.ERROR) self.status_label.set_text("Invalid settings") return if self.current_release is not None: self._log_action( "Rip requested for release %s - %s", self.current_release.artist or "Unknown Artist", self.current_release.releaseTitle or self.current_release.title or "Unknown Title", ) else: self._log_action("Rip requested without bound metadata release", level=logging.WARNING) self.worker = threading.Thread( target=self._rip_disc_worker, args=(settings,), daemon=True, ) self.worker.start() def _on_stop_clicked(self, _button): if self.scan_runner is not None: self.scan_cancel_requested = True self._log_action("Stopping scan", level=logging.WARNING) self.scan_runner.cancel() elif self.rip_runner is not None: self.rip_cancel_requested = True self._log_action("Stopping rip", level=logging.WARNING) self.rip_runner.cancel() class GuiLogHandler(logging.Handler): def __init__(self, gui): super().__init__(level=logging.INFO) self.gui = gui self.setFormatter(logging.Formatter("%(message)s")) def emit(self, record): try: message = self.format(record) except Exception: message = record.getMessage() GLib.idle_add(self.gui._append_log, message + "\n") class GuiTaskListener: def __init__(self, gui, item_label, item_index, item_total): self.gui = gui self.item_label = item_label self.item_index = item_index self.item_total = item_total def started(self, task): GLib.idle_add( self.gui._update_rip_task_progress, task.description, self.item_label, self.item_index, self.item_total, task.progress, ) def progressed(self, task, value): GLib.idle_add( self.gui._update_rip_task_progress, task.description, self.item_label, self.item_index, self.item_total, value, ) def described(self, task, description): GLib.idle_add( self.gui._update_rip_task_progress, description, self.item_label, self.item_index, self.item_total, task.progress, ) def stopped(self, task): return None class CancellableSyncRunner(whipper_task.SyncRunner): def cancel(self): loop = getattr(self, "_loop", None) current_task = getattr(self, "_task", None) if loop is None or current_task is None: return def _cancel(): try: abort = getattr(current_task, "abort", None) if callable(abort): abort() else: current_task.stop() except Exception as exc: current_task.setException(exc) self.stopped(current_task) loop.call_soon_threadsafe(_cancel) def main(): try: require_gui_runtime() except RuntimeError as exc: print(str(exc), file=sys.stderr) return 1 app = WhipperGui() return app.run(sys.argv)