Files
whipper-gui/morituri/rip/cd.py
Thomas Vander Stichele 262801e554 * morituri/rip/cd.py:
Add asserts for comparing id's between the simple toc and
	  the full table.
	  Create the output directory before ripping the htoa.
	  Ignore data tracks for now.
	  Don't fail if we have no AccurateRip responses.
	* morituri/image/table.py:
	  Add a session ivar to Track.
	  Factor in session leadin when calculating track length
	  of last track in a session.
	  add getMusicBrainzSubmitURL()
	  add _getSessionGap() because the session gap size is different
	  for session 2 and all following.
	  Use it in merge() to get offsets right.
	  Fix getAccurateRipURL by only using the audio tracks for the
	  'length in tracks' number
	  Temporarily disable writing out data tracks to a .cue file,
	  since it's not implemented yet.
	  Add canCue to see if we can write a .cue file from the given table,
	  and debug why not if not.
	* morituri/program/cdrdao.py:
	  Rework to rip each session separately instead of using session 9.
	  This fixes session 9 read-toc missing the pregap.
	  Add a simple LineParser for handling output from disk-info.
	  Count tracks relatively for the session, because the output for
	  session 2 for track numbers picks up where session 1 left off.
	  Don't set leadout from TOC printing since for the same reason
	  session 2's leadout is absolute, not relative to start of session.
	  Add a DiscInfoTask.
	  Convert Table and Toc reading tasks to multitasks, first getting the
	  number of sessions, then reading table/toc for each session.
	* morituri/test/test_image_table.py:
	  Fix up MusicBrainz disc id for my Ladyhawke disc.
	  Add AccurateRip URL verification, compared against EAC's.
	* morituri/test/test_image_toc.py:
	  Use two separate session read-toc output files to verify
	  the case of Das Capital.
	  Verify musicbrainz URL.
2009-05-25 14:59:45 +00:00

433 lines
15 KiB
Python

# -*- Mode: Python -*-
# vi:si:et:sw=4:sts=4:ts=4
# Morituri - for those about to RIP
# Copyright (C) 2009 Thomas Vander Stichele
# This file is part of morituri.
#
# morituri is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# morituri is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with morituri. If not, see <http://www.gnu.org/licenses/>.
import os
import sys
import gobject
gobject.threads_init()
from morituri.common import logcommand, task, checksum, common, accurip, drive
from morituri.image import image, cue, table
from morituri.program import cdrdao, cdparanoia
class TrackMetadata(object):
    """
    Plain container for the artist and title of a single track.
    """
    # both values stay None until somebody assigns them
    artist = title = None
class DiscMetadata(object):
    """
    Plain container for disc-level metadata: artist, title, a flag telling
    whether the disc is a various-artists release, and per-track metadata.

    @ivar tracks: the metadata objects for the tracks of this disc
    @type tracks: list
    """
    artist = title = None
    various = False
    tracks = None

    def __init__(self):
        # a fresh list for every instance, so discs never share entries
        self.tracks = list()
def filterForPath(text):
    """
    Make a piece of text usable as a single path component by turning
    every forward slash into a dash.

    @param text: the text to filter
    @type  text: str

    @returns: the text with each '/' replaced by '-'
    @rtype:   str
    """
    return text.replace("/", "-")
