diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 53462463..adf75fd7 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -3,17 +3,24 @@ name: Unit tests on: [push, pull_request] jobs: - test_py2: - runs-on: ubuntu-20.04 + test: + strategy: + matrix: + include: + - pyversion: '2.7' + os: ubuntu-20.04 + - pyversion: '3' + os: ubuntu-latest + runs-on: ${{ matrix.os }} steps: - uses: actions/checkout@v2 with: fetch-depth: 0 - - name: Set up Python 2.7 + - name: Set up Python ${{ matrix.pyversion }} uses: actions/setup-python@v2 with: - python-version: '2.7' + python-version: ${{ matrix.pyversion }} - name: Install dependencies run: | @@ -21,7 +28,6 @@ jobs: pip install -r requirements-dev.txt # FIXME: branding.py still has no permanent home curl https://gist.github.com/ydirson/3c36a7e19d762cc529a6c82340894ccc/raw/5ca39f621b1feab813e171f535c1aad1bd483f1d/branding.py -O -L - pip install pyliblzma pip install -e . command -v xz @@ -29,22 +35,22 @@ jobs: run: | pytest --cov -rP coverage xml - coverage html - coverage html -d htmlcov-tests --include="tests/*" - diff-cover --html-report coverage-diff.html coverage.xml + coverage html -d htmlcov-${{ matrix.pyversion }} + coverage html -d htmlcov-tests-${{ matrix.pyversion }} --include="tests/*" + diff-cover --compare-branch=origin/master --html-report coverage-diff-${{ matrix.pyversion }}.html coverage.xml - name: Pylint run: | pylint --version pylint --exit-zero xcp/ tests/ setup.py pylint --exit-zero --msg-template="{path}:{line}: [{msg_id}({symbol}), {obj}] {msg}" xcp/ tests/ setup.py > pylint.txt - diff-quality --violations=pylint --html-report pylint-diff.html pylint.txt + diff-quality --compare-branch=origin/master --violations=pylint --html-report pylint-diff-${{ matrix.pyversion }}.html pylint.txt - uses: actions/upload-artifact@v3 with: name: Coverage and pylint reports path: | - coverage-diff.html - pylint-diff.html - htmlcov - htmlcov-tests + coverage-diff-${{ matrix.pyversion }}.html + pylint-diff-${{ matrix.pyversion }}.html + htmlcov-${{ matrix.pyversion }} + htmlcov-tests-${{ matrix.pyversion }} diff --git a/requirements-dev.txt b/requirements-dev.txt index b2799686..ce760b45 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -5,6 +5,11 @@ diff_cover mock pytest pytest-cov +parameterized # dependencies also in setup.py until they can be used six future + +# python-2.7 only +configparser ; python_version < "3.0" +pyliblzma ; python_version < "3.0" diff --git a/tests/data/repo/boot/isolinux/mboot.c32 b/tests/data/repo/boot/isolinux/mboot.c32 new file mode 100644 index 00000000..3f5b2fe5 Binary files /dev/null and b/tests/data/repo/boot/isolinux/mboot.c32 differ diff --git a/tests/test_accessor.py b/tests/test_accessor.py index ade787e6..d001edb8 100644 --- a/tests/test_accessor.py +++ b/tests/test_accessor.py @@ -1,21 +1,42 @@ import unittest +import hashlib +from tempfile import NamedTemporaryFile + +from parameterized import parameterized_class import xcp.accessor +@parameterized_class([{"url": "file://tests/data/repo/"}, + {"url": "https://updates.xcp-ng.org/netinstall/8.2.1"}]) class TestAccessor(unittest.TestCase): - def test_http(self): - raise unittest.SkipTest("comment out if you really mean it") - a = xcp.accessor.createAccessor("https://updates.xcp-ng.org/netinstall/8.2.1", True) + def test_access(self): + a = xcp.accessor.createAccessor(self.url, True) a.start() self.assertTrue(a.access('.treeinfo')) self.assertFalse(a.access('no_such_file')) self.assertEqual(a.lastError, 404) a.finish() - def test_file(self): - a = xcp.accessor.createAccessor("file://tests/data/repo/", True) + def test_file_binfile(self): + BINFILE = "boot/isolinux/mboot.c32" + a = xcp.accessor.createAccessor(self.url, True) a.start() - self.assertTrue(a.access('.treeinfo')) - self.assertFalse(a.access('no_such_file')) - self.assertEqual(a.lastError, 404) + self.assertTrue(a.access(BINFILE)) + inf = a.openAddress(BINFILE) + with NamedTemporaryFile("wb") as outf: + while True: + data = inf.read() + if not data: # EOF + break + outf.write(data) + outf.flush() + hasher = hashlib.md5() + with open(outf.name, "rb") as bincontents: + while True: + data = bincontents.read() + if not data: # EOF + break + hasher.update(data) + csum = hasher.hexdigest() + self.assertEqual(csum, "eab52cebc3723863432dc672360f6dac") a.finish() diff --git a/tests/test_cpio.py b/tests/test_cpio.py index c3543c89..fdb34f40 100644 --- a/tests/test_cpio.py +++ b/tests/test_cpio.py @@ -14,11 +14,11 @@ def writeRandomFile(fn, size, start=b'', add=b'a'): with open(fn, 'wb') as f: m = md5() m.update(start) - assert(len(add) != 0) + assert add while size > 0: d = m.digest() if size < len(d): - d=d[:size] + d = d[:size] f.write(d) size -= len(d) m.update(add) diff --git a/tests/test_dom0.py b/tests/test_dom0.py index bf198341..440bb109 100644 --- a/tests/test_dom0.py +++ b/tests/test_dom0.py @@ -30,7 +30,7 @@ def mock_version(open_mock, version): (2*1024, 4*1024, 8*1024), # Above max ] - with patch("__builtin__.open") as open_mock: + with patch("xcp.dom0.open") as open_mock: for host_gib, dom0_mib, _ in test_values: mock_version(open_mock, '2.8.0') expected = dom0_mib * 1024; @@ -39,7 +39,7 @@ def mock_version(open_mock, version): open_mock.assert_called_with("/etc/xensource-inventory") - with patch("__builtin__.open") as open_mock: + with patch("xcp.dom0.open") as open_mock: for host_gib, _, dom0_mib in test_values: mock_version(open_mock, '2.9.0') expected = dom0_mib * 1024; diff --git a/tests/test_ifrename_dynamic.py b/tests/test_ifrename_dynamic.py index 1cc95e39..0948d254 100644 --- a/tests/test_ifrename_dynamic.py +++ b/tests/test_ifrename_dynamic.py @@ -125,10 +125,10 @@ def test_pci_matching_invert(self): MACPCI("c8:cb:b8:d3:0c:cf", "0000:04:00.0", kname="eth1", ppn="", label="")]) - self.assertEqual(dr.rules,[ + self.assertEqual(set(dr.rules), set([ MACPCI("c8:cb:b8:d3:0c:ce", "0000:04:00.0", tname="eth1"), MACPCI("c8:cb:b8:d3:0c:cf", "0000:04:00.0", tname="eth0") - ]) + ])) def test_pci_missing(self): diff --git a/tests/test_ifrename_logic.py b/tests/test_ifrename_logic.py index 3bb7a911..a2715334 100644 --- a/tests/test_ifrename_logic.py +++ b/tests/test_ifrename_logic.py @@ -518,6 +518,7 @@ def test_ibft_nic_to_ibft(self): class TestInputSanitisation(unittest.TestCase): + # pylint: disable=no-member def setUp(self): """ diff --git a/tests/test_ifrename_static.py b/tests/test_ifrename_static.py index 9b11e380..674decfe 100644 --- a/tests/test_ifrename_static.py +++ b/tests/test_ifrename_static.py @@ -375,10 +375,10 @@ def test_pci_matching(self): sr.generate(self.state) - self.assertEqual(sr.rules,[ - MACPCI("c8:cb:b8:d3:0c:cf", "0000:04:00.0", tname="eth1"), - MACPCI("c8:cb:b8:d3:0c:ce", "0000:04:00.0", tname="eth0") - ]) + self.assertEqual(set(sr.rules), set([ + MACPCI("c8:cb:b8:d3:0c:cf", "0000:04:00.0", tname="eth1"), + MACPCI("c8:cb:b8:d3:0c:ce", "0000:04:00.0", tname="eth0") + ])) def test_pci_matching_invert(self): @@ -389,10 +389,10 @@ def test_pci_matching_invert(self): sr.generate(self.state) - self.assertEqual(sr.rules,[ - MACPCI("c8:cb:b8:d3:0c:ce", "0000:04:00.0", tname="eth1"), - MACPCI("c8:cb:b8:d3:0c:cf", "0000:04:00.0", tname="eth0") - ]) + self.assertEqual(set(sr.rules), set([ + MACPCI("c8:cb:b8:d3:0c:ce", "0000:04:00.0", tname="eth1"), + MACPCI("c8:cb:b8:d3:0c:cf", "0000:04:00.0", tname="eth0") + ])) def test_pci_matching_mixed(self): @@ -403,10 +403,10 @@ def test_pci_matching_mixed(self): sr.generate(self.state) - self.assertEqual(sr.rules,[ - MACPCI("c8:cb:b8:d3:0c:cf", "0000:04:00.0", tname="eth0"), - MACPCI("c8:cb:b8:d3:0c:ce", "0000:04:00.0", tname="eth1") - ]) + self.assertEqual(set(sr.rules), set([ + MACPCI("c8:cb:b8:d3:0c:cf", "0000:04:00.0", tname="eth0"), + MACPCI("c8:cb:b8:d3:0c:ce", "0000:04:00.0", tname="eth1") + ])) def test_pci_missing(self): diff --git a/tests/test_repository.py b/tests/test_repository.py index 833627d0..8081c33a 100644 --- a/tests/test_repository.py +++ b/tests/test_repository.py @@ -1,22 +1,15 @@ import unittest +from parameterized import parameterized_class import xcp.accessor from xcp import repository from xcp.version import Version +@parameterized_class([{"url": "file://tests/data/repo/"}, + {"url": "https://updates.xcp-ng.org/netinstall/8.2.1"}]) class TestRepository(unittest.TestCase): - def test_http(self): - raise unittest.SkipTest("comment out if you really mean it") - a = xcp.accessor.createAccessor("https://updates.xcp-ng.org/netinstall/8.2.1", True) - repo_ver = repository.BaseRepository.getRepoVer(a) - self.assertEqual(repo_ver, Version([3, 2, 1])) - product_ver = repository.BaseRepository.getProductVersion(a) - self.assertEqual(product_ver, Version([8, 2, 1])) - repos = repository.BaseRepository.findRepositories(a) - self.assertEqual(len(repos), 1) - - def test_file(self): - a = xcp.accessor.createAccessor("file://tests/data/repo/", True) + def test_basicinfo(self): + a = xcp.accessor.createAccessor(self.url, True) repo_ver = repository.BaseRepository.getRepoVer(a) self.assertEqual(repo_ver, Version([3, 2, 1])) product_ver = repository.BaseRepository.getProductVersion(a) diff --git a/xcp/bootloader.py b/xcp/bootloader.py index a1d19709..81a61bd4 100644 --- a/xcp/bootloader.py +++ b/xcp/bootloader.py @@ -336,19 +336,19 @@ def create_label(title): try: for line in fh: l = line.strip() - menu_match = re.match("menuentry ['\"]([^']*)['\"](.*){", l) + menu_match = re.match(r"menuentry ['\"]([^']*)['\"](.*){", l) # Only parse unindented default and timeout lines to prevent # changing these lines in if statements. if l.startswith('set default=') and l == line.rstrip(): default = l.split('=')[1] - match = re.match("['\"](.*)['\"]$", default) + match = re.match(r"['\"](.*)['\"]$", default) if match: default = match.group(1) elif l.startswith('set timeout=') and l == line.rstrip(): timeout = int(l.split('=')[1]) * 10 elif l.startswith('serial'): - match = re.match("serial --unit=(\d+) --speed=(\d+)", l) + match = re.match(r"serial --unit=(\d+) --speed=(\d+)", l) if match: serial = { 'port': int(match.group(1)), diff --git a/xcp/cmd.py b/xcp/cmd.py index bbd94656..16cfb64f 100644 --- a/xcp/cmd.py +++ b/xcp/cmd.py @@ -24,15 +24,16 @@ """Command processing""" import subprocess +import six import xcp.logger as logger def runCmd(command, with_stdout = False, with_stderr = False, inputtext = None): - cmd = subprocess.Popen(command, bufsize = 1, - stdin = (inputtext and subprocess.PIPE or None), - stdout = subprocess.PIPE, - stderr = subprocess.PIPE, - shell = isinstance(command, str)) + cmd = subprocess.Popen(command, bufsize=1, + stdin=(inputtext and subprocess.PIPE or None), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + shell=isinstance(command, six.string_types)) (out, err) = cmd.communicate(inputtext) rv = cmd.returncode diff --git a/xcp/cpiofile.py b/xcp/cpiofile.py index 7b2623fa..fb4d96f7 100755 --- a/xcp/cpiofile.py +++ b/xcp/cpiofile.py @@ -310,7 +310,7 @@ def _init_write_gz(self): self.__write(b"\037\213\010\010%s\002\377" % timestamp) if self.name.endswith(".gz"): self.name = self.name[:-3] - self.__write(self.name + NUL) + self.__write(six.ensure_binary(self.name) + NUL) def write(self, s): """Write string s to the stream. @@ -951,7 +951,7 @@ def __init__(self, name=None, mode="r", fileobj=None): self.mode = {"r": "rb", "a": "r+b", "w": "wb"}[mode] if not fileobj: - fileobj = file(name, self.mode) + fileobj = bltn_open(name, self.mode) self._extfileobj = False else: if name is None and hasattr(fileobj, "name"): @@ -1109,7 +1109,7 @@ def gzopen(cls, name, mode="r", fileobj=None, compresslevel=9): raise CompressionError("gzip module is not available") if fileobj is None: - fileobj = file(name, mode + "b") + fileobj = bltn_open(name, mode + "b") try: t = cls.cpioopen(name, mode, gzip.GzipFile(name, mode, compresslevel, fileobj)) @@ -1354,7 +1354,7 @@ def add(self, name, arcname=None, recursive=True): # Append the cpio header and data to the archive. if cpioinfo.isreg(): - f = file(name, "rb") + f = bltn_open(name, "rb") self.addfile(cpioinfo, f) f.close() @@ -1420,7 +1420,7 @@ def extractall(self, path=".", members=None): # Extract directory with a safe mode, so that # all files below can be extracted as well. try: - os.makedirs(os.path.join(path, cpioinfo.name), 0o777) + os.makedirs(os.path.join(path, six.ensure_text(cpioinfo.name)), 0o777) except EnvironmentError: pass directories.append(cpioinfo) @@ -1428,12 +1428,12 @@ def extractall(self, path=".", members=None): self.extract(cpioinfo, path) # Reverse sort directories. - directories.sort(lambda a, b: cmp(a.name, b.name)) + directories.sort(key=lambda x: x.name) directories.reverse() # Set correct owner, mtime and filemode on directories. for cpioinfo in directories: - path = os.path.join(path, cpioinfo.name) + path = os.path.join(path, six.ensure_text(cpioinfo.name)) try: self.chown(cpioinfo, path) self.utime(cpioinfo, path) @@ -1462,7 +1462,7 @@ def extract(self, member, path=""): cpioinfo._link_path = path try: - self._extract_member(cpioinfo, os.path.join(path, cpioinfo.name)) + self._extract_member(cpioinfo, os.path.join(path, six.ensure_text(cpioinfo.name))) except EnvironmentError as e: if self.errorlevel > 0: raise @@ -1594,7 +1594,7 @@ def makefile(self, cpioinfo, targetpath): if extractinfo: source = self.extractfile(extractinfo) - target = file(targetpath, "wb") + target = bltn_open(targetpath, "wb") copyfileobj(source, target) source.close() target.close() @@ -1926,5 +1926,5 @@ def is_cpiofile(name): except CpioError: return False -def cpioOpen(*al, **ad): - return CpioFile.open(*al, **ad) +bltn_open = open +open = CpioFile.open diff --git a/xcp/dom0.py b/xcp/dom0.py index 086a683b..b8a46c3a 100644 --- a/xcp/dom0.py +++ b/xcp/dom0.py @@ -96,7 +96,7 @@ def default_memory(host_mem_kib): return default_memory_for_version(host_mem_kib, platform_version) -_size_and_unit_re = re.compile("^(-?\d+)([bkmg]?)$", re.IGNORECASE) +_size_and_unit_re = re.compile(r"^(-?\d+)([bkmg]?)$", re.IGNORECASE) def _parse_size_and_unit(s): m = _size_and_unit_re.match(s) diff --git a/xcp/logger.py b/xcp/logger.py index 03e8fe78..ca972b5c 100644 --- a/xcp/logger.py +++ b/xcp/logger.py @@ -33,6 +33,7 @@ import logging import logging.handlers +import six LOG = logging.getLogger() LOG.setLevel(logging.NOTSET) @@ -47,7 +48,7 @@ def openLog(lfile, level=logging.INFO): try: # if lfile is a string, assume we need to open() it - if isinstance(lfile, str): + if isinstance(lfile, six.string_types): h = open(lfile, 'a') if h.isatty(): handler = logging.StreamHandler(h) diff --git a/xcp/net/ifrename/logic.py b/xcp/net/ifrename/logic.py index e2a3484b..41e74c02 100644 --- a/xcp/net/ifrename/logic.py +++ b/xcp/net/ifrename/logic.py @@ -52,9 +52,9 @@ from xcp.logger import LOG from xcp.net.ifrename.macpci import MACPCI -VALID_CUR_STATE_KNAME = re.compile("^(?:eth[\d]+|side-[\d]+-eth[\d]+)$") -VALID_ETH_NAME = re.compile("^eth([\d])+$") -VALID_IBFT_NAME = re.compile("^ibft([\d])+$") +VALID_CUR_STATE_KNAME = re.compile(r"^(?:eth[\d]+|side-[\d]+-eth[\d]+)$") +VALID_ETH_NAME = re.compile(r"^eth([\d])+$") +VALID_IBFT_NAME = re.compile(r"^ibft([\d])+$") # util needs to import VALID_ETH_NAME from xcp.net.ifrename import util @@ -289,9 +289,10 @@ def rename_logic( static_rules, # Check that the function still has the same number of nics if len(lastnics) != len(newnics): - LOG.warn("multi-nic function %s had %d nics but now has %d. " - "Defering all until later for renaming" - % (fn, len(lastnics), len(newnics))) + LOG.warning( + "multi-nic function %s had %d nics but now has %d. " + "Defering all until later for renaming", + fn, len(lastnics), len(newnics)) continue # Check that all nics are still pending a rename diff --git a/xcp/net/ifrename/static.py b/xcp/net/ifrename/static.py index c1b9b100..bf503d54 100644 --- a/xcp/net/ifrename/static.py +++ b/xcp/net/ifrename/static.py @@ -89,7 +89,7 @@ class StaticRules(object): methods = ["mac", "pci", "ppn", "label", "guess"] validators = { "mac": VALID_MAC, "pci": VALID_PCI, - "ppn": re.compile("^(?:em\d+|p(?:ci)?\d+p\d+)$") + "ppn": re.compile(r"^(?:em\d+|p(?:ci)?\d+p\d+)$") } def __init__(self, path=None, fd=None): diff --git a/xcp/net/mac.py b/xcp/net/mac.py index 47e586c0..56ba4b7b 100644 --- a/xcp/net/mac.py +++ b/xcp/net/mac.py @@ -31,11 +31,13 @@ __author__ = "Andrew Cooper" import re +import six VALID_COLON_MAC = re.compile(r"^([\da-fA-F]{1,2}:){5}[\da-fA-F]{1,2}$") VALID_DASH_MAC = re.compile(r"^([\da-fA-F]{1,2}-){5}[\da-fA-F]{1,2}$") VALID_DOTQUAD_MAC = re.compile(r"^([\da-fA-F]{1,4}\.){2}[\da-fA-F]{1,4}$") +@six.python_2_unicode_compatible class MAC(object): """ Mac address object for manipulation and comparison @@ -59,27 +61,25 @@ def __init__(self, addr): self.octets = [] self.integer = -1 - if isinstance(addr, (str, unicode)): + if not isinstance(addr, six.string_types): + raise TypeError("String expected") - res = VALID_COLON_MAC.match(addr) - if res: - self._set_from_str_octets(addr.split(":")) - return + res = VALID_COLON_MAC.match(addr) + if res: + self._set_from_str_octets(addr.split(":")) + return - res = VALID_DASH_MAC.match(addr) - if res: - self._set_from_str_octets(addr.split("-")) - return + res = VALID_DASH_MAC.match(addr) + if res: + self._set_from_str_octets(addr.split("-")) + return - res = VALID_DOTQUAD_MAC.match(addr) - if res: - self._set_from_str_quads(addr.split(".")) - return + res = VALID_DOTQUAD_MAC.match(addr) + if res: + self._set_from_str_quads(addr.split(".")) + return - raise ValueError("Unrecognised MAC address '%s'" % addr) - - else: - raise TypeError("String expected") + raise ValueError("Unrecognised MAC address '%s'" % addr) def _set_from_str_octets(self, octets): @@ -122,9 +122,6 @@ def is_local(self): def __str__(self): - return unicode(self).encode('utf-8') - - def __unicode__(self): return ':'.join([ "%0.2x" % x for x in self.octets]) def __repr__(self): diff --git a/xcp/pci.py b/xcp/pci.py index 1c8e081d..1f911a6b 100644 --- a/xcp/pci.py +++ b/xcp/pci.py @@ -24,11 +24,12 @@ import os.path import subprocess import re +import six _SBDF = (r"(?:(?P [\da-dA-F]{4}):)?" # Segment (optional) - " (?P [\da-fA-F]{2}):" # Bus - " (?P [\da-fA-F]{2})\." # Device - " (?P[\da-fA-F])" # Function + r" (?P [\da-fA-F]{2}):" # Bus + r" (?P [\da-fA-F]{2})\." # Device + r" (?P[\da-fA-F])" # Function ) # Don't change the meaning of VALID_SBDF as some parties may be using it @@ -36,7 +37,7 @@ VALID_SBDFI = re.compile( r"^(?P%s)" - " (?:[[](?P[\d]{1,2})[]])?$" # Index (optional) + r" (?:[[](?P[\d]{1,2})[]])?$" # Index (optional) % _SBDF , re.X) @@ -66,48 +67,46 @@ def __init__(self, addr): self.function = -1 self.index = -1 - if isinstance(addr, (str, unicode)): - - res = VALID_SBDFI.match(addr) - if res: - groups = res.groupdict() + if not isinstance(addr, six.string_types): + raise TypeError("String expected") - if "segment" in groups and groups["segment"] is not None: - self.segment = int(groups["segment"], 16) - else: - self.segment = 0 + res = VALID_SBDFI.match(addr) + if res: + groups = res.groupdict() - self.bus = int(groups["bus"], 16) - if not ( 0 <= self.bus < 2**8 ): - raise ValueError("Bus '%d' out of range 0 <= bus < 256" - % (self.bus,)) + if "segment" in groups and groups["segment"] is not None: + self.segment = int(groups["segment"], 16) + else: + self.segment = 0 - self.device = int(groups["device"], 16) - if not ( 0 <= self.device < 2**5): - raise ValueError("Device '%d' out of range 0 <= device < 32" - % (self.device,)) + self.bus = int(groups["bus"], 16) + if not ( 0 <= self.bus < 2**8 ): + raise ValueError("Bus '%d' out of range 0 <= bus < 256" + % (self.bus,)) - self.function = int(groups["function"], 16) - if not ( 0 <= self.function < 2**3): - raise ValueError("Function '%d' out of range 0 <= device " - "< 8" % (self.function,)) + self.device = int(groups["device"], 16) + if not ( 0 <= self.device < 2**5): + raise ValueError("Device '%d' out of range 0 <= device < 32" + % (self.device,)) - if "index" in groups and groups["index"] is not None: - self.index = int(groups["index"]) - else: - self.index = 0 + self.function = int(groups["function"], 16) + if not ( 0 <= self.function < 2**3): + raise ValueError("Function '%d' out of range 0 <= device " + "< 8" % (self.function,)) - self.integer = (int(self.segment << 16 | - self.bus << 8 | - self.device << 3 | - self.function) << 8 | - self.index) - return + if "index" in groups and groups["index"] is not None: + self.index = int(groups["index"]) + else: + self.index = 0 - raise ValueError("Unrecognised PCI address '%s'" % addr) + self.integer = (int(self.segment << 16 | + self.bus << 8 | + self.device << 3 | + self.function) << 8 | + self.index) + return - else: - raise TypeError("String expected") + raise ValueError("Unrecognised PCI address '%s'" % addr) def __str__(self): @@ -277,7 +276,7 @@ def findByClass(self, cls, subcls = None): class, subclass [class1, class2, ... classN]""" if subcls: - assert isinstance(cls, str) + assert isinstance(cls, six.string_types) return [x for x in self.devs.values() if x['class'] == cls and x['subclass'] == subcls] else: assert isinstance(cls, list) diff --git a/xcp/repository.py b/xcp/repository.py index ca284647..35a2c08d 100644 --- a/xcp/repository.py +++ b/xcp/repository.py @@ -23,10 +23,12 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -import md5 +from hashlib import md5 +import io import os.path import xml.dom.minidom -import ConfigParser +import configparser +import sys import six @@ -179,10 +181,17 @@ def _getVersion(cls, access, category): access.start() try: - treeinfofp = access.openAddress(cls.TREEINFO_FILENAME) - treeinfo = ConfigParser.SafeConfigParser() - treeinfo.readfp(treeinfofp) - treeinfofp.close() + rawtreeinfofp = access.openAddress(cls.TREEINFO_FILENAME) + if sys.version_info < (3, 0) or isinstance(rawtreeinfofp, io.TextIOBase): + # e.g. with FileAccessor + treeinfofp = rawtreeinfofp + else: + # e.g. with HTTPAccessor + treeinfofp = io.TextIOWrapper(rawtreeinfofp, encoding='utf-8') + treeinfo = configparser.ConfigParser() + treeinfo.read_file(treeinfofp) + treeinfofp = None + rawtreeinfofp.close() if treeinfo.has_section('system-v1'): ver_str = treeinfo.get('system-v1', category_map[category]) else: @@ -246,7 +255,7 @@ def findRepositories(cls, access): def __init__(self, access, base, is_group = False): BaseRepository.__init__(self, access, base) self.is_group = is_group - self._md5 = md5.new() + self._md5 = md5() self.requires = [] self.packages = [] @@ -288,7 +297,7 @@ def _parse_repofile(self, repofile): repofile.close() # update md5sum for repo - self._md5.update(repofile_contents) + self._md5.update(repofile_contents.encode()) # build xml doc object try: diff --git a/xcp/xmlunwrap.py b/xcp/xmlunwrap.py index 1487afab..824a52a2 100644 --- a/xcp/xmlunwrap.py +++ b/xcp/xmlunwrap.py @@ -22,65 +22,73 @@ # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """xmlunwrap - general methods to unwrap XML elements & attributes""" +from xml.dom.minidom import Element import six class XmlUnwrapError(Exception): pass -def getText(nodelist): +def getText(element): + # type:(Element) -> str + """Return the text of the element as stripped string""" rc = "" - for node in nodelist.childNodes: + for node in element.childNodes: if node.nodeType == node.TEXT_NODE: rc = rc + node.data - return rc.encode().strip() + if not isinstance(rc, str): # Python 2 only, otherwise it would return unicode + rc = rc.encode() + return rc.strip() def getElementsByTagName(el, tags, mandatory = False): matching = [] for tag in tags: matching.extend(el.getElementsByTagName(tag)) - if mandatory and len(matching) == 0: - raise XmlUnwrapError("Missing mandatory element %s" % tags[0]) + if mandatory and not matching: + raise XmlUnwrapError("Missing mandatory element " + tags[0]) return matching -def getStrAttribute(el, attrs, default = '', mandatory = False): +def getStrAttribute(el, attrs, default='', mandatory=False): + # type:(Element, list, str | None, bool) -> str | None matching = [] for attr in attrs: - val = el.getAttribute(attr).encode() + val = el.getAttribute(attr) if val != '': matching.append(val) - if len(matching) == 0: + if not matching: if mandatory: - raise XmlUnwrapError("Missing mandatory attribute %s" % attrs[0]) + raise XmlUnwrapError("Missing mandatory attribute " + attrs[0]) return default return matching[0] def getBoolAttribute(el, attrs, default = None): - mandatory = (default == None) - val = getStrAttribute(el, attrs, '', mandatory).lower() - if val == '': - return default - return val in ['yes', 'true'] + # type:(Element, list, bool | None) -> bool | None + mandatory = default is None + # Will raise an exception if attribute is not found and default is None: + val = getStrAttribute(el, attrs, '', mandatory).lower() # type: ignore + return default if val == '' else val in ['yes', 'true'] def getIntAttribute(el, attrs, default = None): - mandatory = (default == None) + # type:(Element, list, int | None) -> int | None + mandatory = default is None val = getStrAttribute(el, attrs, '', mandatory) if val == '': return default try: - int_val = int(val, 0) + int_val = int(val, 0) # type: ignore # (handled by raising XmlUnwarpError below) except Exception as e: - six.raise_from(XmlUnwrapError("Invalid integer value for %s" % attrs[0]), e) + six.raise_from(XmlUnwrapError("Invalid integer value for " + attrs[0]), e) return int_val def getMapAttribute(el, attrs, mapping, default = None): - mandatory = (default == None) + # type:(Element, list, list[list], str | None) -> str + mandatory = default is None k, v = zip(*mapping) key = getStrAttribute(el, attrs, default, mandatory) if key not in k: - raise XmlUnwrapError("Unexpected key %s for attribute" % key) + raise XmlUnwrapError("Unexpected key " + str(key) + "for attribute") k_list = list(k) return v[k_list.index(key)]