"""Tests for ``whipper.gui.WhipperGui`` driven through stand-in widgets.

The ``Fake*`` classes below provide the small set of widget-style methods
these tests rely on, and ``_make_ui_app`` assembles them into a
``SimpleNamespace`` that selected ``WhipperGui`` methods are bound to.
"""

import os
import threading
import time
from types import MethodType, SimpleNamespace

import pytest

from whipper import gui


def _bind_methods(app, *names):
    """Bind the named ``gui.WhipperGui`` functions to ``app`` as methods.

    Returns ``app`` so the call can be chained.
    """
    for name in names:
        setattr(app, name, MethodType(getattr(gui.WhipperGui, name), app))
    return app


def _drain_glib_until(predicate, timeout=2.0):
    """Iterate the default GLib main context until ``predicate()`` is true.

    Pending sources are dispatched without blocking, then ``predicate`` is
    polled every 10 ms.  When ``timeout`` seconds elapse the context is
    drained one last time and ``predicate()`` is asserted, so a timeout
    surfaces as a test failure.
    """
    context = gui.GLib.MainContext.default()
    # Monotonic clock: a wall-clock adjustment must not stretch or shorten
    # the timeout.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        while context.pending():
            context.iteration(False)
        if predicate():
            return
        time.sleep(0.01)
    while context.pending():
        context.iteration(False)
    assert predicate()


class FakeEntry:
    """Text entry stand-in that stores its text in ``self.text``."""

    def __init__(self, text=""):
        self.text = text

    def get_text(self):
        return self.text

    def set_text(self, value):
        self.text = value


class FakeToggle:
    """Check-button stand-in; ``set_active`` coerces the value to ``bool``."""

    def __init__(self, active=False):
        self.active = active

    def get_active(self):
        return self.active

    def set_active(self, value):
        self.active = bool(value)


class FakeCombo:
    """Combo-box stand-in keyed by an active id string."""

    def __init__(self, active_id=None):
        self.active_id = active_id

    def get_active_id(self):
        return self.active_id

    def set_active_id(self, value):
        self.active_id = value

    def set_active(self, index):
        # Only index 0 is modelled: it maps to the empty id.  Any other
        # index leaves the current id untouched.
        if index == 0:
            self.active_id = ""


class FakeSpin:
    """Spin-button stand-in holding a single numeric value."""

    def __init__(self, value=0):
        self.value = value

    def get_value(self):
        return self.value

    def set_value(self, value):
        self.value = value


class FakeButton:
    """Button stand-in; ``sensitive`` stays ``None`` until first set."""

    def __init__(self):
        self.sensitive = None

    def set_sensitive(self, value):
        self.sensitive = bool(value)


class FakeLabel:
    """Label stand-in that stores its text in ``self.text``."""

    def __init__(self, text=""):
        self.text = text

    def set_text(self, value):
        self.text = value

    def get_text(self):
        return self.text


class FakeProgressBar:
    """Progress-bar stand-in recording fraction, text and pulse count."""

    def __init__(self):
        self.fraction = 0.0
        self.text = ""
        self.pulses = 0

    def set_fraction(self, value):
        self.fraction = value

    def set_text(self, value):
        self.text = value

    def pulse(self):
        self.pulses += 1


class FakeFileChooser:
    """File-chooser stand-in holding a single filename (or ``None``)."""

    def __init__(self, filename=None):
        self.filename = filename

    def get_filename(self):
        return self.filename

    def set_filename(self, filename):
        self.filename = filename


class FakeWindow:
    """Window stand-in tracking its size as a ``(width, height)`` tuple."""

    def __init__(self, width=640, height=480):
        self.size = (width, height)

    def get_size(self):
        return self.size

    def resize(self, width, height):
        self.size = (width, height)


class FakePane:
    """Paned-container stand-in tracking the divider position."""

    def __init__(self, position=0):
        self.position = position

    def get_position(self):
        return self.position

    def set_position(self, value):
        self.position = value


class FakeListStore(list):
    """List-backed store; a row's list index doubles as its iter/path."""

    def clear(self):
        del self[:]

    def append(self, row):
        super().append(row)

    def get_value(self, tree_iter, index):
        return self[tree_iter][index]


