From e52d20c19a572c5ce07943994b8ac4866c448078 Mon Sep 17 00:00:00 2001 From: Thomas Vander Stichele Date: Tue, 15 Mar 2011 22:44:05 +0000 Subject: [PATCH] * morituri/common/checksum.py: Create a GstException to wrap a Gst.GError. Create a base GstPipelineTask class. Use it in Checksum and TRM tasks. Raise and don't proceed to call .paused() when a GstError happens. Should help debug https://bugs.launchpad.net/bugs/735053 * morituri/test/test_common_checksum.py: Adapt test. --- ChangeLog | 11 ++ morituri/common/checksum.py | 154 ++++++++++++++++++-------- morituri/test/test_common_checksum.py | 4 +- 3 files changed, 118 insertions(+), 51 deletions(-) diff --git a/ChangeLog b/ChangeLog index e28194c..334f7ab 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,14 @@ +2011-03-15 Thomas Vander Stichele + + * morituri/common/checksum.py: + Create a GstException to wrap a Gst.GError. + Create a base GstPipelineTask class. + Use it in Checksum and TRM tasks. + Raise and don't proceed to call .paused() when a GstError happens. + Should help debug https://bugs.launchpad.net/bugs/735053 + * morituri/test/test_common_checksum.py: + Adapt test. + 2011-01-09 Thomas Vander Stichele patch by: Ross Burton diff --git a/morituri/common/checksum.py b/morituri/common/checksum.py index 00ecd63..c57af97 100644 --- a/morituri/common/checksum.py +++ b/morituri/common/checksum.py @@ -30,7 +30,80 @@ from morituri.common import common, task # checksums are not CRC's. a CRC is a specific type of checksum. -class ChecksumTask(task.Task): +# FIXME: probably this should move higher up the module hierarchy and +# be used wider +class GstException(Exception): + def __init__(self, gerror, debug): + self.args = (gerror, debug, ) + self.gerror = gerror + self.debug = debug + +# FIXME: this should move up too; other tasks might have use for it. +class GstPipelineTask(task.Task): + """ + I am a base class for tasks that use a GStreamer pipeline. + + I handle errors and raise them appropriately. + """ + def start(self, runner): + task.Task.start(self, runner) + desc = self.getPipelineDesc() + + self.debug('creating pipeline %r', desc) + self.pipeline = gst.parse_launch(desc) + + self._bus = self.pipeline.get_bus() + gst.debug('got bus %r' % self._bus) + + # a signal watch calls callbacks from an idle loop + # self._bus.add_signal_watch() + + # sync emission triggers sync-message signals which calls callbacks + # from the thread that signals, but happens immediately + self._bus.enable_sync_message_emission() + self._bus.connect('sync-message::eos', self.bus_eos_cb) + self._bus.connect('sync-message::tag', self.bus_tag_cb) + self._bus.connect('sync-message::error', self.bus_error_cb) + + self.parsed() + + self.debug('pausing pipeline') + self.pipeline.set_state(gst.STATE_PAUSED) + self.pipeline.get_state() + self.debug('paused pipeline') + + if not self.exception: + self.paused() + + def getPipelineDesc(self): + raise NotImplementedError + + def parsed(self): + """ + Called after parsing the pipeline but before setting it to paused. + """ + pass + + def paused(self): + """ + Called after pipeline is paused + """ + pass + + def bus_eos_cb(self, bus, message): + pass + + def bus_tag_cb(self, bus, message): + pass + + def bus_error_cb(self, bus, message): + exc = GstException(*message.parse_error()) + self.setAndRaiseException(exc) + gst.debug('error, scheduling stop') + #self.runner.schedule(0, self.stop) + + +class ChecksumTask(GstPipelineTask): """ I am a task that calculates a checksum of the decoded audio data. @@ -72,20 +145,15 @@ class ChecksumTask(task.Task): self.checksum = None # result - def start(self, runner): - task.Task.start(self, runner) - self._pipeline = gst.parse_launch(''' + def getPipelineDesc(self): + return ''' filesrc location="%s" ! decodebin ! audio/x-raw-int ! - appsink name=sink sync=False emit-signals=True''' % - common.quoteParse(self._path).encode('utf-8')) + appsink name=sink sync=False emit-signals=True + ''' % common.quoteParse(self._path).encode('utf-8') - self.debug('pausing pipeline') - self._pipeline.set_state(gst.STATE_PAUSED) - self._pipeline.get_state() - self.debug('paused pipeline') - - sink = self._pipeline.get_by_name('sink') + def paused(self): + sink = self.pipeline.get_by_name('sink') if self._frameLength < 0: self.debug('query duration') @@ -122,23 +190,22 @@ class ChecksumTask(task.Task): # FIXME: sending it with frameEnd set screws up the seek, we don't get # everything for flac; fixed in recent -good result = sink.send_event(event) - self.debug('event sent') - self.debug(result) + self.debug('event sent, result %r', result) sink.connect('new-buffer', self._new_buffer_cb) sink.connect('eos', self._eos_cb) self.debug('scheduling setting to play') # since set_state returns non-False, adding it as timeout_add # will repeatedly call it, and block the main loop; so - # gobject.timeout_add(0L, self._pipeline.set_state, gst.STATE_PLAYING) + # gobject.timeout_add(0L, self.pipeline.set_state, gst.STATE_PLAYING) # would not work. def play(): - self._pipeline.set_state(gst.STATE_PLAYING) + self.pipeline.set_state(gst.STATE_PLAYING) return False self.runner.schedule(0, play) - #self._pipeline.set_state(gst.STATE_PLAYING) + #self.pipeline.set_state(gst.STATE_PLAYING) self.debug('scheduled setting to play') def _new_buffer_cb(self, sink): @@ -185,7 +252,7 @@ class ChecksumTask(task.Task): def stop(self): self.debug('stopping') self.debug('setting state to NULL') - self._pipeline.set_state(gst.STATE_NULL) + self.pipeline.set_state(gst.STATE_NULL) if not self._last: # see http://bugzilla.gnome.org/show_bug.cgi?id=578612 @@ -272,7 +339,7 @@ class AccurateRipChecksumTask(ChecksumTask): return checksum -class TRMTask(task.Task): +class TRMTask(GstPipelineTask): """ I calculate a MusicBrainz TRM fingerprint. @@ -288,63 +355,52 @@ class TRMTask(task.Task): self.path = path self._trm = None - self._pipeline = None self._bus = None - def start(self, runner): - task.Task.start(self, runner) - self._pipeline = gst.parse_launch(''' + def getPipelineDesc(self): + return ''' filesrc location="%s" ! decodebin ! audioconvert ! audio/x-raw-int ! trm name=trm ! - appsink name=sink sync=False emit-signals=True''' % self.path) - self._bus = self._pipeline.get_bus() - self._bus.add_signal_watch() - self._bus.connect('message::eos', self._bus_eos_cb) - self._bus.connect('message::tag', self._bus_tag_cb) - self._bus.connect('message::error', self._bus_error_cb) - sink = self._pipeline.get_by_name('sink') + appsink name=sink sync=False emit-signals=True''' % self.path + + + def parsed(self): + sink = self.pipeline.get_by_name('sink') sink.connect('new-buffer', self._new_buffer_cb) - gst.debug('pausing') - self._pipeline.set_state(gst.STATE_PAUSED) - gst.debug('paused') - self._pipeline.get_state() - gst.debug('paused') - + def paused(self): gst.debug('query duration') - sink = self._pipeline.get_by_name('sink') + sink = self.pipeline.get_by_name('sink') - self._length, qformat = self._pipeline.query_duration(gst.FORMAT_TIME) + self._length, qformat = self.pipeline.query_duration(gst.FORMAT_TIME) gst.debug('total length: %r' % self._length) gst.debug('scheduling setting to play') # since set_state returns non-False, adding it as timeout_add # will repeatedly call it, and block the main loop; so - # gobject.timeout_add(0L, self._pipeline.set_state, gst.STATE_PLAYING) + # gobject.timeout_add(0L, self.pipeline.set_state, gst.STATE_PLAYING) # would not work. def play(): - self._pipeline.set_state(gst.STATE_PLAYING) + self.pipeline.set_state(gst.STATE_PLAYING) return False self.runner.schedule(0, play) - #self._pipeline.set_state(gst.STATE_PLAYING) + #self.pipeline.set_state(gst.STATE_PLAYING) gst.debug('scheduled setting to play') - def _bus_eos_cb(self, bus, message): + # FIXME: can't move this to base class because it triggers too soon + # in the case of checksum + def bus_eos_cb(self, bus, message): gst.debug('eos, scheduling stop') self.runner.schedule(0, self.stop) - def _bus_tag_cb(self, bus, message): + + def bus_tag_cb(self, bus, message): taglist = message.parse_tag() if 'musicbrainz-trmid' in taglist.keys(): self._trm = taglist['musicbrainz-trmid'] - def _bus_error_cb(self, bus, message): - error = message.parse_error() - # FIXME: handle properly - print error - def _new_buffer_cb(self, sink): # this is just for counting progress buf = sink.emit('pull-buffer') @@ -356,7 +412,7 @@ class TRMTask(task.Task): def stop(self): gst.debug('stopping') gst.debug('setting state to NULL') - self._pipeline.set_state(gst.STATE_NULL) + self.pipeline.set_state(gst.STATE_NULL) # publicize and stop self.trm = self._trm diff --git a/morituri/test/test_common_checksum.py b/morituri/test/test_common_checksum.py index d32b9d0..6e802d7 100644 --- a/morituri/test/test_common_checksum.py +++ b/morituri/test/test_common_checksum.py @@ -27,7 +27,7 @@ class EmptyTestCase(common.TestCase): # FIXME: do we want a specific error for this ? e = self.assertRaises(task.TaskException, self.runner.run, checksumtask, verbose=False) - self.failUnless(isinstance(e.exception, gst.QueryError)) + self.failUnless(isinstance(e.exception, checksum.GstException)) os.unlink(path) class PathTestCase(common.TestCase): @@ -37,7 +37,7 @@ class PathTestCase(common.TestCase): checksumtask = checksum.ChecksumTask(path) e = self.assertRaises(task.TaskException, self.runner.run, checksumtask, verbose=False) - self.failUnless(isinstance(e.exception, gst.QueryError)) + self.failUnless(isinstance(e.exception, checksum.GstException)) os.unlink(path) class UnicodePathTestCase(PathTestCase, common.UnicodeTestMixin):