diff --git a/morituri/image/image.py b/morituri/image/image.py index 97a3dc0..dc3cafe 100644 --- a/morituri/image/image.py +++ b/morituri/image/image.py @@ -29,7 +29,9 @@ import os from morituri.common import log, common from morituri.image import cue, table -from morituri.extern.task import task, gstreamer +from morituri.extern.task import task + +from morituri.program.soxi import AudioLengthTask class Image(object, log.Loggable): @@ -146,57 +148,6 @@ class AccurateRipChecksumTask(log.Loggable, task.MultiSeparateTask): task.MultiSeparateTask.stop(self) -class AudioLengthTask(log.Loggable, gstreamer.GstPipelineTask): - """ - I calculate the length of a track in audio samples. - - @ivar length: length of the decoded audio file, in audio samples. - """ - logCategory = 'AudioLengthTask' - description = 'Getting length of audio track' - length = None - - playing = False - - def __init__(self, path): - """ - @type path: unicode - """ - assert type(path) is unicode, "%r is not unicode" % path - - self._path = path - self.logName = os.path.basename(path).encode('utf-8') - - def getPipelineDesc(self): - return ''' - filesrc location="%s" ! - decodebin ! audio/x-raw-int ! - fakesink name=sink''' % \ - gstreamer.quoteParse(self._path).encode('utf-8') - - def paused(self): - self.debug('query duration') - sink = self.pipeline.get_by_name('sink') - assert sink, 'Error constructing pipeline' - - try: - length, qformat = sink.query_duration(self.gst.FORMAT_DEFAULT) - except self.gst.QueryError, e: - self.info('failed to query duration of %r' % self._path) - self.setException(e) - raise - - # wavparse 0.10.14 returns in bytes - if qformat == self.gst.FORMAT_BYTES: - self.debug('query returned in BYTES format') - length /= 4 - self.debug('total length of %r in samples: %d', self._path, length) - self.length = length - - self.pipeline.set_state(self.gst.STATE_NULL) - self.stop() - - class ImageVerifyTask(log.Loggable, task.MultiSeparateTask): """ I verify a disk image and get the necessary track lengths. diff --git a/morituri/program/soxi.py b/morituri/program/soxi.py new file mode 100644 index 0000000..42fbc8e --- /dev/null +++ b/morituri/program/soxi.py @@ -0,0 +1,47 @@ +import os + +from morituri.common import log, common +from morituri.common import task as ctask + +SOXI = 'soxi' + +class AudioLengthTask(ctask.PopenTask, log.Loggable): + """ + I calculate the length of a track in audio samples. + + @ivar length: length of the decoded audio file, in audio samples. + """ + logCategory = 'AudioLengthTask' + description = 'Getting length of audio track' + length = None + + def __init__(self, path): + """ + @type path: unicode + """ + assert type(path) is unicode, "%r is not unicode" % path + + self._path = path + self.logName = os.path.basename(path).encode('utf-8') + + self.command = [SOXI, '-s', self._path] + + self._error = [] + self._output = [] + + def commandMissing(self): + raise common.MissingDependencyException('sox') + + def readbytesout(self, bytes): + self._output.append(bytes) + + def readbyteserr(self, bytes): + self._error.append(bytes) + + def failed(self): + self.setException(Exception("soxi failed: %s"%"".join(self._error))) + + def done(self): + if self._error: + self.warning("soxi reported on stderr: %s", "".join(self._error)) + self.length = int("".join(self._output)) diff --git a/morituri/test/test_image_image.py b/morituri/test/test_image_image.py index f288d2d..b9a1d02 100644 --- a/morituri/test/test_image_image.py +++ b/morituri/test/test_image_image.py @@ -85,49 +85,3 @@ class TrackSeparateTestCase(tcommon.TestCase): def testAccurateRip(self): self.assertEquals(self.image.table.getAccurateRipIds(), ( "00000064", "00000191")) - - -class AudioLengthTestCase(tcommon.TestCase): - - def testLength(self): - path = os.path.join(os.path.dirname(__file__), u'track.flac') - t = image.AudioLengthTask(path) - runner = task.SyncRunner() - runner.run(t, verbose=False) - self.assertEquals(t.length, 10 * common.SAMPLES_PER_FRAME) - - -class AudioLengthPathTestCase(tcommon.TestCase): - - def _testSuffix(self, suffix): - self.runner = task.SyncRunner(verbose=False) - fd, path = tempfile.mkstemp(suffix=suffix) - t = image.AudioLengthTask(path) - e = self.assertRaises(task.TaskException, self.runner.run, - t, verbose=False) - self.failUnless(isinstance(e.exception, gstreamer.GstException), - "%r is not a gstreamer.GstException" % e.exceptionMessage) - self.assertEquals(e.exception.gerror.domain, gst.STREAM_ERROR) - # our empty file triggers TYPE_NOT_FOUND - self.assertEquals(e.exception.gerror.code, - gst.STREAM_ERROR_TYPE_NOT_FOUND) - os.unlink(path) - - -class NormalAudioLengthPathTestCase(AudioLengthPathTestCase): - - def testSingleQuote(self): - self._testSuffix(u"morituri.test.Guns 'N Roses") - - def testDoubleQuote(self): - # This test makes sure we can checksum files with double quote in - # their name - self._testSuffix(u'morituri.test.12" edit') - - -class UnicodeAudioLengthPathTestCase(AudioLengthPathTestCase, - tcommon.UnicodeTestMixin): - - def testUnicodePath(self): - # this test makes sure we can checksum a unicode path - self._testSuffix(u'morituri.test.B\xeate Noire.empty') diff --git a/morituri/test/test_program_soxi.py b/morituri/test/test_program_soxi.py new file mode 100644 index 0000000..ae1e7da --- /dev/null +++ b/morituri/test/test_program_soxi.py @@ -0,0 +1,65 @@ +# -*- Mode: Python; test-case-name: morituri.test.test_program_sox -*- + +import os +import tempfile + +from morituri.common import common +from morituri.extern.task import task +from morituri.program.soxi import AudioLengthTask +from morituri.test import common as tcommon + +base_track_file = os.path.join(os.path.dirname(__file__), u'track.flac') +base_track_length = 10 * common.SAMPLES_PER_FRAME + +class AudioLengthTestCase(tcommon.TestCase): + + def testLength(self): + path = base_track_file + t = AudioLengthTask(path) + runner = task.SyncRunner() + runner.run(t, verbose=False) + self.assertEquals(t.length, base_track_length) + + +class AudioLengthPathTestCase(tcommon.TestCase): + + def _testSuffix(self, suffix): + fd, path = tempfile.mkstemp(suffix=suffix) + with os.fdopen(fd, "wb") as temptrack: + temptrack.write(open(base_track_file, "rb").read()) + + t = AudioLengthTask(path) + runner = task.SyncRunner() + runner.run(t, verbose=False) + self.assertEquals(t.length, base_track_length) + os.unlink(path) + +class NormalAudioLengthPathTestCase(AudioLengthPathTestCase): + + def testSingleQuote(self): + self._testSuffix(u"morituri.test.Guns 'N Roses.flac") + + def testDoubleQuote(self): + # This test makes sure we can checksum files with double quote in + # their name + self._testSuffix(u'morituri.test.12" edit.flac') + + +class UnicodeAudioLengthPathTestCase(AudioLengthPathTestCase, + tcommon.UnicodeTestMixin): + + def testUnicodePath(self): + # this test makes sure we can checksum a unicode path + self._testSuffix(u'morituri.test.B\xeate Noire.empty.flac') + +class AbsentFileAudioLengthPathTestCase(AudioLengthPathTestCase): + def testAbsentFile(self): + tempdir = tempfile.mkdtemp() + path = os.path.join(tempdir, u"nonexistent.flac") + + t = AudioLengthTask(path) + runner = task.SyncRunner() + self.assertRaises(task.TaskException, runner.run, + t, verbose=False) + + os.rmdir(tempdir)