Rename "morituri" module to "whipper".

Fixes https://github.com/JoeLametta/whipper/issues/100
This commit is contained in:
Frederik “Freso” S. Olesen
2017-04-26 16:51:11 +02:00
parent a8af9b79ab
commit ff309e468c
114 changed files with 198 additions and 198 deletions

View File

147
whipper/common/accurip.py Normal file
View File

@@ -0,0 +1,147 @@
# -*- Mode: Python; test-case-name: whipper.test.test_common_accurip -*-
# vi:si:et:sw=4:sts=4:ts=4
# Morituri - for those about to RIP
# Copyright (C) 2009 Thomas Vander Stichele
# This file is part of whipper.
#
# whipper is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# whipper is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with whipper. If not, see <http://www.gnu.org/licenses/>.
import errno
import os
import struct
import urlparse
import urllib2
from whipper.common import directory
import logging
logger = logging.getLogger(__name__)
_CACHE_DIR = directory.cache_path()
class AccuCache:
def __init__(self):
if not os.path.exists(_CACHE_DIR):
logger.debug('Creating cache directory %s', _CACHE_DIR)
os.makedirs(_CACHE_DIR)
def _getPath(self, url):
# split path starts with /
return os.path.join(_CACHE_DIR, urlparse.urlparse(url)[2][1:])
def retrieve(self, url, force=False):
logger.debug("Retrieving AccurateRip URL %s", url)
path = self._getPath(url)
logger.debug("Cached path: %s", path)
if force:
logger.debug("forced to download")
self.download(url)
elif not os.path.exists(path):
logger.debug("%s does not exist, downloading", path)
self.download(url)
if not os.path.exists(path):
logger.debug("%s does not exist, not in database", path)
return None
data = self._read(url)
return getAccurateRipResponses(data)
def download(self, url):
# FIXME: download url as a task too
try:
handle = urllib2.urlopen(url)
data = handle.read()
except urllib2.HTTPError, e:
if e.code == 404:
return None
else:
raise
self._cache(url, data)
return data
def _cache(self, url, data):
path = self._getPath(url)
try:
os.makedirs(os.path.dirname(path))
except OSError, e:
logger.debug('Could not make dir %s: %r' % (
path, str(e)))
if e.errno != errno.EEXIST:
raise
handle = open(path, 'wb')
handle.write(data)
handle.close()
def _read(self, url):
logger.debug("Reading %s from cache", url)
path = self._getPath(url)
handle = open(path, 'rb')
data = handle.read()
handle.close()
return data
def getAccurateRipResponses(data):
ret = []
while data:
trackCount = struct.unpack("B", data[0])[0]
nbytes = 1 + 12 + trackCount * (1 + 8)
ret.append(AccurateRipResponse(data[:nbytes]))
data = data[nbytes:]
return ret
class AccurateRipResponse(object):
"""
I represent the response of the AccurateRip online database.
@type checksums: list of str
"""
trackCount = None
discId1 = ""
discId2 = ""
cddbDiscId = ""
confidences = None
checksums = None
def __init__(self, data):
self.trackCount = struct.unpack("B", data[0])[0]
self.discId1 = "%08x" % struct.unpack("<L", data[1:5])[0]
self.discId2 = "%08x" % struct.unpack("<L", data[5:9])[0]
self.cddbDiscId = "%08x" % struct.unpack("<L", data[9:13])[0]
self.confidences = []
self.checksums = []
pos = 13
for _ in range(self.trackCount):
confidence = struct.unpack("B", data[pos])[0]
checksum = "%08x" % struct.unpack("<L", data[pos + 1:pos + 5])[0]
pos += 9
self.confidences.append(confidence)
self.checksums.append(checksum)

229
whipper/common/cache.py Normal file
View File

@@ -0,0 +1,229 @@
# -*- Mode: Python; test-case-name: whipper.test.test_common_cache -*-
# vi:si:et:sw=4:sts=4:ts=4
# Morituri - for those about to RIP
# Copyright (C) 2009 Thomas Vander Stichele
# This file is part of whipper.
#
# whipper is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# whipper is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with whipper. If not, see <http://www.gnu.org/licenses/>.
import os
import os.path
import glob
import tempfile
import shutil
from whipper.result import result
from whipper.common import directory
import logging
logger = logging.getLogger(__name__)
class Persister:
"""
I wrap an optional pickle to persist an object to disk.
Instantiate me with a path to automatically unpickle the object.
Call persist to store the object to disk; it will get stored if it
changed from the on-disk object.
@ivar object: the persistent object
"""
def __init__(self, path=None, default=None):
"""
If path is not given, the object will not be persisted.
This allows code to transparently deal with both persisted and
non-persisted objects, since the persist method will just end up
doing nothing.
"""
self._path = path
self.object = None
self._unpickle(default)
def persist(self, obj=None):
"""
Persist the given object, if we have a persistence path and the
object changed.
If object is not given, re-persist our object, always.
If object is given, only persist if it was changed.
"""
# don't pickle if it's already ok
if obj and obj == self.object:
return
# store the object on ourselves if not None
if obj is not None:
self.object = obj
# don't pickle if there is no path
if not self._path:
return
# default to pickling our object again
if obj is None:
obj = self.object
# pickle
self.object = obj
(fd, path) = tempfile.mkstemp(suffix='.morituri.pickle')
handle = os.fdopen(fd, 'wb')
import pickle
pickle.dump(obj, handle, 2)
handle.close()
# do an atomic move
shutil.move(path, self._path)
logger.debug('saved persisted object to %r' % self._path)
def _unpickle(self, default=None):
self.object = default
if not self._path:
return None
if not os.path.exists(self._path):
return None
handle = open(self._path)
import pickle
try:
self.object = pickle.load(handle)
logger.debug('loaded persisted object from %r' % self._path)
except:
# can fail for various reasons; in that case, pretend we didn't
# load it
pass
def delete(self):
self.object = None
os.unlink(self._path)
class PersistedCache:
"""
I wrap a directory of persisted objects.
"""
path = None
def __init__(self, path):
self.path = path
try:
os.makedirs(self.path)
except OSError, e:
if e.errno != 17: # FIXME
raise
def _getPath(self, key):
return os.path.join(self.path, '%s.pickle' % key)
def get(self, key):
"""
Returns the persister for the given key.
"""
persister = Persister(self._getPath(key))
if persister.object:
if hasattr(persister.object, 'instanceVersion'):
o = persister.object
if o.instanceVersion < o.__class__.classVersion:
logger.debug(
'key %r persisted object version %d is outdated',
key, o.instanceVersion)
persister.object = None
# FIXME: don't delete old objects atm
# persister.delete()
return persister
class ResultCache:
def __init__(self, path=None):
self._path = path or directory.cache_path('result')
self._pcache = PersistedCache(self._path)
def getRipResult(self, cddbdiscid, create=True):
"""
Retrieve the persistable RipResult either from our cache (from a
previous, possibly aborted rip), or return a new one.
@rtype: L{Persistable} for L{result.RipResult}
"""
presult = self._pcache.get(cddbdiscid)
if not presult.object:
logger.debug('result for cddbdiscid %r not in cache', cddbdiscid)
if not create:
logger.debug('returning None')
return None
logger.debug('creating result')
presult.object = result.RipResult()
presult.persist(presult.object)
else:
logger.debug('result for cddbdiscid %r found in cache, reusing',
cddbdiscid)
return presult
def getIds(self):
paths = glob.glob(os.path.join(self._path, '*.pickle'))
return [os.path.splitext(os.path.basename(path))[0] for path in paths]
class TableCache:
"""
I read and write entries to and from the cache of tables.
If no path is specified, the cache will write to the current cache
directory and read from all possible cache directories (to allow for
pre-0.2.1 cddbdiscid-keyed entries).
"""
def __init__(self, path=None):
if not path:
self._path = directory.cache_path('table')
else:
self._path = path
self._pcache = PersistedCache(self._path)
def get(self, cddbdiscid, mbdiscid):
# Before 0.2.1, we only saved by cddbdiscid, and had collisions
# mbdiscid collisions are a lot less likely
ptable = self._pcache.get('mbdiscid.' + mbdiscid)
if not ptable.object:
ptable = self._pcache.get(cddbdiscid)
if ptable.object:
if ptable.object.getMusicBrainzDiscId() != mbdiscid:
logger.debug('cached table is for different mb id %r' % (
ptable.object.getMusicBrainzDiscId()))
ptable.object = None
else:
logger.debug('no valid cached table found for %r' %
cddbdiscid)
if not ptable.object:
# get an empty persistable from the writable location
ptable = self._pcache.get('mbdiscid.' + mbdiscid)
return ptable

