* morituri/image/cue.py:

* morituri/image/image.py:
	* morituri/image/table.py:
	* morituri/program/cdparanoia.py:
	* morituri/test/test_image_cue.py:
	  Move to using a shared IndexTable for everything.
	  Sadly mixed with a MultiTask rename.
This commit is contained in:
Thomas Vander Stichele
2009-05-04 08:40:42 +00:00
parent d22bff1079
commit 22e81b4665
6 changed files with 302 additions and 116 deletions

View File

@@ -1,3 +1,13 @@
2009-05-04 Thomas Vander Stichele <thomas at apestaart dot org>
* morituri/image/cue.py:
* morituri/image/image.py:
* morituri/image/table.py:
* morituri/program/cdparanoia.py:
* morituri/test/test_image_cue.py:
Move to using a shared IndexTable for everything.
Sadly mixed with a MultiTask rename.
2009-05-04 Thomas Vander Stichele <thomas at apestaart dot org>
* morituri/common/checksum.py:

View File

@@ -29,7 +29,8 @@ See http://digitalx.org/cuesheetsyntax.php
import os
import re
from morituri.common import common
from morituri.common import common, log
from morituri.image import table
_REM_RE = re.compile("^REM\s(\w+)\s(.*)$")
_PERFORMER_RE = re.compile("^PERFORMER\s(.*)$")
@@ -56,18 +57,21 @@ _INDEX_RE = re.compile(r"""
""", re.VERBOSE)
class Cue:
class Cue(object, log.Loggable):
def __init__(self, path):
self._path = path
self._rems = {}
self._messages = []
self.tracks = []
self.leadout = None
def parse(self):
state = 'HEADER'
currentFile = None
currentTrack = None
counter = 0
self.info('Parsing .cue file %s', self._path)
handle = open(self._path, 'r')
for number, line in enumerate(handle.readlines()):
@@ -86,6 +90,7 @@ class Cue:
# look for FILE lines
m = _FILE_RE.search(line)
if m:
counter += 1
filePath = m.group('name')
fileFormat = m.group('format')
currentFile = File(filePath, fileFormat)
@@ -102,7 +107,8 @@ class Cue:
trackNumber = int(m.group('track'))
trackMode = m.group('mode')
currentTrack = Track(trackNumber)
self.debug('found track %d', trackNumber)
currentTrack = table.ITTrack(trackNumber)
self.tracks.append(currentTrack)
continue
@@ -118,37 +124,15 @@ class Cue:
minutes = int(m.expand('\\2'))
seconds = int(m.expand('\\3'))
frames = int(m.expand('\\4'))
self.debug('found index %d', indexNumber)
frameOffset = frames + seconds * 75 + minutes * 75 * 60
currentTrack.index(indexNumber, frameOffset, currentFile)
# print 'index %d, offset %d of track %r' % (indexNumber, frameOffset, currentTrack)
# FIXME: what do we do about File's FORMAT ?
currentTrack.index(indexNumber,
path=currentFile.path, relative=frameOffset,
counter=counter)
continue
def dump(self):
"""
Dump our internal representation to a .cue file content.
"""
lines = []
currentFile = None
for i, track in enumerate(self.tracks):
indexes = track._indexes.keys()
indexes.sort()
index, file = track._indexes[indexes[0]]
if file != currentFile:
lines.append('FILE "%s" WAVE' % file.path)
currentFile = file
lines.append(" TRACK %02d %s" % (i + 1, 'AUDIO'))
for index in indexes:
(offset, file) = track._indexes[index]
if file != currentFile:
lines.append('FILE "%s" WAVE' % file.path)
lines.append(
" INDEX %02d %s" % (index, common.framesToMSF(offset)))
lines.append("")
return "\n".join(lines)
def message(self, number, message):
"""
Add a message about a given line in the cue file.
@@ -166,11 +150,13 @@ class Cue:
# last track, so no length known
return -1
thisIndex = track._indexes[1] # FIXME: could be more
nextIndex = self.tracks[i + 1]._indexes[1] # FIXME: could be 0
thisIndex = track.indexes[1] # FIXME: could be more
nextIndex = self.tracks[i + 1].indexes[1] # FIXME: could be 0
if thisIndex[1] == nextIndex[1]: # same file
return nextIndex[0] - thisIndex[0]
c = thisIndex.counter
if c is not None and c == nextIndex.counter:
# they belong to the same source, so their relative delta is length
return nextIndex.relative - thisIndex.relative
# FIXME: more logic
return -1
@@ -217,49 +203,3 @@ class File:
def __repr__(self):
return '<File "%s" of format %s>' % (self.path, self.format)
# FIXME: add type ? separate AUDIO from others
class Track:
"""
I represent a track in a cue file.
I have index points.
Each index point is linked to an audio file.
@ivar number: track number
@type number: int
"""
def __init__(self, number):
if number < 1 or number > 99:
raise IndexError, "Track number must be from 1 to 99"
self.number = number
self._indexes = {} # index number -> (sector, File)
self.title = None
self.performer = None
def __repr__(self):
return '<Track %02d with %d indexes>' % (self.number,
len(self._indexes.keys()))
def index(self, number, sector, file):
"""
Add the given index to the current track.
@type file: L{File}
"""
if number in self._indexes.keys():
raise KeyError, "index %d already in track %d" % (
number, self.number)
if number < 0 or number > 99:
raise IndexError, "Index number must be from 0 to 99"
self._indexes[number] = (sector, file)
def getIndex(self, number):
"""
@rtype: tuple of (int, File)
"""
return self._indexes[number]

View File

@@ -32,6 +32,8 @@ import gst
from morituri.common import task, checksum, log
from morituri.image import cue, table
from morituri.test import common
class Image(object, log.Loggable):
"""
@ivar table: The Table of Contents for this image.
@@ -47,6 +49,7 @@ class Image(object, log.Loggable):
self.cue.parse()
self._offsets = [] # 0 .. trackCount - 1
self._lengths = [] # 0 .. trackCount - 1
self.table = None
def getRealPath(self, path):
@@ -60,6 +63,7 @@ class Image(object, log.Loggable):
Do initial setup, like figuring out track lengths, and
constructing the Table of Contents.
"""
self.debug('setup image start')
verify = ImageVerifyTask(self)
self.debug('verifying image')
runner.run(verify)
@@ -69,7 +73,7 @@ class Image(object, log.Loggable):
# CD's have a standard lead-in time of 2 seconds;
# checksums that use it should add it there
offset = self.cue.tracks[0].getIndex(1)[0]
offset = self.cue.tracks[0].getIndex(1).relative
tracks = []
@@ -77,14 +81,21 @@ class Image(object, log.Loggable):
length = self.cue.getTrackLength(self.cue.tracks[i])
if length == -1:
length = verify.lengths[i + 1]
tracks.append(table.Track(i + 1, offset, offset + length - 1))
t = table.ITTrack(i + 1, audio=True)
tracks.append(t)
# FIXME: this probably only works for non-compliant .CUE files
# where pregap is put at end of previous file
t.index(1, absolute=offset, path=self.cue.tracks[i].getIndex(1).path,
relative=0)
offset += length
self.table = table.Table(tracks)
self.table = table.IndexTable(tracks)
self.table.leadout = offset
self.debug('setup image done')
class AccurateRipChecksumTask(task.MultiTask):
class AccurateRipChecksumTask(task.MultiSeparateTask):
"""
I calculate the AccurateRip checksums of all tracks.
"""
@@ -96,22 +107,22 @@ class AccurateRipChecksumTask(task.MultiTask):
cue = image.cue
self.checksums = []
self.debug('Checksumming %d tracks' % len(cue.tracks))
for trackIndex, track in enumerate(cue.tracks):
index = track._indexes[1]
index = track.indexes[1]
length = cue.getTrackLength(track)
file = index[1]
offset = index[0]
self.debug('track %d has length %d' % (trackIndex + 1, length))
path = image.getRealPath(file.path)
path = image.getRealPath(index.path)
checksumTask = checksum.AccurateRipChecksumTask(path,
trackNumber=trackIndex + 1, trackCount=len(cue.tracks),
frameStart=offset * checksum.SAMPLES_PER_FRAME,
frameStart=index.relative * checksum.SAMPLES_PER_FRAME,
frameLength=length * checksum.SAMPLES_PER_FRAME)
self.addTask(checksumTask)
def stop(self):
self.checksums = [t.checksum for t in self.tasks]
task.MultiTask.stop(self)
task.MultiSeparateTask.stop(self)
class AudioLengthTask(task.Task):
"""
@@ -151,7 +162,7 @@ class AudioLengthTask(task.Task):
self.stop()
class ImageVerifyTask(task.MultiTask):
class ImageVerifyTask(task.MultiSeparateTask):
"""
I verify a disk image and get the necessary track lengths.
"""
@@ -167,13 +178,11 @@ class ImageVerifyTask(task.MultiTask):
for trackIndex, track in enumerate(cue.tracks):
self.debug('verifying track %d', trackIndex + 1)
index = track._indexes[1]
offset = index[0]
index = track.indexes[1]
length = cue.getTrackLength(track)
file = index[1]
if length == -1:
path = image.getRealPath(file.path)
path = image.getRealPath(index.path)
self.debug('schedule scan of audio length of %s', path)
taskk = AudioLengthTask(path)
self.addTask(taskk)
@@ -184,13 +193,12 @@ class ImageVerifyTask(task.MultiTask):
def stop(self):
for trackIndex, track, taskk in self._tasks:
# print '%d has length %d' % (trackIndex, taskk.length)
index = track._indexes[1]
offset = index[0]
index = track.indexes[1]
assert taskk.length % checksum.SAMPLES_PER_FRAME == 0
end = taskk.length / checksum.SAMPLES_PER_FRAME
self.lengths[trackIndex] = end - offset
self.lengths[trackIndex] = end - index.relative
task.MultiTask.stop(self)
task.MultiSeparateTask.stop(self)
# FIXME: move this method to a different module ?
def getAccurateRipResponses(data):

