argparse & logging (#92)

* introduce logcommand.Lager, Whipper(); use argparse for whipper image commands, stub logging

* update Lager docstring to mention config.Config()

* make incorrect subcommand and --version work on toplevel command

* migrate accurip show, expand Lager, do not attempt to return from Lager.__init__.

* migrate offset find, add Lager.error

* correct offset find drive symlink handling

* migrate drive

* change Lager.__init__(prog) to arg from kwarg

* but actually

* remove Whipper.usage

* add and use Lager.device_option() context manager

* help I married an axe murderer

* use unified options namespace for entire command tree

* migrate whipper cd without comprehensive config loading

* switch to logging module

- use logging instead of flog for non-extern modules
- use WHIPPER_DEBUG and WHIPPER_LOGFILE env variables

* convert self.log calls to logger.debug

* convert self.error calls to logger.error

* remove log.Loggable, use logger not logging

* Logging conversion continues

- Convert log.* calls to logger.*
- Remove morituri.common.log imports

* remove morituri.common.log from tests

* remove extern/flog, bare minimum Debug conversion

* update README for logging changes

* update soxi to use logging

* refactor Lager for more declarative subcommands

* Refactor Lager.device_option:

- inline into __init__
- throw IOError instead of Exception for missing drives
- remove CommandError checking in rip/main

* rename rip to whipper in rip.main

* convert rip.debug commands

* Rename logcommand.Lager to command.BaseCommand

- remove command.CommandError occurrences
- remove python-command external module

* remove submodules from README, update rclog formatter

* update minor ambiguity in readme for command invocation

* update version number to match setup.py

* remove gitmodules

* update version number in tests as well (boo)

* convert logger.error to logger.critical

* Change morituri.rip to morituri.command

- mv common.command to command.basecommand
- move TEMPLATES used only by rip.cd out of rip.common
- update entry point for command to command.main

* update basecommand documentation

* go pyflaking: import fixing

* replace self.stdout with sys.stdout

* remove BaseCommand.config, alphabetise imports

* convert self.stdXXX leftovers

* convert last getRootCommand to config.Config

* convert last getExceptionMessage's to str

* change musicbrainz useragent to whipper
This commit is contained in:
Samantha Baldwin
2016-12-20 17:11:30 -05:00
committed by JoeLametta
parent 8f4607de4c
commit d1ed80d62a
48 changed files with 1049 additions and 1072 deletions

View File

102
morituri/command/accurip.py Normal file
View File

@@ -0,0 +1,102 @@
# -*- Mode: Python -*-
# vi:si:et:sw=4:sts=4:ts=4
# Morituri - for those about to RIP
# Copyright (C) 2009 Thomas Vander Stichele
# This file is part of morituri.
#
# morituri is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# morituri is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with morituri. If not, see <http://www.gnu.org/licenses/>.
import sys
from morituri.command.basecommand import BaseCommand
from morituri.common import accurip
import logging
logger = logging.getLogger(__name__)
class Show(BaseCommand):
summary = "show accuraterip data"
description = """
retrieves and display accuraterip data from the given URL
"""
def add_arguments(self):
self.parser.add_argument('url', action='store',
help="accuraterip URL to load data from")
def do(self):
url = self.options.url
cache = accurip.AccuCache()
responses = cache.retrieve(url)
count = responses[0].trackCount
sys.stdout.write("Found %d responses for %d tracks\n\n" % (
len(responses), count))
for (i, r) in enumerate(responses):
if r.trackCount != count:
sys.stdout.write(
"Warning: response %d has %d tracks instead of %d\n" % (
i, r.trackCount, count))
# checksum and confidence by track
for track in range(count):
sys.stdout.write("Track %d:\n" % (track + 1))
checksums = {}
for (i, r) in enumerate(responses):
if r.trackCount != count:
continue
assert len(r.checksums) == r.trackCount
assert len(r.confidences) == r.trackCount
entry = {}
entry["confidence"] = r.confidences[track]
entry["response"] = i + 1
checksum = r.checksums[track]
if checksum in checksums:
checksums[checksum].append(entry)
else:
checksums[checksum] = [entry, ]
# now sort track results in checksum by highest confidence
sortedChecksums = []
for checksum, entries in checksums.items():
highest = max(d['confidence'] for d in entries)
sortedChecksums.append((highest, checksum))
sortedChecksums.sort()
sortedChecksums.reverse()
for highest, checksum in sortedChecksums:
sys.stdout.write(" %d result(s) for checksum %s: %s\n" % (
len(checksums[checksum]), checksum,
str(checksums[checksum])))
class AccuRip(BaseCommand):
summary = "handle AccurateRip information"
description = """
Handle AccurateRip information. Retrieves AccurateRip disc entries and
displays diagnostic information.
"""
subcommands = {
'show': Show
}

View File

@@ -0,0 +1,129 @@
# -*- Mode: Python -*-
# vi:si:et:sw=4:sts=4:ts=4
import argparse
import os
import sys
from morituri.common import drive
import logging
logger = logging.getLogger(__name__)
# Q: What about argparse.add_subparsers(), you ask?
# A: Unfortunately add_subparsers() does not support specifying the
# formatter_class of subparsers, nor does it support epilogs, so
# it does not quite fit our use case.
# Q: Why not subclass ArgumentParser and extend/replace the relevant
# methods?
# A: If this can be done in a simpler fashion than this current
# implementation, by all means submit a patch.
# Q: Why not argparse.parse_known_args()?
# A: The prefix matching prevents passing '-h' (and possibly other
# options) to the child command.
class BaseCommand():
"""
A base command class for whipper commands.
Creates an argparse.ArgumentParser.
Override add_arguments() and handle_arguments() to register
and process arguments before & after argparse.parse_args().
Provides self.epilog() formatting command for argparse.
device_option = True adds -d / --device option to current command
no_add_help = True removes -h / --help option from current command
Overriding formatter_class sets the argparse formatter class.
If the 'subcommands' dictionary is set, __init__ searches the
arguments for subcommands.keys() and instantiates the class
implementing the subcommand as self.cmd, passing all non-understood
arguments, the current options namespace, and the full command path
name.
"""
device_option = False
no_add_help = False # for rip.main.Whipper
formatter_class = argparse.RawDescriptionHelpFormatter
def __init__(self, argv, prog_name, opts):
self.opts = opts # for Rip.add_arguments()
self.prog_name = prog_name
self.init_parser()
self.add_arguments()
if hasattr(self, 'subcommands'):
self.parser.add_argument('remainder',
nargs=argparse.REMAINDER,
help=argparse.SUPPRESS)
if self.device_option:
# pick the first drive as default
drives = drive.getAllDevicePaths()
if not drives:
msg = 'No CD-DA drives found!'
logger.critical(msg)
# morituri exited with return code 3 here
raise IOError(msg)
self.parser.add_argument('-d', '--device',
action="store",
dest="device",
default=drives[0],
help="CD-DA device")
self.options = self.parser.parse_args(argv, namespace=opts)
if self.device_option:
# this can be a symlink to another device
self.options.device = os.path.realpath(self.options.device)
if not os.path.exists(self.options.device):
msg = 'CD-DA device %s not found!' % self.options.device
logger.critical(msg)
raise IOError(msg)
self.handle_arguments()
if hasattr(self, 'subcommands'):
if not self.options.remainder:
self.parser.print_help()
sys.exit(0)
if not self.options.remainder[0] in self.subcommands:
sys.stderr.write("incorrect subcommand: %s" %
self.options.remainder[0])
sys.exit(1)
self.cmd = self.subcommands[self.options.remainder[0]](
self.options.remainder[1:],
prog_name + " " + self.options.remainder[0],
self.options
)
def init_parser(self):
kw = {
'prog': self.prog_name,
'description': self.description,
'formatter_class': self.formatter_class,
}
if hasattr(self, 'subcommands'):
kw['epilog'] = self.epilog()
if self.no_add_help:
kw['add_help'] = False
self.parser = argparse.ArgumentParser(**kw)
def add_arguments(self):
pass
def handle_arguments(self):
pass
def do(self):
return self.cmd.do()
def epilog(self):
s = "commands:\n"
for com in sorted(self.subcommands.keys()):
s += " %s %s\n" % (com.ljust(8), self.subcommands[com].summary)
return s

601
morituri/command/cd.py Normal file
View File

@@ -0,0 +1,601 @@
# -*- Mode: Python -*-
# vi:si:et:sw=4:sts=4:ts=4
# Morituri - for those about to RIP
# Copyright (C) 2009 Thomas Vander Stichele
# This file is part of morituri.
#
# morituri is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# morituri is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with morituri. If not, see <http://www.gnu.org/licenses/>.
import argparse
import os
import glob
import urllib2
import socket
import sys
import gobject
gobject.threads_init()
from morituri.command.basecommand import BaseCommand
from morituri.common import (
accurip, common, config, drive, gstreamer, program, task
)
from morituri.program import cdrdao, cdparanoia
from morituri.result import result
import logging
logger = logging.getLogger(__name__)
SILENT = 1e-10
MAX_TRIES = 5
DEFAULT_TRACK_TEMPLATE = u'%r/%A - %d/%t. %a - %n'
DEFAULT_DISC_TEMPLATE = u'%r/%A - %d/%A - %d'
TEMPLATE_DESCRIPTION = '''
Tracks are named according to the track template, filling in the variables
and adding the file extension. Variables exclusive to the track template are:
- %t: track number
- %a: track artist
- %n: track title
- %s: track sort name
Disc files (.cue, .log, .m3u) are named according to the disc template,
filling in the variables and adding the file extension. Variables for both
disc and track template are:
- %A: album artist
- %S: album sort name
- %d: disc title
- %y: release year
- %r: release type, lowercase
- %R: Release type, normal case
- %x: audio extension, lowercase
- %X: audio extension, uppercase
'''
class _CD(BaseCommand):
"""
@type program: L{program.Program}
@ivar eject: whether to eject the drive after completing
"""
eject = True
@staticmethod
def add_arguments(parser):
# FIXME: have a cache of these pickles somewhere
parser.add_argument('-T', '--toc-pickle',
action="store", dest="toc_pickle",
help="pickle to use for reading and writing the TOC")
parser.add_argument('-R', '--release-id',
action="store", dest="release_id",
help="MusicBrainz release id to match to (if there are multiple)")
parser.add_argument('-p', '--prompt',
action="store_true", dest="prompt",
help="Prompt if there are multiple matching releases")
parser.add_argument('-c', '--country',
action="store", dest="country",
help="Filter releases by country")
def do(self):
self.config = config.Config()
self.program = program.Program(self.config,
record=self.options.record,
stdout=sys.stdout)
self.runner = task.SyncRunner()
# if the device is mounted (data session), unmount it
#self.device = self.parentCommand.options.device
self.device = self.options.device
sys.stdout.write('Checking device %s\n' % self.device)
self.program.loadDevice(self.device)
self.program.unmountDevice(self.device)
# first, read the normal TOC, which is fast
self.ittoc = self.program.getFastToc(self.runner,
self.options.toc_pickle,
self.device)
# already show us some info based on this
self.program.getRipResult(self.ittoc.getCDDBDiscId())
sys.stdout.write("CDDB disc id: %s\n" % self.ittoc.getCDDBDiscId())
self.mbdiscid = self.ittoc.getMusicBrainzDiscId()
sys.stdout.write("MusicBrainz disc id %s\n" % self.mbdiscid)
sys.stdout.write("MusicBrainz lookup URL %s\n" %
self.ittoc.getMusicBrainzSubmitURL())
self.program.metadata = self.program.getMusicBrainz(self.ittoc,
self.mbdiscid,
release=self.options.release_id,
country=self.options.country,
prompt=self.options.prompt)
if not self.program.metadata:
# fall back to FreeDB for lookup
cddbid = self.ittoc.getCDDBValues()
cddbmd = self.program.getCDDB(cddbid)
if cddbmd:
sys.stdout.write('FreeDB identifies disc as %s\n' % cddbmd)
# also used by rip cd info
if not getattr(self.options, 'unknown', False):
if self.eject:
self.program.ejectDevice(self.device)
return -1
# FIXME ?????
# Hackish fix for broken commit
offset = 0
info = drive.getDeviceInfo(self.device)
if info:
try:
offset = self.config.getReadOffset(*info)
except KeyError:
pass
# now, read the complete index table, which is slower
self.itable = self.program.getTable(self.runner,
self.ittoc.getCDDBDiscId(),
self.ittoc.getMusicBrainzDiscId(), self.device, offset)
assert self.itable.getCDDBDiscId() == self.ittoc.getCDDBDiscId(), \
"full table's id %s differs from toc id %s" % (
self.itable.getCDDBDiscId(), self.ittoc.getCDDBDiscId())
assert self.itable.getMusicBrainzDiscId() == \
self.ittoc.getMusicBrainzDiscId(), \
"full table's mb id %s differs from toc id mb %s" % (
self.itable.getMusicBrainzDiscId(),
self.ittoc.getMusicBrainzDiscId())
assert self.itable.getAccurateRipURL() == \
self.ittoc.getAccurateRipURL(), \
"full table's AR URL %s differs from toc AR URL %s" % (
self.itable.getAccurateRipURL(), self.ittoc.getAccurateRipURL())
if self.program.metadata:
self.program.metadata.discid = self.ittoc.getMusicBrainzDiscId()
# result
self.program.result.cdrdaoVersion = cdrdao.getCDRDAOVersion()
self.program.result.cdparanoiaVersion = \
cdparanoia.getCdParanoiaVersion()
info = drive.getDeviceInfo(self.device)
if info:
try:
self.program.result.cdparanoiaDefeatsCache = \
self.config.getDefeatsCache(*info)
except KeyError, e:
logger.debug('Got key error: %r' % (e, ))
self.program.result.artist = self.program.metadata \
and self.program.metadata.artist \
or 'Unknown Artist'
self.program.result.title = self.program.metadata \
and self.program.metadata.title \
or 'Unknown Title'
try:
import cdio
_, self.program.result.vendor, self.program.result.model, \
self.program.result.release = \
cdio.Device(self.device).get_hwinfo()
except ImportError:
raise ImportError("Pycdio module import failed.\n"
"This is a hard dependency: if not "
"available please install it")
self.doCommand()
if self.eject:
self.program.ejectDevice(self.device)
def doCommand(self):
pass
class Info(_CD):
summary = "retrieve information about the currently inserted CD"
description = ("Display musicbrainz, CDDB/FreeDB, and AccurateRip"
"information for the currently inserted CD.")
eject = False
# Requires opts.device
def add_arguments(self):
_CD.add_arguments(self.parser)
class Rip(_CD):
summary = "rip CD"
# see morituri.common.program.Program.getPath for expansion
description = """
Rips a CD.
%s
Paths to track files referenced in .cue and .m3u files will be made
relative to the directory of the disc files.
All files will be created relative to the given output directory.
Log files will log the path to tracks relative to this directory.
""" % TEMPLATE_DESCRIPTION
formatter_class = argparse.ArgumentDefaultsHelpFormatter
# Requires opts.record
# Requires opts.device
def add_arguments(self):
loggers = result.getLoggers().keys()
default_offset = None
info = drive.getDeviceInfo(self.opts.device)
if info:
try:
default_offset = config.Config().getReadOffset(*info)
sys.stdout.write("Using configured read offset %d\n" %
default_offset)
except KeyError:
pass
_CD.add_arguments(self.parser)
self.parser.add_argument('-L', '--logger',
action="store", dest="logger", default='morituri',
help="logger to use (choose from '" + "', '".join(loggers) + "')")
# FIXME: get from config
self.parser.add_argument('-o', '--offset',
action="store", dest="offset", default=default_offset,
help="sample read offset")
self.parser.add_argument('-x', '--force-overread',
action="store_true", dest="overread", default=False,
help="Force overreading into the lead-out portion of the disc. "
"Works only if the patched cdparanoia package is installed "
"and the drive supports this feature. ")
self.parser.add_argument('-O', '--output-directory',
action="store", dest="output_directory",
default=os.path.relpath(os.getcwd()),
help="output directory; will be included in file paths in log")
self.parser.add_argument('-W', '--working-directory',
action="store", dest="working_directory",
help="working directory; morituri will change to this directory "
"and files will be created relative to it when not absolute")
self.parser.add_argument('--track-template',
action="store", dest="track_template",
default=DEFAULT_TRACK_TEMPLATE,
help="template for track file naming (default default)")
self.parser.add_argument('--disc-template',
action="store", dest="disc_template",
default=DEFAULT_DISC_TEMPLATE,
help="template for disc file naming (default default)")
self.parser.add_argument('-U', '--unknown',
action="store_true", dest="unknown",
help="whether to continue ripping if the CD is unknown",
default=False)
def handle_arguments(self):
self.options.output_directory = os.path.expanduser(self.options.output_directory)
self.options.track_template = self.options.track_template.decode('utf-8')
self.options.disc_template = self.options.disc_template.decode('utf-8')
if self.options.offset is None:
raise ValueError("Drive offset is unconfigured.\n"
"Please install pycdio and run 'rip offset "
"find' to detect your drive's offset or set it "
"manually in the configuration file. It can "
"also be specified at runtime using the "
"'--offset=value' argument")
if self.options.working_directory is not None:
self.options.working_directory = os.path.expanduser(self.options.working_directory)
if self.options.logger:
try:
self.logger = result.getLoggers()[self.options.logger]()
except KeyError:
msg = "No logger named %s found!" % self.options.logger
logger.critical(msg)
raise ValueError(msg)
def doCommand(self):
# here to avoid import gst eating our options
from morituri.common import encode
profile = encode.PROFILES['flac']()
self.program.result.profileName = profile.name
self.program.result.profilePipeline = profile.pipeline
elementFactory = profile.pipeline.split(' ')[0]
self.program.result.gstreamerVersion = gstreamer.gstreamerVersion()
self.program.result.gstPythonVersion = gstreamer.gstPythonVersion()
self.program.result.encoderVersion = gstreamer.elementFactoryVersion(
elementFactory)
self.program.setWorkingDirectory(self.options.working_directory)
self.program.outdir = self.options.output_directory.decode('utf-8')
self.program.result.offset = int(self.options.offset)
self.program.result.overread = self.options.overread
self.program.result.logger = self.options.logger
### write disc files
disambiguate = False
while True:
discName = self.program.getPath(self.program.outdir,
self.options.disc_template, self.mbdiscid, 0,
profile=profile, disambiguate=disambiguate)
dirname = os.path.dirname(discName)
if os.path.exists(dirname):
sys.stdout.write("Output directory %s already exists\n" %
dirname.encode('utf-8'))
logs = glob.glob(os.path.join(dirname, '*.log'))
if logs:
sys.stdout.write(
"Output directory %s is a finished rip\n" %
dirname.encode('utf-8'))
if not disambiguate:
disambiguate = True
continue
return
else:
break
else:
sys.stdout.write("Creating output directory %s\n" %
dirname.encode('utf-8'))
os.makedirs(dirname)
break
# FIXME: say when we're continuing a rip
# FIXME: disambiguate if the pre-existing rip is different
# FIXME: turn this into a method
def ripIfNotRipped(number):
logger.debug('ripIfNotRipped for track %d' % number)
# we can have a previous result
trackResult = self.program.result.getTrackResult(number)
if not trackResult:
trackResult = result.TrackResult()
self.program.result.tracks.append(trackResult)
else:
logger.debug('ripIfNotRipped have trackresult, path %r' %
trackResult.filename)
path = self.program.getPath(self.program.outdir,
self.options.track_template,
self.mbdiscid, number,
profile=profile, disambiguate=disambiguate) \
+ '.' + profile.extension
logger.debug('ripIfNotRipped: path %r' % path)
trackResult.number = number
assert type(path) is unicode, "%r is not unicode" % path
trackResult.filename = path
if number > 0:
trackResult.pregap = self.itable.tracks[number - 1].getPregap()
# FIXME: optionally allow overriding reripping
if os.path.exists(path):
if path != trackResult.filename:
# the path is different (different name/template ?)
# but we can copy it
logger.debug('previous result %r, expected %r' % (
trackResult.filename, path))
sys.stdout.write('Verifying track %d of %d: %s\n' % (
number, len(self.itable.tracks),
os.path.basename(path).encode('utf-8')))
if not self.program.verifyTrack(self.runner, trackResult):
sys.stdout.write('Verification failed, reripping...\n')
os.unlink(path)
if not os.path.exists(path):
logger.debug('path %r does not exist, ripping...' % path)
tries = 0
# we reset durations for test and copy here
trackResult.testduration = 0.0
trackResult.copyduration = 0.0
extra = ""
while tries < MAX_TRIES:
tries += 1
if tries > 1:
extra = " (try %d)" % tries
sys.stdout.write('Ripping track %d of %d%s: %s\n' % (
number, len(self.itable.tracks), extra,
os.path.basename(path).encode('utf-8')))
try:
logger.debug('ripIfNotRipped: track %d, try %d',
number, tries)
self.program.ripTrack(self.runner, trackResult,
offset=int(self.options.offset),
device=self.device,
profile=profile,
taglist=self.program.getTagList(number),
overread=self.options.overread,
what='track %d of %d%s' % (
number, len(self.itable.tracks), extra))
break
except Exception, e:
logger.debug('Got exception %r on try %d',
e, tries)
if tries == MAX_TRIES:
logger.critical('Giving up on track %d after %d times' % (
number, tries))
raise RuntimeError(
"track can't be ripped. "
"Rip attempts number is equal to 'MAX_TRIES'")
if trackResult.testcrc == trackResult.copycrc:
sys.stdout.write('Checksums match for track %d\n' %
number)
else:
sys.stdout.write(
'ERROR: checksums did not match for track %d\n' %
number)
raise
sys.stdout.write('Peak level: {:.2%} \n'.format(trackResult.peak))
sys.stdout.write('Rip quality: {:.2%}\n'.format(trackResult.quality))
# overlay this rip onto the Table
if number == 0:
# HTOA goes on index 0 of track 1
# ignore silence in PREGAP
if trackResult.peak <= SILENT:
logger.debug('HTOA peak %r is below SILENT threshold, disregarding', trackResult.peak)
self.itable.setFile(1, 0, None,
self.ittoc.getTrackStart(1), number)
logger.debug('Unlinking %r', trackResult.filename)
os.unlink(trackResult.filename)
trackResult.filename = None
sys.stdout.write('HTOA discarded, contains digital silence\n')
else:
self.itable.setFile(1, 0, trackResult.filename,
self.ittoc.getTrackStart(1), number)
else:
self.itable.setFile(number, 1, trackResult.filename,
self.ittoc.getTrackLength(number), number)
self.program.saveRipResult()
# check for hidden track one audio
htoapath = None
htoa = self.program.getHTOA()
if htoa:
start, stop = htoa
sys.stdout.write(
'Found Hidden Track One Audio from frame %d to %d\n' % (
start, stop))
# rip it
ripIfNotRipped(0)
htoapath = self.program.result.tracks[0].filename
for i, track in enumerate(self.itable.tracks):
# FIXME: rip data tracks differently
if not track.audio:
sys.stdout.write(
'WARNING: skipping data track %d, not implemented\n' % (
i + 1, ))
# FIXME: make it work for now
track.indexes[1].relative = 0
continue
ripIfNotRipped(i + 1)
### write disc files
discName = self.program.getPath(self.program.outdir,
self.options.disc_template, self.mbdiscid, 0,
profile=profile, disambiguate=disambiguate)
dirname = os.path.dirname(discName)
if not os.path.exists(dirname):
os.makedirs(dirname)
logger.debug('writing cue file for %r', discName)
self.program.writeCue(discName)
# write .m3u file
logger.debug('writing m3u file for %r', discName)
m3uPath = u'%s.m3u' % discName
handle = open(m3uPath, 'w')
handle.write(u'#EXTM3U\n')
def writeFile(handle, path, length):
targetPath = common.getRelativePath(path, m3uPath)
u = u'#EXTINF:%d,%s\n' % (length, targetPath)
handle.write(u.encode('utf-8'))
u = '%s\n' % targetPath
handle.write(u.encode('utf-8'))
if htoapath:
writeFile(handle, htoapath,
self.itable.getTrackStart(1) / common.FRAMES_PER_SECOND)
for i, track in enumerate(self.itable.tracks):
if not track.audio:
continue
path = self.program.getPath(self.program.outdir,
self.options.track_template, self.mbdiscid, i + 1,
profile=profile,
disambiguate=disambiguate) + '.' + profile.extension
writeFile(handle, path,
self.itable.getTrackLength(i + 1) / common.FRAMES_PER_SECOND)
handle.close()
# verify using accuraterip
url = self.ittoc.getAccurateRipURL()
sys.stdout.write("AccurateRip URL %s\n" % url)
accucache = accurip.AccuCache()
try:
responses = accucache.retrieve(url)
except urllib2.URLError, e:
if isinstance(e.args[0], socket.gaierror):
if e.args[0].errno == -2:
sys.stdout.write("Warning: network error: %r\n" % (
e.args[0], ))
responses = None
else:
raise
else:
raise
if not responses:
sys.stdout.write('Album not found in AccurateRip database\n')
if responses:
sys.stdout.write('%d AccurateRip reponses found\n' %
len(responses))
if responses[0].cddbDiscId != self.itable.getCDDBDiscId():
sys.stdout.write(
"AccurateRip response discid different: %s\n" %
responses[0].cddbDiscId)
self.program.verifyImage(self.runner, responses)
sys.stdout.write("\n".join(
self.program.getAccurateRipResults()) + "\n")
self.program.saveRipResult()
# write log file
self.program.writeLog(discName, self.logger)
self.program.ejectDevice(self.device)
class CD(BaseCommand):
summary = "handle CDs"
description = "Display and rip CD-DA and metadata."
device_option = True
subcommands = {
'info': Info,
'rip': Rip
}

333
morituri/command/debug.py Normal file
View File

@@ -0,0 +1,333 @@
# -*- Mode: Python -*-
# vi:si:et:sw=4:sts=4:ts=4
# Morituri - for those about to RIP
# Copyright (C) 2009 Thomas Vander Stichele
# This file is part of morituri.
#
# morituri is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# morituri is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with morituri. If not, see <http://www.gnu.org/licenses/>.
import argparse
import sys
from morituri.command.basecommand import BaseCommand
from morituri.common import cache, task
from morituri.result import result
import logging
logger = logging.getLogger(__name__)
class RCCue(BaseCommand):
summary = "write a cue file for the cached result"
description = summary
def do(self, args):
self._cache = cache.ResultCache()
try:
discid = args[0]
except IndexError:
sys.stderr.write(
'Please specify a cddb disc id\n')
return 3
persisted = self._cache.getRipResult(discid, create=False)
if not persisted:
sys.stderr.write(
'Could not find a result for cddb disc id %s\n' % discid)
return 3
sys.stdout.write(persisted.object.table.cue().encode('utf-8'))
class RCList(BaseCommand):
summary = "list cached results"
description = summary
def do(self, args):
self._cache = cache.ResultCache()
results = []
for i in self._cache.getIds():
r = self._cache.getRipResult(i, create=False)
results.append((r.object.artist, r.object.title, i))
results.sort()
for artist, title, cddbid in results:
if artist is None:
artist = '(None)'
if title is None:
title = '(None)'
sys.stdout.write('%s: %s - %s\n' % (
cddbid, artist.encode('utf-8'), title.encode('utf-8')))
class RCLog(BaseCommand):
summary = "write a log file for the cached result"
description = summary
formatter_class = argparse.ArgumentDefaultsHelpFormatter
def add_arguments(self):
loggers = result.getLoggers().keys()
self.parser.add_argument(
'-L', '--logger',
action="store", dest="logger",
default='morituri',
help="logger to use (choose from '" + "', '".join(loggers) + "')"
)
def do(self, args):
self._cache = cache.ResultCache()
persisted = self._cache.getRipResult(args[0], create=False)
if not persisted:
sys.stderr.write(
'Could not find a result for cddb disc id %s\n' % args[0])
return 3
try:
klazz = result.getLoggers()[self.options.logger]
except KeyError:
sys.stderr.write("No logger named %s found!\n" % (
self.options.logger))
return 3
logger = klazz()
sys.stdout.write(logger.log(persisted.object).encode('utf-8'))
class ResultCache(BaseCommand):
summary = "debug result cache"
description = summary
subcommands = {
'cue': RCCue,
'list': RCList,
'log': RCLog,
}
class Checksum(BaseCommand):
summary = "run a checksum task"
description = summary
def add_arguments(self):
self.parser.add_argument('files', nargs='+', action='store',
help="audio files to checksum")
def do(self):
runner = task.SyncRunner()
# here to avoid import gst eating our options
from morituri.common import checksum
for f in self.options.files:
fromPath = unicode(f)
checksumtask = checksum.CRC32Task(fromPath)
runner.run(checksumtask)
sys.stdout.write('Checksum: %08x\n' % checksumtask.checksum)
class Encode(BaseCommand):
summary = "run an encode task"
description = summary
def add_arguments(self):
# here to avoid import gst eating our options
from morituri.common import encode
default = 'flac'
# slated for deletion as flac will be the only encoder
self.parser.add_argument('--profile',
action="store",
dest="profile",
help="profile for encoding (default '%s', choices '%s')" % (
default, "', '".join(encode.ALL_PROFILES.keys())),
default=default)
self.parser.add_argument('input', action='store',
help="audio file to encode")
self.parser.add_argument('output', nargs='?', action='store',
help="output path")
def do(self):
from morituri.common import encode
profile = encode.ALL_PROFILES[self.options.profile]()
try:
fromPath = unicode(self.options.input)
except IndexError:
# unexercised after BaseCommand
sys.stdout.write('Please specify an input file.\n')
return 3
try:
toPath = unicode(self.options.output)
except IndexError:
toPath = fromPath + '.' + profile.extension
runner = task.SyncRunner()
logger.debug('Encoding %s to %s',
fromPath.encode('utf-8'),
toPath.encode('utf-8'))
encodetask = encode.EncodeTask(fromPath, toPath, profile)
runner.run(encodetask)
sys.stdout.write('Peak level: %r\n' % encodetask.peak)
sys.stdout.write('Encoded to %s\n' % toPath.encode('utf-8'))
class MaxSample(BaseCommand):
summary = "run a max sample task"
description = summary
def add_arguments(self):
self.parser.add_argument('files', nargs='+', action='store',
help="audio files to sample")
def do(self):
runner = task.SyncRunner()
# here to avoid import gst eating our options
from morituri.common import checksum
for arg in self.options.files:
fromPath = unicode(arg.decode('utf-8'))
checksumtask = checksum.MaxSampleTask(fromPath)
runner.run(checksumtask)
sys.stdout.write('%s\n' % arg)
sys.stdout.write('Biggest absolute sample: %04x\n' %
checksumtask.checksum)
class Tag(BaseCommand):
summary = "run a tag reading task"
description = summary
def add_arguments(self):
self.parser.add_argument('file', action='store',
help="audio file to tag")
def do(self):
try:
path = unicode(self.options.file)
except IndexError:
sys.stdout.write('Please specify an input file.\n')
return 3
runner = task.SyncRunner()
from morituri.common import encode
logger.debug('Reading tags from %s' % path.encode('utf-8'))
tagtask = encode.TagReadTask(path)
runner.run(tagtask)
for key in tagtask.taglist.keys():
sys.stdout.write('%s: %r\n' % (key, tagtask.taglist[key]))
class MusicBrainzNGS(BaseCommand):
summary = "examine MusicBrainz NGS info"
description = """Look up a MusicBrainz disc id and output information.
You can get the MusicBrainz disc id with rip cd info.
Example disc id: KnpGsLhvH.lPrNc1PBL21lb9Bg4-"""
def add_arguments(self):
self.parser.add_argument('mbdiscid', action='store',
help="MB disc id to look up")
def do(self):
try:
discId = unicode(self.options.mbdiscid)
except IndexError:
sys.stdout.write('Please specify a MusicBrainz disc id.\n')
return 3
from morituri.common import mbngs
metadatas = mbngs.musicbrainz(discId, record=self.options.record)
sys.stdout.write('%d releases\n' % len(metadatas))
for i, md in enumerate(metadatas):
sys.stdout.write('- Release %d:\n' % (i + 1, ))
sys.stdout.write(' Artist: %s\n' % md.artist.encode('utf-8'))
sys.stdout.write(' Title: %s\n' % md.title.encode('utf-8'))
sys.stdout.write(' Type: %s\n' % md.releaseType.encode('utf-8'))
sys.stdout.write(' URL: %s\n' % md.url)
sys.stdout.write(' Tracks: %d\n' % len(md.tracks))
if md.catalogNumber:
sys.stdout.write(' Cat no: %s\n' % md.catalogNumber)
if md.barcode:
sys.stdout.write(' Barcode: %s\n' % md.barcode)
for j, track in enumerate(md.tracks):
sys.stdout.write(' Track %2d: %s - %s\n' % (
j + 1, track.artist.encode('utf-8'),
track.title.encode('utf-8')))
class CDParanoia(BaseCommand):
summary = "show cdparanoia version"
description = summary
def do(self):
from morituri.program import cdparanoia
version = cdparanoia.getCdParanoiaVersion()
sys.stdout.write("cdparanoia version: %s\n" % version)
class CDRDAO(BaseCommand):
summary = "show cdrdao version"
description = summary
def do(self):
from morituri.program import cdrdao
version = cdrdao.getCDRDAOVersion()
sys.stdout.write("cdrdao version: %s\n" % version)
class Version(BaseCommand):
summary = "debug version getting"
description = summary
subcommands = {
'cdparanoia': CDParanoia,
'cdrdao': CDRDAO,
}
class Debug(BaseCommand):
summary = "debug internals"
description = "debug internals"
subcommands = {
'checksum': Checksum,
'encode': Encode,
'maxsample': MaxSample,
'tag': Tag,
'musicbrainzngs': MusicBrainzNGS,
'resultcache': ResultCache,
'version': Version,
}

124
morituri/command/drive.py Normal file
View File

@@ -0,0 +1,124 @@
# -*- Mode: Python -*-
# vi:si:et:sw=4:sts=4:ts=4
# Morituri - for those about to RIP
# Copyright (C) 2009 Thomas Vander Stichele
# This file is part of morituri.
#
# morituri is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# morituri is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with morituri. If not, see <http://www.gnu.org/licenses/>.
import sys
from morituri.command.basecommand import BaseCommand
from morituri.common import config, drive
from morituri.extern.task import task
from morituri.program import cdparanoia
import logging
logger = logging.getLogger(__name__)
class Analyze(BaseCommand):
summary = "analyze caching behaviour of drive"
description = """Determine whether cdparanoia can defeat the audio cache of the drive."""
device_option = True
def do(self):
runner = task.SyncRunner()
t = cdparanoia.AnalyzeTask(self.options.device)
runner.run(t)
if t.defeatsCache is None:
sys.stdout.write(
'Cannot analyze the drive. Is there a CD in it?\n')
return
if not t.defeatsCache:
sys.stdout.write(
'cdparanoia cannot defeat the audio cache on this drive.\n')
else:
sys.stdout.write(
'cdparanoia can defeat the audio cache on this drive.\n')
info = drive.getDeviceInfo(self.options.device)
if not info:
sys.stdout.write('Drive caching behaviour not saved: could not get device info (requires pycdio).\n')
return
sys.stdout.write(
'Adding drive cache behaviour to configuration file.\n')
config.Config().setDefeatsCache(info[0], info[1], info[2],
t.defeatsCache)
class List(BaseCommand):
summary = "list drives"
description = """list available CD-DA drives"""
def do(self):
paths = drive.getAllDevicePaths()
self.config = config.Config()
if not paths:
sys.stdout.write('No drives found.\n')
sys.stdout.write('Create /dev/cdrom if you have a CD drive, \n')
sys.stdout.write('or install pycdio for better detection.\n')
return
try:
import cdio as _
except ImportError:
sys.stdout.write(
'Install pycdio for vendor/model/release detection.\n')
return
for path in paths:
vendor, model, release = drive.getDeviceInfo(path)
sys.stdout.write(
"drive: %s, vendor: %s, model: %s, release: %s\n" % (
path, vendor, model, release))
try:
offset = self.config.getReadOffset(
vendor, model, release)
sys.stdout.write(
" Configured read offset: %d\n" % offset)
except KeyError:
sys.stdout.write(
" No read offset found. Run 'rip offset find'\n")
try:
defeats = self.config.getDefeatsCache(
vendor, model, release)
sys.stdout.write(
" Can defeat audio cache: %s\n" % defeats)
except KeyError:
sys.stdout.write(
" Unknown whether audio cache can be defeated. "
"Run 'rip drive analyze'\n")
if not paths:
sys.stdout.write('No drives found.\n')
class Drive(BaseCommand):
summary = "handle drives"
description = """Drive utilities."""
subcommands = {
'analyze': Analyze,
'list': List
}

156
morituri/command/image.py Normal file
View File

@@ -0,0 +1,156 @@
# -*- Mode: Python -*-
# vi:si:et:sw=4:sts=4:ts=4
# Morituri - for those about to RIP
# Copyright (C) 2009 Thomas Vander Stichele
# This file is part of morituri.
#
# morituri is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# morituri is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with morituri. If not, see <http://www.gnu.org/licenses/>.
import os
import sys
from morituri.command.basecommand import BaseCommand
from morituri.common import accurip, config, program
from morituri.extern.task import task
from morituri.image import image
from morituri.result import result
import logging
logger = logging.getLogger(__name__)
class Retag(BaseCommand):
summary = "retag image files"
description = """
Retags the image from the given .cue files with tags obtained from MusicBrainz.
"""
def add_arguments(self):
self.parser.add_argument('cuefile', nargs='+', action='store',
help="cue file to load rip image from")
self.parser.add_argument(
'-R', '--release-id',
action="store", dest="release_id",
help="MusicBrainz release id to match to (if there are multiple)"
)
self.parser.add_argument(
'-p', '--prompt',
action="store_true", dest="prompt",
help="Prompt if there are multiple matching releases"
)
self.parser.add_argument(
'-c', '--country',
action="store", dest="country",
help="Filter releases by country"
)
def do(self):
# here to avoid import gst eating our options
from morituri.common import encode
prog = program.Program(config.Config(), stdout=sys.stdout)
runner = task.SyncRunner()
for arg in self.options.cuefile:
sys.stdout.write('Retagging image %r\n' % arg)
arg = arg.decode('utf-8')
cueImage = image.Image(arg)
cueImage.setup(runner)
mbdiscid = cueImage.table.getMusicBrainzDiscId()
sys.stdout.write('MusicBrainz disc id is %s\n' % mbdiscid)
sys.stdout.write("MusicBrainz lookup URL %s\n" %
cueImage.table.getMusicBrainzSubmitURL())
prog.metadata = prog.getMusicBrainz(cueImage.table, mbdiscid,
release=self.options.release_id,
country=self.options.country,
prompt=self.options.prompt)
if not prog.metadata:
print 'Not in MusicBrainz database, skipping'
continue
prog.metadata.discid = mbdiscid
# FIXME: this feels like we're poking at internals.
prog.cuePath = arg
prog.result = result.RipResult()
for track in cueImage.table.tracks:
path = cueImage.getRealPath(track.indexes[1].path)
taglist = prog.getTagList(track.number)
logger.debug(
'possibly retagging %r from cue path %r with taglist %r',
path, arg, taglist)
t = encode.SafeRetagTask(path, taglist)
runner.run(t)
path = os.path.basename(path)
if t.changed:
print 'Retagged %s' % path
else:
print '%s already tagged correctly' % path
print
class Verify(BaseCommand):
summary = "verify image"
description = """
Verifies the image from the given .cue files against the AccurateRip database.
"""
def add_arguments(self):
self.parser.add_argument('cuefile', nargs='+', action='store',
help="cue file to load rip image from")
def do(self):
prog = program.Program(config.Config())
runner = task.SyncRunner()
cache = accurip.AccuCache()
for arg in self.options.cuefile:
arg = arg.decode('utf-8')
cueImage = image.Image(arg)
cueImage.setup(runner)
url = cueImage.table.getAccurateRipURL()
responses = cache.retrieve(url)
# FIXME: this feels like we're poking at internals.
prog.cuePath = arg
prog.result = result.RipResult()
for track in cueImage.table.tracks:
tr = result.TrackResult()
tr.number = track.number
prog.result.tracks.append(tr)
prog.verifyImage(runner, responses)
print "\n".join(prog.getAccurateRipResults()) + "\n"
class Image(BaseCommand):
summary = "handle images"
description = """
Handle disc images. Disc images are described by a .cue file.
Disc images can be encoded to another format (for example, to make a
compressed encoding), retagged and verified.
"""
subcommands = {
'verify': Verify,
'retag': Retag
}

86
morituri/command/main.py Normal file
View File

@@ -0,0 +1,86 @@
# -*- Mode: Python -*-
# vi:si:et:sw=4:sts=4:ts=4
import os
import sys
import pkg_resources
import musicbrainzngs
from morituri.command import cd, offset, drive, image, accurip, debug
from morituri.command.basecommand import BaseCommand
from morituri.common import common, directory
from morituri.configure import configure
from morituri.extern.task import task
import logging
logger = logging.getLogger(__name__)
def main():
# set user agent
musicbrainzngs.set_useragent("whipper", configure.version,
"https://github.com/JoeLametta/whipper")
# register plugins with pkg_resources
distributions, _ = pkg_resources.working_set.find_plugins(
pkg_resources.Environment([directory.data_path('plugins')])
)
map(pkg_resources.working_set.add, distributions)
try:
ret = Whipper(sys.argv[1:], os.path.basename(sys.argv[0]), None).do()
except SystemError, e:
sys.stderr.write('whipper: error: %s\n' % e.args)
return 255
except ImportError, e:
raise ImportError(e)
except task.TaskException, e:
if isinstance(e.exception, ImportError):
raise ImportError(e.exception)
elif isinstance(e.exception, common.MissingDependencyException):
sys.stderr.write('whipper: error: missing dependency "%s"\n' %
e.exception.dependency)
return 255
if isinstance(e.exception, common.EmptyError):
logger.debug("EmptyError: %r", str(e.exception))
sys.stderr.write('whipper: error: Could not create encoded file.\n')
return 255
# in python3 we can instead do `raise e.exception` as that would show
# the exception's original context
sys.stderr.write(e.exceptionMessage)
return 255
return ret if ret else 0
class Whipper(BaseCommand):
description = """whipper is a CD ripping utility focusing on accuracy over speed.
whipper gives you a tree of subcommands to work with.
You can get help on subcommands by using the -h option to the subcommand.
"""
no_add_help = True
subcommands = {
'accurip': accurip.AccuRip,
'cd': cd.CD,
'debug': debug.Debug,
'drive': drive.Drive,
'offset': offset.Offset,
'image': image.Image
}
def add_arguments(self):
self.parser.add_argument('-R', '--record',
action='store_true', dest='record',
help="record API requests for playback")
self.parser.add_argument('-v', '--version',
action="store_true", dest="version",
help="show version information")
self.parser.add_argument('-h', '--help',
action="store_true", dest="help",
help="show this help message and exit")
def handle_arguments(self):
if self.options.help:
self.parser.print_help()
sys.exit(0)
if self.options.version:
print "whipper %s" % configure.version
sys.exit(0)

245
morituri/command/offset.py Normal file
View File

@@ -0,0 +1,245 @@
# -*- Mode: Python -*-
# vi:si:et:sw=4:sts=4:ts=4
# Morituri - for those about to RIP
# Copyright (C) 2009 Thomas Vander Stichele
# This file is part of morituri.
#
# morituri is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# morituri is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with morituri. If not, see <http://www.gnu.org/licenses/>.
import argparse
import os
import sys
import tempfile
import gobject
gobject.threads_init()
from morituri.command.basecommand import BaseCommand
from morituri.common import accurip, common, config, drive, program
from morituri.common import task as ctask
from morituri.program import cdrdao, cdparanoia
from morituri.extern.task import task
import logging
logger = logging.getLogger(__name__)
# see http://www.accuraterip.com/driveoffsets.htm
# and misc/offsets.py
OFFSETS = "+6, +48, +102, +667, +12, +30, +618, +594, +738, -472, " + \
"+98, +116, +96, +733, +120, +691, +685, +97, +600, " + \
"+690, +1292, +99, +676, +686, +1182, -24, +704, +572, " + \
"+688, +91, +696, +103, -491, +689, +145, +708, +697, " + \
"+564, +86, +679, +355, -496, -1164, +1160, +694, 0, " + \
"-436, +79, +94, +684, +681, +106, +692, +943, +1194, " + \
"+92, +117, +680, +682, +1268, +678, -582, +1473, +1279, " + \
"-54, +1508, +740, +1272, +534, +976, +687, +675, +1303, " + \
"+674, +1263, +108, +974, +122, +111, -489, +772, +732, " + \
"-495, -494, +975, +935, +87, +668, +1776, +1364, +1336, " + \
"+1127"
class Find(BaseCommand):
summary = "find drive read offset"
description = """Find drive's read offset by ripping tracks from a
CD in the AccurateRip database."""
formatter_class = argparse.ArgumentDefaultsHelpFormatter
device_option = True
def add_arguments(self):
self.parser.add_argument(
'-o', '--offsets',
action="store", dest="offsets", default=OFFSETS,
help="list of offsets, comma-separated, colon-separated for ranges"
)
def handle_arguments(self):
self._offsets = []
blocks = self.options.offsets.split(',')
for b in blocks:
if ':' in b:
a, b = b.split(':')
self._offsets.extend(range(int(a), int(b) + 1))
else:
self._offsets.append(int(b))
logger.debug('Trying with offsets %r', self._offsets)
def do(self):
prog = program.Program(config.Config())
runner = ctask.SyncRunner()
device = self.options.device
# if necessary, load and unmount
sys.stdout.write('Checking device %s\n' % device)
prog.loadDevice(device)
prog.unmountDevice(device)
# first get the Table Of Contents of the CD
t = cdrdao.ReadTOCTask(device)
table = t.table
logger.debug("CDDB disc id: %r", table.getCDDBDiscId())
url = table.getAccurateRipURL()
logger.debug("AccurateRip URL: %s", url)
# FIXME: download url as a task too
responses = []
import urllib2
try:
handle = urllib2.urlopen(url)
data = handle.read()
responses = accurip.getAccurateRipResponses(data)
except urllib2.HTTPError, e:
if e.code == 404:
sys.stdout.write(
'Album not found in AccurateRip database.\n')
return 1
else:
raise
if responses:
logger.debug('%d AccurateRip responses found.' % len(responses))
if responses[0].cddbDiscId != table.getCDDBDiscId():
logger.warning("AccurateRip response discid different: %s",
responses[0].cddbDiscId)
# now rip the first track at various offsets, calculating AccurateRip
# CRC, and matching it against the retrieved ones
def match(archecksum, track, responses):
for i, r in enumerate(responses):
if archecksum == r.checksums[track - 1]:
return archecksum, i
return None, None
for offset in self._offsets:
sys.stdout.write('Trying read offset %d ...\n' % offset)
try:
archecksum = self._arcs(runner, table, 1, offset)
except task.TaskException, e:
# let MissingDependency fall through
if isinstance(e.exception,
common.MissingDependencyException):
raise e
if isinstance(e.exception, cdparanoia.FileSizeError):
sys.stdout.write(
'WARNING: cannot rip with offset %d...\n' % offset)
continue
logger.warning("Unknown task exception for offset %d: %r" % (
offset, e))
sys.stdout.write(
'WARNING: cannot rip with offset %d...\n' % offset)
continue
logger.debug('AR checksum calculated: %s' % archecksum)
c, i = match(archecksum, 1, responses)
if c:
count = 1
logger.debug('MATCHED against response %d' % i)
sys.stdout.write(
'Offset of device is likely %d, confirming ...\n' %
offset)
# now try and rip all other tracks as well, except for the
# last one (to avoid readers that can't do overread
for track in range(2, (len(table.tracks) + 1) - 1):
try:
archecksum = self._arcs(runner, table, track, offset)
except task.TaskException, e:
if isinstance(e.exception, cdparanoia.FileSizeError):
sys.stdout.write(
'WARNING: cannot rip with offset %d...\n' %
offset)
continue
c, i = match(archecksum, track, responses)
if c:
logger.debug('MATCHED track %d against response %d' % (
track, i))
count += 1
if count == len(table.tracks) - 1:
self._foundOffset(device, offset)
return 0
else:
sys.stdout.write(
'Only %d of %d tracks matched, continuing ...\n' % (
count, len(table.tracks)))
sys.stdout.write('No matching offset found.\n')
sys.stdout.write('Consider trying again with a different disc.\n')
# TODO MW: Update this further for ARv2 code
def _arcs(self, runner, table, track, offset):
# rips the track with the given offset, return the arcs checksum
logger.debug('Ripping track %r with offset %d ...', track, offset)
fd, path = tempfile.mkstemp(
suffix=u'.track%02d.offset%d.morituri.wav' % (
track, offset))
os.close(fd)
t = cdparanoia.ReadTrackTask(path, table,
table.getTrackStart(track), table.getTrackEnd(track),
overread=False, offset=offset, device=self.options.device)
t.description = 'Ripping track %d with read offset %d' % (
track, offset)
runner.run(t)
# here to avoid import gst eating our options
from morituri.common import checksum
# TODO MW: Update this to also use the v2 checksum(s)
t = checksum.FastAccurateRipChecksumTask(path, trackNumber=track,
trackCount=len(table.tracks), wave=True, v2=False)
runner.run(t)
os.unlink(path)
return "%08x" % t.checksum
def _foundOffset(self, device, offset):
sys.stdout.write('\nRead offset of device is: %d.\n' %
offset)
info = drive.getDeviceInfo(device)
if not info:
sys.stdout.write('Offset not saved: could not get device info (requires pycdio).\n')
return
sys.stdout.write('Adding read offset to configuration file.\n')
config.Config().setReadOffset(info[0], info[1], info[2],
offset)
class Offset(BaseCommand):
summary = "handle drive offsets"
description = """
Drive offset detection utility.
"""
subcommands = {
'find': Find,
}