class FakeSelection:
    """Selection stand-in returning ``(model, selected)`` like a tree view."""

    def __init__(self, model):
        self.model = model
        self.selected = None

    def get_selected(self):
        return self.model, self.selected

    def select_path(self, path):
        self.selected = path


class FakeReleaseView:
    """Release list view stand-in recording cursor and scroll targets."""

    def __init__(self, store):
        self.selection = FakeSelection(store)
        self.cursor = None
        self.scrolled_path = None

    def get_selection(self):
        return self.selection

    def set_cursor(self, path):
        self.cursor = path

    def scroll_to_cell(self, path, *_args):
        self.scrolled_path = path


class FakeRunner:
    """Task-runner stand-in that records the task and cancel requests."""

    def __init__(self):
        self.cancelled = False
        # Defined up front so reading ``task`` before ``run`` cannot raise
        # AttributeError.
        self.task = None

    def run(self, task, verbose=False):
        self.task = task

    def cancel(self):
        self.cancelled = True


class FakeTrack:
    """Table track stand-in with one index and a zero-length pregap."""

    def __init__(self, audio=True):
        self.audio = audio
        self.isrc = None
        self.pre_emphasis = False
        self.indexes = {
            1: SimpleNamespace(relative=1),
        }

    def getPregap(self):
        return 0


class FakeITable:
    """Image-table stand-in containing a single audio track."""

    def __init__(self):
        self.tracks = [FakeTrack(audio=True)]


class FakeResultTable:
    """Result-table stand-in returning fixed values for every track."""

    def getTrackStart(self, _track_number):
        return 0

    def getTrackEnd(self, _track_number):
        return 9

    def getTrackLength(self, _track_number):
        return 10

    def setFile(self, *_args):
        return None


class FakeRipResult:
    """Rip-result stand-in holding track results and a result table."""

    def __init__(self):
        self.tracks = []
        self.table = FakeResultTable()
        self.isCdr = False

    def getTrackResult(self, track_number):
        """Return the stored result whose ``number`` matches, else ``None``."""
        for track in self.tracks:
            if track.number == track_number:
                return track
        return None


def _make_metadata(title, duration, mbid):
    """Build a release-metadata namespace with fixed artist and date.

    ``title`` is used for both ``releaseTitle`` and ``title``; ``mbid`` also
    determines the ``url`` field.
    """
    return SimpleNamespace(
        artist="Artist",
        releaseTitle=title,
        title=title,
        release="2024-01-01",
        releaseType="Album",
        countries=["US"],
        duration=duration,
        tracks=[],
        discNumber=1,
        discTotal=1,
        mbid=mbid,
        url="https://example.invalid/%s" % mbid,
        barcode=None,
        catalogNumbers=[],
    )


