* morituri/test/test_common_encode.py:

Generate an actual file by spawning gst-launch; otherwise
	  with proper error handling we get an error from wavparse that
	  there is not enough data to typefind.
	* morituri/common/gstreamer.py:
	  Move the gst import to start() and set it as a class attribute.
	  Document methods.
	* morituri/common/encode.py:
	  Convert EncodeTask to a GstPipelineTask.
This commit is contained in:
Thomas Vander Stichele
2011-05-23 14:57:16 +00:00
parent 514bf4bc10
commit 1ce46272a9
4 changed files with 80 additions and 56 deletions

View File

@@ -25,7 +25,7 @@ import os
import shutil
import tempfile
from morituri.common import common, task, log
from morituri.common import common, task, log, gstreamer
class Profile(object):
name = None
@@ -122,7 +122,7 @@ LOSSY_PROFILES = {
ALL_PROFILES = PROFILES.copy()
ALL_PROFILES.update(LOSSY_PROFILES)
class EncodeTask(task.Task):
class EncodeTask(gstreamer.GstPipelineTask):
"""
I am a task that encodes a .wav file.
I set tags too.
@@ -158,13 +158,8 @@ class EncodeTask(task.Task):
self.description = "Encoding %s" % what
self._profile.test()
def start(self, runner):
task.Task.start(self, runner)
# here to avoid import gst eating our options
import gst
desc = '''
def getPipelineDesc(self):
return '''
filesrc location="%s" !
decodebin name=decoder !
audio/x-raw-int,width=16,depth=16,channels=2 !
@@ -175,68 +170,62 @@ class EncodeTask(task.Task):
self._profile.pipeline,
common.quoteParse(self._outpath).encode('utf-8'))
self.debug('creating pipeline: %r', desc)
self._pipeline = gst.parse_launch(desc)
tagger = self._pipeline.get_by_name('tagger')
def parsed(self):
tagger = self.pipeline.get_by_name('tagger')
# set tags
if tagger and self._taglist:
# FIXME: under which conditions do we not have merge_tags ?
# See for example comment saying wavenc did not have it.
try:
tagger.merge_tags(self._taglist, gst.TAG_MERGE_APPEND)
tagger.merge_tags(self._taglist, self.gst.TAG_MERGE_APPEND)
except AttributeError, e:
self.warning('Could not merge tags: %r',
log.getExceptionMessage(e))
self.debug('pausing pipeline')
self._pipeline.set_state(gst.STATE_PAUSED)
self._pipeline.get_state()
self.debug('paused pipeline')
def paused(self):
# get length
identity = self._pipeline.get_by_name('identity')
identity = self.pipeline.get_by_name('identity')
self.debug('query duration')
try:
length, qformat = identity.query_duration(gst.FORMAT_DEFAULT)
except gst.QueryError, e:
length, qformat = identity.query_duration(self.gst.FORMAT_DEFAULT)
except self.gst.QueryError, e:
self.setException(e)
self.stop()
return
# wavparse 0.10.14 returns in bytes
if qformat == gst.FORMAT_BYTES:
if qformat == self.gst.FORMAT_BYTES:
self.debug('query returned in BYTES format')
length /= 4
self.debug('total length: %r', length)
self._length = length
# add eos handling
bus = self._pipeline.get_bus()
bus.add_signal_watch()
bus.connect('message::eos', self._message_eos_cb)
# set up level callbacks
# FIXME: publicize bus and reuse it instead of regetting and adding ?
bus = self.pipeline.get_bus()
bus.add_signal_watch()
bus.connect('message::element', self._message_element_cb)
self._level = self._pipeline.get_by_name('level')
self._level = self.pipeline.get_by_name('level')
# add a probe so we can track progress
# we connect to level because this gives us offset in samples
srcpad = self._level.get_static_pad('src')
srcpad.add_buffer_probe(self._probe_handler)
# FIXME: move to base class ?
self.debug('scheduling setting to play')
# since set_state returns non-False, adding it as timeout_add
# will repeatedly call it, and block the main loop; so
# gobject.timeout_add(0L, self._pipeline.set_state, gst.STATE_PLAYING)
# gobject.timeout_add(0L, self.pipeline.set_state, self.gst.STATE_PLAYING)
# would not work.
def play():
self._pipeline.set_state(gst.STATE_PLAYING)
self.pipeline.set_state(self.gst.STATE_PLAYING)
return False
self.runner.schedule(0, play)
#self._pipeline.set_state(gst.STATE_PLAYING)
#self.pipeline.set_state(gst.STATE_PLAYING)
self.debug('scheduled setting to play')
def _probe_handler(self, pad, buffer):
@@ -249,7 +238,7 @@ class EncodeTask(task.Task):
# don't drop the buffer
return True
def _message_eos_cb(self, bus, message):
def bus_eos_cb(self, bus, message):
self.debug('eos, scheduling stop')
self.runner.schedule(0, self.stop)
@@ -270,13 +259,11 @@ class EncodeTask(task.Task):
self.log('higher peakdB found, now %r', self._peakdB)
self._peakdB = p
# FIXME: move to base class, have stopped handler ?
def stop(self):
# here to avoid import gst eating our options
import gst
self.debug('stopping')
self.debug('setting state to NULL')
self._pipeline.set_state(gst.STATE_NULL)
self.pipeline.set_state(self.gst.STATE_NULL)
self.debug('set state to NULL')
# FIXME: maybe this should move lower ? If used by BaseMultiTask,
# this starts the next task without showing us the peakdB