View File

@@ -0,0 +1,76 @@
# -*- Mode: Python; test-case-name: whipper.test.test_common_checksum -*-
# vi:si:et:sw=4:sts=4:ts=4
# Morituri - for those about to RIP
# Copyright (C) 2009 Thomas Vander Stichele
# This file is part of whipper.
#
# whipper is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# whipper is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with whipper. If not, see <http://www.gnu.org/licenses/>.
import binascii
import wave
from whipper.extern.task import task as etask
from whipper.program.arc import accuraterip_checksum
import logging
logger = logging.getLogger(__name__)
# checksums are not CRC's. a CRC is a specific type of checksum.
class CRC32Task(etask.Task):
# TODO: Support sampleStart, sampleLength later on (should be trivial, just
# add change the read part in _crc32 to skip some samples and/or not
# read too far)
def __init__(self, path, sampleStart=0, sampleLength=-1):
self.path = path
def start(self, runner):
etask.Task.start(self, runner)
self.schedule(0.0, self._crc32)
def _crc32(self):
w = wave.open(self.path)
d = w._data_chunk.read()
self.checksum = binascii.crc32(d) & 0xffffffff
self.stop()
class FastAccurateRipChecksumTask(etask.Task):
description = 'Calculating (Fast) AccurateRip checksum'
def __init__(self, path, trackNumber, trackCount, wave, v2=False):
self.path = path
self.trackNumber = trackNumber
self.trackCount = trackCount
self._wave = wave
self._v2 = v2
self.checksum = None
def start(self, runner):
etask.Task.start(self, runner)
self.schedule(0.0, self._arc)
def _arc(self):
arc = accuraterip_checksum(self.path, self.trackNumber, self.trackCount,
self._wave, self._v2)
self.checksum = arc
self.stop()

307
whipper/common/common.py Normal file
View File

@@ -0,0 +1,307 @@
# -*- Mode: Python; test-case-name: whipper.test.test_common_common -*-
# vi:si:et:sw=4:sts=4:ts=4
# Morituri - for those about to RIP
# Copyright (C) 2009 Thomas Vander Stichele
# This file is part of whipper.
#
# whipper is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# whipper is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with whipper. If not, see <http://www.gnu.org/licenses/>.
import os
import os.path
import commands
import math
import subprocess
from whipper.extern import asyncsub
import logging
logger = logging.getLogger(__name__)
FRAMES_PER_SECOND = 75
SAMPLES_PER_FRAME = 588 # a sample is 2 16-bit values, left and right channel
WORDS_PER_FRAME = SAMPLES_PER_FRAME * 2
BYTES_PER_FRAME = SAMPLES_PER_FRAME * 4
class EjectError(SystemError):
"""
Possibly ejects the drive in command.main.
"""
def __init__(self, device, *args):
"""
args is a tuple used by BaseException.__str__
device is the device path to eject
"""
self.args = args
self.device = device
def msfToFrames(msf):
"""
Converts a string value in MM:SS:FF to frames.
@param msf: the MM:SS:FF value to convert
@type msf: str
@rtype: int
@returns: number of frames
"""
if not ':' in msf:
return int(msf)
m, s, f = msf.split(':')
return 60 * FRAMES_PER_SECOND * int(m) \
+ FRAMES_PER_SECOND * int(s) \
+ int(f)
def framesToMSF(frames, frameDelimiter=':'):
f = frames % FRAMES_PER_SECOND
frames -= f
s = (frames / FRAMES_PER_SECOND) % 60
frames -= s * 60
m = frames / FRAMES_PER_SECOND / 60
return "%02d:%02d%s%02d" % (m, s, frameDelimiter, f)
def framesToHMSF(frames):
# cdparanoia style
f = frames % FRAMES_PER_SECOND
frames -= f
s = (frames / FRAMES_PER_SECOND) % 60
frames -= s * FRAMES_PER_SECOND
m = (frames / FRAMES_PER_SECOND / 60) % 60
frames -= m * FRAMES_PER_SECOND * 60
h = frames / FRAMES_PER_SECOND / 60 / 60
return "%02d:%02d:%02d.%02d" % (h, m, s, f)
def formatTime(seconds, fractional=3):
"""
Nicely format time in a human-readable format, like
HH:MM:SS.mmm
If fractional is zero, no seconds will be shown.
If it is greater than 0, we will show seconds and fractions of seconds.
As a side consequence, there is no way to show seconds without fractions.
@param seconds: the time in seconds to format.
@type seconds: int or float
@param fractional: how many digits to show for the fractional part of
seconds.
@type fractional: int
@rtype: string
@returns: a nicely formatted time string.
"""
chunks = []
if seconds < 0:
chunks.append(('-'))
seconds = -seconds
hour = 60 * 60
hours = seconds / hour
seconds %= hour
minute = 60
minutes = seconds / minute
seconds %= minute
chunk = '%02d:%02d' % (hours, minutes)
if fractional > 0:
chunk += ':%0*.*f' % (fractional + 3, fractional, seconds)
chunks.append(chunk)
return " ".join(chunks)
class MissingDependencyException(Exception):
dependency = None
def __init__(self, *args):
self.args = args
self.dependency = args[0]
class EmptyError(Exception):
pass
class MissingFrames(Exception):
"""
Less frames decoded than expected.
"""
pass
def shrinkPath(path):
"""
Shrink a full path to a shorter version.
Used to handle ENAMETOOLONG
"""
parts = list(os.path.split(path))
length = len(parts[-1])
target = 127
if length <= target:
target = pow(2, int(math.log(length, 2))) - 1
name, ext = os.path.splitext(parts[-1])
target -= len(ext) + 1
# split on space, then reassemble
words = name.split(' ')
length = 0
pieces = []
for word in words:
if length + 1 + len(word) <= target:
pieces.append(word)
length += 1 + len(word)
else:
break
name = " ".join(pieces)
# ext includes period
parts[-1] = u'%s%s' % (name, ext)
path = os.path.join(*parts)
return path
def getRealPath(refPath, filePath):
"""
Translate a .cue or .toc's FILE argument to an existing path.
Does Windows path translation.
Will look for the given file name, but with .flac and .wav as extensions.
@param refPath: path to the file from which the track is referenced;
for example, path to the .cue file in the same directory
@type refPath: unicode
@type filePath: unicode
"""
assert type(filePath) is unicode, "%r is not unicode" % filePath
if os.path.exists(filePath):
return filePath
candidatePaths = []
# .cue FILE statements can have Windows-style path separators, so convert
# them as one possible candidate
# on the other hand, the file may indeed contain a backslash in the name
# on linux
# FIXME: I guess we might do all possible combinations of splitting or
# keeping the slash, but let's just assume it's either Windows
# or linux
# See https://thomas.apestaart.org/morituri/trac/ticket/107
parts = filePath.split('\\')
if parts[0] == '':
parts[0] = os.path.sep
tpath = os.path.join(*parts)
for path in [filePath, tpath]:
if path == os.path.abspath(path):
candidatePaths.append(path)
else:
# if the path is relative:
# - check relatively to the cue file
# - check only the filename part relative to the cue file
candidatePaths.append(os.path.join(
os.path.dirname(refPath), path))
candidatePaths.append(os.path.join(
os.path.dirname(refPath), os.path.basename(path)))
# Now look for .wav and .flac files, as .flac files are often named .wav
for candidate in candidatePaths:
noext, _ = os.path.splitext(candidate)
for ext in ['wav', 'flac']:
cpath = '%s.%s' % (noext, ext)
if os.path.exists(cpath):
return cpath
raise KeyError("Cannot find file for %r" % filePath)
def getRelativePath(targetPath, collectionPath):
"""
Get a relative path from the directory of collectionPath to
targetPath.
Used to determine the path to use in .cue/.m3u files
"""
logger.debug('getRelativePath: target %r, collection %r' % (
targetPath, collectionPath))
targetDir = os.path.dirname(targetPath)
collectionDir = os.path.dirname(collectionPath)
if targetDir == collectionDir:
logger.debug('getRelativePath: target and collection in same dir')
return os.path.basename(targetPath)
else:
rel = os.path.relpath(
targetDir + os.path.sep,
collectionDir + os.path.sep)
logger.debug(
'getRelativePath: target and collection in different dir, %r' % rel
)
return os.path.join(rel, os.path.basename(targetPath))
class VersionGetter(object):
"""
I get the version of a program by looking for it in command output
according to a regexp.
"""
def __init__(self, dependency, args, regexp, expander):
"""
@param dependency: name of the dependency providing the program
@param args: the arguments to invoke to show the version
@type args: list of str
@param regexp: the regular expression to get the version
@param expander: the expansion string for the version using the
regexp group dict
"""
self._dep = dependency
self._args = args
self._regexp = regexp
self._expander = expander
def get(self):
version = "(Unknown)"
try:
p = asyncsub.Popen(self._args,
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, close_fds=True)
p.wait()
output = asyncsub.recv_some(p, e=0, stderr=1)
vre = self._regexp.search(output)
if vre:
version = self._expander % vre.groupdict()
except OSError, e:
import errno
if e.errno == errno.ENOENT:
raise MissingDependencyException(self._dep)
raise
return version

