Files
whipper-gui/whipper/test/test_gui.py
2026-04-18 17:55:52 +03:00

680 lines
21 KiB
Python

import os
import threading
import time
from types import MethodType, SimpleNamespace
import pytest
from whipper import gui
def _bind_methods(app, *names):
for name in names:
setattr(app, name, MethodType(getattr(gui.WhipperGui, name), app))
return app
def _drain_glib_until(predicate, timeout=2.0):
context = gui.GLib.MainContext.default()
deadline = time.time() + timeout
while time.time() < deadline:
while context.pending():
context.iteration(False)
if predicate():
return
time.sleep(0.01)
while context.pending():
context.iteration(False)
assert predicate()
class FakeEntry:
def __init__(self, text=""):
self.text = text
def get_text(self):
return self.text
def set_text(self, value):
self.text = value
class FakeToggle:
def __init__(self, active=False):
self.active = active
def get_active(self):
return self.active
def set_active(self, value):
self.active = bool(value)
class FakeCombo:
def __init__(self, active_id=None):
self.active_id = active_id
def get_active_id(self):
return self.active_id
def set_active_id(self, value):
self.active_id = value
def set_active(self, index):
self.active_id = "" if index == 0 else self.active_id
class FakeSpin:
def __init__(self, value=0):
self.value = value
def get_value(self):
return self.value
def set_value(self, value):
self.value = value
class FakeButton:
def __init__(self):
self.sensitive = None
def set_sensitive(self, value):
self.sensitive = bool(value)
class FakeLabel:
def __init__(self, text=""):
self.text = text
def set_text(self, value):
self.text = value
def get_text(self):
return self.text
class FakeProgressBar:
def __init__(self):
self.fraction = 0.0
self.text = ""
self.pulses = 0
def set_fraction(self, value):
self.fraction = value
def set_text(self, value):
self.text = value
def pulse(self):
self.pulses += 1
class FakeFileChooser:
def __init__(self, filename=None):
self.filename = filename
def get_filename(self):
return self.filename
def set_filename(self, filename):
self.filename = filename
class FakeWindow:
def __init__(self, width=640, height=480):
self.size = (width, height)
def get_size(self):
return self.size
def resize(self, width, height):
self.size = (width, height)
class FakePane:
def __init__(self, position=0):
self.position = position
def get_position(self):
return self.position
def set_position(self, value):
self.position = value
class FakeListStore(list):
def clear(self):
del self[:]
def append(self, row):
super().append(row)
def get_value(self, tree_iter, index):
return self[tree_iter][index]
class FakeSelection:
def __init__(self, model):
self.model = model
self.selected = None
def get_selected(self):
return self.model, self.selected
def select_path(self, path):
self.selected = path
class FakeReleaseView:
def __init__(self, store):
self.selection = FakeSelection(store)
def get_selection(self):
return self.selection
class FakeRunner:
def __init__(self):
self.cancelled = False
def run(self, task, verbose=False):
self.task = task
def cancel(self):
self.cancelled = True
class FakeTrack:
def __init__(self, audio=True):
self.audio = audio
self.isrc = None
self.pre_emphasis = False
self.indexes = {
1: SimpleNamespace(relative=1),
}
def getPregap(self):
return 0
class FakeITable:
def __init__(self):
self.tracks = [FakeTrack(audio=True)]
class FakeResultTable:
def getTrackStart(self, _track_number):
return 0
def getTrackEnd(self, _track_number):
return 9
def getTrackLength(self, _track_number):
return 10
def setFile(self, *_args):
return None
class FakeRipResult:
def __init__(self):
self.tracks = []
self.table = FakeResultTable()
self.isCdr = False
def getTrackResult(self, track_number):
for track in self.tracks:
if track.number == track_number:
return track
return None
def _make_metadata(title, duration, mbid):
return SimpleNamespace(
artist="Artist",
releaseTitle=title,
title=title,
release="2024-01-01",
releaseType="Album",
countries=["US"],
duration=duration,
tracks=[],
discNumber=1,
discTotal=1,
mbid=mbid,
url="https://example.invalid/%s" % mbid,
barcode=None,
catalogNumbers=[],
)
def _make_ui_app(tmp_path):
app = SimpleNamespace()
app.window = FakeWindow(111, 222)
app.main_pane = FakePane(77)
app.output_button = FakeFileChooser(str(tmp_path))
app.working_directory_entry = FakeEntry("")
app.country_entry = FakeEntry("")
app.release_id_entry = FakeEntry("")
app.unknown_check = FakeToggle(True)
app.cdr_check = FakeToggle(False)
app.keep_going_check = FakeToggle(True)
app.overread_check = FakeToggle(False)
app.cover_art_combo = FakeCombo("")
app.max_retries_spin = FakeSpin(5)
app.offset_spin = FakeSpin(0)
app.logger_combo = FakeCombo("whipper")
app.track_template_entry = FakeEntry("%t")
app.disc_template_entry = FakeEntry("%d")
app.read_button = FakeButton()
app.refresh_button = FakeButton()
app.rip_button = FakeButton()
app.stop_button = FakeButton()
app.status_label = FakeLabel()
app.progress_label = FakeLabel()
app.overall_bar = FakeProgressBar()
app.track_bar = FakeProgressBar()
app.release_store = FakeListStore()
app.track_store = FakeListStore()
app.release_view = FakeReleaseView(app.release_store)
app.release_details = FakeLabel()
app.info_labels = {key: FakeLabel() for key in [
"device", "disc_status", "cddb", "mbid", "duration", "tracks"
]}
app.scan_runner = None
app.rip_runner = None
app.scan_cancel_requested = False
app.rip_cancel_requested = False
app.current_release = None
app.current_track_number = 0
app.current_track_total = 0
app.pulse_id = 0
app.logs = []
app._append_log = lambda text: app.logs.append(text)
app._clear_log = lambda: app.logs.clear()
app._config_path = lambda: tmp_path / "gui.json"
_bind_methods(
app,
"_collect_gui_settings",
"_save_gui_settings",
"_load_gui_settings",
"_can_rip",
"_set_label",
"_set_running_state",
"_update_release_store",
"_update_track_store",
"_update_release_details",
"_update_rip_task_progress",
"_finish_rip",
"_reset_progress",
"_pulse_progress",
"_resolve_release_metadata",
"_on_release_selected",
"_on_stop_clicked",
)
app._remove_gui_log_handler = gui.WhipperGui._remove_gui_log_handler
app._install_gui_log_handler = MethodType(gui.WhipperGui._install_gui_log_handler, app)
return app
def test_gui_settings_roundtrip(tmp_path, monkeypatch):
app = _make_ui_app(tmp_path)
app.output_button.set_filename(str(tmp_path / "output"))
os.mkdir(app.output_button.get_filename())
app.working_directory_entry.set_text(str(tmp_path))
app.country_entry.set_text("JP")
app.release_id_entry.set_text("release-id")
app.cdr_check.set_active(True)
app.cover_art_combo.set_active_id("embed")
app.max_retries_spin.set_value(7)
app.offset_spin.set_value(123)
app.track_template_entry.set_text("%A/%t")
app.disc_template_entry.set_text("%A/%d")
monkeypatch.setattr(gui.GLib, "idle_add", lambda func, *args: func(*args))
gui.WhipperGui._save_gui_settings(app)
app.output_button.set_filename(None)
app.working_directory_entry.set_text("")
app.country_entry.set_text("")
app.release_id_entry.set_text("")
app.unknown_check.set_active(False)
app.cdr_check.set_active(False)
app.keep_going_check.set_active(False)
app.overread_check.set_active(True)
app.cover_art_combo.set_active_id(None)
app.max_retries_spin.set_value(0)
app.offset_spin.set_value(0)
app.logger_combo.set_active_id(None)
app.track_template_entry.set_text("")
app.disc_template_entry.set_text("")
gui.WhipperGui._load_gui_settings(app)
assert app.output_button.get_filename() == str(tmp_path / "output")
assert app.working_directory_entry.get_text() == str(tmp_path)
assert app.country_entry.get_text() == "JP"
assert app.release_id_entry.get_text() == "release-id"
assert app.unknown_check.get_active() is True
assert app.cdr_check.get_active() is True
assert app.keep_going_check.get_active() is True
assert app.overread_check.get_active() is False
assert app.cover_art_combo.get_active_id() == "embed"
assert app.max_retries_spin.get_value() == 7
assert app.offset_spin.get_value() == 123
assert app.logger_combo.get_active_id() == "whipper"
assert app.track_template_entry.get_text() == "%A/%t"
assert app.disc_template_entry.get_text() == "%A/%d"
assert app.window.get_size() == (111, 222)
assert app.main_pane.get_position() == 77
def test_release_selection_and_progress_updates(monkeypatch, tmp_path):
app = _make_ui_app(tmp_path)
selection = app.release_view.get_selection()
metadata = _make_metadata("Chosen", 1000, "mbid-1")
metadata.tracks = [SimpleNamespace(artist="Track Artist", title="Track Title", duration=210000)]
app.release_store.append(["Artist", "Chosen", "2024", "Album", "US", metadata])
selection.select_path(0)
monkeypatch.setattr(gui.GLib, "timeout_add", lambda *_args: 99)
monkeypatch.setattr(gui.GLib, "source_remove", lambda *_args: None)
gui.WhipperGui._on_release_selected(app, selection)
gui.WhipperGui._set_running_state(app, True, "Busy")
assert app.read_button.sensitive is False
assert app.refresh_button.sensitive is False
assert app.stop_button.sensitive is True
gui.WhipperGui._update_rip_task_progress(app, "Encoding", "Track 1", 1, 2, 0.25)
gui.WhipperGui._finish_rip(app, 0, "Done")
assert app.current_release is metadata
assert app.track_store[0][1] == "Track Artist"
assert app.release_details.get_text().startswith("Artist: Artist")
assert app.overall_bar.fraction == 1.0
assert app.track_bar.fraction == 1.0
assert app.progress_label.get_text() == "Rip complete"
def test_main_reports_missing_runtime(monkeypatch, capsys):
monkeypatch.setattr(gui, "_GUI_IMPORT_ERROR", ImportError("missing"))
monkeypatch.setattr(gui, "gi", None)
monkeypatch.setattr(gui, "cdio", None)
assert gui.main() == 1
assert "whipper-gui requires" in capsys.readouterr().err
def test_read_disc_worker_processes_idle_callbacks(monkeypatch, tmp_path):
app = _make_ui_app(tmp_path)
calls = []
class FakeReadTOCTask:
def __init__(self, _device, fast_toc=True):
assert fast_toc is True
self.toc = SimpleNamespace(table=SimpleNamespace(
getCDDBDiscId=lambda: "cddb-id",
getMusicBrainzDiscId=lambda: "mbid-disc",
duration=lambda: 200000,
getAudioTracks=lambda: 10,
))
runner = FakeRunner()
monkeypatch.setattr(gui, "CancellableSyncRunner", lambda: runner)
monkeypatch.setattr(gui.cdrdao, "ReadTOCTask", FakeReadTOCTask)
monkeypatch.setattr(gui.utils, "load_device", lambda device: calls.append(("load", device)))
monkeypatch.setattr(gui.utils, "unmount_device", lambda device: calls.append(("unmount", device)))
monkeypatch.setattr(gui.drive, "get_cdrom_drive_status", lambda _device: 0)
monkeypatch.setattr(
gui.mbngs,
"musicbrainz",
lambda *_args, **_kwargs: [
_make_metadata("Far", 260000, "r2"),
_make_metadata("Near", 205000, "r1"),
],
)
monkeypatch.setattr(gui.GLib, "timeout_add", lambda *_args: 1)
monkeypatch.setattr(gui.GLib, "source_remove", lambda *_args: None)
worker = threading.Thread(
target=gui.WhipperGui._read_disc_worker,
args=(app, "/dev/cdrom", "US"),
)
worker.start()
_drain_glib_until(lambda: not worker.is_alive())
worker.join()
assert calls == [("load", "/dev/cdrom"), ("unmount", "/dev/cdrom")]
assert app.scan_data["mbid"] == "mbid-disc"
assert app.scan_data["releases"][0].releaseTitle == "Near"
assert app.info_labels["disc_status"].get_text() == "Ready"
assert "Found 2 matching release(s)" in "".join(app.logs)
def test_rip_disc_worker_success(monkeypatch, tmp_path):
app = _make_ui_app(tmp_path)
app.current_release = _make_metadata("Selected", 200000, "release-mbid")
app.release_id_entry.set_text("")
app.unknown_check.set_active(False)
app.scan_data = {"mbid": "mbid-disc", "releases": [app.current_release]}
program_holder = {}
class FakeProgram:
def __init__(self, _conf, record=False):
assert record is False
self.metadata = None
self.outdir = None
self.result = FakeRipResult()
self.cover_art_paths = []
self.write_cue = False
self.write_m3u_called = False
self.write_log_called = False
program_holder["program"] = self
def getFastToc(self, _runner, _device):
return SimpleNamespace(
getCDDBDiscId=lambda: "cddb-id",
getMusicBrainzDiscId=lambda: "mbid-disc",
)
def getRipResult(self):
return self.result
def getPath(self, base, _template, _mbdiscid, _metadata, track_number=None):
if track_number is None:
return os.path.join(base, "Artist - Selected", "Artist - Selected")
return os.path.join(base, "Artist - Selected", "track-%02d" % track_number)
def getTable(self, *_args):
return FakeITable()
def getHTOA(self):
return None
def getTagList(self, _track_number, _mbdiscid):
return {}
def getCoverArt(self, dirname, _mbid):
path = os.path.join(dirname, "cover.jpg")
with open(path, "wb") as handle:
handle.write(b"cover")
self.cover_art_paths.append(path)
return path
def verifyImage(self, _runner, _itable):
return None
def writeCue(self, _disc_name):
self.write_cue = True
def write_m3u(self, _disc_name):
self.write_m3u_called = True
def writeLog(self, _disc_name, _logger):
self.write_log_called = True
def fake_rip_track(_self, _runner, program, _itable, _settings, track_number, _item_index, _item_total,
_cover_art_path, _skipped_tracks, _mbdiscid):
track_result = SimpleNamespace(number=track_number, filename="track.flac")
program.result.tracks.append(track_result)
runner = FakeRunner()
monkeypatch.setattr(gui, "Program", FakeProgram)
monkeypatch.setattr(gui, "CancellableSyncRunner", lambda: runner)
monkeypatch.setattr(gui.config, "Config", lambda: SimpleNamespace(
getDefeatsCache=lambda *_args: True,
))
monkeypatch.setattr(gui.utils, "load_device", lambda _device: None)
monkeypatch.setattr(gui.utils, "unmount_device", lambda _device: None)
monkeypatch.setattr(gui.drive, "get_cdrom_drive_status", lambda _device: 0)
monkeypatch.setattr(gui.drive, "getDeviceInfo", lambda _device: ("vendor", "model", "release"))
monkeypatch.setattr(gui.cdrdao, "DetectCdr", lambda _device: False)
monkeypatch.setattr(gui.cdrdao, "version", lambda: "1.0")
monkeypatch.setattr(gui.cdparanoia, "getCdParanoiaVersion", lambda: "10.2")
monkeypatch.setattr(gui.cdio, "Device", lambda _device: SimpleNamespace(
get_hwinfo=lambda: (None, "vendor", "model", "release")
))
monkeypatch.setattr(gui.importlib.util, "find_spec", lambda name: object() if name == "PIL" else None)
monkeypatch.setattr(gui.result, "getLoggers", lambda: {"whipper": lambda: object()})
monkeypatch.setattr(gui.accurip, "print_report", lambda _result: None)
monkeypatch.setattr(gui.GLib, "idle_add", lambda func, *args: func(*args))
app._rip_track = MethodType(fake_rip_track, app)
settings = {
"device": "/dev/cdrom",
"output_directory": str(tmp_path),
"working_directory": None,
"country": None,
"release_id": None,
"unknown": False,
"cdr": False,
"keep_going": True,
"overread": False,
"cover_art": "complete",
"max_retries": 1,
"offset": 0,
"logger": "whipper",
"track_template": "%t",
"disc_template": "%d",
}
gui.WhipperGui._rip_disc_worker(app, settings)
program = program_holder["program"]
assert program.write_cue is True
assert program.write_m3u_called is True
assert program.write_log_called is True
assert program.cover_art_paths
assert app.status_label.get_text() == "Done"
assert "Rip finished successfully" in "".join(app.logs)
def test_rip_disc_worker_cancel_during_verify(monkeypatch, tmp_path):
app = _make_ui_app(tmp_path)
app.current_release = _make_metadata("Selected", 200000, "release-mbid")
app.unknown_check.set_active(False)
app.scan_data = {"mbid": "mbid-disc", "releases": [app.current_release]}
verify_started = threading.Event()
class FakeProgram:
def __init__(self, _conf, record=False):
assert record is False
self.metadata = None
self.outdir = None
self.result = FakeRipResult()
def getFastToc(self, _runner, _device):
return SimpleNamespace(
getCDDBDiscId=lambda: "cddb-id",
getMusicBrainzDiscId=lambda: "mbid-disc",
)
def getRipResult(self):
return self.result
def getPath(self, base, _template, _mbdiscid, _metadata, track_number=None):
if track_number is None:
return os.path.join(base, "Artist - Selected", "Artist - Selected")
return os.path.join(base, "Artist - Selected", "track-%02d" % track_number)
def getTable(self, *_args):
return FakeITable()
def getHTOA(self):
return None
def verifyImage(self, _runner, _itable):
verify_started.set()
while not app.rip_cancel_requested:
time.sleep(0.01)
raise gui.RipCancelledError("Rip cancelled")
def writeCue(self, _disc_name):
return None
def write_m3u(self, _disc_name):
return None
def writeLog(self, _disc_name, _logger):
return None
def fake_rip_track(_self, _runner, program, _itable, _settings, track_number, _item_index, _item_total,
_cover_art_path, _skipped_tracks, _mbdiscid):
program.result.tracks.append(SimpleNamespace(number=track_number, filename="track.flac"))
runner = FakeRunner()
monkeypatch.setattr(gui, "Program", FakeProgram)
monkeypatch.setattr(gui, "CancellableSyncRunner", lambda: runner)
monkeypatch.setattr(gui.config, "Config", lambda: SimpleNamespace(
getDefeatsCache=lambda *_args: True,
))
monkeypatch.setattr(gui.utils, "load_device", lambda _device: None)
monkeypatch.setattr(gui.utils, "unmount_device", lambda _device: None)
monkeypatch.setattr(gui.drive, "get_cdrom_drive_status", lambda _device: 0)
monkeypatch.setattr(gui.drive, "getDeviceInfo", lambda _device: ("vendor", "model", "release"))
monkeypatch.setattr(gui.cdrdao, "DetectCdr", lambda _device: False)
monkeypatch.setattr(gui.cdrdao, "version", lambda: "1.0")
monkeypatch.setattr(gui.cdparanoia, "getCdParanoiaVersion", lambda: "10.2")
monkeypatch.setattr(gui.cdio, "Device", lambda _device: SimpleNamespace(
get_hwinfo=lambda: (None, "vendor", "model", "release")
))
monkeypatch.setattr(gui.importlib.util, "find_spec", lambda _name: None)
monkeypatch.setattr(gui.result, "getLoggers", lambda: {"whipper": lambda: object()})
monkeypatch.setattr(gui.accurip, "print_report", lambda _result: None)
monkeypatch.setattr(gui.GLib, "timeout_add", lambda *_args: 1)
monkeypatch.setattr(gui.GLib, "source_remove", lambda *_args: None)
app._rip_track = MethodType(fake_rip_track, app)
settings = {
"device": "/dev/cdrom",
"output_directory": str(tmp_path),
"working_directory": None,
"country": None,
"release_id": None,
"unknown": False,
"cdr": False,
"keep_going": True,
"overread": False,
"cover_art": None,
"max_retries": 1,
"offset": 0,
"logger": "whipper",
"track_template": "%t",
"disc_template": "%d",
}
worker = threading.Thread(
target=gui.WhipperGui._rip_disc_worker,
args=(app, settings),
)
worker.start()
assert verify_started.wait(timeout=2.0)
gui.WhipperGui._on_stop_clicked(app, None)
_drain_glib_until(lambda: not worker.is_alive())
worker.join()
assert app.rip_runner is None
assert app.rip_cancel_requested is True
assert runner.cancelled is True
assert app.status_label.get_text() == "Cancelled"