Merge pull request #187 from RecursiveForest/rewrite-accuraterip

AccurateRip V2 support
This commit is contained in:
JoeLametta
2017-09-15 23:12:40 +02:00
committed by GitHub
20 changed files with 836 additions and 864 deletions

View File

@@ -11,7 +11,7 @@ install:
- sudo apt-get -qq update
- sudo pip install --upgrade -qq pip
- sudo apt-get -qq install cdparanoia cdrdao flac libcdio-dev libiso9660-dev libsndfile1-dev python-cddb python-gobject python-musicbrainzngs python-mutagen python-setuptools sox swig
- sudo pip install pycdio
- sudo pip install pycdio requests
# Testing dependencies
- sudo apt-get -qq install python-twisted-core

View File

@@ -75,6 +75,7 @@ Whipper relies on the following packages in order to run correctly and provide a
- [python-cddb](http://cddb-py.sourceforge.net/), for showing but not using metadata if disc not available in the MusicBrainz DB
- [pycdio](https://pypi.python.org/pypi/pycdio/) (to avoid bugs please use `pycdio` **0.20** & `libcdio` >= **0.90** or, with previous `libcdio` versions, `pycdio` **0.17**), for drive identification
- Required for drive offset and caching behavior to be stored in the configuration file
- [requests](https://pypi.python.org/pypi/requests) for retrieving AccurateRip database entries
- [libsndfile](http://www.mega-nerd.com/libsndfile/), for reading wav files
- [flac](https://xiph.org/flac/), for reading flac files
- [sox](http://sox.sourceforge.net/), for track peak detection

View File

@@ -21,7 +21,7 @@
import sys
from whipper.command.basecommand import BaseCommand
from whipper.common import accurip
from whipper.common.accurip import get_db_entry, ACCURATERIP_URL
import logging
logger = logging.getLogger(__name__)
@@ -38,20 +38,18 @@ retrieves and display accuraterip data from the given URL
help="accuraterip URL to load data from")
def do(self):
url = self.options.url
cache = accurip.AccuCache()
responses = cache.retrieve(url)
responses = get_db_entry(self.options.url.lstrip(ACCURATERIP_URL))
count = responses[0].trackCount
count = responses[0].num_tracks
sys.stdout.write("Found %d responses for %d tracks\n\n" % (
len(responses), count))
for (i, r) in enumerate(responses):
if r.trackCount != count:
if r.num_tracks != count:
sys.stdout.write(
"Warning: response %d has %d tracks instead of %d\n" % (
i, r.trackCount, count))
i, r.num_tracks, count))
# checksum and confidence by track
for track in range(count):
@@ -59,11 +57,11 @@ retrieves and display accuraterip data from the given URL
checksums = {}
for (i, r) in enumerate(responses):
if r.trackCount != count:
if r.num_tracks != count:
continue
assert len(r.checksums) == r.trackCount
assert len(r.confidences) == r.trackCount
assert len(r.checksums) == r.num_tracks
assert len(r.confidences) == r.num_tracks
entry = {}
entry["confidence"] = r.confidences[track]

View File

@@ -19,16 +19,15 @@
# along with whipper. If not, see <http://www.gnu.org/licenses/>.
import argparse
import cdio
import os
import glob
import urllib2
import socket
import sys
import logging
import gobject
from whipper.command.basecommand import BaseCommand
from whipper.common import (
accurip, common, config, drive, program, task
accurip, config, drive, program, task
)
from whipper.program import cdrdao, cdparanoia, utils
from whipper.result import result
@@ -41,7 +40,7 @@ logger = logging.getLogger(__name__)
SILENT = 1e-10
MAX_TRIES = 5
DEFAULT_TRACK_TEMPLATE = u'%r/%A - %d/%t. %a - %n'
DEFAULT_TRACK_TEMPLATE = u'%r/%A - %d/%t. %a - %n.%x'
DEFAULT_DISC_TEMPLATE = u'%r/%A - %d/%A - %d'
TEMPLATE_DESCRIPTION = '''
@@ -68,12 +67,6 @@ disc and track template are:
class _CD(BaseCommand):
"""
@type program: L{program.Program}
@ivar eject: whether to eject the drive after completing
"""
eject = True
@staticmethod
@@ -150,21 +143,11 @@ class _CD(BaseCommand):
"--cdr not passed")
return -1
# FIXME ?????
# Hackish fix for broken commit
offset = 0
info = drive.getDeviceInfo(self.device)
if info:
try:
offset = self.config.getReadOffset(*info)
except KeyError:
pass
# now, read the complete index table, which is slower
self.itable = self.program.getTable(self.runner,
self.ittoc.getCDDBDiscId(),
self.ittoc.getMusicBrainzDiscId(),
self.device, offset)
self.device, self.options.offset)
assert self.itable.getCDDBDiscId() == self.ittoc.getCDDBDiscId(), \
"full table's id %s differs from toc id %s" % (
@@ -174,10 +157,10 @@ class _CD(BaseCommand):
"full table's mb id %s differs from toc id mb %s" % (
self.itable.getMusicBrainzDiscId(),
self.ittoc.getMusicBrainzDiscId())
assert self.itable.getAccurateRipURL() == \
self.ittoc.getAccurateRipURL(), \
assert self.itable.accuraterip_path() == \
self.ittoc.accuraterip_path(), \
"full table's AR URL %s differs from toc AR URL %s" % (
self.itable.getAccurateRipURL(), self.ittoc.getAccurateRipURL())
self.itable.accuraterip_url(), self.ittoc.accuraterip_url())
if self.program.metadata:
self.program.metadata.discid = self.ittoc.getMusicBrainzDiscId()
@@ -200,15 +183,9 @@ class _CD(BaseCommand):
self.program.result.title = self.program.metadata \
and self.program.metadata.title \
or 'Unknown Title'
try:
import cdio
_, self.program.result.vendor, self.program.result.model, \
self.program.result.release = \
cdio.Device(self.device).get_hwinfo()
except ImportError:
raise ImportError("Pycdio module import failed.\n"
"This is a hard dependency: if not "
"available please install it")
_, self.program.result.vendor, self.program.result.model, \
self.program.result.release = \
cdio.Device(self.device).get_hwinfo()
self.doCommand()
@@ -346,41 +323,27 @@ Log files will log the path to tracks relative to this directory.
self.program.result.overread = self.options.overread
self.program.result.logger = self.options.logger
# write disc files
disambiguate = False
while True:
discName = self.program.getPath(self.program.outdir,
self.options.disc_template,
self.mbdiscid, 0,
disambiguate=disambiguate)
dirname = os.path.dirname(discName)
if os.path.exists(dirname):
sys.stdout.write("Output directory %s already exists\n" %
dirname.encode('utf-8'))
logs = glob.glob(os.path.join(dirname, '*.log'))
if logs:
sys.stdout.write(
"Output directory %s is a finished rip\n" %
dirname.encode('utf-8'))
if not disambiguate:
disambiguate = True
continue
return
else:
break
discName = self.program.getPath(self.program.outdir,
self.options.disc_template,
self.mbdiscid,
self.program.metadata)
dirname = os.path.dirname(discName)
if os.path.exists(dirname):
logs = glob.glob(os.path.join(dirname, '*.log'))
if logs:
msg = ("output directory %s is a finished rip" %
dirname.encode('utf-8'))
logger.critical(msg)
raise RuntimeError(msg)
else:
sys.stdout.write("Creating output directory %s\n" %
sys.stdout.write("output directory %s already exists\n" %
dirname.encode('utf-8'))
os.makedirs(dirname)
break
# FIXME: say when we're continuing a rip
# FIXME: disambiguate if the pre-existing rip is different
print("creating output directory %s" % dirname.encode('utf-8'))
os.makedirs(dirname)
# FIXME: turn this into a method
def ripIfNotRipped(number):
def _ripIfNotRipped(number):
logger.debug('ripIfNotRipped for track %d' % number)
# we can have a previous result
trackResult = self.program.result.getTrackResult(number)
@@ -393,9 +356,9 @@ Log files will log the path to tracks relative to this directory.
path = self.program.getPath(self.program.outdir,
self.options.track_template,
self.mbdiscid, number,
disambiguate=disambiguate) \
+ '.' + 'flac'
self.mbdiscid,
self.program.metadata,
track_number=number)
logger.debug('ripIfNotRipped: path %r' % path)
trackResult.number = number
@@ -462,13 +425,11 @@ Log files will log the path to tracks relative to this directory.
"track can't be ripped. "
"Rip attempts number is equal to 'MAX_TRIES'")
if trackResult.testcrc == trackResult.copycrc:
sys.stdout.write('Checksums match for track %d\n' %
number)
sys.stdout.write('CRCs match for track %d\n' % number)
else:
sys.stdout.write(
'ERROR: checksums did not match for track %d\n' %
number)
raise
raise RuntimeError(
"CRCs did not match for track %d\n" % number
)
sys.stdout.write(
'Peak level: {:.2%} \n'.format(trackResult.peak))
@@ -501,113 +462,37 @@ Log files will log the path to tracks relative to this directory.
self.program.saveRipResult()
# check for hidden track one audio
htoapath = None
htoa = self.program.getHTOA()
if htoa:
start, stop = htoa
sys.stdout.write(
'Found Hidden Track One Audio from frame %d to %d\n' % (
start, stop))
# rip it
ripIfNotRipped(0)
htoapath = self.program.result.tracks[0].filename
print('found Hidden Track One Audio from frame %d to %d' % (
start, stop))
_ripIfNotRipped(0)
for i, track in enumerate(self.itable.tracks):
# FIXME: rip data tracks differently
if not track.audio:
sys.stdout.write(
'WARNING: skipping data track %d, not implemented\n' % (
i + 1, ))
print 'skipping data track %d, not implemented' % (i + 1)
# FIXME: make it work for now
track.indexes[1].relative = 0
continue
ripIfNotRipped(i + 1)
# write disc files
discName = self.program.getPath(self.program.outdir,
self.options.disc_template,
self.mbdiscid, 0,
disambiguate=disambiguate)
dirname = os.path.dirname(discName)
if not os.path.exists(dirname):
os.makedirs(dirname)
_ripIfNotRipped(i + 1)
logger.debug('writing cue file for %r', discName)
self.program.writeCue(discName)
# write .m3u file
logger.debug('writing m3u file for %r', discName)
m3uPath = u'%s.m3u' % discName
handle = open(m3uPath, 'w')
u = u'#EXTM3U\n'
handle.write(u.encode('utf-8'))
self.program.write_m3u(discName)
def writeFile(handle, path, length):
targetPath = common.getRelativePath(path, m3uPath)
u = u'#EXTINF:%d,%s\n' % (length, targetPath)
handle.write(u.encode('utf-8'))
u = '%s\n' % targetPath
handle.write(u.encode('utf-8'))
if htoapath:
writeFile(handle, htoapath,
self.itable.getTrackStart(1) / common.FRAMES_PER_SECOND)
for i, track in enumerate(self.itable.tracks):
if not track.audio:
continue
path = self.program.getPath(self.program.outdir,
self.options.track_template,
self.mbdiscid, i + 1,
disambiguate=disambiguate
) + '.' + 'flac'
writeFile(handle, path,
(self.itable.getTrackLength(i + 1) /
common.FRAMES_PER_SECOND))
handle.close()
# verify using accuraterip
url = self.ittoc.getAccurateRipURL()
sys.stdout.write("AccurateRip URL %s\n" % url)
accucache = accurip.AccuCache()
try:
responses = accucache.retrieve(url)
except urllib2.URLError, e:
if isinstance(e.args[0], socket.gaierror):
if e.args[0].errno == -2:
sys.stdout.write("Warning: network error: %r\n" % (
e.args[0], ))
responses = None
else:
raise
else:
raise
self.program.verifyImage(self.runner, self.ittoc)
except accurip.EntryNotFound:
print('AccurateRip entry not found')
if not responses:
sys.stdout.write('Album not found in AccurateRip database\n')
if responses:
sys.stdout.write('%d AccurateRip reponses found\n' %
len(responses))
if responses[0].cddbDiscId != self.itable.getCDDBDiscId():
sys.stdout.write(
"AccurateRip response discid different: %s\n" %
responses[0].cddbDiscId)
self.program.verifyImage(self.runner, responses)
sys.stdout.write("\n".join(
self.program.getAccurateRipResults()) + "\n")
accurip.print_report(self.program.result)
self.program.saveRipResult()
# write log file
self.program.writeLog(discName, self.logger)

View File

@@ -117,16 +117,12 @@ Verifies the image from the given .cue files against the AccurateRip database.
def do(self):
prog = program.Program(config.Config())
runner = task.SyncRunner()
cache = accurip.AccuCache()
for arg in self.options.cuefile:
arg = arg.decode('utf-8')
cueImage = image.Image(arg)
cueImage.setup(runner)
url = cueImage.table.getAccurateRipURL()
responses = cache.retrieve(url)
# FIXME: this feels like we're poking at internals.
prog.cuePath = arg
prog.result = result.RipResult()
@@ -135,9 +131,14 @@ Verifies the image from the given .cue files against the AccurateRip database.
tr.number = track.number
prog.result.tracks.append(tr)
prog.verifyImage(runner, responses)
print "\n".join(prog.getAccurateRipResults()) + "\n"
verified = False
try:
verified = prog.verifyImage(runner, cueImage.table)
except accurip.EntryNotFound:
print('AccurateRip entry not found')
accurip.print_report(prog.result)
if not verified:
sys.exit(1)
class Image(BaseCommand):

View File

@@ -36,8 +36,13 @@ def main():
cmd.options.eject in ('failure', 'always')):
eject_device(e.device)
return 255
except RuntimeError, e:
print(e)
return 1
except KeyboardInterrupt:
return 2
except ImportError, e:
raise ImportError(e)
raise
except task.TaskException, e:
if isinstance(e.exception, ImportError):
raise ImportError(e.exception)

View File

@@ -27,8 +27,7 @@ import gobject
from whipper.command.basecommand import BaseCommand
from whipper.common import accurip, common, config, drive
from whipper.common import task as ctask
from whipper.program import cdrdao, cdparanoia, utils
from whipper.common import checksum
from whipper.program import arc, cdrdao, cdparanoia, utils
from whipper.extern.task import task
gobject.threads_init()
@@ -92,27 +91,14 @@ CD in the AccurateRip database."""
table = t.table
logger.debug("CDDB disc id: %r", table.getCDDBDiscId())
url = table.getAccurateRipURL()
logger.debug("AccurateRip URL: %s", url)
# FIXME: download url as a task too
responses = []
import urllib2
responses = None
try:
handle = urllib2.urlopen(url)
data = handle.read()
responses = accurip.getAccurateRipResponses(data)
except urllib2.HTTPError, e:
if e.code == 404:
sys.stdout.write(
'Album not found in AccurateRip database.\n')
return 1
else:
raise
responses = accurip.get_db_entry(table.accuraterip_path())
except accurip.EntryNotFound:
print('Accuraterip entry not found')
if responses:
logger.debug('%d AccurateRip responses found.' % len(responses))
if responses[0].cddbDiscId != table.getCDDBDiscId():
logger.warning("AccurateRip response discid different: %s",
responses[0].cddbDiscId)
@@ -120,17 +106,19 @@ CD in the AccurateRip database."""
# now rip the first track at various offsets, calculating AccurateRip
# CRC, and matching it against the retrieved ones
def match(archecksum, track, responses):
# archecksums is a tuple of accuraterip checksums: (v1, v2)
def match(archecksums, track, responses):
for i, r in enumerate(responses):
if archecksum == r.checksums[track - 1]:
return archecksum, i
for checksum in archecksums:
if checksum == r.checksums[track - 1]:
return checksum, i
return None, None
for offset in self._offsets:
sys.stdout.write('Trying read offset %d ...\n' % offset)
try:
archecksum = self._arcs(runner, table, 1, offset)
archecksums = self._arcs(runner, table, 1, offset)
except task.TaskException, e:
# let MissingDependency fall through
@@ -149,9 +137,9 @@ CD in the AccurateRip database."""
'WARNING: cannot rip with offset %d...\n' % offset)
continue
logger.debug('AR checksum calculated: %s' % archecksum)
logger.debug('AR checksums calculated: %s %s' % archecksums)
c, i = match(archecksum, 1, responses)
c, i = match(archecksums, 1, responses)
if c:
count = 1
logger.debug('MATCHED against response %d' % i)
@@ -163,7 +151,7 @@ CD in the AccurateRip database."""
# last one (to avoid readers that can't do overread
for track in range(2, (len(table.tracks) + 1) - 1):
try:
archecksum = self._arcs(runner, table, track, offset)
archecksums = self._arcs(runner, table, track, offset)
except task.TaskException, e:
if isinstance(e.exception, cdparanoia.FileSizeError):
sys.stdout.write(
@@ -171,7 +159,7 @@ CD in the AccurateRip database."""
offset)
continue
c, i = match(archecksum, track, responses)
c, i = match(archecksums, track, responses)
if c:
logger.debug('MATCHED track %d against response %d' % (
track, i))
@@ -188,9 +176,8 @@ CD in the AccurateRip database."""
sys.stdout.write('No matching offset found.\n')
sys.stdout.write('Consider trying again with a different disc.\n')
# TODO MW: Update this further for ARv2 code
def _arcs(self, runner, table, track, offset):
# rips the track with the given offset, return the arcs checksum
# rips the track with the given offset, return the arcs checksums
logger.debug('Ripping track %r with offset %d ...', track, offset)
fd, path = tempfile.mkstemp(
@@ -207,15 +194,15 @@ CD in the AccurateRip database."""
track, offset)
runner.run(t)
# TODO MW: Update this to also use the v2 checksum(s)
t = checksum.FastAccurateRipChecksumTask(path,
trackNumber=track,
trackCount=len(table.tracks),
wave=True, v2=False)
runner.run(t)
v1 = arc.accuraterip_checksum(
path, track, len(table.tracks), wave=True, v2=False
)
v2 = arc.accuraterip_checksum(
path, track, len(table.tracks), wave=True, v2=True
)
os.unlink(path)
return "%08x" % t.checksum
return ("%08x" % v1, "%08x" % v2)
def _foundOffset(self, device, offset):
sys.stdout.write('\nRead offset of device is: %d.\n' %

View File

@@ -1,6 +1,7 @@
# -*- Mode: Python; test-case-name: whipper.test.test_common_accurip -*-
# vi:si:et:sw=4:sts=4:ts=4
# Copyright (C) 2017 Samantha Baldwin
# Copyright (C) 2009 Thomas Vander Stichele
# This file is part of whipper.
@@ -18,128 +19,261 @@
# You should have received a copy of the GNU General Public License
# along with whipper. If not, see <http://www.gnu.org/licenses/>.
import errno
import os
import requests
import struct
import urlparse
import urllib2
from errno import EEXIST
from os import makedirs
from os.path import dirname, exists, join
from whipper.common import directory
from whipper.program.arc import accuraterip_checksum
import logging
logger = logging.getLogger(__name__)
_CACHE_DIR = directory.cache_path()
ACCURATERIP_URL = "http://www.accuraterip.com/accuraterip/"
_CACHE_DIR = join(directory.cache_path(), 'accurip')
class AccuCache:
def __init__(self):
if not os.path.exists(_CACHE_DIR):
logger.debug('Creating cache directory %s', _CACHE_DIR)
os.makedirs(_CACHE_DIR)
def _getPath(self, url):
# split path starts with /
return os.path.join(_CACHE_DIR, urlparse.urlparse(url)[2][1:])
def retrieve(self, url, force=False):
logger.debug("Retrieving AccurateRip URL %s", url)
path = self._getPath(url)
logger.debug("Cached path: %s", path)
if force:
logger.debug("forced to download")
self.download(url)
elif not os.path.exists(path):
logger.debug("%s does not exist, downloading", path)
self.download(url)
if not os.path.exists(path):
logger.debug("%s does not exist, not in database", path)
return None
data = self._read(url)
return getAccurateRipResponses(data)
def download(self, url):
# FIXME: download url as a task too
try:
handle = urllib2.urlopen(url)
data = handle.read()
except urllib2.HTTPError, e:
if e.code == 404:
return None
else:
raise
self._cache(url, data)
return data
def _cache(self, url, data):
path = self._getPath(url)
try:
os.makedirs(os.path.dirname(path))
except OSError, e:
logger.debug('Could not make dir %s: %r' % (
path, str(e)))
if e.errno != errno.EEXIST:
raise
handle = open(path, 'wb')
handle.write(data)
handle.close()
def _read(self, url):
logger.debug("Reading %s from cache", url)
path = self._getPath(url)
handle = open(path, 'rb')
data = handle.read()
handle.close()
return data
class EntryNotFound(Exception):
pass
def getAccurateRipResponses(data):
ret = []
while data:
trackCount = struct.unpack("B", data[0])[0]
nbytes = 1 + 12 + trackCount * (1 + 8)
ret.append(AccurateRipResponse(data[:nbytes]))
data = data[nbytes:]
return ret
class AccurateRipResponse(object):
class _AccurateRipResponse(object):
"""
I represent the response of the AccurateRip online database.
An AccurateRip response contains a collection of metadata identifying a
particular digital audio compact disc.
@type checksums: list of str
For disc level metadata it contains the track count, two internal disc
IDs, and the CDDB disc ID.
A checksum and a confidence score is stored sequentially for each track in
the disc index, which excludes any audio hidden in track pre-gaps (such as
HTOA).
The response is stored as a packed binary structure.
"""
trackCount = None
discId1 = ""
discId2 = ""
cddbDiscId = ""
confidences = None
checksums = None
def __init__(self, data):
self.trackCount = struct.unpack("B", data[0])[0]
"""
The checksums and confidences arrays are indexed by relative track
position, so track 1 will have array index 0, track 2 will have array
index 1, and so forth. HTOA and other hidden tracks are not included.
"""
self.num_tracks = struct.unpack("B", data[0])[0]
self.discId1 = "%08x" % struct.unpack("<L", data[1:5])[0]
self.discId2 = "%08x" % struct.unpack("<L", data[5:9])[0]
self.cddbDiscId = "%08x" % struct.unpack("<L", data[9:13])[0]
self.confidences = []
self.checksums = []
pos = 13
for _ in range(self.trackCount):
for _ in range(self.num_tracks):
confidence = struct.unpack("B", data[pos])[0]
checksum = "%08x" % struct.unpack("<L", data[pos + 1:pos + 5])[0]
pos += 9
self.confidences.append(confidence)
self.checksums.append(checksum)
pos += 9
def __eq__(self, other):
return [
self.num_tracks, self.discId1, self.discId2, self.cddbDiscId,
self.confidences, self.checksums
] == [
other.num_tracks, other.discId1, other.discId2, other.cddbDiscId,
other.confidences, other.checksums
]
def _split_responses(raw_entry):
responses = []
while raw_entry:
track_count = struct.unpack("B", raw_entry[0])[0]
nbytes = 1 + 12 + track_count * (1 + 8)
responses.append(_AccurateRipResponse(raw_entry[:nbytes]))
raw_entry = raw_entry[nbytes:]
return responses
def calculate_checksums(track_paths):
"""
Return ARv1 and ARv2 checksums as two arrays of character strings in a
dictionary: {'v1': ['deadbeef', ...], 'v2': [...]}
Return None instead of checksum string for unchecksummable tracks.
HTOA checksums are not included in the database and are not calculated.
"""
track_count = len(track_paths)
v1_checksums = []
v2_checksums = []
logger.debug('checksumming %d tracks' % track_count)
# This is done sequentially because it is very fast.
for i, path in enumerate(track_paths):
v1_sum = accuraterip_checksum(
path, i+1, track_count, wave=True, v2=False
)
if not v1_sum:
logger.error(
'could not calculate AccurateRip v1 checksum for track %d %r' %
(i+1, path)
)
v1_checksums.append(None)
else:
v1_checksums.append("%08x" % v1_sum)
v2_sum = accuraterip_checksum(
path, i+1, track_count, wave=True, v2=True
)
if not v2_sum:
logger.error(
'could not calculate AccurateRip v2 checksum for track %d %r' %
(i+1, path)
)
v2_checksums.append(None)
else:
v2_checksums.append("%08x" % v2_sum)
return {'v1': v1_checksums, 'v2': v2_checksums}
def _download_entry(path):
url = ACCURATERIP_URL + path
logger.debug('downloading AccurateRip entry from %s', url)
try:
resp = requests.get(url)
except requests.exceptions.ConnectionError as e:
logger.error('error retrieving AccurateRip entry: %r' % e)
return None
if not resp.ok:
logger.error('error retrieving AccurateRip entry: %s %s %r' % (
resp.status_code, resp.reason, resp
))
return None
return resp.content
def _save_entry(raw_entry, path):
logger.debug('saving AccurateRip entry to %s', path)
# XXX: os.makedirs(exist_ok=True) in py3
try:
makedirs(dirname(path))
except OSError, e:
if e.errno != EEXIST:
logger.error('could not save entry to %s: %r' % (path, str(e)))
return
open(path, 'wb').write(raw_entry)
def get_db_entry(path):
"""
Retrieve cached AccurateRip disc entry as array of _AccurateRipResponses.
Downloads entry from accuraterip.com on cache fault.
`path' is in the format of the output of table.accuraterip_path().
"""
cached_path = join(_CACHE_DIR, path)
if exists(cached_path):
logger.debug('found accuraterip entry at %s', cached_path)
raw_entry = open(cached_path, 'rb').read()
else:
raw_entry = _download_entry(path)
if raw_entry:
_save_entry(raw_entry, cached_path)
if not raw_entry:
logger.warning('entry not found in AccurateRip database')
raise EntryNotFound
return _split_responses(raw_entry)
def _assign_checksums_and_confidences(tracks, checksums, responses):
for i, track in enumerate(tracks):
for v in ('v1', 'v2'):
track.AR[v]['CRC'] = checksums[v][i]
track.AR['DBMaxConfidence'], track.AR['DBMaxConfidenceCRC'] = max(
[(r.confidences[i], r.checksums[i]) for r in responses],
key=lambda t: t[0]
)
def _match_responses(tracks, responses):
"""
Match and save track accuraterip response checksums against
all non-hidden tracks.
Returns True if every track has a match for every entry for either
AccurateRip version.
"""
for r in responses:
for i, track in enumerate(tracks):
for v in ('v1', 'v2'):
if track.AR[v]['CRC'] == r.checksums[i]:
if r.confidences[i] > track.AR[v]['DBConfidence']:
track.AR[v]['DBCRC'] = r.checksums[i]
track.AR[v]['DBConfidence'] = r.confidences[i]
logger.debug(
'track %d matched response %s in AccurateRip'
' database: %s crc %s confidence %s' %
(i, r.cddbDiscId, v, track.AR[v]['DBCRC'],
track.AR[v]['DBConfidence'])
)
return any((
all([t.AR['v1']['DBCRC'] for t in tracks]),
all([t.AR['v2']['DBCRC'] for t in tracks])
))
def verify_result(result, responses, checksums):
"""
Verify track AccurateRip checksums against database responses.
Stores track checksums and database values on result.
"""
if not (result and responses and checksums):
return False
# exclude HTOA from AccurateRip verification
# NOTE: if pre-gap hidden audio support is expanded to include
# tracks other than HTOA, this is invalid.
tracks = filter(lambda t: t.number != 0, result.tracks)
if not tracks:
return False
_assign_checksums_and_confidences(tracks, checksums, responses)
return _match_responses(tracks, responses)
def print_report(result):
"""
Print AccurateRip verification results to stdout.
"""
for i, track in enumerate(result.tracks):
status = 'rip NOT accurate'
conf = '(not found)'
db = 'notfound'
if track.AR['DBMaxConfidence'] is not None:
db = track.AR['DBMaxConfidenceCRC']
conf = '(max confidence %3d)' % track.AR['DBMaxConfidence']
if track.AR['v1']['DBCRC'] or track.AR['v2']['DBCRC']:
status = 'rip accurate'
db = ', '.join(filter(None, (
track.AR['v1']['DBCRC'],
track.AR['v2']['DBCRC']
)))
max_conf = max(
[track.AR[v]['DBConfidence'] for v in ('v1', 'v2')]
)
if max_conf:
if max_conf < track.AR['DBMaxConfidence']:
conf = '(confidence %3d of %3d)' % (
max_conf, track.AR['DBMaxConfidence']
)
# htoa tracks (i == 0) do not have an ARCRC
if track.number == 0:
print('track 0: unknown (not tracked)')
continue
if not (track.AR['v1']['CRC'] or track.AR['v2']['CRC']):
logger.error(
'no track AR CRC on non-HTOA track %d' % track.number
)
print('track %2d: unknown (error)' % track.number)
else:
print('track %2d: %-16s %-23s v1 [%s], v2 [%s], DB [%s]' % (
track.number, status, conf,
track.AR['v1']['CRC'], track.AR['v2']['CRC'], db
))

View File

@@ -24,8 +24,6 @@ import wave
from whipper.extern.task import task as etask
from whipper.program.arc import accuraterip_checksum
import logging
logger = logging.getLogger(__name__)
@@ -49,27 +47,3 @@ class CRC32Task(etask.Task):
self.checksum = binascii.crc32(d) & 0xffffffff
self.stop()
class FastAccurateRipChecksumTask(etask.Task):
description = 'Calculating (Fast) AccurateRip checksum'
def __init__(self, path, trackNumber, trackCount, wave, v2=False):
self.path = path
self.trackNumber = trackNumber
self.trackCount = trackCount
self._wave = wave
self._v2 = v2
self.checksum = None
def start(self, runner):
etask.Task.start(self, runner)
self.schedule(0.0, self._arc)
def _arc(self):
arc = accuraterip_checksum(self.path, self.trackNumber,
self.trackCount,
self._wave, self._v2)
self.checksum = arc
self.stop()

View File

@@ -23,12 +23,12 @@ Common functionality and class for all programs using whipper.
"""
import musicbrainzngs
import re
import os
import sys
import time
from whipper.common import common, mbngs, cache, path
from whipper.common import checksum
from whipper.common import accurip, cache, checksum, common, mbngs, path
from whipper.program import cdrdao, cdparanoia
from whipper.image import image
from whipper.extern.task import task
@@ -178,34 +178,34 @@ class Program:
template_part += ' (%s)' % metadata.barcode
return template_part
def getPath(self, outdir, template, mbdiscid, i, disambiguate=False):
def getPath(self, outdir, template, mbdiscid, metadata, track_number=None):
"""
Based on the template, get a complete path for the given track,
minus extension.
Also works for the disc name, using disc variables for the template.
Return disc or track path relative to outdir according to
template. Track paths do not include extension.
@param outdir: the directory where to write the files
@type outdir: unicode
@param template: the template for writing the file
@type template: unicode
@param i: track number (0 for HTOA, or for disc)
@type i: int
Tracks are named according to the track template, filling in
the variables and adding the file extension. Variables
exclusive to the track template are:
- %t: track number
- %a: track artist
- %n: track title
- %s: track sort name
@rtype: unicode
Disc files (.cue, .log, .m3u) are named according to the disc
template, filling in the variables and adding the file
extension. Variables for both disc and track template are:
- %A: album artist
- %S: album sort name
- %d: disc title
- %y: release year
- %r: release type, lowercase
- %R: Release type, normal case
- %x: audio extension, lowercase
- %X: audio extension, uppercase
"""
assert type(outdir) is unicode, "%r is not unicode" % outdir
assert type(template) is unicode, "%r is not unicode" % template
# the template is similar to grip, except for %s/%S/%r/%R
# see #gripswitches
# returns without extension
v = {}
v['t'] = '%02d' % i
# default values
v['A'] = 'Unknown Artist'
v['d'] = mbdiscid # fallback for title
v['r'] = 'unknown'
@@ -215,59 +215,38 @@ class Program:
v['x'] = 'flac'
v['X'] = v['x'].upper()
v['y'] = '0000'
if track_number is not None:
v['a'] = v['A']
v['t'] = '%02d' % track_number
if track_number == 0:
v['n'] = 'Hidden Track One Audio'
else:
v['n'] = 'Unknown Track %d' % track_number
v['a'] = v['A']
if i == 0:
v['n'] = 'Hidden Track One Audio'
else:
v['n'] = 'Unknown Track %d' % i
if self.metadata:
release = self.metadata.release or '0000'
if metadata:
release = metadata.release or '0000'
v['y'] = release[:4]
v['A'] = self._filter.filter(self.metadata.artist)
v['S'] = self._filter.filter(self.metadata.sortName)
v['d'] = self._filter.filter(self.metadata.title)
v['B'] = self.metadata.barcode
v['C'] = self.metadata.catalogNumber
if self.metadata.releaseType:
v['R'] = self.metadata.releaseType
v['r'] = self.metadata.releaseType.lower()
if i > 0:
try:
v['a'] = self._filter.filter(
self.metadata.tracks[i - 1].artist)
v['s'] = self._filter.filter(
self.metadata.tracks[i - 1].sortName)
v['n'] = self._filter.filter(
self.metadata.tracks[i - 1].title)
except IndexError, e:
print 'ERROR: no track %d found, %r' % (i, e)
raise
else:
v['A'] = self._filter.filter(metadata.artist)
v['S'] = self._filter.filter(metadata.sortName)
v['d'] = self._filter.filter(metadata.title)
v['B'] = metadata.barcode
v['C'] = metadata.catalogNumber
if metadata.releaseType:
v['R'] = metadata.releaseType
v['r'] = metadata.releaseType.lower()
if track_number > 0:
v['a'] = self._filter.filter(
metadata.tracks[track_number - 1].artist)
v['s'] = self._filter.filter(
metadata.tracks[track_number - 1].sortName)
v['n'] = self._filter.filter(
metadata.tracks[track_number - 1].title)
elif track_number == 0:
# htoa defaults to disc's artist
v['a'] = self._filter.filter(self.metadata.artist)
v['a'] = self._filter.filter(metadata.artist)
# when disambiguating, use catalogNumber then barcode
if disambiguate:
templateParts = template.split(os.sep)
# Find the section of the template with the release name
for i, part in enumerate(templateParts):
if "%d" in part:
templateParts[i] = self.addDisambiguation(part, self.metadata) # noqa: E501
break
else:
# No parts of the template contain the release
templateParts[-1] = self.addDisambiguation(templateParts[-1], self.metadata) # noqa: E501
template = os.path.join(*templateParts)
logger.debug('Disambiguated template to %r' % template)
import re
template = re.sub(r'%(\w)', r'%(\1)s', template)
ret = os.path.join(outdir, template % v)
return ret
return os.path.join(outdir, template % v)
def getCDDB(self, cddbdiscid):
"""
@@ -579,118 +558,55 @@ class Program:
t = image.ImageRetagTask(cueImage, taglists)
runner.run(t)
def verifyImage(self, runner, responses):
def verifyImage(self, runner, table):
"""
verify table against accuraterip and cue_path track lengths
Verify our image against the given AccurateRip responses.
Needs an initialized self.result.
Will set accurip and friends on each TrackResult.
Populates self.result.tracks with above TrackResults.
"""
logger.debug('verifying Image against %d AccurateRip responses',
len(responses or []))
cueImage = image.Image(self.cuePath)
# assigns track lengths
verifytask = image.ImageVerifyTask(cueImage)
cuetask = image.AccurateRipChecksumTask(cueImage)
runner.run(verifytask)
runner.run(cuetask)
if verifytask.exception:
logger.error(verifytask.exceptionMessage)
return False
self._verifyImageWithChecksums(responses, cuetask.checksums)
responses = accurip.get_db_entry(table.accuraterip_path())
logger.info('%d AccurateRip response(s) found' % len(responses))
def _verifyImageWithChecksums(self, responses, checksums):
# loop over tracks to set our calculated AccurateRip CRC's
for i, csum in enumerate(checksums):
trackResult = self.result.getTrackResult(i + 1)
trackResult.ARCRC = csum
checksums = accurip.calculate_checksums([
os.path.join(os.path.dirname(self.cuePath), t.indexes[1].path)
for t in filter(lambda t: t.number != 0, cueImage.cue.table.tracks)
])
if not (checksums and any(checksums['v1']) and any(checksums['v2'])):
return False
return accurip.verify_result(self.result, responses, checksums)
if not responses:
logger.warning('No AccurateRip responses, cannot verify.')
return
def write_m3u(self, discname):
m3uPath = u'%s.m3u' % discname
with open(m3uPath, 'w') as f:
f.write(u'#EXTM3U\n'.encode('utf-8'))
for track in self.result.tracks:
if not track.filename:
# false positive htoa
continue
if track.number == 0:
length = (self.result.table.getTrackStart(1) /
common.FRAMES_PER_SECOND)
else:
length = (self.result.table.getTrackLength(track.number) /
common.FRAMES_PER_SECOND)
# now loop to match responses
for i, csum in enumerate(checksums):
trackResult = self.result.getTrackResult(i + 1)
confidence = None
response = None
# match against each response's checksum for this track
for j, r in enumerate(responses):
if "%08x" % csum == r.checksums[i]:
response = r
logger.debug(
"Track %02d matched response %d of %d in "
"AccurateRip database",
i + 1, j + 1, len(responses))
trackResult.accurip = True
# FIXME: maybe checksums should be ints
trackResult.ARDBCRC = int(r.checksums[i], 16)
# arsum = csum
confidence = r.confidences[i]
trackResult.ARDBConfidence = confidence
if not trackResult.accurip:
logger.warning("Track %02d: not matched in "
"AccurateRip database", i + 1)
# I have seen AccurateRip responses with 0 as confidence
# for example, Best of Luke Haines, disc 1, track 1
maxConfidence = -1
maxResponse = None
for r in responses:
if r.confidences[i] > maxConfidence:
maxConfidence = r.confidences[i]
maxResponse = r
logger.debug('Track %02d: found max confidence %d' % (
i + 1, maxConfidence))
trackResult.ARDBMaxConfidence = maxConfidence
if not response:
logger.warning('Track %02d: none of the responses matched.',
i + 1)
trackResult.ARDBCRC = int(
maxResponse.checksums[i], 16)
else:
trackResult.ARDBCRC = int(response.checksums[i], 16)
# TODO MW: Update this further for ARv2 code
def getAccurateRipResults(self):
"""
@rtype: list of str
"""
res = []
# loop over tracks
for i, trackResult in enumerate(self.result.tracks):
status = 'rip NOT accurate'
if trackResult.accurip:
status = 'rip accurate '
c = "(not found) "
ar = ", DB [notfound]"
if trackResult.ARDBMaxConfidence:
c = "(max confidence %3d)" % trackResult.ARDBMaxConfidence
if trackResult.ARDBConfidence is not None:
if trackResult.ARDBConfidence \
< trackResult.ARDBMaxConfidence:
c = "(confidence %3d of %3d)" % (
trackResult.ARDBConfidence,
trackResult.ARDBMaxConfidence)
ar = ", DB [%08x]" % trackResult.ARDBCRC
# htoa tracks (i == 0) do not have an ARCRC
if trackResult.ARCRC is None:
assert trackResult.number == 0, \
'no trackResult.ARCRC on non-HTOA track %d' % \
trackResult.number
res.append("Track 0: unknown (not tracked)")
else:
res.append("Track %2d: %s %s [%08x]%s" % (
trackResult.number, status, c, trackResult.ARCRC, ar))
return res
target_path = common.getRelativePath(track.filename, m3uPath)
u = u'#EXTINF:%d,%s\n' % (length, target_path)
f.write(u.encode('utf-8'))
u = '%s\n' % target_path
f.write(u.encode('utf-8'))
def writeCue(self, discName):
assert self.result.table.canCue()

View File

@@ -26,7 +26,6 @@ import os
from whipper.common import encode
from whipper.common import common
from whipper.common import checksum
from whipper.image import cue, table
from whipper.extern.task import task
from whipper.program.soxi import AudioLengthTask
@@ -108,47 +107,6 @@ class Image(object):
logger.debug('setup image done')
class AccurateRipChecksumTask(task.MultiSeparateTask):
"""
I calculate the AccurateRip checksums of all tracks.
"""
description = "Checksumming tracks"
# TODO MW: Update this further for V2 code
def __init__(self, image):
task.MultiSeparateTask.__init__(self)
self._image = image
cue = image.cue
self.checksums = []
logger.debug('Checksumming %d tracks' % len(cue.table.tracks))
for trackIndex, track in enumerate(cue.table.tracks):
index = track.indexes[1]
length = cue.getTrackLength(track)
if length < 0:
logger.debug('track %d has unknown length' %
(trackIndex + 1, ))
else:
logger.debug('track %d is %d samples long' % (
trackIndex + 1, length))
path = image.getRealPath(index.path)
checksumTask = checksum.FastAccurateRipChecksumTask(
path,
trackNumber=trackIndex + 1,
trackCount=len(cue.table.tracks),
wave=True, v2=False)
self.addTask(checksumTask)
def stop(self):
self.checksums = [t.checksum for t in self.tasks]
task.MultiSeparateTask.stop(self)
class ImageVerifyTask(task.MultiSeparateTask):
"""
I verify a disk image and get the necessary track lengths.

View File

@@ -475,51 +475,6 @@ class Table(object):
logger.debug('MusicBrainz values: %r', result)
return result
def getAccurateRipIds(self):
"""
Calculate the two AccurateRip ID's.
@returns: the two 8-character hexadecimal disc ID's
@rtype: tuple of (str, str)
"""
# AccurateRip does not take into account data tracks,
# but does count the data track to determine the leadout offset
discId1 = 0
discId2 = 0
for track in self.tracks:
if not track.audio:
continue
offset = self.getTrackStart(track.number)
discId1 += offset
discId2 += (offset or 1) * track.number
# also add end values, where leadout offset is one past the end
# of the last track
last = self.tracks[-1]
offset = self.getTrackEnd(last.number) + 1
discId1 += offset
discId2 += offset * (self.getAudioTracks() + 1)
discId1 &= 0xffffffff
discId2 &= 0xffffffff
return ("%08x" % discId1, "%08x" % discId2)
def getAccurateRipURL(self):
"""
Return the full AccurateRip URL.
@returns: the AccurateRip URL
@rtype: str
"""
discId1, discId2 = self.getAccurateRipIds()
return "http://www.accuraterip.com/accuraterip/" \
"%s/%s/%s/dBAR-%.3d-%s-%s-%s.bin" % (
discId1[-1], discId1[-2], discId1[-3],
self.getAudioTracks(), discId1, discId2, self.getCDDBDiscId())
def cue(self, cuePath='', program='whipper'):
"""
@param cuePath: path to the cue file to be written. If empty,
@@ -851,6 +806,41 @@ class Table(object):
return True
def accuraterip_ids(self):
"""
returns both AccurateRip disc ids as a tuple of 8-char
hexadecimal strings (discid1, discid2)
"""
# AccurateRip does not take into account data tracks,
# but does count the data track to determine the leadout offset
discId1 = 0
discId2 = 0
for track in self.tracks:
if not track.audio:
continue
offset = self.getTrackStart(track.number)
discId1 += offset
discId2 += (offset or 1) * track.number
# also add end values, where leadout offset is one past the end
# of the last track
offset = self.getTrackEnd(self.tracks[-1].number) + 1
discId1 += offset
discId2 += offset * (self.getAudioTracks() + 1)
discId1 &= 0xffffffff
discId2 &= 0xffffffff
return ("%08x" % discId1, "%08x" % discId2)
def accuraterip_path(self):
discId1, discId2 = self.accuraterip_ids()
return "%s/%s/%s/dBAR-%.3d-%s-%s-%s.bin" % (
discId1[-1], discId1[-2], discId1[-3],
self.getAudioTracks(), discId1, discId2, self.getCDDBDiscId()
)
def canCue(self):
"""
Check if this table can be used to generate a .cue file

View File

@@ -7,21 +7,26 @@ ARB = 'accuraterip-checksum'
FLAC = 'flac'
def accuraterip_checksum(f, track, tracks, wave=False, v2=False):
def _execute(cmd, **redirects):
logger.debug('executing %r', cmd)
return Popen(cmd, **redirects)
def accuraterip_checksum(f, track_number, total_tracks, wave=False, v2=False):
v = '--accuraterip-v1'
if v2:
v = '--accuraterip-v2'
track, tracks = str(track), str(tracks)
track_number, total_tracks = str(track_number), str(total_tracks)
if not wave:
flac = Popen([FLAC, '-cds', f], stdout=PIPE)
arc = Popen([ARB, v, '/dev/stdin', track, tracks],
stdin=flac.stdout, stdout=PIPE, stderr=PIPE)
if wave:
cmd = [ARB, v, f, track_number, total_tracks]
redirects = dict(stdout=PIPE, stderr=PIPE)
else:
arc = Popen([ARB, v, f, track, tracks],
stdout=PIPE, stderr=PIPE)
flac = _execute([FLAC, '-cds', f], stdout=PIPE)
cmd = [ARB, v, '/dev/stdin', track_number, total_tracks]
redirects = dict(stdin=flac.stdout, stdout=PIPE, stderr=PIPE)
arc = _execute(cmd, **redirects)
if not wave:
flac.stdout.close()
@@ -30,23 +35,24 @@ def accuraterip_checksum(f, track, tracks, wave=False, v2=False):
if not wave:
flac.wait()
flac_rc = flac.returncode
if flac.returncode != 0:
logger.warning(
'ARC calculation failed: flac return code is non zero: %r' %
flac.returncode
)
return None
arc_rc = arc.returncode
if not wave and flac_rc != 0:
logger.warning('ARC calculation failed: flac return code is non zero')
if arc.returncode != 0:
logger.warning(
'ARC calculation failed: arc return code is non zero: %r' %
arc.returncode
)
return None
if arc_rc != 0:
logger.warning('ARC calculation failed: arc return code is non zero')
return None
out = out.strip()
try:
outh = int('0x%s' % out, base=16)
checksum = int('0x%s' % out.strip(), base=16)
logger.debug('returned %r', checksum)
return checksum
except ValueError:
logger.warning('ARC output is not usable')
return None
return outh

View File

@@ -202,23 +202,29 @@ class WhipperLogger(result.Logger):
lines.append(" Copy CRC: %08X" % trackResult.copycrc)
# AccurateRip track status
# Currently there's no support for AccurateRip V2
if trackResult.accurip:
lines.append(" AccurateRip V1:")
self._inARDatabase += 1
if trackResult.ARCRC == trackResult.ARDBCRC:
lines.append(" Result: Found, exact match")
self._accuratelyRipped += 1
else:
lines.append(" Result: Found, NO exact match")
lines.append(" Confidence: %d" %
trackResult.ARDBConfidence)
lines.append(" Local CRC: %08X" % trackResult.ARCRC)
lines.append(" Remote CRC: %08X" % trackResult.ARDBCRC)
elif trackResult.number != 0:
lines.append(" AccurateRip V1:")
lines.append(" Result: Track not present in "
"AccurateRip database")
for v in ('v1', 'v2'):
if trackResult.AR[v]['DBCRC']:
lines.append(" AccurateRip %s:" % v)
self._inARDatabase += 1
if trackResult.AR[v]['CRC'] == trackResult.AR[v]['DBCRC']:
lines.append(" Result: Found, exact match")
self._accuratelyRipped += 1
else:
lines.append(" Result: Found, NO exact match")
lines.append(
" Confidence: %d" % trackResult.AR[v]['DBConfidence']
)
lines.append(
" Local CRC: %s" % trackResult.AR[v]['CRC'].upper()
)
lines.append(
" Remote CRC: %s" % trackResult.AR[v]['DBCRC'].upper()
)
elif trackResult.number != 0:
lines.append(" AccurateRip %s:" % v)
lines.append(
" Result: Track not present in AccurateRip database"
)
# Check if Test & Copy CRCs are equal
if trackResult.testcrc == trackResult.copycrc:

View File

@@ -23,56 +23,46 @@ import time
class TrackResult:
"""
@type filename: unicode
@ivar testcrc: 4-byte CRC for the test read
@type testcrc: int
@ivar copycrc: 4-byte CRC for the copy read
@type copycrc: int
@var accurip: whether this track's AR CRC was found in the
database, and thus whether the track is considered
accurately ripped.
If false, it can be ripped wrong, not exist in
the database, ...
@type accurip: bool
@var ARCRC: our calculated 4 byte AccurateRip CRC for this
track.
@type ARCRC: int
@var ARDBCRC: the 4-byte AccurateRip CRC this
track did or should have matched in the database.
If None, the track is not in the database.
@type ARDBCRC: int
@var ARDBConfidence: confidence for the matched AccurateRip CRC for
this track in the database.
If None, the track is not in the database.
@var ARDBMaxConfidence: maximum confidence in the AccurateRip database for
this track; can still be 0.
If None, the track is not in the database.
"""
number = None
filename = None
pregap = 0 # in frames
pre_emphasis = None
peak = 0.0
quality = 0.0
testspeed = 0.0
copyspeed = 0.0
testduration = 0.0
copyduration = 0.0
# 4 byte CRCs for the test and copy reads
testcrc = None
copycrc = None
accurip = False # whether it's in the database
ARCRC = None
ARDBCRC = None
ARDBConfidence = None
ARDBMaxConfidence = None
AR = None
classVersion = 3
def __init__(self):
"""
CRC: calculated 4 byte AccurateRip CRC
DBCRC: 4 byte AccurateRip CRC from the AR database
DBConfidence: confidence for the matched AccurateRip DB CRC
DBMaxConfidence: track's maximum confidence in the AccurateRip DB
DBMaxConfidenceCRC: maximum confidence CRC
"""
self.AR = {
'v1': {
'CRC': None,
'DBCRC': None,
'DBConfidence': None,
},
'v2': {
'CRC': None,
'DBCRC': None,
'DBConfidence': None,
},
'DBMaxConfidence': None,
'DBMaxConfidenceCRC': None,
}
class RipResult:
"""

Binary file not shown.

View File

@@ -1,31 +1,316 @@
# -*- Mode: Python; test-case-name: whipper.test.test_common_accurip -*-
# vi:si:et:sw=4:sts=4:ts=4
import os
import sys
from StringIO import StringIO
from os import chmod, makedirs
from os.path import dirname, exists, join
from shutil import copy, rmtree
from tempfile import mkdtemp
from unittest import TestCase
from whipper.common import accurip
from whipper.test import common as tcommon
from whipper.common.accurip import (
calculate_checksums, get_db_entry, print_report, verify_result,
_split_responses, EntryNotFound
)
from whipper.result.result import RipResult, TrackResult
class AccurateRipResponseTestCase(tcommon.TestCase):
class TestAccurateRipResponse(TestCase):
@classmethod
def setUpClass(cls):
cls.path = 'c/1/2/dBAR-002-0000f21c-00027ef8-05021002.bin'
cls.entry = _split_responses(
open(join(dirname(__file__), cls.path[6:])).read()
)
cls.other_path = '4/8/2/dBAR-011-0010e284-009228a3-9809ff0b.bin'
def testResponse(self):
path = os.path.join(os.path.dirname(__file__),
'dBAR-011-0010e284-009228a3-9809ff0b.bin')
data = open(path, "rb").read()
def setUp(self):
self.cache_dir = mkdtemp(suffix='whipper_accurip_cache_test')
accurip._CACHE_DIR = self.cache_dir
responses = accurip.getAccurateRipResponses(data)
self.assertEquals(len(responses), 3)
def cleanup(cachedir):
chmod(cachedir, 0755)
rmtree(cachedir)
self.addCleanup(cleanup, self.cache_dir)
response = responses[0]
def test_uses_cache_dir(self):
# copy normal entry into other entry's place
makedirs(dirname(join(self.cache_dir, self.other_path)))
copy(
join(dirname(__file__), self.path[6:]),
join(self.cache_dir, self.other_path)
)
# ask cache for other entry and assert cached entry equals normal entry
self.assertEquals(self.entry, get_db_entry(self.other_path))
self.assertEquals(response.trackCount, 11)
self.assertEquals(response.discId1, "0010e284")
self.assertEquals(response.discId2, "009228a3")
self.assertEquals(response.cddbDiscId, "9809ff0b")
def test_raises_entrynotfound_for_no_entry(self):
with self.assertRaises(EntryNotFound):
get_db_entry('definitely_a_404')
for i in range(11):
self.assertEquals(response.confidences[i], 35)
self.assertEquals(response.checksums[0], "beea32c8")
self.assertEquals(response.checksums[10], "acee98ca")
def test_can_return_entry_without_saving(self):
chmod(self.cache_dir, 0)
self.assertEqual(get_db_entry(self.path), self.entry)
chmod(self.cache_dir, 0755)
self.assertFalse(exists(join(self.cache_dir, self.path)))
def test_retrieves_and_saves_accuraterip_entry(self):
# for path, entry in zip(self.paths[0], self.entries):
self.assertFalse(exists(join(self.cache_dir, self.path)))
self.assertEquals(get_db_entry(self.path), self.entry)
self.assertTrue(exists(join(self.cache_dir, self.path)))
def test_AccurateRipResponse_parses_correctly(self):
responses = get_db_entry(self.path)
self.assertEquals(len(responses), 2)
self.assertEquals(responses[0].num_tracks, 2)
self.assertEquals(responses[0].discId1, '0000f21c')
self.assertEquals(responses[0].discId2, '00027ef8')
self.assertEquals(responses[0].cddbDiscId, '05021002')
self.assertEquals(responses[0].confidences[0], 12)
self.assertEquals(responses[0].confidences[1], 20)
self.assertEquals(responses[0].checksums[0], '284fc705')
self.assertEquals(responses[0].checksums[1], '9cc1f32e')
self.assertEquals(responses[1].num_tracks, 2)
self.assertEquals(responses[1].discId1, '0000f21c')
self.assertEquals(responses[1].discId2, '00027ef8')
self.assertEquals(responses[1].cddbDiscId, '05021002')
self.assertEquals(responses[1].confidences[0], 4)
self.assertEquals(responses[1].confidences[1], 4)
self.assertEquals(responses[1].checksums[0], 'dc77f9ab')
self.assertEquals(responses[1].checksums[1], 'dd97d2c3')
# XXX: test arc.py
class TestCalculateChecksums(TestCase):
def test_returns_none_for_bad_files(self):
self.assertEquals(
calculate_checksums(['/does/not/exist']),
{'v1': [None], 'v2': [None]}
)
# TODO: test success when file exists
class TestVerifyResult(TestCase):
@classmethod
def setUpClass(cls):
path = 'c/1/2/dBAR-002-0000f21c-00027ef8-05021002.bin'
cls.responses = _split_responses(
open(join(dirname(__file__), path[6:])).read()
)
cls.checksums = {
'v1': ['284fc705', '9cc1f32e'],
'v2': ['dc77f9ab', 'dd97d2c3'],
}
def setUp(self):
self.result = RipResult()
for n in range(1, 2+1):
track = TrackResult()
track.number = n
self.result.tracks.append(track)
def test_empty_result_returns_false(self):
self.assertEquals(
verify_result(RipResult(), self.responses, self.checksums),
False
)
def test_empty_responses_returns_false(self):
self.assertEquals(
verify_result(self.result, [], self.checksums),
False
)
# XXX: would this happen?
def test_empty_checksums_returns_false(self):
self.assertEquals(
verify_result(self.result, self.responses, {}),
False
)
def test_wrong_checksums_returns_false(self):
self.assertEquals(
verify_result(self.result, self.responses, {
'v1': ['deadbeef', '89abcdef'],
'v2': ['76543210', '01234567']
}),
False
)
def test_incomplete_checksums(self):
self.assertEquals(
verify_result(self.result, self.responses, {
'v1': ['284fc705', '9cc1f32e'],
'v2': [None, 'dd97d2c3'],
}),
True
)
self.assertEquals(
verify_result(self.result, self.responses, {
'v1': ['284fc705', None],
'v2': ['dc77f9ab', 'dd97d2c3'],
}),
True
)
self.assertEquals(
verify_result(self.result, self.responses, {
'v1': ['284fc705', None],
'v2': [None, 'dd97d2c3'],
}),
True
)
def test_matches_only_v1_or_v2_responses(self):
self.assertEquals(
verify_result(
self.result, [self.responses[0]], self.checksums
),
True
)
self.assertEquals(
verify_result(
self.result, [self.responses[1]], self.checksums
),
True
)
def test_passes_with_htoa(self):
htoa = TrackResult()
htoa.number = 0
self.result.tracks.append(htoa)
self.assertEquals(
verify_result(self.result, self.responses, self.checksums),
True
)
def test_stores_accuraterip_results_on_result(self):
self.assertEquals(
verify_result(self.result, self.responses, self.checksums),
True
)
self.assertEquals(self.result.tracks[0].AR, {
'v1': {
'CRC': '284fc705',
'DBCRC': '284fc705',
'DBConfidence': 12,
},
'v2': {
'CRC': 'dc77f9ab',
'DBCRC': 'dc77f9ab',
'DBConfidence': 4,
},
'DBMaxConfidence': 12,
'DBMaxConfidenceCRC': '284fc705',
})
self.assertEquals(self.result.tracks[1].AR, {
'v1': {
'CRC': '9cc1f32e',
'DBCRC': '9cc1f32e',
'DBConfidence': 20,
},
'v2': {
'CRC': 'dd97d2c3',
'DBCRC': 'dd97d2c3',
'DBConfidence': 4,
},
'DBMaxConfidence': 20,
'DBMaxConfidenceCRC': '9cc1f32e',
})
class TestAccurateRipReport(TestCase):
def setUp(self):
sys.stdout = StringIO()
self.result = RipResult()
track = TrackResult()
track.number = 1
track.AR = {
'v1': {
'CRC': '284fc705',
'DBCRC': '284fc705',
'DBConfidence': 12,
},
'v2': {
'CRC': 'dc77f9ab',
'DBCRC': 'dc77f9ab',
'DBConfidence': 4,
},
'DBMaxConfidence': 12,
'DBMaxConfidenceCRC': '284fc705',
}
self.result.tracks.append(track)
def tearDown(self):
sys.stdout = sys.__stdout__
def test_report_no_result(self):
track = TrackResult()
track.number = 1
self.result.tracks[0] = track
print_report(self.result)
self.assertEquals(
sys.stdout.getvalue(),
'track 1: unknown (error)\n'
)
def test_track_not_found(self):
self.result.tracks[0].AR['DBMaxConfidence'] = None
print_report(self.result)
self.assertEquals(
sys.stdout.getvalue(),
'track 1: rip NOT accurate (not found) '
' v1 [284fc705], v2 [dc77f9ab], DB [notfound]\n'
)
def test_htoa_not_tracked(self):
self.result.tracks[0].number = 0
self.result.tracks[0].AR['v1']['CRC'] = None
self.result.tracks[0].AR['v2']['CRC'] = None
print_report(self.result)
self.assertEquals(
sys.stdout.getvalue(),
'track 0: unknown (not tracked)\n'
)
def test_report_v1_only(self):
self.result.tracks[0].AR['v2']['DBCRC'] = None
self.result.tracks[0].AR['v2']['DBConfidence'] = None
print_report(self.result)
self.assertEquals(
sys.stdout.getvalue(),
'track 1: rip accurate (max confidence 12)'
' v1 [284fc705], v2 [dc77f9ab], DB [284fc705]\n'
)
def test_report_v2_only(self):
self.result.tracks[0].AR['v1']['DBCRC'] = None
self.result.tracks[0].AR['v1']['DBConfidence'] = None
print_report(self.result)
self.assertEquals(
sys.stdout.getvalue(),
'track 1: rip accurate (confidence 4 of 12)'
' v1 [284fc705], v2 [dc77f9ab], DB [dc77f9ab]\n'
)
def test_report_v1_and_v2_max_confidence(self):
print_report(self.result)
self.assertEquals(
sys.stdout.getvalue(),
'track 1: rip accurate (max confidence 12)'
' v1 [284fc705], v2 [dc77f9ab], DB [284fc705, dc77f9ab]\n'
)
def test_report_v1_and_v2(self):
self.result.tracks[0].AR['DBMaxConfidence'] = 66
print_report(self.result)
self.assertEquals(
sys.stdout.getvalue(),
'track 1: rip accurate (confidence 12 of 66)'
' v1 [284fc705], v2 [dc77f9ab], DB [284fc705, dc77f9ab]\n'
)

View File

@@ -2,96 +2,19 @@
# vi:si:et:sw=4:sts=4:ts=4
import os
import pickle
import unittest
from whipper.result import result
from whipper.common import program, accurip, mbngs, config
from whipper.common import program, mbngs, config
from whipper.command.cd import DEFAULT_DISC_TEMPLATE
class TrackImageVerifyTestCase(unittest.TestCase):
# example taken from a rip of Luke Haines Is Dead, disc 1
# AccurateRip database has 0 confidence for 1st track
# Rip had a wrong result for track 9
def testVerify(self):
path = os.path.join(os.path.dirname(__file__),
'dBAR-020-002e5023-029d8e49-040eaa14.bin')
data = open(path, "rb").read()
responses = accurip.getAccurateRipResponses(data)
# these crc's were calculated from an actual rip
checksums = [1644890007, 2945205445, 3983436658, 1528082495,
1203704270, 1163423644, 3649097244, 100524219,
1583356174, 373652058, 1842579359, 2850056507,
1329730252, 2526965856, 2525886806, 209743350,
3184062337, 2099956663, 2943874164, 2321637196]
prog = program.Program(config.Config())
prog.result = result.RipResult()
# fill it with empty trackresults
for i, c in enumerate(checksums):
r = result.TrackResult()
r.number = i + 1
prog.result.tracks.append(r)
prog._verifyImageWithChecksums(responses, checksums)
# now check if the results were filled in properly
tr = prog.result.getTrackResult(1)
self.assertEquals(tr.accurip, False)
self.assertEquals(tr.ARDBMaxConfidence, 0)
self.assertEquals(tr.ARDBCRC, 0)
self.assertEquals(tr.ARDBCRC, 0)
tr = prog.result.getTrackResult(2)
self.assertEquals(tr.accurip, True)
self.assertEquals(tr.ARDBMaxConfidence, 2)
self.assertEquals(tr.ARDBCRC, checksums[2 - 1])
tr = prog.result.getTrackResult(10)
self.assertEquals(tr.accurip, False)
self.assertEquals(tr.ARDBMaxConfidence, 2)
# we know track 10 was ripped wrong
self.assertNotEquals(tr.ARDBCRC, checksums[10 - 1])
res = prog.getAccurateRipResults()
self.assertEquals(res[1 - 1],
"Track 1: rip NOT accurate (not found) "
"[620b0797], DB [notfound]")
self.assertEquals(res[2 - 1],
"Track 2: rip accurate (max confidence 2) "
"[af8c44c5], DB [af8c44c5]")
self.assertEquals(res[10 - 1],
"Track 10: rip NOT accurate (max confidence 2) "
"[16457a5a], DB [eb6e55b4]")
class HTOATestCase(unittest.TestCase):
def setUp(self):
path = os.path.join(os.path.dirname(__file__),
'silentalarm.result.pickle')
self._tracks = pickle.load(open(path, 'rb'))
def testGetAccurateRipResults(self):
prog = program.Program(config.Config())
prog.result = result.RipResult()
prog.result.tracks = self._tracks
prog.getAccurateRipResults()
class PathTestCase(unittest.TestCase):
def testStandardTemplateEmpty(self):
prog = program.Program(config.Config())
path = prog.getPath(u'/tmp', DEFAULT_DISC_TEMPLATE,
'mbdiscid', 0)
'mbdiscid', None)
self.assertEquals(path,
unicode('/tmp/unknown/Unknown Artist - mbdiscid/'
'Unknown Artist - mbdiscid'))
@@ -101,10 +24,9 @@ class PathTestCase(unittest.TestCase):
md = mbngs.DiscMetadata()
md.artist = md.sortName = 'Jeff Buckley'
md.title = 'Grace'
prog.metadata = md
path = prog.getPath(u'/tmp', DEFAULT_DISC_TEMPLATE,
'mbdiscid', 0)
'mbdiscid', md, 0)
self.assertEquals(path,
unicode('/tmp/unknown/Jeff Buckley - Grace/'
'Jeff Buckley - Grace'))
@@ -114,92 +36,7 @@ class PathTestCase(unittest.TestCase):
md = mbngs.DiscMetadata()
md.artist = md.sortName = 'Jeff Buckley'
md.title = 'Grace'
prog.metadata = md
path = prog.getPath(u'/tmp', u'%A/%d', 'mbdiscid', 0)
path = prog.getPath(u'/tmp', u'%A/%d', 'mbdiscid', md, 0)
self.assertEquals(path,
u'/tmp/Jeff Buckley/Grace')
def testDisambiguateOnRelease(self):
"""Test that disambiguation gets placed in the same part of the path
as the release name.
See https://github.com/JoeLametta/whipper/issues/127"""
prog = program.Program(config.Config())
md = mbngs.DiscMetadata()
md.artist = 'Guy Davis'
md.sortName = 'Davis, Guy'
md.title = 'Call Down the Thunder'
md.release = '1996'
md.catalogNumber = 'RHR CD 89'
prog.metadata = md
templates = {
u'%A/%d - %y': u'Guy Davis/Call Down the Thunder - 1996 (RHR CD 89)', # noqa: E501
u'%A - %d - %y': u'Guy Davis - Call Down the Thunder - 1996 (RHR CD 89)', # noqa: E501
u'%A/%y/%d': u'Guy Davis/1996/Call Down the Thunder (RHR CD 89)',
u'%y/%d/%A': u'1996/Call Down the Thunder (RHR CD 89)/Guy Davis',
u'%d/%A/%y': u'Call Down the Thunder (RHR CD 89)/Guy Davis/1996',
}
for template, expected_path in templates.iteritems():
path = prog.getPath(u'/tmp', template, 'mbdiscid', 0, disambiguate=True) # noqa: E501
self.assertEquals(path, u'/tmp/' + expected_path)
def testDisambiguateOnReleaseOnlyOnce(self):
"""Test that disambiguation gets added only once."""
prog = program.Program(config.Config())
md = mbngs.DiscMetadata()
md.artist = 'Guy Davis'
md.sortName = 'Davis, Guy'
md.title = 'Call Down the Thunder'
md.release = '1996'
md.catalogNumber = 'RHR CD 89'
prog.metadata = md
template = u'%A/%d - %y/%d/%d'
path = prog.getPath(u'/tmp', template, 'mbdiscid', 0, disambiguate=True) # noqa: E501
self.assertEquals(path,
u'/tmp/Guy Davis/Call Down the Thunder - 1996 (RHR CD 89)/Call Down the Thunder/Call Down the Thunder') # noqa: E501
def testDisambiguateOnNoReleaseTitle(self):
"""Test that disambiguation gets added even if there's no release
title in the template."""
prog = program.Program(config.Config())
md = mbngs.DiscMetadata()
md.artist = 'Guy Davis'
md.sortName = 'Davis, Guy'
md.title = 'Call Down the Thunder'
md.release = '1996'
md.catalogNumber = 'RHR CD 89'
prog.metadata = md
templates = {
u'%A/%y': u'Guy Davis/1996 (RHR CD 89)',
u'%A - %y': u'Guy Davis - 1996 (RHR CD 89)',
u'%y/%A': u'1996/Guy Davis (RHR CD 89)',
}
for template, expected_path in templates.iteritems():
path = prog.getPath(u'/tmp', template, 'mbdiscid', 0, disambiguate=True) # noqa: E501
self.assertEquals(path, u'/tmp/' + expected_path)
def testAddDisambiguationUnitTest(self):
"""Unit test for Program.addDisambiguation()."""
prog = program.Program(config.Config())
md = mbngs.DiscMetadata()
# No relevant disambiguation metadata
self.assertEquals(
prog.addDisambiguation(u'Test', md),
u'Test')
# Only barcode available
md.barcode = '033651008927'
self.assertEquals(
prog.addDisambiguation(u'Test', md),
u'Test (033651008927)')
# Both catalog number and barcode available
md.catalogNumber = 'RHR CD 89'
self.assertEquals(
prog.addDisambiguation(u'Test', md),
u'Test (RHR CD 89)')

View File

@@ -62,11 +62,10 @@ class LadyhawkeTestCase(tcommon.TestCase):
"KnpGsLhvH.lPrNc1PBL21lb9Bg4-")
def testAccurateRip(self):
self.assertEquals(self.table.getAccurateRipIds(), (
self.assertEquals(self.table.accuraterip_ids(), (
"0013bd5a", "00b8d489"))
self.assertEquals(self.table.getAccurateRipURL(),
"http://www.accuraterip.com/accuraterip/a/5/d/"
"dBAR-012-0013bd5a-00b8d489-c60af50d.bin")
self.assertEquals(self.table.accuraterip_path(),
"a/5/d/dBAR-012-0013bd5a-00b8d489-c60af50d.bin")
def testDuration(self):
self.assertEquals(self.table.duration(), 2761413)

View File

@@ -89,8 +89,8 @@ class CureTestCase(common.TestCase):
common.diffStrings(ref, cue)
# we verify it because it has failed in readdisc in the past
self.assertEquals(self.toc.table.getAccurateRipURL(),
'http://www.accuraterip.com/accuraterip/3/c/4/dBAR-013-0019d4c3-00fe8924-b90c650d.bin') # noqa: E501
self.assertEquals(self.toc.table.accuraterip_path(),
'3/c/4/dBAR-013-0019d4c3-00fe8924-b90c650d.bin')
def testGetRealPath(self):
self.assertRaises(KeyError, self.toc.getRealPath, u'track01.wav')
@@ -164,8 +164,8 @@ class BlocTestCase(common.TestCase):
def testAccurateRip(self):
# we verify it because it has failed in readdisc in the past
self.assertEquals(self.toc.table.getAccurateRipURL(),
'http://www.accuraterip.com/accuraterip/e/d/2/dBAR-013-001af2de-0105994e-ad0be00d.bin') # noqa: E501
self.assertEquals(self.toc.table.accuraterip_path(),
'e/d/2/dBAR-013-001af2de-0105994e-ad0be00d.bin')
# The Breeders - Mountain Battles has CDText