View File

@@ -29,8 +29,7 @@ import struct
import gst
from morituri.common import task, checksum
from morituri.image import cue
from morituri.common import task, checksum, common
class Track:
"""
@@ -194,3 +193,230 @@ class Table:
"%s/%s/%s/dBAR-%.3d-%s-%s-%s.bin" % (
discId1[-1], discId1[-2], discId1[-3],
len(self.tracks), discId1, discId2, self.getCDDBDiscId())
class ITTrack:
"""
I represent a track entry in an IndexTable.
@ivar number: track number (1-based)
@type number: int
@ivar audio: whether the track is audio
@type audio: bool
@type indexes: dict of number -> L{Index}
"""
number = None
audio = None
indexes = None
def __repr__(self):
return '<Track %02d>' % self.number
def __init__(self, number, audio=True):
self.number = number
self.audio = audio
self.indexes = {}
def index(self, number, absolute=None, path=None, relative=None, counter=None):
i = Index(number, absolute, path, relative, counter)
self.indexes[number] = i
def getIndex(self, number):
return self.indexes[number]
def getFirstIndex(self):
indexes = self.indexes.keys()
indexes.sort()
return self.indexes[indexes[0]]
class Index:
"""
@ivar counter: counter for the index source; distinguishes between
the matching FILE lines in .cue files for example
"""
number = None
absolute = None
path = None
relative = None
counter = None
def __init__(self, number, absolute=None, path=None, relative=None, counter=None):
self.number = number
self.absolute = absolute
self.path = path
self.relative = relative
self.counter = counter
def __repr__(self):
return '<Index %02d, absolute %d, path %d, relative %d>' % (
self.number, self.absolute, self.path, self.relative)
class IndexTable:
"""
I represent the Table of Contents of a CD.
@ivar tracks: tracks on this CD
@type tracks: list of L{ITTrack}
"""
tracks = None # list of ITTrack
leadout = None # offset where the leadout starts
def __init__(self, tracks=None):
if not tracks:
tracks = []
self.tracks = tracks
def getTrackStart(self, number):
"""
@param number: the track number, 1-based
@type number: int
@returns: the start of the given track number's index 1, in CD frames
@rtype: int
"""
track = self.tracks[number - 1]
return track.getIndex(1).absolute
def getTrackEnd(self, number):
"""
@param number: the track number, 1-based
@type number: int
@returns: the end of the given track number (ie index 1 of next track)
@rtype: int
"""
end = self.leadout - 1
if number < len(self.tracks):
end = self.tracks[number].getIndex(1).absolute - 1
return end
def getTrackLength(self, number):
"""
@param number: the track number, 1-based
@type number: int
@returns: the length of the given track number, in CD frames
@rtype: int
"""
track = self.tracks[number - 1]
return self.getTrackEnd(number) - self.getTrackStart(number) + 1
def getAudioTracks(self):
"""
@returns: the number of audio tracks on the CD
@rtype: int
"""
return len([t for t in self.tracks if t.audio])
def _cddbSum(self, i):
ret = 0
while i > 0:
ret += (i % 10)
i /= 10
return ret
def getCDDBDiscId(self):
"""
Calculate the CDDB disc ID.
@rtype: str
@returns: the 8-character hexadecimal disc ID
"""
# cddb disc id takes into account data tracks
# last byte is the number of tracks on the CD
n = 0
for track in self.tracks:
# CD's have a standard lead-in time of 2 seconds
# which gets added for CDDB disc id's
offset = self.getTrackStart(track.number) + \
2 * checksum.FRAMES_PER_SECOND
seconds = offset / checksum.FRAMES_PER_SECOND
n += self._cddbSum(seconds)
last = self.tracks[-1]
leadout = self.getTrackEnd(last.number)
frameLength = leadout - self.getTrackStart(1)
t = frameLength / checksum.FRAMES_PER_SECOND
value = (n % 0xff) << 24 | t << 8 | len(self.tracks)
return "%08x" % value
def getAccurateRipIds(self):
"""
Calculate the two AccurateRip ID's.
@returns: the two 8-character hexadecimal disc ID's
@rtype: tuple of (str, str)
"""
# AccurateRip does not take into account data tracks,
# but does count the data track to determine the leadout offset
discId1 = 0
discId2 = 0
for track in self.tracks:
if not track.audio:
continue
offset = self.getTrackStart(track.number)
discId1 += offset
discId2 += (offset or 1) * track.number
# also add end values, where leadout offset is one past the end
# of the last track
last = self.tracks[-1]
offset = self.getTrackEnd(last.number) + 1
discId1 += offset
discId2 += offset * (self.getAudioTracks() + 1)
discId1 &= 0xffffffff
discId2 &= 0xffffffff
return ("%08x" % discId1, "%08x" % discId2)
def getAccurateRipURL(self):
"""
Return the full AccurateRip URL.
@returns: the AccurateRip URL
@rtype: str
"""
discId1, discId2 = self.getAccurateRipIds()
return "http://www.accuraterip.com/accuraterip/" \
"%s/%s/%s/dBAR-%.3d-%s-%s-%s.bin" % (
discId1[-1], discId1[-2], discId1[-3],
len(self.tracks), discId1, discId2, self.getCDDBDiscId())
def cue(self):
"""
Dump our internal representation to a .cue file content.
"""
lines = []
# add the first FILE line
path = self.tracks[0].getFirstIndex().path
currentPath = path
lines.append('FILE "%s" WAVE' % path)
for i, track in enumerate(self.tracks):
lines.append(" TRACK %02d %s" % (i + 1, 'AUDIO'))
indexes = track.indexes.keys()
indexes.sort()
for number in indexes:
index = track.indexes[number]
if index.path != currentPath:
lines.append('FILE "%s" WAVE' % index.path)
lines.append(" INDEX %02d %s" % (number,
common.framesToMSF(index.relative)))
lines.append("")
return "\n".join(lines)

View File

@@ -196,7 +196,7 @@ class ReadTrackTask(task.Task):
self.stop()
return
class ReadVerifyTrackTask(task.MultiTask):
class ReadVerifyTrackTask(task.MultiSeparateTask):
"""
I am a task that reads and verifies a track using cdparanoia.
@@ -250,4 +250,4 @@ class ReadVerifyTrackTask(task.MultiTask):
print 'ERROR: read and verify failed'
self.checksum = None
task.MultiTask.stop(self)
task.MultiSeparateTask.stop(self)

