# -*- Mode: Python; test-case-name: whipper.test.test_common_program -*- # vi:si:et:sw=4:sts=4:ts=4 # Copyright (C) 2009, 2010, 2011 Thomas Vander Stichele # This file is part of whipper. # # whipper is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # whipper is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with whipper. If not, see . """Common functionality and class for all programs using whipper.""" import musicbrainzngs import re import os import sys import time from whipper.common import accurip, cache, checksum, common, mbngs, path from whipper.program import cdrdao, cdparanoia from whipper.image import image from whipper.extern.task import task import logging logger = logging.getLogger(__name__) # FIXME: should Program have a runner ? class Program: """I maintain program state and functionality. :cvar cuePath: :vartype cuePath: :cvar logPath: :vartype logPath: :cvar metadata: :vartype metadata: L{mbngs.DiscMetadata} :cvar outdir: :vartype outdir: unicode :cvar result: the rip's result :vartype result: L{result.RipResult} :ivar config: :vartype config: :ivar record: whether to record results of API calls for playback. :vartype record: :param stdout: (Default value = sys.stdout). :type stdout: :ivar cache: :vartype cache: """ cuePath = None logPath = None metadata = None outdir = None result = None _stdout = None def __init__(self, config, record=False, stdout=sys.stdout): self._record = record self._cache = cache.ResultCache() self._stdout = stdout self._config = config d = {} for key, default in { 'fat': True, 'special': False }.items(): value = None value = self._config.getboolean('main', 'path_filter_' + key) if value is None: value = default d[key] = value self._filter = path.PathFilter(**d) def setWorkingDirectory(self, workingDirectory): if workingDirectory: logger.info('Changing to working directory %s' % workingDirectory) os.chdir(workingDirectory) def getFastToc(self, runner, device): """Retrieve the normal TOC table from a toc pickle or the drive. Also retrieves the cdrdao version """ def function(r, t): r.run(t) ptoc = cache.Persister() if not ptoc.object: from pkg_resources import parse_version as V version = cdrdao.getCDRDAOVersion() if V(version) < V('1.2.3rc2'): sys.stdout.write('Warning: cdrdao older than 1.2.3 has a ' 'pre-gap length bug.\n' 'See http://sourceforge.net/tracker/?func=detail&aid=604751&group_id=2171&atid=102171\n') # noqa: E501 t = cdrdao.ReadTOCTask(device) ptoc.persist(t.table) toc = ptoc.object assert toc.hasTOC() return toc def getTable(self, runner, cddbdiscid, mbdiscid, device, offset): """Retrieve the Table either from the cache or the drive. :param runner: :type runner: :param cddbdiscid: :type cddbdiscid: :param mbdiscid: :type mbdiscid: :param device: :type device: :param offset: :type offset: :returns: :rtype: L{table.Table} """ tcache = cache.TableCache() ptable = tcache.get(cddbdiscid, mbdiscid) itable = None tdict = {} # Ignore old cache, since we do not know what offset it used. if type(ptable.object) is dict: tdict = ptable.object if offset in tdict: itable = tdict[offset] if not itable: logger.debug('getTable: cddbdiscid %s, mbdiscid %s not ' 'in cache for offset %s, reading table' % ( cddbdiscid, mbdiscid, offset)) t = cdrdao.ReadTableTask(device) itable = t.table tdict[offset] = itable ptable.persist(tdict) logger.debug('getTable: read table %r' % itable) else: logger.debug('getTable: cddbdiscid %s, mbdiscid %s in cache ' 'for offset %s' % (cddbdiscid, mbdiscid, offset)) logger.debug('getTable: loaded table %r' % itable) assert itable.hasTOC() self.result.table = itable logger.debug('getTable: returning table with mb id %s' % itable.getMusicBrainzDiscId()) return itable def getRipResult(self, cddbdiscid): """Retrieve the persistable RipResult. RipResult is either retrieved from our cache (from a previous, possibly aborted rip), or return a new one. :param cddbdiscid: :type cddbdiscid: :returns: :rtype: L{result.RipResult} """ assert self.result is None self._presult = self._cache.getRipResult(cddbdiscid) self.result = self._presult.object return self.result def saveRipResult(self): self._presult.persist() def addDisambiguation(self, template_part, metadata): """Add disambiguation to template path part string.""" if metadata.catalogNumber: template_part += ' (%s)' % metadata.catalogNumber elif metadata.barcode: template_part += ' (%s)' % metadata.barcode return template_part def getPath(self, outdir, template, mbdiscid, metadata, track_number=None): """Return disc or track path relative to outdir according to template. Track paths do not include extension. Tracks are named according to the track template, filling in the variables and adding the file extension. Variables exclusive to the track template are: * %t: track number. * %a: track artist. * %n: track title. * %s: track sort name. Disc files (.cue, .log, .m3u) are named according to the disc template, filling in the variables and adding the file extension. Variables for both disc and track template are: * %A: album artist. * %S: album sort name. * %d: disc title. * %y: release year. * %r: release type, lowercase. * %R: Release type, normal case. * %x: audio extension, lowercase. * %X: audio extension, uppercase. :param outdir: :type outdir: :param template: :type template: :param mbdiscid: :type mbdiscid: :param metadata: :type metadata: :param track_number: (Default value = None) :type track_number: """ assert type(outdir) is unicode, "%r is not unicode" % outdir assert type(template) is unicode, "%r is not unicode" % template v = {} v['A'] = 'Unknown Artist' v['d'] = mbdiscid # fallback for title v['r'] = 'unknown' v['R'] = 'Unknown' v['B'] = '' # barcode v['C'] = '' # catalog number v['x'] = 'flac' v['X'] = v['x'].upper() v['y'] = '0000' if track_number is not None: v['a'] = v['A'] v['t'] = '%02d' % track_number if track_number == 0: v['n'] = 'Hidden Track One Audio' else: v['n'] = 'Unknown Track %d' % track_number if metadata: release = metadata.release or '0000' v['y'] = release[:4] v['A'] = self._filter.filter(metadata.artist) v['S'] = self._filter.filter(metadata.sortName) v['d'] = self._filter.filter(metadata.title) v['B'] = metadata.barcode v['C'] = metadata.catalogNumber if metadata.releaseType: v['R'] = metadata.releaseType v['r'] = metadata.releaseType.lower() if track_number > 0: v['a'] = self._filter.filter( metadata.tracks[track_number - 1].artist) v['s'] = self._filter.filter( metadata.tracks[track_number - 1].sortName) v['n'] = self._filter.filter( metadata.tracks[track_number - 1].title) elif track_number == 0: # htoa defaults to disc's artist v['a'] = self._filter.filter(metadata.artist) template = re.sub(r'%(\w)', r'%(\1)s', template) return os.path.join(outdir, template % v) def getCDDB(self, cddbdiscid): """Query CDDB using the given cddbdiscid to find the disc title. :param cddbdiscid: list of id, tracks, offsets, seconds. :type cddbdiscid: :returns: :rtype: str """ # FIXME: convert to nonblocking? import CDDB try: code, md = CDDB.query(cddbdiscid) logger.debug('CDDB query result: %r, %r', code, md) if code == 200: return md['title'] except IOError as e: # FIXME: for some reason errno is a str ? if e.errno == 'socket error': self._stdout.write("Warning: network error: %r\n" % (e, )) else: raise return None def getMusicBrainz(self, ittoc, mbdiscid, release=None, country=None, prompt=False): """Look up disc on MusicBrainz and get the relevant metadata. :param ittoc: :type ittoc: L{whipper.image.table.Table} :param mbdiscid: :type mbdiscid: :param release: (Default value = None) :type release: :param country: (Default value = None) :type country: :param prompt: (Default value = False) :type prompt: """ # look up disc on MusicBrainz self._stdout.write('Disc duration: %s, %d audio tracks\n' % ( common.formatTime(ittoc.duration() / 1000.0), ittoc.getAudioTracks())) logger.debug('MusicBrainz submit url: %r', ittoc.getMusicBrainzSubmitURL()) ret = None metadatas = None e = None for _ in range(0, 4): try: metadatas = mbngs.musicbrainz(mbdiscid, country=country, record=self._record) break except mbngs.NotFoundException as e: logger.warning("release not found: %r" % (e, )) break except musicbrainzngs.NetworkError as e: logger.warning("network error: %r" % (e, )) break except mbngs.MusicBrainzException as e: logger.warning("musicbrainz exception: %r" % (e, )) time.sleep(5) continue if not metadatas: self._stdout.write('Continuing without metadata\n') if metadatas: deltas = {} self._stdout.write('\nMatching releases:\n') for metadata in metadatas: self._stdout.write('\n') self._stdout.write('Artist : %s\n' % metadata.artist.encode('utf-8')) self._stdout.write('Title : %s\n' % metadata.title.encode('utf-8')) self._stdout.write('Duration: %s\n' % common.formatTime(metadata.duration / 1000.0)) self._stdout.write('URL : %s\n' % metadata.url) self._stdout.write('Release : %s\n' % metadata.mbid) self._stdout.write('Type : %s\n' % metadata.releaseType) if metadata.barcode: self._stdout.write("Barcode : %s\n" % metadata.barcode) if metadata.catalogNumber: self._stdout.write("Cat no : %s\n" % metadata.catalogNumber) delta = abs(metadata.duration - ittoc.duration()) if delta not in deltas: deltas[delta] = [] deltas[delta].append(metadata) lowest = None if not release and len(metadatas) > 1: # Select the release that most closely matches the duration. lowest = min(deltas.keys()) if prompt: guess = (deltas[lowest])[0].mbid release = raw_input( "\nPlease select a release [%s]: " % guess) if not release: release = guess if release: metadatas = [m for m in metadatas if m.url.endswith(release)] logger.debug('Asked for release %r, only kept %r', release, metadatas) if len(metadatas) == 1: self._stdout.write('\n') self._stdout.write('Picked requested release id %s\n' % release) self._stdout.write('Artist : %s\n' % metadatas[0].artist.encode('utf-8')) self._stdout.write('Title : %s\n' % metadatas[0].title.encode('utf-8')) elif not metadatas: self._stdout.write( "Requested release id '%s', " "but none of the found releases match\n" % release) return else: if lowest: metadatas = deltas[lowest] # If we have multiple, make sure they match if len(metadatas) > 1: artist = metadatas[0].artist releaseTitle = metadatas[0].releaseTitle for i, metadata in enumerate(metadatas): if not artist == metadata.artist: logger.warning("artist 0: %r and artist %d: %r " "are not the same" % ( artist, i, metadata.artist)) if not releaseTitle == metadata.releaseTitle: logger.warning("title 0: %r and title %d: %r " "are not the same" % ( releaseTitle, i, metadata.releaseTitle)) if (not release and len(deltas.keys()) > 1): self._stdout.write('\n') self._stdout.write('Picked closest match in duration.\n') self._stdout.write('Others may be wrong in MusicBrainz, ' 'please correct.\n') self._stdout.write('Artist : %s\n' % artist.encode('utf-8')) self._stdout.write('Title : %s\n' % metadatas[0].title.encode('utf-8')) # Select one of the returned releases. We just pick the first one. ret = metadatas[0] else: self._stdout.write( 'Submit this disc to MusicBrainz at the above URL.\n') ret = None self._stdout.write('\n') return ret def getTagList(self, number): """Based on the metadata, get a dict of tags for the given track. :param number: track number (0 for HTOA). :type number: int :returns: :rtype: dict """ trackArtist = u'Unknown Artist' albumArtist = u'Unknown Artist' disc = u'Unknown Disc' title = u'Unknown Track' if self.metadata: trackArtist = self.metadata.artist albumArtist = self.metadata.artist disc = self.metadata.title mbidAlbum = self.metadata.mbid mbidTrackAlbum = self.metadata.mbidArtist mbDiscId = self.metadata.discid if number > 0: try: track = self.metadata.tracks[number - 1] trackArtist = track.artist title = track.title mbidTrack = track.mbid mbidTrackArtist = track.mbidArtist except IndexError as e: print 'ERROR: no track %d found, %r' % (number, e) raise else: # htoa defaults to disc's artist title = 'Hidden Track One Audio' tags = {} if self.metadata and not self.metadata.various: tags['ALBUMARTIST'] = albumArtist tags['ARTIST'] = trackArtist tags['TITLE'] = title tags['ALBUM'] = disc tags['TRACKNUMBER'] = u'%s' % number if self.metadata: if self.metadata.release is not None: tags['DATE'] = self.metadata.release if number > 0: tags['MUSICBRAINZ_TRACKID'] = mbidTrack tags['MUSICBRAINZ_ARTISTID'] = mbidTrackArtist tags['MUSICBRAINZ_ALBUMID'] = mbidAlbum tags['MUSICBRAINZ_ALBUMARTISTID'] = mbidTrackAlbum tags['MUSICBRAINZ_DISCID'] = mbDiscId # TODO/FIXME: ISRC tag return tags def getHTOA(self): """Check if we have hidden track one audio. :returns: tuple of (start, stop), or None. """ track = self.result.table.tracks[0] try: index = track.getIndex(0) except KeyError: return None start = index.absolute stop = track.getIndex(1).absolute - 1 return (start, stop) def verifyTrack(self, runner, trackResult): is_wave = not trackResult.filename.endswith('.flac') t = checksum.CRC32Task(trackResult.filename, is_wave=is_wave) try: runner.run(t) except task.TaskException as e: if isinstance(e.exception, common.MissingFrames): logger.warning('missing frames for %r' % trackResult.filename) return False else: raise ret = trackResult.testcrc == t.checksum logger.debug('verifyTrack: track result crc %r, ' 'file crc %r, result %r', trackResult.testcrc, t.checksum, ret) return ret def ripTrack(self, runner, trackResult, offset, device, taglist, overread, what=None): """. Ripping the track may change the track's filename as stored in trackResult. :param trackResult: the object to store information in. :type trackResult: L{result.TrackResult} :param runner: :type runner: :param offset: :type offset: :param device: :type device: :param taglist: :type taglist: :param overread: :type overread: :param what: (Default value = None) :type what: """ if trackResult.number == 0: start, stop = self.getHTOA() else: start = self.result.table.getTrackStart(trackResult.number) stop = self.result.table.getTrackEnd(trackResult.number) dirname = os.path.dirname(trackResult.filename) if not os.path.exists(dirname): os.makedirs(dirname) if not what: what = 'track %d' % (trackResult.number, ) t = cdparanoia.ReadVerifyTrackTask(trackResult.filename, self.result.table, start, stop, overread, offset=offset, device=device, taglist=taglist, what=what) runner.run(t) logger.debug('ripped track') logger.debug('test speed %.3f/%.3f seconds' % ( t.testspeed, t.testduration)) logger.debug('copy speed %.3f/%.3f seconds' % ( t.copyspeed, t.copyduration)) trackResult.testcrc = t.testchecksum trackResult.copycrc = t.copychecksum trackResult.peak = t.peak trackResult.quality = t.quality trackResult.testspeed = t.testspeed trackResult.copyspeed = t.copyspeed # we want rerips to add cumulatively to the time trackResult.testduration += t.testduration trackResult.copyduration += t.copyduration if trackResult.filename != t.path: trackResult.filename = t.path logger.info('Filename changed to %r', trackResult.filename) def retagImage(self, runner, taglists): cueImage = image.Image(self.cuePath) t = image.ImageRetagTask(cueImage, taglists) runner.run(t) def verifyImage(self, runner, table): """Verify table against accuraterip and cue_path track lengths. Verify our image against the given AccurateRip responses. Needs an initialized self.result. Will set accurip and friends on each TrackResult. Populates self.result.tracks with above TrackResults. :param runner: :type runner: :param table: :type table: """ cueImage = image.Image(self.cuePath) # assigns track lengths verifytask = image.ImageVerifyTask(cueImage) runner.run(verifytask) if verifytask.exception: logger.error(verifytask.exceptionMessage) return False responses = accurip.get_db_entry(table.accuraterip_path()) logger.info('%d AccurateRip response(s) found' % len(responses)) checksums = accurip.calculate_checksums([ os.path.join(os.path.dirname(self.cuePath), t.indexes[1].path) for t in filter(lambda t: t.number != 0, cueImage.cue.table.tracks) ]) if not (checksums and any(checksums['v1']) and any(checksums['v2'])): return False return accurip.verify_result(self.result, responses, checksums) def write_m3u(self, discname): m3uPath = u'%s.m3u' % discname with open(m3uPath, 'w') as f: f.write(u'#EXTM3U\n'.encode('utf-8')) for track in self.result.tracks: if not track.filename: # false positive htoa continue if track.number == 0: length = (self.result.table.getTrackStart(1) / common.FRAMES_PER_SECOND) else: length = (self.result.table.getTrackLength(track.number) / common.FRAMES_PER_SECOND) target_path = common.getRelativePath(track.filename, m3uPath) u = u'#EXTINF:%d,%s\n' % (length, target_path) f.write(u.encode('utf-8')) u = '%s\n' % target_path f.write(u.encode('utf-8')) def writeCue(self, discName): assert self.result.table.canCue() cuePath = '%s.cue' % discName logger.debug('write .cue file to %s', cuePath) handle = open(cuePath, 'w') # FIXME: do we always want utf-8 ? handle.write(self.result.table.cue(cuePath).encode('utf-8')) handle.close() self.cuePath = cuePath return cuePath def writeLog(self, discName, logger): logPath = '%s.log' % discName handle = open(logPath, 'w') log = logger.log(self.result) handle.write(log.encode('utf-8')) handle.close() self.logPath = logPath return logPath