def _make_ui_app(tmp_path):
    """Assemble a ``SimpleNamespace`` standing in for a ``WhipperGui``.

    Every widget attribute is one of the ``Fake*`` classes, log output is
    collected in ``app.logs``, the settings file lives at
    ``tmp_path / "gui.json"``, and a fixed set of ``WhipperGui`` methods is
    bound to the namespace.
    """
    app = SimpleNamespace()

    # Window geometry and settings widgets.
    app.window = FakeWindow(111, 222)
    app.main_pane = FakePane(77)
    app.output_button = FakeFileChooser(str(tmp_path))
    app.working_directory_entry = FakeEntry("")
    app.country_entry = FakeEntry("")
    app.release_id_entry = FakeEntry("")
    app.unknown_check = FakeToggle(True)
    app.cdr_check = FakeToggle(False)
    app.keep_going_check = FakeToggle(True)
    app.overread_check = FakeToggle(False)
    app.cover_art_combo = FakeCombo("")
    app.max_retries_spin = FakeSpin(5)
    app.offset_spin = FakeSpin(0)
    app.logger_combo = FakeCombo("whipper")
    app.track_template_entry = FakeEntry("%t")
    app.disc_template_entry = FakeEntry("%d")

    # Action buttons.
    app.read_button = FakeButton()
    app.refresh_button = FakeButton()
    app.analyze_button = FakeButton()
    app.find_offset_button = FakeButton()
    app.rip_button = FakeButton()
    app.stop_button = FakeButton()

    # Status and progress display.
    app.status_label = FakeLabel()
    app.progress_label = FakeLabel()
    app.overall_bar = FakeProgressBar()
    app.track_bar = FakeProgressBar()

    # Release and track lists.
    app.release_store = FakeListStore()
    app.track_store = FakeListStore()
    app.track_columns = {
        "number": 0,
        "status": 1,
        "artist": 2,
        "title": 3,
        "length": 4,
        "test_crc": 5,
        "copy_crc": 6,
        "accuraterip": 7,
    }
    app.release_view = FakeReleaseView(app.release_store)
    app.release_details = FakeLabel()
    app.info_labels = {key: FakeLabel() for key in [
        "device", "vendor", "model", "release", "read_offset",
        "cache_defeat", "disc_status", "cddb", "mbid", "duration", "tracks"
    ]}

    # Runtime state.
    app.scan_runner = None
    app.rip_runner = None
    app.scan_cancel_requested = False
    app.rip_cancel_requested = False
    app.current_release = None
    app.current_track_number = 0
    app.current_track_total = 0
    app.pulse_id = 0

    # Log capture and settings location.
    app.logs = []
    app._append_log = lambda text: app.logs.append(text)
    app._clear_log = lambda: app.logs.clear()
    app._config_path = lambda: tmp_path / "gui.json"

    _bind_methods(
        app,
        "_log_action",
        "_collect_gui_settings",
        "_save_gui_settings",
        "_load_gui_settings",
        "_can_rip",
        "_set_label",
        "_set_running_state",
        "_update_drive_info",
        "_update_release_store",
        "_update_track_store",
        "_set_track_field",
        "_update_release_details",
        "_update_rip_task_progress",
        "_finish_rip",
        "_reset_progress",
        "_pulse_progress",
        "_resolve_release_metadata",
        "_on_release_selected",
        "_on_analyze_drive_clicked",
        "_on_find_offset_clicked",
        "_on_stop_clicked",
    )
    # NOTE(review): attached without binding, unlike
    # _install_gui_log_handler below -- presumably a staticmethod; confirm
    # against gui.WhipperGui.
    app._remove_gui_log_handler = gui.WhipperGui._remove_gui_log_handler
    app._install_gui_log_handler = MethodType(
        gui.WhipperGui._install_gui_log_handler, app)
    return app


def _fake_rip_track(_self, _runner, program, _itable, _settings,
                    track_number, _item_index, _item_total, _cover_art_path,
                    _skipped_tracks, _mbdiscid):
    """Stand-in for ``_rip_track`` that only records a track result."""
    program.result.tracks.append(
        SimpleNamespace(number=track_number, filename="track.flac"))


def _rip_settings(tmp_path, cover_art):
    """Return the settings dict passed to ``_rip_disc_worker`` in tests.

    Only ``cover_art`` varies between the rip tests; output goes to
    ``tmp_path``.
    """
    return {
        "device": "/dev/cdrom",
        "output_directory": str(tmp_path),
        "working_directory": None,
        "country": None,
        "release_id": None,
        "unknown": False,
        "cdr": False,
        "keep_going": True,
        "overread": False,
        "cover_art": cover_art,
        "max_retries": 1,
        "offset": 0,
        "logger": "whipper",
        "track_template": "%t",
        "disc_template": "%d",
    }