159
whipper/common/config.py Normal file
View File

@@ -0,0 +1,159 @@
# -*- Mode: Python; test-case-name: whipper.test.test_common_config -*-
# vi:si:et:sw=4:sts=4:ts=4
# Morituri - for those about to RIP
# Copyright (C) 2009 Thomas Vander Stichele
# This file is part of whipper.
#
# whipper is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# whipper is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with whipper. If not, see <http://www.gnu.org/licenses/>.
import ConfigParser
import codecs
import os.path
import shutil
import tempfile
import urllib
from whipper.common import directory
import logging
logger = logging.getLogger(__name__)
class Config:
def __init__(self, path=None):
self._path = path or directory.config_path()
self._parser = ConfigParser.SafeConfigParser()
self.open()
def open(self):
# Open the file with the correct encoding
if os.path.exists(self._path):
with codecs.open(self._path, 'r', encoding='utf-8') as f:
self._parser.readfp(f)
logger.info('Loaded %d sections from config file' %
len(self._parser.sections()))
def write(self):
fd, path = tempfile.mkstemp(suffix=u'.moriturirc')
handle = os.fdopen(fd, 'w')
self._parser.write(handle)
handle.close()
shutil.move(path, self._path)
### any section
def _getter(self, suffix, section, option):
methodName = 'get' + suffix
method = getattr(self._parser, methodName)
try:
return method(section, option)
except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
return None
def get(self, section, option):
return self._getter('', section, option)
def getboolean(self, section, option):
return self._getter('boolean', section, option)
### drive sections
def setReadOffset(self, vendor, model, release, offset):
"""
Set a read offset for the given drive.
Strips the given strings of leading and trailing whitespace.
"""
section = self._findOrCreateDriveSection(vendor, model, release)
self._parser.set(section, 'read_offset', str(offset))
self.write()
def getReadOffset(self, vendor, model, release):
"""
Get a read offset for the given drive.
"""
section = self._findDriveSection(vendor, model, release)
try:
return int(self._parser.get(section, 'read_offset'))
except ConfigParser.NoOptionError:
raise KeyError("Could not find read_offset for %s/%s/%s" % (
vendor, model, release))
def setDefeatsCache(self, vendor, model, release, defeat):
"""
Set whether the drive defeats the cache.
Strips the given strings of leading and trailing whitespace.
"""
section = self._findOrCreateDriveSection(vendor, model, release)
self._parser.set(section, 'defeats_cache', str(defeat))
self.write()
def getDefeatsCache(self, vendor, model, release):
section = self._findDriveSection(vendor, model, release)
try:
return self._parser.get(section, 'defeats_cache') == 'True'
except ConfigParser.NoOptionError:
raise KeyError("Could not find defeats_cache for %s/%s/%s" % (
vendor, model, release))
def _findDriveSection(self, vendor, model, release):
for name in self._parser.sections():
if not name.startswith('drive:'):
continue
logger.debug('Looking at section %r' % name)
conf = {}
for key in ['vendor', 'model', 'release']:
locals()[key] = locals()[key].strip()
conf[key] = self._parser.get(name, key)
logger.debug("%s: '%s' versus '%s'" % (
key, locals()[key], conf[key]
))
if vendor.strip() != conf['vendor']:
continue
if model.strip() != conf['model']:
continue
if release.strip() != conf['release']:
continue
return name
raise KeyError("Could not find configuration section for %s/%s/%s" % (
vendor, model, release))
def _findOrCreateDriveSection(self, vendor, model, release):
try:
section = self._findDriveSection(vendor, model, release)
except KeyError:
section = 'drive:' + urllib.quote('%s:%s:%s' % (
vendor, model, release))
self._parser.add_section(section)
__pychecker__ = 'no-local'
for key in ['vendor', 'model', 'release']:
self._parser.set(section, key, locals()[key].strip())
self.write()
return self._findDriveSection(vendor, model, release)

View File

@@ -0,0 +1,50 @@
# -*- Mode: Python; test-case-name: whipper.test.test_common_directory -*-
# vi:si:et:sw=4:sts=4:ts=4
# Morituri - for those about to RIP
# Copyright (C) 2013 Thomas Vander Stichele
# This file is part of whipper.
#
# whipper is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# whipper is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with whipper. If not, see <http://www.gnu.org/licenses/>.
from os import getenv, makedirs
from os.path import join, expanduser, exists
def config_path():
path = join(getenv('XDG_CONFIG_HOME') or join(expanduser('~'), u'.config'),
u'whipper')
if not exists(path):
makedirs(path)
return join(path, u'whipper.conf')
def cache_path(name=None):
path = join(getenv('XDG_CACHE_HOME') or join(expanduser('~'), u'.cache'),
u'whipper')
if name:
path = join(path, name)
if not exists(path):
makedirs(path)
return path
def data_path(name=None):
path = join(getenv('XDG_DATA_HOME')
or join(expanduser('~'), u'.local/share'),
u'whipper')
if name:
path = join(path, name)
if not exists(path):
makedirs(path)
return path

73
whipper/common/drive.py Normal file
View File

@@ -0,0 +1,73 @@
# -*- Mode: Python; test-case-name: whipper.test.test_common_drive -*-
# vi:si:et:sw=4:sts=4:ts=4
# Morituri - for those about to RIP
# Copyright (C) 2009 Thomas Vander Stichele
# This file is part of whipper.
#
# whipper is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# whipper is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with whipper. If not, see <http://www.gnu.org/licenses/>.
import os
import logging
logger = logging.getLogger(__name__)
def _listify(listOrString):
if type(listOrString) == str:
return [listOrString, ]
return listOrString
def getAllDevicePaths():
try:
# see https://savannah.gnu.org/bugs/index.php?38477
return [str(dev) for dev in _getAllDevicePathsPyCdio()]
except ImportError:
logger.info('Cannot import pycdio')
return _getAllDevicePathsStatic()
def _getAllDevicePathsPyCdio():
import pycdio
import cdio
# using FS_AUDIO here only makes it list the drive when an audio cd
# is inserted
# ticket 102: this cdio call returns a list of str, or a single str
return _listify(cdio.get_devices_with_cap(pycdio.FS_MATCH_ALL, False))
def _getAllDevicePathsStatic():
ret = []
for c in ['/dev/cdrom', '/dev/cdrecorder']:
if os.path.exists(c):
ret.append(c)
return ret
def getDeviceInfo(path):
try:
import cdio
except ImportError:
return None
device = cdio.Device(path)
ok, vendor, model, release = device.get_hwinfo()
return (vendor, model, release)