def musicbrainz(discid):
    """
    Look up a disc on MusicBrainz and convert the first matching release
    to our own metadata objects.

    All matching releases, as well as any error, are printed to stdout.

    @param discid: the MusicBrainz disc id to look up
    @type  discid: str

    @returns: the metadata for the first matching release, or None when
              no release matches or the web service reports an error.
    @rtype:   L{DiscMetadata} or None
    """
    # imported locally; only this function needs musicbrainz2
    import musicbrainz2.webservice as mbws

    # Setup a Query object.
    query = mbws.Query(mbws.WebService())

    # Query for all discs matching the given DiscID.
    try:
        results = query.getReleases(mbws.ReleaseFilter(discId=discid))
    except mbws.WebServiceError as e:
        print("Error: %s" % (e, ))
        return None

    # No disc matching this DiscID has been found.
    if not results:
        print("Disc is not yet in the MusicBrainz database.")
        print("Consider adding it.")
        return None

    # Display the returned results to the user.
    print('Matching releases:')
    for result in results:
        print('Artist : %s' % (result.release.artist.name, ))
        print('Title : %s' % (result.release.title, ))
        print('')

    # Select one of the returned releases. We just pick the first one.
    selectedRelease = results[0].release

    # The returned release object only contains title and artist, but no
    # tracks. Query the web service once again to get all data we need.
    try:
        inc = mbws.ReleaseIncludes(artist=True, tracks=True,
            releaseEvents=True)
        release = query.getReleaseById(selectedRelease.getId(), inc)
    except mbws.WebServiceError as e:
        # Report the failure the same way as the first query does, by
        # returning None, instead of terminating the whole process from
        # inside a helper; the caller copes with missing metadata.
        print("Error: %s" % (e, ))
        return None

    # convert to our objects
    isSingleArtist = release.isSingleArtistRelease()

    metadata = DiscMetadata()
    metadata.various = not isSingleArtist
    metadata.title = release.title
    metadata.artist = release.artist.getUniqueName()
    print("%s - %s" % (metadata.artist, metadata.title))

    for t in release.tracks:
        track = TrackMetadata()
        track.title = t.title
        if isSingleArtist:
            # single artist releases use the disc artist for every track
            track.artist = metadata.artist
        else:
            track.artist = t.artist.name
        metadata.tracks.append(track)

    return metadata
def getPath(outdir, template, metadata, i):
    """
    Based on the template, get a complete path for the given track,
    minus extension.
    Also works for the disc name, using disc variables for the template.

    The template can use these variables:
      - %t: two-digit track number (i + 1, so 00 for HTOA)
      - %A: disc artist
      - %d: disc title
      - %a: track artist
      - %n: track title

    Without metadata, 'Unknown ...' placeholders are used instead.

    @param outdir:   the directory where to write the files
    @type  outdir:   str
    @param template: the template for writing the file
    @type  template: str
    @param metadata: the disc metadata, or None if there is none
    @type  metadata: L{DiscMetadata}
    @param i:        zero-based track index (-1 for HTOA)
    @type  i:        int

    @raises IndexError: if the metadata has no entry for track index i

    @returns: the path, without extension
    @rtype:   str
    """
    import re

    v = {
        't': '%02d' % (i + 1),
        # default values
        'A': 'Unknown Artist',
        'd': 'Unknown Disc',
        'a': 'Unknown Artist',
        'n': 'Unknown Track',
    }

    if metadata:
        v['A'] = filterForPath(metadata.artist)
        v['d'] = filterForPath(metadata.title)
        if i >= 0:
            # i is zero-based, exactly like the metadata track list;
            # indexing with i - 1 would hand track 1 the names of the
            # last track and every other track those of its predecessor
            try:
                trackMetadata = metadata.tracks[i]
            except IndexError as e:
                print('ERROR: no track %d found, %r' % (i, e))
                raise
            v['a'] = filterForPath(trackMetadata.artist)
            v['n'] = filterForPath(trackMetadata.title)
        else:
            # htoa defaults to disc's artist
            v['a'] = filterForPath(metadata.artist)
            v['n'] = filterForPath('Hidden Track One Audio')

    # turn each %x of the template into %(x)s so it can be filled from v
    template = re.sub(r'%(\w)', r'%(\1)s', template)

    return os.path.join(outdir, template % v)
