Files
whipper-gui/whipper/extern/freedb.py
JoeLametta e56c636fd3 Improve docstrings
Signed-off-by: JoeLametta <JoeLametta@users.noreply.github.com>
2020-02-22 15:11:22 +00:00

211 lines
7.4 KiB
Python

# Audio Tools, a module and set of tools for manipulating audio data
# Copyright (C) 2007-2016 Brian Langenberger
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
# USA
def digit_sum(i):
"""Return the sum of all digits for the given integer."""
return sum(map(int, str(i)))
class DiscID:
def __init__(self, offsets, total_length, track_count, playable_length):
"""
Init DiscID.
:param offsets: list of track offsets, in CD frames
:param total_length: total length of the disc, in seconds
:param track_count: total number of tracks on the disc
:param playable_length: playable length of the disc, in seconds
The first three items are for generating the hex disc ID itself
while the last is for performing queries.
"""
assert(len(offsets) == track_count)
for o in offsets:
assert(o >= 0)
self.offsets = offsets
self.total_length = total_length
self.track_count = track_count
self.playable_length = playable_length
def __repr__(self):
return "DiscID({})".format(
", ".join(["{}={}".format(attr, getattr(self, attr))
for attr in ["offsets",
"total_length",
"track_count",
"playable_length"]]))
def __str__(self):
return "{:08X}".format(int(self))
def __int__(self):
digit_sum_ = sum([digit_sum(o // 75) for o in self.offsets])
return (((digit_sum_ % 255) << 24) |
((self.total_length & 0xFFFF) << 8) |
(self.track_count & 0xFF))
def perform_lookup(disc_id, freedb_server, freedb_port):
"""
Perform a web-based lookup using a DiscID on the given server and port.
Iterate over a list of MetaData objects per successful match, like:
``[track1, track2, ...], [track1, track2, ...], ...``
:raises HTTPError: if an error occurs querying the server
:raises ValueError: if the server returns invalid data
"""
import re
from time import sleep
RESPONSE = re.compile(r'(\d{3}) (.+?)[\r\n]+')
QUERY_RESULT = re.compile(r'(\S+) ([0-9a-fA-F]{8}) (.+)')
FREEDB_LINE = re.compile(r'(\S+?)=(.+?)[\r\n]+')
query = freedb_command(freedb_server,
freedb_port,
"query",
*([disc_id.__str__(),
"{:d}".format(disc_id.track_count)] +
["{:d}".format(o) for o in disc_id.offsets] +
["{:d}".format(disc_id.playable_length)]))
line = next(query)
response = RESPONSE.match(line)
if response is None:
raise ValueError("invalid response from server")
else:
# a list of (category, disc id, disc title) tuples
matches = []
code = int(response.group(1))
if code == 200:
# single exact match
match = QUERY_RESULT.match(response.group(2))
if match is not None:
matches.append((match.group(1),
match.group(2),
match.group(3)))
else:
raise ValueError("invalid query result")
elif (code == 211) or (code == 210):
# multiple exact or inexact matches
line = next(query)
while not line.startswith("."):
match = QUERY_RESULT.match(line)
if match is not None:
matches.append((match.group(1),
match.group(2),
match.group(3)))
else:
raise ValueError("invalid query result")
line = next(query)
elif code == 202:
# no match found
pass
else:
# some error has occurred
raise ValueError(response.group(2))
if len(matches) > 0:
# for each result, query FreeDB for XMCD file data
# XXX: Pylint, redefining argument with the local name 'disc_id'
for (category, disc_id, _) in matches:
sleep(1) # add a slight delay to keep the server happy
query = freedb_command(freedb_server,
freedb_port,
"read",
category,
disc_id)
response = RESPONSE.match(next(query))
if response is not None:
# FIXME: check response code here
freedb = {}
line = next(query)
while not line.startswith("."):
if not line.startswith("#"):
entry = FREEDB_LINE.match(line)
if entry is not None:
if entry.group(1) in freedb:
freedb[entry.group(1)] += entry.group(2)
else:
freedb[entry.group(1)] = entry.group(2)
line = next(query)
yield freedb
else:
raise ValueError("invalid response from server")
def freedb_command(freedb_server, freedb_port, cmd, *args):
"""
Generate and perform a query against FreeDB using the given command.
Yields a list of Unicode strings.
:param freedb_server: URL of FreeDB server to be queried
:type freedb_server: str
:param freedb_port: port number of FreeDB server to be queried
:type freedb_port: int
:param cmd: CDDB command
:type cmd: str
"""
from urllib.error import URLError
from urllib.request import urlopen
from urllib.parse import urlencode
from socket import getfqdn
from whipper import __version__ as VERSION
# some debug type checking
assert(isinstance(cmd, str))
for arg in args:
assert(isinstance(arg, str))
POST = []
# generate query to post with arguments in specific order
if len(args) > 0:
POST.append(("cmd", "cddb {} {}".format(cmd, " ".join(args))))
else:
POST.append(("cmd", "cddb {}".format(cmd)))
POST.append(
("hello",
"user {} {} {}".format(getfqdn(), "whipper", VERSION)))
POST.append(("proto", "6"))
try:
# get Request object from post
request = urlopen(
"http://{}:{:d}/~cddb/cddb.cgi".format(freedb_server, freedb_port),
urlencode(POST).encode())
except URLError as e:
raise ValueError(str(e))
try:
# yield lines of output
line = request.readline()
while len(line) > 0:
yield line.decode("UTF-8", "replace")
line = request.readline()
finally:
request.close()