AccurateRip V2 support

- output path no longer has fallbacks
- refactor accuraterip cache
- use requests to download accuraterip entries
- add tests for accuraterip functionality
- remove gobject support from accuraterip-checksum calculation
- default track template now includes extension
- begin to remove support for continuing rip
- begin to use print instead of sys.stdout.write() throughout
This commit is contained in:
Samantha Baldwin
2017-09-04 21:26:34 -04:00
parent 3f248bfc00
commit bfa0308880
14 changed files with 799 additions and 819 deletions

View File

@@ -19,6 +19,7 @@
# along with whipper. If not, see <http://www.gnu.org/licenses/>.
import argparse
import cdio
import os
import glob
import urllib2
@@ -41,7 +42,7 @@ logger = logging.getLogger(__name__)
SILENT = 1e-10
MAX_TRIES = 5
DEFAULT_TRACK_TEMPLATE = u'%r/%A - %d/%t. %a - %n'
DEFAULT_TRACK_TEMPLATE = u'%r/%A - %d/%t. %a - %n.%x'
DEFAULT_DISC_TEMPLATE = u'%r/%A - %d/%A - %d'
TEMPLATE_DESCRIPTION = '''
@@ -68,12 +69,6 @@ disc and track template are:
class _CD(BaseCommand):
"""
@type program: L{program.Program}
@ivar eject: whether to eject the drive after completing
"""
eject = True
@staticmethod
@@ -164,7 +159,7 @@ class _CD(BaseCommand):
self.itable = self.program.getTable(self.runner,
self.ittoc.getCDDBDiscId(),
self.ittoc.getMusicBrainzDiscId(),
self.device, offset)
self.device, self.options.offset)
assert self.itable.getCDDBDiscId() == self.ittoc.getCDDBDiscId(), \
"full table's id %s differs from toc id %s" % (
@@ -174,10 +169,10 @@ class _CD(BaseCommand):
"full table's mb id %s differs from toc id mb %s" % (
self.itable.getMusicBrainzDiscId(),
self.ittoc.getMusicBrainzDiscId())
assert self.itable.getAccurateRipURL() == \
self.ittoc.getAccurateRipURL(), \
assert self.itable.accuraterip_path() == \
self.ittoc.accuraterip_path(), \
"full table's AR URL %s differs from toc AR URL %s" % (
self.itable.getAccurateRipURL(), self.ittoc.getAccurateRipURL())
self.itable.accuraterip_url(), self.ittoc.accuraterip_url())
if self.program.metadata:
self.program.metadata.discid = self.ittoc.getMusicBrainzDiscId()
@@ -200,15 +195,9 @@ class _CD(BaseCommand):
self.program.result.title = self.program.metadata \
and self.program.metadata.title \
or 'Unknown Title'
try:
import cdio
_, self.program.result.vendor, self.program.result.model, \
self.program.result.release = \
cdio.Device(self.device).get_hwinfo()
except ImportError:
raise ImportError("Pycdio module import failed.\n"
"This is a hard dependency: if not "
"available please install it")
_, self.program.result.vendor, self.program.result.model, \
self.program.result.release = \
cdio.Device(self.device).get_hwinfo()
self.doCommand()
@@ -348,41 +337,25 @@ Log files will log the path to tracks relative to this directory.
self.program.result.overread = self.options.overread
self.program.result.logger = self.options.logger
# write disc files
disambiguate = False
while True:
discName = self.program.getPath(self.program.outdir,
self.options.disc_template,
self.mbdiscid, 0,
disambiguate=disambiguate)
dirname = os.path.dirname(discName)
if os.path.exists(dirname):
sys.stdout.write("Output directory %s already exists\n" %
dirname.encode('utf-8'))
logs = glob.glob(os.path.join(dirname, '*.log'))
if logs:
sys.stdout.write(
"Output directory %s is a finished rip\n" %
dirname.encode('utf-8'))
if not disambiguate:
disambiguate = True
continue
return
else:
break
discName = self.program.getPath(self.program.outdir,
self.options.disc_template,
self.mbdiscid,
self.program.metadata)
dirname = os.path.dirname(discName)
if os.path.exists(dirname):
logs = glob.glob(os.path.join(dirname, '*.log'))
if logs:
bye("output directory %s is a finished rip" %
dirname.encode('utf-8'))
else:
sys.stdout.write("Creating output directory %s\n" %
sys.stdout.write("output directory %s already exists\n" %
dirname.encode('utf-8'))
os.makedirs(dirname)
break
# FIXME: say when we're continuing a rip
# FIXME: disambiguate if the pre-existing rip is different
print("creating output directory %s" % dirname.encode('utf-8'))
os.makedirs(dirname)
# FIXME: turn this into a method
def ripIfNotRipped(number):
def _ripIfNotRipped(number):
logger.debug('ripIfNotRipped for track %d' % number)
# we can have a previous result
trackResult = self.program.result.getTrackResult(number)
@@ -395,9 +368,9 @@ Log files will log the path to tracks relative to this directory.
path = self.program.getPath(self.program.outdir,
self.options.track_template,
self.mbdiscid, number,
disambiguate=disambiguate) \
+ '.' + 'flac'
self.mbdiscid,
self.program.metadata,
track_number=number)
logger.debug('ripIfNotRipped: path %r' % path)
trackResult.number = number
@@ -464,12 +437,11 @@ Log files will log the path to tracks relative to this directory.
"track can't be ripped. "
"Rip attempts number is equal to 'MAX_TRIES'")
if trackResult.testcrc == trackResult.copycrc:
sys.stdout.write('Checksums match for track %d\n' %
number)
sys.stdout.write('CRCs match for track %d\n' % number)
else:
sys.stdout.write(
'ERROR: checksums did not match for track %d\n' %
number)
'ERROR: CRCs did not match for track %d\n' % number
)
raise
sys.stdout.write(
@@ -507,109 +479,35 @@ Log files will log the path to tracks relative to this directory.
htoa = self.program.getHTOA()
if htoa:
start, stop = htoa
sys.stdout.write(
'Found Hidden Track One Audio from frame %d to %d\n' % (
start, stop))
# rip it
ripIfNotRipped(0)
print('found Hidden Track One Audio from frame %d to %d' % (
start, stop))
_ripIfNotRipped(0)
htoapath = self.program.result.tracks[0].filename
for i, track in enumerate(self.itable.tracks):
# FIXME: rip data tracks differently
if not track.audio:
sys.stdout.write(
'WARNING: skipping data track %d, not implemented\n' % (
i + 1, ))
print('skipping data track %d, not implemented' % i + 1)
# FIXME: make it work for now
track.indexes[1].relative = 0
continue
ripIfNotRipped(i + 1)
# write disc files
discName = self.program.getPath(self.program.outdir,
self.options.disc_template,
self.mbdiscid, 0,
disambiguate=disambiguate)
dirname = os.path.dirname(discName)
if not os.path.exists(dirname):
os.makedirs(dirname)
_ripIfNotRipped(i + 1)
logger.debug('writing cue file for %r', discName)
self.program.writeCue(discName)
# write .m3u file
logger.debug('writing m3u file for %r', discName)
m3uPath = u'%s.m3u' % discName
handle = open(m3uPath, 'w')
u = u'#EXTM3U\n'
handle.write(u.encode('utf-8'))
self.program.write_m3u(discName, htoapath)
def writeFile(handle, path, length):
targetPath = common.getRelativePath(path, m3uPath)
u = u'#EXTINF:%d,%s\n' % (length, targetPath)
handle.write(u.encode('utf-8'))
u = '%s\n' % targetPath
handle.write(u.encode('utf-8'))
if self.program.verifyImage(self.runner, self.ittoc, self.itable):
print('rip verified as accurate')
else:
print('rip NOT verified as accurate')
if htoapath:
writeFile(handle, htoapath,
self.itable.getTrackStart(1) / common.FRAMES_PER_SECOND)
for i, track in enumerate(self.itable.tracks):
if not track.audio:
continue
path = self.program.getPath(self.program.outdir,
self.options.track_template,
self.mbdiscid, i + 1,
disambiguate=disambiguate
) + '.' + 'flac'
writeFile(handle, path,
(self.itable.getTrackLength(i + 1) /
common.FRAMES_PER_SECOND))
handle.close()
# verify using accuraterip
url = self.ittoc.getAccurateRipURL()
sys.stdout.write("AccurateRip URL %s\n" % url)
accucache = accurip.AccuCache()
try:
responses = accucache.retrieve(url)
except urllib2.URLError, e:
if isinstance(e.args[0], socket.gaierror):
if e.args[0].errno == -2:
sys.stdout.write("Warning: network error: %r\n" % (
e.args[0], ))
responses = None
else:
raise
else:
raise
if not responses:
sys.stdout.write('Album not found in AccurateRip database\n')
if responses:
sys.stdout.write('%d AccurateRip reponses found\n' %
len(responses))
if responses[0].cddbDiscId != self.itable.getCDDBDiscId():
sys.stdout.write(
"AccurateRip response discid different: %s\n" %
responses[0].cddbDiscId)
self.program.verifyImage(self.runner, responses)
sys.stdout.write("\n".join(
self.program.getAccurateRipResults()) + "\n")
accurip.print_report(self.program.result)
self.program.saveRipResult()
# write log file
self.program.writeLog(discName, self.logger)

View File

@@ -27,7 +27,7 @@ import gobject
from whipper.command.basecommand import BaseCommand
from whipper.common import accurip, common, config, drive
from whipper.common import task as ctask
from whipper.program import cdrdao, cdparanoia, utils
from whipper.program import arc, cdrdao, cdparanoia, utils
from whipper.common import checksum
from whipper.extern.task import task
@@ -92,23 +92,7 @@ CD in the AccurateRip database."""
table = t.table
logger.debug("CDDB disc id: %r", table.getCDDBDiscId())
url = table.getAccurateRipURL()
logger.debug("AccurateRip URL: %s", url)
# FIXME: download url as a task too
responses = []
import urllib2
try:
handle = urllib2.urlopen(url)
data = handle.read()
responses = accurip.getAccurateRipResponses(data)
except urllib2.HTTPError, e:
if e.code == 404:
sys.stdout.write(
'Album not found in AccurateRip database.\n')
return 1
else:
raise
responses = accurip.get_db_entry(table.accuraterip_path())
if responses:
logger.debug('%d AccurateRip responses found.' % len(responses))
@@ -120,17 +104,19 @@ CD in the AccurateRip database."""
# now rip the first track at various offsets, calculating AccurateRip
# CRC, and matching it against the retrieved ones
def match(archecksum, track, responses):
# archecksums is a tuple of accuraterip checksums: (v1, v2)
def match(archecksums, track, responses):
for i, r in enumerate(responses):
if archecksum == r.checksums[track - 1]:
return archecksum, i
for checksum in archecksums:
if checksum == r.checksums[track - 1]:
return checksum, i
return None, None
for offset in self._offsets:
sys.stdout.write('Trying read offset %d ...\n' % offset)
try:
archecksum = self._arcs(runner, table, 1, offset)
archecksums = self._arcs(runner, table, 1, offset)
except task.TaskException, e:
# let MissingDependency fall through
@@ -149,9 +135,9 @@ CD in the AccurateRip database."""
'WARNING: cannot rip with offset %d...\n' % offset)
continue
logger.debug('AR checksum calculated: %s' % archecksum)
logger.debug('AR checksums calculated: %s %s' % archecksums)
c, i = match(archecksum, 1, responses)
c, i = match(archecksums, 1, responses)
if c:
count = 1
logger.debug('MATCHED against response %d' % i)
@@ -161,9 +147,10 @@ CD in the AccurateRip database."""
# now try and rip all other tracks as well, except for the
# last one (to avoid readers that can't do overread
for track in range(2, (len(table.tracks) + 1) - 1):
# for track in range(2, (len(table.tracks) + 1) - 1):
for track in range(2, (len(table.tracks) + 1)):
try:
archecksum = self._arcs(runner, table, track, offset)
archecksums = self._arcs(runner, table, track, offset)
except task.TaskException, e:
if isinstance(e.exception, cdparanoia.FileSizeError):
sys.stdout.write(
@@ -171,7 +158,7 @@ CD in the AccurateRip database."""
offset)
continue
c, i = match(archecksum, track, responses)
c, i = match(archecksums, track, responses)
if c:
logger.debug('MATCHED track %d against response %d' % (
track, i))
@@ -188,9 +175,8 @@ CD in the AccurateRip database."""
sys.stdout.write('No matching offset found.\n')
sys.stdout.write('Consider trying again with a different disc.\n')
# TODO MW: Update this further for ARv2 code
def _arcs(self, runner, table, track, offset):
# rips the track with the given offset, return the arcs checksum
# rips the track with the given offset, return the arcs checksums
logger.debug('Ripping track %r with offset %d ...', track, offset)
fd, path = tempfile.mkstemp(
@@ -207,15 +193,15 @@ CD in the AccurateRip database."""
track, offset)
runner.run(t)
# TODO MW: Update this to also use the v2 checksum(s)
t = checksum.FastAccurateRipChecksumTask(path,
trackNumber=track,
trackCount=len(table.tracks),
wave=True, v2=False)
runner.run(t)
v1 = arc.accuraterip_checksum(
path, track, len(table.tracks), wave=True, v2=False
)
v2 = arc.accuraterip_checksum(
path, track, len(table.tracks), wave=True, v2=True
)
os.unlink(path)
return "%08x" % t.checksum
return ("%08x" % v1, "%08x" % v2)
def _foundOffset(self, device, offset):
sys.stdout.write('\nRead offset of device is: %d.\n' %

View File

@@ -1,6 +1,7 @@
# -*- Mode: Python; test-case-name: whipper.test.test_common_accurip -*-
# vi:si:et:sw=4:sts=4:ts=4
# Copyright (C) 2017 Samantha Baldwin
# Copyright (C) 2009 Thomas Vander Stichele
# This file is part of whipper.
@@ -18,128 +19,250 @@
# You should have received a copy of the GNU General Public License
# along with whipper. If not, see <http://www.gnu.org/licenses/>.
import errno
import os
import requests
import struct
import urlparse
import urllib2
from errno import EEXIST
from os import makedirs
from os.path import dirname, exists, join
from whipper.common import directory
from whipper.program.arc import accuraterip_checksum
import logging
logger = logging.getLogger(__name__)
_CACHE_DIR = directory.cache_path()
_CACHE_DIR = join(directory.cache_path(), 'accurip')
class AccuCache:
def __init__(self):
if not os.path.exists(_CACHE_DIR):
logger.debug('Creating cache directory %s', _CACHE_DIR)
os.makedirs(_CACHE_DIR)
def _getPath(self, url):
# split path starts with /
return os.path.join(_CACHE_DIR, urlparse.urlparse(url)[2][1:])
def retrieve(self, url, force=False):
logger.debug("Retrieving AccurateRip URL %s", url)
path = self._getPath(url)
logger.debug("Cached path: %s", path)
if force:
logger.debug("forced to download")
self.download(url)
elif not os.path.exists(path):
logger.debug("%s does not exist, downloading", path)
self.download(url)
if not os.path.exists(path):
logger.debug("%s does not exist, not in database", path)
return None
data = self._read(url)
return getAccurateRipResponses(data)
def download(self, url):
# FIXME: download url as a task too
try:
handle = urllib2.urlopen(url)
data = handle.read()
except urllib2.HTTPError, e:
if e.code == 404:
return None
else:
raise
self._cache(url, data)
return data
def _cache(self, url, data):
path = self._getPath(url)
try:
os.makedirs(os.path.dirname(path))
except OSError, e:
logger.debug('Could not make dir %s: %r' % (
path, str(e)))
if e.errno != errno.EEXIST:
raise
handle = open(path, 'wb')
handle.write(data)
handle.close()
def _read(self, url):
logger.debug("Reading %s from cache", url)
path = self._getPath(url)
handle = open(path, 'rb')
data = handle.read()
handle.close()
return data
def getAccurateRipResponses(data):
ret = []
while data:
trackCount = struct.unpack("B", data[0])[0]
nbytes = 1 + 12 + trackCount * (1 + 8)
ret.append(AccurateRipResponse(data[:nbytes]))
data = data[nbytes:]
return ret
class AccurateRipResponse(object):
class _AccurateRipResponse(object):
"""
I represent the response of the AccurateRip online database.
An AccurateRip response contains a collection of metadata identifying a
particular digital audio compact disc.
@type checksums: list of str
For disc level metadata it contains the track count, two internal disc
IDs, and the CDDB disc ID.
A checksum and a confidence score is stored sequentially for each track in
the disc index, which excludes any audio hidden in track pre-gaps (such as
HTOA).
The response is stored as a packed binary structure.
"""
trackCount = None
discId1 = ""
discId2 = ""
cddbDiscId = ""
confidences = None
checksums = None
def __init__(self, data):
self.trackCount = struct.unpack("B", data[0])[0]
"""
The checksums and confidences arrays are indexed by relative track
position, so track 1 will have array index 0, track 2 will have array
index 1, and so forth. HTOA and other hidden tracks are not included.
"""
self.num_tracks = struct.unpack("B", data[0])[0]
self.discId1 = "%08x" % struct.unpack("<L", data[1:5])[0]
self.discId2 = "%08x" % struct.unpack("<L", data[5:9])[0]
self.cddbDiscId = "%08x" % struct.unpack("<L", data[9:13])[0]
self.confidences = []
self.checksums = []
pos = 13
for _ in range(self.trackCount):
for _ in range(self.num_tracks):
confidence = struct.unpack("B", data[pos])[0]
checksum = "%08x" % struct.unpack("<L", data[pos + 1:pos + 5])[0]
pos += 9
self.confidences.append(confidence)
self.checksums.append(checksum)
pos += 9
def __eq__(self, other):
return [
self.num_tracks, self.discId1, self.discId2, self.cddbDiscId,
self.confidences, self.checksums
] == [
other.num_tracks, other.discId1, other.discId2, other.cddbDiscId,
other.confidences, other.checksums
]
def _split_responses(raw_entry):
responses = []
while raw_entry:
track_count = struct.unpack("B", raw_entry[0])[0]
nbytes = 1 + 12 + track_count * (1 + 8)
responses.append(_AccurateRipResponse(raw_entry[:nbytes]))
raw_entry = raw_entry[nbytes:]
return responses
def calculate_checksums(track_paths):
"""
Return ARv1 and ARv2 checksums as two arrays of character strings in a
dictionary: {'v1': ['deadbeef', ...], 'v2': [...]}
Return None instead of checksum string for unchecksummable tracks.
HTOA checksums are not included in the database and are not calculated.
"""
track_count = len(track_paths)
v1_checksums = []
v2_checksums = []
logger.debug('checksumming %d tracks' % track_count)
# This is done sequentially because it is very fast.
for i, path in enumerate(track_paths):
v1_sum = accuraterip_checksum(
path, i+1, track_count, wave=True, v2=False
)
if not v1_sum:
logger.error(
'could not calculate AccurateRip v1 checksum for track %d %r' %
(i+1, path)
)
v1_checksums.append(None)
else:
v1_checksums.append("%08x" % v1_sum)
v2_sum = accuraterip_checksum(
path, i+1, track_count, wave=True, v2=True
)
if not v2_sum:
logger.error(
'could not calculate AccurateRip v2 checksum for track %d %r' %
(i+1, path)
)
v2_checksums.append(None)
else:
v2_checksums.append("%08x" % v2_sum)
return {'v1': v1_checksums, 'v2': v2_checksums}
def _download_entry(path):
url = "http://www.accuraterip.com/accuraterip/" + path
logger.debug('downloading AccurateRip entry from %s', url)
try:
resp = requests.get(url)
except requests.exceptions.ConnectionError as e:
logger.error('error retrieving AccurateRip entry: %r' % e)
return None
if not resp.ok:
logger.error('error retrieving AccurateRip entry: %s %s %r' % (
resp.status_code, resp.reason, resp
))
return None
return resp.content
def _save_entry(raw_entry, path):
logger.debug('saving AccurateRip entry to %s', path)
# XXX: os.makedirs(exist_ok=True) in py3
try:
makedirs(dirname(path))
except OSError, e:
if e.errno != EEXIST:
logger.error('could not save entry to %s: %r' % (path, str(e)))
return
open(path, 'wb').write(raw_entry)
def get_db_entry(path):
"""
Retrieve cached AccurateRip disc entry as array of _AccurateRipResponses.
Downloads entry from accuraterip.com on cache fault.
`path' is in the format of the output of table.accuraterip_path().
"""
cached_path = join(_CACHE_DIR, path)
if exists(cached_path):
logger.debug('found accuraterip entry at %s', cached_path)
raw_entry = open(cached_path, 'rb').read()
else:
raw_entry = _download_entry(path)
if raw_entry:
_save_entry(raw_entry, cached_path)
if not raw_entry:
return None
return _split_responses(raw_entry)
def _assign_checksums_and_confidences(tracks, checksums, responses):
for i, track in enumerate(tracks):
for v in ('v1', 'v2'):
track.AR[v]['CRC'] = checksums[v][i]
track.AR['DBMaxConfidence'], track.AR['DBMaxConfidenceCRC'] = max(
[(r.confidences[i], r.checksums[i]) for r in responses],
key=lambda t: t[0]
)
def _match_responses(tracks, responses):
"""
Match and save track accuraterip response checksums against
all non-hidden tracks.
Returns True if every track has a match for every entry for either
AccurateRip version.
"""
for r in responses:
for i, track in enumerate(tracks):
for v in ('v1', 'v2'):
if track.AR[v]['CRC'] == r.checksums[i]:
if r.confidences[i] > track.AR[v]['DBConfidence']:
track.AR[v]['DBCRC'] = r.checksums[i]
track.AR[v]['DBConfidence'] = r.confidences[i]
logger.debug(
'track %d matched response %s in AccurateRip'
' database: %s crc %s confidence %s' %
(i, r.cddbDiscId, v, track.AR[v]['DBCRC'],
track.AR[v]['DBConfidence'])
)
return any((
all([t.AR['v1']['DBCRC'] for t in tracks]),
all([t.AR['v2']['DBCRC'] for t in tracks])
))
def verify_result(result, responses, checksums):
"""
Verify track AccurateRip checksums against database responses.
Stores track checksums and database values on result.
"""
if not (result and responses and checksums):
return False
# exclude HTOA from AccurateRip verification
# NOTE: if pre-gap hidden audio support is expanded to include
# tracks other than HTOA, this is invalid.
tracks = filter(lambda t: t.number != 0, result.tracks)
if not tracks:
return False
_assign_checksums_and_confidences(tracks, checksums, responses)
return _match_responses(tracks, responses)
def print_report(result):
"""
Print AccurateRip verification results to stdout.
"""
for i, track in enumerate(result.tracks):
status = 'rip NOT accurate'
conf = '(not found)'
db = 'notfound'
if track.AR['DBMaxConfidence'] is not None:
db = track.AR['DBMaxConfidenceCRC']
conf = '(max confidence %3d)' % track.AR['DBMaxConfidence']
if track.AR['v1']['DBCRC'] or track.AR['v2']['DBCRC']:
status = 'rip accurate'
db = ', '.join(filter(None, (
track.AR['v1']['DBCRC'],
track.AR['v2']['DBCRC']
)))
max_conf = max(
[track.AR[v]['DBConfidence'] for v in ('v1', 'v2')]
)
if max_conf:
if max_conf < track.AR['DBMaxConfidence']:
conf = '(confidence %3d of %3d)' % (
max_conf, track.AR['DBMaxConfidence']
)
# htoa tracks (i == 0) do not have an ARCRC
if track.number == 0:
print('track 0: unknown (not tracked)')
continue
if not (track.AR['v1']['CRC'] or track.AR['v2']['CRC']):
logger.error(
'no track AR CRC on non-HTOA track %d' % track.number
)
print('track %2d: unknown (error)' % track.number)
else:
print('track %2d: %-16s %-23s v1 [%s], v2 [%s], DB [%s]' % (
track.number, status, conf,
track.AR['v1']['CRC'], track.AR['v2']['CRC'], db
))

View File

@@ -49,27 +49,3 @@ class CRC32Task(etask.Task):
self.checksum = binascii.crc32(d) & 0xffffffff
self.stop()
class FastAccurateRipChecksumTask(etask.Task):
description = 'Calculating (Fast) AccurateRip checksum'
def __init__(self, path, trackNumber, trackCount, wave, v2=False):
self.path = path
self.trackNumber = trackNumber
self.trackCount = trackCount
self._wave = wave
self._v2 = v2
self.checksum = None
def start(self, runner):
etask.Task.start(self, runner)
self.schedule(0.0, self._arc)
def _arc(self):
arc = accuraterip_checksum(self.path, self.trackNumber,
self.trackCount,
self._wave, self._v2)
self.checksum = arc
self.stop()

View File

@@ -23,12 +23,12 @@ Common functionality and class for all programs using whipper.
"""
import musicbrainzngs
import re
import os
import sys
import time
from whipper.common import common, mbngs, cache, path
from whipper.common import checksum
from whipper.common import accurip, cache, checksum, common, mbngs, path
from whipper.program import cdrdao, cdparanoia
from whipper.image import image
from whipper.extern.task import task
@@ -178,34 +178,34 @@ class Program:
template_part += ' (%s)' % metadata.barcode
return template_part
def getPath(self, outdir, template, mbdiscid, i, disambiguate=False):
def getPath(self, outdir, template, mbdiscid, metadata, track_number=None):
"""
Based on the template, get a complete path for the given track,
minus extension.
Also works for the disc name, using disc variables for the template.
Return disc or track path relative to outdir according to
template. Track paths do not include extension.
@param outdir: the directory where to write the files
@type outdir: unicode
@param template: the template for writing the file
@type template: unicode
@param i: track number (0 for HTOA, or for disc)
@type i: int
Tracks are named according to the track template, filling in
the variables and adding the file extension. Variables
exclusive to the track template are:
- %t: track number
- %a: track artist
- %n: track title
- %s: track sort name
@rtype: unicode
Disc files (.cue, .log, .m3u) are named according to the disc
template, filling in the variables and adding the file
extension. Variables for both disc and track template are:
- %A: album artist
- %S: album sort name
- %d: disc title
- %y: release year
- %r: release type, lowercase
- %R: Release type, normal case
- %x: audio extension, lowercase
- %X: audio extension, uppercase
"""
assert type(outdir) is unicode, "%r is not unicode" % outdir
assert type(template) is unicode, "%r is not unicode" % template
# the template is similar to grip, except for %s/%S/%r/%R
# see #gripswitches
# returns without extension
v = {}
v['t'] = '%02d' % i
# default values
v['A'] = 'Unknown Artist'
v['d'] = mbdiscid # fallback for title
v['r'] = 'unknown'
@@ -215,59 +215,38 @@ class Program:
v['x'] = 'flac'
v['X'] = v['x'].upper()
v['y'] = '0000'
if track_number:
v['a'] = v['A']
v['t'] = '%02d' % track_number
if track_number == 0:
v['n'] = 'Hidden Track One Audio'
else:
v['n'] = 'Unknown Track %d' % track_number
v['a'] = v['A']
if i == 0:
v['n'] = 'Hidden Track One Audio'
else:
v['n'] = 'Unknown Track %d' % i
if self.metadata:
release = self.metadata.release or '0000'
if metadata:
release = metadata.release or '0000'
v['y'] = release[:4]
v['A'] = self._filter.filter(self.metadata.artist)
v['S'] = self._filter.filter(self.metadata.sortName)
v['d'] = self._filter.filter(self.metadata.title)
v['B'] = self.metadata.barcode
v['C'] = self.metadata.catalogNumber
if self.metadata.releaseType:
v['R'] = self.metadata.releaseType
v['r'] = self.metadata.releaseType.lower()
if i > 0:
try:
v['a'] = self._filter.filter(
self.metadata.tracks[i - 1].artist)
v['s'] = self._filter.filter(
self.metadata.tracks[i - 1].sortName)
v['n'] = self._filter.filter(
self.metadata.tracks[i - 1].title)
except IndexError, e:
print 'ERROR: no track %d found, %r' % (i, e)
raise
else:
v['A'] = self._filter.filter(metadata.artist)
v['S'] = self._filter.filter(metadata.sortName)
v['d'] = self._filter.filter(metadata.title)
v['B'] = metadata.barcode
v['C'] = metadata.catalogNumber
if metadata.releaseType:
v['R'] = metadata.releaseType
v['r'] = metadata.releaseType.lower()
if track_number > 0:
v['a'] = self._filter.filter(
metadata.tracks[track_number - 1].artist)
v['s'] = self._filter.filter(
metadata.tracks[track_number - 1].sortName)
v['n'] = self._filter.filter(
metadata.tracks[track_number - 1].title)
elif track_number:
# htoa defaults to disc's artist
v['a'] = self._filter.filter(self.metadata.artist)
v['a'] = self._filter.filter(metadata.artist)
# when disambiguating, use catalogNumber then barcode
if disambiguate:
templateParts = template.split(os.sep)
# Find the section of the template with the release name
for i, part in enumerate(templateParts):
if "%d" in part:
templateParts[i] = self.addDisambiguation(part, self.metadata) # noqa: E501
break
else:
# No parts of the template contain the release
templateParts[-1] = self.addDisambiguation(templateParts[-1], self.metadata) # noqa: E501
template = os.path.join(*templateParts)
logger.debug('Disambiguated template to %r' % template)
import re
template = re.sub(r'%(\w)', r'%(\1)s', template)
ret = os.path.join(outdir, template % v)
return ret
return os.path.join(outdir, template % v)
def getCDDB(self, cddbdiscid):
"""
@@ -579,118 +558,65 @@ class Program:
t = image.ImageRetagTask(cueImage, taglists)
runner.run(t)
def verifyImage(self, runner, responses):
def verifyImage(self, runner, ittoc, itable):
"""
verify ittoc against accuraterip and cue_path track lengths
Verify our image against the given AccurateRip responses.
Needs an initialized self.result.
Will set accurip and friends on each TrackResult.
Populates self.result.tracks with above TrackResults.
"""
logger.debug('verifying Image against %d AccurateRip responses',
len(responses or []))
cueImage = image.Image(self.cuePath)
# assigns track lengths
verifytask = image.ImageVerifyTask(cueImage)
cuetask = image.AccurateRipChecksumTask(cueImage)
runner.run(verifytask)
runner.run(cuetask)
self._verifyImageWithChecksums(responses, cuetask.checksums)
def _verifyImageWithChecksums(self, responses, checksums):
# loop over tracks to set our calculated AccurateRip CRC's
for i, csum in enumerate(checksums):
trackResult = self.result.getTrackResult(i + 1)
trackResult.ARCRC = csum
if verifytask.exception:
logger.error(verifytask.exceptionMessage)
return False
responses = accurip.get_db_entry(ittoc.accuraterip_path())
if not responses:
logger.warning('No AccurateRip responses, cannot verify.')
return
logger.warning('album not found in AccurateRip database')
return False
logger.info('%d AccurateRip response(s) found' % len(responses))
# now loop to match responses
for i, csum in enumerate(checksums):
trackResult = self.result.getTrackResult(i + 1)
for r in responses:
if r.cddbDiscId != itable.getCDDBDiscId():
logger.error(
'AccurateRip response discid differs: %s' % r.cddbDiscId
)
return False
confidence = None
response = None
checksums = accurip.calculate_checksums([
os.path.join(os.path.dirname(self.cuePath), t.indexes[1].path)
for t in filter(lambda t: t.number != 0, cueImage.cue.table.tracks)
])
if not (checksums and any(checksums['v1']) and any(checksums['v2'])):
return False
return accurip.verify_result(self.result, responses, checksums)
# match against each response's checksum for this track
for j, r in enumerate(responses):
if "%08x" % csum == r.checksums[i]:
response = r
logger.debug(
"Track %02d matched response %d of %d in "
"AccurateRip database",
i + 1, j + 1, len(responses))
trackResult.accurip = True
# FIXME: maybe checksums should be ints
trackResult.ARDBCRC = int(r.checksums[i], 16)
# arsum = csum
confidence = r.confidences[i]
trackResult.ARDBConfidence = confidence
def write_m3u(self, discname, htoapath):
m3uPath = u'%s.m3u' % discname
with open(m3uPath, 'w') as f:
f.write(u'#EXTM3U\n')
if not trackResult.accurip:
logger.warning("Track %02d: not matched in "
"AccurateRip database", i + 1)
def writeFile(path, length):
target_path = common.getRelativePath(path, m3uPath)
f.write(u'#EXTINF:%d,%s\n' % (length, target_path))
f.write('%s\n' % target_path)
# I have seen AccurateRip responses with 0 as confidence
# for example, Best of Luke Haines, disc 1, track 1
maxConfidence = -1
maxResponse = None
for r in responses:
if r.confidences[i] > maxConfidence:
maxConfidence = r.confidences[i]
maxResponse = r
if htoapath:
writeFile(htoapath,
self.result.table.getTrackStart(1) /
common.FRAMES_PER_SECOND)
logger.debug('Track %02d: found max confidence %d' % (
i + 1, maxConfidence))
trackResult.ARDBMaxConfidence = maxConfidence
if not response:
logger.warning('Track %02d: none of the responses matched.',
i + 1)
trackResult.ARDBCRC = int(
maxResponse.checksums[i], 16)
else:
trackResult.ARDBCRC = int(response.checksums[i], 16)
# TODO MW: Update this further for ARv2 code
def getAccurateRipResults(self):
"""
@rtype: list of str
"""
res = []
# loop over tracks
for i, trackResult in enumerate(self.result.tracks):
status = 'rip NOT accurate'
if trackResult.accurip:
status = 'rip accurate '
c = "(not found) "
ar = ", DB [notfound]"
if trackResult.ARDBMaxConfidence:
c = "(max confidence %3d)" % trackResult.ARDBMaxConfidence
if trackResult.ARDBConfidence is not None:
if trackResult.ARDBConfidence \
< trackResult.ARDBMaxConfidence:
c = "(confidence %3d of %3d)" % (
trackResult.ARDBConfidence,
trackResult.ARDBMaxConfidence)
ar = ", DB [%08x]" % trackResult.ARDBCRC
# htoa tracks (i == 0) do not have an ARCRC
if trackResult.ARCRC is None:
assert trackResult.number == 0, \
'no trackResult.ARCRC on non-HTOA track %d' % \
trackResult.number
res.append("Track 0: unknown (not tracked)")
else:
res.append("Track %2d: %s %s [%08x]%s" % (
trackResult.number, status, c, trackResult.ARCRC, ar))
return res
for i, track in enumerate(self.result.tracks):
path = track.filename
writeFile(track.filename,
(self.result.table.getTrackLength(i + 1) /
common.FRAMES_PER_SECOND))
def writeCue(self, discName):
assert self.result.table.canCue()

View File

@@ -108,47 +108,6 @@ class Image(object):
logger.debug('setup image done')
class AccurateRipChecksumTask(task.MultiSeparateTask):
"""
I calculate the AccurateRip checksums of all tracks.
"""
description = "Checksumming tracks"
# TODO MW: Update this further for V2 code
def __init__(self, image):
task.MultiSeparateTask.__init__(self)
self._image = image
cue = image.cue
self.checksums = []
logger.debug('Checksumming %d tracks' % len(cue.table.tracks))
for trackIndex, track in enumerate(cue.table.tracks):
index = track.indexes[1]
length = cue.getTrackLength(track)
if length < 0:
logger.debug('track %d has unknown length' %
(trackIndex + 1, ))
else:
logger.debug('track %d is %d samples long' % (
trackIndex + 1, length))
path = image.getRealPath(index.path)
checksumTask = checksum.FastAccurateRipChecksumTask(
path,
trackNumber=trackIndex + 1,
trackCount=len(cue.table.tracks),
wave=True, v2=False)
self.addTask(checksumTask)
def stop(self):
self.checksums = [t.checksum for t in self.tasks]
task.MultiSeparateTask.stop(self)
class ImageVerifyTask(task.MultiSeparateTask):
"""
I verify a disk image and get the necessary track lengths.

View File

@@ -475,51 +475,6 @@ class Table(object):
logger.debug('MusicBrainz values: %r', result)
return result
def getAccurateRipIds(self):
"""
Calculate the two AccurateRip ID's.
@returns: the two 8-character hexadecimal disc ID's
@rtype: tuple of (str, str)
"""
# AccurateRip does not take into account data tracks,
# but does count the data track to determine the leadout offset
discId1 = 0
discId2 = 0
for track in self.tracks:
if not track.audio:
continue
offset = self.getTrackStart(track.number)
discId1 += offset
discId2 += (offset or 1) * track.number
# also add end values, where leadout offset is one past the end
# of the last track
last = self.tracks[-1]
offset = self.getTrackEnd(last.number) + 1
discId1 += offset
discId2 += offset * (self.getAudioTracks() + 1)
discId1 &= 0xffffffff
discId2 &= 0xffffffff
return ("%08x" % discId1, "%08x" % discId2)
def getAccurateRipURL(self):
"""
Return the full AccurateRip URL.
@returns: the AccurateRip URL
@rtype: str
"""
discId1, discId2 = self.getAccurateRipIds()
return "http://www.accuraterip.com/accuraterip/" \
"%s/%s/%s/dBAR-%.3d-%s-%s-%s.bin" % (
discId1[-1], discId1[-2], discId1[-3],
self.getAudioTracks(), discId1, discId2, self.getCDDBDiscId())
def cue(self, cuePath='', program='whipper'):
"""
@param cuePath: path to the cue file to be written. If empty,
@@ -851,6 +806,41 @@ class Table(object):
return True
def accuraterip_ids(self):
"""
returns both AccurateRip disc ids as a tuple of 8-char
hexadecimal strings (discid1, discid2)
"""
# AccurateRip does not take into account data tracks,
# but does count the data track to determine the leadout offset
discId1 = 0
discId2 = 0
for track in self.tracks:
if not track.audio:
continue
offset = self.getTrackStart(track.number)
discId1 += offset
discId2 += (offset or 1) * track.number
# also add end values, where leadout offset is one past the end
# of the last track
offset = self.getTrackEnd(self.tracks[-1].number) + 1
discId1 += offset
discId2 += offset * (self.getAudioTracks() + 1)
discId1 &= 0xffffffff
discId2 &= 0xffffffff
return ("%08x" % discId1, "%08x" % discId2)
def accuraterip_path(self):
discId1, discId2 = self.accuraterip_ids()
return "%s/%s/%s/dBAR-%.3d-%s-%s-%s.bin" % (
discId1[-1], discId1[-2], discId1[-3],
self.getAudioTracks(), discId1, discId2, self.getCDDBDiscId()
)
def canCue(self):
"""
Check if this table can be used to generate a .cue file

View File

@@ -6,22 +6,25 @@ logger = logging.getLogger(__name__)
ARB = 'accuraterip-checksum'
FLAC = 'flac'
def _execute(cmd, **redirects):
logger.debug('executing %r', cmd)
return Popen(cmd, **redirects)
def accuraterip_checksum(f, track, tracks, wave=False, v2=False):
def accuraterip_checksum(f, track_number, total_tracks, wave=False, v2=False):
v = '--accuraterip-v1'
if v2:
v = '--accuraterip-v2'
track, tracks = str(track), str(tracks)
track_number, total_tracks = str(track_number), str(total_tracks)
if not wave:
flac = Popen([FLAC, '-cds', f], stdout=PIPE)
arc = Popen([ARB, v, '/dev/stdin', track, tracks],
stdin=flac.stdout, stdout=PIPE, stderr=PIPE)
if wave:
cmd = [ARB, v, f, track_number, total_tracks]
redirects = dict(stdout=PIPE, stderr=PIPE)
else:
arc = Popen([ARB, v, f, track, tracks],
stdout=PIPE, stderr=PIPE)
flac = _execute([FLAC, '-cds', f], stdout=PIPE)
cmd = [ARB, v, '/dev/stdin', track_number, total_tracks]
redirects = dict(stdin=flac.stdout, stdout=PIPE, stderr=PIPE)
arc = _execute(cmd, **redirects)
if not wave:
flac.stdout.close()
@@ -30,23 +33,24 @@ def accuraterip_checksum(f, track, tracks, wave=False, v2=False):
if not wave:
flac.wait()
flac_rc = flac.returncode
if flac.returncode != 0:
logger.warning(
'ARC calculation failed: flac return code is non zero: %r' %
flac.returncode
)
return None
arc_rc = arc.returncode
if not wave and flac_rc != 0:
logger.warning('ARC calculation failed: flac return code is non zero')
if arc.returncode != 0:
logger.warning(
'ARC calculation failed: arc return code is non zero: %r' %
arc.returncode
)
return None
if arc_rc != 0:
logger.warning('ARC calculation failed: arc return code is non zero')
return None
out = out.strip()
try:
outh = int('0x%s' % out, base=16)
checksum = int('0x%s' % out.strip(), base=16)
logger.debug('returned %r', checksum)
return checksum
except ValueError:
logger.warning('ARC output is not usable')
return None
return outh

View File

@@ -202,23 +202,29 @@ class WhipperLogger(result.Logger):
lines.append(" Copy CRC: %08X" % trackResult.copycrc)
# AccurateRip track status
# Currently there's no support for AccurateRip V2
if trackResult.accurip:
lines.append(" AccurateRip V1:")
self._inARDatabase += 1
if trackResult.ARCRC == trackResult.ARDBCRC:
lines.append(" Result: Found, exact match")
self._accuratelyRipped += 1
else:
lines.append(" Result: Found, NO exact match")
lines.append(" Confidence: %d" %
trackResult.ARDBConfidence)
lines.append(" Local CRC: %08X" % trackResult.ARCRC)
lines.append(" Remote CRC: %08X" % trackResult.ARDBCRC)
elif trackResult.number != 0:
lines.append(" AccurateRip V1:")
lines.append(" Result: Track not present in "
"AccurateRip database")
for v in ('v1', 'v2'):
if trackResult.AR[v]['DBCRC']:
lines.append(" AccurateRip %s:" % v)
self._inARDatabase += 1
if trackResult.AR[v]['CRC'] == trackResult.AR[v]['DBCRC']:
lines.append(" Result: Found, exact match")
self._accuratelyRipped += 1
else:
lines.append(" Result: Found, NO exact match")
lines.append(
" Confidence: %d" % trackResult.AR[v]['DBConfidence']
)
lines.append(
" Local CRC: %s" % trackResult.AR[v]['CRC'].upper()
)
lines.append(
" Remote CRC: %s" % trackResult.AR[v]['DBCRC'].upper()
)
elif trackResult.number != 0:
lines.append(" AccurateRip %s:" % v)
lines.append(
" Result: Track not present in AccurateRip database"
)
# Check if Test & Copy CRCs are equal
if trackResult.testcrc == trackResult.copycrc:

View File

@@ -23,56 +23,46 @@ import time
class TrackResult:
"""
@type filename: unicode
@ivar testcrc: 4-byte CRC for the test read
@type testcrc: int
@ivar copycrc: 4-byte CRC for the copy read
@type copycrc: int
@var accurip: whether this track's AR CRC was found in the
database, and thus whether the track is considered
accurately ripped.
If false, it can be ripped wrong, not exist in
the database, ...
@type accurip: bool
@var ARCRC: our calculated 4 byte AccurateRip CRC for this
track.
@type ARCRC: int
@var ARDBCRC: the 4-byte AccurateRip CRC this
track did or should have matched in the database.
If None, the track is not in the database.
@type ARDBCRC: int
@var ARDBConfidence: confidence for the matched AccurateRip CRC for
this track in the database.
If None, the track is not in the database.
@var ARDBMaxConfidence: maximum confidence in the AccurateRip database for
this track; can still be 0.
If None, the track is not in the database.
"""
number = None
filename = None
pregap = 0 # in frames
pre_emphasis = None
peak = 0.0
quality = 0.0
testspeed = 0.0
copyspeed = 0.0
testduration = 0.0
copyduration = 0.0
# 4 byte CRCs for the test and copy reads
testcrc = None
copycrc = None
accurip = False # whether it's in the database
ARCRC = None
ARDBCRC = None
ARDBConfidence = None
ARDBMaxConfidence = None
AR = None
classVersion = 3
def __init__(self):
"""
CRC: calculated 4 byte AccurateRip CRC
DBCRC: 4 byte AccurateRip CRC from the AR database
DBConfidence: confidence for the matched AccurateRip DB CRC
DBMaxConfidence: track's maximum confidence in the AccurateRip DB
DBMaxConfidenceCRC: maximum confidence CRC
"""
self.AR = {
'v1': {
'CRC': None,
'DBCRC': None,
'DBConfidence': None,
},
'v2': {
'CRC': None,
'DBCRC': None,
'DBConfidence': None,
},
'DBMaxConfidence': None,
'DBMaxConfidenceCRC': None,
}
class RipResult:
"""

View File

@@ -1,31 +1,313 @@
# -*- Mode: Python; test-case-name: whipper.test.test_common_accurip -*-
# vi:si:et:sw=4:sts=4:ts=4
import os
import sys
from StringIO import StringIO
from os import chmod, makedirs
from os.path import dirname, exists, join
from shutil import copy, rmtree
from tempfile import mkdtemp
from unittest import TestCase
from whipper.common import accurip
from whipper.test import common as tcommon
from whipper.common.accurip import (
calculate_checksums, get_db_entry, print_report, verify_result,
_split_responses
)
from whipper.result.result import RipResult, TrackResult
class AccurateRipResponseTestCase(tcommon.TestCase):
class TestAccurateRipResponse(TestCase):
@classmethod
def setUpClass(cls):
cls.path = 'c/1/2/dBAR-002-0000f21c-00027ef8-05021002.bin'
cls.entry = _split_responses(
open(join(dirname(__file__), cls.path[6:])).read()
)
cls.other_path = '4/8/2/dBAR-011-0010e284-009228a3-9809ff0b.bin'
def testResponse(self):
path = os.path.join(os.path.dirname(__file__),
'dBAR-011-0010e284-009228a3-9809ff0b.bin')
data = open(path, "rb").read()
def setUp(self):
self.cache_dir = mkdtemp(suffix='whipper_accurip_cache_test')
accurip._CACHE_DIR = self.cache_dir
def cleanup(cachedir):
chmod(cachedir, 0755)
rmtree(cachedir)
self.addCleanup(cleanup, self.cache_dir)
responses = accurip.getAccurateRipResponses(data)
self.assertEquals(len(responses), 3)
def test_uses_cache_dir(self):
# copy normal entry into other entry's place
makedirs(dirname(join(self.cache_dir, self.other_path)))
copy(
join(dirname(__file__), self.path[6:]),
join(self.cache_dir, self.other_path)
)
# ask cache for other entry and assert cached entry equals normal entry
self.assertEquals(self.entry, get_db_entry(self.other_path))
response = responses[0]
def test_returns_none_for_no_entry(self):
self.assertIsNone(get_db_entry('definitely_a_404'))
self.assertEquals(response.trackCount, 11)
self.assertEquals(response.discId1, "0010e284")
self.assertEquals(response.discId2, "009228a3")
self.assertEquals(response.cddbDiscId, "9809ff0b")
def test_can_return_entry_without_saving(self):
chmod(self.cache_dir, 0)
self.assertEqual(get_db_entry(self.path), self.entry)
chmod(self.cache_dir, 0755)
self.assertFalse(exists(join(self.cache_dir, self.path)))
for i in range(11):
self.assertEquals(response.confidences[i], 35)
self.assertEquals(response.checksums[0], "beea32c8")
self.assertEquals(response.checksums[10], "acee98ca")
def test_retrieves_and_saves_accuraterip_entry(self):
# for path, entry in zip(self.paths[0], self.entries):
self.assertFalse(exists(join(self.cache_dir, self.path)))
self.assertEquals(get_db_entry(self.path), self.entry)
self.assertTrue(exists(join(self.cache_dir, self.path)))
def test_AccurateRipResponse_parses_correctly(self):
responses = get_db_entry(self.path)
self.assertEquals(len(responses), 2)
self.assertEquals(responses[0].num_tracks, 2)
self.assertEquals(responses[0].discId1, '0000f21c')
self.assertEquals(responses[0].discId2, '00027ef8')
self.assertEquals(responses[0].cddbDiscId, '05021002')
self.assertEquals(responses[0].confidences[0], 12)
self.assertEquals(responses[0].confidences[1], 20)
self.assertEquals(responses[0].checksums[0], '284fc705')
self.assertEquals(responses[0].checksums[1], '9cc1f32e')
self.assertEquals(responses[1].num_tracks, 2)
self.assertEquals(responses[1].discId1, '0000f21c')
self.assertEquals(responses[1].discId2, '00027ef8')
self.assertEquals(responses[1].cddbDiscId, '05021002')
self.assertEquals(responses[1].confidences[0], 4)
self.assertEquals(responses[1].confidences[1], 4)
self.assertEquals(responses[1].checksums[0], 'dc77f9ab')
self.assertEquals(responses[1].checksums[1], 'dd97d2c3')
# XXX: test arc.py
class TestCalculateChecksums(TestCase):
def test_returns_none_for_bad_files(self):
paths = ['/does/not/exist']
self.assertEquals(
calculate_checksums(['/does/not/exist']),
{'v1': [None], 'v2': [None]}
)
# TODO: test success when file exists
class TestVerifyResult(TestCase):
@classmethod
def setUpClass(cls):
path = 'c/1/2/dBAR-002-0000f21c-00027ef8-05021002.bin'
cls.responses = _split_responses(
open(join(dirname(__file__), path[6:])).read()
)
cls.checksums = {
'v1': ['284fc705', '9cc1f32e'],
'v2': ['dc77f9ab', 'dd97d2c3'],
}
def setUp(self):
RipResult
self.result = RipResult()
for n in range(1, 2+1):
track = TrackResult()
track.number = n
self.result.tracks.append(track)
def test_empty_result_returns_false(self):
self.assertEquals(
verify_result(RipResult(), self.responses, self.checksums),
False
)
def test_empty_responses_returns_false(self):
self.assertEquals(
verify_result(self.result, [], self.checksums),
False
)
# XXX: would this happen?
def test_empty_checksums_returns_false(self):
self.assertEquals(
verify_result(self.result, self.responses, {}),
False
)
def test_wrong_checksums_returns_false(self):
self.assertEquals(
verify_result(self.result, self.responses, {
'v1': ['deadbeef', '89abcdef'],
'v2': ['76543210', '01234567']
}),
False
)
def test_incomplete_checksums(self):
self.assertEquals(
verify_result(self.result, self.responses, {
'v1': ['284fc705', '9cc1f32e'],
'v2': [None, 'dd97d2c3'],
}),
True
)
self.assertEquals(
verify_result(self.result, self.responses, {
'v1': ['284fc705', None],
'v2': ['dc77f9ab', 'dd97d2c3'],
}),
True
)
self.assertEquals(
verify_result(self.result, self.responses, {
'v1': ['284fc705', None],
'v2': [None, 'dd97d2c3'],
}),
True
)
def test_matches_only_v1_or_v2_responses(self):
self.assertEquals(
verify_result(
self.result, [self.responses[0]], self.checksums
),
True
)
self.assertEquals(
verify_result(
self.result, [self.responses[1]], self.checksums
),
True
)
def test_passes_with_htoa(self):
htoa = TrackResult()
htoa.number = 0
self.result.tracks.append(htoa)
self.assertEquals(
verify_result(self.result, self.responses, self.checksums),
True
)
def test_stores_accuraterip_results_on_result(self):
self.assertEquals(
verify_result(self.result, self.responses, self.checksums),
True
)
self.assertEquals(self.result.tracks[0].AR, {
'v1': {
'CRC': '284fc705',
'DBCRC': '284fc705',
'DBConfidence': 12,
},
'v2': {
'CRC': 'dc77f9ab',
'DBCRC': 'dc77f9ab',
'DBConfidence': 4,
},
'DBMaxConfidence': 12,
'DBMaxConfidenceCRC': '284fc705',
})
self.assertEquals(self.result.tracks[1].AR, {
'v1': {
'CRC': '9cc1f32e',
'DBCRC': '9cc1f32e',
'DBConfidence': 20,
},
'v2': {
'CRC': 'dd97d2c3',
'DBCRC': 'dd97d2c3',
'DBConfidence': 4,
},
'DBMaxConfidence': 20,
'DBMaxConfidenceCRC': '9cc1f32e',
})
class TestAccurateRipReport(TestCase):
def setUp(self):
sys.stdout = StringIO()
self.result = RipResult()
track = TrackResult()
track.number = 1
track.AR = {
'v1': {
'CRC': '284fc705',
'DBCRC': '284fc705',
'DBConfidence': 12,
},
'v2': {
'CRC': 'dc77f9ab',
'DBCRC': 'dc77f9ab',
'DBConfidence': 4,
},
'DBMaxConfidence': 12,
'DBMaxConfidenceCRC': '284fc705',
}
self.result.tracks.append(track)
def tearDown(self):
sys.stdout = sys.__stdout__
def test_report_no_result(self):
track = TrackResult()
track.number = 1
self.result.tracks[0] = track
print_report(self.result)
self.assertEquals(
sys.stdout.getvalue(),
'track 1: unknown (error)\n'
)
def test_track_not_found(self):
self.result.tracks[0].AR['DBMaxConfidence'] = None
print_report(self.result)
self.assertEquals(
sys.stdout.getvalue(),
'track 1: rip NOT accurate (not found) '
' v1 [284fc705], v2 [dc77f9ab], DB [notfound]\n'
)
def test_htoa_not_tracked(self):
self.result.tracks[0].number = 0
self.result.tracks[0].AR['v1']['CRC'] = None
self.result.tracks[0].AR['v2']['CRC'] = None
print_report(self.result)
self.assertEquals(
sys.stdout.getvalue(),
'track 0: unknown (not tracked)\n'
)
def test_report_v1_only(self):
self.result.tracks[0].AR['v2']['DBCRC'] = None
self.result.tracks[0].AR['v2']['DBConfidence'] = None
print_report(self.result)
self.assertEquals(
sys.stdout.getvalue(),
'track 1: rip accurate (max confidence 12)'
' v1 [284fc705], v2 [dc77f9ab], DB [284fc705]\n'
)
def test_report_v2_only(self):
self.result.tracks[0].AR['v1']['DBCRC'] = None
self.result.tracks[0].AR['v1']['DBConfidence'] = None
print_report(self.result)
self.assertEquals(
sys.stdout.getvalue(),
'track 1: rip accurate (confidence 4 of 12)'
' v1 [284fc705], v2 [dc77f9ab], DB [dc77f9ab]\n'
)
def test_report_v1_and_v2_max_confidence(self):
print_report(self.result)
self.assertEquals(
sys.stdout.getvalue(),
'track 1: rip accurate (max confidence 12)'
' v1 [284fc705], v2 [dc77f9ab], DB [284fc705, dc77f9ab]\n'
)
def test_report_v1_and_v2(self):
self.result.tracks[0].AR['DBMaxConfidence'] = 66
print_report(self.result)
self.assertEquals(
sys.stdout.getvalue(),
'track 1: rip accurate (confidence 12 of 66)'
' v1 [284fc705], v2 [dc77f9ab], DB [284fc705, dc77f9ab]\n'
)

View File

@@ -12,86 +12,13 @@ from whipper.common import program, accurip, mbngs, config
from whipper.command.cd import DEFAULT_DISC_TEMPLATE
class TrackImageVerifyTestCase(unittest.TestCase):
# example taken from a rip of Luke Haines Is Dead, disc 1
# AccurateRip database has 0 confidence for 1st track
# Rip had a wrong result for track 9
def testVerify(self):
path = os.path.join(os.path.dirname(__file__),
'dBAR-020-002e5023-029d8e49-040eaa14.bin')
data = open(path, "rb").read()
responses = accurip.getAccurateRipResponses(data)
# these crc's were calculated from an actual rip
checksums = [1644890007, 2945205445, 3983436658, 1528082495,
1203704270, 1163423644, 3649097244, 100524219,
1583356174, 373652058, 1842579359, 2850056507,
1329730252, 2526965856, 2525886806, 209743350,
3184062337, 2099956663, 2943874164, 2321637196]
prog = program.Program(config.Config())
prog.result = result.RipResult()
# fill it with empty trackresults
for i, c in enumerate(checksums):
r = result.TrackResult()
r.number = i + 1
prog.result.tracks.append(r)
prog._verifyImageWithChecksums(responses, checksums)
# now check if the results were filled in properly
tr = prog.result.getTrackResult(1)
self.assertEquals(tr.accurip, False)
self.assertEquals(tr.ARDBMaxConfidence, 0)
self.assertEquals(tr.ARDBCRC, 0)
self.assertEquals(tr.ARDBCRC, 0)
tr = prog.result.getTrackResult(2)
self.assertEquals(tr.accurip, True)
self.assertEquals(tr.ARDBMaxConfidence, 2)
self.assertEquals(tr.ARDBCRC, checksums[2 - 1])
tr = prog.result.getTrackResult(10)
self.assertEquals(tr.accurip, False)
self.assertEquals(tr.ARDBMaxConfidence, 2)
# we know track 10 was ripped wrong
self.assertNotEquals(tr.ARDBCRC, checksums[10 - 1])
res = prog.getAccurateRipResults()
self.assertEquals(res[1 - 1],
"Track 1: rip NOT accurate (not found) "
"[620b0797], DB [notfound]")
self.assertEquals(res[2 - 1],
"Track 2: rip accurate (max confidence 2) "
"[af8c44c5], DB [af8c44c5]")
self.assertEquals(res[10 - 1],
"Track 10: rip NOT accurate (max confidence 2) "
"[16457a5a], DB [eb6e55b4]")
class HTOATestCase(unittest.TestCase):
def setUp(self):
path = os.path.join(os.path.dirname(__file__),
'silentalarm.result.pickle')
self._tracks = pickle.load(open(path, 'rb'))
def testGetAccurateRipResults(self):
prog = program.Program(config.Config())
prog.result = result.RipResult()
prog.result.tracks = self._tracks
prog.getAccurateRipResults()
class PathTestCase(unittest.TestCase):
def testStandardTemplateEmpty(self):
prog = program.Program(config.Config())
path = prog.getPath(u'/tmp', DEFAULT_DISC_TEMPLATE,
'mbdiscid', 0)
'mbdiscid', None)
self.assertEquals(path,
unicode('/tmp/unknown/Unknown Artist - mbdiscid/'
'Unknown Artist - mbdiscid'))
@@ -101,10 +28,9 @@ class PathTestCase(unittest.TestCase):
md = mbngs.DiscMetadata()
md.artist = md.sortName = 'Jeff Buckley'
md.title = 'Grace'
prog.metadata = md
path = prog.getPath(u'/tmp', DEFAULT_DISC_TEMPLATE,
'mbdiscid', 0)
'mbdiscid', md, 0)
self.assertEquals(path,
unicode('/tmp/unknown/Jeff Buckley - Grace/'
'Jeff Buckley - Grace'))
@@ -114,92 +40,7 @@ class PathTestCase(unittest.TestCase):
md = mbngs.DiscMetadata()
md.artist = md.sortName = 'Jeff Buckley'
md.title = 'Grace'
prog.metadata = md
path = prog.getPath(u'/tmp', u'%A/%d', 'mbdiscid', 0)
path = prog.getPath(u'/tmp', u'%A/%d', 'mbdiscid', md, 0)
self.assertEquals(path,
u'/tmp/Jeff Buckley/Grace')
def testDisambiguateOnRelease(self):
"""Test that disambiguation gets placed in the same part of the path
as the release name.
See https://github.com/JoeLametta/whipper/issues/127"""
prog = program.Program(config.Config())
md = mbngs.DiscMetadata()
md.artist = 'Guy Davis'
md.sortName = 'Davis, Guy'
md.title = 'Call Down the Thunder'
md.release = '1996'
md.catalogNumber = 'RHR CD 89'
prog.metadata = md
templates = {
u'%A/%d - %y': u'Guy Davis/Call Down the Thunder - 1996 (RHR CD 89)', # noqa: E501
u'%A - %d - %y': u'Guy Davis - Call Down the Thunder - 1996 (RHR CD 89)', # noqa: E501
u'%A/%y/%d': u'Guy Davis/1996/Call Down the Thunder (RHR CD 89)',
u'%y/%d/%A': u'1996/Call Down the Thunder (RHR CD 89)/Guy Davis',
u'%d/%A/%y': u'Call Down the Thunder (RHR CD 89)/Guy Davis/1996',
}
for template, expected_path in templates.iteritems():
path = prog.getPath(u'/tmp', template, 'mbdiscid', 0, disambiguate=True) # noqa: E501
self.assertEquals(path, u'/tmp/' + expected_path)
def testDisambiguateOnReleaseOnlyOnce(self):
"""Test that disambiguation gets added only once."""
prog = program.Program(config.Config())
md = mbngs.DiscMetadata()
md.artist = 'Guy Davis'
md.sortName = 'Davis, Guy'
md.title = 'Call Down the Thunder'
md.release = '1996'
md.catalogNumber = 'RHR CD 89'
prog.metadata = md
template = u'%A/%d - %y/%d/%d'
path = prog.getPath(u'/tmp', template, 'mbdiscid', 0, disambiguate=True) # noqa: E501
self.assertEquals(path,
u'/tmp/Guy Davis/Call Down the Thunder - 1996 (RHR CD 89)/Call Down the Thunder/Call Down the Thunder') # noqa: E501
def testDisambiguateOnNoReleaseTitle(self):
"""Test that disambiguation gets added even if there's no release
title in the template."""
prog = program.Program(config.Config())
md = mbngs.DiscMetadata()
md.artist = 'Guy Davis'
md.sortName = 'Davis, Guy'
md.title = 'Call Down the Thunder'
md.release = '1996'
md.catalogNumber = 'RHR CD 89'
prog.metadata = md
templates = {
u'%A/%y': u'Guy Davis/1996 (RHR CD 89)',
u'%A - %y': u'Guy Davis - 1996 (RHR CD 89)',
u'%y/%A': u'1996/Guy Davis (RHR CD 89)',
}
for template, expected_path in templates.iteritems():
path = prog.getPath(u'/tmp', template, 'mbdiscid', 0, disambiguate=True) # noqa: E501
self.assertEquals(path, u'/tmp/' + expected_path)
def testAddDisambiguationUnitTest(self):
"""Unit test for Program.addDisambiguation()."""
prog = program.Program(config.Config())
md = mbngs.DiscMetadata()
# No relevant disambiguation metadata
self.assertEquals(
prog.addDisambiguation(u'Test', md),
u'Test')
# Only barcode available
md.barcode = '033651008927'
self.assertEquals(
prog.addDisambiguation(u'Test', md),
u'Test (033651008927)')
# Both catalog number and barcode available
md.catalogNumber = 'RHR CD 89'
self.assertEquals(
prog.addDisambiguation(u'Test', md),
u'Test (RHR CD 89)')

View File

@@ -62,11 +62,10 @@ class LadyhawkeTestCase(tcommon.TestCase):
"KnpGsLhvH.lPrNc1PBL21lb9Bg4-")
def testAccurateRip(self):
self.assertEquals(self.table.getAccurateRipIds(), (
self.assertEquals(self.table.accuraterip_ids(), (
"0013bd5a", "00b8d489"))
self.assertEquals(self.table.getAccurateRipURL(),
"http://www.accuraterip.com/accuraterip/a/5/d/"
"dBAR-012-0013bd5a-00b8d489-c60af50d.bin")
self.assertEquals(self.table.accuraterip_path(),
"a/5/d/dBAR-012-0013bd5a-00b8d489-c60af50d.bin")
def testDuration(self):
self.assertEquals(self.table.duration(), 2761413)

View File

@@ -89,8 +89,8 @@ class CureTestCase(common.TestCase):
common.diffStrings(ref, cue)
# we verify it because it has failed in readdisc in the past
self.assertEquals(self.toc.table.getAccurateRipURL(),
'http://www.accuraterip.com/accuraterip/3/c/4/dBAR-013-0019d4c3-00fe8924-b90c650d.bin') # noqa: E501
self.assertEquals(self.toc.table.accuraterip_path(),
'3/c/4/dBAR-013-0019d4c3-00fe8924-b90c650d.bin')
def testGetRealPath(self):
self.assertRaises(KeyError, self.toc.getRealPath, u'track01.wav')
@@ -164,8 +164,8 @@ class BlocTestCase(common.TestCase):
def testAccurateRip(self):
# we verify it because it has failed in readdisc in the past
self.assertEquals(self.toc.table.getAccurateRipURL(),
'http://www.accuraterip.com/accuraterip/e/d/2/dBAR-013-001af2de-0105994e-ad0be00d.bin') # noqa: E501
self.assertEquals(self.toc.table.accuraterip_path(),
'e/d/2/dBAR-013-001af2de-0105994e-ad0be00d.bin')
# The Breeders - Mountain Battles has CDText