89
whipper/common/encode.py Normal file
View File

@@ -0,0 +1,89 @@
# -*- Mode: Python; test-case-name: whipper.test.test_common_encode -*-
# vi:si:et:sw=4:sts=4:ts=4
# Morituri - for those about to RIP
# Copyright (C) 2009 Thomas Vander Stichele
# This file is part of whipper.
#
# whipper is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# whipper is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with whipper. If not, see <http://www.gnu.org/licenses/>.
from mutagen.flac import FLAC
from whipper.extern.task import task
from whipper.program import sox
from whipper.program import flac
import logging
logger = logging.getLogger(__name__)
class SoxPeakTask(task.Task):
description = 'Calculating peak level'
def __init__(self, track_path):
self.track_path = track_path
self.peak = None
def start(self, runner):
task.Task.start(self, runner)
self.schedule(0.0, self._sox_peak)
def _sox_peak(self):
self.peak = sox.peak_level(self.track_path)
self.stop()
class FlacEncodeTask(task.Task):
description = 'Encoding to FLAC'
def __init__(self, track_path, track_out_path, what="track"):
self.track_path = track_path
self.track_out_path = track_out_path
self.new_path = None
self.description = 'Encoding %s to FLAC' % what
def start(self, runner):
task.Task.start(self, runner)
self.schedule(0.0, self._flac_encode)
def _flac_encode(self):
self.new_path = flac.encode(self.track_path, self.track_out_path)
self.stop()
# TODO: Wizzup: Do we really want this as 'Task'...?
# I only made it a task for now because that it's easier to integrate in
# program/cdparanoia.py - where morituri currently does the tagging.
# We should just move the tagging to a more sensible place.
class TaggingTask(task.Task):
description = 'Writing tags to FLAC'
def __init__(self, track_path, tags):
self.track_path = track_path
self.tags = tags
def start(self, runner):
task.Task.start(self, runner)
self.schedule(0.0, self._tag)
def _tag(self):
w = FLAC(self.track_path)
for k, v in self.tags.items():
w[k] = v
w.save()
self.stop()

321
whipper/common/mbngs.py Normal file
View File

@@ -0,0 +1,321 @@
# -*- Mode: Python; test-case-name: whipper.test.test_common_mbngs -*-
# vi:si:et:sw=4:sts=4:ts=4
# Morituri - for those about to RIP
# Copyright (C) 2009, 2010, 2011 Thomas Vander Stichele
# This file is part of whipper.
#
# whipper is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# whipper is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with whipper. If not, see <http://www.gnu.org/licenses/>.
"""
Handles communication with the musicbrainz server using NGS.
"""
import urllib2
import logging
logger = logging.getLogger(__name__)
VA_ID = "89ad4ac3-39f7-470e-963a-56509c546377" # Various Artists
class MusicBrainzException(Exception):
def __init__(self, exc):
self.args = (exc, )
self.exception = exc
class NotFoundException(MusicBrainzException):
def __str__(self):
return "Disc not found in MusicBrainz"
class TrackMetadata(object):
artist = None
title = None
duration = None # in ms
mbid = None
sortName = None
mbidArtist = None
class DiscMetadata(object):
"""
@param artist: artist(s) name
@param sortName: album artist sort name
@param release: earliest release date, in YYYY-MM-DD
@type release: unicode
@param title: title of the disc (with disambiguation)
@param releaseTitle: title of the release (without disambiguation)
@type tracks: C{list} of L{TrackMetadata}
"""
artist = None
sortName = None
title = None
various = False
tracks = None
release = None
releaseTitle = None
releaseType = None
mbid = None
mbidArtist = None
url = None
catalogNumber = None
barcode = None
def __init__(self):
self.tracks = []
def _record(record, which, name, what):
# optionally record to disc as a JSON serialization
if record:
import json
filename = 'morituri.%s.%s.json' % (which, name)
handle = open(filename, 'w')
handle.write(json.dumps(what))
handle.close()
logger.info('Wrote %s %s to %s', which, name, filename)
# credit is of the form [dict, str, dict, ... ]
# e.g. [
# {'artist': {
# 'sort-name': 'Sukilove',
# 'id': '5f4af6cf-a1b8-4e51-a811-befed399a1c6',
# 'name': 'Sukilove'
# }}, ' & ', {
# 'artist': {
# 'sort-name': 'Blackie and the Oohoos',
# 'id': '028a9dc7-f5ef-43c2-866b-08d69ffff363',
# 'name': 'Blackie & the Oohoos'}}]
# or
# [{'artist':
# {'sort-name': 'Pixies',
# 'id': 'b6b2bb8d-54a9-491f-9607-7b546023b433', 'name': 'Pixies'}}]
class _Credit(list):
"""
I am a representation of an artist-credit in musicbrainz for a disc
or track.
"""
def joiner(self, attributeGetter, joinString=None):
res = []
for item in self:
if isinstance(item, dict):
res.append(attributeGetter(item))
else:
if not joinString:
res.append(item)
else:
res.append(joinString)
return "".join(res)
def getSortName(self):
return self.joiner(lambda i: i.get('artist').get('sort-name', None))
def getName(self):
return self.joiner(lambda i: i.get('artist').get('name', None))
def getIds(self):
return self.joiner(lambda i: i.get('artist').get('id', None),
joinString=";")
def _getMetadata(releaseShort, release, discid, country=None):
"""
@type release: C{dict}
@param release: a release dict as returned in the value for key release
from get_release_by_id
@rtype: L{DiscMetadata} or None
"""
logger.debug('getMetadata for release id %r',
release['id'])
if not release['id']:
logger.warning('No id for release %r', release)
return None
assert release['id'], 'Release does not have an id'
if 'country' in release and country and release['country'] != country:
logger.warning('%r was not released in %r', release, country)
return None
discMD = DiscMetadata()
discMD.releaseType = releaseShort.get('release-group', {}).get('type')
discCredit = _Credit(release['artist-credit'])
# FIXME: is there a better way to check for VA ?
discMD.various = False
if discCredit[0]['artist']['id'] == VA_ID:
discMD.various = True
if len(discCredit) > 1:
logger.debug('artist-credit more than 1: %r', discCredit)
albumArtistName = discCredit.getName()
# getUniqueName gets disambiguating names like Muse (UK rock band)
discMD.artist = albumArtistName
discMD.sortName = discCredit.getSortName()
if 'date' not in release:
logger.warning("Release with ID '%s' (%s - %s) does not have a date",
release['id'], discMD.artist, release['title'])
else:
discMD.release = release['date']
discMD.mbid = release['id']
discMD.mbidArtist = discCredit.getIds()
discMD.url = 'https://musicbrainz.org/release/' + release['id']
discMD.barcode = release.get('barcode', None)
lil = release.get('label-info-list', [{}])
if lil:
discMD.catalogNumber = lil[0].get('catalog-number')
tainted = False
duration = 0
# only show discs from medium-list->disc-list with matching discid
for medium in release['medium-list']:
for disc in medium['disc-list']:
if disc['id'] == discid:
title = release['title']
discMD.releaseTitle = title
if 'disambiguation' in release:
title += " (%s)" % release['disambiguation']
count = len(release['medium-list'])
if count > 1:
title += ' (Disc %d of %d)' % (
int(medium['position']), count)
if 'title' in medium:
title += ": %s" % medium['title']
discMD.title = title
for t in medium['track-list']:
track = TrackMetadata()
trackCredit = _Credit(t['recording']['artist-credit'])
if len(trackCredit) > 1:
logger.debug('artist-credit more than 1: %r',
trackCredit)
# FIXME: leftover comment, need an example
# various artists discs can have tracks with no artist
track.artist = trackCredit.getName()
track.sortName = trackCredit.getSortName()
track.mbidArtist = trackCredit.getIds()
track.title = t['recording']['title']
track.mbid = t['recording']['id']
# FIXME: unit of duration ?
track.duration = int(t['recording'].get('length', 0))
if not track.duration:
logger.warning('track %r (%r) does not have duration' % (
track.title, track.mbid))
tainted = True
else:
duration += track.duration
discMD.tracks.append(track)
if not tainted:
discMD.duration = duration
else:
discMD.duration = 0
return discMD
# see http://bugs.musicbrainz.org/browser/python-musicbrainz2/trunk/examples/
# ripper.py
def musicbrainz(discid, country=None, record=False):
"""
Based on a MusicBrainz disc id, get a list of DiscMetadata objects
for the given disc id.
Example disc id: Mj48G109whzEmAbPBoGvd4KyCS4-
@type discid: str
@rtype: list of L{DiscMetadata}
"""
logger.debug('looking up results for discid %r', discid)
import musicbrainzngs
ret = []
try:
result = musicbrainzngs.get_releases_by_discid(discid,
includes=["artists", "recordings", "release-groups"])
except musicbrainzngs.ResponseError, e:
if isinstance(e.cause, urllib2.HTTPError):
if e.cause.code == 404:
raise NotFoundException(e)
else:
logger.debug('received bad response from the server')
raise MusicBrainzException(e)
# The result can either be a "disc" or a "cdstub"
if result.get('disc'):
logger.debug('found %d releases for discid %r',
len(result['disc']['release-list']), discid)
_record(record, 'releases', discid, result)
# Display the returned results to the user.
import json
for release in result['disc']['release-list']:
formatted = json.dumps(release, sort_keys=False, indent=4)
logger.debug('result %s: artist %r, title %r' % (
formatted, release['artist-credit-phrase'], release['title']))
# to get titles of recordings, we need to query the release with
# artist-credits
res = musicbrainzngs.get_release_by_id(
release['id'], includes=["artists", "artist-credits",
"recordings", "discids", "labels"])
_record(record, 'release', release['id'], res)
releaseDetail = res['release']
formatted = json.dumps(releaseDetail, sort_keys=False, indent=4)
logger.debug('release %s' % formatted)
md = _getMetadata(release, releaseDetail, discid, country)
if md:
logger.debug('duration %r', md.duration)
ret.append(md)
return ret
elif result.get('cdstub'):
logger.debug('query returned cdstub: ignored')
return None
else:
return None

