* morituri/common/encode.py:

Add tasks to read, write, and safely retag flac files.
	* morituri/test/test_common_encode.py:
	  Add tests for this.
This commit is contained in:
Thomas Vander Stichele
2010-04-13 21:55:35 +00:00
parent 7515cf9e73
commit d9ca12d7cc
3 changed files with 223 additions and 2 deletions

View File

@@ -1,3 +1,10 @@
2010-04-13 Thomas Vander Stichele <thomas at apestaart dot org>
* morituri/common/encode.py:
Add tasks to read, write, and safely retag flac files.
* morituri/test/test_common_encode.py:
Add tests for this.
2010-04-13 Thomas Vander Stichele <thomas at apestaart dot org>
* morituri/common/checksum.py:

View File

@@ -21,8 +21,10 @@
# along with morituri. If not, see <http://www.gnu.org/licenses/>.
import math
import os
import tempfile
from morituri.common import common, task
from morituri.common import common, task, checksum
from morituri.common import log
log.init()
@@ -283,7 +285,7 @@ class TagReadTask(task.Task):
logCategory = 'TagReadTask'
description = 'Reading Tags'
description = 'Reading tags'
taglist = None
@@ -351,3 +353,152 @@ class TagReadTask(task.Task):
self.debug('set state to NULL')
task.Task.stop(self)
class TagWriteTask(task.Task):
"""
I am a task that retags an encoded file.
"""
logCategory = 'TagWriteTask'
description = 'Writing tags'
def __init__(self, inpath, outpath, taglist=None):
"""
"""
assert type(inpath) is unicode, "inpath %r is not unicode" % inpath
assert type(outpath) is unicode, "outpath %r is not unicode" % outpath
self._inpath = inpath
self._outpath = outpath
self._taglist = taglist
def start(self, runner):
task.Task.start(self, runner)
# here to avoid import gst eating our options
import gst
self._pipeline = gst.parse_launch('''
filesrc location="%s" !
flactag name=tagger !
filesink location="%s"''' % (
common.quoteParse(self._inpath).encode('utf-8'),
common.quoteParse(self._outpath).encode('utf-8')))
# set tags
tagger = self._pipeline.get_by_name('tagger')
if self._taglist:
tagger.merge_tags(self._taglist, gst.TAG_MERGE_APPEND)
self.debug('pausing pipeline')
self._pipeline.set_state(gst.STATE_PAUSED)
self._pipeline.get_state()
self.debug('paused pipeline')
# add eos handling
bus = self._pipeline.get_bus()
bus.add_signal_watch()
bus.connect('message::eos', self._message_eos_cb)
self.debug('scheduling setting to play')
# since set_state returns non-False, adding it as timeout_add
# will repeatedly call it, and block the main loop; so
# gobject.timeout_add(0L, self._pipeline.set_state, gst.STATE_PLAYING)
# would not work.
def play():
self._pipeline.set_state(gst.STATE_PLAYING)
return False
self.runner.schedule(0, play)
#self._pipeline.set_state(gst.STATE_PLAYING)
self.debug('scheduled setting to play')
def _message_eos_cb(self, bus, message):
self.debug('eos, scheduling stop')
self.runner.schedule(0, self.stop)
def stop(self):
# here to avoid import gst eating our options
import gst
self.debug('stopping')
self.debug('setting state to NULL')
self._pipeline.set_state(gst.STATE_NULL)
self.debug('set state to NULL')
task.Task.stop(self)
class SafeRetagTask(task.MultiSeparateTask):
"""
I am a task that retags an encoded file safely in place.
First of all, if the new tags are the same as the old ones, it doesn't
do anything.
If the tags are not the same, then the file gets retagged, but only
if the decodes of the original and retagged file checksum the same.
@ivar changed: True if the tags have changed (and hence an output file is
generated)
"""
logCategory = 'SafeRetagTask'
description = 'Retagging'
changed = False
def __init__(self, path, taglist=None):
"""
"""
assert type(path) is unicode, "path %r is not unicode" % path
task.MultiSeparateTask.__init__(self)
self._path = path
self._taglist = taglist.copy()
self.tasks = [TagReadTask(path), ]
def stopped(self, taskk):
if not taskk.exception:
import gst
# Check if the tags are different or not
if taskk == self.tasks[0]:
taglist = taskk.taglist.copy()
if common.tagListEquals(taglist, self._taglist):
self.debug('tags are already fine')
else:
# need to retag
self.debug('tags need to be rewritten')
self.debug('Current tags: %r, new tags: %r',
common.tagListToDict(taglist),
common.tagListToDict(self._taglist))
assert common.tagListToDict(taglist) != common.tagListToDict(self._taglist)
self.tasks.append(checksum.CRC32Task(self._path))
self._fd, self._tmppath = tempfile.mkstemp(
dir=os.path.dirname(self._path), suffix=u'.morituri')
self.tasks.append(TagWriteTask(self._path,
self._tmppath, self._taglist))
self.tasks.append(checksum.CRC32Task(self._tmppath))
self.tasks.append(TagReadTask(self._tmppath))
elif len(self.tasks) > 1 and taskk == self.tasks[4]:
if common.tagListEquals(self.tasks[4].taglist, self._taglist):
self.debug('tags written successfully')
c1 = self.tasks[1].checksum
c2 = self.tasks[3].checksum
self.debug('comparing checksums %08x and %08x' % (c1, c2))
if False: #c1 == c2:
# data is fine, so we can now move
self.debug('moving temporary file to %r' % self._path)
os.rename(self._tmppath, self._path)
else:
# FIXME: don't raise TypeError
e = TypeError("Checksums failed")
self.setAndRaiseException(e)
else:
os.unlink(self._tmppath)
e = TypeError("Tags not written")
self.setAndRaiseException(e)
task.MultiSeparateTask.stopped(self, taskk)