def _patch_rip_environment(monkeypatch, program_cls, runner):
    """Replace the drive, tool and reporting hooks used by the rip tests.

    ``program_cls`` replaces ``gui.Program`` and ``runner`` is what
    ``gui.CancellableSyncRunner()`` returns.  ``importlib.util.find_spec``
    and the ``GLib`` callbacks differ per test and are patched by the
    caller.
    """
    monkeypatch.setattr(gui, "Program", program_cls)
    monkeypatch.setattr(gui, "CancellableSyncRunner", lambda: runner)
    monkeypatch.setattr(gui.config, "Config", lambda: SimpleNamespace(
        getDefeatsCache=lambda *_args: True,
    ))
    monkeypatch.setattr(gui.utils, "load_device", lambda _device: None)
    monkeypatch.setattr(gui.utils, "unmount_device", lambda _device: None)
    monkeypatch.setattr(
        gui.drive, "get_cdrom_drive_status", lambda _device: 0)
    monkeypatch.setattr(
        gui.drive, "getDeviceInfo",
        lambda _device: ("vendor", "model", "release"))
    monkeypatch.setattr(gui.cdrdao, "DetectCdr", lambda _device: False)
    monkeypatch.setattr(gui.cdrdao, "version", lambda: "1.0")
    monkeypatch.setattr(
        gui.cdparanoia, "getCdParanoiaVersion", lambda: "10.2")
    monkeypatch.setattr(gui.cdio, "Device", lambda _device: SimpleNamespace(
        get_hwinfo=lambda: (None, "vendor", "model", "release")
    ))
    monkeypatch.setattr(
        gui.result, "getLoggers", lambda: {"whipper": lambda: object()})
    monkeypatch.setattr(gui.accurip, "print_report", lambda _result: None)


def test_gui_settings_roundtrip(tmp_path, monkeypatch):
    """Saved widget values are restored by ``_load_gui_settings``."""
    app = _make_ui_app(tmp_path)
    app.output_button.set_filename(str(tmp_path / "output"))
    os.mkdir(app.output_button.get_filename())
    app.working_directory_entry.set_text(str(tmp_path))
    app.country_entry.set_text("JP")
    app.release_id_entry.set_text("release-id")
    app.cdr_check.set_active(True)
    app.cover_art_combo.set_active_id("embed")
    app.max_retries_spin.set_value(7)
    app.offset_spin.set_value(123)
    app.track_template_entry.set_text("%A/%t")
    app.disc_template_entry.set_text("%A/%d")
    # Run idle callbacks inline instead of queueing them on a main loop.
    monkeypatch.setattr(
        gui.GLib, "idle_add", lambda func, *args: func(*args))

    gui.WhipperGui._save_gui_settings(app)

    # Scramble every widget so the assertions below can only pass if the
    # values come back from the saved settings.
    app.output_button.set_filename(None)
    app.working_directory_entry.set_text("")
    app.country_entry.set_text("")
    app.release_id_entry.set_text("")
    app.unknown_check.set_active(False)
    app.cdr_check.set_active(False)
    app.keep_going_check.set_active(False)
    app.overread_check.set_active(True)
    app.cover_art_combo.set_active_id(None)
    app.max_retries_spin.set_value(0)
    app.offset_spin.set_value(0)
    app.logger_combo.set_active_id(None)
    app.track_template_entry.set_text("")
    app.disc_template_entry.set_text("")

    gui.WhipperGui._load_gui_settings(app)

    assert app.output_button.get_filename() == str(tmp_path / "output")
    assert app.working_directory_entry.get_text() == str(tmp_path)
    assert app.country_entry.get_text() == "JP"
    assert app.release_id_entry.get_text() == "release-id"
    assert app.unknown_check.get_active() is True
    assert app.cdr_check.get_active() is True
    assert app.keep_going_check.get_active() is True
    assert app.overread_check.get_active() is False
    assert app.cover_art_combo.get_active_id() == "embed"
    assert app.max_retries_spin.get_value() == 7
    assert app.offset_spin.get_value() == 123
    assert app.logger_combo.get_active_id() == "whipper"
    assert app.track_template_entry.get_text() == "%A/%t"
    assert app.disc_template_entry.get_text() == "%A/%d"
    assert app.window.get_size() == (111, 222)
    assert app.main_pane.get_position() == 77


