diff --git a/pyproject.toml b/pyproject.toml index 2f21011953..ce695fcad0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,194 @@ +# Build-system section [build-system] requires = ["setuptools>=40.8.0", "wheel"] build-backend = "setuptools.build_meta" + +# Black section +[tool.black] +force-exclude=""" +(tests/aggregate_tests.py| +tests/fast_server_exit.py| +tests/simple_https_server.py| +tests/simple_server.py| +tests/slow_retrieval_server.py| +tests/utils.py| +tests/test_utils.py| +tests/test_repository_lib.py| +tests/test_arbitrary_package_attack.py| +tests/test_developer_tool.py| +tests/test_download.py| +tests/test_endless_data_attack.py| +tests/test_extraneous_dependencies_attack.py| +tests/test_formats.py| +tests/test_indefinite_freeze_attack.py| +tests/test_key_revocation_integration.py| +tests/test_keydb.py| +tests/test_log.py| +tests/test_mirrors.py| +tests/test_mix_and_match_attack.py| +tests/test_multiple_repositories_integration.py| +tests/test_repository_tool.py| +tests/test_replay_attack.py| +tests/test_roledb.py| +tests/test_root_versioning_integration.py| +tests/test_sig.py| +tests/test_slow_retrieval_attack.py| +tests/test_tutorial.py| +tests/test_unittest_toolbox.py| +tests/test_updater.py| +tests/test_updater_root_rotation_integration.py| +tests/repository_data/generate_project_data.py| +tests/repository_data/generate.py)""" + +# isort section +[tool.isort] +skip = [ + "tests/fast_server_exit.py", + "tests/simple_https_server.py", + "tests/simple_server.py", + "tests/slow_retrieval_server.py", + "tests/utils.py", + "tests/test_utils.py", + "tests/test_repository_lib.py", + "tests/test_arbitrary_package_attack.py", + "tests/test_developer_tool.py", + "tests/test_download.py", + "tests/test_endless_data_attack.py", + "tests/test_extraneous_dependencies_attack.py", + "tests/test_formats.py", + "tests/test_indefinite_freeze_attack.py", + "tests/test_key_revocation_integration.py", + "tests/test_keydb.py", + "tests/test_log.py", + "tests/test_mirrors.py", + "tests/test_mix_and_match_attack.py", + "tests/test_multiple_repositories_integration.py", + "tests/test_repository_tool.py", + "tests/test_replay_attack.py", + "tests/test_roledb.py", + "tests/test_root_versioning_integration.py", + "tests/test_sig.py", + "tests/test_slow_retrieval_attack.py", + "tests/test_tutorial.py", + "tests/test_unittest_toolbox.py", + "tests/test_updater.py", + "tests/test_updater_root_rotation_integration.py", + "tests/repository_data/generate_project_data.py", + "tests/repository_data/generate.py", +] + +# Pylint section + +# Minimal pylint configuration file for Secure Systems Lab Python Style Guide: +# https://github.com/secure-systems-lab/code-style-guidelines +# +# Based on Google Python Style Guide pylintrc and pylint defaults: +# https://google.github.io/styleguide/pylintrc +# http://pylint.pycqa.org/en/latest/technical_reference/features.html + +[tool.pylint.master] +ignore = [ + "aggregate_tests.py", + "fast_server_exit.py", + "simple_https_server.py", + "simple_server.py", + "slow_retrieval_server.py", + "utils.py", + "test_utils.py", + "test_repository_lib.py", + "test_arbitrary_package_attack.py", + "test_developer_tool.py", + "test_download.py", + "test_endless_data_attack.py", + "test_extraneous_dependencies_attack.py", + "test_formats.py", + "test_indefinite_freeze_attack.py", + "test_key_revocation_integration.py", + "test_keydb.py", + "test_log.py", + "test_mirrors.py", + "test_mix_and_match_attack.py", + "test_multiple_repositories_integration.py", + 
"test_repository_tool.py", + "test_replay_attack.py", + "test_roledb.py", + "test_root_versioning_integration.py", + "test_sig.py", + "test_slow_retrieval_attack.py", + "test_tutorial.py", + "test_unittest_toolbox.py", + "test_updater.py", + "test_updater_root_rotation_integration.py", + "generate_project_data.py", + "generate.py", +] + +[tool.pylint.message_control] +# Disable the message, report, category or checker with the given id(s). +# NOTE: To keep this config as short as possible we only disable checks that +# are currently in conflict with our code. If new code displeases the linter +# (for good reasons) consider updating this config file, or disable checks with. +disable=[ + "fixme", + "too-few-public-methods", + "too-many-arguments", + "duplicate-code" +] + +[tool.pylint.basic] +good-names = ["i","j","k","v","e","f","fn","fp","_type","_"] +# Regexes for allowed names are copied from the Google pylintrc +# NOTE: Pylint captures regex name groups such as 'snake_case' or 'camel_case'. +# If there are multiple groups it enfoces the prevalent naming style inside +# each modules. Names in the exempt capturing group are ignored. +function-rgx="""^(?:(?PsetUp|tearDown|setUpModule|tearDownModule)|(?P_?[A-Z][a-zA-Z0-9]*)|(?P_?[a-z][a-z0-9_]*))$""" +ethod-rgx="""(?x)^(?:(?P_[a-z0-9_]+__|runTest|setUp|tearDown|setUpTestCase|tearDownTestCase|setupSelf|tearDownClass|setUpClass|(test|assert)_*[A-Z0-9][a-zA-Z0-9_]*|next)|(?P_{0,2}[A-Z][a-zA-Z0-9_]*)|(?P_{0,2}[a-z][a-z0-9_]*))$""" +argument-rgx="""^[a-z][a-z0-9_]*$""" +attr-rgx="""^_{0,2}[a-z][a-z0-9_]*$""" +class-attribute-rgx="""^(_?[A-Z][A-Z0-9_]*|__[a-z0-9_]+__|_?[a-z][a-z0-9_]*)$""" +class-rgx="^_?[A-Z][a-zA-Z0-9]*$" +const-rgx="^(_?[A-Z][A-Z0-9_]*|__[a-z0-9_]+__|_?[a-z][a-z0-9_]*)$" +inlinevar-rgx="^[a-z][a-z0-9_]*$" +module-rgx="^(_?[a-z][a-z0-9_]*|__init__)$" +no-docstring-rgx="(__.*__|main|test.*|.*test|.*Test)$" +variable-rgx="^[a-z][a-z0-9_]*$" +docstring-min-length=10 + +[tool.pylint.format] +ignore-long-lines='(?x)(^\s*(\#\ )??$|^\s*(from\s+\S+\s+)?import\s+.+$)' +indent-string=" " +indent-after-paren=4 +max-line-length=80 +single-line-if-stmt="yes" + +[tool.pylint.logging] +logging-format-style="old" + +[tool.pylint.miscellaneous] +notes="TODO" + +[tool.pylint.STRING] +check-quote-consistency="yes" + +# mypy section +# Read more here: https://mypy.readthedocs.io/en/stable/config_file.html#using-a-pyproject-toml-file +[tool.mypy] +warn_unused_configs = "True" +warn_redundant_casts = "True" +warn_unused_ignores = "True" +warn_unreachable = "True" +strict_equality = "True" +disallow_untyped_defs = "True" +disallow_untyped_calls = "True" +files = [ + "tuf/api/", + "tuf/ngclient", + "tuf/exceptions.py" +] + +[[tool.mypy.overrides]] +module = [ + "securesystemslib.*", + "urllib3.*" +] +ignore_missing_imports = "True" diff --git a/setup.cfg b/setup.cfg index f880a3893d..64fc786f15 100644 --- a/setup.cfg +++ b/setup.cfg @@ -50,21 +50,3 @@ ignore = .fossa.yml .readthedocs.yaml -[mypy] -warn_unused_configs = True -warn_redundant_casts = True -warn_unused_ignores = True -warn_unreachable = True -strict_equality = True -disallow_untyped_defs = True -disallow_untyped_calls = True -files = - tuf/api/, - tuf/ngclient, - tuf/exceptions.py - -[mypy-securesystemslib.*] -ignore_missing_imports = True - -[mypy-urllib3.*] -ignore_missing_imports = True diff --git a/tests/repository_simulator.py b/tests/repository_simulator.py index d9e5885515..00d3d34c96 100644 --- a/tests/repository_simulator.py +++ b/tests/repository_simulator.py @@ -44,22 +44,22 @@ 
updater.refresh() """ -from collections import OrderedDict -from dataclasses import dataclass -from datetime import datetime, timedelta import logging import os import tempfile +from collections import OrderedDict +from dataclasses import dataclass +from datetime import datetime, timedelta +from typing import Dict, Iterator, List, Optional, Tuple +from urllib import parse + import securesystemslib.hash as sslib_hash from securesystemslib.keys import generate_ed25519_key from securesystemslib.signer import SSlibSigner -from typing import Dict, Iterator, List, Optional, Tuple -from urllib import parse -from tuf.api.metadata import TOP_LEVEL_ROLE_NAMES -from tuf.api.serialization.json import JSONSerializer -from tuf.exceptions import FetcherHTTPError from tuf.api.metadata import ( + SPECIFICATION_VERSION, + TOP_LEVEL_ROLE_NAMES, DelegatedRole, Delegations, Key, @@ -67,25 +67,32 @@ MetaFile, Role, Root, - SPECIFICATION_VERSION, Snapshot, TargetFile, Targets, Timestamp, ) +from tuf.api.serialization.json import JSONSerializer +from tuf.exceptions import FetcherHTTPError from tuf.ngclient.fetcher import FetcherInterface logger = logging.getLogger(__name__) SPEC_VER = ".".join(SPECIFICATION_VERSION) + @dataclass class RepositoryTarget: """Contains actual target data and the related target metadata""" + data: bytes target_file: TargetFile + class RepositorySimulator(FetcherInterface): + """Simulates a repository that can be used for testing""" + + # pylint: disable=too-many-instance-attributes def __init__(self): self.md_root: Metadata[Root] = None self.md_timestamp: Metadata[Timestamp] = None @@ -132,6 +139,7 @@ def targets(self) -> Targets: return self.md_targets.signed def all_targets(self) -> Iterator[Tuple[str, Targets]]: + """Yield role name and signed portion of targets one by one""" yield "targets", self.md_targets.signed for role, md in self.md_delegates.items(): yield role, md.signed @@ -141,7 +149,7 @@ def create_key() -> Tuple[Key, SSlibSigner]: sslib_key = generate_ed25519_key() return Key.from_securesystemslib_key(sslib_key), SSlibSigner(sslib_key) - def add_signer(self, role:str, signer: SSlibSigner): + def add_signer(self, role: str, signer: SSlibSigner): if role not in self.signers: self.signers[role] = {} self.signers[role][signer.key_dict["keyid"]] = signer @@ -181,6 +189,7 @@ def publish_root(self): logger.debug("Published root v%d", self.root.version) def fetch(self, url: str) -> Iterator[bytes]: + """Fetches data from the given url and yields it in chunks""" if not self.root.consistent_snapshot: raise NotImplementedError("non-consistent snapshot not supported") path = parse.urlparse(url).path @@ -197,7 +206,7 @@ def fetch(self, url: str) -> Iterator[bytes]: elif path.startswith("/targets/"): # figure out target path and hash prefix target_path = path[len("/targets/") :] - dir_parts, sep , prefixed_filename = target_path.rpartition("/") + dir_parts, sep, prefixed_filename = target_path.rpartition("/") prefix, _, filename = prefixed_filename.partition(".") target_path = f"{dir_parts}{sep}{filename}" @@ -205,6 +214,8 @@ def fetch(self, url: str) -> Iterator[bytes]: else: raise FetcherHTTPError(f"Unknown path '{path}'", 404) + # Disable the redefined-builtin warning raised because of the hash func. + # pylint: disable-next=redefined-builtin def _fetch_target(self, target_path: str, hash: Optional[str]) -> bytes: """Return data for 'target_path', checking 'hash' if it is given.
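For context, the class above doubles as the client's fetcher, so a test can drive a full client workflow entirely in memory. A minimal sketch under stated assumptions: signed_roots and fetch() are defined in this file, the constructor publishes the initial root (as the module docstring's example suggests), and the tuf.ngclient.Updater positional arguments (metadata directory, metadata URL, targets URL, fetcher) are assumed from the ngclient API this test suite targets.

import os
import tempfile

from tests.repository_simulator import RepositorySimulator
from tuf.ngclient import Updater

# Construct a repository with all top-level metadata in memory.
sim = RepositorySimulator()

with tempfile.TemporaryDirectory() as client_dir:
    # Bootstrap client trust: signed_roots[0] holds the serialized
    # initial root version (see publish_root above).
    with open(os.path.join(client_dir, "root.json"), "wb") as f:
        f.write(sim.signed_roots[0])

    # The simulator is the FetcherInterface: its fetch() answers
    # /metadata/ and /targets/ URLs straight from in-memory state.
    updater = Updater(
        client_dir,
        "https://example.com/metadata/",
        "https://example.com/targets/",
        sim,
    )
    updater.refresh()

Because no sockets are involved, tests written this way stay fast and deterministic, which is the design rationale for the simulator.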
@@ -219,7 +230,9 @@ def _fetch_target(self, target_path: str, hash: Optional[str]) -> bytes: logger.debug("fetched target %s", target_path) return repo_target.data - def _fetch_metadata(self, role: str, version: Optional[int] = None) -> bytes: + def _fetch_metadata( + self, role: str, version: Optional[int] = None + ) -> bytes: """Return signed metadata for 'role', using 'version' if it is given. If version is None, non-versioned metadata is being requested @@ -228,35 +241,35 @@ def _fetch_metadata(self, role: str, version: Optional[int] = None) -> bytes: # return a version previously serialized in publish_root() if version is None or version > len(self.signed_roots): raise FetcherHTTPError(f"Unknown root version {version}", 404) - logger.debug("fetched root version %d", role, version) + logger.debug("fetched root version %d", version) return self.signed_roots[version - 1] + + # sign and serialize the requested metadata + if role == "timestamp": + md: Metadata = self.md_timestamp + elif role == "snapshot": + md = self.md_snapshot + elif role == "targets": + md = self.md_targets else: - # sign and serialize the requested metadata - if role == "timestamp": - md: Metadata = self.md_timestamp - elif role == "snapshot": - md = self.md_snapshot - elif role == "targets": - md = self.md_targets - else: - md = self.md_delegates[role] - - if md is None: - raise FetcherHTTPError(f"Unknown role {role}", 404) - if version is not None and version != md.signed.version: - raise FetcherHTTPError(f"Unknown {role} version {version}", 404) - - md.signatures.clear() - for signer in self.signers[role].values(): - md.sign(signer, append=True) - - logger.debug( - "fetched %s v%d with %d sigs", - role, - md.signed.version, - len(self.signers[role]), - ) - return md.to_bytes(JSONSerializer()) + md = self.md_delegates[role] + + if md is None: + raise FetcherHTTPError(f"Unknown role {role}", 404) + if version is not None and version != md.signed.version: + raise FetcherHTTPError(f"Unknown {role} version {version}", 404) + + md.signatures.clear() + for signer in self.signers[role].values(): + md.sign(signer, append=True) + + logger.debug( + "fetched %s v%d with %d sigs", + role, + md.signed.version, + len(self.signers[role]), + ) + return md.to_bytes(JSONSerializer()) def _compute_hashes_and_length( self, role: str @@ -264,10 +277,11 @@ def _compute_hashes_and_length( data = self._fetch_metadata(role) digest_object = sslib_hash.digest(sslib_hash.DEFAULT_HASH_ALGORITHM) digest_object.update(data) - hashes = {sslib_hash.DEFAULT_HASH_ALGORITHM: digest_object.hexdigest()} + hashes = {sslib_hash.DEFAULT_HASH_ALGORITHM: digest_object.hexdigest()} return hashes, len(data) def update_timestamp(self): + """Update timestamp and assign snapshot ver to snapshot_meta ver""" self.timestamp.snapshot_meta.version = self.snapshot.version if self.compute_metafile_hashes_length: @@ -278,6 +292,7 @@ def update_timestamp(self): self.timestamp.version += 1 def update_snapshot(self): + """Update snapshot, assign targets versions and update timestamp""" for role, delegate in self.all_targets(): hashes = None length = None @@ -292,6 +307,7 @@ def update_snapshot(self): self.update_timestamp() def add_target(self, role: str, data: bytes, path: str): + """Create a target from data and add it to the target_files""" if role == "targets": targets = self.targets else: @@ -310,6 +326,7 @@ def add_delegation( paths: Optional[List[str]], hash_prefixes: Optional[List[str]], ): + """Add delegated target role to the repository""" if delegator_name == 
"targets": delegator = self.targets else: @@ -341,17 +358,18 @@ def write(self): print(f"Repository Simulator dumps in {self.dump_dir}") self.dump_version += 1 - dir = os.path.join(self.dump_dir, str(self.dump_version)) - os.makedirs(dir) + dest_dir = os.path.join(self.dump_dir, str(self.dump_version)) + os.makedirs(dest_dir) for ver in range(1, len(self.signed_roots) + 1): - with open(os.path.join(dir, f"{ver}.root.json"), "wb") as f: + with open(os.path.join(dest_dir, f"{ver}.root.json"), "wb") as f: f.write(self._fetch_metadata("root", ver)) for role in ["timestamp", "snapshot", "targets"]: - with open(os.path.join(dir, f"{role}.json"), "wb") as f: + with open(os.path.join(dest_dir, f"{role}.json"), "wb") as f: f.write(self._fetch_metadata(role)) + # pylint: disable-next=consider-iterating-dictionary for role in self.md_delegates.keys(): - with open(os.path.join(dir, f"{role}.json"), "wb") as f: + with open(os.path.join(dest_dir, f"{role}.json"), "wb") as f: f.write(self._fetch_metadata(role)) diff --git a/tests/test_api.py b/tests/test_api.py index d3b3eddec8..9ed486037f 100755 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -7,59 +7,44 @@ """ import json -import sys import logging import os import shutil +import sys import tempfile import unittest - from datetime import datetime, timedelta + from dateutil.relativedelta import relativedelta +from securesystemslib import hash as sslib_hash +from securesystemslib.interface import ( + import_ed25519_privatekey_from_file, + import_ed25519_publickey_from_file, +) +from securesystemslib.keys import generate_ed25519_key +from securesystemslib.signer import Signature, SSlibSigner from tests import utils - from tuf import exceptions from tuf.api.metadata import ( + DelegatedRole, + Key, Metadata, + MetaFile, Root, Snapshot, - Timestamp, - Targets, - Key, - MetaFile, TargetFile, - DelegatedRole, -) - -from tuf.api.serialization import ( - DeserializationError -) - -from tuf.api.serialization.json import ( - JSONSerializer, - CanonicalJSONSerializer -) - -from securesystemslib.interface import ( - import_ed25519_publickey_from_file, - import_ed25519_privatekey_from_file -) - -from securesystemslib import hash as sslib_hash -from securesystemslib.signer import ( - SSlibSigner, - Signature -) - -from securesystemslib.keys import ( - generate_ed25519_key + Targets, + Timestamp, ) +from tuf.api.serialization import DeserializationError +from tuf.api.serialization.json import CanonicalJSONSerializer, JSONSerializer logger = logging.getLogger(__name__) class TestMetadata(unittest.TestCase): + """Tests for public API of all classes in tuf/api/metadata.py""" @classmethod def setUpClass(cls): @@ -70,62 +55,61 @@ def setUpClass(cls): cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd()) test_repo_data = os.path.join( - os.path.dirname(os.path.realpath(__file__)), 'repository_data') + os.path.dirname(os.path.realpath(__file__)), "repository_data" + ) - cls.repo_dir = os.path.join(cls.temporary_directory, 'repository') + cls.repo_dir = os.path.join(cls.temporary_directory, "repository") shutil.copytree( - os.path.join(test_repo_data, 'repository'), cls.repo_dir) + os.path.join(test_repo_data, "repository"), cls.repo_dir + ) - cls.keystore_dir = os.path.join(cls.temporary_directory, 'keystore') + cls.keystore_dir = os.path.join(cls.temporary_directory, "keystore") shutil.copytree( - os.path.join(test_repo_data, 'keystore'), cls.keystore_dir) + os.path.join(test_repo_data, "keystore"), cls.keystore_dir + ) # Load keys into memory cls.keystore = {} - 
for role in ['delegation', 'snapshot', 'targets', 'timestamp']: + for role in ["delegation", "snapshot", "targets", "timestamp"]: cls.keystore[role] = import_ed25519_privatekey_from_file( - os.path.join(cls.keystore_dir, role + '_key'), - password="password" + os.path.join(cls.keystore_dir, role + "_key"), + password="password", ) - @classmethod def tearDownClass(cls): # Remove the temporary repository directory, which should contain all # the metadata, targets, and key files generated for the test cases. shutil.rmtree(cls.temporary_directory) - def test_generic_read(self): for metadata, inner_metadata_cls in [ - ('root', Root), - ('snapshot', Snapshot), - ('timestamp', Timestamp), - ('targets', Targets)]: + ("root", Root), + ("snapshot", Snapshot), + ("timestamp", Timestamp), + ("targets", Targets), + ]: # Load JSON-formatted metadata of each supported type from file # and from out-of-band read JSON string - path = os.path.join(self.repo_dir, 'metadata', metadata + '.json') - metadata_obj = Metadata.from_file(path) - with open(path, 'rb') as f: - metadata_obj2 = Metadata.from_bytes(f.read()) + path = os.path.join(self.repo_dir, "metadata", metadata + ".json") + md_obj = Metadata.from_file(path) + with open(path, "rb") as f: + md_obj2 = Metadata.from_bytes(f.read()) # Assert that both methods instantiate the right inner class for # each metadata type and ... - self.assertTrue( - isinstance(metadata_obj.signed, inner_metadata_cls)) - self.assertTrue( - isinstance(metadata_obj2.signed, inner_metadata_cls)) + self.assertTrue(isinstance(md_obj.signed, inner_metadata_cls)) + self.assertTrue(isinstance(md_obj2.signed, inner_metadata_cls)) # ... and return the same object (compared by dict representation) - self.assertDictEqual( - metadata_obj.to_dict(), metadata_obj2.to_dict()) + self.assertDictEqual(md_obj.to_dict(), md_obj2.to_dict()) # Assert that it chokes correctly on an unknown metadata type - bad_metadata_path = 'bad-metadata.json' - bad_metadata = {'signed': {'_type': 'bad-metadata'}} - bad_string = json.dumps(bad_metadata).encode('utf-8') - with open(bad_metadata_path, 'wb') as f: + bad_metadata_path = "bad-metadata.json" + bad_metadata = {"signed": {"_type": "bad-metadata"}} + bad_string = json.dumps(bad_metadata).encode("utf-8") + with open(bad_metadata_path, "wb") as f: f.write(bad_string) with self.assertRaises(DeserializationError): @@ -135,56 +119,46 @@ def test_generic_read(self): os.remove(bad_metadata_path) - def test_compact_json(self): - path = os.path.join(self.repo_dir, 'metadata', 'targets.json') - metadata_obj = Metadata.from_file(path) + path = os.path.join(self.repo_dir, "metadata", "targets.json") + md_obj = Metadata.from_file(path) self.assertTrue( - len(JSONSerializer(compact=True).serialize(metadata_obj)) < - len(JSONSerializer().serialize(metadata_obj))) - + len(JSONSerializer(compact=True).serialize(md_obj)) + < len(JSONSerializer().serialize(md_obj)) + ) def test_read_write_read_compare(self): - for metadata in ['root', 'snapshot', 'timestamp', 'targets']: - path = os.path.join(self.repo_dir, 'metadata', metadata + '.json') - metadata_obj = Metadata.from_file(path) - - path_2 = path + '.tmp' - metadata_obj.to_file(path_2) - metadata_obj_2 = Metadata.from_file(path_2) + for metadata in ["root", "snapshot", "timestamp", "targets"]: + path = os.path.join(self.repo_dir, "metadata", metadata + ".json") + md_obj = Metadata.from_file(path) - self.assertDictEqual( - metadata_obj.to_dict(), - metadata_obj_2.to_dict()) + path_2 = path + ".tmp" + md_obj.to_file(path_2) +
md_obj_2 = Metadata.from_file(path_2) + self.assertDictEqual(md_obj.to_dict(), md_obj_2.to_dict()) os.remove(path_2) - def test_to_from_bytes(self): for metadata in ["root", "snapshot", "timestamp", "targets"]: - path = os.path.join(self.repo_dir, 'metadata', metadata + '.json') - with open(path, 'rb') as f: + path = os.path.join(self.repo_dir, "metadata", metadata + ".json") + with open(path, "rb") as f: metadata_bytes = f.read() - metadata_obj = Metadata.from_bytes(metadata_bytes) + md_obj = Metadata.from_bytes(metadata_bytes) # Compare that from_bytes/to_bytes doesn't change the content # for two cases for the serializer: noncompact and compact. # Case 1: test noncompact by overriding the default serializer. - self.assertEqual( - metadata_obj.to_bytes(JSONSerializer()), metadata_bytes - ) + self.assertEqual(md_obj.to_bytes(JSONSerializer()), metadata_bytes) # Case 2: test compact by using the default serializer. - obj_bytes = metadata_obj.to_bytes() + obj_bytes = md_obj.to_bytes() metadata_obj_2 = Metadata.from_bytes(obj_bytes) - self.assertEqual( - metadata_obj_2.to_bytes(), obj_bytes - ) - + self.assertEqual(metadata_obj_2.to_bytes(), obj_bytes) def test_sign_verify(self): - root_path = os.path.join(self.repo_dir, 'metadata', 'root.json') - root = Metadata[Root].from_file(root_path).signed + path = os.path.join(self.repo_dir, "metadata", "root.json") + root = Metadata[Root].from_file(path).signed # Locate the public keys we need from root targets_keyid = next(iter(root.roles["targets"].keyids)) @@ -195,74 +169,74 @@ timestamp_key = root.keys[timestamp_keyid] # Load sample metadata (targets) and assert ... - path = os.path.join(self.repo_dir, 'metadata', 'targets.json') - metadata_obj = Metadata.from_file(path) + path = os.path.join(self.repo_dir, "metadata", "targets.json") + md_obj = Metadata.from_file(path) # ... it has a single existing signature, - self.assertEqual(len(metadata_obj.signatures), 1) + self.assertEqual(len(md_obj.signatures), 1) # ... which is valid for the correct key. - targets_key.verify_signature(metadata_obj) + targets_key.verify_signature(md_obj) with self.assertRaises(exceptions.UnsignedMetadataError): - snapshot_key.verify_signature(metadata_obj) + snapshot_key.verify_signature(md_obj) # Test verifying with explicitly set serializer - targets_key.verify_signature(metadata_obj, CanonicalJSONSerializer()) + targets_key.verify_signature(md_obj, CanonicalJSONSerializer()) with self.assertRaises(exceptions.UnsignedMetadataError): - targets_key.verify_signature(metadata_obj, JSONSerializer()) + targets_key.verify_signature(md_obj, JSONSerializer()) - sslib_signer = SSlibSigner(self.keystore['snapshot']) + sslib_signer = SSlibSigner(self.keystore["snapshot"]) # Append a new signature with the unrelated key and assert that ... - sig = metadata_obj.sign(sslib_signer, append=True) + sig = md_obj.sign(sslib_signer, append=True) # ... there are now two signatures, and - self.assertEqual(len(metadata_obj.signatures), 2) + self.assertEqual(len(md_obj.signatures), 2) # ... both are valid for the corresponding keys. - targets_key.verify_signature(metadata_obj) - snapshot_key.verify_signature(metadata_obj) + targets_key.verify_signature(md_obj) + snapshot_key.verify_signature(md_obj) # ...
the returned (appended) signature is for snapshot key self.assertEqual(sig.keyid, snapshot_keyid) - sslib_signer = SSlibSigner(self.keystore['timestamp']) + sslib_signer = SSlibSigner(self.keystore["timestamp"]) # Create and assign (don't append) a new signature and assert that ... - metadata_obj.sign(sslib_signer, append=False) + md_obj.sign(sslib_signer, append=False) # ... there now is only one signature, - self.assertEqual(len(metadata_obj.signatures), 1) + self.assertEqual(len(md_obj.signatures), 1) # ... valid for that key. - timestamp_key.verify_signature(metadata_obj) + timestamp_key.verify_signature(md_obj) with self.assertRaises(exceptions.UnsignedMetadataError): - targets_key.verify_signature(metadata_obj) + targets_key.verify_signature(md_obj) - # Test failure on unknown scheme (securesystemslib UnsupportedAlgorithmError) + # Test failure on unknown scheme (securesystemslib + # UnsupportedAlgorithmError) scheme = timestamp_key.scheme timestamp_key.scheme = "foo" with self.assertRaises(exceptions.UnsignedMetadataError): - timestamp_key.verify_signature(metadata_obj) + timestamp_key.verify_signature(md_obj) timestamp_key.scheme = scheme # Test failure on broken public key data (securesystemslib CryptoError) public = timestamp_key.keyval["public"] timestamp_key.keyval["public"] = "ffff" with self.assertRaises(exceptions.UnsignedMetadataError): - timestamp_key.verify_signature(metadata_obj) + timestamp_key.verify_signature(md_obj) timestamp_key.keyval["public"] = public # Test failure with invalid signature (securesystemslib FormatError) - sig = metadata_obj.signatures[timestamp_keyid] + sig = md_obj.signatures[timestamp_keyid] correct_sig = sig.signature sig.signature = "foo" with self.assertRaises(exceptions.UnsignedMetadataError): - timestamp_key.verify_signature(metadata_obj) + timestamp_key.verify_signature(md_obj) # Test failure with valid but incorrect signature - sig.signature = "ff"*64 + sig.signature = "ff" * 64 with self.assertRaises(exceptions.UnsignedMetadataError): - timestamp_key.verify_signature(metadata_obj) + timestamp_key.verify_signature(md_obj) sig.signature = correct_sig def test_metadata_base(self): - # Use of Snapshot is arbitrary, we're just testing the base class features - # with real data - snapshot_path = os.path.join( - self.repo_dir, 'metadata', 'snapshot.json') + # Use of Snapshot is arbitrary, we're just testing the base class + # features with real data + snapshot_path = os.path.join(self.repo_dir, "metadata", "snapshot.json") md = Metadata.from_file(snapshot_path) self.assertEqual(md.signed.version, 1) @@ -295,33 +269,35 @@ def test_metadata_base(self): # Test deserializing metadata with non-unique signatures: data = md.to_dict() - data["signatures"].append({"keyid": data["signatures"][0]["keyid"], "sig": "foo"}) + data["signatures"].append( + {"keyid": data["signatures"][0]["keyid"], "sig": "foo"} + ) with self.assertRaises(ValueError): Metadata.from_dict(data) - def test_metadata_snapshot(self): - snapshot_path = os.path.join( - self.repo_dir, 'metadata', 'snapshot.json') + snapshot_path = os.path.join(self.repo_dir, "metadata", "snapshot.json") snapshot = Metadata[Snapshot].from_file(snapshot_path) # Create a MetaFile instance representing what we expect # the updated data to be. 
- hashes = {'sha256': 'c2986576f5fdfd43944e2b19e775453b96748ec4fe2638a6d2f32f1310967095'} + hashes = { + "sha256": "c2986576f5fdfd43944e2b19e775453b96748ec4fe2638a6d2f32f1310967095" # pylint: disable=line-too-long + } fileinfo = MetaFile(2, 123, hashes) self.assertNotEqual( - snapshot.signed.meta['role1.json'].to_dict(), fileinfo.to_dict() + snapshot.signed.meta["role1.json"].to_dict(), fileinfo.to_dict() ) - snapshot.signed.update('role1', fileinfo) + snapshot.signed.update("role1", fileinfo) self.assertEqual( - snapshot.signed.meta['role1.json'].to_dict(), fileinfo.to_dict() + snapshot.signed.meta["role1.json"].to_dict(), fileinfo.to_dict() ) - def test_metadata_timestamp(self): timestamp_path = os.path.join( - self.repo_dir, 'metadata', 'timestamp.json') + self.repo_dir, "metadata", "timestamp.json" + ) timestamp = Metadata[Timestamp].from_file(timestamp_path) self.assertEqual(timestamp.signed.version, 1) @@ -345,7 +321,9 @@ def test_metadata_timestamp(self): # Create a MetaFile instance representing what we expect # the updated data to be. - hashes = {'sha256': '0ae9664468150a9aa1e7f11feecb32341658eb84292851367fea2da88e8a58dc'} + hashes = { + "sha256": "0ae9664468150a9aa1e7f11feecb32341658eb84292851367fea2da88e8a58dc" # pylint: disable=line-too-long + } fileinfo = MetaFile(2, 520, hashes) self.assertNotEqual( @@ -356,102 +334,99 @@ def test_metadata_timestamp(self): timestamp.signed.snapshot_meta.to_dict(), fileinfo.to_dict() ) - def test_metadata_verify_delegate(self): - root_path = os.path.join(self.repo_dir, 'metadata', 'root.json') + root_path = os.path.join(self.repo_dir, "metadata", "root.json") root = Metadata[Root].from_file(root_path) - snapshot_path = os.path.join( - self.repo_dir, 'metadata', 'snapshot.json') + snapshot_path = os.path.join(self.repo_dir, "metadata", "snapshot.json") snapshot = Metadata[Snapshot].from_file(snapshot_path) - targets_path = os.path.join( - self.repo_dir, 'metadata', 'targets.json') + targets_path = os.path.join(self.repo_dir, "metadata", "targets.json") targets = Metadata[Targets].from_file(targets_path) - role1_path = os.path.join( - self.repo_dir, 'metadata', 'role1.json') + role1_path = os.path.join(self.repo_dir, "metadata", "role1.json") role1 = Metadata[Targets].from_file(role1_path) - role2_path = os.path.join( - self.repo_dir, 'metadata', 'role2.json') + role2_path = os.path.join(self.repo_dir, "metadata", "role2.json") role2 = Metadata[Targets].from_file(role2_path) # test the expected delegation tree - root.verify_delegate('root', root) - root.verify_delegate('snapshot', snapshot) - root.verify_delegate('targets', targets) - targets.verify_delegate('role1', role1) - role1.verify_delegate('role2', role2) + root.verify_delegate("root", root) + root.verify_delegate("snapshot", snapshot) + root.verify_delegate("targets", targets) + targets.verify_delegate("role1", role1) + role1.verify_delegate("role2", role2) # only root and targets can verify delegates with self.assertRaises(TypeError): - snapshot.verify_delegate('snapshot', snapshot) + snapshot.verify_delegate("snapshot", snapshot) # verify fails for roles that are not delegated by delegator with self.assertRaises(ValueError): - root.verify_delegate('role1', role1) + root.verify_delegate("role1", role1) with self.assertRaises(ValueError): - targets.verify_delegate('targets', targets) + targets.verify_delegate("targets", targets) # verify fails when delegator has no delegations with self.assertRaises(ValueError): - role2.verify_delegate('role1', role1) + role2.verify_delegate("role1", 
role1) # verify fails when delegate content is modified expires = snapshot.signed.expires snapshot.signed.bump_expiration() with self.assertRaises(exceptions.UnsignedMetadataError): - root.verify_delegate('snapshot', snapshot) + root.verify_delegate("snapshot", snapshot) snapshot.signed.expires = expires # verify fails if roles keys do not sign the metadata with self.assertRaises(exceptions.UnsignedMetadataError): - root.verify_delegate('timestamp', snapshot) + root.verify_delegate("timestamp", snapshot) # Add a key to snapshot role, make sure the new sig fails to verify ts_keyid = next(iter(root.signed.roles["timestamp"].keyids)) root.signed.add_key("snapshot", root.signed.keys[ts_keyid]) - snapshot.signatures[ts_keyid] = Signature(ts_keyid, "ff"*64) + snapshot.signatures[ts_keyid] = Signature(ts_keyid, "ff" * 64) # verify succeeds if threshold is reached even if some signatures # fail to verify - root.verify_delegate('snapshot', snapshot) + root.verify_delegate("snapshot", snapshot) # verify fails if threshold of signatures is not reached - root.signed.roles['snapshot'].threshold = 2 + root.signed.roles["snapshot"].threshold = 2 with self.assertRaises(exceptions.UnsignedMetadataError): - root.verify_delegate('snapshot', snapshot) + root.verify_delegate("snapshot", snapshot) # verify succeeds when we correct the new signature and reach the # threshold of 2 keys - snapshot.sign(SSlibSigner(self.keystore['timestamp']), append=True) - root.verify_delegate('snapshot', snapshot) - + snapshot.sign(SSlibSigner(self.keystore["timestamp"]), append=True) + root.verify_delegate("snapshot", snapshot) def test_key_class(self): # Test if from_securesystemslib_key removes the private key from keyval # of a securesystemslib key dictionary. sslib_key = generate_ed25519_key() key = Key.from_securesystemslib_key(sslib_key) - self.assertFalse('private' in key.keyval.keys()) - + self.assertFalse("private" in key.keyval.keys()) def test_root_add_key_and_remove_key(self): - root_path = os.path.join( - self.repo_dir, 'metadata', 'root.json') + root_path = os.path.join(self.repo_dir, "metadata", "root.json") root = Metadata[Root].from_file(root_path) # Create a new key - root_key2 = import_ed25519_publickey_from_file( - os.path.join(self.keystore_dir, 'root_key2.pub')) - keyid = root_key2['keyid'] - key_metadata = Key(keyid, root_key2['keytype'], root_key2['scheme'], - root_key2['keyval']) + root_key2 = import_ed25519_publickey_from_file( + os.path.join(self.keystore_dir, "root_key2.pub") + ) + keyid = root_key2["keyid"] + key_metadata = Key( + keyid, + root_key2["keytype"], + root_key2["scheme"], + root_key2["keyval"], + ) # Assert that root does not contain the new key - self.assertNotIn(keyid, root.signed.roles['root'].keyids) + self.assertNotIn(keyid, root.signed.roles["root"].keyids) self.assertNotIn(keyid, root.signed.keys) # Add new root key - root.signed.add_key('root', key_metadata) + root.signed.add_key("root", key_metadata) # Assert that key is added - self.assertIn(keyid, root.signed.roles['root'].keyids) + self.assertIn(keyid, root.signed.roles["root"].keyids) self.assertIn(keyid, root.signed.keys) # Confirm that the newly added key does not break @@ -459,31 +434,31 @@ def test_root_add_key_and_remove_key(self): root.to_dict() # Try adding the same key again and assert it's ignored.
- pre_add_keyid = root.signed.roles['root'].keyids.copy() - root.signed.add_key('root', key_metadata) - self.assertEqual(pre_add_keyid, root.signed.roles['root'].keyids) + pre_add_keyid = root.signed.roles["root"].keyids.copy() + root.signed.add_key("root", key_metadata) + self.assertEqual(pre_add_keyid, root.signed.roles["root"].keyids) # Add the same key to targets role as well - root.signed.add_key('targets', key_metadata) + root.signed.add_key("targets", key_metadata) # Add the same key to a nonexistent role. with self.assertRaises(ValueError): root.signed.add_key("nosuchrole", key_metadata) # Remove the key from root role (targets role still uses it) - root.signed.remove_key('root', keyid) - self.assertNotIn(keyid, root.signed.roles['root'].keyids) + root.signed.remove_key("root", keyid) + self.assertNotIn(keyid, root.signed.roles["root"].keyids) self.assertIn(keyid, root.signed.keys) # Remove the key from targets as well - root.signed.remove_key('targets', keyid) - self.assertNotIn(keyid, root.signed.roles['targets'].keyids) + root.signed.remove_key("targets", keyid) + self.assertNotIn(keyid, root.signed.roles["targets"].keyids) self.assertNotIn(keyid, root.signed.keys) with self.assertRaises(ValueError): - root.signed.remove_key('root', 'nosuchkey') + root.signed.remove_key("root", "nosuchkey") with self.assertRaises(ValueError): - root.signed.remove_key('nosuchrole', keyid) + root.signed.remove_key("nosuchrole", keyid) def test_is_target_in_pathpattern(self): supported_use_cases = [ @@ -500,33 +475,33 @@ def test_is_target_in_pathpattern(self): ] for targetpath, pathpattern in supported_use_cases: self.assertTrue( + # pylint: disable-next=protected-access DelegatedRole._is_target_in_pathpattern(targetpath, pathpattern) ) invalid_use_cases = [ ("targets/foo.tgz", "*.tgz"), - ("/foo.tgz", "*.tgz",), + ("/foo.tgz", "*.tgz"), ("targets/foo.tgz", "*"), ("foo-version-alpha.tgz", "foo-version-?.tgz"), ("foo//bar", "*/bar"), - ("foo/bar", "f?/bar") + ("foo/bar", "f?/bar"), ] for targetpath, pathpattern in invalid_use_cases: self.assertFalse( + # pylint: disable-next=protected-access DelegatedRole._is_target_in_pathpattern(targetpath, pathpattern) ) - def test_metadata_targets(self): - targets_path = os.path.join( - self.repo_dir, 'metadata', 'targets.json') + targets_path = os.path.join(self.repo_dir, "metadata", "targets.json") targets = Metadata[Targets].from_file(targets_path) - # Create a fileinfo dict representing what we expect the updated data to be - filename = 'file2.txt' + # Create a fileinfo dict representing the expected updated data. 
+ filename = "file2.txt" hashes = { - "sha256": "141f740f53781d1ca54b8a50af22cbf74e44c21a998fa2a8a05aaac2c002886b", - "sha512": "ef5beafa16041bcdd2937140afebd485296cd54f7348ecd5a4d035c09759608de467a7ac0eb58753d0242df873c305e8bffad2454aa48f44480f15efae1cacd0" + "sha256": "141f740f53781d1ca54b8a50af22cbf74e44c21a998fa2a8a05aaac2c002886b", # pylint: disable=line-too-long + "sha512": "ef5beafa16041bcdd2937140afebd485296cd54f7348ecd5a4d035c09759608de467a7ac0eb58753d0242df873c305e8bffad2454aa48f44480f15efae1cacd0", # pylint: disable=line-too-long } fileinfo = TargetFile(length=28, hashes=hashes, path=filename) @@ -543,26 +518,27 @@ def test_metadata_targets(self): ) def test_targets_key_api(self): - targets_path = os.path.join( - self.repo_dir, 'metadata', 'targets.json') + targets_path = os.path.join(self.repo_dir, "metadata", "targets.json") targets: Targets = Metadata[Targets].from_file(targets_path).signed # Add a new delegated role "role2" in targets - delegated_role = DelegatedRole.from_dict({ + delegated_role = DelegatedRole.from_dict( + { "keyids": [], "name": "role2", "paths": ["fn3", "fn4"], "terminating": False, - "threshold": 1 - }) + "threshold": 1, + } + ) targets.delegations.roles["role2"] = delegated_role key_dict = { "keytype": "ed25519", "keyval": { - "public": "edcd0a32a07dce33f7c7873aaffbff36d20ea30787574ead335eefd337e4dacd" + "public": "edcd0a32a07dce33f7c7873aaffbff36d20ea30787574ead335eefd337e4dacd" # pylint: disable=line-too-long }, - "scheme": "ed25519" + "scheme": "ed25519", } key = Key.from_dict("id2", key_dict) @@ -618,19 +594,18 @@ def test_targets_key_api(self): targets.remove_key("role1", key.keyid) self.assertTrue(targets.delegations is None) - - def test_length_and_hash_validation(self): + def test_length_and_hash_validation(self): # Test metadata files' hash and length verification. # Use timestamp to get a MetaFile object and snapshot # for untrusted metadata file to verify. 
timestamp_path = os.path.join( - self.repo_dir, 'metadata', 'timestamp.json') + self.repo_dir, "metadata", "timestamp.json" + ) timestamp = Metadata[Timestamp].from_file(timestamp_path) snapshot_metafile = timestamp.signed.snapshot_meta - snapshot_path = os.path.join( - self.repo_dir, 'metadata', 'snapshot.json') + snapshot_path = os.path.join(self.repo_dir, "metadata", "snapshot.json") with open(snapshot_path, "rb") as file: # test with data as a file object @@ -643,36 +618,49 @@ def test_length_and_hash_validation(self): # test exceptions expected_length = snapshot_metafile.length snapshot_metafile.length = 2345 - self.assertRaises(exceptions.LengthOrHashMismatchError, - snapshot_metafile.verify_length_and_hashes, data) + self.assertRaises( + exceptions.LengthOrHashMismatchError, + snapshot_metafile.verify_length_and_hashes, + data, + ) snapshot_metafile.length = expected_length - snapshot_metafile.hashes = {'sha256': 'incorrecthash'} - self.assertRaises(exceptions.LengthOrHashMismatchError, - snapshot_metafile.verify_length_and_hashes, data) + snapshot_metafile.hashes = {"sha256": "incorrecthash"} + self.assertRaises( + exceptions.LengthOrHashMismatchError, + snapshot_metafile.verify_length_and_hashes, + data, + ) - snapshot_metafile.hashes = {'unsupported-alg': "8f88e2ba48b412c3843e9bb26e1b6f8fc9e98aceb0fbaa97ba37b4c98717d7ab"} - self.assertRaises(exceptions.LengthOrHashMismatchError, - snapshot_metafile.verify_length_and_hashes, data) + snapshot_metafile.hashes = { + "unsupported-alg": "8f88e2ba48b412c3843e9bb26e1b6f8fc9e98aceb0fbaa97ba37b4c98717d7ab" # pylint: disable=line-too-long + } + self.assertRaises( + exceptions.LengthOrHashMismatchError, + snapshot_metafile.verify_length_and_hashes, + data, + ) # Test wrong algorithm format (sslib.FormatError) - snapshot_metafile.hashes = { 256: "8f88e2ba48b412c3843e9bb26e1b6f8fc9e98aceb0fbaa97ba37b4c98717d7ab"} - self.assertRaises(exceptions.LengthOrHashMismatchError, - snapshot_metafile.verify_length_and_hashes, data) + snapshot_metafile.hashes = { + 256: "8f88e2ba48b412c3843e9bb26e1b6f8fc9e98aceb0fbaa97ba37b4c98717d7ab" # pylint: disable=line-too-long + } + self.assertRaises( + exceptions.LengthOrHashMismatchError, + snapshot_metafile.verify_length_and_hashes, + data, + ) # test optional length and hashes snapshot_metafile.length = None snapshot_metafile.hashes = None snapshot_metafile.verify_length_and_hashes(data) - # Test target files' hash and length verification - targets_path = os.path.join( - self.repo_dir, 'metadata', 'targets.json') + targets_path = os.path.join(self.repo_dir, "metadata", "targets.json") targets = Metadata[Targets].from_file(targets_path) - file1_targetfile = targets.signed.targets['file1.txt'] - filepath = os.path.join( - self.repo_dir, 'targets', 'file1.txt') + file1_targetfile = targets.signed.targets["file1.txt"] + filepath = os.path.join(self.repo_dir, "targets", "file1.txt") with open(filepath, "rb") as file1: file1_targetfile.verify_length_and_hashes(file1) @@ -680,50 +668,58 @@ def test_length_and_hash_validation(self): # test exceptions expected_length = file1_targetfile.length file1_targetfile.length = 2345 - self.assertRaises(exceptions.LengthOrHashMismatchError, - file1_targetfile.verify_length_and_hashes, file1) + self.assertRaises( + exceptions.LengthOrHashMismatchError, + file1_targetfile.verify_length_and_hashes, + file1, + ) file1_targetfile.length = expected_length - file1_targetfile.hashes = {'sha256': 'incorrecthash'} - self.assertRaises(exceptions.LengthOrHashMismatchError, - 
file1_targetfile.verify_length_and_hashes, file1) + file1_targetfile.hashes = {"sha256": "incorrecthash"} + self.assertRaises( + exceptions.LengthOrHashMismatchError, + file1_targetfile.verify_length_and_hashes, + file1, + ) def test_targetfile_from_file(self): # Test with an existing file and valid hash algorithm - file_path = os.path.join(self.repo_dir, 'targets', 'file1.txt') + file_path = os.path.join(self.repo_dir, "targets", "file1.txt") targetfile_from_file = TargetFile.from_file( - file_path, file_path, ['sha256'] + file_path, file_path, ["sha256"] ) with open(file_path, "rb") as file: targetfile_from_file.verify_length_and_hashes(file) # Test with a non-existing file - file_path = os.path.join(self.repo_dir, 'targets', 'file123.txt') + file_path = os.path.join(self.repo_dir, "targets", "file123.txt") self.assertRaises( - FileNotFoundError, - TargetFile.from_file, - file_path, + FileNotFoundError, + TargetFile.from_file, + file_path, file_path, - [sslib_hash.DEFAULT_HASH_ALGORITHM] + [sslib_hash.DEFAULT_HASH_ALGORITHM], ) # Test with an unsupported algorithm - file_path = os.path.join(self.repo_dir, 'targets', 'file1.txt') + file_path = os.path.join(self.repo_dir, "targets", "file1.txt") self.assertRaises( exceptions.UnsupportedAlgorithmError, - TargetFile.from_file, - file_path, + TargetFile.from_file, file_path, - ['123'] + file_path, + ["123"], ) def test_targetfile_from_data(self): data = b"Inline test content" - target_file_path = os.path.join(self.repo_dir, 'targets', 'file1.txt') - + target_file_path = os.path.join(self.repo_dir, "targets", "file1.txt") + # Test with a valid hash algorithm - targetfile_from_data = TargetFile.from_data(target_file_path, data, ['sha256']) + targetfile_from_data = TargetFile.from_data( + target_file_path, data, ["sha256"] + ) targetfile_from_data.verify_length_and_hashes(data) # Test with no algorithms specified @@ -753,7 +749,8 @@ def test_is_delegated_role(self): self.assertFalse(role.is_delegated_path("a/non-matching path")) self.assertTrue(role.is_delegated_path("a/path")) + # Run unit test. -if __name__ == '__main__': +if __name__ == "__main__": utils.configure_test_logging(sys.argv) unittest.main() diff --git a/tests/test_fetcher.py b/tests/test_fetcher.py index bf94f252d8..d03fab375e 100644 --- a/tests/test_fetcher.py +++ b/tests/test_fetcher.py @@ -6,126 +6,131 @@ """Unit test for RequestsFetcher. """ +import io import logging +import math import os -import io import sys -import unittest import tempfile -import math +import unittest import tuf import tuf.exceptions import tuf.requests_fetcher -import tuf.unittest_toolbox as unittest_toolbox - from tests import utils +from tuf import unittest_toolbox logger = logging.getLogger(__name__) class TestFetcher(unittest_toolbox.Modified_TestCase): - def setUp(self): - """ - Create a temporary file and launch a simple server in the - current working directory. - """ - - unittest_toolbox.Modified_TestCase.setUp(self) - - # Making a temporary file. - current_dir = os.getcwd() - target_filepath = self.make_temp_data_file(directory=current_dir) - self.target_fileobj = open(target_filepath, 'r') - self.file_contents = self.target_fileobj.read() - self.file_length = len(self.file_contents) - - # Launch a SimpleHTTPServer (serves files in the current dir). 
- self.server_process_handler = utils.TestServerProcess(log=logger) - - rel_target_filepath = os.path.basename(target_filepath) - self.url = 'http://' + utils.TEST_HOST_ADDRESS + ':' \ - + str(self.server_process_handler.port) + '/' + rel_target_filepath - - # Create a temporary file where the target file chunks are written - # during fetching - self.temp_file = tempfile.TemporaryFile() - self.fetcher = tuf.requests_fetcher.RequestsFetcher() - - - # Stop server process and perform clean up. - def tearDown(self): - # Cleans the resources and flush the logged lines (if any). - self.server_process_handler.clean() - - self.target_fileobj.close() - self.temp_file.close() - - # Remove temporary directory - unittest_toolbox.Modified_TestCase.tearDown(self) - - - # Test: Normal case. - def test_fetch(self): - for chunk in self.fetcher.fetch(self.url, self.file_length): - self.temp_file.write(chunk) - - self.temp_file.seek(0) - temp_file_data = self.temp_file.read().decode('utf-8') - self.assertEqual(self.file_contents, temp_file_data) - - # Test if fetcher downloads file up to a required length - def test_fetch_restricted_length(self): - for chunk in self.fetcher.fetch(self.url, self.file_length-4): - self.temp_file.write(chunk) - - self.temp_file.seek(0, io.SEEK_END) - self.assertEqual(self.temp_file.tell(), self.file_length-4) - - - # Test that fetcher does not download more than actual file length - def test_fetch_upper_length(self): - for chunk in self.fetcher.fetch(self.url, self.file_length+4): - self.temp_file.write(chunk) - - self.temp_file.seek(0, io.SEEK_END) - self.assertEqual(self.temp_file.tell(), self.file_length) - - - # Test incorrect URL parsing - def test_url_parsing(self): - with self.assertRaises(tuf.exceptions.URLParsingError): - self.fetcher.fetch(self.random_string(), self.file_length) - - - # Test: Normal case with url data downloaded in more than one chunk - def test_fetch_in_chunks(self): - # Set smaller chunk size to ensure that the file will be downloaded - # in more than one chunk - default_chunk_size = tuf.settings.CHUNK_SIZE - tuf.settings.CHUNK_SIZE = 4 - - # expected_chunks_count: 3 - expected_chunks_count = math.ceil(self.file_length/tuf.settings.CHUNK_SIZE) - self.assertEqual(expected_chunks_count, 3) - - chunks_count = 0 - for chunk in self.fetcher.fetch(self.url, self.file_length): - self.temp_file.write(chunk) - chunks_count+=1 - - self.temp_file.seek(0) - temp_file_data = self.temp_file.read().decode('utf-8') - self.assertEqual(self.file_contents, temp_file_data) - # Check that we calculate chunks as expected - self.assertEqual(chunks_count, expected_chunks_count) - - # Restore default settings - tuf.settings.CHUNK_SIZE = default_chunk_size - + """Unit tests for RequestsFetcher""" + + def setUp(self): + """ + Create a temporary file and launch a simple server in the + current working directory. + """ + + unittest_toolbox.Modified_TestCase.setUp(self) + + # Making a temporary file. + current_dir = os.getcwd() + target_filepath = self.make_temp_data_file(directory=current_dir) + # pylint: disable-next=consider-using-with + self.target_fileobj = open(target_filepath, "r", encoding="utf8") + self.file_contents = self.target_fileobj.read() + self.file_length = len(self.file_contents) + + # Launch a SimpleHTTPServer (serves files in the current dir).
+ self.server_process_handler = utils.TestServerProcess(log=logger) + + rel_target_filepath = os.path.basename(target_filepath) + self.url = ( + "http://" + + utils.TEST_HOST_ADDRESS + + ":" + + str(self.server_process_handler.port) + + "/" + + rel_target_filepath + ) + + # Create a temporary file where the target file chunks are written + # during fetching + # pylint: disable-next=consider-using-with + self.temp_file = tempfile.TemporaryFile() + self.fetcher = tuf.requests_fetcher.RequestsFetcher() + + # Stop server process and perform clean up. + def tearDown(self): + # Clean the resources and flush the logged lines (if any). + self.server_process_handler.clean() + + self.target_fileobj.close() + self.temp_file.close() + + # Remove temporary directory + unittest_toolbox.Modified_TestCase.tearDown(self) + + # Test: Normal case. + def test_fetch(self): + for chunk in self.fetcher.fetch(self.url, self.file_length): + self.temp_file.write(chunk) + + self.temp_file.seek(0) + temp_file_data = self.temp_file.read().decode("utf-8") + self.assertEqual(self.file_contents, temp_file_data) + + # Test if fetcher downloads file up to a required length + def test_fetch_restricted_length(self): + for chunk in self.fetcher.fetch(self.url, self.file_length - 4): + self.temp_file.write(chunk) + + self.temp_file.seek(0, io.SEEK_END) + self.assertEqual(self.temp_file.tell(), self.file_length - 4) + + # Test that fetcher does not download more than actual file length + def test_fetch_upper_length(self): + for chunk in self.fetcher.fetch(self.url, self.file_length + 4): + self.temp_file.write(chunk) + + self.temp_file.seek(0, io.SEEK_END) + self.assertEqual(self.temp_file.tell(), self.file_length) + + # Test incorrect URL parsing + def test_url_parsing(self): + with self.assertRaises(tuf.exceptions.URLParsingError): + self.fetcher.fetch(self.random_string(), self.file_length) + + # Test: Normal case with url data downloaded in more than one chunk + def test_fetch_in_chunks(self): + # Set smaller chunk size to ensure that the file will be downloaded + # in more than one chunk + default_chunk_size = tuf.settings.CHUNK_SIZE + tuf.settings.CHUNK_SIZE = 4 + + # expected_chunks_count: 3 + expected_chunks_count = math.ceil( + self.file_length / tuf.settings.CHUNK_SIZE + ) + self.assertEqual(expected_chunks_count, 3) + + chunks_count = 0 + for chunk in self.fetcher.fetch(self.url, self.file_length): + self.temp_file.write(chunk) + chunks_count += 1 + + self.temp_file.seek(0) + temp_file_data = self.temp_file.read().decode("utf-8") + self.assertEqual(self.file_contents, temp_file_data) + # Check that we calculate chunks as expected + self.assertEqual(chunks_count, expected_chunks_count) + + # Restore default settings + tuf.settings.CHUNK_SIZE = default_chunk_size # Run unit test.
-if __name__ == '__main__': - utils.configure_test_logging(sys.argv) - unittest.main() +if __name__ == "__main__": + utils.configure_test_logging(sys.argv) + unittest.main() diff --git a/tests/test_fetcher_ng.py b/tests/test_fetcher_ng.py index d384ee3132..4d10c30202 100644 --- a/tests/test_fetcher_ng.py +++ b/tests/test_fetcher_ng.py @@ -8,23 +8,25 @@ import io import logging +import math import os import sys -import unittest import tempfile -import math -import urllib3.exceptions +import unittest +from unittest.mock import Mock, patch + import requests +import urllib3.exceptions from tests import utils from tuf import exceptions, unittest_toolbox from tuf.ngclient._internal.requests_fetcher import RequestsFetcher -from unittest.mock import Mock, patch logger = logging.getLogger(__name__) class TestFetcher(unittest_toolbox.Modified_TestCase): + """Test RequestsFetcher class""" @classmethod def setUpClass(cls): @@ -48,11 +50,16 @@ def setUp(self): current_dir = os.getcwd() target_filepath = self.make_temp_data_file(directory=current_dir) - self.target_fileobj = open(target_filepath, "r") + # pylint: disable-next=consider-using-with + self.target_fileobj = open(target_filepath, "r", encoding="utf8") self.file_contents = self.target_fileobj.read() self.file_length = len(self.file_contents) self.rel_target_filepath = os.path.basename(target_filepath) - self.url = f"http://{utils.TEST_HOST_ADDRESS}:{str(self.server_process_handler.port)}/{self.rel_target_filepath}" + self.url_prefix = ( + f"http://{utils.TEST_HOST_ADDRESS}:" + f"{str(self.server_process_handler.port)}" + ) + self.url = f"{self.url_prefix}/{self.rel_target_filepath}" # Instantiate a concrete instance of FetcherInterface self.fetcher = RequestsFetcher() @@ -106,15 +113,19 @@ def test_url_parsing(self): # File not found error def test_http_error(self): with self.assertRaises(exceptions.FetcherHTTPError) as cm: - self.url = f"http://{utils.TEST_HOST_ADDRESS}:{str(self.server_process_handler.port)}/non-existing-path" + self.url = f"{self.url_prefix}/non-existing-path" self.fetcher.fetch(self.url) self.assertEqual(cm.exception.status_code, 404) # Response read timeout error - @patch.object(requests.Session, 'get') + @patch.object(requests.Session, "get") def test_response_read_timeout(self, mock_session_get): mock_response = Mock() - attr = {'raw.read.side_effect': urllib3.exceptions.ReadTimeoutError(None, None, "Read timed out.")} + attr = { + "raw.read.side_effect": urllib3.exceptions.ReadTimeoutError( + None, None, "Read timed out." 
+ ) + } mock_response.configure_mock(**attr) mock_session_get.return_value = mock_response @@ -123,7 +134,9 @@ def test_response_read_timeout(self, mock_session_get): mock_response.raw.read.assert_called_once() # Read/connect session timeout error - @patch.object(requests.Session, 'get', side_effect=urllib3.exceptions.TimeoutError) + @patch.object( + requests.Session, "get", side_effect=urllib3.exceptions.TimeoutError + ) def test_session_get_timeout(self, mock_session_get): with self.assertRaises(exceptions.SlowRetrievalError): self.fetcher.fetch(self.url) diff --git a/tests/test_metadata_serialization.py b/tests/test_metadata_serialization.py index 1ae70e1fcc..a14ec91684 100644 --- a/tests/test_metadata_serialization.py +++ b/tests/test_metadata_serialization.py @@ -6,33 +6,36 @@ """ +import copy import json -import sys import logging +import sys import unittest -import copy - from typing import Dict from tests import utils - from tuf.api.metadata import ( - Root, - Snapshot, - Timestamp, - Targets, + DelegatedRole, + Delegations, Key, - Role, MetaFile, + Role, + Root, + Snapshot, TargetFile, - Delegations, - DelegatedRole, + Targets, + Timestamp, ) +# This file is a special case where having longer lines helps the +# readability of the test cases data. +# pylint: disable=line-too-long + logger = logging.getLogger(__name__) class TestSerialization(unittest.TestCase): + """Test serialization for all classes in tuf/api/metadata.py""" # Snapshot instances with meta = {} are valid, but for a full valid # repository it's required that meta has at least one element inside it. @@ -41,28 +44,17 @@ class TestSerialization(unittest.TestCase): "no spec_version": '{"_type": "signed", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', "no version": '{"_type": "signed", "spec_version": "1.0.0", "expires": "2030-01-01T00:00:00Z", "meta": {}}', "no expires": '{"_type": "signed", "spec_version": "1.0.0", "version": 1, "meta": {}}', - "empty str _type": - '{"_type": "", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', - "empty str spec_version": - '{"_type": "signed", "spec_version": "", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', - "_type wrong type": - '{"_type": "foo", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', - "version wrong type": - '{"_type": "signed", "spec_version": "1.0.0", "version": "a", "expires": "2030-01-01T00:00:00Z", "meta": {}}', - "invalid spec_version str": - '{"_type": "signed", "spec_version": "abc", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', - "two digit spec_version": - '{"_type": "signed", "spec_version": "1.2.a", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', - "no digit spec_version": - '{"_type": "signed", "spec_version": "a.b.c", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', - "different major spec_version": - '{"_type": "signed", "spec_version": "0.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', - "version 0": - '{"_type": "signed", "spec_version": "1.0.0", "version": 0, "expires": "2030-01-01T00:00:00Z", "meta": {}}', - "version below 0": - '{"_type": "signed", "spec_version": "1.0.0", "version": -1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', - "wrong datetime string": - '{"_type": "signed", "spec_version": "1.0.0", "version": 1, "expires": "abc", "meta": {}}', + "empty str _type": '{"_type": "", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', 
+ "empty str spec_version": '{"_type": "signed", "spec_version": "", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', + "_type wrong type": '{"_type": "foo", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', + "version wrong type": '{"_type": "signed", "spec_version": "1.0.0", "version": "a", "expires": "2030-01-01T00:00:00Z", "meta": {}}', + "invalid spec_version str": '{"_type": "signed", "spec_version": "abc", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', + "two digit spec_version": '{"_type": "signed", "spec_version": "1.2.a", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', + "no digit spec_version": '{"_type": "signed", "spec_version": "a.b.c", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', + "different major spec_version": '{"_type": "signed", "spec_version": "0.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', + "version 0": '{"_type": "signed", "spec_version": "1.0.0", "version": 0, "expires": "2030-01-01T00:00:00Z", "meta": {}}', + "version below 0": '{"_type": "signed", "spec_version": "1.0.0", "version": -1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', + "wrong datetime string": '{"_type": "signed", "spec_version": "1.0.0", "version": 1, "expires": "abc", "meta": {}}', } @utils.run_sub_tests_with_dataset(invalid_signed) @@ -71,7 +63,6 @@ def test_invalid_signed_serialization(self, test_case_data: Dict[str, str]): with self.assertRaises((KeyError, ValueError, TypeError)): Snapshot.from_dict(copy.deepcopy(case_dict)) - valid_keys: utils.DataSet = { "all": '{"keytype": "rsa", "scheme": "rsassa-pss-sha256", \ "keyval": {"public": "foo"}}', @@ -87,7 +78,6 @@ def test_valid_key_serialization(self, test_case_data: str): key = Key.from_dict("id", copy.copy(case_dict)) self.assertDictEqual(case_dict, key.to_dict()) - invalid_keys: utils.DataSet = { "no keyid": '{"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "abc"}}', "no keytype": '{"keyid": "id", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}', @@ -120,7 +110,6 @@ def test_invalid_role_serialization(self, test_case_data: Dict[str, str]): with self.assertRaises((KeyError, TypeError, ValueError)): Role.from_dict(copy.deepcopy(case_dict)) - valid_roles: utils.DataSet = { "all": '{"keyids": ["keyid"], "threshold": 3}', "many keyids": '{"keyids": ["a", "b", "c", "d", "e"], "threshold": 1}', @@ -134,7 +123,6 @@ def test_role_serialization(self, test_case_data: str): role = Role.from_dict(copy.deepcopy(case_dict)) self.assertDictEqual(case_dict, role.to_dict()) - valid_roots: utils.DataSet = { "all": '{"_type": "root", "spec_version": "1.0.0", "version": 1, \ "expires": "2030-01-01T00:00:00Z", "consistent_snapshot": false, \ @@ -183,7 +171,6 @@ def test_root_serialization(self, test_case_data: str): root = Root.from_dict(copy.deepcopy(case_dict)) self.assertDictEqual(case_dict, root.to_dict()) - invalid_roots: utils.DataSet = { "invalid role name": '{"_type": "root", "spec_version": "1.0.0", "version": 1, \ "expires": "2030-01-01T00:00:00Z", "consistent_snapshot": false, \ @@ -236,12 +223,13 @@ def test_invalid_root_serialization(self, test_case_data: Dict[str, str]): } @utils.run_sub_tests_with_dataset(invalid_metafiles) - def test_invalid_metafile_serialization(self, test_case_data: Dict[str, str]): + def test_invalid_metafile_serialization( + self, test_case_data: Dict[str, str] + ): case_dict = json.loads(test_case_data) with self.assertRaises((TypeError, ValueError, 
AttributeError)): MetaFile.from_dict(copy.deepcopy(case_dict)) - valid_metafiles: utils.DataSet = { "all": '{"hashes": {"sha256" : "abc"}, "length": 12, "version": 1}', "no length": '{"hashes": {"sha256" : "abc"}, "version": 1 }', @@ -261,12 +249,13 @@ def test_metafile_serialization(self, test_case_data: str): } @utils.run_sub_tests_with_dataset(invalid_timestamps) - def test_invalid_timestamp_serialization(self, test_case_data: Dict[str, str]): + def test_invalid_timestamp_serialization( + self, test_case_data: Dict[str, str] + ): case_dict = json.loads(test_case_data) with self.assertRaises((ValueError, KeyError)): Timestamp.from_dict(copy.deepcopy(case_dict)) - valid_timestamps: utils.DataSet = { "all": '{ "_type": "timestamp", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \ "meta": {"snapshot.json": {"hashes": {"sha256" : "abc"}, "version": 1}}}', @@ -280,7 +269,6 @@ def test_timestamp_serialization(self, test_case_data: str): timestamp = Timestamp.from_dict(copy.deepcopy(case_dict)) self.assertDictEqual(case_dict, timestamp.to_dict()) - valid_snapshots: utils.DataSet = { "all": '{ "_type": "snapshot", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \ "meta": { \ @@ -300,23 +288,18 @@ def test_snapshot_serialization(self, test_case_data: str): snapshot = Snapshot.from_dict(copy.deepcopy(case_dict)) self.assertDictEqual(case_dict, snapshot.to_dict()) - valid_delegated_roles: utils.DataSet = { # DelegatedRole inherits Role and some use cases can be found in the valid_roles. - "no hash prefix attribute": - '{"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], \ + "no hash prefix attribute": '{"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], \ "terminating": false, "threshold": 1}', - "no path attribute": - '{"keyids": ["keyid"], "name": "a", "terminating": false, \ + "no path attribute": '{"keyids": ["keyid"], "name": "a", "terminating": false, \ "path_hash_prefixes": ["h1", "h2"], "threshold": 99}', "empty paths": '{"keyids": ["keyid"], "name": "a", "paths": [], \ "terminating": false, "threshold": 1}', "empty path_hash_prefixes": '{"keyids": ["keyid"], "name": "a", "terminating": false, \ "path_hash_prefixes": [], "threshold": 99}', - "unrecognized field": - '{"keyids": ["keyid"], "name": "a", "terminating": true, "paths": ["fn1"], "threshold": 3, "foo": "bar"}', - "many keyids": - '{"keyids": ["keyid1", "keyid2"], "name": "a", "paths": ["fn1", "fn2"], \ + "unrecognized field": '{"keyids": ["keyid"], "name": "a", "terminating": true, "paths": ["fn1"], "threshold": 3, "foo": "bar"}', + "many keyids": '{"keyids": ["keyid1", "keyid2"], "name": "a", "paths": ["fn1", "fn2"], \ "terminating": false, "threshold": 1}', } @@ -326,13 +309,10 @@ def test_delegated_role_serialization(self, test_case_data: str): deserialized_role = DelegatedRole.from_dict(copy.copy(case_dict)) self.assertDictEqual(case_dict, deserialized_role.to_dict()) - invalid_delegated_roles: utils.DataSet = { # DelegatedRole inherits Role and some use cases can be found in the invalid_roles. 
- "missing hash prefixes and paths": - '{"name": "a", "keyids": ["keyid"], "threshold": 1, "terminating": false}', - "both hash prefixes and paths": - '{"name": "a", "keyids": ["keyid"], "threshold": 1, "terminating": false, \ + "missing hash prefixes and paths": '{"name": "a", "keyids": ["keyid"], "threshold": 1, "terminating": false}', + "both hash prefixes and paths": '{"name": "a", "keyids": ["keyid"], "threshold": 1, "terminating": false, \ "paths": ["fn1", "fn2"], "path_hash_prefixes": ["h1", "h2"]}', } @@ -342,9 +322,8 @@ def test_invalid_delegated_role_serialization(self, test_case_data: str): with self.assertRaises(ValueError): DelegatedRole.from_dict(copy.copy(case_dict)) - invalid_delegations: utils.DataSet = { - "empty delegations": '{}', + "empty delegations": "{}", "bad keys": '{"keys": "foo", \ "roles": [{"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], "terminating": false, "threshold": 3}]}', "bad roles": '{"keys": {"keyid" : {"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}}, \ @@ -363,18 +342,15 @@ def test_invalid_delegation_serialization(self, test_case_data: str): with self.assertRaises((ValueError, KeyError, AttributeError)): Delegations.from_dict(copy.deepcopy(case_dict)) - valid_delegations: utils.DataSet = { - "all": - '{"keys": { \ + "all": '{"keys": { \ "keyid1" : {"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}, \ "keyid2" : {"keytype": "ed25519", "scheme": "ed25519", "keyval": {"public": "bar"}}}, \ "roles": [ \ {"keyids": ["keyid"], "name": "a", "terminating": true, "paths": ["fn1"], "threshold": 3}, \ {"keyids": ["keyid2"], "name": "b", "terminating": true, "paths": ["fn2"], "threshold": 4} ] \ }', - "unrecognized field": - '{"keys": {"keyid" : {"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}}, \ + "unrecognized field": '{"keys": {"keyid" : {"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}}, \ "roles": [ {"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], "terminating": true, "threshold": 3} ], \ "foo": "bar"}', "empty keys and roles": '{"keys": {}, \ @@ -388,7 +364,6 @@ def test_delegation_serialization(self, test_case_data: str): delegation = Delegations.from_dict(copy.deepcopy(case_dict)) self.assertDictEqual(case_dict, delegation.to_dict()) - invalid_targetfiles: utils.DataSet = { "no hashes": '{"length": 1}', "no length": '{"hashes": {"sha256": "abc"}}' @@ -397,12 +372,13 @@ def test_delegation_serialization(self, test_case_data: str): } @utils.run_sub_tests_with_dataset(invalid_targetfiles) - def test_invalid_targetfile_serialization(self, test_case_data: Dict[str, str]): + def test_invalid_targetfile_serialization( + self, test_case_data: Dict[str, str] + ): case_dict = json.loads(test_case_data) with self.assertRaises(KeyError): TargetFile.from_dict(copy.deepcopy(case_dict), "file1.txt") - valid_targetfiles: utils.DataSet = { "all": '{"length": 12, "hashes": {"sha256" : "abc"}, \ "custom" : {"foo": "bar"} }', @@ -417,7 +393,6 @@ def test_targetfile_serialization(self, test_case_data: str): target_file = TargetFile.from_dict(copy.copy(case_dict), "file1.txt") self.assertDictEqual(case_dict, target_file.to_dict()) - valid_targets: utils.DataSet = { "all attributes": '{"_type": "targets", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \ "targets": { \ @@ -452,6 +427,6 @@ def test_targets_serialization(self, test_case_data): # Run unit test. 
-if __name__ == '__main__': +if __name__ == "__main__": utils.configure_test_logging(sys.argv) unittest.main() diff --git a/tests/test_trusted_metadata_set.py b/tests/test_trusted_metadata_set.py index 925b16a935..9a8708fcfa 100644 --- a/tests/test_trusted_metadata_set.py +++ b/tests/test_trusted_metadata_set.py @@ -1,42 +1,44 @@ +"""Unit tests for tuf/ngclient/_internal/trusted_metadata_set.py""" import logging -from typing import Optional, Callable import os import sys import unittest from datetime import datetime +from typing import Callable, Optional +from securesystemslib.interface import ( + import_ed25519_privatekey_from_file, + import_rsa_privatekey_from_file, +) +from securesystemslib.signer import SSlibSigner + +from tests import utils from tuf import exceptions from tuf.api.metadata import ( Metadata, - Signed, + MetaFile, Root, - Timestamp, + Signed, Snapshot, - MetaFile, - Targets + Targets, + Timestamp, ) from tuf.ngclient._internal.trusted_metadata_set import TrustedMetadataSet -from securesystemslib.signer import SSlibSigner -from securesystemslib.interface import( - import_ed25519_privatekey_from_file, - import_rsa_privatekey_from_file -) - -from tests import utils - logger = logging.getLogger(__name__) +# pylint: disable-next=too-many-public-methods class TestTrustedMetadataSet(unittest.TestCase): + """Tests for all public API of the TrustedMetadataSet class""" def modify_metadata( - self, rolename: str, modification_func: Callable[["Signed"], None] + self, rolename: str, modification_func: Callable[[Signed], None] ) -> bytes: """Instantiate metadata from rolename type, call modification_func and sign it again with self.keystore[rolename] signer. Attributes: - rolename: A denoting the name of the metadata which will be modified. + rolename: Name of the metadata which will be modified. modification_func: Function that will be called to modify the signed portion of metadata bytes.
""" @@ -48,24 +50,29 @@ def modify_metadata( @classmethod def setUpClass(cls): cls.repo_dir = os.path.join( - os.getcwd(), 'repository_data', 'repository', 'metadata' + os.getcwd(), "repository_data", "repository", "metadata" ) cls.metadata = {} - for md in ["root", "timestamp", "snapshot", "targets", "role1", "role2"]: + for md in [ + "root", + "timestamp", + "snapshot", + "targets", + "role1", + "role2", + ]: with open(os.path.join(cls.repo_dir, f"{md}.json"), "rb") as f: cls.metadata[md] = f.read() - keystore_dir = os.path.join(os.getcwd(), 'repository_data', 'keystore') + keystore_dir = os.path.join(os.getcwd(), "repository_data", "keystore") cls.keystore = {} root_key_dict = import_rsa_privatekey_from_file( - os.path.join(keystore_dir, "root" + '_key'), - password="password" + os.path.join(keystore_dir, "root" + "_key"), password="password" ) cls.keystore["root"] = SSlibSigner(root_key_dict) for role in ["delegation", "snapshot", "targets", "timestamp"]: key_dict = import_ed25519_privatekey_from_file( - os.path.join(keystore_dir, role + '_key'), - password="password" + os.path.join(keystore_dir, role + "_key"), password="password" ) cls.keystore[role] = SSlibSigner(key_dict) @@ -80,7 +87,6 @@ def hashes_length_modifier(timestamp: Timestamp) -> None: def setUp(self) -> None: self.trusted_set = TrustedMetadataSet(self.metadata["root"]) - def _update_all_besides_targets( self, timestamp_bytes: Optional[bytes] = None, @@ -103,7 +109,6 @@ def _update_all_besides_targets( snapshot_bytes = snapshot_bytes or self.metadata["snapshot"] self.trusted_set.update_snapshot(snapshot_bytes) - def test_update(self): self.trusted_set.update_timestamp(self.metadata["timestamp"]) self.trusted_set.update_snapshot(self.metadata["snapshot"]) @@ -224,7 +229,7 @@ def test_update_root_new_root_ver_same_as_trusted_root_ver(self): def test_root_expired_final_root(self): def root_expired_modifier(root: Root) -> None: root.expires = datetime(1970, 1, 1) - + # intermediate root can be expired root = self.modify_metadata("root", root_expired_modifier) tmp_trusted_set = TrustedMetadataSet(root) @@ -232,12 +237,11 @@ def root_expired_modifier(root: Root) -> None: with self.assertRaises(exceptions.ExpiredMetadataError): tmp_trusted_set.update_timestamp(self.metadata["timestamp"]) - def test_update_timestamp_new_timestamp_ver_below_trusted_ver(self): # new_timestamp.version < trusted_timestamp.version def version_modifier(timestamp: Timestamp) -> None: timestamp.version = 3 - + timestamp = self.modify_metadata("timestamp", version_modifier) self.trusted_set.update_timestamp(timestamp) with self.assertRaises(exceptions.ReplayedMetadataError): @@ -261,7 +265,9 @@ def timestamp_expired_modifier(timestamp: Timestamp) -> None: timestamp.expires = datetime(1970, 1, 1) # expired intermediate timestamp is loaded but raises - timestamp = self.modify_metadata("timestamp", timestamp_expired_modifier) + timestamp = self.modify_metadata( + "timestamp", timestamp_expired_modifier + ) with self.assertRaises(exceptions.ExpiredMetadataError): self.trusted_set.update_timestamp(timestamp) @@ -291,7 +297,9 @@ def test_update_snapshot_version_different_timestamp_snapshot_version(self): def timestamp_version_modifier(timestamp: Timestamp) -> None: timestamp.snapshot_meta.version = 2 - timestamp = self.modify_metadata("timestamp", timestamp_version_modifier) + timestamp = self.modify_metadata( + "timestamp", timestamp_version_modifier + ) self.trusted_set.update_timestamp(timestamp) # if intermediate snapshot version is incorrect, load it but 
also raise @@ -302,9 +310,9 @@ def timestamp_version_modifier(timestamp: Timestamp) -> None: with self.assertRaises(exceptions.BadVersionNumberError): self.trusted_set.update_targets(self.metadata["targets"]) - def test_update_snapshot_file_removed_from_meta(self): self._update_all_besides_targets(self.metadata["timestamp"]) + def remove_file_from_meta(snapshot: Snapshot) -> None: del snapshot.meta["targets.json"] @@ -327,6 +335,7 @@ def version_meta_modifier(snapshot: Snapshot) -> None: def test_update_snapshot_expired_new_snapshot(self): self.trusted_set.update_timestamp(self.metadata["timestamp"]) + def snapshot_expired_modifier(snapshot: Snapshot) -> None: snapshot.expires = datetime(1970, 1, 1) @@ -406,6 +415,6 @@ def target_expired_modifier(target: Targets) -> None: # TODO test updating over initial metadata (new keys, newer timestamp, etc) -if __name__ == '__main__': - utils.configure_test_logging(sys.argv) - unittest.main() +if __name__ == "__main__": + utils.configure_test_logging(sys.argv) + unittest.main() diff --git a/tests/test_updater_key_rotations.py b/tests/test_updater_key_rotations.py index 0dad301ecd..c1cc7f4ba9 100644 --- a/tests/test_updater_key_rotations.py +++ b/tests/test_updater_key_rotations.py @@ -5,22 +5,21 @@ """Test ngclient Updater key rotation handling""" -from dataclasses import dataclass -from typing import List, Optional, Type import os import sys import tempfile import unittest +from dataclasses import dataclass +from typing import List, Optional, Type from securesystemslib.signer import SSlibSigner -from tuf.api.metadata import Key -from tuf.exceptions import UnsignedMetadataError -from tuf.ngclient import Updater - from tests import utils from tests.repository_simulator import RepositorySimulator from tests.utils import run_sub_tests_with_dataset +from tuf.api.metadata import Key +from tuf.exceptions import UnsignedMetadataError +from tuf.ngclient import Updater @dataclass @@ -33,6 +32,7 @@ class RootVersion: class TestUpdaterKeyRotations(unittest.TestCase): """Test ngclient root rotation handling""" + # set dump_dir to trigger repository state dumps dump_dir: Optional[str] = None diff --git a/tests/test_updater_ng.py b/tests/test_updater_ng.py index 69fbeef8cd..5c781da09a 100644 --- a/tests/test_updater_ng.py +++ b/tests/test_updater_ng.py @@ -6,30 +6,33 @@ """Test Updater class """ +import logging import os import shutil -import tempfile -import logging import sys -from typing import List +import tempfile import unittest -import tuf.unittest_toolbox as unittest_toolbox +from typing import List + +from securesystemslib.interface import import_rsa_privatekey_from_file +from securesystemslib.signer import SSlibSigner from tests import utils +from tuf import exceptions, ngclient, unittest_toolbox from tuf.api.metadata import Metadata, TargetFile -from tuf import exceptions, ngclient -from securesystemslib.signer import SSlibSigner -from securesystemslib.interface import import_rsa_privatekey_from_file logger = logging.getLogger(__name__) class TestUpdater(unittest_toolbox.Modified_TestCase): + """Test the Updater class from tuf.ngclient.updater.py""" + @classmethod def setUpClass(cls): - # Create a temporary directory to store the repository, metadata, and target - # files. 'temporary_directory' must be deleted in TearDownModule() so that - # temporary files are always removed, even when exceptions occur. + # Create a temporary directory to store the repository, metadata, and + # target files. 
'temporary_directory' must be deleted in + # TearDownModule() so that temporary files are always removed, even when + # exceptions occur. cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd()) # Needed because in some tests simple_server.py cannot be found. @@ -50,8 +53,8 @@ def tearDownClass(cls): # Cleans the resources and flush the logged lines (if any). cls.server_process_handler.clean() - # Remove the temporary repository directory, which should contain all the - # metadata, targets, and key files generated for the test cases + # Remove the temporary repository directory, which should contain all + # the metadata, targets, and key files generated for the test cases shutil.rmtree(cls.temporary_directory) def setUp(self): @@ -59,15 +62,15 @@ def setUp(self): unittest_toolbox.Modified_TestCase.setUp(self) # Copy the original repository files provided in the test folder so that - # any modifications made to repository files are restricted to the copies. + # any modifications are restricted to the copies. # The 'repository_data' directory is expected to exist in 'tuf.tests/'. original_repository_files = os.path.join(os.getcwd(), "repository_data") temporary_repository_root = self.make_temp_directory( directory=self.temporary_directory ) - # The original repository, keystore, and client directories will be copied - # for each test case. + # The original repository, keystore, and client directories will be + # copied for each test case. original_repository = os.path.join( original_repository_files, "repository" ) @@ -118,7 +121,7 @@ def setUp(self): repository_dir=self.client_directory, metadata_base_url=self.metadata_url, target_dir=self.dl_dir, - target_base_url=self.targets_url + target_base_url=self.targets_url, ) def tearDown(self): @@ -182,6 +185,7 @@ def _assert_files(self, roles: List[str]): client_files = sorted(os.listdir(self.client_directory)) self.assertEqual(client_files, expected_files) + # pylint: disable=protected-access def test_refresh_on_consistent_targets(self): # Generate a new root version where consistent_snapshot is set to true def consistent_snapshot_modifier(root): @@ -191,15 +195,16 @@ def consistent_snapshot_modifier(root): consistent_snapshot_modifier, bump_version=True ) updater = ngclient.Updater( - self.client_directory, self.metadata_url, self.dl_dir, self.targets_url + self.client_directory, + self.metadata_url, + self.dl_dir, + self.targets_url, ) # All metadata is in local directory already updater.refresh() # Make sure that consistent snapshot is enabled - self.assertTrue( - updater._trusted_set.root.signed.consistent_snapshot - ) + self.assertTrue(updater._trusted_set.root.signed.consistent_snapshot) # Get targetinfos, assert cache does not contain the files info1 = updater.get_targetinfo("file1.txt") @@ -226,7 +231,7 @@ def consistent_snapshot_modifier(root): self.assertEqual(path, os.path.join(self.dl_dir, info3.path)) def test_refresh_and_download(self): - # Test refresh without consistent targets - targets without hash prefixes. + # Test refresh without consistent targets - targets without hash prefix. 
# top-level targets are already in local cache (but remove others) os.remove(os.path.join(self.client_directory, "role1.json")) @@ -252,9 +257,7 @@ def test_refresh_and_download(self): self.updater.download_target(info1) path = self.updater.find_cached_target(info1) self.assertEqual(path, os.path.join(self.dl_dir, info1.path)) - self.assertIsNone( - self.updater.find_cached_target(info3) - ) + self.assertIsNone(self.updater.find_cached_target(info3)) self.updater.download_target(info3) path = self.updater.find_cached_target(info1) @@ -275,13 +278,15 @@ def test_refresh_with_only_local_root(self): self._assert_files(["root", "snapshot", "targets", "timestamp"]) # Get targetinfo for 'file3.txt' listed in the delegated role1 - targetinfo3 = self.updater.get_targetinfo("file3.txt") + self.updater.get_targetinfo("file3.txt") expected_files = ["role1", "root", "snapshot", "targets", "timestamp"] self._assert_files(expected_files) def test_both_target_urls_not_set(self): # target_base_url = None and Updater._target_base_url = None - updater = ngclient.Updater(self.client_directory, self.metadata_url, self.dl_dir) + updater = ngclient.Updater( + self.client_directory, self.metadata_url, self.dl_dir + ) info = TargetFile(1, {"sha256": ""}, "targetpath") with self.assertRaises(ValueError): updater.download_target(info) @@ -315,6 +320,7 @@ def test_length_hash_mismatch(self): targetinfo.hashes = {"sha256": "abcd"} self.updater.download_target(targetinfo) + # pylint: disable=protected-access def test_updating_root(self): # Bump root version, resign and refresh self._modify_repository_root(lambda root: None, bump_version=True) diff --git a/tests/test_updater_with_simulator.py b/tests/test_updater_with_simulator.py index 65727aa8c4..99c99454f9 100644 --- a/tests/test_updater_with_simulator.py +++ b/tests/test_updater_with_simulator.py @@ -9,23 +9,24 @@ import os import sys import tempfile -from tuf.api.metadata import SPECIFICATION_VERSION, Targets -from typing import Optional, Tuple -from tuf.exceptions import UnsignedMetadataError, BadVersionNumberError import unittest - -from tuf.ngclient import Updater +from typing import Optional, Tuple from tests import utils from tests.repository_simulator import RepositorySimulator -from securesystemslib import hash as sslib_hash +from tuf.api.metadata import SPECIFICATION_VERSION, Targets +from tuf.exceptions import BadVersionNumberError, UnsignedMetadataError +from tuf.ngclient import Updater class TestUpdater(unittest.TestCase): + """Test ngclient Updater using the repository simulator""" + # set dump_dir to trigger repository state dumps - dump_dir:Optional[str] = None + dump_dir: Optional[str] = None def setUp(self): + # pylint: disable-next=consider-using-with self.temp_dir = tempfile.TemporaryDirectory() self.metadata_dir = os.path.join(self.temp_dir.name, "metadata") self.targets_dir = os.path.join(self.temp_dir.name, "targets") @@ -35,12 +36,14 @@ def setUp(self): # Setup the repository, bootstrap client root.json self.sim = RepositorySimulator() with open(os.path.join(self.metadata_dir, "root.json"), "bw") as f: - root = self.sim.download_bytes("https://example.com/metadata/1.root.json", 100000) + root = self.sim.download_bytes( + "https://example.com/metadata/1.root.json", 100000 + ) f.write(root) if self.dump_dir is not None: # create test specific dump directory - name = self.id().split('.')[-1] + name = self.id().split(".")[-1] self.sim.dump_dir = os.path.join(self.dump_dir, name) os.mkdir(self.sim.dump_dir) @@ -48,6 +51,7 @@ def tearDown(self): 
self.temp_dir.cleanup() def _run_refresh(self) -> Updater: + """Creates a new updater and runs refresh.""" if self.sim.dump_dir is not None: self.sim.write() @@ -56,7 +60,7 @@ def _run_refresh(self) -> Updater: "https://example.com/metadata/", self.targets_dir, "https://example.com/targets/", - self.sim + self.sim, ) updater.refresh() return updater @@ -85,7 +89,11 @@ def test_refresh(self): targets: utils.DataSet = { "standard case": ("targetpath", b"content", "targetpath"), "non-asci case": ("åäö", b"more content", "%C3%A5%C3%A4%C3%B6"), - "subdirectory case": ("a/b/c/targetpath", b"dir target content", "a%2Fb%2Fc%2Ftargetpath"), + "subdirectory case": ( + "a/b/c/targetpath", + b"dir target content", + "a%2Fb%2Fc%2Ftargetpath", + ), } @utils.run_sub_tests_with_dataset(targets) @@ -132,14 +140,17 @@ def test_fishy_rolenames(self): "": ".json", ".": "..json", "/": "%2F.json", - "ö": "%C3%B6.json" + "ö": "%C3%B6.json", } # Add new delegated targets, update the snapshot spec_version = ".".join(SPECIFICATION_VERSION) targets = Targets(1, spec_version, self.sim.safe_expiry, {}, None) + # pylint: disable-next=consider-iterating-dictionary for role in roles_to_filenames.keys(): - self.sim.add_delegation("targets", role, targets, False, ["*"], None) + self.sim.add_delegation( + "targets", role, targets, False, ["*"], None ) self.sim.update_snapshot() updater = self._run_refresh() @@ -208,7 +219,7 @@ def test_snapshot_rollback_with_local_snapshot_hash_mismatch(self): self.sim.update_snapshot() self._run_refresh() - # The new targets should have a lower version than the local trusted one. + # The new targets must have a lower version than the local trusted one. self.sim.targets.version = 1 self.sim.update_snapshot() diff --git a/tox.ini b/tox.ini index b8359b7772..dbed7172fe 100644 --- a/tox.ini +++ b/tox.ini @@ -41,9 +41,9 @@ changedir = {toxinidir} commands = # Use different configs for new (tuf/api/*) and legacy code # TODO: configure black and isort args in pyproject.toml (see #1161) - black --check --diff --line-length 80 tuf/api tuf/ngclient - isort --check --diff --line-length 80 --profile black -p tuf tuf/api tuf/ngclient - pylint -j 0 tuf/api tuf/ngclient --rcfile=tuf/api/pylintrc + black --check --diff --line-length 80 --config pyproject.toml tuf/api tuf/ngclient tests/ + isort --check --diff --line-length 80 --profile black --sp pyproject.toml -p tuf tuf/api tuf/ngclient tests/ + pylint -j 0 tuf/api tuf/ngclient tests/ --rcfile=pyproject.toml # NOTE: Contrary to what the pylint docs suggest, ignoring full paths does # work, unfortunately each subdirectory has to be ignored explicitly. diff --git a/tuf/api/pylintrc b/tuf/api/pylintrc deleted file mode 100644 index 01139a8804..0000000000 --- a/tuf/api/pylintrc +++ /dev/null @@ -1,53 +0,0 @@ -# Minimal pylint configuration file for Secure Systems Lab Python Style Guide: -# https://github.com/secure-systems-lab/code-style-guidelines -# -# Based on Google Python Style Guide pylintrc and pylint defaults: -# https://google.github.io/styleguide/pylintrc -# http://pylint.pycqa.org/en/latest/technical_reference/features.html - -[MESSAGES CONTROL] -# Disable the message, report, category or checker with the given id(s). -# NOTE: To keep this config as short as possible we only disable checks that -# are currently in conflict with our code. If new code displeases the linter -# (for good reasons) consider updating this config file, or disable checks with
-disable=fixme, - too-few-public-methods, - too-many-arguments, - -[BASIC] -good-names=i,j,k,v,e,f,fn,fp,_type -# Regexes for allowed names are copied from the Google pylintrc -# NOTE: Pylint captures regex name groups such as 'snake_case' or 'camel_case'. -# If there are multiple groups it enfoces the prevalent naming style inside -# each modules. Names in the exempt capturing group are ignored. -function-rgx=^(?:(?PsetUp|tearDown|setUpModule|tearDownModule)|(?P_?[A-Z][a-zA-Z0-9]*)|(?P_?[a-z][a-z0-9_]*))$ -method-rgx=(?x)^(?:(?P_[a-z0-9_]+__|runTest|setUp|tearDown|setUpTestCase|tearDownTestCase|setupSelf|tearDownClass|setUpClass|(test|assert)_*[A-Z0-9][a-zA-Z0-9_]*|next)|(?P_{0,2}[A-Z][a-zA-Z0-9_]*)|(?P_{0,2}[a-z][a-z0-9_]*))$ -argument-rgx=^[a-z][a-z0-9_]*$ -attr-rgx=^_{0,2}[a-z][a-z0-9_]*$ -class-attribute-rgx=^(_?[A-Z][A-Z0-9_]*|__[a-z0-9_]+__|_?[a-z][a-z0-9_]*)$ -class-rgx=^_?[A-Z][a-zA-Z0-9]*$ -const-rgx=^(_?[A-Z][A-Z0-9_]*|__[a-z0-9_]+__|_?[a-z][a-z0-9_]*)$ -inlinevar-rgx=^[a-z][a-z0-9_]*$ -module-rgx=^(_?[a-z][a-z0-9_]*|__init__)$ -no-docstring-rgx=(__.*__|main|test.*|.*test|.*Test)$ -variable-rgx=^[a-z][a-z0-9_]*$ -docstring-min-length=10 - -[FORMAT] -ignore-long-lines=(?x)( - ^\s*(\#\ )??$| - ^\s*(from\s+\S+\s+)?import\s+.+$) -indent-string=" " -indent-after-paren=4 -max-line-length=80 -single-line-if-stmt=yes - -[LOGGING] -logging-format-style=old - -[MISCELLANEOUS] -notes=TODO - -[STRING] -check-quote-consistency=yes