diff --git a/ChangeLog b/ChangeLog index c873ef3..6763cab 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,15 @@ +2011-05-23 Thomas Vander Stichele + + * morituri/test/test_common_encode.py: + Generate an actual file by spawning gst-launch; otherwise + with proper error handling we get an error from wavparse that + there is not enough data to typefind. + * morituri/common/gstreamer.py: + Move the gst import to start() and set it as a class attribute. + Document methods. + * morituri/common/encode.py: + Convert EncodeTask to a GstPipelineTask. + 2011-05-23 Thomas Vander Stichele * morituri/common/encode.py: diff --git a/morituri/common/encode.py b/morituri/common/encode.py index f9322e9..b260917 100644 --- a/morituri/common/encode.py +++ b/morituri/common/encode.py @@ -25,7 +25,7 @@ import os import shutil import tempfile -from morituri.common import common, task, log +from morituri.common import common, task, log, gstreamer class Profile(object): name = None @@ -122,7 +122,7 @@ LOSSY_PROFILES = { ALL_PROFILES = PROFILES.copy() ALL_PROFILES.update(LOSSY_PROFILES) -class EncodeTask(task.Task): +class EncodeTask(gstreamer.GstPipelineTask): """ I am a task that encodes a .wav file. I set tags too. @@ -158,13 +158,8 @@ class EncodeTask(task.Task): self.description = "Encoding %s" % what self._profile.test() - def start(self, runner): - task.Task.start(self, runner) - - # here to avoid import gst eating our options - import gst - - desc = ''' + def getPipelineDesc(self): + return ''' filesrc location="%s" ! decodebin name=decoder ! audio/x-raw-int,width=16,depth=16,channels=2 ! @@ -175,68 +170,62 @@ class EncodeTask(task.Task): self._profile.pipeline, common.quoteParse(self._outpath).encode('utf-8')) - self.debug('creating pipeline: %r', desc) - self._pipeline = gst.parse_launch(desc) - - tagger = self._pipeline.get_by_name('tagger') + def parsed(self): + tagger = self.pipeline.get_by_name('tagger') # set tags if tagger and self._taglist: # FIXME: under which conditions do we not have merge_tags ? # See for example comment saying wavenc did not have it. try: - tagger.merge_tags(self._taglist, gst.TAG_MERGE_APPEND) + tagger.merge_tags(self._taglist, self.gst.TAG_MERGE_APPEND) except AttributeError, e: self.warning('Could not merge tags: %r', log.getExceptionMessage(e)) - self.debug('pausing pipeline') - self._pipeline.set_state(gst.STATE_PAUSED) - self._pipeline.get_state() - self.debug('paused pipeline') - + def paused(self): # get length - identity = self._pipeline.get_by_name('identity') + identity = self.pipeline.get_by_name('identity') self.debug('query duration') try: - length, qformat = identity.query_duration(gst.FORMAT_DEFAULT) - except gst.QueryError, e: + length, qformat = identity.query_duration(self.gst.FORMAT_DEFAULT) + except self.gst.QueryError, e: self.setException(e) self.stop() return # wavparse 0.10.14 returns in bytes - if qformat == gst.FORMAT_BYTES: + if qformat == self.gst.FORMAT_BYTES: self.debug('query returned in BYTES format') length /= 4 self.debug('total length: %r', length) self._length = length - # add eos handling - bus = self._pipeline.get_bus() - bus.add_signal_watch() - bus.connect('message::eos', self._message_eos_cb) - # set up level callbacks + # FIXME: publicize bus and reuse it instead of regetting and adding ? + bus = self.pipeline.get_bus() + bus.add_signal_watch() + bus.connect('message::element', self._message_element_cb) - self._level = self._pipeline.get_by_name('level') + self._level = self.pipeline.get_by_name('level') # add a probe so we can track progress # we connect to level because this gives us offset in samples srcpad = self._level.get_static_pad('src') srcpad.add_buffer_probe(self._probe_handler) + # FIXME: move to base class ? self.debug('scheduling setting to play') # since set_state returns non-False, adding it as timeout_add # will repeatedly call it, and block the main loop; so - # gobject.timeout_add(0L, self._pipeline.set_state, gst.STATE_PLAYING) + # gobject.timeout_add(0L, self.pipeline.set_state, self.gst.STATE_PLAYING) # would not work. def play(): - self._pipeline.set_state(gst.STATE_PLAYING) + self.pipeline.set_state(self.gst.STATE_PLAYING) return False self.runner.schedule(0, play) - #self._pipeline.set_state(gst.STATE_PLAYING) + #self.pipeline.set_state(gst.STATE_PLAYING) self.debug('scheduled setting to play') def _probe_handler(self, pad, buffer): @@ -249,7 +238,7 @@ class EncodeTask(task.Task): # don't drop the buffer return True - def _message_eos_cb(self, bus, message): + def bus_eos_cb(self, bus, message): self.debug('eos, scheduling stop') self.runner.schedule(0, self.stop) @@ -270,13 +259,11 @@ class EncodeTask(task.Task): self.log('higher peakdB found, now %r', self._peakdB) self._peakdB = p + # FIXME: move to base class, have stopped handler ? def stop(self): - # here to avoid import gst eating our options - import gst - self.debug('stopping') self.debug('setting state to NULL') - self._pipeline.set_state(gst.STATE_NULL) + self.pipeline.set_state(self.gst.STATE_NULL) self.debug('set state to NULL') # FIXME: maybe this should move lower ? If used by BaseMultiTask, # this starts the next task without showing us the peakdB diff --git a/morituri/common/gstreamer.py b/morituri/common/gstreamer.py index 94d52d9..8d752be 100644 --- a/morituri/common/gstreamer.py +++ b/morituri/common/gstreamer.py @@ -20,8 +20,6 @@ # You should have received a copy of the GNU General Public License # along with morituri. If not, see . -import gst - from morituri.common import common, task class GstException(Exception): @@ -35,16 +33,25 @@ class GstPipelineTask(task.Task): I am a base class for tasks that use a GStreamer pipeline. I handle errors and raise them appropriately. + + @cvar gst: the GStreamer module, so code does not have to import gst + as a module in code everywhere to avoid option stealing. """ + + gst = None + def start(self, runner): + import gst + self.gst = gst + task.Task.start(self, runner) desc = self.getPipelineDesc() self.debug('creating pipeline %r', desc) - self.pipeline = gst.parse_launch(desc) + self.pipeline = self.gst.parse_launch(desc) self._bus = self.pipeline.get_bus() - gst.debug('got bus %r' % self._bus) + self.gst.debug('got bus %r' % self._bus) # a signal watch calls callbacks from an idle loop # self._bus.add_signal_watch() @@ -59,7 +66,7 @@ class GstPipelineTask(task.Task): self.parsed() self.debug('pausing pipeline') - self.pipeline.set_state(gst.STATE_PAUSED) + self.pipeline.set_state(self.gst.STATE_PAUSED) # FIXME: this can block self.pipeline.get_state() self.debug('paused pipeline') @@ -85,13 +92,27 @@ class GstPipelineTask(task.Task): pass def bus_eos_cb(self, bus, message): + """ + Called synchronously (ie from messaging thread) on eos message. + + Override me to handle eos + """ pass def bus_tag_cb(self, bus, message): + """ + Called synchronously (ie from messaging thread) on tag message. + + Override me to handle tags. + """ pass def bus_error_cb(self, bus, message): + """ + Called synchronously (ie from messaging thread) on error message. + """ exc = GstException(*message.parse_error()) self.setAndRaiseException(exc) - gst.debug('error, scheduling stop') + # FIXME: why is this commented ? + # self.gst.debug('error, scheduling stop') #self.runner.schedule(0, self.stop) diff --git a/morituri/test/test_common_encode.py b/morituri/test/test_common_encode.py index 675b6bb..e277bc9 100644 --- a/morituri/test/test_common_encode.py +++ b/morituri/test/test_common_encode.py @@ -9,26 +9,30 @@ gobject.threads_init() import gst -from morituri.test import common +from morituri.test import common as tcommon -from morituri.common import task, encode, log +from morituri.common import task, encode, log, common -class PathTestCase(common.TestCase): +class PathTestCase(tcommon.TestCase): def _testSuffix(self, suffix): self.runner = task.SyncRunner(verbose=False) - fd, path = tempfile.mkstemp( - suffix=suffix) + fd, path = tempfile.mkstemp(suffix=suffix) + cmd = "gst-launch " \ + "audiotestsrc num-buffers=100 samplesperbuffer=1024 ! " \ + "audioconvert ! audio/x-raw-int,width=16,depth=16,channels =2 ! " \ + "wavenc ! " \ + "filesink location=\"%s\" > /dev/null 2>&1" % ( + common.quoteParse(path).encode('utf-8'), ) + os.system(cmd) + self.failUnless(os.path.exists(path)) encodetask = encode.EncodeTask(path, path + '.out', encode.WavProfile()) - e = self.assertRaises(task.TaskException, self.runner.run, - encodetask, verbose=False) - self.failUnless(isinstance(e.exception, gst.QueryError), - "%r is not a gst.QueryError" % e.exception) + self.runner.run(encodetask, verbose=False) os.close(fd) os.unlink(path) os.unlink(path + '.out') -class UnicodePathTestCase(PathTestCase, common.UnicodeTestMixin): +class UnicodePathTestCase(PathTestCase, tcommon.UnicodeTestMixin): def testUnicodePath(self): # this test makes sure we can checksum a unicode path self._testSuffix(u'.morituri.test_encode.B\xeate Noire') @@ -40,7 +44,7 @@ class NormalPathTestCase(PathTestCase): def testDoubleQuote(self): self._testSuffix(u'.morituri.test_encode.12" edit') -class TagReadTestCase(common.TestCase): +class TagReadTestCase(tcommon.TestCase): def testRead(self): path = os.path.join(os.path.dirname(__file__), u'track.flac') self.runner = task.SyncRunner(verbose=False) @@ -50,7 +54,7 @@ class TagReadTestCase(common.TestCase): self.assertEquals(t.taglist['audio-codec'], 'FLAC') self.assertEquals(t.taglist['description'], 'audiotest wave') -class TagWriteTestCase(common.TestCase): +class TagWriteTestCase(tcommon.TestCase): def testWrite(self): fd, inpath = tempfile.mkstemp(suffix=u'.morituri.tagwrite.flac') @@ -81,7 +85,7 @@ class TagWriteTestCase(common.TestCase): os.unlink(inpath) os.unlink(outpath) -class SafeRetagTestCase(common.TestCase): +class SafeRetagTestCase(tcommon.TestCase): def setUp(self): self._fd, self._path = tempfile.mkstemp(suffix=u'.morituri.retag.flac')