def test_release_selection_and_progress_updates(monkeypatch, tmp_path):
    """Selecting a release fills the track list; progress ends complete."""
    app = _make_ui_app(tmp_path)
    selection = app.release_view.get_selection()
    metadata = _make_metadata("Chosen", 1000, "mbid-1")
    metadata.tracks = [
        SimpleNamespace(
            artist="Track Artist", title="Track Title", duration=210000)
    ]
    app.release_store.append(
        ["Artist", "Chosen", "2024", "Album", "US", metadata])
    selection.select_path(0)
    monkeypatch.setattr(gui.GLib, "timeout_add", lambda *_args: 99)
    monkeypatch.setattr(gui.GLib, "source_remove", lambda *_args: None)

    gui.WhipperGui._on_release_selected(app, selection)
    gui.WhipperGui._set_running_state(app, True, "Busy")

    assert app.read_button.sensitive is False
    assert app.refresh_button.sensitive is False
    assert app.stop_button.sensitive is True

    gui.WhipperGui._update_rip_task_progress(
        app, "Encoding", "Track 1", 1, 2, 0.25)
    gui.WhipperGui._finish_rip(app, 0, "Done")

    assert app.current_release is metadata
    assert app.track_store[0][2] == "Track Artist"
    assert app.track_store[0][1].startswith("Encoding")
    assert app.release_details.get_text().startswith("Artist: Artist")
    assert app.overall_bar.fraction == 1.0
    assert app.track_bar.fraction == 1.0
    assert app.progress_label.get_text() == "Rip complete"


def test_update_release_store_uses_visible_fallbacks(tmp_path):
    """Missing release fields are shown with placeholder text."""
    app = _make_ui_app(tmp_path)
    metadata = SimpleNamespace(
        artist=None,
        releaseTitle=None,
        title=None,
        release=None,
        releaseType=None,
        countries=[],
        duration=1000,
        tracks=[],
        discNumber=1,
        discTotal=1,
        mbid="mbid-empty",
        url="https://example.invalid/mbid-empty",
        barcode=None,
        catalogNumbers=[],
    )

    gui.WhipperGui._update_release_store(app, [metadata])

    assert app.release_store[0][0] == "Unknown Artist"
    assert app.release_store[0][1] == "Unknown Release"
    assert app.release_store[0][3] == "Unknown"
    assert app.release_store[0][4] == "—"
    assert app.release_view.cursor is not None
    assert app.release_view.scrolled_path is not None


def test_update_drive_info_reads_config(monkeypatch, tmp_path):
    """Drive labels show device info plus the configured offset and cache."""
    app = _make_ui_app(tmp_path)

    class FakeConfig:
        def getReadOffset(self, vendor, model, release):
            assert (vendor, model, release) == ("Vendor", "Model", "1.0")
            return 667

        def getDefeatsCache(self, vendor, model, release):
            assert (vendor, model, release) == ("Vendor", "Model", "1.0")
            return True

    monkeypatch.setattr(
        gui.drive, "getDeviceInfo",
        lambda device: ("Vendor", "Model", "1.0"))
    monkeypatch.setattr(gui.config, "Config", lambda: FakeConfig())

    gui.WhipperGui._update_drive_info(app, "/dev/cdrom")

    assert app.info_labels["device"].get_text() == "/dev/cdrom"
    assert app.info_labels["vendor"].get_text() == "Vendor"
    assert app.info_labels["model"].get_text() == "Model"
    assert app.info_labels["release"].get_text() == "1.0"
    assert app.info_labels["read_offset"].get_text() == "667"
    assert app.info_labels["cache_defeat"].get_text() == "Yes"


def test_parse_offset_candidates_supports_ranges(tmp_path):
    """``a:b`` entries expand to the inclusive integer range."""
    assert gui.WhipperGui._parse_offset_candidates("-1, 0, 2:4") == [
        -1, 0, 2, 3, 4]


def test_main_reports_missing_runtime(monkeypatch, capsys):
    """``main`` returns 1 and explains itself when GUI imports failed."""
    monkeypatch.setattr(gui, "_GUI_IMPORT_ERROR", ImportError("missing"))
    monkeypatch.setattr(gui, "gi", None)
    monkeypatch.setattr(gui, "cdio", None)

    assert gui.main() == 1
    assert "whipper-gui requires" in capsys.readouterr().err


