From ba10b573a218b01c3e3cd704fbf1af75c9fc7037 Mon Sep 17 00:00:00 2001 From: chrysn Date: Mon, 7 Nov 2016 14:57:17 +0100 Subject: [PATCH 1/3] Use soxi instead of gstreamer to determine a track's length Contributes-To: https://github.com/JoeLametta/whipper/issues/29 --- morituri/image/image.py | 55 +++------------------------------------- morituri/program/soxi.py | 47 ++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 52 deletions(-) create mode 100644 morituri/program/soxi.py diff --git a/morituri/image/image.py b/morituri/image/image.py index 97a3dc0..dc3cafe 100644 --- a/morituri/image/image.py +++ b/morituri/image/image.py @@ -29,7 +29,9 @@ import os from morituri.common import log, common from morituri.image import cue, table -from morituri.extern.task import task, gstreamer +from morituri.extern.task import task + +from morituri.program.soxi import AudioLengthTask class Image(object, log.Loggable): @@ -146,57 +148,6 @@ class AccurateRipChecksumTask(log.Loggable, task.MultiSeparateTask): task.MultiSeparateTask.stop(self) -class AudioLengthTask(log.Loggable, gstreamer.GstPipelineTask): - """ - I calculate the length of a track in audio samples. - - @ivar length: length of the decoded audio file, in audio samples. - """ - logCategory = 'AudioLengthTask' - description = 'Getting length of audio track' - length = None - - playing = False - - def __init__(self, path): - """ - @type path: unicode - """ - assert type(path) is unicode, "%r is not unicode" % path - - self._path = path - self.logName = os.path.basename(path).encode('utf-8') - - def getPipelineDesc(self): - return ''' - filesrc location="%s" ! - decodebin ! audio/x-raw-int ! - fakesink name=sink''' % \ - gstreamer.quoteParse(self._path).encode('utf-8') - - def paused(self): - self.debug('query duration') - sink = self.pipeline.get_by_name('sink') - assert sink, 'Error constructing pipeline' - - try: - length, qformat = sink.query_duration(self.gst.FORMAT_DEFAULT) - except self.gst.QueryError, e: - self.info('failed to query duration of %r' % self._path) - self.setException(e) - raise - - # wavparse 0.10.14 returns in bytes - if qformat == self.gst.FORMAT_BYTES: - self.debug('query returned in BYTES format') - length /= 4 - self.debug('total length of %r in samples: %d', self._path, length) - self.length = length - - self.pipeline.set_state(self.gst.STATE_NULL) - self.stop() - - class ImageVerifyTask(log.Loggable, task.MultiSeparateTask): """ I verify a disk image and get the necessary track lengths. diff --git a/morituri/program/soxi.py b/morituri/program/soxi.py new file mode 100644 index 0000000..42fbc8e --- /dev/null +++ b/morituri/program/soxi.py @@ -0,0 +1,47 @@ +import os + +from morituri.common import log, common +from morituri.common import task as ctask + +SOXI = 'soxi' + +class AudioLengthTask(ctask.PopenTask, log.Loggable): + """ + I calculate the length of a track in audio samples. + + @ivar length: length of the decoded audio file, in audio samples. + """ + logCategory = 'AudioLengthTask' + description = 'Getting length of audio track' + length = None + + def __init__(self, path): + """ + @type path: unicode + """ + assert type(path) is unicode, "%r is not unicode" % path + + self._path = path + self.logName = os.path.basename(path).encode('utf-8') + + self.command = [SOXI, '-s', self._path] + + self._error = [] + self._output = [] + + def commandMissing(self): + raise common.MissingDependencyException('sox') + + def readbytesout(self, bytes): + self._output.append(bytes) + + def readbyteserr(self, bytes): + self._error.append(bytes) + + def failed(self): + self.setException(Exception("soxi failed: %s"%"".join(self._error))) + + def done(self): + if self._error: + self.warning("soxi reported on stderr: %s", "".join(self._error)) + self.length = int("".join(self._output)) From 89d6c8fef8899a592c18be3e8beaef2f0d2e497a Mon Sep 17 00:00:00 2001 From: chrysn Date: Mon, 7 Nov 2016 18:23:12 +0100 Subject: [PATCH 2/3] AudioLengthPath: move unit test in parallel to code --- morituri/test/test_image_image.py | 46 -------------------------- morituri/test/test_program_soxi.py | 53 ++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 46 deletions(-) create mode 100644 morituri/test/test_program_soxi.py diff --git a/morituri/test/test_image_image.py b/morituri/test/test_image_image.py index f288d2d..b9a1d02 100644 --- a/morituri/test/test_image_image.py +++ b/morituri/test/test_image_image.py @@ -85,49 +85,3 @@ class TrackSeparateTestCase(tcommon.TestCase): def testAccurateRip(self): self.assertEquals(self.image.table.getAccurateRipIds(), ( "00000064", "00000191")) - - -class AudioLengthTestCase(tcommon.TestCase): - - def testLength(self): - path = os.path.join(os.path.dirname(__file__), u'track.flac') - t = image.AudioLengthTask(path) - runner = task.SyncRunner() - runner.run(t, verbose=False) - self.assertEquals(t.length, 10 * common.SAMPLES_PER_FRAME) - - -class AudioLengthPathTestCase(tcommon.TestCase): - - def _testSuffix(self, suffix): - self.runner = task.SyncRunner(verbose=False) - fd, path = tempfile.mkstemp(suffix=suffix) - t = image.AudioLengthTask(path) - e = self.assertRaises(task.TaskException, self.runner.run, - t, verbose=False) - self.failUnless(isinstance(e.exception, gstreamer.GstException), - "%r is not a gstreamer.GstException" % e.exceptionMessage) - self.assertEquals(e.exception.gerror.domain, gst.STREAM_ERROR) - # our empty file triggers TYPE_NOT_FOUND - self.assertEquals(e.exception.gerror.code, - gst.STREAM_ERROR_TYPE_NOT_FOUND) - os.unlink(path) - - -class NormalAudioLengthPathTestCase(AudioLengthPathTestCase): - - def testSingleQuote(self): - self._testSuffix(u"morituri.test.Guns 'N Roses") - - def testDoubleQuote(self): - # This test makes sure we can checksum files with double quote in - # their name - self._testSuffix(u'morituri.test.12" edit') - - -class UnicodeAudioLengthPathTestCase(AudioLengthPathTestCase, - tcommon.UnicodeTestMixin): - - def testUnicodePath(self): - # this test makes sure we can checksum a unicode path - self._testSuffix(u'morituri.test.B\xeate Noire.empty') diff --git a/morituri/test/test_program_soxi.py b/morituri/test/test_program_soxi.py new file mode 100644 index 0000000..aa6b431 --- /dev/null +++ b/morituri/test/test_program_soxi.py @@ -0,0 +1,53 @@ +# -*- Mode: Python; test-case-name: morituri.test.test_program_sox -*- + +import os +import tempfile + +from morituri.common import common +from morituri.extern.task import task +from morituri.program.soxi import AudioLengthTask +from morituri.test import common as tcommon + +class AudioLengthTestCase(tcommon.TestCase): + + def testLength(self): + path = os.path.join(os.path.dirname(__file__), u'track.flac') + t = AudioLengthTask(path) + runner = task.SyncRunner() + runner.run(t, verbose=False) + self.assertEquals(t.length, 10 * common.SAMPLES_PER_FRAME) + + +class AudioLengthPathTestCase(tcommon.TestCase): + + def _testSuffix(self, suffix): + self.runner = task.SyncRunner(verbose=False) + fd, path = tempfile.mkstemp(suffix=suffix) + t = AudioLengthTask(path) + e = self.assertRaises(task.TaskException, self.runner.run, + t, verbose=False) + self.failUnless(isinstance(e.exception, gstreamer.GstException), + "%r is not a gstreamer.GstException" % e.exceptionMessage) + self.assertEquals(e.exception.gerror.domain, gst.STREAM_ERROR) + # our empty file triggers TYPE_NOT_FOUND + self.assertEquals(e.exception.gerror.code, + gst.STREAM_ERROR_TYPE_NOT_FOUND) + os.unlink(path) + +class NormalAudioLengthPathTestCase(AudioLengthPathTestCase): + + def testSingleQuote(self): + self._testSuffix(u"morituri.test.Guns 'N Roses") + + def testDoubleQuote(self): + # This test makes sure we can checksum files with double quote in + # their name + self._testSuffix(u'morituri.test.12" edit') + + +class UnicodeAudioLengthPathTestCase(AudioLengthPathTestCase, + tcommon.UnicodeTestMixin): + + def testUnicodePath(self): + # this test makes sure we can checksum a unicode path + self._testSuffix(u'morituri.test.B\xeate Noire.empty') From bacda814081725fef4589a5e5227fbbc96ebd6e0 Mon Sep 17 00:00:00 2001 From: chrysn Date: Tue, 29 Nov 2016 12:51:31 +0100 Subject: [PATCH 3/3] AudioLengthTest: adapt to new soxi backend * Use extensions soxi understands (ie. ".flac") * Actually test for result correctness on files with odd characters in their names by copying the test track * Relax the requirements on the "track absent" task to only raise some TaskError (the previously tested behavior was backend dependent, and the application did not actually depend on that behavior) --- morituri/test/test_program_soxi.py | 40 +++++++++++++++++++----------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/morituri/test/test_program_soxi.py b/morituri/test/test_program_soxi.py index aa6b431..ae1e7da 100644 --- a/morituri/test/test_program_soxi.py +++ b/morituri/test/test_program_soxi.py @@ -8,41 +8,41 @@ from morituri.extern.task import task from morituri.program.soxi import AudioLengthTask from morituri.test import common as tcommon +base_track_file = os.path.join(os.path.dirname(__file__), u'track.flac') +base_track_length = 10 * common.SAMPLES_PER_FRAME + class AudioLengthTestCase(tcommon.TestCase): def testLength(self): - path = os.path.join(os.path.dirname(__file__), u'track.flac') + path = base_track_file t = AudioLengthTask(path) runner = task.SyncRunner() runner.run(t, verbose=False) - self.assertEquals(t.length, 10 * common.SAMPLES_PER_FRAME) + self.assertEquals(t.length, base_track_length) class AudioLengthPathTestCase(tcommon.TestCase): def _testSuffix(self, suffix): - self.runner = task.SyncRunner(verbose=False) fd, path = tempfile.mkstemp(suffix=suffix) + with os.fdopen(fd, "wb") as temptrack: + temptrack.write(open(base_track_file, "rb").read()) + t = AudioLengthTask(path) - e = self.assertRaises(task.TaskException, self.runner.run, - t, verbose=False) - self.failUnless(isinstance(e.exception, gstreamer.GstException), - "%r is not a gstreamer.GstException" % e.exceptionMessage) - self.assertEquals(e.exception.gerror.domain, gst.STREAM_ERROR) - # our empty file triggers TYPE_NOT_FOUND - self.assertEquals(e.exception.gerror.code, - gst.STREAM_ERROR_TYPE_NOT_FOUND) + runner = task.SyncRunner() + runner.run(t, verbose=False) + self.assertEquals(t.length, base_track_length) os.unlink(path) class NormalAudioLengthPathTestCase(AudioLengthPathTestCase): def testSingleQuote(self): - self._testSuffix(u"morituri.test.Guns 'N Roses") + self._testSuffix(u"morituri.test.Guns 'N Roses.flac") def testDoubleQuote(self): # This test makes sure we can checksum files with double quote in # their name - self._testSuffix(u'morituri.test.12" edit') + self._testSuffix(u'morituri.test.12" edit.flac') class UnicodeAudioLengthPathTestCase(AudioLengthPathTestCase, @@ -50,4 +50,16 @@ class UnicodeAudioLengthPathTestCase(AudioLengthPathTestCase, def testUnicodePath(self): # this test makes sure we can checksum a unicode path - self._testSuffix(u'morituri.test.B\xeate Noire.empty') + self._testSuffix(u'morituri.test.B\xeate Noire.empty.flac') + +class AbsentFileAudioLengthPathTestCase(AudioLengthPathTestCase): + def testAbsentFile(self): + tempdir = tempfile.mkdtemp() + path = os.path.join(tempdir, u"nonexistent.flac") + + t = AudioLengthTask(path) + runner = task.SyncRunner() + self.assertRaises(task.TaskException, runner.run, + t, verbose=False) + + os.rmdir(tempdir)