Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 6ae1a42d90 | |||
| 63aee123e7 | |||
| 1999ce3657 | |||
| f78a130926 | |||
| 5baadd9dbb |
378
whipper/gui.py
378
whipper/gui.py
@@ -6,6 +6,7 @@ import io
|
||||
import logging
|
||||
import contextlib
|
||||
import importlib.util
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
_GUI_IMPORT_ERROR = None
|
||||
@@ -30,12 +31,13 @@ except (ImportError, ValueError) as exc:
|
||||
|
||||
from whipper.common import accurip, common, config, drive, mbngs, task as whipper_task
|
||||
from whipper.common.program import Program
|
||||
from whipper.command import cd as cd_command
|
||||
from whipper.program import cdrdao, cdparanoia, utils
|
||||
from whipper.command import cd as cd_command, offset as offset_command
|
||||
from whipper.program import arc, cdrdao, cdparanoia, utils
|
||||
from whipper.result import result
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
APP_ID = "com.github.whipper_team.Whipper"
|
||||
|
||||
|
||||
class RipCancelledError(Exception):
|
||||
@@ -62,6 +64,30 @@ def require_gui_runtime():
|
||||
raise RuntimeError(gui_runtime_error_message()) from _GUI_IMPORT_ERROR
|
||||
|
||||
|
||||
def _iter_icon_candidates():
|
||||
local_icon = Path(__file__).resolve().parent.parent / "data" / ("%s.svg" % APP_ID)
|
||||
yield local_icon
|
||||
|
||||
data_dirs = []
|
||||
xdg_data_home = os.environ.get("XDG_DATA_HOME")
|
||||
if xdg_data_home:
|
||||
data_dirs.append(Path(xdg_data_home))
|
||||
data_dirs.extend(Path(path) for path in os.environ.get("XDG_DATA_DIRS", "/usr/local/share:/usr/share").split(":") if path)
|
||||
|
||||
for base in data_dirs:
|
||||
yield base / "icons" / "hicolor" / "scalable" / "apps" / ("%s.svg" % APP_ID)
|
||||
yield base / "icons" / "hicolor" / "256x256" / "apps" / ("%s.png" % APP_ID)
|
||||
yield base / "pixmaps" / ("%s.png" % APP_ID)
|
||||
yield base / "pixmaps" / ("%s.svg" % APP_ID)
|
||||
|
||||
|
||||
def _resolve_icon_path():
|
||||
for candidate in _iter_icon_candidates():
|
||||
if candidate.is_file():
|
||||
return str(candidate)
|
||||
return None
|
||||
|
||||
|
||||
def _format_duration_ms(duration_ms):
|
||||
if duration_ms is None:
|
||||
return ""
|
||||
@@ -86,7 +112,13 @@ def _release_duration_distance(metadata, duration_ms):
|
||||
class WhipperGui(Gtk.Application if Gtk is not None else object):
|
||||
def __init__(self):
|
||||
require_gui_runtime()
|
||||
super().__init__(application_id="com.github.whipper_team.WhipperGui")
|
||||
GLib.set_prgname(APP_ID)
|
||||
super().__init__(application_id=APP_ID)
|
||||
GLib.set_application_name("Whipper")
|
||||
Gtk.Window.set_default_icon_name(APP_ID)
|
||||
icon_path = _resolve_icon_path()
|
||||
if icon_path:
|
||||
Gtk.Window.set_default_icon_from_file(icon_path)
|
||||
self.window = None
|
||||
self.main_pane = None
|
||||
|
||||
@@ -96,8 +128,10 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
|
||||
self.scan_data = None
|
||||
self.scan_runner = None
|
||||
self.rip_runner = None
|
||||
self.drive_runner = None
|
||||
self.scan_cancel_requested = False
|
||||
self.rip_cancel_requested = False
|
||||
self.drive_cancel_requested = False
|
||||
self.current_release = None
|
||||
self.current_track_number = 0
|
||||
self.current_track_total = 0
|
||||
@@ -108,6 +142,8 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
|
||||
self.country_entry = None
|
||||
self.release_id_entry = None
|
||||
self.refresh_button = None
|
||||
self.analyze_button = None
|
||||
self.find_offset_button = None
|
||||
self.read_button = None
|
||||
self.rip_button = None
|
||||
self.stop_button = None
|
||||
@@ -150,7 +186,11 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
|
||||
def _build_window(self):
|
||||
window = Gtk.ApplicationWindow(application=self)
|
||||
window.set_title("Whipper")
|
||||
window.set_icon_name("com.github.whipper_team.Whipper")
|
||||
window.set_icon_name(APP_ID)
|
||||
icon_path = _resolve_icon_path()
|
||||
if icon_path:
|
||||
window.set_icon_from_file(icon_path)
|
||||
window.set_wmclass(APP_ID, APP_ID)
|
||||
window.set_default_size(1380, 820)
|
||||
window.set_border_width(6)
|
||||
|
||||
@@ -160,7 +200,6 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
|
||||
root.pack_start(self._build_transport_strip(), False, False, 0)
|
||||
root.pack_start(self._build_progress(), False, False, 0)
|
||||
root.pack_start(self._build_main_content(), True, True, 0)
|
||||
root.pack_start(self._build_log(), False, True, 0)
|
||||
|
||||
self._refresh_devices()
|
||||
self._load_gui_settings()
|
||||
@@ -169,10 +208,19 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
|
||||
|
||||
def _build_transport_strip(self):
|
||||
frame = Gtk.Frame(label="Extraction Control Center")
|
||||
box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6, margin=6)
|
||||
frame.add(box)
|
||||
box.pack_start(self._build_controls(), False, False, 0)
|
||||
box.pack_start(self._build_actions(), False, False, 0)
|
||||
shell = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=10, margin=6)
|
||||
frame.add(shell)
|
||||
|
||||
primary = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6)
|
||||
primary.pack_start(self._build_controls(), False, False, 0)
|
||||
primary.pack_start(self._build_actions(), False, False, 0)
|
||||
shell.pack_start(primary, True, True, 0)
|
||||
|
||||
shell.pack_start(self._build_rip_options(), False, False, 0)
|
||||
|
||||
secondary = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6)
|
||||
secondary.pack_start(self._build_disc_info_frame(), False, False, 0)
|
||||
shell.pack_start(secondary, False, False, 0)
|
||||
return frame
|
||||
|
||||
def _build_controls(self):
|
||||
@@ -188,10 +236,20 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
|
||||
self.refresh_button.set_tooltip_text("Refresh the list of detected optical drives")
|
||||
grid.attach(self.refresh_button, 2, 0, 1, 1)
|
||||
|
||||
grid.attach(Gtk.Label(label="Status", xalign=0), 3, 0, 1, 1)
|
||||
self.analyze_button = Gtk.Button(label="Analyze Drive")
|
||||
self.analyze_button.connect("clicked", self._on_analyze_drive_clicked)
|
||||
self.analyze_button.set_tooltip_text("Probe whether cdparanoia can defeat this drive's audio cache")
|
||||
grid.attach(self.analyze_button, 3, 0, 1, 1)
|
||||
|
||||
self.find_offset_button = Gtk.Button(label="Find Offset")
|
||||
self.find_offset_button.connect("clicked", self._on_find_offset_clicked)
|
||||
self.find_offset_button.set_tooltip_text("Detect the configured read offset using an AccurateRip-known disc")
|
||||
grid.attach(self.find_offset_button, 4, 0, 1, 1)
|
||||
|
||||
grid.attach(Gtk.Label(label="Status", xalign=0), 5, 0, 1, 1)
|
||||
self.status_label = Gtk.Label(label="Idle", xalign=0)
|
||||
self.status_label.set_selectable(True)
|
||||
grid.attach(self.status_label, 4, 0, 2, 1)
|
||||
grid.attach(self.status_label, 6, 0, 1, 1)
|
||||
|
||||
grid.attach(Gtk.Label(label="Output", xalign=0), 0, 1, 1, 1)
|
||||
self.output_button = Gtk.FileChooserButton(
|
||||
@@ -232,7 +290,7 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
|
||||
self.max_retries_spin.connect("value-changed", self._on_settings_changed)
|
||||
grid.attach(self.max_retries_spin, 5, 2, 1, 1)
|
||||
|
||||
grid.attach(Gtk.Label(label="Working Dir", xalign=0), 0, 2, 1, 1)
|
||||
grid.attach(Gtk.Label(label="Working Dir", xalign=0), 0, 3, 1, 1)
|
||||
self.working_directory_entry = Gtk.Entry()
|
||||
self.working_directory_entry.set_placeholder_text("Optional working directory")
|
||||
self.working_directory_entry.connect("changed", self._on_settings_changed)
|
||||
@@ -324,7 +382,7 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
|
||||
pane.set_wide_handle(True)
|
||||
pane.pack1(self._build_workspace(), resize=True, shrink=False)
|
||||
pane.pack2(self._build_log(), resize=False, shrink=False)
|
||||
pane.set_position(560)
|
||||
pane.set_position(470)
|
||||
self.main_pane = pane
|
||||
return pane
|
||||
|
||||
@@ -333,18 +391,21 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
|
||||
pane.set_wide_handle(True)
|
||||
pane.pack1(self._build_left_panel(), resize=False, shrink=False)
|
||||
pane.pack2(self._build_right_panel(), resize=True, shrink=False)
|
||||
pane.set_position(360)
|
||||
pane.set_position(330)
|
||||
return pane
|
||||
|
||||
def _build_left_panel(self):
|
||||
box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8)
|
||||
|
||||
def _build_disc_info_frame(self):
|
||||
info_frame = Gtk.Frame(label="Disc")
|
||||
info_grid = Gtk.Grid(column_spacing=8, row_spacing=4, margin=6)
|
||||
info_frame.add(info_grid)
|
||||
|
||||
rows = [
|
||||
("Device", "device"),
|
||||
("Vendor", "vendor"),
|
||||
("Model", "model"),
|
||||
("Release", "release"),
|
||||
("Read offset", "read_offset"),
|
||||
("Cache defeat", "cache_defeat"),
|
||||
("Status", "disc_status"),
|
||||
("CDDB", "cddb"),
|
||||
("Disc ID", "mbid"),
|
||||
@@ -356,8 +417,10 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
|
||||
value = Gtk.Label(label="—", xalign=0, selectable=True)
|
||||
info_grid.attach(value, 1, row, 1, 1)
|
||||
self.info_labels[key] = value
|
||||
return info_frame
|
||||
|
||||
box.pack_start(info_frame, False, False, 0)
|
||||
def _build_left_panel(self):
|
||||
box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=8)
|
||||
|
||||
release_frame = Gtk.Frame(label="Metadata / Releases")
|
||||
release_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6, margin=6)
|
||||
@@ -367,7 +430,6 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
|
||||
self.release_view = Gtk.TreeView(model=self.release_store)
|
||||
self.release_view.set_headers_clickable(False)
|
||||
self.release_view.set_enable_search(False)
|
||||
self.release_view.set_fixed_height_mode(True)
|
||||
self.release_view.set_grid_lines(Gtk.TreeViewGridLines.BOTH)
|
||||
self.release_view.get_selection().connect("changed", self._on_release_selected)
|
||||
for index, title in enumerate(["Artist", "Title", "Year", "Type", "Country"]):
|
||||
@@ -381,6 +443,7 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
|
||||
scroll = Gtk.ScrolledWindow()
|
||||
scroll.set_hexpand(True)
|
||||
scroll.set_vexpand(True)
|
||||
scroll.set_min_content_height(180)
|
||||
scroll.add(self.release_view)
|
||||
release_box.pack_start(scroll, True, True, 0)
|
||||
|
||||
@@ -398,7 +461,7 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
|
||||
release_box.pack_start(details_frame, False, False, 0)
|
||||
|
||||
box.pack_start(release_frame, True, True, 0)
|
||||
box.set_size_request(420, -1)
|
||||
box.set_size_request(330, -1)
|
||||
return box
|
||||
|
||||
def _build_right_panel(self):
|
||||
@@ -422,7 +485,6 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
|
||||
self.track_view = Gtk.TreeView(model=self.track_store)
|
||||
self.track_view.set_headers_clickable(False)
|
||||
self.track_view.set_enable_search(False)
|
||||
self.track_view.set_fixed_height_mode(True)
|
||||
self.track_view.set_grid_lines(Gtk.TreeViewGridLines.BOTH)
|
||||
for index, title in enumerate(["#", "Status", "Artist", "Title", "Length", "Test CRC", "Copy CRC", "AR"]):
|
||||
renderer = Gtk.CellRendererText()
|
||||
@@ -437,10 +499,9 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
|
||||
track_scroll = Gtk.ScrolledWindow()
|
||||
track_scroll.set_hexpand(True)
|
||||
track_scroll.set_vexpand(True)
|
||||
track_scroll.set_min_content_height(220)
|
||||
track_scroll.add(self.track_view)
|
||||
tracks_box.pack_start(track_scroll, True, True, 0)
|
||||
|
||||
tracks_box.pack_start(self._build_rip_options(), False, False, 0)
|
||||
box.pack_start(tracks_frame, True, True, 0)
|
||||
return box
|
||||
|
||||
@@ -462,7 +523,7 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
|
||||
grid.attach(self.disc_template_entry, 1, 1, 3, 1)
|
||||
|
||||
note = Gtk.Label(
|
||||
label="Advanced templates stay here; fast extraction switches moved to the control center to reduce vertical space.",
|
||||
label="Templates are kept here so the lower workspace can stay focused on releases, tracks and log visibility.",
|
||||
xalign=0,
|
||||
)
|
||||
note.set_line_wrap(True)
|
||||
@@ -607,6 +668,8 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
|
||||
def _set_running_state(self, running, status):
|
||||
self.read_button.set_sensitive(not running)
|
||||
self.refresh_button.set_sensitive(not running)
|
||||
self.analyze_button.set_sensitive(not running)
|
||||
self.find_offset_button.set_sensitive(not running)
|
||||
self.rip_button.set_sensitive((not running) and self._can_rip())
|
||||
self.stop_button.set_sensitive(running)
|
||||
self.status_label.set_text(status)
|
||||
@@ -618,7 +681,7 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
|
||||
self.pulse_id = 0
|
||||
|
||||
def _pulse_progress(self):
|
||||
if self.scan_runner is None and self.rip_runner is None:
|
||||
if self.scan_runner is None and self.rip_runner is None and self.drive_runner is None:
|
||||
return False
|
||||
self.track_bar.pulse()
|
||||
return True
|
||||
@@ -635,9 +698,14 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
|
||||
self.device_combo.set_active(0)
|
||||
self.read_button.set_sensitive(False)
|
||||
self.rip_button.set_sensitive(False)
|
||||
self.analyze_button.set_sensitive(False)
|
||||
self.find_offset_button.set_sensitive(False)
|
||||
logger.warning("no optical drives detected")
|
||||
self._update_drive_info(None)
|
||||
else:
|
||||
self.read_button.set_sensitive(True)
|
||||
self.analyze_button.set_sensitive(True)
|
||||
self.find_offset_button.set_sensitive(True)
|
||||
if current in devices:
|
||||
self.device_combo.set_active(devices.index(current))
|
||||
else:
|
||||
@@ -645,6 +713,7 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
|
||||
logger.info("detected %d optical drive(s); active device: %s",
|
||||
len(devices), self.device_combo.get_active_text())
|
||||
self._apply_configured_offset()
|
||||
self._update_drive_info(self.device_combo.get_active_text())
|
||||
|
||||
def _selected_device(self):
|
||||
device = self.device_combo.get_active_text()
|
||||
@@ -652,20 +721,64 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
|
||||
return None
|
||||
return device
|
||||
|
||||
def _update_drive_info(self, device):
|
||||
self._set_label("device", device)
|
||||
for key in ["vendor", "model", "release", "read_offset", "cache_defeat"]:
|
||||
self._set_label(key, None)
|
||||
if not device:
|
||||
return
|
||||
try:
|
||||
info = drive.getDeviceInfo(device)
|
||||
except Exception:
|
||||
logger.exception("failed to read drive info for %s", device)
|
||||
return
|
||||
if not info:
|
||||
return
|
||||
vendor, model, release = info
|
||||
self._set_label("vendor", vendor)
|
||||
self._set_label("model", model)
|
||||
self._set_label("release", release)
|
||||
conf = config.Config()
|
||||
try:
|
||||
self._set_label("read_offset", str(conf.getReadOffset(vendor, model, release)))
|
||||
except KeyError:
|
||||
self._set_label("read_offset", "Unknown")
|
||||
try:
|
||||
defeats_cache = conf.getDefeatsCache(vendor, model, release)
|
||||
self._set_label("cache_defeat", "Yes" if defeats_cache else "No")
|
||||
except KeyError:
|
||||
self._set_label("cache_defeat", "Unknown")
|
||||
|
||||
@staticmethod
|
||||
def _parse_offset_candidates(offsets_text):
|
||||
offsets = []
|
||||
for block in offsets_text.split(","):
|
||||
block = block.strip()
|
||||
if not block:
|
||||
continue
|
||||
if ":" in block:
|
||||
start, end = block.split(":")
|
||||
offsets.extend(range(int(start), int(end) + 1))
|
||||
else:
|
||||
offsets.append(int(block))
|
||||
return offsets
|
||||
|
||||
def _update_release_store(self, releases):
|
||||
self.release_store.clear()
|
||||
self.current_release = None
|
||||
for metadata in releases:
|
||||
self.release_store.append([
|
||||
metadata.artist or "",
|
||||
metadata.releaseTitle or metadata.title or "",
|
||||
metadata.artist or "Unknown Artist",
|
||||
metadata.releaseTitle or metadata.title or "Unknown Release",
|
||||
_release_year(metadata),
|
||||
metadata.releaseType or "",
|
||||
_release_country(metadata),
|
||||
metadata.releaseType or "Unknown",
|
||||
_release_country(metadata) or "—",
|
||||
metadata,
|
||||
])
|
||||
if releases:
|
||||
self.release_view.get_selection().select_path(0)
|
||||
self.release_view.set_cursor(Gtk.TreePath.new_first())
|
||||
self.release_view.scroll_to_cell(Gtk.TreePath.new_first(), None, False, 0.0, 0.0)
|
||||
else:
|
||||
self._update_track_store(None)
|
||||
self._update_release_details(None)
|
||||
@@ -738,6 +851,178 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
|
||||
self.offset_spin.set_value(int(offset))
|
||||
logger.info("loaded configured read offset %d for %s", offset, device)
|
||||
|
||||
def _analyze_drive_worker(self, device):
|
||||
try:
|
||||
self.drive_cancel_requested = False
|
||||
GLib.idle_add(self._log_action, "Drive analysis requested for %s", device)
|
||||
GLib.idle_add(self._set_running_state, True, "Analyzing drive")
|
||||
GLib.idle_add(self.progress_label.set_text, "Analyzing drive cache behaviour")
|
||||
|
||||
runner = CancellableSyncRunner()
|
||||
self.drive_runner = runner
|
||||
analyze_task = cdparanoia.AnalyzeTask(device)
|
||||
runner.run(analyze_task)
|
||||
if self.drive_cancel_requested:
|
||||
raise RuntimeError("Drive analysis cancelled")
|
||||
if analyze_task.defeatsCache is None:
|
||||
raise RuntimeError("Cannot analyze the drive; insert an audio CD and retry.")
|
||||
|
||||
info = drive.getDeviceInfo(device)
|
||||
if info:
|
||||
config.Config().setDefeatsCache(info[0], info[1], info[2], analyze_task.defeatsCache)
|
||||
|
||||
def apply_results():
|
||||
self._log_action(
|
||||
"Drive cache analysis result: cdparanoia %s defeat the audio cache",
|
||||
"can" if analyze_task.defeatsCache else "cannot",
|
||||
)
|
||||
self._update_drive_info(device)
|
||||
self._set_running_state(False, "Drive analyzed")
|
||||
self.progress_label.set_text("Drive analysis complete")
|
||||
return False
|
||||
|
||||
GLib.idle_add(apply_results)
|
||||
except Exception as exc:
|
||||
def apply_error():
|
||||
if self.drive_cancel_requested:
|
||||
self._log_action("Drive analysis cancelled", level=logging.WARNING)
|
||||
self._set_running_state(False, "Cancelled")
|
||||
self.progress_label.set_text("Drive analysis cancelled")
|
||||
else:
|
||||
self._log_action("%s", exc, level=logging.ERROR)
|
||||
self._set_running_state(False, "Drive analysis failed")
|
||||
self.progress_label.set_text("Drive analysis failed")
|
||||
return False
|
||||
|
||||
GLib.idle_add(apply_error)
|
||||
finally:
|
||||
self.drive_runner = None
|
||||
|
||||
def _find_offset_worker(self, device):
|
||||
try:
|
||||
self.drive_cancel_requested = False
|
||||
GLib.idle_add(self._log_action, "Offset detection requested for %s", device)
|
||||
GLib.idle_add(self._set_running_state, True, "Finding offset")
|
||||
GLib.idle_add(self.progress_label.set_text, "Finding drive read offset")
|
||||
|
||||
runner = CancellableSyncRunner()
|
||||
self.drive_runner = runner
|
||||
|
||||
utils.load_device(device)
|
||||
utils.unmount_device(device)
|
||||
toc_task = cdrdao.ReadTOCTask(device)
|
||||
runner.run(toc_task)
|
||||
table = toc_task.toc.table
|
||||
if len(table.tracks) < 3:
|
||||
raise RuntimeError("Offset detection needs a CD with at least 3 tracks.")
|
||||
|
||||
responses = accurip.get_db_entry(table.accuraterip_path())
|
||||
offsets = self._parse_offset_candidates(offset_command.OFFSETS)
|
||||
|
||||
def match(archecksums, track_number):
|
||||
for index, response in enumerate(responses):
|
||||
for checksum in archecksums:
|
||||
if checksum == response.checksums[track_number - 1]:
|
||||
return checksum, index
|
||||
return None, None
|
||||
|
||||
def rip_arcs(track_number, offset_value):
|
||||
fd, path = tempfile.mkstemp(
|
||||
suffix=".track%02d.offset%d.whipper.wav" % (track_number, offset_value)
|
||||
)
|
||||
os.close(fd)
|
||||
try:
|
||||
read_task = cdparanoia.ReadTrackTask(
|
||||
path,
|
||||
table,
|
||||
table.getTrackStart(track_number),
|
||||
table.getTrackEnd(track_number),
|
||||
overread=False,
|
||||
offset=offset_value,
|
||||
device=device,
|
||||
)
|
||||
read_task.description = "Ripping track %d with read offset %d" % (
|
||||
track_number,
|
||||
offset_value,
|
||||
)
|
||||
runner.run(read_task)
|
||||
v1, v2 = arc.accuraterip_checksum(path, track_number, len(table.tracks))
|
||||
return "%08x" % v1, "%08x" % v2
|
||||
finally:
|
||||
if os.path.exists(path):
|
||||
os.unlink(path)
|
||||
|
||||
found_offset = None
|
||||
for offset_value in offsets:
|
||||
if self.drive_cancel_requested:
|
||||
raise RuntimeError("Offset detection cancelled")
|
||||
GLib.idle_add(self._log_action, "Trying read offset %d", offset_value)
|
||||
try:
|
||||
checksums = rip_arcs(1, offset_value)
|
||||
except Exception as exc:
|
||||
GLib.idle_add(self._log_action, "Cannot rip with offset %d: %s", offset_value, exc, level=logging.WARNING)
|
||||
continue
|
||||
checksum, response_index = match(checksums, 1)
|
||||
if not checksum:
|
||||
continue
|
||||
|
||||
GLib.idle_add(self._log_action, "Potential offset %d matched response %d; confirming", offset_value, response_index)
|
||||
matched_tracks = 1
|
||||
for track_number in range(2, len(table.tracks)):
|
||||
if self.drive_cancel_requested:
|
||||
raise RuntimeError("Offset detection cancelled")
|
||||
try:
|
||||
checksums = rip_arcs(track_number, offset_value)
|
||||
except Exception as exc:
|
||||
GLib.idle_add(self._log_action, "Track %d failed for offset %d: %s", track_number, offset_value, exc, level=logging.WARNING)
|
||||
continue
|
||||
checksum, _ = match(checksums, track_number)
|
||||
if checksum:
|
||||
matched_tracks += 1
|
||||
if matched_tracks == len(table.tracks) - 1:
|
||||
found_offset = offset_value
|
||||
break
|
||||
|
||||
if found_offset is None:
|
||||
raise RuntimeError("No matching offset found. Try another AccurateRip-enabled disc.")
|
||||
|
||||
info = drive.getDeviceInfo(device)
|
||||
if info:
|
||||
config.Config().setReadOffset(info[0], info[1], info[2], found_offset)
|
||||
|
||||
def apply_results():
|
||||
self.offset_spin.set_value(found_offset)
|
||||
self._log_action("Read offset of device is %d", found_offset)
|
||||
self._update_drive_info(device)
|
||||
self._set_running_state(False, "Offset found")
|
||||
self.progress_label.set_text("Drive offset detection complete")
|
||||
return False
|
||||
|
||||
GLib.idle_add(apply_results)
|
||||
except accurip.EntryNotFound:
|
||||
def apply_not_found():
|
||||
self._log_action("AccurateRip entry not found; try another disc for offset detection", level=logging.WARNING)
|
||||
self._set_running_state(False, "Offset unavailable")
|
||||
self.progress_label.set_text("No AccurateRip entry for this disc")
|
||||
return False
|
||||
|
||||
GLib.idle_add(apply_not_found)
|
||||
except Exception as exc:
|
||||
def apply_error():
|
||||
if self.drive_cancel_requested:
|
||||
self._log_action("Offset detection cancelled", level=logging.WARNING)
|
||||
self._set_running_state(False, "Cancelled")
|
||||
self.progress_label.set_text("Offset detection cancelled")
|
||||
else:
|
||||
self._log_action("%s", exc, level=logging.ERROR)
|
||||
self._set_running_state(False, "Offset detection failed")
|
||||
self.progress_label.set_text("Offset detection failed")
|
||||
return False
|
||||
|
||||
GLib.idle_add(apply_error)
|
||||
finally:
|
||||
self.drive_runner = None
|
||||
|
||||
def _read_disc_worker(self, device, country):
|
||||
try:
|
||||
self.scan_cancel_requested = False
|
||||
@@ -790,7 +1075,7 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
|
||||
"tracks": track_count,
|
||||
"releases": releases,
|
||||
}
|
||||
self._set_label("device", device)
|
||||
self._update_drive_info(device)
|
||||
self._set_label("disc_status", "Ready")
|
||||
self._set_label("cddb", cddb)
|
||||
self._set_label("mbid", mbid)
|
||||
@@ -815,7 +1100,7 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
|
||||
self.release_store.clear()
|
||||
self.track_store.clear()
|
||||
self._update_release_details(None)
|
||||
self._set_label("device", device)
|
||||
self._update_drive_info(device)
|
||||
if self.scan_cancel_requested:
|
||||
self._set_label("disc_status", "Cancelled")
|
||||
self._log_action("Disc scan cancelled", level=logging.WARNING)
|
||||
@@ -1350,7 +1635,34 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
|
||||
self._save_gui_settings()
|
||||
device = self._selected_device()
|
||||
if device:
|
||||
self._update_drive_info(device)
|
||||
self._log_action("Selected device %s", device)
|
||||
else:
|
||||
self._update_drive_info(None)
|
||||
|
||||
def _on_analyze_drive_clicked(self, _button):
|
||||
device = self._selected_device()
|
||||
if not device:
|
||||
self._log_action("Drive analysis requested without an active device", level=logging.WARNING)
|
||||
return
|
||||
self.worker = threading.Thread(
|
||||
target=self._analyze_drive_worker,
|
||||
args=(device,),
|
||||
daemon=True,
|
||||
)
|
||||
self.worker.start()
|
||||
|
||||
def _on_find_offset_clicked(self, _button):
|
||||
device = self._selected_device()
|
||||
if not device:
|
||||
self._log_action("Offset detection requested without an active device", level=logging.WARNING)
|
||||
return
|
||||
self.worker = threading.Thread(
|
||||
target=self._find_offset_worker,
|
||||
args=(device,),
|
||||
daemon=True,
|
||||
)
|
||||
self.worker.start()
|
||||
|
||||
def _on_read_clicked(self, _button):
|
||||
device = self._selected_device()
|
||||
@@ -1406,6 +1718,10 @@ class WhipperGui(Gtk.Application if Gtk is not None else object):
|
||||
self.rip_cancel_requested = True
|
||||
self._log_action("Stopping rip", level=logging.WARNING)
|
||||
self.rip_runner.cancel()
|
||||
elif self.drive_runner is not None:
|
||||
self.drive_cancel_requested = True
|
||||
self._log_action("Stopping drive operation", level=logging.WARNING)
|
||||
self.drive_runner.cancel()
|
||||
|
||||
|
||||
class GuiLogHandler(logging.Handler):
|
||||
|
||||
@@ -169,10 +169,18 @@ class FakeSelection:
|
||||
class FakeReleaseView:
|
||||
def __init__(self, store):
|
||||
self.selection = FakeSelection(store)
|
||||
self.cursor = None
|
||||
self.scrolled_path = None
|
||||
|
||||
def get_selection(self):
|
||||
return self.selection
|
||||
|
||||
def set_cursor(self, path):
|
||||
self.cursor = path
|
||||
|
||||
def scroll_to_cell(self, path, *_args):
|
||||
self.scrolled_path = path
|
||||
|
||||
|
||||
class FakeRunner:
|
||||
def __init__(self):
|
||||
@@ -269,6 +277,8 @@ def _make_ui_app(tmp_path):
|
||||
app.disc_template_entry = FakeEntry("%d")
|
||||
app.read_button = FakeButton()
|
||||
app.refresh_button = FakeButton()
|
||||
app.analyze_button = FakeButton()
|
||||
app.find_offset_button = FakeButton()
|
||||
app.rip_button = FakeButton()
|
||||
app.stop_button = FakeButton()
|
||||
app.status_label = FakeLabel()
|
||||
@@ -290,7 +300,8 @@ def _make_ui_app(tmp_path):
|
||||
app.release_view = FakeReleaseView(app.release_store)
|
||||
app.release_details = FakeLabel()
|
||||
app.info_labels = {key: FakeLabel() for key in [
|
||||
"device", "disc_status", "cddb", "mbid", "duration", "tracks"
|
||||
"device", "vendor", "model", "release", "read_offset", "cache_defeat",
|
||||
"disc_status", "cddb", "mbid", "duration", "tracks"
|
||||
]}
|
||||
app.scan_runner = None
|
||||
app.rip_runner = None
|
||||
@@ -313,6 +324,7 @@ def _make_ui_app(tmp_path):
|
||||
"_can_rip",
|
||||
"_set_label",
|
||||
"_set_running_state",
|
||||
"_update_drive_info",
|
||||
"_update_release_store",
|
||||
"_update_track_store",
|
||||
"_set_track_field",
|
||||
@@ -323,6 +335,8 @@ def _make_ui_app(tmp_path):
|
||||
"_pulse_progress",
|
||||
"_resolve_release_metadata",
|
||||
"_on_release_selected",
|
||||
"_on_analyze_drive_clicked",
|
||||
"_on_find_offset_clicked",
|
||||
"_on_stop_clicked",
|
||||
)
|
||||
app._remove_gui_log_handler = gui.WhipperGui._remove_gui_log_handler
|
||||
@@ -409,6 +423,64 @@ def test_release_selection_and_progress_updates(monkeypatch, tmp_path):
|
||||
assert app.progress_label.get_text() == "Rip complete"
|
||||
|
||||
|
||||
def test_update_release_store_uses_visible_fallbacks(tmp_path):
|
||||
app = _make_ui_app(tmp_path)
|
||||
metadata = SimpleNamespace(
|
||||
artist=None,
|
||||
releaseTitle=None,
|
||||
title=None,
|
||||
release=None,
|
||||
releaseType=None,
|
||||
countries=[],
|
||||
duration=1000,
|
||||
tracks=[],
|
||||
discNumber=1,
|
||||
discTotal=1,
|
||||
mbid="mbid-empty",
|
||||
url="https://example.invalid/mbid-empty",
|
||||
barcode=None,
|
||||
catalogNumbers=[],
|
||||
)
|
||||
|
||||
gui.WhipperGui._update_release_store(app, [metadata])
|
||||
|
||||
assert app.release_store[0][0] == "Unknown Artist"
|
||||
assert app.release_store[0][1] == "Unknown Release"
|
||||
assert app.release_store[0][3] == "Unknown"
|
||||
assert app.release_store[0][4] == "—"
|
||||
assert app.release_view.cursor is not None
|
||||
assert app.release_view.scrolled_path is not None
|
||||
|
||||
|
||||
def test_update_drive_info_reads_config(monkeypatch, tmp_path):
|
||||
app = _make_ui_app(tmp_path)
|
||||
|
||||
class FakeConfig:
|
||||
def getReadOffset(self, vendor, model, release):
|
||||
assert (vendor, model, release) == ("Vendor", "Model", "1.0")
|
||||
return 667
|
||||
|
||||
def getDefeatsCache(self, vendor, model, release):
|
||||
assert (vendor, model, release) == ("Vendor", "Model", "1.0")
|
||||
return True
|
||||
|
||||
monkeypatch.setattr(gui.drive, "getDeviceInfo", lambda device: ("Vendor", "Model", "1.0"))
|
||||
monkeypatch.setattr(gui.config, "Config", lambda: FakeConfig())
|
||||
|
||||
gui.WhipperGui._update_drive_info(app, "/dev/cdrom")
|
||||
|
||||
assert app.info_labels["device"].get_text() == "/dev/cdrom"
|
||||
assert app.info_labels["vendor"].get_text() == "Vendor"
|
||||
assert app.info_labels["model"].get_text() == "Model"
|
||||
assert app.info_labels["release"].get_text() == "1.0"
|
||||
assert app.info_labels["read_offset"].get_text() == "667"
|
||||
assert app.info_labels["cache_defeat"].get_text() == "Yes"
|
||||
|
||||
|
||||
def test_parse_offset_candidates_supports_ranges(tmp_path):
|
||||
assert gui.WhipperGui._parse_offset_candidates("-1, 0, 2:4") == [-1, 0, 2, 3, 4]
|
||||
|
||||
|
||||
def test_main_reports_missing_runtime(monkeypatch, capsys):
|
||||
monkeypatch.setattr(gui, "_GUI_IMPORT_ERROR", ImportError("missing"))
|
||||
monkeypatch.setattr(gui, "gi", None)
|
||||
|
||||
Reference in New Issue
Block a user