View File

@@ -5,7 +5,9 @@ import os
import tempfile
import unittest
from morituri.image import cue
from morituri.test import common
from morituri.image import table, cue
class KingsSingleTestCase(unittest.TestCase):
def setUp(self):
@@ -51,26 +53,26 @@ class WriteCueTestCase(unittest.TestCase):
def testWrite(self):
fd, path = tempfile.mkstemp(suffix='morituri.test.cue')
os.close(fd)
c = cue.Cue(path)
f = cue.File('track01.wav', 'AUDIO')
t = cue.Track(1)
t.index(1, 0, f)
c.tracks.append(t)
it = table.IndexTable()
t = cue.Track(2)
t.index(0, 1000, f)
f = cue.File('track02.wav', 'AUDIO')
t.index(1, 1100, f)
c.tracks.append(t)
t = table.ITTrack(1)
t.index(1, path='track01.wav', relative=0)
it.tracks.append(t)
self.assertEquals(c.dump(), """FILE "track01.wav" WAVE
t = table.ITTrack(2)
t.index(0, path='track01.wav', relative=1000)
t.index(1, path='track02.wav', relative=0)
it.tracks.append(t)
self.assertEquals(it.cue(), """FILE "track01.wav" WAVE
TRACK 01 AUDIO
INDEX 01 00:00:00
TRACK 02 AUDIO
INDEX 00 00:13:25
FILE "track02.wav" WAVE
INDEX 01 00:14:50
INDEX 01 00:00:00
""")