68
whipper/common/path.py Normal file
View File

@@ -0,0 +1,68 @@
# -*- Mode: Python; test-case-name: whipper.test.test_common_path -*-
# vi:si:et:sw=4:sts=4:ts=4
# Morituri - for those about to RIP
# Copyright (C) 2009 Thomas Vander Stichele
# This file is part of whipper.
#
# whipper is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# whipper is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with whipper. If not, see <http://www.gnu.org/licenses/>.
import re
class PathFilter(object):
"""
I filter path components for safe storage on file systems.
"""
def __init__(self, slashes=True, quotes=True, fat=True, special=False):
"""
@param slashes: whether to convert slashes to dashes
@param quotes: whether to normalize quotes
@param fat: whether to strip characters illegal on FAT filesystems
@param special: whether to strip special characters
"""
self._slashes = slashes
self._quotes = quotes
self._fat = fat
self._special = special
def filter(self, path):
if self._slashes:
path = re.sub(r'[/\\]', '-', path, re.UNICODE)
def separators(path):
# replace separators with a space-hyphen or hyphen
path = re.sub(r'[:]', ' -', path, re.UNICODE)
path = re.sub(r'[\|]', '-', path, re.UNICODE)
return path
# change all fancy single/double quotes to normal quotes
if self._quotes:
path = re.sub(ur'[\xc2\xb4\u2018\u2019\u201b]', "'", path,
re.UNICODE)
path = re.sub(ur'[\u201c\u201d\u201f]', '"', path, re.UNICODE)
if self._special:
path = separators(path)
path = re.sub(r'[\*\?&!\'\"\$\(\)`{}\[\]<>]', '_', path, re.UNICODE)
if self._fat:
path = separators(path)
# : and | already gone, but leave them here for reference
path = re.sub(r'[:\*\?"<>|"]', '_', path, re.UNICODE)
return path

703
whipper/common/program.py Normal file
View File