def test_read_disc_worker_processes_idle_callbacks(monkeypatch, tmp_path):
    """The read worker loads the disc, stores scan data and updates labels.

    The worker runs on its own thread while this thread iterates the GLib
    main context, so callbacks the worker schedules are dispatched here.
    """
    app = _make_ui_app(tmp_path)
    calls = []

    class FakeReadTOCTask:
        def __init__(self, _device, fast_toc=True):
            assert fast_toc is True
            self.toc = SimpleNamespace(table=SimpleNamespace(
                getCDDBDiscId=lambda: "cddb-id",
                getMusicBrainzDiscId=lambda: "mbid-disc",
                duration=lambda: 200000,
                getAudioTracks=lambda: 10,
            ))

    runner = FakeRunner()
    monkeypatch.setattr(gui, "CancellableSyncRunner", lambda: runner)
    monkeypatch.setattr(gui.cdrdao, "ReadTOCTask", FakeReadTOCTask)
    monkeypatch.setattr(
        gui.utils, "load_device",
        lambda device: calls.append(("load", device)))
    monkeypatch.setattr(
        gui.utils, "unmount_device",
        lambda device: calls.append(("unmount", device)))
    monkeypatch.setattr(
        gui.drive, "get_cdrom_drive_status", lambda _device: 0)
    monkeypatch.setattr(
        gui.mbngs,
        "musicbrainz",
        lambda *_args, **_kwargs: [
            _make_metadata("Far", 260000, "r2"),
            _make_metadata("Near", 205000, "r1"),
        ],
    )
    monkeypatch.setattr(gui.GLib, "timeout_add", lambda *_args: 1)
    monkeypatch.setattr(gui.GLib, "source_remove", lambda *_args: None)

    # Daemon thread: if the worker never finishes, the failed test must not
    # keep the interpreter alive at exit.
    worker = threading.Thread(
        target=gui.WhipperGui._read_disc_worker,
        args=(app, "/dev/cdrom", "US"),
        daemon=True,
    )
    worker.start()
    _drain_glib_until(lambda: not worker.is_alive())
    worker.join()

    assert calls == [("load", "/dev/cdrom"), ("unmount", "/dev/cdrom")]
    assert app.scan_data["mbid"] == "mbid-disc"
    assert app.scan_data["releases"][0].releaseTitle == "Near"
    assert app.info_labels["disc_status"].get_text() == "Ready"
    assert "Found 2 matching release(s)" in "".join(app.logs)


def test_rip_disc_worker_success(monkeypatch, tmp_path):
    """A successful rip writes cue, m3u and log files and fetches cover art."""
    app = _make_ui_app(tmp_path)
    app.current_release = _make_metadata("Selected", 200000, "release-mbid")
    app.release_id_entry.set_text("")
    app.unknown_check.set_active(False)
    app.scan_data = {"mbid": "mbid-disc", "releases": [app.current_release]}
    # Captures the instance the worker creates so it can be inspected below.
    program_holder = {}

    class FakeProgram:
        def __init__(self, _conf, record=False):
            assert record is False
            self.metadata = None
            self.outdir = None
            self.result = FakeRipResult()
            self.cover_art_paths = []
            self.write_cue = False
            self.write_m3u_called = False
            self.write_log_called = False
            program_holder["program"] = self

        def getFastToc(self, _runner, _device):
            return SimpleNamespace(
                getCDDBDiscId=lambda: "cddb-id",
                getMusicBrainzDiscId=lambda: "mbid-disc",
            )

        def getRipResult(self):
            return self.result

        def getPath(self, base, _template, _mbdiscid, _metadata,
                    track_number=None):
            if track_number is None:
                return os.path.join(
                    base, "Artist - Selected", "Artist - Selected")
            return os.path.join(
                base, "Artist - Selected", "track-%02d" % track_number)

        def getTable(self, *_args):
            return FakeITable()

        def getHTOA(self):
            return None

        def getTagList(self, _track_number, _mbdiscid):
            return {}

        def getCoverArt(self, dirname, _mbid):
            path = os.path.join(dirname, "cover.jpg")
            with open(path, "wb") as handle:
                handle.write(b"cover")
            self.cover_art_paths.append(path)
            return path

        def verifyImage(self, _runner, _itable):
            return None

        def writeCue(self, _disc_name):
            self.write_cue = True

        def write_m3u(self, _disc_name):
            self.write_m3u_called = True

        def writeLog(self, _disc_name, _logger):
            self.write_log_called = True

    runner = FakeRunner()
    _patch_rip_environment(monkeypatch, FakeProgram, runner)
    # Report only PIL as importable.
    monkeypatch.setattr(
        gui.importlib.util, "find_spec",
        lambda name: object() if name == "PIL" else None)
    # Run idle callbacks inline; this test calls the worker synchronously.
    monkeypatch.setattr(
        gui.GLib, "idle_add", lambda func, *args: func(*args))
    app._rip_track = MethodType(_fake_rip_track, app)
    settings = _rip_settings(tmp_path, cover_art="complete")

    gui.WhipperGui._rip_disc_worker(app, settings)

    program = program_holder["program"]
    assert program.write_cue is True
    assert program.write_m3u_called is True
    assert program.write_log_called is True
    assert program.cover_art_paths
    assert app.status_label.get_text() == "Done"
    assert "Rip finished successfully" in "".join(app.logs)