View File

@@ -49,3 +49,66 @@ class TagReadTestCase(common.TestCase):
self.failUnless(t.taglist)
self.assertEquals(t.taglist['audio-codec'], 'FLAC')
self.assertEquals(t.taglist['description'], 'audiotest wave')
class TagWriteTestCase(common.TestCase):
def testWrite(self):
fd, inpath = tempfile.mkstemp(suffix=u'.morituri.tagwrite.flac')
os.system('gst-launch '
'audiotestsrc num-buffers=10 samplesperbuffer=588 ! '
'audioconvert ! '
'audio/x-raw-int,channels=2,width=16,height=16,rate=44100 ! '
'flacenc ! filesink location=%s > /dev/null 2>&1' % inpath)
os.close(fd)
fd, outpath = tempfile.mkstemp(suffix=u'.morituri.tagwrite.flac')
self.runner = task.SyncRunner(verbose=False)
taglist = gst.TagList()
taglist[gst.TAG_ARTIST] = 'Artist'
taglist[gst.TAG_TITLE] = 'Title'
t = encode.TagWriteTask(inpath, outpath, taglist)
self.runner.run(t)
t = encode.TagReadTask(outpath)
self.runner.run(t)
self.failUnless(t.taglist)
self.assertEquals(t.taglist['audio-codec'], 'FLAC')
self.assertEquals(t.taglist['description'], 'audiotest wave')
self.assertEquals(t.taglist[gst.TAG_ARTIST], 'Artist')
self.assertEquals(t.taglist[gst.TAG_TITLE], 'Title')
os.unlink(inpath)
os.unlink(outpath)
class SafeRetagTestCase(common.TestCase):
def setUp(self):
self._fd, self._path = tempfile.mkstemp(suffix=u'.morituri.retag.flac')
os.system('gst-launch '
'audiotestsrc num-buffers=10 samplesperbuffer=588 ! '
'audioconvert ! '
'audio/x-raw-int,channels=2,width=16,height=16,rate=44100 ! '
'flacenc ! filesink location=%s > /dev/null 2>&1' % self._path)
os.close(self._fd)
self.runner = task.SyncRunner(verbose=False)
def tearDown(self):
os.unlink(self._path)
def testNoChange(self):
taglist = gst.TagList()
taglist[gst.TAG_DESCRIPTION] = 'audiotest wave'
taglist[gst.TAG_AUDIO_CODEC] = 'FLAC'
t = encode.SafeRetagTask(self._path, taglist)
self.runner.run(t)
def testChange(self):
taglist = gst.TagList()
taglist[gst.TAG_DESCRIPTION] = 'audiotest retagged'
taglist[gst.TAG_AUDIO_CODEC] = 'FLAC'
taglist[gst.TAG_ARTIST] = 'Artist'
t = encode.SafeRetagTask(self._path, taglist)
self.runner.run(t)