@@ -0,0 +1,703 @@
# -*- Mode: Python; test-case-name: whipper.test.test_common_program -*-
# vi:si:et:sw=4:sts=4:ts=4
# Morituri - for those about to RIP
# Copyright (C) 2009, 2010, 2011 Thomas Vander Stichele
# This file is part of whipper.
#
# whipper is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# whipper is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with whipper. If not, see <http://www.gnu.org/licenses/>.
"""
Common functionality and class for all programs using morituri.
"""
import musicbrainzngs
import os
import sys
import time
from whipper.common import common, mbngs, cache, path
from whipper.common import checksum
from whipper.program import cdrdao, cdparanoia
from whipper.image import image
from whipper.extern.task import task
import logging
logger = logging.getLogger(__name__)
# FIXME: should Program have a runner ?
class Program:
"""
I maintain program state and functionality.
@ivar metadata:
@type metadata: L{mbngs.DiscMetadata}
@ivar result: the rip's result
@type result: L{result.RipResult}
@type outdir: unicode
@type config: L{whipper.common.config.Config}
"""
cuePath = None
logPath = None
metadata = None
outdir = None
result = None
_stdout = None
def __init__(self, config, record=False, stdout=sys.stdout):
"""
@param record: whether to record results of API calls for playback.
"""
self._record = record
self._cache = cache.ResultCache()
self._stdout = stdout
self._config = config
d = {}
for key, default in {
'fat': True,
'special': False
}.items():
value = None
value = self._config.getboolean('main', 'path_filter_'+ key)
if value is None:
value = default
d[key] = value
self._filter = path.PathFilter(**d)
def setWorkingDirectory(self, workingDirectory):
if workingDirectory:
logger.info('Changing to working directory %s' % workingDirectory)
os.chdir(workingDirectory)
def getFastToc(self, runner, toc_pickle, device):
"""
Retrieve the normal TOC table from a toc pickle or the drive.
Also retrieves the cdrdao version
@rtype: tuple of L{table.Table}, str
"""
def function(r, t):
r.run(t)
ptoc = cache.Persister(toc_pickle or None)
if not ptoc.object:
from pkg_resources import parse_version as V
version = cdrdao.getCDRDAOVersion()
if V(version) < V('1.2.3rc2'):
sys.stdout.write('Warning: cdrdao older than 1.2.3 has a '
'pre-gap length bug.\n'
'See http://sourceforge.net/tracker/?func=detail'
'&aid=604751&group_id=2171&atid=102171\n')
t = cdrdao.ReadTOCTask(device)
ptoc.persist(t.table)
toc = ptoc.object
assert toc.hasTOC()
return toc
def getTable(self, runner, cddbdiscid, mbdiscid, device, offset):
"""
Retrieve the Table either from the cache or the drive.
@rtype: L{table.Table}
"""
tcache = cache.TableCache()
ptable = tcache.get(cddbdiscid, mbdiscid)
itable = None
tdict = {}
# Ingore old cache, since we do not know what offset it used.
if type(ptable.object) is dict:
tdict = ptable.object
if offset in tdict:
itable = tdict[offset]
if not itable:
logger.debug('getTable: cddbdiscid %s, mbdiscid %s not in cache for offset %s, '
'reading table' % (
cddbdiscid, mbdiscid, offset))
t = cdrdao.ReadTableTask(device)
itable = t.table
tdict[offset] = itable
ptable.persist(tdict)
logger.debug('getTable: read table %r' % itable)
else:
logger.debug('getTable: cddbdiscid %s, mbdiscid %s in cache for offset %s' % (
cddbdiscid, mbdiscid, offset))
logger.debug('getTable: loaded table %r' % itable)
assert itable.hasTOC()
self.result.table = itable
logger.debug('getTable: returning table with mb id %s' %
itable.getMusicBrainzDiscId())
return itable
def getRipResult(self, cddbdiscid):
"""
Retrieve the persistable RipResult either from our cache (from a
previous, possibly aborted rip), or return a new one.
@rtype: L{result.RipResult}
"""
assert self.result is None
self._presult = self._cache.getRipResult(cddbdiscid)
self.result = self._presult.object
return self.result
def saveRipResult(self):
self._presult.persist()
def getPath(self, outdir, template, mbdiscid, i, disambiguate=False):
"""
Based on the template, get a complete path for the given track,
minus extension.
Also works for the disc name, using disc variables for the template.
@param outdir: the directory where to write the files
@type outdir: unicode
@param template: the template for writing the file
@type template: unicode
@param i: track number (0 for HTOA, or for disc)
@type i: int
@rtype: unicode
"""
assert type(outdir) is unicode, "%r is not unicode" % outdir
assert type(template) is unicode, "%r is not unicode" % template
# the template is similar to grip, except for %s/%S/%r/%R
# see #gripswitches
# returns without extension
v = {}
v['t'] = '%02d' % i
# default values
v['A'] = 'Unknown Artist'
v['d'] = mbdiscid # fallback for title
v['r'] = 'unknown'
v['R'] = 'Unknown'
v['B'] = '' # barcode
v['C'] = '' # catalog number
v['x'] = 'flac'
v['X'] = v['x'].upper()
v['y'] = '0000'
v['a'] = v['A']
if i == 0:
v['n'] = 'Hidden Track One Audio'
else:
v['n'] = 'Unknown Track %d' % i
if self.metadata:
release = self.metadata.release or '0000'
v['y'] = release[:4]
v['A'] = self._filter.filter(self.metadata.artist)
v['S'] = self._filter.filter(self.metadata.sortName)
v['d'] = self._filter.filter(self.metadata.title)
v['B'] = self.metadata.barcode
v['C'] = self.metadata.catalogNumber
if self.metadata.releaseType:
v['R'] = self.metadata.releaseType
v['r'] = self.metadata.releaseType.lower()
if i > 0:
try:
v['a'] = self._filter.filter(self.metadata.tracks[i - 1].artist)
v['s'] = self._filter.filter(
self.metadata.tracks[i - 1].sortName)
v['n'] = self._filter.filter(self.metadata.tracks[i - 1].title)
except IndexError, e:
print 'ERROR: no track %d found, %r' % (i, e)
raise
else:
# htoa defaults to disc's artist
v['a'] = self._filter.filter(self.metadata.artist)
# when disambiguating, use catalogNumber then barcode
if disambiguate:
templateParts = list(os.path.split(template))
if self.metadata.catalogNumber:
templateParts[-2] += ' (%s)' % self.metadata.catalogNumber
elif self.metadata.barcode:
templateParts[-2] += ' (%s)' % self.metadata.barcode
template = os.path.join(*templateParts)
logger.debug('Disambiguated template to %r' % template)
import re
template = re.sub(r'%(\w)', r'%(\1)s', template)
ret = os.path.join(outdir, template % v)
return ret
def getCDDB(self, cddbdiscid):
"""
@param cddbdiscid: list of id, tracks, offsets, seconds
@rtype: str
"""
# FIXME: convert to nonblocking?
import CDDB
try:
code, md = CDDB.query(cddbdiscid)
logger.debug('CDDB query result: %r, %r', code, md)
if code == 200:
return md['title']
except IOError, e:
# FIXME: for some reason errno is a str ?
if e.errno == 'socket error':
self._stdout.write("Warning: network error: %r\n" % (e, ))
else:
raise
return None
def getMusicBrainz(self, ittoc, mbdiscid, release=None, country=None, prompt=False):
"""
@type ittoc: L{whipper.image.table.Table}
"""
# look up disc on musicbrainz
self._stdout.write('Disc duration: %s, %d audio tracks\n' % (
common.formatTime(ittoc.duration() / 1000.0),
ittoc.getAudioTracks()))
logger.debug('MusicBrainz submit url: %r',
ittoc.getMusicBrainzSubmitURL())
ret = None
metadatas = None
e = None
for _ in range(0, 4):
try:
metadatas = mbngs.musicbrainz(mbdiscid,
country=country,
record=self._record)
break
except mbngs.NotFoundException, e:
break
except musicbrainzngs.NetworkError, e:
self._stdout.write("Warning: network error: %r\n" % (e, ))
break
except mbngs.MusicBrainzException, e:
self._stdout.write("Warning: %r\n" % (e, ))
time.sleep(5)
continue
if not metadatas:
if e:
self._stdout.write("Error: %r\n" % (e, ))
self._stdout.write('Continuing without metadata\n')
if metadatas:
deltas = {}
self._stdout.write('\nMatching releases:\n')
for metadata in metadatas:
self._stdout.write('\n')
self._stdout.write('Artist : %s\n' %
metadata.artist.encode('utf-8'))
self._stdout.write('Title : %s\n' %
metadata.title.encode('utf-8'))
self._stdout.write('Duration: %s\n' %
common.formatTime(metadata.duration / 1000.0))
self._stdout.write('URL : %s\n' % metadata.url)
self._stdout.write('Release : %s\n' % metadata.mbid)
self._stdout.write('Type : %s\n' % metadata.releaseType)
if metadata.barcode:
self._stdout.write("Barcode : %s\n" % metadata.barcode)
if metadata.catalogNumber:
self._stdout.write("Cat no : %s\n" % metadata.catalogNumber)
delta = abs(metadata.duration - ittoc.duration())
if not delta in deltas:
deltas[delta] = []
deltas[delta].append(metadata)
lowest = None
if not release and len(metadatas) > 1:
# Select the release that most closely matches the duration.
lowest = min(deltas.keys())
if prompt:
guess = (deltas[lowest])[0].mbid
release = raw_input("\nPlease select a release [%s]: " % guess)
if not release:
release = guess
if release:
metadatas = [m for m in metadatas if m.url.endswith(release)]
logger.debug('Asked for release %r, only kept %r',
release, metadatas)
if len(metadatas) == 1:
self._stdout.write('\n')
self._stdout.write('Picked requested release id %s\n' %
release)
self._stdout.write('Artist : %s\n' %
metadatas[0].artist.encode('utf-8'))
self._stdout.write('Title : %s\n' %
metadatas[0].title.encode('utf-8'))
elif not metadatas:
self._stdout.write(
"Requested release id '%s', "
"but none of the found releases match\n" % release)
return
else:
if lowest:
metadatas = deltas[lowest]
# If we have multiple, make sure they match
if len(metadatas) > 1:
artist = metadatas[0].artist
releaseTitle = metadatas[0].releaseTitle
for i, metadata in enumerate(metadatas):
if not artist == metadata.artist:
logger.warning("artist 0: %r and artist %d: %r "
"are not the same" % (
artist, i, metadata.artist))
if not releaseTitle == metadata.releaseTitle:
logger.warning("title 0: %r and title %d: %r "
"are not the same" % (
releaseTitle, i, metadata.releaseTitle))
if (not release and len(deltas.keys()) > 1):
self._stdout.write('\n')
self._stdout.write('Picked closest match in duration.\n')
self._stdout.write('Others may be wrong in musicbrainz, '
'please correct.\n')
self._stdout.write('Artist : %s\n' %
artist.encode('utf-8'))
self._stdout.write('Title : %s\n' %
metadatas[0].title.encode('utf-8'))
# Select one of the returned releases. We just pick the first one.
ret = metadatas[0]
else:
self._stdout.write(
'Submit this disc to MusicBrainz at the above URL.\n')
ret = None
self._stdout.write('\n')
return ret
def getTagList(self, number):
"""
Based on the metadata, get a dict of tags for the given track.
@param number: track number (0 for HTOA)
@type number: int
@rtype: dict
"""
trackArtist = u'Unknown Artist'
albumArtist = u'Unknown Artist'
disc = u'Unknown Disc'
title = u'Unknown Track'
if self.metadata:
trackArtist = self.metadata.artist
albumArtist = self.metadata.artist
disc = self.metadata.title
mbidAlbum = self.metadata.mbid
mbidTrackAlbum = self.metadata.mbidArtist
mbDiscId = self.metadata.discid
if number > 0:
try:
track = self.metadata.tracks[number - 1]
trackArtist = track.artist
title = track.title
mbidTrack = track.mbid
mbidTrackArtist = track.mbidArtist
except IndexError, e:
print 'ERROR: no track %d found, %r' % (number, e)
raise
else:
# htoa defaults to disc's artist
title = 'Hidden Track One Audio'
tags = {}
if self.metadata and not self.metadata.various:
tags['ALBUMARTIST'] = albumArtist
tags['ARTIST'] = trackArtist
tags['TITLE'] = title
tags['ALBUM'] = disc
tags['TRACKNUMBER'] = u'%s' % number
if self.metadata:
if self.metadata.release is not None:
tags['DATE'] = self.metadata.release
if number > 0:
tags['MUSICBRAINZ_TRACKID'] = mbidTrack
tags['MUSICBRAINZ_ARTISTID'] = mbidTrackArtist
tags['MUSICBRAINZ_ALBUMID'] = mbidAlbum
tags['MUSICBRAINZ_ALBUMARTISTID'] = mbidTrackAlbum
tags['MUSICBRAINZ_DISCID'] = mbDiscId
# TODO/FIXME: ISRC tag
return tags
def getHTOA(self):
"""
Check if we have hidden track one audio.
@returns: tuple of (start, stop), or None
"""
track = self.result.table.tracks[0]
try:
index = track.getIndex(0)
except KeyError:
return None
start = index.absolute
stop = track.getIndex(1).absolute - 1
return (start, stop)
def verifyTrack(self, runner, trackResult):
t = checksum.CRC32Task(trackResult.filename)
try:
runner.run(t)
except task.TaskException, e:
if isinstance(e.exception, common.MissingFrames):
logger.warning('missing frames for %r' % trackResult.filename)
return False
else:
raise
ret = trackResult.testcrc == t.checksum
logger.debug('verifyTrack: track result crc %r, file crc %r, result %r',
trackResult.testcrc, t.checksum, ret)
return ret
def ripTrack(self, runner, trackResult, offset, device, taglist,
overread, what=None):
"""
Ripping the track may change the track's filename as stored in
trackResult.
@param trackResult: the object to store information in.
@type trackResult: L{result.TrackResult}
"""
if trackResult.number == 0:
start, stop = self.getHTOA()
else:
start = self.result.table.getTrackStart(trackResult.number)
stop = self.result.table.getTrackEnd(trackResult.number)
dirname = os.path.dirname(trackResult.filename)
if not os.path.exists(dirname):
os.makedirs(dirname)
if not what:
what='track %d' % (trackResult.number, )
t = cdparanoia.ReadVerifyTrackTask(trackResult.filename,
self.result.table, start, stop, overread,
offset=offset,
device=device,
taglist=taglist,
what=what)
runner.run(t)
logger.debug('ripped track')
logger.debug('test speed %.3f/%.3f seconds' % (
t.testspeed, t.testduration))
logger.debug('copy speed %.3f/%.3f seconds' % (
t.copyspeed, t.copyduration))
trackResult.testcrc = t.testchecksum
trackResult.copycrc = t.copychecksum
trackResult.peak = t.peak
trackResult.quality = t.quality
trackResult.testspeed = t.testspeed
trackResult.copyspeed = t.copyspeed
# we want rerips to add cumulatively to the time
trackResult.testduration += t.testduration
trackResult.copyduration += t.copyduration
if trackResult.filename != t.path:
trackResult.filename = t.path
logger.info('Filename changed to %r', trackResult.filename)
def retagImage(self, runner, taglists):
cueImage = image.Image(self.cuePath)
t = image.ImageRetagTask(cueImage, taglists)
runner.run(t)
def verifyImage(self, runner, responses):
"""
Verify our image against the given AccurateRip responses.
Needs an initialized self.result.
Will set accurip and friends on each TrackResult.
"""
logger.debug('verifying Image against %d AccurateRip responses',
len(responses or []))
cueImage = image.Image(self.cuePath)
verifytask = image.ImageVerifyTask(cueImage)
cuetask = image.AccurateRipChecksumTask(cueImage)
runner.run(verifytask)
runner.run(cuetask)
self._verifyImageWithChecksums(responses, cuetask.checksums)
def _verifyImageWithChecksums(self, responses, checksums):
# loop over tracks to set our calculated AccurateRip CRC's
for i, csum in enumerate(checksums):
trackResult = self.result.getTrackResult(i + 1)
trackResult.ARCRC = csum
if not responses:
logger.warning('No AccurateRip responses, cannot verify.')
return
# now loop to match responses
for i, csum in enumerate(checksums):
trackResult = self.result.getTrackResult(i + 1)
confidence = None
response = None
# match against each response's checksum for this track
for j, r in enumerate(responses):
if "%08x" % csum == r.checksums[i]:
response = r
logger.debug(
"Track %02d matched response %d of %d in "
"AccurateRip database",
i + 1, j + 1, len(responses))
trackResult.accurip = True
# FIXME: maybe checksums should be ints
trackResult.ARDBCRC = int(r.checksums[i], 16)
# arsum = csum
confidence = r.confidences[i]
trackResult.ARDBConfidence = confidence
if not trackResult.accurip:
logger.warning("Track %02d: not matched in AccurateRip database",
i + 1)
# I have seen AccurateRip responses with 0 as confidence
# for example, Best of Luke Haines, disc 1, track 1
maxConfidence = -1
maxResponse = None
for r in responses:
if r.confidences[i] > maxConfidence:
maxConfidence = r.confidences[i]
maxResponse = r
logger.debug('Track %02d: found max confidence %d' % (
i + 1, maxConfidence))
trackResult.ARDBMaxConfidence = maxConfidence
if not response:
logger.warning('Track %02d: none of the responses matched.',
i + 1)
trackResult.ARDBCRC = int(
maxResponse.checksums[i], 16)
else:
trackResult.ARDBCRC = int(response.checksums[i], 16)
# TODO MW: Update this further for ARv2 code
def getAccurateRipResults(self):
"""
@rtype: list of str
"""
res = []
# loop over tracks
for i, trackResult in enumerate(self.result.tracks):
status = 'rip NOT accurate'
if trackResult.accurip:
status = 'rip accurate '
c = "(not found) "
ar = ", DB [notfound]"
if trackResult.ARDBMaxConfidence:
c = "(max confidence %3d)" % trackResult.ARDBMaxConfidence
if trackResult.ARDBConfidence is not None:
if trackResult.ARDBConfidence \
< trackResult.ARDBMaxConfidence:
c = "(confidence %3d of %3d)" % (
trackResult.ARDBConfidence,
trackResult.ARDBMaxConfidence)
ar = ", DB [%08x]" % trackResult.ARDBCRC
# htoa tracks (i == 0) do not have an ARCRC
if trackResult.ARCRC is None:
assert trackResult.number == 0, \
'no trackResult.ARCRC on non-HTOA track %d' % \
trackResult.number
res.append("Track 0: unknown (not tracked)")
else:
res.append("Track %2d: %s %s [%08x]%s" % (
trackResult.number, status, c, trackResult.ARCRC, ar))
return res
def writeCue(self, discName):
assert self.result.table.canCue()
cuePath = '%s.cue' % discName
logger.debug('write .cue file to %s', cuePath)
handle = open(cuePath, 'w')
# FIXME: do we always want utf-8 ?
handle.write(self.result.table.cue(cuePath).encode('utf-8'))
handle.close()
self.cuePath = cuePath
return cuePath
def writeLog(self, discName, logger):
logPath = '%s.log' % discName
handle = open(logPath, 'w')
log = logger.log(self.result)
handle.write(log.encode('utf-8'))
handle.close()
self.logPath = logPath
return logPath