def test_rip_disc_worker_cancel_during_verify(monkeypatch, tmp_path):
    """Pressing stop during image verification cancels the rip."""
    app = _make_ui_app(tmp_path)
    app.current_release = _make_metadata("Selected", 200000, "release-mbid")
    app.unknown_check.set_active(False)
    app.scan_data = {"mbid": "mbid-disc", "releases": [app.current_release]}
    verify_started = threading.Event()

    class FakeProgram:
        def __init__(self, _conf, record=False):
            assert record is False
            self.metadata = None
            self.outdir = None
            self.result = FakeRipResult()

        def getFastToc(self, _runner, _device):
            return SimpleNamespace(
                getCDDBDiscId=lambda: "cddb-id",
                getMusicBrainzDiscId=lambda: "mbid-disc",
            )

        def getRipResult(self):
            return self.result

        def getPath(self, base, _template, _mbdiscid, _metadata,
                    track_number=None):
            if track_number is None:
                return os.path.join(
                    base, "Artist - Selected", "Artist - Selected")
            return os.path.join(
                base, "Artist - Selected", "track-%02d" % track_number)

        def getTable(self, *_args):
            return FakeITable()

        def getHTOA(self):
            return None

        def verifyImage(self, _runner, _itable):
            # Signal the test thread, then block until stop is requested so
            # the cancel is guaranteed to land during verification.
            verify_started.set()
            while not app.rip_cancel_requested:
                time.sleep(0.01)
            raise gui.RipCancelledError("Rip cancelled")

        def writeCue(self, _disc_name):
            return None

        def write_m3u(self, _disc_name):
            return None

        def writeLog(self, _disc_name, _logger):
            return None

    runner = FakeRunner()
    _patch_rip_environment(monkeypatch, FakeProgram, runner)
    monkeypatch.setattr(gui.importlib.util, "find_spec", lambda _name: None)
    monkeypatch.setattr(gui.GLib, "timeout_add", lambda *_args: 1)
    monkeypatch.setattr(gui.GLib, "source_remove", lambda *_args: None)
    app._rip_track = MethodType(_fake_rip_track, app)
    settings = _rip_settings(tmp_path, cover_art=None)

    # Daemon thread: ``verifyImage`` spins until cancellation is requested,
    # so a regression that never sets the flag must not hang the
    # interpreter at exit.
    worker = threading.Thread(
        target=gui.WhipperGui._rip_disc_worker,
        args=(app, settings),
        daemon=True,
    )
    worker.start()
    assert verify_started.wait(timeout=2.0)

    gui.WhipperGui._on_stop_clicked(app, None)
    _drain_glib_until(lambda: not worker.is_alive())
    worker.join()

    assert app.rip_runner is None
    assert app.rip_cancel_requested is True
    assert runner.cancelled is True
    assert app.status_label.get_text() == "Cancelled"