* morituri/common/checksum.py:
Create a GstException to wrap a Gst.GError. Create a base GstPipelineTask class. Use it in Checksum and TRM tasks. Raise and don't proceed to call .paused() when a GstError happens. Should help debug https://bugs.launchpad.net/bugs/735053 * morituri/test/test_common_checksum.py: Adapt test.
This commit is contained in:
11
ChangeLog
11
ChangeLog
@@ -1,3 +1,14 @@
|
||||
2011-03-15 Thomas Vander Stichele <thomas at apestaart dot org>
|
||||
|
||||
* morituri/common/checksum.py:
|
||||
Create a GstException to wrap a Gst.GError.
|
||||
Create a base GstPipelineTask class.
|
||||
Use it in Checksum and TRM tasks.
|
||||
Raise and don't proceed to call .paused() when a GstError happens.
|
||||
Should help debug https://bugs.launchpad.net/bugs/735053
|
||||
* morituri/test/test_common_checksum.py:
|
||||
Adapt test.
|
||||
|
||||
2011-01-09 Thomas Vander Stichele <thomas at apestaart dot org>
|
||||
|
||||
patch by: Ross Burton
|
||||
|
||||
@@ -30,7 +30,80 @@ from morituri.common import common, task
|
||||
|
||||
# checksums are not CRC's. a CRC is a specific type of checksum.
|
||||
|
||||
class ChecksumTask(task.Task):
|
||||
# FIXME: probably this should move higher up the module hierarchy and
|
||||
# be used wider
|
||||
class GstException(Exception):
|
||||
def __init__(self, gerror, debug):
|
||||
self.args = (gerror, debug, )
|
||||
self.gerror = gerror
|
||||
self.debug = debug
|
||||
|
||||
# FIXME: this should move up too; other tasks might have use for it.
|
||||
class GstPipelineTask(task.Task):
|
||||
"""
|
||||
I am a base class for tasks that use a GStreamer pipeline.
|
||||
|
||||
I handle errors and raise them appropriately.
|
||||
"""
|
||||
def start(self, runner):
|
||||
task.Task.start(self, runner)
|
||||
desc = self.getPipelineDesc()
|
||||
|
||||
self.debug('creating pipeline %r', desc)
|
||||
self.pipeline = gst.parse_launch(desc)
|
||||
|
||||
self._bus = self.pipeline.get_bus()
|
||||
gst.debug('got bus %r' % self._bus)
|
||||
|
||||
# a signal watch calls callbacks from an idle loop
|
||||
# self._bus.add_signal_watch()
|
||||
|
||||
# sync emission triggers sync-message signals which calls callbacks
|
||||
# from the thread that signals, but happens immediately
|
||||
self._bus.enable_sync_message_emission()
|
||||
self._bus.connect('sync-message::eos', self.bus_eos_cb)
|
||||
self._bus.connect('sync-message::tag', self.bus_tag_cb)
|
||||
self._bus.connect('sync-message::error', self.bus_error_cb)
|
||||
|
||||
self.parsed()
|
||||
|
||||
self.debug('pausing pipeline')
|
||||
self.pipeline.set_state(gst.STATE_PAUSED)
|
||||
self.pipeline.get_state()
|
||||
self.debug('paused pipeline')
|
||||
|
||||
if not self.exception:
|
||||
self.paused()
|
||||
|
||||
def getPipelineDesc(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def parsed(self):
|
||||
"""
|
||||
Called after parsing the pipeline but before setting it to paused.
|
||||
"""
|
||||
pass
|
||||
|
||||
def paused(self):
|
||||
"""
|
||||
Called after pipeline is paused
|
||||
"""
|
||||
pass
|
||||
|
||||
def bus_eos_cb(self, bus, message):
|
||||
pass
|
||||
|
||||
def bus_tag_cb(self, bus, message):
|
||||
pass
|
||||
|
||||
def bus_error_cb(self, bus, message):
|
||||
exc = GstException(*message.parse_error())
|
||||
self.setAndRaiseException(exc)
|
||||
gst.debug('error, scheduling stop')
|
||||
#self.runner.schedule(0, self.stop)
|
||||
|
||||
|
||||
class ChecksumTask(GstPipelineTask):
|
||||
"""
|
||||
I am a task that calculates a checksum of the decoded audio data.
|
||||
|
||||
@@ -72,20 +145,15 @@ class ChecksumTask(task.Task):
|
||||
|
||||
self.checksum = None # result
|
||||
|
||||
def start(self, runner):
|
||||
task.Task.start(self, runner)
|
||||
self._pipeline = gst.parse_launch('''
|
||||
def getPipelineDesc(self):
|
||||
return '''
|
||||
filesrc location="%s" !
|
||||
decodebin ! audio/x-raw-int !
|
||||
appsink name=sink sync=False emit-signals=True''' %
|
||||
common.quoteParse(self._path).encode('utf-8'))
|
||||
appsink name=sink sync=False emit-signals=True
|
||||
''' % common.quoteParse(self._path).encode('utf-8')
|
||||
|
||||
self.debug('pausing pipeline')
|
||||
self._pipeline.set_state(gst.STATE_PAUSED)
|
||||
self._pipeline.get_state()
|
||||
self.debug('paused pipeline')
|
||||
|
||||
sink = self._pipeline.get_by_name('sink')
|
||||
def paused(self):
|
||||
sink = self.pipeline.get_by_name('sink')
|
||||
|
||||
if self._frameLength < 0:
|
||||
self.debug('query duration')
|
||||
@@ -122,23 +190,22 @@ class ChecksumTask(task.Task):
|
||||
# FIXME: sending it with frameEnd set screws up the seek, we don't get
|
||||
# everything for flac; fixed in recent -good
|
||||
result = sink.send_event(event)
|
||||
self.debug('event sent')
|
||||
self.debug(result)
|
||||
self.debug('event sent, result %r', result)
|
||||
sink.connect('new-buffer', self._new_buffer_cb)
|
||||
sink.connect('eos', self._eos_cb)
|
||||
|
||||
self.debug('scheduling setting to play')
|
||||
# since set_state returns non-False, adding it as timeout_add
|
||||
# will repeatedly call it, and block the main loop; so
|
||||
# gobject.timeout_add(0L, self._pipeline.set_state, gst.STATE_PLAYING)
|
||||
# gobject.timeout_add(0L, self.pipeline.set_state, gst.STATE_PLAYING)
|
||||
# would not work.
|
||||
|
||||
def play():
|
||||
self._pipeline.set_state(gst.STATE_PLAYING)
|
||||
self.pipeline.set_state(gst.STATE_PLAYING)
|
||||
return False
|
||||
self.runner.schedule(0, play)
|
||||
|
||||
#self._pipeline.set_state(gst.STATE_PLAYING)
|
||||
#self.pipeline.set_state(gst.STATE_PLAYING)
|
||||
self.debug('scheduled setting to play')
|
||||
|
||||
def _new_buffer_cb(self, sink):
|
||||
@@ -185,7 +252,7 @@ class ChecksumTask(task.Task):
|
||||
def stop(self):
|
||||
self.debug('stopping')
|
||||
self.debug('setting state to NULL')
|
||||
self._pipeline.set_state(gst.STATE_NULL)
|
||||
self.pipeline.set_state(gst.STATE_NULL)
|
||||
|
||||
if not self._last:
|
||||
# see http://bugzilla.gnome.org/show_bug.cgi?id=578612
|
||||
@@ -272,7 +339,7 @@ class AccurateRipChecksumTask(ChecksumTask):
|
||||
|
||||
return checksum
|
||||
|
||||
class TRMTask(task.Task):
|
||||
class TRMTask(GstPipelineTask):
|
||||
"""
|
||||
I calculate a MusicBrainz TRM fingerprint.
|
||||
|
||||
@@ -288,63 +355,52 @@ class TRMTask(task.Task):
|
||||
|
||||
self.path = path
|
||||
self._trm = None
|
||||
self._pipeline = None
|
||||
self._bus = None
|
||||
|
||||
def start(self, runner):
|
||||
task.Task.start(self, runner)
|
||||
self._pipeline = gst.parse_launch('''
|
||||
def getPipelineDesc(self):
|
||||
return '''
|
||||
filesrc location="%s" !
|
||||
decodebin ! audioconvert ! audio/x-raw-int !
|
||||
trm name=trm !
|
||||
appsink name=sink sync=False emit-signals=True''' % self.path)
|
||||
self._bus = self._pipeline.get_bus()
|
||||
self._bus.add_signal_watch()
|
||||
self._bus.connect('message::eos', self._bus_eos_cb)
|
||||
self._bus.connect('message::tag', self._bus_tag_cb)
|
||||
self._bus.connect('message::error', self._bus_error_cb)
|
||||
sink = self._pipeline.get_by_name('sink')
|
||||
appsink name=sink sync=False emit-signals=True''' % self.path
|
||||
|
||||
|
||||
def parsed(self):
|
||||
sink = self.pipeline.get_by_name('sink')
|
||||
sink.connect('new-buffer', self._new_buffer_cb)
|
||||
|
||||
gst.debug('pausing')
|
||||
self._pipeline.set_state(gst.STATE_PAUSED)
|
||||
gst.debug('paused')
|
||||
self._pipeline.get_state()
|
||||
gst.debug('paused')
|
||||
|
||||
def paused(self):
|
||||
gst.debug('query duration')
|
||||
sink = self._pipeline.get_by_name('sink')
|
||||
sink = self.pipeline.get_by_name('sink')
|
||||
|
||||
self._length, qformat = self._pipeline.query_duration(gst.FORMAT_TIME)
|
||||
self._length, qformat = self.pipeline.query_duration(gst.FORMAT_TIME)
|
||||
gst.debug('total length: %r' % self._length)
|
||||
gst.debug('scheduling setting to play')
|
||||
# since set_state returns non-False, adding it as timeout_add
|
||||
# will repeatedly call it, and block the main loop; so
|
||||
# gobject.timeout_add(0L, self._pipeline.set_state, gst.STATE_PLAYING)
|
||||
# gobject.timeout_add(0L, self.pipeline.set_state, gst.STATE_PLAYING)
|
||||
# would not work.
|
||||
|
||||
def play():
|
||||
self._pipeline.set_state(gst.STATE_PLAYING)
|
||||
self.pipeline.set_state(gst.STATE_PLAYING)
|
||||
return False
|
||||
self.runner.schedule(0, play)
|
||||
|
||||
#self._pipeline.set_state(gst.STATE_PLAYING)
|
||||
#self.pipeline.set_state(gst.STATE_PLAYING)
|
||||
gst.debug('scheduled setting to play')
|
||||
|
||||
def _bus_eos_cb(self, bus, message):
|
||||
# FIXME: can't move this to base class because it triggers too soon
|
||||
# in the case of checksum
|
||||
def bus_eos_cb(self, bus, message):
|
||||
gst.debug('eos, scheduling stop')
|
||||
self.runner.schedule(0, self.stop)
|
||||
|
||||
def _bus_tag_cb(self, bus, message):
|
||||
|
||||
def bus_tag_cb(self, bus, message):
|
||||
taglist = message.parse_tag()
|
||||
if 'musicbrainz-trmid' in taglist.keys():
|
||||
self._trm = taglist['musicbrainz-trmid']
|
||||
|
||||
def _bus_error_cb(self, bus, message):
|
||||
error = message.parse_error()
|
||||
# FIXME: handle properly
|
||||
print error
|
||||
|
||||
def _new_buffer_cb(self, sink):
|
||||
# this is just for counting progress
|
||||
buf = sink.emit('pull-buffer')
|
||||
@@ -356,7 +412,7 @@ class TRMTask(task.Task):
|
||||
def stop(self):
|
||||
gst.debug('stopping')
|
||||
gst.debug('setting state to NULL')
|
||||
self._pipeline.set_state(gst.STATE_NULL)
|
||||
self.pipeline.set_state(gst.STATE_NULL)
|
||||
|
||||
# publicize and stop
|
||||
self.trm = self._trm
|
||||
|
||||
@@ -27,7 +27,7 @@ class EmptyTestCase(common.TestCase):
|
||||
# FIXME: do we want a specific error for this ?
|
||||
e = self.assertRaises(task.TaskException, self.runner.run,
|
||||
checksumtask, verbose=False)
|
||||
self.failUnless(isinstance(e.exception, gst.QueryError))
|
||||
self.failUnless(isinstance(e.exception, checksum.GstException))
|
||||
os.unlink(path)
|
||||
|
||||
class PathTestCase(common.TestCase):
|
||||
@@ -37,7 +37,7 @@ class PathTestCase(common.TestCase):
|
||||
checksumtask = checksum.ChecksumTask(path)
|
||||
e = self.assertRaises(task.TaskException, self.runner.run,
|
||||
checksumtask, verbose=False)
|
||||
self.failUnless(isinstance(e.exception, gst.QueryError))
|
||||
self.failUnless(isinstance(e.exception, checksum.GstException))
|
||||
os.unlink(path)
|
||||
|
||||
class UnicodePathTestCase(PathTestCase, common.UnicodeTestMixin):
|
||||
|
||||
Reference in New Issue
Block a user