223
whipper/common/renamer.py Normal file
View File

@@ -0,0 +1,223 @@
# -*- Mode: Python; test-case-name: whipper.test.test_common_renamer -*-
# vi:si:et:sw=4:sts=4:ts=4
# Morituri - for those about to RIP
# Copyright (C) 2009 Thomas Vander Stichele
# This file is part of whipper.
#
# whipper is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# whipper is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with whipper. If not, see <http://www.gnu.org/licenses/>.
import os
import tempfile
"""
Rename files on file system and inside metafiles in a resumable way.
"""
class Operator(object):
def __init__(self, statePath, key):
self._todo = []
self._done = []
self._statePath = statePath
self._key = key
self._resuming = False
def addOperation(self, operation):
"""
Add an operation.
"""
self._todo.append(operation)
def load(self):
"""
Load state from the given state path using the given key.
Verifies the state.
"""
todo = os.path.join(self._statePath, self._key + '.todo')
lines = []
with open(todo, 'r') as handle:
for line in handle.readlines():
lines.append(line)
name, data = line.split(' ', 1)
cls = globals()[name]
operation = cls.deserialize(data)
self._todo.append(operation)
done = os.path.join(self._statePath, self._key + '.done')
if os.path.exists(done):
with open(done, 'r') as handle:
for i, line in enumerate(handle.readlines()):
assert line == lines[i], "line %s is different than %s" % (
line, lines[i])
self._done.append(self._todo[i])
# last task done is i; check if the next one might have gotten done.
self._resuming = True
def save(self):
"""
Saves the state to the given state path using the given key.
"""
# only save todo first time
todo = os.path.join(self._statePath, self._key + '.todo')
if not os.path.exists(todo):
with open(todo, 'w') as handle:
for o in self._todo:
name = o.__class__.__name__
data = o.serialize()
handle.write('%s %s\n' % (name, data))
# save done every time
done = os.path.join(self._statePath, self._key + '.done')
with open(done, 'w') as handle:
for o in self._done:
name = o.__class__.__name__
data = o.serialize()
handle.write('%s %s\n' % (name, data))
def start(self):
"""
Execute the operations
"""
def next(self):
operation = self._todo[len(self._done)]
if self._resuming:
operation.redo()
self._resuming = False
else:
operation.do()
self._done.append(operation)
self.save()
class FileRenamer(Operator):
def addRename(self, source, destination):
"""
Add a rename operation.
@param source: source filename
@type source: str
@param destination: destination filename
@type destination: str
"""
class Operation(object):
def verify(self):
"""
Check if the operation will succeed in the current conditions.
Consider this a pre-flight check.
Does not eliminate the need to handle errors as they happen.
"""
def do(self):
"""
Perform the operation.
"""
pass
def redo(self):
"""
Perform the operation, without knowing if it already has been
(partly) performed.
"""
self.do()
def serialize(self):
"""
Serialize the operation.
The return value should bu usable with L{deserialize}
@rtype: str
"""
def deserialize(cls, data):
"""
Deserialize the operation with the given operation data.
@type data: str
"""
raise NotImplementedError
deserialize = classmethod(deserialize)
class RenameFile(Operation):
def __init__(self, source, destination):
self._source = source
self._destination = destination
def verify(self):
assert os.path.exists(self._source)
assert not os.path.exists(self._destination)
def do(self):
os.rename(self._source, self._destination)
def serialize(self):
return '"%s" "%s"' % (self._source, self._destination)
def deserialize(cls, data):
_, source, __, destination, ___ = data.split('"')
return RenameFile(source, destination)
deserialize = classmethod(deserialize)
def __eq__(self, other):
return self._source == other._source \
and self._destination == other._destination
class RenameInFile(Operation):
def __init__(self, path, source, destination):
self._path = path
self._source = source
self._destination = destination
def verify(self):
assert os.path.exists(self._path)
# check if the source exists in the given file
def do(self):
with open(self._path) as handle:
(fd, name) = tempfile.mkstemp(suffix='.morituri')
for s in handle:
os.write(fd, s.replace(self._source, self._destination))
os.close(fd)
os.rename(name, self._path)
def serialize(self):
return '"%s" "%s" "%s"' % (self._path, self._source, self._destination)
def deserialize(cls, data):
_, path, __, source, ___, destination, ____ = data.split('"')
return RenameInFile(path, source, destination)
deserialize = classmethod(deserialize)
def __eq__(self, other):
return self._source == other._source \
and self._destination == other._destination \
and self._path == other._path