class Rip(logcommand.LogCommand):
    """
    Rip the audio of a CD to .wav files, write a .cue and a .m3u file
    for the disc, and compare the rip against the AccurateRip database.
    """
    summary = "rip CD"

    def addOptions(self):
        """
        Register the command line options of the rip command.
        """
        # FIXME: get from config
        default = 0
        self.parser.add_option('-o', '--offset',
            action="store", dest="offset",
            help="sample read offset (defaults to %d)" % default,
            default=default)
        self.parser.add_option('-O', '--output-directory',
            action="store", dest="output_directory",
            help="output directory (defaults to current directory)")
        # FIXME: have a cache of these pickles somewhere
        # Both pickle options share the false default above.  do() turns
        # a false toc_pickle into None; it does not read table_pickle.
        self.parser.add_option('-t', '--table-pickle',
            action="store", dest="table_pickle",
            help="pickle to use for reading and writing the table",
            default=default)
        self.parser.add_option('-T', '--toc-pickle',
            action="store", dest="toc_pickle",
            help="pickle to use for reading and writing the TOC",
            default=default)
        # FIXME: get from config
        default = '%A - %d/%t. %a - %n'
        self.parser.add_option('', '--track-template',
            action="store", dest="track_template",
            help="template for track file naming (default %s)" % default,
            default=default)
        default = '%A - %d/%A - %d'
        self.parser.add_option('', '--disc-template',
            action="store", dest="disc_template",
            help="template for disc file naming (default %s)" % default,
            default=default)

    def do(self, args):
        """
        Rip the disc in the device selected on the parent command.

        In order: read the TOC, look up metadata on MusicBrainz, read the
        full index table, rip the Hidden Track One Audio (if there is
        one) and every audio track whose file does not exist yet, write
        the .cue and .m3u files, and print the AccurateRip result for
        each track.

        @param args: the remaining command line arguments (not used)
        """
        runner = task.SyncRunner()
        device = self.parentCommand.options.device

        # first, read the normal TOC, which is fast
        ptoc = common.Persister(self.options.toc_pickle or None)
        if not ptoc.object:
            t = cdrdao.ReadTOCTask(device=device)
            runner.run(t)
            ptoc.persist(t.table)
        ittoc = ptoc.object
        assert ittoc.hasTOC()

        # already show us some info based on this
        print("CDDB disc id %s" % ittoc.getCDDBDiscId())
        print("MusicBrainz disc id %s" % ittoc.getMusicBrainzDiscId())
        metadata = musicbrainz(ittoc.getMusicBrainzDiscId())
        if not metadata:
            print('Submit this disc to MusicBrainz at:')
            print(ittoc.getMusicBrainzSubmitURL())

        # now, read the complete index table, which is slower
        path = os.path.join(os.path.expanduser('~'), '.morituri', 'cache',
            'table')
        pcache = common.PersistedCache(path)
        ptable = pcache.get(ittoc.getCDDBDiscId())
        if not ptable.object:
            t = cdrdao.ReadTableTask(device=device)
            runner.run(t)
            ptable.persist(t.table)
        itable = ptable.object

        # the slow table has to describe the same disc as the fast TOC
        assert itable.hasTOC()
        assert itable.getCDDBDiscId() == ittoc.getCDDBDiscId(), \
            "full table's id %s differs from toc id %s" % (
                itable.getCDDBDiscId(), ittoc.getCDDBDiscId())
        assert itable.getMusicBrainzDiscId() == ittoc.getMusicBrainzDiscId(), \
            "full table's mb id %s differs from toc id mb %s" % (
                itable.getMusicBrainzDiscId(), ittoc.getMusicBrainzDiscId())
        assert itable.getAccurateRipURL() == ittoc.getAccurateRipURL(), \
            "full table's AR URL %s differs from toc AR URL %s" % (
                itable.getAccurateRipURL(), ittoc.getAccurateRipURL())

        outdir = self.options.output_directory or os.getcwd()
        template = self.options.track_template

        # check for hidden track one audio
        htoapath = None
        htoalength = None
        index = None
        track = itable.tracks[0]
        try:
            index = track.getIndex(0)
        except KeyError:
            pass

        if index:
            start = index.absolute
            stop = track.getIndex(1).absolute
            print('Found Hidden Track One Audio from frame %d to %d' % (
                start, stop))

            # rip it
            htoapath = getPath(outdir, template, metadata, -1) + '.wav'
            htoalength = stop - start
            self._ripTrack(runner, ittoc, htoapath, start, stop - 1, 0)

            # overlay this rip onto the Table
            itable.setFile(1, 0, htoapath, htoalength, 0)

        for i, track in enumerate(itable.tracks):
            number = i + 1
            # FIXME: rip data tracks differently
            if not track.audio:
                # FIXME: make it work for now
                track.indexes[1].relative = 0
                continue

            path = getPath(outdir, template, metadata, i) + '.wav'
            self._ripTrack(runner, ittoc, path,
                ittoc.getTrackStart(number), ittoc.getTrackEnd(number),
                number, description='Reading Track %d' % number)

            # overlay this rip onto the Table
            itable.setFile(number, 1, path, ittoc.getTrackLength(number),
                number)

        ### write disc files
        # The disc name is built from disc variables, so pass -1 instead
        # of relying on the index leaked by the loop above; that skips a
        # per-track metadata lookup the disc name has no use for.
        discName = getPath(outdir, self.options.disc_template, metadata, -1)
        self._makeParentDirectory(discName)

        # write .cue file
        assert itable.canCue()
        cuePath = '%s.cue' % discName
        # generate the text first so a failure does not leave an empty file
        cueText = itable.cue()
        with open(cuePath, 'w') as handle:
            handle.write(cueText)

        # write .m3u file
        self._writeM3U('%s.m3u' % discName, outdir, itable, metadata,
            htoapath, htoalength)

        # verify using accuraterip
        self._verifyAccurateRip(runner, cuePath, ittoc, itable)

    def _makeParentDirectory(self, path):
        """
        Create the directory that will contain path, if it is missing.
        """
        dirname = os.path.dirname(path)
        if dirname and not os.path.exists(dirname):
            os.makedirs(dirname)

    def _ripTrack(self, runner, ittoc, path, start, stop, number,
                  description=None):
        """
        Rip one stretch of audio to path and report whether the checksums
        matched.  Nothing is ripped when path already exists.

        @param runner:      the runner to run the rip task with
        @param ittoc:       the table passed on to the rip task
        @param path:        the .wav file to write
        @type  path:        str
        @param start:       first frame to rip
        @param stop:        last frame to rip
        @param number:      track number to show (0 for HTOA)
        @type  number:      int
        @param description: task description, or None to keep the task's
                            own
        """
        self._makeParentDirectory(path)

        # FIXME: optionally allow overriding reripping
        if os.path.exists(path):
            return

        print('Ripping track %d: %s' % (number, os.path.basename(path)))
        t = cdparanoia.ReadVerifyTrackTask(path, ittoc, start, stop,
            offset=int(self.options.offset),
            device=self.parentCommand.options.device)
        if description is not None:
            t.description = description
        runner.run(t)

        # Compare against None for every track, like the HTOA rip did,
        # so a checksum of 0 is not reported as a mismatch.
        # NOTE(review): presumably the task leaves checksum at None when
        # verification fails -- confirm against ReadVerifyTrackTask.
        if t.checksum is not None:
            print('Checksums match for track %d' % number)
        else:
            print('ERROR: checksums did not match for track %d' % number)

    def _writeM3U(self, m3uPath, outdir, itable, metadata, htoapath,
                  htoalength):
        """
        Write an extended .m3u playlist listing the HTOA (if any) and
        every audio track.

        @param m3uPath:    the playlist file to write
        @param outdir:     the directory the tracks were written to
        @param itable:     the full table of the disc
        @param metadata:   the disc metadata, or None
        @param htoapath:   path to the ripped HTOA, or None
        @param htoalength: length of the HTOA in frames, or None
        """
        with open(m3uPath, 'w') as handle:
            handle.write('#EXTM3U\n')
            if htoapath:
                handle.write('#EXTINF:%d,%s\n' % (
                    htoalength // common.FRAMES_PER_SECOND,
                    os.path.basename(htoapath[:-4])))
                handle.write('%s\n' % os.path.basename(htoapath))

            for i, track in enumerate(itable.tracks):
                # data tracks are not ripped, so there is no file to list
                if not track.audio:
                    continue
                path = getPath(outdir, self.options.track_template,
                    metadata, i) + '.wav'
                handle.write('#EXTINF:%d,%s\n' % (
                    itable.getTrackLength(i + 1) //
                        common.FRAMES_PER_SECOND,
                    os.path.basename(path)))
                handle.write('%s\n' % os.path.basename(path))

    def _verifyAccurateRip(self, runner, cuePath, ittoc, itable):
        """
        Checksum the ripped image and print, for each track, whether the
        checksum matches one of the AccurateRip responses for this disc.

        @param runner:  the runner to run the checksum tasks with
        @param cuePath: path to the .cue file describing the rip
        @param ittoc:   the table providing the AccurateRip URL
        @param itable:  the table providing the CDDB disc id
        """
        url = ittoc.getAccurateRipURL()
        print("AccurateRip URL %s" % url)

        cache = accurip.AccuCache()
        responses = cache.retrieve(url)

        if not responses:
            print('Album not found in AccurateRip database')
        else:
            print('%d AccurateRip reponses found' % len(responses))
            if responses[0].cddbDiscId != itable.getCDDBDiscId():
                print("AccurateRip response discid different: %s" %
                    responses[0].cddbDiscId)

        # FIXME: put accuraterip verification into a separate task/function
        # and apply here
        cueImage = image.Image(cuePath)
        verifytask = image.ImageVerifyTask(cueImage)
        cuetask = image.AccurateRipChecksumTask(cueImage)
        runner.run(verifytask)
        runner.run(cuetask)

        response = None # track which response matches, for all tracks

        # loop over tracks
        for i, csum in enumerate(cuetask.checksums):
            status = 'rip NOT accurate'
            confidence = None

            # match against each response's checksum
            for j, r in enumerate(responses or []):
                if "%08x" % csum == r.checksums[i]:
                    if not response:
                        response = r
                    else:
                        # every track has to match the same response
                        assert r == response, \
                            "checksum %s for %d matches wrong response %d, "\
                            "checksum %s" % (
                                csum, i + 1, j + 1, response.checksums[i])
                    status = 'rip accurate '
                    confidence = response.confidences[i]

            c = "(not found)"
            ar = "(not in database)"
            if responses:
                if not response:
                    print('ERROR: none of the responses matched.')
                else:
                    maxConfidence = max(r.confidences[i] for r in responses)
                    c = "(max confidence %3d)" % maxConfidence
                    if confidence is not None:
                        if confidence < maxConfidence:
                            c = "(confidence %3d of %3d)" % (
                                confidence, maxConfidence)
                    ar = ", AR [%s]" % response.checksums[i]

            print("Track %2d: %s %s [%08x]%s" % (
                i + 1, status, c, csum, ar))
class CD(logcommand.LogCommand):
    """
    Parent command for everything dealing with CD's; it owns the device
    option that its subcommands read.
    """
    summary = "handle CD's"
    subCommandClasses = [Rip, ]

    def addOptions(self):
        """
        Register the device option.
        """
        self.parser.add_option('-d', '--device',
            action="store", dest="device",
            help="CD-DA device")

    def handleOptions(self, options):
        """
        Fall back to the first drive found when no device was given.

        @param options: the parsed command line options

        @returns: 3 when no device was given and no drive is found,
                  None otherwise
        """
        # an explicitly chosen device needs no detection
        if options.device:
            return

        found = drive.getAllDevicePaths()
        if found:
            self.options.device = found[0]
            return

        self.error('No CD-DA drives found!')
        return 3