Files
whipper-gui/whipper/gui.py
2026-04-18 23:58:01 +03:00

1810 lines
75 KiB
Python

import os
import sys
import threading
import json
import io
import logging
import contextlib
import importlib.util
import tempfile
from pathlib import Path
_GUI_IMPORT_ERROR = None
try:
import cdio
except ImportError as exc:
cdio = None
_GUI_IMPORT_ERROR = exc
try:
import gi
gi.require_version("Gtk", "3.0")
from gi.repository import GLib, Gtk
except (ImportError, ValueError) as exc:
gi = None
GLib = None
Gtk = None
if _GUI_IMPORT_ERROR is None:
_GUI_IMPORT_ERROR = exc
from whipper.common import accurip, common, config, drive, mbngs, task as whipper_task
from whipper.common.program import Program
from whipper.command import cd as cd_command, offset as offset_command
from whipper.program import arc, cdrdao, cdparanoia, utils
from whipper.result import result
logger = logging.getLogger(__name__)
APP_ID = "com.github.whipper_team.Whipper"
class RipCancelledError(Exception):
pass
def gui_runtime_available():
return _GUI_IMPORT_ERROR is None
def gui_runtime_error_message():
missing = []
if gi is None:
missing.append("PyGObject / GTK 3")
if cdio is None:
missing.append("pycdio")
if not missing:
missing.append("GUI runtime dependencies")
return "whipper-gui requires %s" % " and ".join(missing)
def require_gui_runtime():
if not gui_runtime_available():
raise RuntimeError(gui_runtime_error_message()) from _GUI_IMPORT_ERROR
def _iter_icon_candidates():
local_icon = Path(__file__).resolve().parent.parent / "data" / ("%s.svg" % APP_ID)
yield local_icon
data_dirs = []
xdg_data_home = os.environ.get("XDG_DATA_HOME")
if xdg_data_home:
data_dirs.append(Path(xdg_data_home))
data_dirs.extend(Path(path) for path in os.environ.get("XDG_DATA_DIRS", "/usr/local/share:/usr/share").split(":") if path)
for base in data_dirs:
yield base / "icons" / "hicolor" / "scalable" / "apps" / ("%s.svg" % APP_ID)
yield base / "icons" / "hicolor" / "256x256" / "apps" / ("%s.png" % APP_ID)
yield base / "pixmaps" / ("%s.png" % APP_ID)
yield base / "pixmaps" / ("%s.svg" % APP_ID)
def _resolve_icon_path():
for candidate in _iter_icon_candidates():
if candidate.is_file():
return str(candidate)
return None
def _format_duration_ms(duration_ms):
if duration_ms is None:
return ""
return common.formatTime(duration_ms / 1000.0)
def _release_country(metadata):
return ", ".join(metadata.countries) if metadata.countries else ""
def _release_year(metadata):
return metadata.release[:4] if metadata.release else ""
def _release_duration_distance(metadata, duration_ms):
metadata_duration = getattr(metadata, "duration", None)
if metadata_duration is None:
return float("inf")
return abs(metadata_duration - duration_ms)
class WhipperGui(Gtk.Application if Gtk is not None else object):
def __init__(self):
require_gui_runtime()
GLib.set_prgname(APP_ID)
super().__init__(application_id=APP_ID)
GLib.set_application_name("Whipper")
Gtk.Window.set_default_icon_name(APP_ID)
icon_path = _resolve_icon_path()
if icon_path:
Gtk.Window.set_default_icon_from_file(icon_path)
self.window = None
self.main_pane = None
self.worker = None
self.pulse_id = 0
self.scan_data = None
self.scan_runner = None
self.rip_runner = None
self.drive_runner = None
self.scan_cancel_requested = False
self.rip_cancel_requested = False
self.drive_cancel_requested = False
self.current_release = None
self.current_track_number = 0
self.current_track_total = 0
self.device_combo = None
self.output_button = None
self.working_directory_entry = None
self.country_entry = None
self.release_id_entry = None
self.refresh_button = None
self.analyze_button = None
self.find_offset_button = None
self.read_button = None
self.rip_button = None
self.stop_button = None
self.unknown_check = None
self.cdr_check = None
self.keep_going_check = None
self.overread_check = None
self.cover_art_combo = None
self.max_retries_spin = None
self.offset_spin = None
self.logger_combo = None
self.track_template_entry = None
self.disc_template_entry = None
self.compact_mode_label = None
self.release_store = None
self.release_view = None
self.track_store = None
self.track_view = None
self.log_buffer = None
self.release_details = None
self.track_columns = None
self.info_labels = {}
self.status_label = None
self.progress_label = None
self.overall_bar = None
self.track_bar = None
def do_activate(self):
if self.window is None:
self.window = self._build_window()
self.window.present()
def do_shutdown(self):
self._save_gui_settings()
Gtk.Application.do_shutdown(self)
def _build_window(self):
window = Gtk.ApplicationWindow(application=self)
window.set_title("Whipper")
window.set_icon_name(APP_ID)
icon_path = _resolve_icon_path()
if icon_path:
window.set_icon_from_file(icon_path)
window.set_wmclass(APP_ID, APP_ID)
window.set_default_size(1380, 820)
window.set_border_width(6)
root = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6)
window.add(root)
root.pack_start(self._build_transport_strip(), False, False, 0)
root.pack_start(self._build_progress(), False, False, 0)
root.pack_start(self._build_main_content(), True, True, 0)
self._refresh_devices()
self._load_gui_settings()
window.show_all()
return window
def _build_transport_strip(self):
frame = Gtk.Frame(label="Extraction Control Center")
shell = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=10, margin=6)
frame.add(shell)
primary = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6)
primary.pack_start(self._build_controls(), False, False, 0)
primary.pack_start(self._build_actions(), False, False, 0)
shell.pack_start(primary, True, True, 0)
secondary = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6)
secondary.pack_start(self._build_disc_info_frame(), False, False, 0)
secondary.pack_start(self._build_rip_options(), False, False, 0)
shell.pack_start(secondary, False, False, 0)
return frame
def _build_controls(self):
grid = Gtk.Grid(column_spacing=8, row_spacing=6)
grid.attach(Gtk.Label(label="Drive", xalign=0), 0, 0, 1, 1)
self.device_combo = Gtk.ComboBoxText()
self.device_combo.connect("changed", self._on_device_changed)
grid.attach(self.device_combo, 1, 0, 1, 1)
self.refresh_button = Gtk.Button(label="Detect Drives")
self.refresh_button.connect("clicked", self._on_refresh_clicked)
self.refresh_button.set_tooltip_text("Refresh the list of detected optical drives")
grid.attach(self.refresh_button, 2, 0, 1, 1)
self.analyze_button = Gtk.Button(label="Analyze Drive")
self.analyze_button.connect("clicked", self._on_analyze_drive_clicked)
self.analyze_button.set_tooltip_text("Probe whether cdparanoia can defeat this drive's audio cache")
grid.attach(self.analyze_button, 3, 0, 1, 1)
self.find_offset_button = Gtk.Button(label="Find Offset")
self.find_offset_button.connect("clicked", self._on_find_offset_clicked)
self.find_offset_button.set_tooltip_text("Detect the configured read offset using an AccurateRip-known disc")
grid.attach(self.find_offset_button, 4, 0, 1, 1)
grid.attach(Gtk.Label(label="Status", xalign=0), 5, 0, 1, 1)
self.status_label = Gtk.Label(label="Idle", xalign=0)
self.status_label.set_selectable(True)
grid.attach(self.status_label, 6, 0, 1, 1)
grid.attach(Gtk.Label(label="Output", xalign=0), 0, 1, 1, 1)
self.output_button = Gtk.FileChooserButton(
title="Select output folder",
action=Gtk.FileChooserAction.SELECT_FOLDER,
)
self.output_button.set_filename(os.path.expanduser("~"))
self.output_button.connect("file-set", self._on_settings_changed)
grid.attach(self.output_button, 1, 1, 2, 1)
grid.attach(Gtk.Label(label="Logger", xalign=0), 3, 1, 1, 1)
self.logger_combo = Gtk.ComboBoxText()
for logger_name in sorted(result.getLoggers()):
self.logger_combo.append(logger_name, logger_name)
self.logger_combo.set_active_id("whipper")
self.logger_combo.connect("changed", self._on_settings_changed)
grid.attach(self.logger_combo, 4, 1, 2, 1)
grid.attach(Gtk.Label(label="Cover Art", xalign=0), 0, 2, 1, 1)
self.cover_art_combo = Gtk.ComboBoxText()
self.cover_art_combo.append("", "Disabled")
self.cover_art_combo.append("file", "Save file")
self.cover_art_combo.append("embed", "Embed only")
self.cover_art_combo.append("complete", "File + embed")
self.cover_art_combo.set_active(0)
self.cover_art_combo.connect("changed", self._on_settings_changed)
grid.attach(self.cover_art_combo, 1, 2, 1, 1)
grid.attach(Gtk.Label(label="Offset", xalign=0), 2, 2, 1, 1)
self.offset_spin = Gtk.SpinButton.new_with_range(-5000, 5000, 1)
self.offset_spin.set_value(0)
self.offset_spin.connect("value-changed", self._on_settings_changed)
grid.attach(self.offset_spin, 3, 2, 1, 1)
grid.attach(Gtk.Label(label="Retries", xalign=0), 4, 2, 1, 1)
self.max_retries_spin = Gtk.SpinButton.new_with_range(0, 20, 1)
self.max_retries_spin.set_value(5)
self.max_retries_spin.connect("value-changed", self._on_settings_changed)
grid.attach(self.max_retries_spin, 5, 2, 1, 1)
grid.attach(Gtk.Label(label="Working Dir", xalign=0), 0, 2, 1, 1)
self.working_directory_entry = Gtk.Entry()
self.working_directory_entry.set_placeholder_text("Optional working directory")
self.working_directory_entry.connect("changed", self._on_settings_changed)
grid.attach(self.working_directory_entry, 1, 3, 2, 1)
grid.attach(Gtk.Label(label="Country", xalign=0), 0, 4, 1, 1)
self.country_entry = Gtk.Entry()
self.country_entry.set_placeholder_text("Optional MusicBrainz country filter")
self.country_entry.connect("changed", self._on_settings_changed)
grid.attach(self.country_entry, 1, 4, 1, 1)
grid.attach(Gtk.Label(label="Release ID", xalign=0), 2, 4, 1, 1)
self.release_id_entry = Gtk.Entry()
self.release_id_entry.set_placeholder_text("Optional release override")
self.release_id_entry.connect("changed", self._on_settings_changed)
grid.attach(self.release_id_entry, 3, 4, 3, 1)
return grid
def _build_actions(self):
box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8)
self.read_button = Gtk.Button(label="Detect TOC")
self.read_button.connect("clicked", self._on_read_clicked)
self.read_button.set_tooltip_text("Read TOC and fetch matching MusicBrainz releases")
box.pack_start(self.read_button, False, False, 0)
self.rip_button = Gtk.Button(label="Secure Rip")
self.rip_button.connect("clicked", self._on_rip_clicked)
self.rip_button.set_tooltip_text("Rip the current disc using the selected release metadata")
self.rip_button.set_sensitive(False)
box.pack_start(self.rip_button, False, False, 0)
self.stop_button = Gtk.Button(label="Abort")
self.stop_button.connect("clicked", self._on_stop_clicked)
self.stop_button.set_tooltip_text("Cancel the current scan or rip")
self.stop_button.set_sensitive(False)
box.pack_start(self.stop_button, False, False, 0)
box.pack_start(Gtk.Separator(orientation=Gtk.Orientation.VERTICAL), False, False, 6)
self.unknown_check = Gtk.CheckButton(label="Unknown")
self.unknown_check.set_active(True)
self.unknown_check.connect("toggled", self._on_unknown_toggled)
box.pack_start(self.unknown_check, False, False, 0)
self.cdr_check = Gtk.CheckButton(label="CD-R")
self.cdr_check.connect("toggled", self._on_settings_changed)
box.pack_start(self.cdr_check, False, False, 0)
self.keep_going_check = Gtk.CheckButton(label="Keep Going")
self.keep_going_check.set_active(True)
self.keep_going_check.connect("toggled", self._on_settings_changed)
box.pack_start(self.keep_going_check, False, False, 0)
self.overread_check = Gtk.CheckButton(label="Overread")
self.overread_check.connect("toggled", self._on_settings_changed)
box.pack_start(self.overread_check, False, False, 0)
box.pack_start(Gtk.Separator(orientation=Gtk.Orientation.VERTICAL), False, False, 6)
self.compact_mode_label = Gtk.Label(
label="Compact EAC-style flow: Detect TOC -> pick release -> Secure Rip",
xalign=0,
)
box.pack_start(self.compact_mode_label, True, True, 0)
return box
def _build_progress(self):
frame = Gtk.Frame(label="Extraction Progress")
box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=4, margin=6)
frame.add(box)
self.progress_label = Gtk.Label(label="No task running", xalign=0)
box.pack_start(self.progress_label, False, False, 0)
self.overall_bar = Gtk.ProgressBar(show_text=True)
self.overall_bar.set_text("Overall")
box.pack_start(self.overall_bar, False, False, 0)
self.track_bar = Gtk.ProgressBar(show_text=True)
self.track_bar.set_text("Current track")
box.pack_start(self.track_bar, False, False, 0)
return frame
def _build_main_content(self):
pane = Gtk.Paned.new(Gtk.Orientation.VERTICAL)
pane.set_wide_handle(True)
pane.pack1(self._build_workspace(), resize=True, shrink=False)
pane.pack2(self._build_log(), resize=False, shrink=False)
pane.set_position(470)
self.main_pane = pane
return pane
def _build_workspace(self):
pane = Gtk.Paned.new(Gtk.Orientation.HORIZONTAL)
pane.set_wide_handle(True)
pane.pack1(self._build_left_panel(), resize=False, shrink=False)
pane.pack2(self._build_right_panel(), resize=True, shrink=False)
pane.set_position(330)
return pane
def _build_disc_info_frame(self):
info_frame = Gtk.Frame(label="Disc")
info_grid = Gtk.Grid(column_spacing=8, row_spacing=4, margin=6)
info_frame.add(info_grid)
rows = [
("Device", "device"),
("Vendor", "vendor"),
("Model", "model"),
("Release", "release"),
("Read offset", "read_offset"),
("Cache defeat", "cache_defeat"),
("Status", "disc_status"),
("CDDB", "cddb"),
("Disc ID", "mbid"),
("Duration", "duration"),
("Tracks", "tracks"),
]
for row, (label_text, key) in enumerate(rows):
info_grid.attach(Gtk.Label(label=label_text, xalign=0), 0, row, 1, 1)
value = Gtk.Label(label="", xalign=0, selectable=True)
info_grid.attach(value, 1, row, 1, 1)
self.info_labels[key] = value
return info_frame
def _build_left_panel(self):
box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8)
release_frame = Gtk.Frame(label="Metadata / Releases")
release_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6, margin=6)
release_frame.add(release_box)
self.release_store = Gtk.ListStore(str, str, str, str, str, object)
self.release_view = Gtk.TreeView(model=self.release_store)
self.release_view.set_headers_clickable(False)
self.release_view.set_enable_search(False)
self.release_view.set_grid_lines(Gtk.TreeViewGridLines.BOTH)
self.release_view.get_selection().connect("changed", self._on_release_selected)
for index, title in enumerate(["Artist", "Title", "Year", "Type", "Country"]):
renderer = Gtk.CellRendererText()
column = Gtk.TreeViewColumn(title, renderer, text=index)
column.set_resizable(True)
if title in {"Artist", "Title"}:
column.set_expand(True)
self.release_view.append_column(column)
scroll = Gtk.ScrolledWindow()
scroll.set_hexpand(True)
scroll.set_vexpand(True)
scroll.set_min_content_height(180)
scroll.add(self.release_view)
release_box.pack_start(scroll, True, True, 0)
details_frame = Gtk.Frame(label="Selected Release")
details_scroll = Gtk.ScrolledWindow()
details_scroll.set_hexpand(True)
details_scroll.set_vexpand(False)
details_scroll.set_min_content_height(90)
details_frame.add(details_scroll)
self.release_details = Gtk.Label(label="Select a release to inspect it.", xalign=0, yalign=0)
self.release_details.set_line_wrap(True)
self.release_details.set_selectable(True)
details_scroll.add(self.release_details)
release_box.pack_start(details_frame, False, False, 0)
box.pack_start(release_frame, True, True, 0)
box.set_size_request(330, -1)
return box
def _build_right_panel(self):
box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6)
tracks_frame = Gtk.Frame(label="Track Extraction Matrix")
tracks_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6, margin=6)
tracks_frame.add(tracks_box)
self.track_columns = {
"number": 0,
"status": 1,
"artist": 2,
"title": 3,
"length": 4,
"test_crc": 5,
"copy_crc": 6,
"accuraterip": 7,
}
self.track_store = Gtk.ListStore(str, str, str, str, str, str, str, str)
self.track_view = Gtk.TreeView(model=self.track_store)
self.track_view.set_headers_clickable(False)
self.track_view.set_enable_search(False)
self.track_view.set_grid_lines(Gtk.TreeViewGridLines.BOTH)
for index, title in enumerate(["#", "Status", "Artist", "Title", "Length", "Test CRC", "Copy CRC", "AR"]):
renderer = Gtk.CellRendererText()
column = Gtk.TreeViewColumn(title, renderer, text=index)
column.set_resizable(True)
if title in {"Artist", "Title"}:
column.set_expand(True)
if title == "Status":
column.set_min_width(140)
self.track_view.append_column(column)
track_scroll = Gtk.ScrolledWindow()
track_scroll.set_hexpand(True)
track_scroll.set_vexpand(True)
track_scroll.set_min_content_height(220)
track_scroll.add(self.track_view)
tracks_box.pack_start(track_scroll, True, True, 0)
box.pack_start(tracks_frame, True, True, 0)
return box
def _build_rip_options(self):
frame = Gtk.Frame(label="Extraction Setup")
grid = Gtk.Grid(column_spacing=8, row_spacing=4, margin=6)
frame.add(grid)
grid.attach(Gtk.Label(label="Track Tpl", xalign=0), 0, 0, 1, 1)
self.track_template_entry = Gtk.Entry()
self.track_template_entry.set_text(cd_command.DEFAULT_TRACK_TEMPLATE)
self.track_template_entry.connect("changed", self._on_settings_changed)
grid.attach(self.track_template_entry, 1, 0, 3, 1)
grid.attach(Gtk.Label(label="Disc Tpl", xalign=0), 0, 1, 1, 1)
self.disc_template_entry = Gtk.Entry()
self.disc_template_entry.set_text(cd_command.DEFAULT_DISC_TEMPLATE)
self.disc_template_entry.connect("changed", self._on_settings_changed)
grid.attach(self.disc_template_entry, 1, 1, 3, 1)
note = Gtk.Label(
label="Templates are kept here so the lower workspace can stay focused on releases, tracks and log visibility.",
xalign=0,
)
note.set_line_wrap(True)
grid.attach(note, 0, 2, 4, 1)
return frame
def _build_log(self):
frame = Gtk.Frame(label="Extraction Log")
scroll = Gtk.ScrolledWindow()
scroll.set_hexpand(True)
scroll.set_vexpand(False)
scroll.set_min_content_height(150)
frame.add(scroll)
text_view = Gtk.TextView()
text_view.set_editable(False)
text_view.set_cursor_visible(False)
text_view.set_monospace(True)
self.log_buffer = text_view.get_buffer()
scroll.add(text_view)
return frame
def _append_log(self, text):
end_iter = self.log_buffer.get_end_iter()
self.log_buffer.insert(end_iter, text)
def _log_action(self, message, *args, level=logging.INFO):
if args:
message = message % args
logger.log(level, message)
if not message.endswith("\n"):
message += "\n"
self._append_log(message)
def _clear_log(self):
self.log_buffer.set_text("")
def _config_path(self):
xdg_config = os.environ.get("XDG_CONFIG_HOME")
base = Path(xdg_config) if xdg_config else Path.home() / ".config"
return base / "whipper" / "gui.json"
def _collect_gui_settings(self):
return {
"output_directory": self.output_button.get_filename(),
"working_directory": self.working_directory_entry.get_text(),
"country": self.country_entry.get_text(),
"release_id": self.release_id_entry.get_text(),
"unknown": self.unknown_check.get_active(),
"cdr": self.cdr_check.get_active(),
"keep_going": self.keep_going_check.get_active(),
"overread": self.overread_check.get_active(),
"cover_art": self.cover_art_combo.get_active_id() or "",
"max_retries": int(self.max_retries_spin.get_value()),
"offset": int(self.offset_spin.get_value()),
"logger": self.logger_combo.get_active_id() or "whipper",
"track_template": self.track_template_entry.get_text(),
"disc_template": self.disc_template_entry.get_text(),
"window_width": self.window.get_size()[0] if self.window is not None else None,
"window_height": self.window.get_size()[1] if self.window is not None else None,
"pane_position": self.main_pane.get_position() if self.main_pane is not None else None,
}
def _save_gui_settings(self):
if self.output_button is None:
return
path = self._config_path()
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(self._collect_gui_settings(), indent=2), encoding="utf-8")
def _load_gui_settings(self):
path = self._config_path()
if not path.exists():
return
try:
data = json.loads(path.read_text(encoding="utf-8"))
except Exception:
return
output_directory = data.get("output_directory")
if output_directory and os.path.isdir(output_directory):
self.output_button.set_filename(output_directory)
self.working_directory_entry.set_text(data.get("working_directory", ""))
self.country_entry.set_text(data.get("country", ""))
self.release_id_entry.set_text(data.get("release_id", ""))
self.unknown_check.set_active(bool(data.get("unknown", True)))
self.cdr_check.set_active(bool(data.get("cdr", False)))
self.keep_going_check.set_active(bool(data.get("keep_going", True)))
self.overread_check.set_active(bool(data.get("overread", False)))
cover_art = data.get("cover_art", "")
if cover_art:
self.cover_art_combo.set_active_id(cover_art)
else:
self.cover_art_combo.set_active(0)
retries = data.get("max_retries", 5)
if isinstance(retries, int):
self.max_retries_spin.set_value(retries)
offset = data.get("offset")
if isinstance(offset, int):
self.offset_spin.set_value(offset)
logger_name = data.get("logger", "whipper")
if logger_name in result.getLoggers():
self.logger_combo.set_active_id(logger_name)
self.track_template_entry.set_text(
data.get("track_template", cd_command.DEFAULT_TRACK_TEMPLATE)
)
self.disc_template_entry.set_text(
data.get("disc_template", cd_command.DEFAULT_DISC_TEMPLATE)
)
width = data.get("window_width")
height = data.get("window_height")
if isinstance(width, int) and isinstance(height, int):
self.window.resize(width, height)
pane_position = data.get("pane_position")
if isinstance(pane_position, int):
GLib.idle_add(self.main_pane.set_position, pane_position)
def _set_label(self, key, value):
self.info_labels[key].set_text(value or "")
def _can_rip(self):
has_release = self.current_release is not None or bool(self.release_id_entry.get_text().strip())
return has_release or self.unknown_check.get_active()
def _reset_progress(self):
self.current_track_number = 0
self.current_track_total = 0
self.overall_bar.set_fraction(0.0)
self.overall_bar.set_text("Overall")
self.track_bar.set_fraction(0.0)
self.track_bar.set_text("Current track")
self.progress_label.set_text("No task running")
def _set_running_state(self, running, status):
self.read_button.set_sensitive(not running)
self.refresh_button.set_sensitive(not running)
self.analyze_button.set_sensitive(not running)
self.find_offset_button.set_sensitive(not running)
self.rip_button.set_sensitive((not running) and self._can_rip())
self.stop_button.set_sensitive(running)
self.status_label.set_text(status)
if running:
if self.pulse_id == 0:
self.pulse_id = GLib.timeout_add(150, self._pulse_progress)
elif self.pulse_id:
GLib.source_remove(self.pulse_id)
self.pulse_id = 0
def _pulse_progress(self):
if self.scan_runner is None and self.rip_runner is None and self.drive_runner is None:
return False
self.track_bar.pulse()
return True
def _refresh_devices(self):
current = self.device_combo.get_active_text()
self.device_combo.remove_all()
devices = drive.getAllDevicePaths()
logger.info("refreshing optical drives")
for path in devices:
self.device_combo.append_text(path)
if not devices:
self.device_combo.append_text("No drives detected")
self.device_combo.set_active(0)
self.read_button.set_sensitive(False)
self.rip_button.set_sensitive(False)
self.analyze_button.set_sensitive(False)
self.find_offset_button.set_sensitive(False)
logger.warning("no optical drives detected")
self._update_drive_info(None)
else:
self.read_button.set_sensitive(True)
self.analyze_button.set_sensitive(True)
self.find_offset_button.set_sensitive(True)
if current in devices:
self.device_combo.set_active(devices.index(current))
else:
self.device_combo.set_active(0)
logger.info("detected %d optical drive(s); active device: %s",
len(devices), self.device_combo.get_active_text())
self._apply_configured_offset()
self._update_drive_info(self.device_combo.get_active_text())
def _selected_device(self):
device = self.device_combo.get_active_text()
if not device or device == "No drives detected":
return None
return device
def _update_drive_info(self, device):
self._set_label("device", device)
for key in ["vendor", "model", "release", "read_offset", "cache_defeat"]:
self._set_label(key, None)
if not device:
return
try:
info = drive.getDeviceInfo(device)
except Exception:
logger.exception("failed to read drive info for %s", device)
return
if not info:
return
vendor, model, release = info
self._set_label("vendor", vendor)
self._set_label("model", model)
self._set_label("release", release)
conf = config.Config()
try:
self._set_label("read_offset", str(conf.getReadOffset(vendor, model, release)))
except KeyError:
self._set_label("read_offset", "Unknown")
try:
defeats_cache = conf.getDefeatsCache(vendor, model, release)
self._set_label("cache_defeat", "Yes" if defeats_cache else "No")
except KeyError:
self._set_label("cache_defeat", "Unknown")
@staticmethod
def _parse_offset_candidates(offsets_text):
offsets = []
for block in offsets_text.split(","):
block = block.strip()
if not block:
continue
if ":" in block:
start, end = block.split(":")
offsets.extend(range(int(start), int(end) + 1))
else:
offsets.append(int(block))
return offsets
def _update_release_store(self, releases):
self.release_store.clear()
self.current_release = None
for metadata in releases:
self.release_store.append([
metadata.artist or "Unknown Artist",
metadata.releaseTitle or metadata.title or "Unknown Release",
_release_year(metadata),
metadata.releaseType or "Unknown",
_release_country(metadata) or "",
metadata,
])
if releases:
self.release_view.get_selection().select_path(0)
self.release_view.set_cursor(Gtk.TreePath.new_first())
self.release_view.scroll_to_cell(Gtk.TreePath.new_first(), None, False, 0.0, 0.0)
else:
self._update_track_store(None)
self._update_release_details(None)
def _update_track_store(self, metadata):
self.track_store.clear()
if metadata is None:
return
for index, track in enumerate(metadata.tracks, start=1):
self.track_store.append([
str(index),
"Ready",
track.artist or "",
track.title or "",
_format_duration_ms(track.duration),
"",
"",
"",
])
def _set_track_field(self, track_number, field, value):
if track_number <= 0 or self.track_store is None or self.track_columns is None:
return False
row_index = track_number - 1
if row_index < 0 or row_index >= len(self.track_store):
return False
self.track_store[row_index][self.track_columns[field]] = value
return False
def _update_release_details(self, metadata):
if metadata is None:
self.release_details.set_text("Select a release to inspect it.")
return
lines = [
"Artist: %s" % (metadata.artist or "Unknown"),
"Title: %s" % (metadata.releaseTitle or metadata.title or "Unknown"),
"Year: %s" % (_release_year(metadata) or "Unknown"),
"Type: %s" % (metadata.releaseType or "Unknown"),
"Country: %s" % (_release_country(metadata) or "Unknown"),
"Disc: %s/%s" % (
metadata.discNumber if metadata.discNumber is not None else "?",
metadata.discTotal if metadata.discTotal is not None else "?",
),
"Tracks: %d" % len(metadata.tracks),
"Duration: %s" % _format_duration_ms(metadata.duration),
"Release MBID: %s" % (metadata.mbid or "Unknown"),
"URL: %s" % (metadata.url or "Unknown"),
]
if metadata.barcode:
lines.append("Barcode: %s" % metadata.barcode)
if metadata.catalogNumbers:
lines.append("Catalog: %s" % ", ".join(metadata.catalogNumbers))
self.release_details.set_text("\n".join(lines))
def _apply_configured_offset(self):
device = self._selected_device()
if not device:
return
info = drive.getDeviceInfo(device)
if not info:
logger.info("no configured offset information for %s", device)
return
try:
offset = config.Config().getReadOffset(*info)
except KeyError:
logger.info("configured read offset not found for %s", device)
return
if offset is not None:
self.offset_spin.set_value(int(offset))
logger.info("loaded configured read offset %d for %s", offset, device)
def _analyze_drive_worker(self, device):
try:
self.drive_cancel_requested = False
GLib.idle_add(self._log_action, "Drive analysis requested for %s", device)
GLib.idle_add(self._set_running_state, True, "Analyzing drive")
GLib.idle_add(self.progress_label.set_text, "Analyzing drive cache behaviour")
runner = CancellableSyncRunner()
self.drive_runner = runner
analyze_task = cdparanoia.AnalyzeTask(device)
runner.run(analyze_task)
if self.drive_cancel_requested:
raise RuntimeError("Drive analysis cancelled")
if analyze_task.defeatsCache is None:
raise RuntimeError("Cannot analyze the drive; insert an audio CD and retry.")
info = drive.getDeviceInfo(device)
if info:
config.Config().setDefeatsCache(info[0], info[1], info[2], analyze_task.defeatsCache)
def apply_results():
self._log_action(
"Drive cache analysis result: cdparanoia %s defeat the audio cache",
"can" if analyze_task.defeatsCache else "cannot",
)
self._update_drive_info(device)
self._set_running_state(False, "Drive analyzed")
self.progress_label.set_text("Drive analysis complete")
return False
GLib.idle_add(apply_results)
except Exception as exc:
def apply_error():
if self.drive_cancel_requested:
self._log_action("Drive analysis cancelled", level=logging.WARNING)
self._set_running_state(False, "Cancelled")
self.progress_label.set_text("Drive analysis cancelled")
else:
self._log_action("%s", exc, level=logging.ERROR)
self._set_running_state(False, "Drive analysis failed")
self.progress_label.set_text("Drive analysis failed")
return False
GLib.idle_add(apply_error)
finally:
self.drive_runner = None
def _find_offset_worker(self, device):
try:
self.drive_cancel_requested = False
GLib.idle_add(self._log_action, "Offset detection requested for %s", device)
GLib.idle_add(self._set_running_state, True, "Finding offset")
GLib.idle_add(self.progress_label.set_text, "Finding drive read offset")
runner = CancellableSyncRunner()
self.drive_runner = runner
utils.load_device(device)
utils.unmount_device(device)
toc_task = cdrdao.ReadTOCTask(device)
runner.run(toc_task)
table = toc_task.toc.table
if len(table.tracks) < 3:
raise RuntimeError("Offset detection needs a CD with at least 3 tracks.")
responses = accurip.get_db_entry(table.accuraterip_path())
offsets = self._parse_offset_candidates(offset_command.OFFSETS)
def match(archecksums, track_number):
for index, response in enumerate(responses):
for checksum in archecksums:
if checksum == response.checksums[track_number - 1]:
return checksum, index
return None, None
def rip_arcs(track_number, offset_value):
fd, path = tempfile.mkstemp(
suffix=".track%02d.offset%d.whipper.wav" % (track_number, offset_value)
)
os.close(fd)
try:
read_task = cdparanoia.ReadTrackTask(
path,
table,
table.getTrackStart(track_number),
table.getTrackEnd(track_number),
overread=False,
offset=offset_value,
device=device,
)
read_task.description = "Ripping track %d with read offset %d" % (
track_number,
offset_value,
)
runner.run(read_task)
v1, v2 = arc.accuraterip_checksum(path, track_number, len(table.tracks))
return "%08x" % v1, "%08x" % v2
finally:
if os.path.exists(path):
os.unlink(path)
found_offset = None
for offset_value in offsets:
if self.drive_cancel_requested:
raise RuntimeError("Offset detection cancelled")
GLib.idle_add(self._log_action, "Trying read offset %d", offset_value)
try:
checksums = rip_arcs(1, offset_value)
except Exception as exc:
GLib.idle_add(self._log_action, "Cannot rip with offset %d: %s", offset_value, exc, level=logging.WARNING)
continue
checksum, response_index = match(checksums, 1)
if not checksum:
continue
GLib.idle_add(self._log_action, "Potential offset %d matched response %d; confirming", offset_value, response_index)
matched_tracks = 1
for track_number in range(2, len(table.tracks)):
if self.drive_cancel_requested:
raise RuntimeError("Offset detection cancelled")
try:
checksums = rip_arcs(track_number, offset_value)
except Exception as exc:
GLib.idle_add(self._log_action, "Track %d failed for offset %d: %s", track_number, offset_value, exc, level=logging.WARNING)
continue
checksum, _ = match(checksums, track_number)
if checksum:
matched_tracks += 1
if matched_tracks == len(table.tracks) - 1:
found_offset = offset_value
break
if found_offset is None:
raise RuntimeError("No matching offset found. Try another AccurateRip-enabled disc.")
info = drive.getDeviceInfo(device)
if info:
config.Config().setReadOffset(info[0], info[1], info[2], found_offset)
def apply_results():
self.offset_spin.set_value(found_offset)
self._log_action("Read offset of device is %d", found_offset)
self._update_drive_info(device)
self._set_running_state(False, "Offset found")
self.progress_label.set_text("Drive offset detection complete")
return False
GLib.idle_add(apply_results)
except accurip.EntryNotFound:
def apply_not_found():
self._log_action("AccurateRip entry not found; try another disc for offset detection", level=logging.WARNING)
self._set_running_state(False, "Offset unavailable")
self.progress_label.set_text("No AccurateRip entry for this disc")
return False
GLib.idle_add(apply_not_found)
except Exception as exc:
def apply_error():
if self.drive_cancel_requested:
self._log_action("Offset detection cancelled", level=logging.WARNING)
self._set_running_state(False, "Cancelled")
self.progress_label.set_text("Offset detection cancelled")
else:
self._log_action("%s", exc, level=logging.ERROR)
self._set_running_state(False, "Offset detection failed")
self.progress_label.set_text("Offset detection failed")
return False
GLib.idle_add(apply_error)
finally:
self.drive_runner = None
def _read_disc_worker(self, device, country):
try:
self.scan_cancel_requested = False
GLib.idle_add(self._clear_log)
GLib.idle_add(self._reset_progress)
GLib.idle_add(self._log_action, "Reading disc from %s", device)
GLib.idle_add(self._set_running_state, True, "Reading disc")
GLib.idle_add(self.progress_label.set_text, "Scanning disc and looking up MusicBrainz")
runner = CancellableSyncRunner()
self.scan_runner = runner
GLib.idle_add(self._log_action, "Preparing drive %s", device)
utils.load_device(device)
utils.unmount_device(device)
if drive.get_cdrom_drive_status(device) == 1:
raise OSError("No CD detected, please insert one and retry")
GLib.idle_add(self._log_action, "Reading disc TOC")
toc_task = cdrdao.ReadTOCTask(device, fast_toc=True)
runner.run(toc_task, verbose=False)
ittoc = toc_task.toc.table
cddb = ittoc.getCDDBDiscId()
mbid = ittoc.getMusicBrainzDiscId()
duration = common.formatTime(ittoc.duration() / 1000.0)
track_count = str(ittoc.getAudioTracks())
if self.scan_cancel_requested:
raise RuntimeError("Disc scan cancelled")
GLib.idle_add(
self._log_action,
"Querying MusicBrainz for %s%s",
mbid,
" (country=%s)" % country if country else "",
)
try:
releases = mbngs.musicbrainz(mbid, country=country or None, record=False)
except mbngs.NotFoundException:
releases = []
if self.scan_cancel_requested:
raise RuntimeError("Disc scan cancelled")
releases = sorted(releases, key=lambda md: _release_duration_distance(md, ittoc.duration()))
def apply_results():
self.scan_data = {
"device": device,
"cddb": cddb,
"mbid": mbid,
"duration": duration,
"tracks": track_count,
"releases": releases,
}
self._update_drive_info(device)
self._set_label("disc_status", "Ready")
self._set_label("cddb", cddb)
self._set_label("mbid", mbid)
self._set_label("duration", duration)
self._set_label("tracks", track_count)
self._append_log("CDDB disc id: %s\n" % cddb)
self._append_log("MusicBrainz disc id: %s\n" % mbid)
self._append_log("Disc duration: %s, %s audio tracks\n" % (duration, track_count))
self._log_action("Found %d matching release(s)", len(releases))
self._update_release_store(releases)
self._set_running_state(False, "Disc ready")
self.progress_label.set_text("Disc metadata loaded")
if not releases:
self._log_action("No MusicBrainz matches found", level=logging.WARNING)
return False
GLib.idle_add(apply_results)
except Exception as exc:
def apply_error():
self.scan_data = None
self.current_release = None
self.release_store.clear()
self.track_store.clear()
self._update_release_details(None)
self._update_drive_info(device)
if self.scan_cancel_requested:
self._set_label("disc_status", "Cancelled")
self._log_action("Disc scan cancelled", level=logging.WARNING)
self._set_running_state(False, "Cancelled")
self.progress_label.set_text("Disc scan cancelled")
else:
self._set_label("disc_status", "Error")
self._log_action("%s", exc, level=logging.ERROR)
self._set_running_state(False, "Failed")
self.progress_label.set_text("Disc scan failed")
return False
GLib.idle_add(apply_error)
finally:
self.scan_runner = None
def _collect_rip_settings(self):
retries = int(self.max_retries_spin.get_value())
return {
"device": self._selected_device(),
"output_directory": self.output_button.get_filename() or os.curdir,
"working_directory": self.working_directory_entry.get_text().strip() or None,
"country": self.country_entry.get_text().strip() or None,
"release_id": self.release_id_entry.get_text().strip() or None,
"unknown": self.unknown_check.get_active(),
"cdr": self.cdr_check.get_active(),
"keep_going": self.keep_going_check.get_active(),
"overread": self.overread_check.get_active(),
"cover_art": self.cover_art_combo.get_active_id() or None,
"max_retries": float("inf") if retries == 0 else retries,
"offset": int(self.offset_spin.get_value()),
"logger": self.logger_combo.get_active_id() or "whipper",
"track_template": self.track_template_entry.get_text() or cd_command.DEFAULT_TRACK_TEMPLATE,
"disc_template": self.disc_template_entry.get_text() or cd_command.DEFAULT_DISC_TEMPLATE,
}
def _validate_rip_settings(self, settings):
if not settings["device"]:
raise ValueError("No optical drive selected")
if not settings["output_directory"]:
raise ValueError("Output directory is required")
if not os.path.isdir(os.path.expanduser(settings["output_directory"])):
raise ValueError("Output directory does not exist")
cd_command.validate_template(settings["track_template"], "track")
cd_command.validate_template(settings["disc_template"], "disc")
if settings["working_directory"] and not os.path.isdir(os.path.expanduser(settings["working_directory"])):
raise ValueError("Working directory does not exist")
if settings["logger"] not in result.getLoggers():
raise ValueError("Unknown logger '%s'" % settings["logger"])
def _install_gui_log_handler(self):
handler = GuiLogHandler(self)
whipper_logger = logging.getLogger("whipper")
previous_level = whipper_logger.level
if previous_level == logging.NOTSET or previous_level > logging.INFO:
whipper_logger.setLevel(logging.INFO)
whipper_logger.addHandler(handler)
return whipper_logger, handler, previous_level
@staticmethod
def _remove_gui_log_handler(whipper_logger, handler, previous_level):
whipper_logger.removeHandler(handler)
whipper_logger.setLevel(previous_level)
def _resolve_release_metadata(self, mbdiscid, settings):
if settings["release_id"]:
return mbngs.getReleaseMetadata(
settings["release_id"],
discid=mbdiscid,
country=settings["country"],
record=False,
)
if self.current_release is not None:
return self.current_release
if self.scan_data and self.scan_data.get("mbid") == mbdiscid:
releases = self.scan_data.get("releases") or []
if releases:
return releases[0]
return None
def _update_rip_task_progress(self, description, item_label, item_index, item_total, value):
value = min(max(value, 0.0), 1.0)
self.current_track_number = item_index
self.current_track_total = item_total
self.overall_bar.set_fraction(((item_index - 1) + value) / max(item_total, 1))
self.overall_bar.set_text("Track %d/%d" % (item_index, item_total))
self.track_bar.set_fraction(value)
self.track_bar.set_text(description)
self.progress_label.set_text("%s: %s" % (item_label, description))
if item_label.startswith("Track "):
try:
track_number = int(item_label.split()[1])
except (IndexError, ValueError):
track_number = None
if track_number is not None:
percent = int(value * 100)
self._set_track_field(track_number, "status", "%s %d%%" % (description, percent))
return False
def _mark_track_finished(self, item_label, item_index, item_total, skipped=False):
self.current_track_number = item_index
self.current_track_total = item_total
self.overall_bar.set_fraction(item_index / max(item_total, 1))
self.overall_bar.set_text("Track %d/%d" % (item_index, item_total))
self.track_bar.set_fraction(1.0)
self.track_bar.set_text("%s %s" % (item_label, "skipped" if skipped else "done"))
self.progress_label.set_text("%s %s" % (item_label, "failed" if skipped else "verified"))
if item_label.startswith("Track "):
try:
track_number = int(item_label.split()[1])
except (IndexError, ValueError):
track_number = None
if track_number is not None:
self._set_track_field(track_number, "status", "Skipped" if skipped else "Verified")
return False
def _finish_rip(self, returncode, status="Done"):
if returncode in (0, 5):
self.overall_bar.set_fraction(1.0)
self.overall_bar.set_text("Complete")
self.track_bar.set_fraction(1.0)
self.progress_label.set_text("Rip complete")
self._set_running_state(False, status)
elif self.rip_cancel_requested:
self.progress_label.set_text("Rip cancelled")
self._set_running_state(False, "Cancelled")
else:
self.progress_label.set_text("Rip failed")
self._set_running_state(False, "Failed")
return False
def _rip_track(self, runner, program, itable, settings, track_number, item_index, item_total,
cover_art_path, skipped_tracks, mbdiscid):
if self.rip_cancel_requested:
raise RipCancelledError("Rip cancelled")
track_result = program.result.getTrackResult(track_number)
if not track_result:
track_result = result.TrackResult()
program.result.tracks.append(track_result)
path = program.getPath(
program.outdir,
settings["track_template"],
mbdiscid,
program.metadata,
track_number=track_number,
) + ".flac"
track_result.number = track_number
track_result.filename = path
if track_number > 0:
track_result.pregap = itable.tracks[track_number - 1].getPregap()
track_result.pre_emphasis = itable.tracks[track_number - 1].pre_emphasis
item_label = "HTOA" if track_number == 0 else "Track %d" % track_number
if os.path.exists(path):
GLib.idle_add(self._append_log, "%s already exists, verifying\n" % item_label)
if track_number > 0:
GLib.idle_add(self._set_track_field, track_number, "status", "Verifying existing")
if not program.verifyTrack(runner, track_result):
GLib.idle_add(self._append_log, "%s verification failed, reripping\n" % item_label)
os.unlink(path)
if not os.path.exists(path):
track_result.testduration = 0.0
track_result.copyduration = 0.0
tries = 1
while tries <= settings["max_retries"]:
if self.rip_cancel_requested:
raise RipCancelledError("Rip cancelled")
extra = "" if tries == 1 else " (try %d)" % tries
GLib.idle_add(self._append_log, "%s%s: %s\n" % (item_label, extra, os.path.basename(path)))
if track_number > 0:
GLib.idle_add(
self._set_track_field,
track_number,
"status",
"Test & Copy%s" % extra,
)
tag_list = program.getTagList(track_number, mbdiscid)
if track_number > 0 and itable.tracks[track_number - 1].isrc is not None:
tag_list["ISRC"] = itable.tracks[track_number - 1].isrc
rip_task = cdparanoia.ReadVerifyTrackTask(
track_result.filename,
program.result.table,
program.getHTOA()[0] if track_number == 0 else program.result.table.getTrackStart(track_number),
program.getHTOA()[1] if track_number == 0 else program.result.table.getTrackEnd(track_number),
settings["overread"],
offset=settings["offset"],
device=settings["device"],
taglist=tag_list,
what="%s%s" % (item_label.lower(), extra),
coverArtPath=cover_art_path,
)
listener = GuiTaskListener(self, item_label, item_index, item_total)
rip_task.addListener(listener)
for child_task in rip_task.tasks:
child_task.addListener(listener)
try:
runner.run(rip_task, verbose=False)
if self.rip_cancel_requested:
raise RipCancelledError("Rip cancelled")
track_result.testcrc = rip_task.testchecksum
track_result.copycrc = rip_task.copychecksum
track_result.peak = rip_task.peak
track_result.quality = rip_task.quality
track_result.testspeed = rip_task.testspeed
track_result.copyspeed = rip_task.copyspeed
track_result.testduration += rip_task.testduration
track_result.copyduration += rip_task.copyduration
if track_result.filename != rip_task.path:
track_result.filename = rip_task.path
if track_number > 0:
GLib.idle_add(
self._set_track_field,
track_number,
"test_crc",
"%08X" % track_result.testcrc if track_result.testcrc is not None else "",
)
GLib.idle_add(
self._set_track_field,
track_number,
"copy_crc",
"%08X" % track_result.copycrc if track_result.copycrc is not None else "",
)
break
except Exception as exc:
if self.rip_cancel_requested:
raise RipCancelledError("Rip cancelled") from exc
logger.debug("track %s try %d failed: %r", item_label, tries, exc)
tries += 1
if tries > settings["max_retries"]:
tries -= 1
GLib.idle_add(self._append_log, "%s giving up after %d tries\n" % (item_label, tries))
if settings["keep_going"]:
track_result.skipped = True
skipped_tracks.append(track_result)
if track_number > 0:
GLib.idle_add(self._set_track_field, track_number, "accuraterip", "Skipped")
GLib.idle_add(self._mark_track_finished, item_label, item_index, item_total, True)
return
raise RuntimeError("%s can't be ripped" % item_label)
if track_result in skipped_tracks:
GLib.idle_add(self._mark_track_finished, item_label, item_index, item_total, True)
return
if track_result.testcrc != track_result.copycrc:
if track_number > 0:
GLib.idle_add(self._set_track_field, track_number, "accuraterip", "CRC mismatch")
raise RuntimeError("CRCs did not match for %s" % item_label)
if track_number == 0:
if track_result.peak == cd_command.SILENT:
program.result.table.setFile(
1, 0, None, program.result.table.getTrackStart(1), track_number
)
if os.path.exists(track_result.filename):
os.unlink(track_result.filename)
track_result.filename = None
GLib.idle_add(self._append_log, "HTOA discarded, contains digital silence\n")
else:
program.result.table.setFile(
1, 0, track_result.filename, program.result.table.getTrackStart(1), track_number
)
else:
program.result.table.setFile(
track_number,
1,
track_result.filename,
program.result.table.getTrackLength(track_number),
track_number,
)
GLib.idle_add(self._set_track_field, track_number, "accuraterip", "Pending AR")
GLib.idle_add(self._mark_track_finished, item_label, item_index, item_total, False)
def _rip_disc_worker(self, settings):
whipper_logger, log_handler, previous_level = self._install_gui_log_handler()
original_cwd = os.getcwd()
try:
self.rip_cancel_requested = False
GLib.idle_add(self._clear_log)
GLib.idle_add(self._reset_progress)
GLib.idle_add(self._set_running_state, True, "Ripping disc")
GLib.idle_add(self.progress_label.set_text, "Preparing rip")
GLib.idle_add(self._log_action, "Starting native whipper rip")
runner = CancellableSyncRunner()
self.rip_runner = runner
conf = config.Config()
program = Program(conf, record=False)
GLib.idle_add(
self._log_action,
"Rip settings: device=%s output=%s working_dir=%s logger=%s offset=%d overread=%s cover_art=%s retries=%s keep_going=%s cdr=%s unknown=%s",
settings["device"],
settings["output_directory"],
settings["working_directory"] or "-",
settings["logger"],
settings["offset"],
settings["overread"],
settings["cover_art"] or "disabled",
settings["max_retries"],
settings["keep_going"],
settings["cdr"],
settings["unknown"],
)
if settings["working_directory"]:
GLib.idle_add(self._log_action, "Changing working directory to %s", settings["working_directory"])
os.chdir(os.path.expanduser(settings["working_directory"]))
GLib.idle_add(self._log_action, "Preparing drive %s", settings["device"])
utils.load_device(settings["device"])
utils.unmount_device(settings["device"])
if drive.get_cdrom_drive_status(settings["device"]) == 1:
raise OSError("No CD detected, please insert one and retry")
GLib.idle_add(self._log_action, "Reading fast TOC")
ittoc = program.getFastToc(runner, settings["device"])
program.getRipResult()
cddb = ittoc.getCDDBDiscId()
mbdiscid = ittoc.getMusicBrainzDiscId()
GLib.idle_add(self._append_log, "CDDB disc id: %s\n" % cddb)
GLib.idle_add(self._append_log, "MusicBrainz disc id: %s\n" % mbdiscid)
GLib.idle_add(self._log_action, "Resolving release metadata")
program.metadata = self._resolve_release_metadata(mbdiscid, settings)
if program.metadata is None and not settings["unknown"]:
raise RuntimeError("Unable to resolve disc metadata. Select a release or enable ripping without metadata.")
if program.metadata is not None:
program.metadata.discid = mbdiscid
GLib.idle_add(
self._log_action,
"Using release: %s - %s\n" % (
program.metadata.artist or "Unknown Artist",
program.metadata.releaseTitle or program.metadata.title or "Unknown Title",
),
)
else:
GLib.idle_add(self._log_action, "Continuing without release metadata", level=logging.WARNING)
program.result.isCdr = cdrdao.DetectCdr(settings["device"])
if program.result.isCdr and not settings["cdr"]:
raise RuntimeError("Inserted disc appears to be a CD-R. Enable 'Allow CD-R' to continue.")
if program.result.isCdr:
GLib.idle_add(self._log_action, "Detected CD-R media", level=logging.WARNING)
out_fpath = program.getPath(
settings["output_directory"],
settings["disc_template"],
mbdiscid,
program.metadata,
)
GLib.idle_add(self._log_action, "Reading full disc table")
itable = program.getTable(
runner,
ittoc.getCDDBDiscId(),
ittoc.getMusicBrainzDiscId(),
settings["device"],
settings["offset"],
out_fpath,
)
program.result.cdrdaoVersion = cdrdao.version()
program.result.cdparanoiaVersion = cdparanoia.getCdParanoiaVersion()
info = drive.getDeviceInfo(settings["device"])
if info:
try:
program.result.cdparanoiaDefeatsCache = conf.getDefeatsCache(*info)
except KeyError:
pass
program.result.artist = program.metadata.artist if program.metadata else "Unknown Artist"
program.result.title = program.metadata.releaseTitle if program.metadata else "Unknown Title"
_, program.result.vendor, program.result.model, program.result.release = \
cdio.Device(settings["device"]).get_hwinfo()
program.result.metadata = program.metadata
program.result.offset = settings["offset"]
program.result.overread = settings["overread"]
program.result.logger = settings["logger"]
program.outdir = settings["output_directory"]
GLib.idle_add(
self._log_action,
"Drive info: vendor=%s model=%s release=%s offset=%s overread=%s",
program.result.vendor,
program.result.model,
program.result.release,
program.result.offset,
program.result.overread,
)
disc_name = program.getPath(program.outdir, settings["disc_template"], mbdiscid, program.metadata)
dirname = os.path.dirname(disc_name)
if os.path.exists(dirname):
log_file = disc_name + ".log"
if os.path.exists(log_file):
raise RuntimeError("output directory %s is a finished rip" % dirname)
GLib.idle_add(self._log_action, "Using existing output directory %s", dirname)
else:
os.makedirs(dirname)
GLib.idle_add(self._log_action, "Created output directory %s", dirname)
cover_art_path = None
if settings["cover_art"] in {"embed", "complete"} and importlib.util.find_spec("PIL") is None:
GLib.idle_add(self._log_action, "Cover art embedding requires Pillow; continuing without embedded art", level=logging.WARNING)
elif settings["cover_art"] in {"file", "embed", "complete"}:
if getattr(program.metadata, "mbid", None):
GLib.idle_add(self._log_action, "Fetching cover art with mode %s", settings["cover_art"])
cover_art_path = program.getCoverArt(dirname, program.metadata.mbid)
if cover_art_path is not None:
GLib.idle_add(self._log_action, "Cover art saved to %s", cover_art_path)
else:
GLib.idle_add(self._log_action, "Cover art requested but disc metadata is unavailable", level=logging.WARNING)
if settings["cover_art"] == "file":
embed_cover_art_path = None
else:
embed_cover_art_path = cover_art_path
skipped_tracks = []
rip_numbers = []
if program.getHTOA():
GLib.idle_add(self._log_action, "Hidden Track One Audio detected")
rip_numbers.append(0)
for index, track in enumerate(itable.tracks, start=1):
if track.audio:
rip_numbers.append(index)
else:
GLib.idle_add(self._log_action, "Skipping data track %d, not implemented", index, level=logging.WARNING)
track.indexes[1].relative = 0
GLib.idle_add(self._log_action, "Planned rip items: %d", len(rip_numbers))
total_items = len(rip_numbers)
for item_index, track_number in enumerate(rip_numbers, start=1):
if self.rip_cancel_requested:
raise RipCancelledError("Rip cancelled")
self._rip_track(
runner,
program,
itable,
settings,
track_number,
item_index,
total_items,
embed_cover_art_path,
skipped_tracks,
mbdiscid,
)
if settings["cover_art"] == "embed" and cover_art_path is not None:
os.remove(cover_art_path)
GLib.idle_add(self._log_action, "Removed temporary cover art file %s", cover_art_path)
if self.rip_cancel_requested:
raise RipCancelledError("Rip cancelled")
program.skipped_tracks = skipped_tracks or None
GLib.idle_add(self._log_action, "Writing CUE sheet")
program.writeCue(disc_name)
GLib.idle_add(self._log_action, "Writing M3U playlist")
program.write_m3u(disc_name)
try:
GLib.idle_add(self._log_action, "Running AccurateRip image verification")
with contextlib.redirect_stdout(io.StringIO()) as stdout_buffer:
program.verifyImage(runner, itable)
accurip.print_report(program.result)
report_output = stdout_buffer.getvalue()
if report_output:
GLib.idle_add(self._append_log, "\n" + report_output + "\n")
except accurip.EntryNotFound:
GLib.idle_add(self._log_action, "AccurateRip entry not found", level=logging.WARNING)
txt_logger = result.getLoggers()[settings["logger"]]()
GLib.idle_add(self._log_action, "Writing rip log with logger '%s'", settings["logger"])
program.writeLog(disc_name, txt_logger)
if skipped_tracks:
GLib.idle_add(self._log_action, "%d track(s) were skipped during this rip", len(skipped_tracks), level=logging.WARNING)
GLib.idle_add(self._finish_rip, 5, "Done with skipped tracks")
else:
GLib.idle_add(self._log_action, "Rip finished successfully")
GLib.idle_add(self._finish_rip, 0, "Done")
except Exception as exc:
if not isinstance(exc, RipCancelledError):
GLib.idle_add(self._log_action, "%s", exc, level=logging.ERROR)
GLib.idle_add(self._finish_rip, 1, "Cancelled" if self.rip_cancel_requested else "Failed")
finally:
self.rip_runner = None
self._remove_gui_log_handler(whipper_logger, log_handler, previous_level)
os.chdir(original_cwd)
def _on_release_selected(self, selection):
model, tree_iter = selection.get_selected()
if tree_iter is None:
self.current_release = None
self._update_track_store(None)
self._update_release_details(None)
else:
self.current_release = model.get_value(tree_iter, 5)
self._update_track_store(self.current_release)
self._update_release_details(self.current_release)
self.rip_button.set_sensitive(self._can_rip())
def _on_refresh_clicked(self, _button):
self._log_action("Refreshing drive list")
self._refresh_devices()
def _on_unknown_toggled(self, _button):
self._save_gui_settings()
self.rip_button.set_sensitive(
(self.scan_runner is None) and
(self.rip_runner is None) and
self._can_rip()
)
def _on_settings_changed(self, *_args):
self._save_gui_settings()
self.rip_button.set_sensitive(
(self.scan_runner is None) and
(self.rip_runner is None) and
self._can_rip()
)
def _on_device_changed(self, _combo):
self._apply_configured_offset()
self._save_gui_settings()
device = self._selected_device()
if device:
self._update_drive_info(device)
self._log_action("Selected device %s", device)
else:
self._update_drive_info(None)
def _on_analyze_drive_clicked(self, _button):
device = self._selected_device()
if not device:
self._log_action("Drive analysis requested without an active device", level=logging.WARNING)
return
self.worker = threading.Thread(
target=self._analyze_drive_worker,
args=(device,),
daemon=True,
)
self.worker.start()
def _on_find_offset_clicked(self, _button):
device = self._selected_device()
if not device:
self._log_action("Offset detection requested without an active device", level=logging.WARNING)
return
self.worker = threading.Thread(
target=self._find_offset_worker,
args=(device,),
daemon=True,
)
self.worker.start()
def _on_read_clicked(self, _button):
device = self._selected_device()
if not device:
self._log_action("Read requested without an active device", level=logging.WARNING)
return
country = self.country_entry.get_text().strip()
self._log_action(
"Read requested for %s%s",
device,
" with country filter %s" % country if country else "",
)
self.worker = threading.Thread(
target=self._read_disc_worker,
args=(device, country),
daemon=True,
)
self.worker.start()
def _on_rip_clicked(self, _button):
if not self._can_rip():
self._log_action("Rip requested without a selected release", level=logging.WARNING)
self.status_label.set_text("Missing release")
return
settings = self._collect_rip_settings()
try:
self._validate_rip_settings(settings)
except Exception as exc:
self._log_action("%s", exc, level=logging.ERROR)
self.status_label.set_text("Invalid settings")
return
if self.current_release is not None:
self._log_action(
"Rip requested for release %s - %s",
self.current_release.artist or "Unknown Artist",
self.current_release.releaseTitle or self.current_release.title or "Unknown Title",
)
else:
self._log_action("Rip requested without bound metadata release", level=logging.WARNING)
self.worker = threading.Thread(
target=self._rip_disc_worker,
args=(settings,),
daemon=True,
)
self.worker.start()
def _on_stop_clicked(self, _button):
if self.scan_runner is not None:
self.scan_cancel_requested = True
self._log_action("Stopping scan", level=logging.WARNING)
self.scan_runner.cancel()
elif self.rip_runner is not None:
self.rip_cancel_requested = True
self._log_action("Stopping rip", level=logging.WARNING)
self.rip_runner.cancel()
elif self.drive_runner is not None:
self.drive_cancel_requested = True
self._log_action("Stopping drive operation", level=logging.WARNING)
self.drive_runner.cancel()
class GuiLogHandler(logging.Handler):
def __init__(self, gui):
super().__init__(level=logging.INFO)
self.gui = gui
self.setFormatter(logging.Formatter("%(message)s"))
def emit(self, record):
try:
message = self.format(record)
except Exception:
message = record.getMessage()
GLib.idle_add(self.gui._append_log, message + "\n")
class GuiTaskListener:
def __init__(self, gui, item_label, item_index, item_total):
self.gui = gui
self.item_label = item_label
self.item_index = item_index
self.item_total = item_total
def started(self, task):
GLib.idle_add(
self.gui._update_rip_task_progress,
task.description,
self.item_label,
self.item_index,
self.item_total,
task.progress,
)
def progressed(self, task, value):
GLib.idle_add(
self.gui._update_rip_task_progress,
task.description,
self.item_label,
self.item_index,
self.item_total,
value,
)
def described(self, task, description):
GLib.idle_add(
self.gui._update_rip_task_progress,
description,
self.item_label,
self.item_index,
self.item_total,
task.progress,
)
def stopped(self, task):
return None
class CancellableSyncRunner(whipper_task.SyncRunner):
def cancel(self):
loop = getattr(self, "_loop", None)
current_task = getattr(self, "_task", None)
if loop is None or current_task is None:
return
def _cancel():
try:
abort = getattr(current_task, "abort", None)
if callable(abort):
abort()
else:
current_task.stop()
except Exception as exc:
current_task.setException(exc)
self.stopped(current_task)
loop.call_soon_threadsafe(_cancel)
def main():
try:
require_gui_runtime()
except RuntimeError as exc:
print(str(exc), file=sys.stderr)
return 1
app = WhipperGui()
return app.run(sys.argv)