149
whipper/common/task.py Normal file
View File

@@ -0,0 +1,149 @@
# -*- Mode: Python -*-
# vi:si:et:sw=4:sts=4:ts=4
import os
import signal
import subprocess
from whipper.extern import asyncsub
from whipper.extern.task import task
import logging
logger = logging.getLogger(__name__)
class SyncRunner(task.SyncRunner):
pass
class LoggableTask(task.Task):
pass
class LoggableMultiSeparateTask(task.MultiSeparateTask):
pass
class PopenTask(task.Task):
"""
I am a task that runs a command using Popen.
"""
logCategory = 'PopenTask'
bufsize = 1024
command = None
cwd = None
def start(self, runner):
task.Task.start(self, runner)
try:
self._popen = asyncsub.Popen(self.command,
bufsize=self.bufsize,
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, close_fds=True, cwd=self.cwd)
except OSError, e:
import errno
if e.errno == errno.ENOENT:
self.commandMissing()
raise
logger.debug('Started %r with pid %d', self.command,
self._popen.pid)
self.schedule(1.0, self._read, runner)
def _read(self, runner):
try:
read = False
ret = self._popen.recv()
if ret:
logger.debug("read from stdout: %s", ret)
self.readbytesout(ret)
read = True
ret = self._popen.recv_err()
if ret:
logger.debug("read from stderr: %s", ret)
self.readbyteserr(ret)
read = True
# if we read anything, we might have more to read, so
# reschedule immediately
if read and self.runner:
self.schedule(0.0, self._read, runner)
return
# if we didn't read anything, give the command more time to
# produce output
if self._popen.poll() is None and self.runner:
# not finished yet
self.schedule(1.0, self._read, runner)
return
self._done()
except Exception, e:
logger.debug('exception during _read(): %r', str(e))
self.setException(e)
self.stop()
def _done(self):
assert self._popen.returncode is not None, "No returncode"
if self._popen.returncode >= 0:
logger.debug('Return code was %d', self._popen.returncode)
else:
logger.debug('Terminated with signal %d',
-self._popen.returncode)
self.setProgress(1.0)
if self._popen.returncode != 0:
self.failed()
else:
self.done()
self.stop()
return
def abort(self):
logger.debug('Aborting, sending SIGTERM to %d', self._popen.pid)
os.kill(self._popen.pid, signal.SIGTERM)
# self.stop()
def readbytesout(self, bytes):
"""
Called when bytes have been read from stdout.
"""
pass
def readbyteserr(self, bytes):
"""
Called when bytes have been read from stderr.
"""
pass
def done(self):
"""
Called when the command completed successfully.
"""
pass
def failed(self):
"""
Called when the command failed.
"""
pass
def commandMissing(self):
"""
Called when the command is missing.
"""
pass