From dd02b2a49fa4ee1cd36061e82d771b42281ca2ad Mon Sep 17 00:00:00 2001
From: Martin Vrachev
Date: Tue, 19 Oct 2021 15:09:14 +0300
Subject: [PATCH 01/12] Lint tests with black and exclude old tests

Currently, we are using 4 tools: black, pylint, isort and mypy.
We want to run all 4 of them on the tests for the new codebase to make
them more readable and maintainable, but we don't want to run them on
the old test files, as those still follow the old style guidelines.

In order to achieve that, we want to use an exclusion list instead of
an inclusion list, as the new codebase will keep being developed and
new test files will appear. On the other hand, we don't expect any
additional test files for the old code, so this list is static.

I decided to hardcode the names we want to exclude because that way we
keep the Git history intact, as opposed to renaming or moving the old
test files. Even though the list is big, around 30 files, I think this
solution is fine, as the list will only contain files testing the old
code, meaning its content will stay static.

Signed-off-by: Martin Vrachev
---
 pyproject.toml | 36 ++++++++++++++++++++++++++++++++++++
 tox.ini        |  2 +-
 2 files changed, 37 insertions(+), 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index 2f21011953..1dd4ef13f4 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,3 +1,39 @@
 [build-system]
 requires = ["setuptools>=40.8.0", "wheel"]
 build-backend = "setuptools.build_meta"
+
+[tool.black]
+force-exclude="""
+(tests/aggregate_tests.py|
+tests/fast_server_exit.py|
+tests/simple_https_server.py|
+tests/simple_server.py|
+tests/slow_retrieval_server.py|
+tests/utils.py|
+tests/test_utils.py|
+tests/test_repository_lib.py|
+tests/test_arbitrary_package_attack.py|
+tests/test_developer_tool.py|
+tests/test_download.py|
+tests/test_endless_data_attack.py|
+tests/test_extraneous_dependencies_attack.py|
+tests/test_formats.py|
+tests/test_indefinite_freeze_attack.py|
+tests/test_key_revocation_integration.py|
+tests/test_keydb.py|
+tests/test_log.py|
+tests/test_mirrors.py|
+tests/test_mix_and_match_attack.py|
+tests/test_multiple_repositories_integration.py|
+tests/test_repository_tool.py|
+tests/test_replay_attack.py|
+tests/test_roledb.py|
+tests/test_root_versioning_integration.py|
+tests/test_sig.py|
+tests/test_slow_retrieval_attack.py|
+tests/test_tutorial.py|
+tests/test_unittest_toolbox.py|
+tests/test_updater.py|
+tests/test_updater_root_rotation_integration.py|
+tests/repository_data/generate_project_data.py|
+tests/repository_data/generate.py)"""
diff --git a/tox.ini b/tox.ini
index b8359b7772..181cd3a743 100644
--- a/tox.ini
+++ b/tox.ini
@@ -41,7 +41,7 @@ changedir = {toxinidir}
 commands =
     # Use different configs for new (tuf/api/*) and legacy code
     # TODO: configure black and isort args in pyproject.toml (see #1161)
-    black --check --diff --line-length 80 tuf/api tuf/ngclient
+    black --check --diff --line-length 80 --config pyproject.toml tests/ tuf/api tuf/ngclient
     isort --check --diff --line-length 80 --profile black -p tuf tuf/api tuf/ngclient
     pylint -j 0 tuf/api tuf/ngclient --rcfile=tuf/api/pylintrc

From 1994bdf9f944ba53accfe8a9db1dd8c8902b169a Mon Sep 17 00:00:00 2001
From: Martin Vrachev
Date: Tue, 12 Oct 2021 16:14:24 +0300
Subject: [PATCH 02/12] Apply black on the tests of the new code

All of the changes included are the result of applying black to the
tests for the new code.
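The old test files keep being skipped through the force-exclude pattern
added in the previous commit. As a sanity check, that multi-line
pattern can be exercised directly; this is a minimal sketch with a
shortened file list, assuming black's documented behavior of compiling
patterns that contain newlines in re.VERBOSE mode, where unescaped
whitespace inside the pattern is ignored:

    import re

    # Shortened stand-in for the force-exclude value in pyproject.toml.
    FORCE_EXCLUDE = """
    (tests/aggregate_tests.py|
    tests/test_updater.py|
    tests/repository_data/generate.py)"""

    # re.VERBOSE ignores the unescaped newlines and indentation inside
    # the pattern, leaving a plain alternation of the three paths.
    pattern = re.compile(FORCE_EXCLUDE, re.VERBOSE)

    for path in ("tests/test_updater.py", "tests/test_api.py"):
        state = "skipped" if pattern.search(path) else "reformatted"
        print(f"{path}: {state}")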
Signed-off-by: Martin Vrachev --- tests/repository_simulator.py | 13 +- tests/test_api.py | 367 ++++++++++++++------------- tests/test_fetcher.py | 204 +++++++-------- tests/test_fetcher_ng.py | 13 +- tests/test_metadata_serialization.py | 88 +++---- tests/test_trusted_metadata_set.py | 51 ++-- tests/test_updater_key_rotations.py | 1 + tests/test_updater_ng.py | 19 +- tests/test_updater_with_simulator.py | 22 +- 9 files changed, 393 insertions(+), 385 deletions(-) diff --git a/tests/repository_simulator.py b/tests/repository_simulator.py index d9e5885515..a71d301fb5 100644 --- a/tests/repository_simulator.py +++ b/tests/repository_simulator.py @@ -79,12 +79,15 @@ SPEC_VER = ".".join(SPECIFICATION_VERSION) + @dataclass class RepositoryTarget: """Contains actual target data and the related target metadata""" + data: bytes target_file: TargetFile + class RepositorySimulator(FetcherInterface): def __init__(self): self.md_root: Metadata[Root] = None @@ -141,7 +144,7 @@ def create_key() -> Tuple[Key, SSlibSigner]: sslib_key = generate_ed25519_key() return Key.from_securesystemslib_key(sslib_key), SSlibSigner(sslib_key) - def add_signer(self, role:str, signer: SSlibSigner): + def add_signer(self, role: str, signer: SSlibSigner): if role not in self.signers: self.signers[role] = {} self.signers[role][signer.key_dict["keyid"]] = signer @@ -197,7 +200,7 @@ def fetch(self, url: str) -> Iterator[bytes]: elif path.startswith("/targets/"): # figure out target path and hash prefix target_path = path[len("/targets/") :] - dir_parts, sep , prefixed_filename = target_path.rpartition("/") + dir_parts, sep, prefixed_filename = target_path.rpartition("/") prefix, _, filename = prefixed_filename.partition(".") target_path = f"{dir_parts}{sep}{filename}" @@ -219,7 +222,9 @@ def _fetch_target(self, target_path: str, hash: Optional[str]) -> bytes: logger.debug("fetched target %s", target_path) return repo_target.data - def _fetch_metadata(self, role: str, version: Optional[int] = None) -> bytes: + def _fetch_metadata( + self, role: str, version: Optional[int] = None + ) -> bytes: """Return signed metadata for 'role', using 'version' if it is given. 
If version is None, non-versioned metadata is being requested @@ -264,7 +269,7 @@ def _compute_hashes_and_length( data = self._fetch_metadata(role) digest_object = sslib_hash.digest(sslib_hash.DEFAULT_HASH_ALGORITHM) digest_object.update(data) - hashes = {sslib_hash.DEFAULT_HASH_ALGORITHM: digest_object.hexdigest()} + hashes = {sslib_hash.DEFAULT_HASH_ALGORITHM: digest_object.hexdigest()} return hashes, len(data) def update_timestamp(self): diff --git a/tests/test_api.py b/tests/test_api.py index d3b3eddec8..ff7b60a498 100755 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -32,35 +32,24 @@ DelegatedRole, ) -from tuf.api.serialization import ( - DeserializationError -) +from tuf.api.serialization import DeserializationError -from tuf.api.serialization.json import ( - JSONSerializer, - CanonicalJSONSerializer -) +from tuf.api.serialization.json import JSONSerializer, CanonicalJSONSerializer from securesystemslib.interface import ( import_ed25519_publickey_from_file, - import_ed25519_privatekey_from_file + import_ed25519_privatekey_from_file, ) from securesystemslib import hash as sslib_hash -from securesystemslib.signer import ( - SSlibSigner, - Signature -) +from securesystemslib.signer import SSlibSigner, Signature -from securesystemslib.keys import ( - generate_ed25519_key -) +from securesystemslib.keys import generate_ed25519_key logger = logging.getLogger(__name__) class TestMetadata(unittest.TestCase): - @classmethod def setUpClass(cls): # Create a temporary directory to store the repository, metadata, and @@ -70,62 +59,65 @@ def setUpClass(cls): cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd()) test_repo_data = os.path.join( - os.path.dirname(os.path.realpath(__file__)), 'repository_data') + os.path.dirname(os.path.realpath(__file__)), "repository_data" + ) - cls.repo_dir = os.path.join(cls.temporary_directory, 'repository') + cls.repo_dir = os.path.join(cls.temporary_directory, "repository") shutil.copytree( - os.path.join(test_repo_data, 'repository'), cls.repo_dir) + os.path.join(test_repo_data, "repository"), cls.repo_dir + ) - cls.keystore_dir = os.path.join(cls.temporary_directory, 'keystore') + cls.keystore_dir = os.path.join(cls.temporary_directory, "keystore") shutil.copytree( - os.path.join(test_repo_data, 'keystore'), cls.keystore_dir) + os.path.join(test_repo_data, "keystore"), cls.keystore_dir + ) # Load keys into memory cls.keystore = {} - for role in ['delegation', 'snapshot', 'targets', 'timestamp']: + for role in ["delegation", "snapshot", "targets", "timestamp"]: cls.keystore[role] = import_ed25519_privatekey_from_file( - os.path.join(cls.keystore_dir, role + '_key'), - password="password" + os.path.join(cls.keystore_dir, role + "_key"), + password="password", ) - @classmethod def tearDownClass(cls): # Remove the temporary repository directory, which should contain all # the metadata, targets, and key files generated for the test cases. 
shutil.rmtree(cls.temporary_directory) - def test_generic_read(self): for metadata, inner_metadata_cls in [ - ('root', Root), - ('snapshot', Snapshot), - ('timestamp', Timestamp), - ('targets', Targets)]: + ("root", Root), + ("snapshot", Snapshot), + ("timestamp", Timestamp), + ("targets", Targets), + ]: # Load JSON-formatted metdata of each supported type from file # and from out-of-band read JSON string - path = os.path.join(self.repo_dir, 'metadata', metadata + '.json') + path = os.path.join(self.repo_dir, "metadata", metadata + ".json") metadata_obj = Metadata.from_file(path) - with open(path, 'rb') as f: + with open(path, "rb") as f: metadata_obj2 = Metadata.from_bytes(f.read()) # Assert that both methods instantiate the right inner class for # each metadata type and ... + self.assertTrue(isinstance(metadata_obj.signed, inner_metadata_cls)) self.assertTrue( - isinstance(metadata_obj.signed, inner_metadata_cls)) - self.assertTrue( - isinstance(metadata_obj2.signed, inner_metadata_cls)) + isinstance(metadata_obj2.signed, inner_metadata_cls) + ) # ... and return the same object (compared by dict representation) self.assertDictEqual( - metadata_obj.to_dict(), metadata_obj2.to_dict()) + metadata_obj.to_dict(), metadata_obj2.to_dict() + ) # Assert that it chokes correctly on an unknown metadata type - bad_metadata_path = 'bad-metadata.json' - bad_metadata = {'signed': {'_type': 'bad-metadata'}} - bad_string = json.dumps(bad_metadata).encode('utf-8') - with open(bad_metadata_path, 'wb') as f: + bad_metadata_path = "bad-metadata.json" + bad_metadata = {"signed": {"_type": "bad-metadata"}} + bad_string = json.dumps(bad_metadata).encode("utf-8") + with open(bad_metadata_path, "wb") as f: f.write(bad_string) with self.assertRaises(DeserializationError): @@ -135,35 +127,33 @@ def test_generic_read(self): os.remove(bad_metadata_path) - def test_compact_json(self): - path = os.path.join(self.repo_dir, 'metadata', 'targets.json') + path = os.path.join(self.repo_dir, "metadata", "targets.json") metadata_obj = Metadata.from_file(path) self.assertTrue( - len(JSONSerializer(compact=True).serialize(metadata_obj)) < - len(JSONSerializer().serialize(metadata_obj))) - + len(JSONSerializer(compact=True).serialize(metadata_obj)) + < len(JSONSerializer().serialize(metadata_obj)) + ) def test_read_write_read_compare(self): - for metadata in ['root', 'snapshot', 'timestamp', 'targets']: - path = os.path.join(self.repo_dir, 'metadata', metadata + '.json') + for metadata in ["root", "snapshot", "timestamp", "targets"]: + path = os.path.join(self.repo_dir, "metadata", metadata + ".json") metadata_obj = Metadata.from_file(path) - path_2 = path + '.tmp' + path_2 = path + ".tmp" metadata_obj.to_file(path_2) metadata_obj_2 = Metadata.from_file(path_2) self.assertDictEqual( - metadata_obj.to_dict(), - metadata_obj_2.to_dict()) + metadata_obj.to_dict(), metadata_obj_2.to_dict() + ) os.remove(path_2) - def test_to_from_bytes(self): for metadata in ["root", "snapshot", "timestamp", "targets"]: - path = os.path.join(self.repo_dir, 'metadata', metadata + '.json') - with open(path, 'rb') as f: + path = os.path.join(self.repo_dir, "metadata", metadata + ".json") + with open(path, "rb") as f: metadata_bytes = f.read() metadata_obj = Metadata.from_bytes(metadata_bytes) # Comparate that from_bytes/to_bytes doesn't change the content @@ -177,13 +167,10 @@ def test_to_from_bytes(self): # Case 2: test compact by using the default serializer. 
obj_bytes = metadata_obj.to_bytes() metadata_obj_2 = Metadata.from_bytes(obj_bytes) - self.assertEqual( - metadata_obj_2.to_bytes(), obj_bytes - ) - + self.assertEqual(metadata_obj_2.to_bytes(), obj_bytes) def test_sign_verify(self): - root_path = os.path.join(self.repo_dir, 'metadata', 'root.json') + root_path = os.path.join(self.repo_dir, "metadata", "root.json") root = Metadata[Root].from_file(root_path).signed # Locate the public keys we need from root @@ -195,7 +182,7 @@ def test_sign_verify(self): timestamp_key = root.keys[timestamp_keyid] # Load sample metadata (targets) and assert ... - path = os.path.join(self.repo_dir, 'metadata', 'targets.json') + path = os.path.join(self.repo_dir, "metadata", "targets.json") metadata_obj = Metadata.from_file(path) # ... it has a single existing signature, @@ -210,7 +197,7 @@ def test_sign_verify(self): with self.assertRaises(exceptions.UnsignedMetadataError): targets_key.verify_signature(metadata_obj, JSONSerializer()) - sslib_signer = SSlibSigner(self.keystore['snapshot']) + sslib_signer = SSlibSigner(self.keystore["snapshot"]) # Append a new signature with the unrelated key and assert that ... sig = metadata_obj.sign(sslib_signer, append=True) # ... there are now two signatures, and @@ -221,7 +208,7 @@ def test_sign_verify(self): # ... the returned (appended) signature is for snapshot key self.assertEqual(sig.keyid, snapshot_keyid) - sslib_signer = SSlibSigner(self.keystore['timestamp']) + sslib_signer = SSlibSigner(self.keystore["timestamp"]) # Create and assign (don't append) a new signature and assert that ... metadata_obj.sign(sslib_signer, append=False) # ... there now is only one signature, @@ -253,7 +240,7 @@ def test_sign_verify(self): timestamp_key.verify_signature(metadata_obj) # Test failure with valid but incorrect signature - sig.signature = "ff"*64 + sig.signature = "ff" * 64 with self.assertRaises(exceptions.UnsignedMetadataError): timestamp_key.verify_signature(metadata_obj) sig.signature = correct_sig @@ -261,8 +248,7 @@ def test_sign_verify(self): def test_metadata_base(self): # Use of Snapshot is arbitrary, we're just testing the base class features # with real data - snapshot_path = os.path.join( - self.repo_dir, 'metadata', 'snapshot.json') + snapshot_path = os.path.join(self.repo_dir, "metadata", "snapshot.json") md = Metadata.from_file(snapshot_path) self.assertEqual(md.signed.version, 1) @@ -295,33 +281,35 @@ def test_metadata_base(self): # Test deserializing metadata with non-unique signatures: data = md.to_dict() - data["signatures"].append({"keyid": data["signatures"][0]["keyid"], "sig": "foo"}) + data["signatures"].append( + {"keyid": data["signatures"][0]["keyid"], "sig": "foo"} + ) with self.assertRaises(ValueError): Metadata.from_dict(data) - def test_metadata_snapshot(self): - snapshot_path = os.path.join( - self.repo_dir, 'metadata', 'snapshot.json') + snapshot_path = os.path.join(self.repo_dir, "metadata", "snapshot.json") snapshot = Metadata[Snapshot].from_file(snapshot_path) # Create a MetaFile instance representing what we expect # the updated data to be. 
- hashes = {'sha256': 'c2986576f5fdfd43944e2b19e775453b96748ec4fe2638a6d2f32f1310967095'} + hashes = { + "sha256": "c2986576f5fdfd43944e2b19e775453b96748ec4fe2638a6d2f32f1310967095" + } fileinfo = MetaFile(2, 123, hashes) self.assertNotEqual( - snapshot.signed.meta['role1.json'].to_dict(), fileinfo.to_dict() + snapshot.signed.meta["role1.json"].to_dict(), fileinfo.to_dict() ) - snapshot.signed.update('role1', fileinfo) + snapshot.signed.update("role1", fileinfo) self.assertEqual( - snapshot.signed.meta['role1.json'].to_dict(), fileinfo.to_dict() + snapshot.signed.meta["role1.json"].to_dict(), fileinfo.to_dict() ) - def test_metadata_timestamp(self): timestamp_path = os.path.join( - self.repo_dir, 'metadata', 'timestamp.json') + self.repo_dir, "metadata", "timestamp.json" + ) timestamp = Metadata[Timestamp].from_file(timestamp_path) self.assertEqual(timestamp.signed.version, 1) @@ -345,7 +333,9 @@ def test_metadata_timestamp(self): # Create a MetaFile instance representing what we expect # the updated data to be. - hashes = {'sha256': '0ae9664468150a9aa1e7f11feecb32341658eb84292851367fea2da88e8a58dc'} + hashes = { + "sha256": "0ae9664468150a9aa1e7f11feecb32341658eb84292851367fea2da88e8a58dc" + } fileinfo = MetaFile(2, 520, hashes) self.assertNotEqual( @@ -356,102 +346,99 @@ def test_metadata_timestamp(self): timestamp.signed.snapshot_meta.to_dict(), fileinfo.to_dict() ) - def test_metadata_verify_delegate(self): - root_path = os.path.join(self.repo_dir, 'metadata', 'root.json') + root_path = os.path.join(self.repo_dir, "metadata", "root.json") root = Metadata[Root].from_file(root_path) - snapshot_path = os.path.join( - self.repo_dir, 'metadata', 'snapshot.json') + snapshot_path = os.path.join(self.repo_dir, "metadata", "snapshot.json") snapshot = Metadata[Snapshot].from_file(snapshot_path) - targets_path = os.path.join( - self.repo_dir, 'metadata', 'targets.json') + targets_path = os.path.join(self.repo_dir, "metadata", "targets.json") targets = Metadata[Targets].from_file(targets_path) - role1_path = os.path.join( - self.repo_dir, 'metadata', 'role1.json') + role1_path = os.path.join(self.repo_dir, "metadata", "role1.json") role1 = Metadata[Targets].from_file(role1_path) - role2_path = os.path.join( - self.repo_dir, 'metadata', 'role2.json') + role2_path = os.path.join(self.repo_dir, "metadata", "role2.json") role2 = Metadata[Targets].from_file(role2_path) # test the expected delegation tree - root.verify_delegate('root', root) - root.verify_delegate('snapshot', snapshot) - root.verify_delegate('targets', targets) - targets.verify_delegate('role1', role1) - role1.verify_delegate('role2', role2) + root.verify_delegate("root", root) + root.verify_delegate("snapshot", snapshot) + root.verify_delegate("targets", targets) + targets.verify_delegate("role1", role1) + role1.verify_delegate("role2", role2) # only root and targets can verify delegates with self.assertRaises(TypeError): - snapshot.verify_delegate('snapshot', snapshot) + snapshot.verify_delegate("snapshot", snapshot) # verify fails for roles that are not delegated by delegator with self.assertRaises(ValueError): - root.verify_delegate('role1', role1) + root.verify_delegate("role1", role1) with self.assertRaises(ValueError): - targets.verify_delegate('targets', targets) + targets.verify_delegate("targets", targets) # verify fails when delegator has no delegations with self.assertRaises(ValueError): - role2.verify_delegate('role1', role1) + role2.verify_delegate("role1", role1) # verify fails when delegate content is modified expires = 
snapshot.signed.expires snapshot.signed.bump_expiration() with self.assertRaises(exceptions.UnsignedMetadataError): - root.verify_delegate('snapshot', snapshot) + root.verify_delegate("snapshot", snapshot) snapshot.signed.expires = expires # verify fails if roles keys do not sign the metadata with self.assertRaises(exceptions.UnsignedMetadataError): - root.verify_delegate('timestamp', snapshot) + root.verify_delegate("timestamp", snapshot) # Add a key to snapshot role, make sure the new sig fails to verify ts_keyid = next(iter(root.signed.roles["timestamp"].keyids)) root.signed.add_key("snapshot", root.signed.keys[ts_keyid]) - snapshot.signatures[ts_keyid] = Signature(ts_keyid, "ff"*64) + snapshot.signatures[ts_keyid] = Signature(ts_keyid, "ff" * 64) # verify succeeds if threshold is reached even if some signatures # fail to verify - root.verify_delegate('snapshot', snapshot) + root.verify_delegate("snapshot", snapshot) # verify fails if threshold of signatures is not reached - root.signed.roles['snapshot'].threshold = 2 + root.signed.roles["snapshot"].threshold = 2 with self.assertRaises(exceptions.UnsignedMetadataError): - root.verify_delegate('snapshot', snapshot) + root.verify_delegate("snapshot", snapshot) # verify succeeds when we correct the new signature and reach the # threshold of 2 keys - snapshot.sign(SSlibSigner(self.keystore['timestamp']), append=True) - root.verify_delegate('snapshot', snapshot) - + snapshot.sign(SSlibSigner(self.keystore["timestamp"]), append=True) + root.verify_delegate("snapshot", snapshot) def test_key_class(self): # Test if from_securesystemslib_key removes the private key from keyval # of a securesystemslib key dictionary. sslib_key = generate_ed25519_key() key = Key.from_securesystemslib_key(sslib_key) - self.assertFalse('private' in key.keyval.keys()) - + self.assertFalse("private" in key.keyval.keys()) def test_root_add_key_and_remove_key(self): - root_path = os.path.join( - self.repo_dir, 'metadata', 'root.json') + root_path = os.path.join(self.repo_dir, "metadata", "root.json") root = Metadata[Root].from_file(root_path) # Create a new key - root_key2 = import_ed25519_publickey_from_file( - os.path.join(self.keystore_dir, 'root_key2.pub')) - keyid = root_key2['keyid'] - key_metadata = Key(keyid, root_key2['keytype'], root_key2['scheme'], - root_key2['keyval']) + root_key2 = import_ed25519_publickey_from_file( + os.path.join(self.keystore_dir, "root_key2.pub") + ) + keyid = root_key2["keyid"] + key_metadata = Key( + keyid, + root_key2["keytype"], + root_key2["scheme"], + root_key2["keyval"], + ) # Assert that root does not contain the new key - self.assertNotIn(keyid, root.signed.roles['root'].keyids) + self.assertNotIn(keyid, root.signed.roles["root"].keyids) self.assertNotIn(keyid, root.signed.keys) # Add new root key - root.signed.add_key('root', key_metadata) + root.signed.add_key("root", key_metadata) # Assert that key is added - self.assertIn(keyid, root.signed.roles['root'].keyids) + self.assertIn(keyid, root.signed.roles["root"].keyids) self.assertIn(keyid, root.signed.keys) # Confirm that the newly added key does not break @@ -459,31 +446,31 @@ def test_root_add_key_and_remove_key(self): root.to_dict() # Try adding the same key again and assert its ignored. 
- pre_add_keyid = root.signed.roles['root'].keyids.copy() - root.signed.add_key('root', key_metadata) - self.assertEqual(pre_add_keyid, root.signed.roles['root'].keyids) + pre_add_keyid = root.signed.roles["root"].keyids.copy() + root.signed.add_key("root", key_metadata) + self.assertEqual(pre_add_keyid, root.signed.roles["root"].keyids) # Add the same key to targets role as well - root.signed.add_key('targets', key_metadata) + root.signed.add_key("targets", key_metadata) # Add the same key to a nonexistent role. with self.assertRaises(ValueError): root.signed.add_key("nosuchrole", key_metadata) # Remove the key from root role (targets role still uses it) - root.signed.remove_key('root', keyid) - self.assertNotIn(keyid, root.signed.roles['root'].keyids) + root.signed.remove_key("root", keyid) + self.assertNotIn(keyid, root.signed.roles["root"].keyids) self.assertIn(keyid, root.signed.keys) # Remove the key from targets as well - root.signed.remove_key('targets', keyid) - self.assertNotIn(keyid, root.signed.roles['targets'].keyids) + root.signed.remove_key("targets", keyid) + self.assertNotIn(keyid, root.signed.roles["targets"].keyids) self.assertNotIn(keyid, root.signed.keys) with self.assertRaises(ValueError): - root.signed.remove_key('root', 'nosuchkey') + root.signed.remove_key("root", "nosuchkey") with self.assertRaises(ValueError): - root.signed.remove_key('nosuchrole', keyid) + root.signed.remove_key("nosuchrole", keyid) def test_is_target_in_pathpattern(self): supported_use_cases = [ @@ -505,28 +492,26 @@ def test_is_target_in_pathpattern(self): invalid_use_cases = [ ("targets/foo.tgz", "*.tgz"), - ("/foo.tgz", "*.tgz",), + ("/foo.tgz", "*.tgz"), ("targets/foo.tgz", "*"), ("foo-version-alpha.tgz", "foo-version-?.tgz"), ("foo//bar", "*/bar"), - ("foo/bar", "f?/bar") + ("foo/bar", "f?/bar"), ] for targetpath, pathpattern in invalid_use_cases: self.assertFalse( DelegatedRole._is_target_in_pathpattern(targetpath, pathpattern) ) - def test_metadata_targets(self): - targets_path = os.path.join( - self.repo_dir, 'metadata', 'targets.json') + targets_path = os.path.join(self.repo_dir, "metadata", "targets.json") targets = Metadata[Targets].from_file(targets_path) # Create a fileinfo dict representing what we expect the updated data to be - filename = 'file2.txt' + filename = "file2.txt" hashes = { "sha256": "141f740f53781d1ca54b8a50af22cbf74e44c21a998fa2a8a05aaac2c002886b", - "sha512": "ef5beafa16041bcdd2937140afebd485296cd54f7348ecd5a4d035c09759608de467a7ac0eb58753d0242df873c305e8bffad2454aa48f44480f15efae1cacd0" + "sha512": "ef5beafa16041bcdd2937140afebd485296cd54f7348ecd5a4d035c09759608de467a7ac0eb58753d0242df873c305e8bffad2454aa48f44480f15efae1cacd0", } fileinfo = TargetFile(length=28, hashes=hashes, path=filename) @@ -543,18 +528,19 @@ def test_metadata_targets(self): ) def test_targets_key_api(self): - targets_path = os.path.join( - self.repo_dir, 'metadata', 'targets.json') + targets_path = os.path.join(self.repo_dir, "metadata", "targets.json") targets: Targets = Metadata[Targets].from_file(targets_path).signed # Add a new delegated role "role2" in targets - delegated_role = DelegatedRole.from_dict({ + delegated_role = DelegatedRole.from_dict( + { "keyids": [], "name": "role2", "paths": ["fn3", "fn4"], "terminating": False, - "threshold": 1 - }) + "threshold": 1, + } + ) targets.delegations.roles["role2"] = delegated_role key_dict = { @@ -562,7 +548,7 @@ def test_targets_key_api(self): "keyval": { "public": "edcd0a32a07dce33f7c7873aaffbff36d20ea30787574ead335eefd337e4dacd" }, - 
"scheme": "ed25519" + "scheme": "ed25519", } key = Key.from_dict("id2", key_dict) @@ -618,19 +604,18 @@ def test_targets_key_api(self): targets.remove_key("role1", key.keyid) self.assertTrue(targets.delegations is None) - - def test_length_and_hash_validation(self): + def test_length_and_hash_validation(self): # Test metadata files' hash and length verification. # Use timestamp to get a MetaFile object and snapshot # for untrusted metadata file to verify. timestamp_path = os.path.join( - self.repo_dir, 'metadata', 'timestamp.json') + self.repo_dir, "metadata", "timestamp.json" + ) timestamp = Metadata[Timestamp].from_file(timestamp_path) snapshot_metafile = timestamp.signed.snapshot_meta - snapshot_path = os.path.join( - self.repo_dir, 'metadata', 'snapshot.json') + snapshot_path = os.path.join(self.repo_dir, "metadata", "snapshot.json") with open(snapshot_path, "rb") as file: # test with data as a file object @@ -643,36 +628,49 @@ def test_length_and_hash_validation(self): # test exceptions expected_length = snapshot_metafile.length snapshot_metafile.length = 2345 - self.assertRaises(exceptions.LengthOrHashMismatchError, - snapshot_metafile.verify_length_and_hashes, data) + self.assertRaises( + exceptions.LengthOrHashMismatchError, + snapshot_metafile.verify_length_and_hashes, + data, + ) snapshot_metafile.length = expected_length - snapshot_metafile.hashes = {'sha256': 'incorrecthash'} - self.assertRaises(exceptions.LengthOrHashMismatchError, - snapshot_metafile.verify_length_and_hashes, data) + snapshot_metafile.hashes = {"sha256": "incorrecthash"} + self.assertRaises( + exceptions.LengthOrHashMismatchError, + snapshot_metafile.verify_length_and_hashes, + data, + ) - snapshot_metafile.hashes = {'unsupported-alg': "8f88e2ba48b412c3843e9bb26e1b6f8fc9e98aceb0fbaa97ba37b4c98717d7ab"} - self.assertRaises(exceptions.LengthOrHashMismatchError, - snapshot_metafile.verify_length_and_hashes, data) + snapshot_metafile.hashes = { + "unsupported-alg": "8f88e2ba48b412c3843e9bb26e1b6f8fc9e98aceb0fbaa97ba37b4c98717d7ab" + } + self.assertRaises( + exceptions.LengthOrHashMismatchError, + snapshot_metafile.verify_length_and_hashes, + data, + ) # Test wrong algorithm format (sslib.FormatError) - snapshot_metafile.hashes = { 256: "8f88e2ba48b412c3843e9bb26e1b6f8fc9e98aceb0fbaa97ba37b4c98717d7ab"} - self.assertRaises(exceptions.LengthOrHashMismatchError, - snapshot_metafile.verify_length_and_hashes, data) + snapshot_metafile.hashes = { + 256: "8f88e2ba48b412c3843e9bb26e1b6f8fc9e98aceb0fbaa97ba37b4c98717d7ab" + } + self.assertRaises( + exceptions.LengthOrHashMismatchError, + snapshot_metafile.verify_length_and_hashes, + data, + ) # test optional length and hashes snapshot_metafile.length = None snapshot_metafile.hashes = None snapshot_metafile.verify_length_and_hashes(data) - # Test target files' hash and length verification - targets_path = os.path.join( - self.repo_dir, 'metadata', 'targets.json') + targets_path = os.path.join(self.repo_dir, "metadata", "targets.json") targets = Metadata[Targets].from_file(targets_path) - file1_targetfile = targets.signed.targets['file1.txt'] - filepath = os.path.join( - self.repo_dir, 'targets', 'file1.txt') + file1_targetfile = targets.signed.targets["file1.txt"] + filepath = os.path.join(self.repo_dir, "targets", "file1.txt") with open(filepath, "rb") as file1: file1_targetfile.verify_length_and_hashes(file1) @@ -680,50 +678,58 @@ def test_length_and_hash_validation(self): # test exceptions expected_length = file1_targetfile.length file1_targetfile.length = 2345 - 
self.assertRaises(exceptions.LengthOrHashMismatchError, - file1_targetfile.verify_length_and_hashes, file1) + self.assertRaises( + exceptions.LengthOrHashMismatchError, + file1_targetfile.verify_length_and_hashes, + file1, + ) file1_targetfile.length = expected_length - file1_targetfile.hashes = {'sha256': 'incorrecthash'} - self.assertRaises(exceptions.LengthOrHashMismatchError, - file1_targetfile.verify_length_and_hashes, file1) + file1_targetfile.hashes = {"sha256": "incorrecthash"} + self.assertRaises( + exceptions.LengthOrHashMismatchError, + file1_targetfile.verify_length_and_hashes, + file1, + ) def test_targetfile_from_file(self): # Test with an existing file and valid hash algorithm - file_path = os.path.join(self.repo_dir, 'targets', 'file1.txt') + file_path = os.path.join(self.repo_dir, "targets", "file1.txt") targetfile_from_file = TargetFile.from_file( - file_path, file_path, ['sha256'] + file_path, file_path, ["sha256"] ) with open(file_path, "rb") as file: targetfile_from_file.verify_length_and_hashes(file) # Test with a non-existing file - file_path = os.path.join(self.repo_dir, 'targets', 'file123.txt') + file_path = os.path.join(self.repo_dir, "targets", "file123.txt") self.assertRaises( - FileNotFoundError, - TargetFile.from_file, - file_path, + FileNotFoundError, + TargetFile.from_file, + file_path, file_path, - [sslib_hash.DEFAULT_HASH_ALGORITHM] + [sslib_hash.DEFAULT_HASH_ALGORITHM], ) # Test with an unsupported algorithm - file_path = os.path.join(self.repo_dir, 'targets', 'file1.txt') + file_path = os.path.join(self.repo_dir, "targets", "file1.txt") self.assertRaises( exceptions.UnsupportedAlgorithmError, - TargetFile.from_file, - file_path, + TargetFile.from_file, + file_path, file_path, - ['123'] + ["123"], ) def test_targetfile_from_data(self): data = b"Inline test content" - target_file_path = os.path.join(self.repo_dir, 'targets', 'file1.txt') - + target_file_path = os.path.join(self.repo_dir, "targets", "file1.txt") + # Test with a valid hash algorithm - targetfile_from_data = TargetFile.from_data(target_file_path, data, ['sha256']) + targetfile_from_data = TargetFile.from_data( + target_file_path, data, ["sha256"] + ) targetfile_from_data.verify_length_and_hashes(data) # Test with no algorithms specified @@ -753,7 +759,8 @@ def test_is_delegated_role(self): self.assertFalse(role.is_delegated_path("a/non-matching path")) self.assertTrue(role.is_delegated_path("a/path")) + # Run unit test. -if __name__ == '__main__': +if __name__ == "__main__": utils.configure_test_logging(sys.argv) unittest.main() diff --git a/tests/test_fetcher.py b/tests/test_fetcher.py index bf94f252d8..c575f40f3a 100644 --- a/tests/test_fetcher.py +++ b/tests/test_fetcher.py @@ -25,107 +25,109 @@ class TestFetcher(unittest_toolbox.Modified_TestCase): - def setUp(self): - """ - Create a temporary file and launch a simple server in the - current working directory. - """ - - unittest_toolbox.Modified_TestCase.setUp(self) - - # Making a temporary file. - current_dir = os.getcwd() - target_filepath = self.make_temp_data_file(directory=current_dir) - self.target_fileobj = open(target_filepath, 'r') - self.file_contents = self.target_fileobj.read() - self.file_length = len(self.file_contents) - - # Launch a SimpleHTTPServer (serves files in the current dir). 
- self.server_process_handler = utils.TestServerProcess(log=logger) - - rel_target_filepath = os.path.basename(target_filepath) - self.url = 'http://' + utils.TEST_HOST_ADDRESS + ':' \ - + str(self.server_process_handler.port) + '/' + rel_target_filepath - - # Create a temporary file where the target file chunks are written - # during fetching - self.temp_file = tempfile.TemporaryFile() - self.fetcher = tuf.requests_fetcher.RequestsFetcher() - - - # Stop server process and perform clean up. - def tearDown(self): - # Cleans the resources and flush the logged lines (if any). - self.server_process_handler.clean() - - self.target_fileobj.close() - self.temp_file.close() - - # Remove temporary directory - unittest_toolbox.Modified_TestCase.tearDown(self) - - - # Test: Normal case. - def test_fetch(self): - for chunk in self.fetcher.fetch(self.url, self.file_length): - self.temp_file.write(chunk) - - self.temp_file.seek(0) - temp_file_data = self.temp_file.read().decode('utf-8') - self.assertEqual(self.file_contents, temp_file_data) - - # Test if fetcher downloads file up to a required length - def test_fetch_restricted_length(self): - for chunk in self.fetcher.fetch(self.url, self.file_length-4): - self.temp_file.write(chunk) - - self.temp_file.seek(0, io.SEEK_END) - self.assertEqual(self.temp_file.tell(), self.file_length-4) - - - # Test that fetcher does not download more than actual file length - def test_fetch_upper_length(self): - for chunk in self.fetcher.fetch(self.url, self.file_length+4): - self.temp_file.write(chunk) - - self.temp_file.seek(0, io.SEEK_END) - self.assertEqual(self.temp_file.tell(), self.file_length) - - - # Test incorrect URL parsing - def test_url_parsing(self): - with self.assertRaises(tuf.exceptions.URLParsingError): - self.fetcher.fetch(self.random_string(), self.file_length) - - - # Test: Normal case with url data downloaded in more than one chunk - def test_fetch_in_chunks(self): - # Set smaller chunk size to ensure that the file will be downloaded - # in more than one chunk - default_chunk_size = tuf.settings.CHUNK_SIZE - tuf.settings.CHUNK_SIZE = 4 - - # expected_chunks_count: 3 - expected_chunks_count = math.ceil(self.file_length/tuf.settings.CHUNK_SIZE) - self.assertEqual(expected_chunks_count, 3) - - chunks_count = 0 - for chunk in self.fetcher.fetch(self.url, self.file_length): - self.temp_file.write(chunk) - chunks_count+=1 - - self.temp_file.seek(0) - temp_file_data = self.temp_file.read().decode('utf-8') - self.assertEqual(self.file_contents, temp_file_data) - # Check that we calculate chunks as expected - self.assertEqual(chunks_count, expected_chunks_count) - - # Restore default settings - tuf.settings.CHUNK_SIZE = default_chunk_size - + def setUp(self): + """ + Create a temporary file and launch a simple server in the + current working directory. + """ + + unittest_toolbox.Modified_TestCase.setUp(self) + + # Making a temporary file. + current_dir = os.getcwd() + target_filepath = self.make_temp_data_file(directory=current_dir) + self.target_fileobj = open(target_filepath, "r") + self.file_contents = self.target_fileobj.read() + self.file_length = len(self.file_contents) + + # Launch a SimpleHTTPServer (serves files in the current dir). 
+ self.server_process_handler = utils.TestServerProcess(log=logger) + + rel_target_filepath = os.path.basename(target_filepath) + self.url = ( + "http://" + + utils.TEST_HOST_ADDRESS + + ":" + + str(self.server_process_handler.port) + + "/" + + rel_target_filepath + ) + + # Create a temporary file where the target file chunks are written + # during fetching + self.temp_file = tempfile.TemporaryFile() + self.fetcher = tuf.requests_fetcher.RequestsFetcher() + + # Stop server process and perform clean up. + def tearDown(self): + # Cleans the resources and flush the logged lines (if any). + self.server_process_handler.clean() + + self.target_fileobj.close() + self.temp_file.close() + + # Remove temporary directory + unittest_toolbox.Modified_TestCase.tearDown(self) + + # Test: Normal case. + def test_fetch(self): + for chunk in self.fetcher.fetch(self.url, self.file_length): + self.temp_file.write(chunk) + + self.temp_file.seek(0) + temp_file_data = self.temp_file.read().decode("utf-8") + self.assertEqual(self.file_contents, temp_file_data) + + # Test if fetcher downloads file up to a required length + def test_fetch_restricted_length(self): + for chunk in self.fetcher.fetch(self.url, self.file_length - 4): + self.temp_file.write(chunk) + + self.temp_file.seek(0, io.SEEK_END) + self.assertEqual(self.temp_file.tell(), self.file_length - 4) + + # Test that fetcher does not download more than actual file length + def test_fetch_upper_length(self): + for chunk in self.fetcher.fetch(self.url, self.file_length + 4): + self.temp_file.write(chunk) + + self.temp_file.seek(0, io.SEEK_END) + self.assertEqual(self.temp_file.tell(), self.file_length) + + # Test incorrect URL parsing + def test_url_parsing(self): + with self.assertRaises(tuf.exceptions.URLParsingError): + self.fetcher.fetch(self.random_string(), self.file_length) + + # Test: Normal case with url data downloaded in more than one chunk + def test_fetch_in_chunks(self): + # Set smaller chunk size to ensure that the file will be downloaded + # in more than one chunk + default_chunk_size = tuf.settings.CHUNK_SIZE + tuf.settings.CHUNK_SIZE = 4 + + # expected_chunks_count: 3 + expected_chunks_count = math.ceil( + self.file_length / tuf.settings.CHUNK_SIZE + ) + self.assertEqual(expected_chunks_count, 3) + + chunks_count = 0 + for chunk in self.fetcher.fetch(self.url, self.file_length): + self.temp_file.write(chunk) + chunks_count += 1 + + self.temp_file.seek(0) + temp_file_data = self.temp_file.read().decode("utf-8") + self.assertEqual(self.file_contents, temp_file_data) + # Check that we calculate chunks as expected + self.assertEqual(chunks_count, expected_chunks_count) + + # Restore default settings + tuf.settings.CHUNK_SIZE = default_chunk_size # Run unit test. -if __name__ == '__main__': - utils.configure_test_logging(sys.argv) - unittest.main() +if __name__ == "__main__": + utils.configure_test_logging(sys.argv) + unittest.main() diff --git a/tests/test_fetcher_ng.py b/tests/test_fetcher_ng.py index d384ee3132..652fa4cbb8 100644 --- a/tests/test_fetcher_ng.py +++ b/tests/test_fetcher_ng.py @@ -25,7 +25,6 @@ class TestFetcher(unittest_toolbox.Modified_TestCase): - @classmethod def setUpClass(cls): # Launch a SimpleHTTPServer (serves files in the current dir). 
@@ -111,10 +110,14 @@ def test_http_error(self): self.assertEqual(cm.exception.status_code, 404) # Response read timeout error - @patch.object(requests.Session, 'get') + @patch.object(requests.Session, "get") def test_response_read_timeout(self, mock_session_get): mock_response = Mock() - attr = {'raw.read.side_effect': urllib3.exceptions.ReadTimeoutError(None, None, "Read timed out.")} + attr = { + "raw.read.side_effect": urllib3.exceptions.ReadTimeoutError( + None, None, "Read timed out." + ) + } mock_response.configure_mock(**attr) mock_session_get.return_value = mock_response @@ -123,7 +126,9 @@ def test_response_read_timeout(self, mock_session_get): mock_response.raw.read.assert_called_once() # Read/connect session timeout error - @patch.object(requests.Session, 'get', side_effect=urllib3.exceptions.TimeoutError) + @patch.object( + requests.Session, "get", side_effect=urllib3.exceptions.TimeoutError + ) def test_session_get_timeout(self, mock_session_get): with self.assertRaises(exceptions.SlowRetrievalError): self.fetcher.fetch(self.url) diff --git a/tests/test_metadata_serialization.py b/tests/test_metadata_serialization.py index 1ae70e1fcc..9dccf8f7f6 100644 --- a/tests/test_metadata_serialization.py +++ b/tests/test_metadata_serialization.py @@ -41,28 +41,17 @@ class TestSerialization(unittest.TestCase): "no spec_version": '{"_type": "signed", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', "no version": '{"_type": "signed", "spec_version": "1.0.0", "expires": "2030-01-01T00:00:00Z", "meta": {}}', "no expires": '{"_type": "signed", "spec_version": "1.0.0", "version": 1, "meta": {}}', - "empty str _type": - '{"_type": "", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', - "empty str spec_version": - '{"_type": "signed", "spec_version": "", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', - "_type wrong type": - '{"_type": "foo", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', - "version wrong type": - '{"_type": "signed", "spec_version": "1.0.0", "version": "a", "expires": "2030-01-01T00:00:00Z", "meta": {}}', - "invalid spec_version str": - '{"_type": "signed", "spec_version": "abc", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', - "two digit spec_version": - '{"_type": "signed", "spec_version": "1.2.a", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', - "no digit spec_version": - '{"_type": "signed", "spec_version": "a.b.c", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', - "different major spec_version": - '{"_type": "signed", "spec_version": "0.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', - "version 0": - '{"_type": "signed", "spec_version": "1.0.0", "version": 0, "expires": "2030-01-01T00:00:00Z", "meta": {}}', - "version below 0": - '{"_type": "signed", "spec_version": "1.0.0", "version": -1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', - "wrong datetime string": - '{"_type": "signed", "spec_version": "1.0.0", "version": 1, "expires": "abc", "meta": {}}', + "empty str _type": '{"_type": "", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', + "empty str spec_version": '{"_type": "signed", "spec_version": "", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', + "_type wrong type": '{"_type": "foo", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', + "version wrong type": '{"_type": "signed", "spec_version": 
"1.0.0", "version": "a", "expires": "2030-01-01T00:00:00Z", "meta": {}}', + "invalid spec_version str": '{"_type": "signed", "spec_version": "abc", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', + "two digit spec_version": '{"_type": "signed", "spec_version": "1.2.a", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', + "no digit spec_version": '{"_type": "signed", "spec_version": "a.b.c", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', + "different major spec_version": '{"_type": "signed", "spec_version": "0.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', + "version 0": '{"_type": "signed", "spec_version": "1.0.0", "version": 0, "expires": "2030-01-01T00:00:00Z", "meta": {}}', + "version below 0": '{"_type": "signed", "spec_version": "1.0.0", "version": -1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', + "wrong datetime string": '{"_type": "signed", "spec_version": "1.0.0", "version": 1, "expires": "abc", "meta": {}}', } @utils.run_sub_tests_with_dataset(invalid_signed) @@ -71,7 +60,6 @@ def test_invalid_signed_serialization(self, test_case_data: Dict[str, str]): with self.assertRaises((KeyError, ValueError, TypeError)): Snapshot.from_dict(copy.deepcopy(case_dict)) - valid_keys: utils.DataSet = { "all": '{"keytype": "rsa", "scheme": "rsassa-pss-sha256", \ "keyval": {"public": "foo"}}', @@ -87,7 +75,6 @@ def test_valid_key_serialization(self, test_case_data: str): key = Key.from_dict("id", copy.copy(case_dict)) self.assertDictEqual(case_dict, key.to_dict()) - invalid_keys: utils.DataSet = { "no keyid": '{"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "abc"}}', "no keytype": '{"keyid": "id", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}', @@ -120,7 +107,6 @@ def test_invalid_role_serialization(self, test_case_data: Dict[str, str]): with self.assertRaises((KeyError, TypeError, ValueError)): Role.from_dict(copy.deepcopy(case_dict)) - valid_roles: utils.DataSet = { "all": '{"keyids": ["keyid"], "threshold": 3}', "many keyids": '{"keyids": ["a", "b", "c", "d", "e"], "threshold": 1}', @@ -134,7 +120,6 @@ def test_role_serialization(self, test_case_data: str): role = Role.from_dict(copy.deepcopy(case_dict)) self.assertDictEqual(case_dict, role.to_dict()) - valid_roots: utils.DataSet = { "all": '{"_type": "root", "spec_version": "1.0.0", "version": 1, \ "expires": "2030-01-01T00:00:00Z", "consistent_snapshot": false, \ @@ -183,7 +168,6 @@ def test_root_serialization(self, test_case_data: str): root = Root.from_dict(copy.deepcopy(case_dict)) self.assertDictEqual(case_dict, root.to_dict()) - invalid_roots: utils.DataSet = { "invalid role name": '{"_type": "root", "spec_version": "1.0.0", "version": 1, \ "expires": "2030-01-01T00:00:00Z", "consistent_snapshot": false, \ @@ -236,12 +220,13 @@ def test_invalid_root_serialization(self, test_case_data: Dict[str, str]): } @utils.run_sub_tests_with_dataset(invalid_metafiles) - def test_invalid_metafile_serialization(self, test_case_data: Dict[str, str]): + def test_invalid_metafile_serialization( + self, test_case_data: Dict[str, str] + ): case_dict = json.loads(test_case_data) with self.assertRaises((TypeError, ValueError, AttributeError)): MetaFile.from_dict(copy.deepcopy(case_dict)) - valid_metafiles: utils.DataSet = { "all": '{"hashes": {"sha256" : "abc"}, "length": 12, "version": 1}', "no length": '{"hashes": {"sha256" : "abc"}, "version": 1 }', @@ -261,12 +246,13 @@ def test_metafile_serialization(self, test_case_data: str): } 
@utils.run_sub_tests_with_dataset(invalid_timestamps) - def test_invalid_timestamp_serialization(self, test_case_data: Dict[str, str]): + def test_invalid_timestamp_serialization( + self, test_case_data: Dict[str, str] + ): case_dict = json.loads(test_case_data) with self.assertRaises((ValueError, KeyError)): Timestamp.from_dict(copy.deepcopy(case_dict)) - valid_timestamps: utils.DataSet = { "all": '{ "_type": "timestamp", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \ "meta": {"snapshot.json": {"hashes": {"sha256" : "abc"}, "version": 1}}}', @@ -280,7 +266,6 @@ def test_timestamp_serialization(self, test_case_data: str): timestamp = Timestamp.from_dict(copy.deepcopy(case_dict)) self.assertDictEqual(case_dict, timestamp.to_dict()) - valid_snapshots: utils.DataSet = { "all": '{ "_type": "snapshot", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \ "meta": { \ @@ -300,23 +285,18 @@ def test_snapshot_serialization(self, test_case_data: str): snapshot = Snapshot.from_dict(copy.deepcopy(case_dict)) self.assertDictEqual(case_dict, snapshot.to_dict()) - valid_delegated_roles: utils.DataSet = { # DelegatedRole inherits Role and some use cases can be found in the valid_roles. - "no hash prefix attribute": - '{"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], \ + "no hash prefix attribute": '{"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], \ "terminating": false, "threshold": 1}', - "no path attribute": - '{"keyids": ["keyid"], "name": "a", "terminating": false, \ + "no path attribute": '{"keyids": ["keyid"], "name": "a", "terminating": false, \ "path_hash_prefixes": ["h1", "h2"], "threshold": 99}', "empty paths": '{"keyids": ["keyid"], "name": "a", "paths": [], \ "terminating": false, "threshold": 1}', "empty path_hash_prefixes": '{"keyids": ["keyid"], "name": "a", "terminating": false, \ "path_hash_prefixes": [], "threshold": 99}', - "unrecognized field": - '{"keyids": ["keyid"], "name": "a", "terminating": true, "paths": ["fn1"], "threshold": 3, "foo": "bar"}', - "many keyids": - '{"keyids": ["keyid1", "keyid2"], "name": "a", "paths": ["fn1", "fn2"], \ + "unrecognized field": '{"keyids": ["keyid"], "name": "a", "terminating": true, "paths": ["fn1"], "threshold": 3, "foo": "bar"}', + "many keyids": '{"keyids": ["keyid1", "keyid2"], "name": "a", "paths": ["fn1", "fn2"], \ "terminating": false, "threshold": 1}', } @@ -326,13 +306,10 @@ def test_delegated_role_serialization(self, test_case_data: str): deserialized_role = DelegatedRole.from_dict(copy.copy(case_dict)) self.assertDictEqual(case_dict, deserialized_role.to_dict()) - invalid_delegated_roles: utils.DataSet = { # DelegatedRole inherits Role and some use cases can be found in the invalid_roles. 
- "missing hash prefixes and paths": - '{"name": "a", "keyids": ["keyid"], "threshold": 1, "terminating": false}', - "both hash prefixes and paths": - '{"name": "a", "keyids": ["keyid"], "threshold": 1, "terminating": false, \ + "missing hash prefixes and paths": '{"name": "a", "keyids": ["keyid"], "threshold": 1, "terminating": false}', + "both hash prefixes and paths": '{"name": "a", "keyids": ["keyid"], "threshold": 1, "terminating": false, \ "paths": ["fn1", "fn2"], "path_hash_prefixes": ["h1", "h2"]}', } @@ -342,9 +319,8 @@ def test_invalid_delegated_role_serialization(self, test_case_data: str): with self.assertRaises(ValueError): DelegatedRole.from_dict(copy.copy(case_dict)) - invalid_delegations: utils.DataSet = { - "empty delegations": '{}', + "empty delegations": "{}", "bad keys": '{"keys": "foo", \ "roles": [{"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], "terminating": false, "threshold": 3}]}', "bad roles": '{"keys": {"keyid" : {"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}}, \ @@ -363,18 +339,15 @@ def test_invalid_delegation_serialization(self, test_case_data: str): with self.assertRaises((ValueError, KeyError, AttributeError)): Delegations.from_dict(copy.deepcopy(case_dict)) - valid_delegations: utils.DataSet = { - "all": - '{"keys": { \ + "all": '{"keys": { \ "keyid1" : {"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}, \ "keyid2" : {"keytype": "ed25519", "scheme": "ed25519", "keyval": {"public": "bar"}}}, \ "roles": [ \ {"keyids": ["keyid"], "name": "a", "terminating": true, "paths": ["fn1"], "threshold": 3}, \ {"keyids": ["keyid2"], "name": "b", "terminating": true, "paths": ["fn2"], "threshold": 4} ] \ }', - "unrecognized field": - '{"keys": {"keyid" : {"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}}, \ + "unrecognized field": '{"keys": {"keyid" : {"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}}, \ "roles": [ {"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], "terminating": true, "threshold": 3} ], \ "foo": "bar"}', "empty keys and roles": '{"keys": {}, \ @@ -388,7 +361,6 @@ def test_delegation_serialization(self, test_case_data: str): delegation = Delegations.from_dict(copy.deepcopy(case_dict)) self.assertDictEqual(case_dict, delegation.to_dict()) - invalid_targetfiles: utils.DataSet = { "no hashes": '{"length": 1}', "no length": '{"hashes": {"sha256": "abc"}}' @@ -397,12 +369,13 @@ def test_delegation_serialization(self, test_case_data: str): } @utils.run_sub_tests_with_dataset(invalid_targetfiles) - def test_invalid_targetfile_serialization(self, test_case_data: Dict[str, str]): + def test_invalid_targetfile_serialization( + self, test_case_data: Dict[str, str] + ): case_dict = json.loads(test_case_data) with self.assertRaises(KeyError): TargetFile.from_dict(copy.deepcopy(case_dict), "file1.txt") - valid_targetfiles: utils.DataSet = { "all": '{"length": 12, "hashes": {"sha256" : "abc"}, \ "custom" : {"foo": "bar"} }', @@ -417,7 +390,6 @@ def test_targetfile_serialization(self, test_case_data: str): target_file = TargetFile.from_dict(copy.copy(case_dict), "file1.txt") self.assertDictEqual(case_dict, target_file.to_dict()) - valid_targets: utils.DataSet = { "all attributes": '{"_type": "targets", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \ "targets": { \ @@ -452,6 +424,6 @@ def test_targets_serialization(self, test_case_data): # Run unit test. 
-if __name__ == '__main__': +if __name__ == "__main__": utils.configure_test_logging(sys.argv) unittest.main() diff --git a/tests/test_trusted_metadata_set.py b/tests/test_trusted_metadata_set.py index 925b16a935..12e429b25a 100644 --- a/tests/test_trusted_metadata_set.py +++ b/tests/test_trusted_metadata_set.py @@ -13,22 +13,22 @@ Timestamp, Snapshot, MetaFile, - Targets + Targets, ) from tuf.ngclient._internal.trusted_metadata_set import TrustedMetadataSet from securesystemslib.signer import SSlibSigner -from securesystemslib.interface import( +from securesystemslib.interface import ( import_ed25519_privatekey_from_file, - import_rsa_privatekey_from_file + import_rsa_privatekey_from_file, ) from tests import utils logger = logging.getLogger(__name__) -class TestTrustedMetadataSet(unittest.TestCase): +class TestTrustedMetadataSet(unittest.TestCase): def modify_metadata( self, rolename: str, modification_func: Callable[["Signed"], None] ) -> bytes: @@ -48,24 +48,29 @@ def modify_metadata( @classmethod def setUpClass(cls): cls.repo_dir = os.path.join( - os.getcwd(), 'repository_data', 'repository', 'metadata' + os.getcwd(), "repository_data", "repository", "metadata" ) cls.metadata = {} - for md in ["root", "timestamp", "snapshot", "targets", "role1", "role2"]: + for md in [ + "root", + "timestamp", + "snapshot", + "targets", + "role1", + "role2", + ]: with open(os.path.join(cls.repo_dir, f"{md}.json"), "rb") as f: cls.metadata[md] = f.read() - keystore_dir = os.path.join(os.getcwd(), 'repository_data', 'keystore') + keystore_dir = os.path.join(os.getcwd(), "repository_data", "keystore") cls.keystore = {} root_key_dict = import_rsa_privatekey_from_file( - os.path.join(keystore_dir, "root" + '_key'), - password="password" + os.path.join(keystore_dir, "root" + "_key"), password="password" ) cls.keystore["root"] = SSlibSigner(root_key_dict) for role in ["delegation", "snapshot", "targets", "timestamp"]: key_dict = import_ed25519_privatekey_from_file( - os.path.join(keystore_dir, role + '_key'), - password="password" + os.path.join(keystore_dir, role + "_key"), password="password" ) cls.keystore[role] = SSlibSigner(key_dict) @@ -80,7 +85,6 @@ def hashes_length_modifier(timestamp: Timestamp) -> None: def setUp(self) -> None: self.trusted_set = TrustedMetadataSet(self.metadata["root"]) - def _update_all_besides_targets( self, timestamp_bytes: Optional[bytes] = None, @@ -103,7 +107,6 @@ def _update_all_besides_targets( snapshot_bytes = snapshot_bytes or self.metadata["snapshot"] self.trusted_set.update_snapshot(snapshot_bytes) - def test_update(self): self.trusted_set.update_timestamp(self.metadata["timestamp"]) self.trusted_set.update_snapshot(self.metadata["snapshot"]) @@ -224,7 +227,7 @@ def test_update_root_new_root_ver_same_as_trusted_root_ver(self): def test_root_expired_final_root(self): def root_expired_modifier(root: Root) -> None: root.expires = datetime(1970, 1, 1) - + # intermediate root can be expired root = self.modify_metadata("root", root_expired_modifier) tmp_trusted_set = TrustedMetadataSet(root) @@ -232,12 +235,11 @@ def root_expired_modifier(root: Root) -> None: with self.assertRaises(exceptions.ExpiredMetadataError): tmp_trusted_set.update_timestamp(self.metadata["timestamp"]) - def test_update_timestamp_new_timestamp_ver_below_trusted_ver(self): # new_timestamp.version < trusted_timestamp.version def version_modifier(timestamp: Timestamp) -> None: timestamp.version = 3 - + timestamp = self.modify_metadata("timestamp", version_modifier) 
self.trusted_set.update_timestamp(timestamp) with self.assertRaises(exceptions.ReplayedMetadataError): @@ -261,7 +263,9 @@ def timestamp_expired_modifier(timestamp: Timestamp) -> None: timestamp.expires = datetime(1970, 1, 1) # expired intermediate timestamp is loaded but raises - timestamp = self.modify_metadata("timestamp", timestamp_expired_modifier) + timestamp = self.modify_metadata( + "timestamp", timestamp_expired_modifier + ) with self.assertRaises(exceptions.ExpiredMetadataError): self.trusted_set.update_timestamp(timestamp) @@ -291,7 +295,9 @@ def test_update_snapshot_version_different_timestamp_snapshot_version(self): def timestamp_version_modifier(timestamp: Timestamp) -> None: timestamp.snapshot_meta.version = 2 - timestamp = self.modify_metadata("timestamp", timestamp_version_modifier) + timestamp = self.modify_metadata( + "timestamp", timestamp_version_modifier + ) self.trusted_set.update_timestamp(timestamp) # if intermediate snapshot version is incorrect, load it but also raise @@ -302,9 +308,9 @@ def timestamp_version_modifier(timestamp: Timestamp) -> None: with self.assertRaises(exceptions.BadVersionNumberError): self.trusted_set.update_targets(self.metadata["targets"]) - def test_update_snapshot_file_removed_from_meta(self): self._update_all_besides_targets(self.metadata["timestamp"]) + def remove_file_from_meta(snapshot: Snapshot) -> None: del snapshot.meta["targets.json"] @@ -327,6 +333,7 @@ def version_meta_modifier(snapshot: Snapshot) -> None: def test_update_snapshot_expired_new_snapshot(self): self.trusted_set.update_timestamp(self.metadata["timestamp"]) + def snapshot_expired_modifier(snapshot: Snapshot) -> None: snapshot.expires = datetime(1970, 1, 1) @@ -406,6 +413,6 @@ def target_expired_modifier(target: Targets) -> None: # TODO test updating over initial metadata (new keys, newer timestamp, etc) -if __name__ == '__main__': - utils.configure_test_logging(sys.argv) - unittest.main() +if __name__ == "__main__": + utils.configure_test_logging(sys.argv) + unittest.main() diff --git a/tests/test_updater_key_rotations.py b/tests/test_updater_key_rotations.py index 0dad301ecd..eb51b945da 100644 --- a/tests/test_updater_key_rotations.py +++ b/tests/test_updater_key_rotations.py @@ -33,6 +33,7 @@ class RootVersion: class TestUpdaterKeyRotations(unittest.TestCase): """Test ngclient root rotation handling""" + # set dump_dir to trigger repository state dumps dump_dir: Optional[str] = None diff --git a/tests/test_updater_ng.py b/tests/test_updater_ng.py index 69fbeef8cd..d67c9403c3 100644 --- a/tests/test_updater_ng.py +++ b/tests/test_updater_ng.py @@ -118,7 +118,7 @@ def setUp(self): repository_dir=self.client_directory, metadata_base_url=self.metadata_url, target_dir=self.dl_dir, - target_base_url=self.targets_url + target_base_url=self.targets_url, ) def tearDown(self): @@ -191,15 +191,16 @@ def consistent_snapshot_modifier(root): consistent_snapshot_modifier, bump_version=True ) updater = ngclient.Updater( - self.client_directory, self.metadata_url, self.dl_dir, self.targets_url + self.client_directory, + self.metadata_url, + self.dl_dir, + self.targets_url, ) # All metadata is in local directory already updater.refresh() # Make sure that consistent snapshot is enabled - self.assertTrue( - updater._trusted_set.root.signed.consistent_snapshot - ) + self.assertTrue(updater._trusted_set.root.signed.consistent_snapshot) # Get targetinfos, assert cache does not contain the files info1 = updater.get_targetinfo("file1.txt") @@ -252,9 +253,7 @@ def 
test_refresh_and_download(self): self.updater.download_target(info1) path = self.updater.find_cached_target(info1) self.assertEqual(path, os.path.join(self.dl_dir, info1.path)) - self.assertIsNone( - self.updater.find_cached_target(info3) - ) + self.assertIsNone(self.updater.find_cached_target(info3)) self.updater.download_target(info3) path = self.updater.find_cached_target(info1) @@ -281,7 +280,9 @@ def test_refresh_with_only_local_root(self): def test_both_target_urls_not_set(self): # target_base_url = None and Updater._target_base_url = None - updater = ngclient.Updater(self.client_directory, self.metadata_url, self.dl_dir) + updater = ngclient.Updater( + self.client_directory, self.metadata_url, self.dl_dir + ) info = TargetFile(1, {"sha256": ""}, "targetpath") with self.assertRaises(ValueError): updater.download_target(info) diff --git a/tests/test_updater_with_simulator.py b/tests/test_updater_with_simulator.py index 65727aa8c4..dd655e11d3 100644 --- a/tests/test_updater_with_simulator.py +++ b/tests/test_updater_with_simulator.py @@ -23,7 +23,7 @@ class TestUpdater(unittest.TestCase): # set dump_dir to trigger repository state dumps - dump_dir:Optional[str] = None + dump_dir: Optional[str] = None def setUp(self): self.temp_dir = tempfile.TemporaryDirectory() @@ -35,12 +35,14 @@ def setUp(self): # Setup the repository, bootstrap client root.json self.sim = RepositorySimulator() with open(os.path.join(self.metadata_dir, "root.json"), "bw") as f: - root = self.sim.download_bytes("https://example.com/metadata/1.root.json", 100000) + root = self.sim.download_bytes( + "https://example.com/metadata/1.root.json", 100000 + ) f.write(root) if self.dump_dir is not None: # create test specific dump directory - name = self.id().split('.')[-1] + name = self.id().split(".")[-1] self.sim.dump_dir = os.path.join(self.dump_dir, name) os.mkdir(self.sim.dump_dir) @@ -56,7 +58,7 @@ def _run_refresh(self) -> Updater: "https://example.com/metadata/", self.targets_dir, "https://example.com/targets/", - self.sim + self.sim, ) updater.refresh() return updater @@ -85,7 +87,11 @@ def test_refresh(self): targets: utils.DataSet = { "standard case": ("targetpath", b"content", "targetpath"), "non-asci case": ("åäö", b"more content", "%C3%A5%C3%A4%C3%B6"), - "subdirectory case": ("a/b/c/targetpath", b"dir target content", "a%2Fb%2Fc%2Ftargetpath"), + "subdirectory case": ( + "a/b/c/targetpath", + b"dir target content", + "a%2Fb%2Fc%2Ftargetpath", + ), } @utils.run_sub_tests_with_dataset(targets) @@ -132,14 +138,16 @@ def test_fishy_rolenames(self): "": ".json", ".": "..json", "/": "%2F.json", - "ö": "%C3%B6.json" + "ö": "%C3%B6.json", } # Add new delegated targets, update the snapshot spec_version = ".".join(SPECIFICATION_VERSION) targets = Targets(1, spec_version, self.sim.safe_expiry, {}, None) for role in roles_to_filenames.keys(): - self.sim.add_delegation("targets", role, targets, False, ["*"], None) + self.sim.add_delegation( + "targets", role, targets, False, ["*"], None + ) self.sim.update_snapshot() updater = self._run_refresh() From fc772654c673f8c5884127389db648de709c3b1e Mon Sep 17 00:00:00 2001 From: Martin Vrachev Date: Tue, 12 Oct 2021 16:46:52 +0300 Subject: [PATCH 03/12] Rename & simplify a couple of tests in test_api.py Signed-off-by: Martin Vrachev --- tests/test_api.py | 77 +++++++++++++++++++++-------------------------- 1 file changed, 34 insertions(+), 43 deletions(-) diff --git a/tests/test_api.py b/tests/test_api.py index ff7b60a498..73e270ac3c 100755 --- a/tests/test_api.py +++ 
b/tests/test_api.py @@ -97,21 +97,17 @@ def test_generic_read(self): # Load JSON-formatted metdata of each supported type from file # and from out-of-band read JSON string path = os.path.join(self.repo_dir, "metadata", metadata + ".json") - metadata_obj = Metadata.from_file(path) + md_obj = Metadata.from_file(path) with open(path, "rb") as f: - metadata_obj2 = Metadata.from_bytes(f.read()) + md_obj2 = Metadata.from_bytes(f.read()) # Assert that both methods instantiate the right inner class for # each metadata type and ... - self.assertTrue(isinstance(metadata_obj.signed, inner_metadata_cls)) - self.assertTrue( - isinstance(metadata_obj2.signed, inner_metadata_cls) - ) + self.assertTrue(isinstance(md_obj.signed, inner_metadata_cls)) + self.assertTrue(isinstance(md_obj2.signed, inner_metadata_cls)) # ... and return the same object (compared by dict representation) - self.assertDictEqual( - metadata_obj.to_dict(), metadata_obj2.to_dict() - ) + self.assertDictEqual(md_obj.to_dict(), md_obj2.to_dict()) # Assert that it chokes correctly on an unknown metadata type bad_metadata_path = "bad-metadata.json" @@ -129,24 +125,21 @@ def test_generic_read(self): def test_compact_json(self): path = os.path.join(self.repo_dir, "metadata", "targets.json") - metadata_obj = Metadata.from_file(path) + md_obj = Metadata.from_file(path) self.assertTrue( - len(JSONSerializer(compact=True).serialize(metadata_obj)) - < len(JSONSerializer().serialize(metadata_obj)) + len(JSONSerializer(compact=True).serialize(md_obj)) + < len(JSONSerializer().serialize(md_obj)) ) def test_read_write_read_compare(self): for metadata in ["root", "snapshot", "timestamp", "targets"]: path = os.path.join(self.repo_dir, "metadata", metadata + ".json") - metadata_obj = Metadata.from_file(path) + md_obj = Metadata.from_file(path) path_2 = path + ".tmp" - metadata_obj.to_file(path_2) - metadata_obj_2 = Metadata.from_file(path_2) - - self.assertDictEqual( - metadata_obj.to_dict(), metadata_obj_2.to_dict() - ) + md_obj.to_file(path_2) + md_obj_2 = Metadata.from_file(path_2) + self.assertDictEqual(md_obj.to_dict(), md_obj_2.to_dict()) os.remove(path_2) @@ -155,17 +148,15 @@ def test_to_from_bytes(self): path = os.path.join(self.repo_dir, "metadata", metadata + ".json") with open(path, "rb") as f: metadata_bytes = f.read() - metadata_obj = Metadata.from_bytes(metadata_bytes) + md_obj = Metadata.from_bytes(metadata_bytes) # Comparate that from_bytes/to_bytes doesn't change the content # for two cases for the serializer: noncompact and compact. # Case 1: test noncompact by overriding the default serializer. - self.assertEqual( - metadata_obj.to_bytes(JSONSerializer()), metadata_bytes - ) + self.assertEqual(md_obj.to_bytes(JSONSerializer()), metadata_bytes) # Case 2: test compact by using the default serializer. - obj_bytes = metadata_obj.to_bytes() + obj_bytes = md_obj.to_bytes() metadata_obj_2 = Metadata.from_bytes(obj_bytes) self.assertEqual(metadata_obj_2.to_bytes(), obj_bytes) @@ -183,66 +174,66 @@ def test_sign_verify(self): # Load sample metadata (targets) and assert ... path = os.path.join(self.repo_dir, "metadata", "targets.json") - metadata_obj = Metadata.from_file(path) + md_obj = Metadata.from_file(path) # ... it has a single existing signature, - self.assertEqual(len(metadata_obj.signatures), 1) + self.assertEqual(len(md_obj.signatures), 1) # ... which is valid for the correct key. 
- targets_key.verify_signature(metadata_obj) + targets_key.verify_signature(md_obj) with self.assertRaises(exceptions.UnsignedMetadataError): - snapshot_key.verify_signature(metadata_obj) + snapshot_key.verify_signature(md_obj) # Test verifying with explicitly set serializer - targets_key.verify_signature(metadata_obj, CanonicalJSONSerializer()) + targets_key.verify_signature(md_obj, CanonicalJSONSerializer()) with self.assertRaises(exceptions.UnsignedMetadataError): - targets_key.verify_signature(metadata_obj, JSONSerializer()) + targets_key.verify_signature(md_obj, JSONSerializer()) sslib_signer = SSlibSigner(self.keystore["snapshot"]) # Append a new signature with the unrelated key and assert that ... - sig = metadata_obj.sign(sslib_signer, append=True) + sig = md_obj.sign(sslib_signer, append=True) # ... there are now two signatures, and - self.assertEqual(len(metadata_obj.signatures), 2) + self.assertEqual(len(md_obj.signatures), 2) # ... both are valid for the corresponding keys. - targets_key.verify_signature(metadata_obj) - snapshot_key.verify_signature(metadata_obj) + targets_key.verify_signature(md_obj) + snapshot_key.verify_signature(md_obj) # ... the returned (appended) signature is for snapshot key self.assertEqual(sig.keyid, snapshot_keyid) sslib_signer = SSlibSigner(self.keystore["timestamp"]) # Create and assign (don't append) a new signature and assert that ... - metadata_obj.sign(sslib_signer, append=False) + md_obj.sign(sslib_signer, append=False) # ... there now is only one signature, - self.assertEqual(len(metadata_obj.signatures), 1) + self.assertEqual(len(md_obj.signatures), 1) # ... valid for that key. - timestamp_key.verify_signature(metadata_obj) + timestamp_key.verify_signature(md_obj) with self.assertRaises(exceptions.UnsignedMetadataError): - targets_key.verify_signature(metadata_obj) + targets_key.verify_signature(md_obj) # Test failure on unknown scheme (securesystemslib UnsupportedAlgorithmError) scheme = timestamp_key.scheme timestamp_key.scheme = "foo" with self.assertRaises(exceptions.UnsignedMetadataError): - timestamp_key.verify_signature(metadata_obj) + timestamp_key.verify_signature(md_obj) timestamp_key.scheme = scheme # Test failure on broken public key data (securesystemslib CryptoError) public = timestamp_key.keyval["public"] timestamp_key.keyval["public"] = "ffff" with self.assertRaises(exceptions.UnsignedMetadataError): - timestamp_key.verify_signature(metadata_obj) + timestamp_key.verify_signature(md_obj) timestamp_key.keyval["public"] = public # Test failure with invalid signature (securesystemslib FormatError) - sig = metadata_obj.signatures[timestamp_keyid] + sig = md_obj.signatures[timestamp_keyid] correct_sig = sig.signature sig.signature = "foo" with self.assertRaises(exceptions.UnsignedMetadataError): - timestamp_key.verify_signature(metadata_obj) + timestamp_key.verify_signature(md_obj) # Test failure with valid but incorrect signature sig.signature = "ff" * 64 with self.assertRaises(exceptions.UnsignedMetadataError): - timestamp_key.verify_signature(metadata_obj) + timestamp_key.verify_signature(md_obj) sig.signature = correct_sig def test_metadata_base(self): From c0ab00a31ed1bf483e8f4f42623395feb01446cc Mon Sep 17 00:00:00 2001 From: Martin Vrachev Date: Tue, 19 Oct 2021 15:22:19 +0300 Subject: [PATCH 04/12] Linting tests with isort and exclude old tests Currently, we are using 4 tools: black, pylint, isort and mypy. 
We want to run all 4 of them on our tests for the new codebase to make
it more readable and maintainable, but we don't want to run them on
the old test files, as those still follow the old style guidelines.

I am using the same approach as the one used for black: creating an
exclusion list of the old test files we want to ignore.

I didn't move or rename the old test files, as that would lose the
Git history.

Signed-off-by: Martin Vrachev
---
 pyproject.toml | 36 ++++++++++++++++++++++++++++++++++++
 tox.ini        |  2 +-
 2 files changed, 37 insertions(+), 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index 1dd4ef13f4..5ca0e890b4 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -37,3 +37,39 @@ tests/test_updater.py|
 tests/test_updater_root_rotation_integration.py|
 tests/repository_data/generate_project_data.py|
 tests/repository_data/generate.py)"""
+
+[tool.isort]
+skip = [
+    "tests/fast_server_exit.py",
+    "tests/simple_https_server.py",
+    "tests/simple_server.py",
+    "tests/slow_retrieval_server.py",
+    "tests/utils.py",
+    "tests/test_utils.py",
+    "tests/test_repository_lib.py",
+    "tests/test_arbitrary_package_attack.py",
+    "tests/test_developer_tool.py",
+    "tests/test_download.py",
+    "tests/test_endless_data_attack.py",
+    "tests/test_extraneous_dependencies_attack.py",
+    "tests/test_formats.py",
+    "tests/test_indefinite_freeze_attack.py",
+    "tests/test_key_revocation_integration.py",
+    "tests/test_keydb.py",
+    "tests/test_log.py",
+    "tests/test_mirrors.py",
+    "tests/test_mix_and_match_attack.py",
+    "tests/test_multiple_repositories_integration.py",
+    "tests/test_repository_tool.py",
+    "tests/test_replay_attack.py",
+    "tests/test_roledb.py",
+    "tests/test_root_versioning_integration.py",
+    "tests/test_sig.py",
+    "tests/test_slow_retrieval_attack.py",
+    "tests/test_tutorial.py",
+    "tests/test_unittest_toolbox.py",
+    "tests/test_updater.py",
+    "tests/test_updater_root_rotation_integration.py",
+    "tests/repository_data/generate_project_data.py",
+    "tests/repository_data/generate.py",
+]
diff --git a/tox.ini b/tox.ini
index 181cd3a743..c74251e0fb 100644
--- a/tox.ini
+++ b/tox.ini
@@ -42,7 +42,7 @@ commands =
     # Use different configs for new (tuf/api/*) and legacy code
     # TODO: configure black and isort args in pyproject.toml (see #1161)
     black --check --diff --line-length 80 --config pyproject.toml tests/ tuf/api tuf/ngclient tests/
-    isort --check --diff --line-length 80 --profile black -p tuf tuf/api tuf/ngclient
+    isort --check --diff --line-length 80 --profile black --sp pyproject.toml -p tuf tuf/api tuf/ngclient tests/
     pylint -j 0 tuf/api tuf/ngclient --rcfile=tuf/api/pylintrc
 
 # NOTE: Contrary to what the pylint docs suggest, ignoring full paths does

From 0281a874df950e488d1d3bcae49eab778d49a27e Mon Sep 17 00:00:00 2001
From: Martin Vrachev
Date: Tue, 12 Oct 2021 16:54:10 +0300
Subject: [PATCH 05/12] Apply isort on the tests of the new code

All of the changes included are a result of applying isort on our
tests on the new code.
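As a rough illustration, this is the layout that isort's black profile
enforces (a hypothetical snippet, not one of the changed files):
standard library imports first, third-party imports second, and
first-party "tests"/"tuf" imports last, with each group alphabetized
and separated by a blank line:

    import logging
    import os

    from securesystemslib.signer import SSlibSigner

    from tests import utils
    from tuf.api.metadata import Metadata, Root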
Signed-off-by: Martin Vrachev --- tests/repository_simulator.py | 19 ++++++++------- tests/test_api.py | 36 ++++++++++++---------------- tests/test_fetcher.py | 7 +++--- tests/test_fetcher_ng.py | 9 +++---- tests/test_metadata_serialization.py | 20 +++++++--------- tests/test_trusted_metadata_set.py | 23 +++++++++--------- tests/test_updater_key_rotations.py | 11 ++++----- tests/test_updater_ng.py | 15 ++++++------ tests/test_updater_with_simulator.py | 10 ++++---- 9 files changed, 70 insertions(+), 80 deletions(-) diff --git a/tests/repository_simulator.py b/tests/repository_simulator.py index a71d301fb5..f5f4209bef 100644 --- a/tests/repository_simulator.py +++ b/tests/repository_simulator.py @@ -44,22 +44,22 @@ updater.refresh() """ -from collections import OrderedDict -from dataclasses import dataclass -from datetime import datetime, timedelta import logging import os import tempfile +from collections import OrderedDict +from dataclasses import dataclass +from datetime import datetime, timedelta +from typing import Dict, Iterator, List, Optional, Tuple +from urllib import parse + import securesystemslib.hash as sslib_hash from securesystemslib.keys import generate_ed25519_key from securesystemslib.signer import SSlibSigner -from typing import Dict, Iterator, List, Optional, Tuple -from urllib import parse -from tuf.api.metadata import TOP_LEVEL_ROLE_NAMES -from tuf.api.serialization.json import JSONSerializer -from tuf.exceptions import FetcherHTTPError from tuf.api.metadata import ( + SPECIFICATION_VERSION, + TOP_LEVEL_ROLE_NAMES, DelegatedRole, Delegations, Key, @@ -67,12 +67,13 @@ MetaFile, Role, Root, - SPECIFICATION_VERSION, Snapshot, TargetFile, Targets, Timestamp, ) +from tuf.api.serialization.json import JSONSerializer +from tuf.exceptions import FetcherHTTPError, RepositoryError from tuf.ngclient.fetcher import FetcherInterface logger = logging.getLogger(__name__) diff --git a/tests/test_api.py b/tests/test_api.py index 73e270ac3c..654b4b69b3 100755 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -7,44 +7,38 @@ """ import json -import sys import logging import os import shutil +import sys import tempfile import unittest - from datetime import datetime, timedelta + from dateutil.relativedelta import relativedelta +from securesystemslib import hash as sslib_hash +from securesystemslib.interface import ( + import_ed25519_privatekey_from_file, + import_ed25519_publickey_from_file, +) +from securesystemslib.keys import generate_ed25519_key +from securesystemslib.signer import Signature, SSlibSigner from tests import utils - from tuf import exceptions from tuf.api.metadata import ( + DelegatedRole, + Key, Metadata, + MetaFile, Root, Snapshot, - Timestamp, - Targets, - Key, - MetaFile, TargetFile, - DelegatedRole, + Targets, + Timestamp, ) - from tuf.api.serialization import DeserializationError - -from tuf.api.serialization.json import JSONSerializer, CanonicalJSONSerializer - -from securesystemslib.interface import ( - import_ed25519_publickey_from_file, - import_ed25519_privatekey_from_file, -) - -from securesystemslib import hash as sslib_hash -from securesystemslib.signer import SSlibSigner, Signature - -from securesystemslib.keys import generate_ed25519_key +from tuf.api.serialization.json import CanonicalJSONSerializer, JSONSerializer logger = logging.getLogger(__name__) diff --git a/tests/test_fetcher.py b/tests/test_fetcher.py index c575f40f3a..3bc1db6e7a 100644 --- a/tests/test_fetcher.py +++ b/tests/test_fetcher.py @@ -6,19 +6,18 @@ """Unit test for RequestsFetcher. 
""" +import io import logging +import math import os -import io import sys -import unittest import tempfile -import math +import unittest import tuf import tuf.exceptions import tuf.requests_fetcher import tuf.unittest_toolbox as unittest_toolbox - from tests import utils logger = logging.getLogger(__name__) diff --git a/tests/test_fetcher_ng.py b/tests/test_fetcher_ng.py index 652fa4cbb8..b23d2de75e 100644 --- a/tests/test_fetcher_ng.py +++ b/tests/test_fetcher_ng.py @@ -8,18 +8,19 @@ import io import logging +import math import os import sys -import unittest import tempfile -import math -import urllib3.exceptions +import unittest +from unittest.mock import Mock, patch + import requests +import urllib3.exceptions from tests import utils from tuf import exceptions, unittest_toolbox from tuf.ngclient._internal.requests_fetcher import RequestsFetcher -from unittest.mock import Mock, patch logger = logging.getLogger(__name__) diff --git a/tests/test_metadata_serialization.py b/tests/test_metadata_serialization.py index 9dccf8f7f6..9cb6bf12d0 100644 --- a/tests/test_metadata_serialization.py +++ b/tests/test_metadata_serialization.py @@ -6,27 +6,25 @@ """ +import copy import json -import sys import logging +import sys import unittest -import copy - from typing import Dict from tests import utils - from tuf.api.metadata import ( - Root, - Snapshot, - Timestamp, - Targets, + DelegatedRole, + Delegations, Key, - Role, MetaFile, + Role, + Root, + Snapshot, TargetFile, - Delegations, - DelegatedRole, + Targets, + Timestamp, ) logger = logging.getLogger(__name__) diff --git a/tests/test_trusted_metadata_set.py b/tests/test_trusted_metadata_set.py index 12e429b25a..52a28ebb84 100644 --- a/tests/test_trusted_metadata_set.py +++ b/tests/test_trusted_metadata_set.py @@ -1,30 +1,29 @@ import logging -from typing import Optional, Callable import os import sys import unittest from datetime import datetime +from typing import Callable, Optional +from securesystemslib.interface import ( + import_ed25519_privatekey_from_file, + import_rsa_privatekey_from_file, +) +from securesystemslib.signer import SSlibSigner + +from tests import utils from tuf import exceptions from tuf.api.metadata import ( Metadata, - Signed, + MetaFile, Root, - Timestamp, + Signed, Snapshot, - MetaFile, Targets, + Timestamp, ) from tuf.ngclient._internal.trusted_metadata_set import TrustedMetadataSet -from securesystemslib.signer import SSlibSigner -from securesystemslib.interface import ( - import_ed25519_privatekey_from_file, - import_rsa_privatekey_from_file, -) - -from tests import utils - logger = logging.getLogger(__name__) diff --git a/tests/test_updater_key_rotations.py b/tests/test_updater_key_rotations.py index eb51b945da..c1cc7f4ba9 100644 --- a/tests/test_updater_key_rotations.py +++ b/tests/test_updater_key_rotations.py @@ -5,22 +5,21 @@ """Test ngclient Updater key rotation handling""" -from dataclasses import dataclass -from typing import List, Optional, Type import os import sys import tempfile import unittest +from dataclasses import dataclass +from typing import List, Optional, Type from securesystemslib.signer import SSlibSigner -from tuf.api.metadata import Key -from tuf.exceptions import UnsignedMetadataError -from tuf.ngclient import Updater - from tests import utils from tests.repository_simulator import RepositorySimulator from tests.utils import run_sub_tests_with_dataset +from tuf.api.metadata import Key +from tuf.exceptions import UnsignedMetadataError +from tuf.ngclient import Updater @dataclass diff --git 
a/tests/test_updater_ng.py b/tests/test_updater_ng.py
index d67c9403c3..a96ad9e6ba 100644
--- a/tests/test_updater_ng.py
+++ b/tests/test_updater_ng.py
@@ -6,20 +6,21 @@
 """Test Updater class
 """
+import logging
 import os
 import shutil
-import tempfile
-import logging
 import sys
-from typing import List
+import tempfile
 import unittest
-import tuf.unittest_toolbox as unittest_toolbox
+from typing import List
 
+from securesystemslib.interface import import_rsa_privatekey_from_file
+from securesystemslib.signer import SSlibSigner
+
+import tuf.unittest_toolbox as unittest_toolbox
 from tests import utils
-from tuf.api.metadata import Metadata, TargetFile
 from tuf import exceptions, ngclient
-from securesystemslib.signer import SSlibSigner
-from securesystemslib.interface import import_rsa_privatekey_from_file
+from tuf.api.metadata import Metadata, TargetFile
 
 logger = logging.getLogger(__name__)
 
diff --git a/tests/test_updater_with_simulator.py b/tests/test_updater_with_simulator.py
index dd655e11d3..957d10b6b5 100644
--- a/tests/test_updater_with_simulator.py
+++ b/tests/test_updater_with_simulator.py
@@ -9,16 +9,14 @@
 import os
 import sys
 import tempfile
-from tuf.api.metadata import SPECIFICATION_VERSION, Targets
-from typing import Optional, Tuple
-from tuf.exceptions import UnsignedMetadataError, BadVersionNumberError
 import unittest
-
-from tuf.ngclient import Updater
+from typing import Optional, Tuple
 
 from tests import utils
 from tests.repository_simulator import RepositorySimulator
-from securesystemslib import hash as sslib_hash
+from tuf.api.metadata import SPECIFICATION_VERSION, Targets
+from tuf.exceptions import BadVersionNumberError, UnsignedMetadataError
+from tuf.ngclient import Updater
 
 
 class TestUpdater(unittest.TestCase):

From d7ca7dee0ddd4c4bb0c202adf70cf789e7fce473 Mon Sep 17 00:00:00 2001
From: Martin Vrachev
Date: Tue, 19 Oct 2021 15:32:07 +0300
Subject: [PATCH 06/12] Linting tests with pylint and exclude old tests

Currently, we are using 4 tools: black, pylint, isort and mypy.

We want to run all 4 of them on our tests for the new codebase to make
it more readable and maintainable, but we don't want to run them on
the old test files, as those still follow the old style guidelines.

I am using the same approach as the one used for black: creating an
exclusion list of the old test files we want to ignore.

I didn't move or rename the old test files, as that would lose the
Git history.

Signed-off-by: Martin Vrachev
---
 tox.ini          |  2 +-
 tuf/api/pylintrc | 36 ++++++++++++++++++++++++++++++++++++
 2 files changed, 37 insertions(+), 1 deletion(-)

diff --git a/tox.ini b/tox.ini
index c74251e0fb..2334fe5da2 100644
--- a/tox.ini
+++ b/tox.ini
@@ -43,7 +43,7 @@ commands =
     # TODO: configure black and isort args in pyproject.toml (see #1161)
     black --check --diff --line-length 80 --config pyproject.toml tests/ tuf/api tuf/ngclient tests/
     isort --check --diff --line-length 80 --profile black --sp pyproject.toml -p tuf tuf/api tuf/ngclient tests/
-    pylint -j 0 tuf/api tuf/ngclient --rcfile=tuf/api/pylintrc
+    pylint -j 0 tuf/api tuf/ngclient tests/ --rcfile=tuf/api/pylintrc
 
 # NOTE: Contrary to what the pylint docs suggest, ignoring full paths does
 # work, unfortunately each subdirectory has to be ignored explicitly.
diff --git a/tuf/api/pylintrc b/tuf/api/pylintrc
index 01139a8804..9a0f53ca5b 100644
--- a/tuf/api/pylintrc
+++ b/tuf/api/pylintrc
@@ -5,6 +5,42 @@
 # https://google.github.io/styleguide/pylintrc
 # http://pylint.pycqa.org/en/latest/technical_reference/features.html
 
+[MASTER]
+ignore-paths=tests/aggregate_tests.py,
+    tests/fast_server_exit.py,
+    tests/simple_https_server.py,
+    tests/simple_server.py,
+    tests/slow_retrieval_server.py,
+    tests/utils.py,
+    tests/test_utils.py,
+    tests/test_log.py,
+    tests/test_unittest_toolbox.py,
+    tests/test_repository_lib.py,
+    tests/test_repository_tool.py,
+    tests/test_arbitrary_package_attack.py,
+    tests/test_endless_data_attack.py,
+    tests/test_developer_tool.py,
+    tests/test_sig.py,
+    tests/test_updater.py,
+    tests/test_download.py,
+    tests/test_extraneous_dependencies_attack.py,
+    tests/test_formats.py,
+    tests/test_indefinite_freeze_attack.py,
+    tests/test_key_revocation_integration.py,
+    tests/test_keydb.py,
+    tests/test_roledb.py,
+    tests/test_mirrors.py,
+    tests/test_mix_and_match_attack.py,
+    tests/test_replay_attack.py,
+    tests/test_multiple_repositories_integration.py,
+    tests/test_root_versioning_integration.py,
+    tests/test_slow_retrieval_attack.py,
+    tests/test_tutorial.py,
+    tests/test_updater_root_rotation_integration.py,
+    tests/repository_data/generate_project_data.py,
+    tests/repository_data/generate.py
+
+
 [MESSAGES CONTROL]
 # Disable the message, report, category or checker with the given id(s).
 # NOTE: To keep this config as short as possible we only disable checks that

From 95939a5b4a9e8de0e2654ddfb917b94def5358a2 Mon Sep 17 00:00:00 2001
From: Martin Vrachev
Date: Wed, 13 Oct 2021 10:47:44 +0300
Subject: [PATCH 07/12] Pylint: disable duplicate-code for the new code

Pylint reported a couple of warnings flagged as "duplicate-code".

We were truly duplicating code - one of the examples was importing the
same objects from tuf/api/metadata.py (MetaFile, Role, Root, Snapshot,
TargetFile, Targets, and Timestamp) in two separate modules. So, I
thought we do want to be repetitive here and include that code in both
modules.

The problem is that, besides importing the above classes, the modules
imported other classes from tuf/api/metadata.py as well, and there was
no way to disable this check. I searched and found out that this is a
known problem: https://github.com/PyCQA/pylint/issues/214.

That's why the only solution I see is to disable this warning
temporarily, hoping that one day, when this issue is fixed, we will
remember to turn it on again.

Signed-off-by: Martin Vrachev
---
 tuf/api/pylintrc | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tuf/api/pylintrc b/tuf/api/pylintrc
index 9a0f53ca5b..bb2db15d86 100644
--- a/tuf/api/pylintrc
+++ b/tuf/api/pylintrc
@@ -50,6 +50,7 @@ ignore-paths=tests/aggregate_tests.py,
 disable=fixme,
        too-few-public-methods,
        too-many-arguments,
+       duplicate-code
 
 [BASIC]
 good-names=i,j,k,v,e,f,fn,fp,_type

From dc65834c96c3e66e2c1ed34fb22d3370bf073cc1 Mon Sep 17 00:00:00 2001
From: Martin Vrachev
Date: Wed, 13 Oct 2021 19:13:43 +0300
Subject: [PATCH 08/12] Pylint: disable line-too-long for a test file

Disable the pylint line-too-long warning, as
test_metadata_serialization.py is a special case where having longer
lines helps the readability of the test cases data.
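For illustration, the data in that file looks roughly like this (a
hypothetical sketch, not the actual test cases): each case maps a
short name to one complete JSON string, and keeping every case on a
single line, even past 80 characters, makes the variants easy to
compare at a glance:

    invalid_signed: utils.DataSet = {
        "no version": '{"_type": "signed", "spec_version": "1.0.0", "expires": "2030-01-01T00:00:00Z"}',
        "no spec_version": '{"_type": "signed", "version": 1, "expires": "2030-01-01T00:00:00Z"}',
    }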
Signed-off-by: Martin Vrachev
---
 tests/test_metadata_serialization.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/tests/test_metadata_serialization.py b/tests/test_metadata_serialization.py
index 9cb6bf12d0..43d1de256c 100644
--- a/tests/test_metadata_serialization.py
+++ b/tests/test_metadata_serialization.py
@@ -27,6 +27,10 @@
     Timestamp,
 )
 
+# This file is a special case where having longer lines helps the
+# readability of the test cases data.
+# pylint: disable=line-too-long
+
 logger = logging.getLogger(__name__)
 
 
From 2fff0d8b2ab9c2cc2c9b5c115efa206b3845a2b0 Mon Sep 17 00:00:00 2001
From: Martin Vrachev
Date: Wed, 13 Oct 2021 19:56:53 +0300
Subject: [PATCH 09/12] Address pylint warnings on tests for the new code

Address or disable the pylint warnings raised on all test files inside
tests/ that test the code of the new implementation.

Signed-off-by: Martin Vrachev
---
 tests/repository_simulator.py        | 78 ++++++++++++++++------------
 tests/test_api.py                    | 31 ++++++-----
 tests/test_fetcher.py                |  8 ++-
 tests/test_fetcher_ng.py             | 13 +++--
 tests/test_metadata_serialization.py |  1 +
 tests/test_trusted_metadata_set.py   |  9 ++--
 tests/test_updater_ng.py             | 28 +++++-----
 tests/test_updater_with_simulator.py |  7 ++-
 8 files changed, 108 insertions(+), 67 deletions(-)

diff --git a/tests/repository_simulator.py b/tests/repository_simulator.py
index f5f4209bef..00d3d34c96 100644
--- a/tests/repository_simulator.py
+++ b/tests/repository_simulator.py
@@ -73,7 +73,7 @@
     Timestamp,
 )
 from tuf.api.serialization.json import JSONSerializer
-from tuf.exceptions import FetcherHTTPError, RepositoryError
+from tuf.exceptions import FetcherHTTPError
 from tuf.ngclient.fetcher import FetcherInterface
 
 logger = logging.getLogger(__name__)
@@ -90,6 +90,9 @@ class RepositoryTarget:
 
 
 class RepositorySimulator(FetcherInterface):
+    """Simulates a repository that can be used for testing"""
+
+    # pylint: disable=too-many-instance-attributes
     def __init__(self):
         self.md_root: Metadata[Root] = None
         self.md_timestamp: Metadata[Timestamp] = None
@@ -136,6 +139,7 @@ def targets(self) -> Targets:
         return self.md_targets.signed
 
     def all_targets(self) -> Iterator[Tuple[str, Targets]]:
+        """Yield role name and signed portion of targets one by one"""
         yield "targets", self.md_targets.signed
         for role, md in self.md_delegates.items():
             yield role, md.signed
@@ -185,6 +189,7 @@ def publish_root(self):
         logger.debug("Published root v%d", self.root.version)
 
     def fetch(self, url: str) -> Iterator[bytes]:
+        """Fetches data from the given url and yields an Iterator"""
         if not self.root.consistent_snapshot:
             raise NotImplementedError("non-consistent snapshot not supported")
         path = parse.urlparse(url).path
@@ -209,6 +214,8 @@ def fetch(self, url: str) -> Iterator[bytes]:
         else:
             raise FetcherHTTPError(f"Unknown path '{path}'", 404)
 
+    # Disable the redefined-builtin warning raised because of the hash func.
+    # pylint: disable-next=redefined-builtin
     def _fetch_target(self, target_path: str, hash: Optional[str]) -> bytes:
         """Return data for 'target_path', checking 'hash' if it is given.
@@ -234,35 +241,35 @@ def _fetch_metadata( # return a version previously serialized in publish_root() if version is None or version > len(self.signed_roots): raise FetcherHTTPError(f"Unknown root version {version}", 404) - logger.debug("fetched root version %d", role, version) + logger.debug("fetched root version %d", version) return self.signed_roots[version - 1] + + # sign and serialize the requested metadata + if role == "timestamp": + md: Metadata = self.md_timestamp + elif role == "snapshot": + md = self.md_snapshot + elif role == "targets": + md = self.md_targets else: - # sign and serialize the requested metadata - if role == "timestamp": - md: Metadata = self.md_timestamp - elif role == "snapshot": - md = self.md_snapshot - elif role == "targets": - md = self.md_targets - else: - md = self.md_delegates[role] - - if md is None: - raise FetcherHTTPError(f"Unknown role {role}", 404) - if version is not None and version != md.signed.version: - raise FetcherHTTPError(f"Unknown {role} version {version}", 404) - - md.signatures.clear() - for signer in self.signers[role].values(): - md.sign(signer, append=True) - - logger.debug( - "fetched %s v%d with %d sigs", - role, - md.signed.version, - len(self.signers[role]), - ) - return md.to_bytes(JSONSerializer()) + md = self.md_delegates[role] + + if md is None: + raise FetcherHTTPError(f"Unknown role {role}", 404) + if version is not None and version != md.signed.version: + raise FetcherHTTPError(f"Unknown {role} version {version}", 404) + + md.signatures.clear() + for signer in self.signers[role].values(): + md.sign(signer, append=True) + + logger.debug( + "fetched %s v%d with %d sigs", + role, + md.signed.version, + len(self.signers[role]), + ) + return md.to_bytes(JSONSerializer()) def _compute_hashes_and_length( self, role: str @@ -274,6 +281,7 @@ def _compute_hashes_and_length( return hashes, len(data) def update_timestamp(self): + """Update timestamp and assign snapshot ver to snapshot_meta ver""" self.timestamp.snapshot_meta.version = self.snapshot.version if self.compute_metafile_hashes_length: @@ -284,6 +292,7 @@ def update_timestamp(self): self.timestamp.version += 1 def update_snapshot(self): + """Update snapshot, assign targets versions and update timestamp""" for role, delegate in self.all_targets(): hashes = None length = None @@ -298,6 +307,7 @@ def update_snapshot(self): self.update_timestamp() def add_target(self, role: str, data: bytes, path: str): + """Create a target from data and add it to the target_files""" if role == "targets": targets = self.targets else: @@ -316,6 +326,7 @@ def add_delegation( paths: Optional[List[str]], hash_prefixes: Optional[List[str]], ): + """Add delegated target role to the repository""" if delegator_name == "targets": delegator = self.targets else: @@ -347,17 +358,18 @@ def write(self): print(f"Repository Simulator dumps in {self.dump_dir}") self.dump_version += 1 - dir = os.path.join(self.dump_dir, str(self.dump_version)) - os.makedirs(dir) + dest_dir = os.path.join(self.dump_dir, str(self.dump_version)) + os.makedirs(dest_dir) for ver in range(1, len(self.signed_roots) + 1): - with open(os.path.join(dir, f"{ver}.root.json"), "wb") as f: + with open(os.path.join(dest_dir, f"{ver}.root.json"), "wb") as f: f.write(self._fetch_metadata("root", ver)) for role in ["timestamp", "snapshot", "targets"]: - with open(os.path.join(dir, f"{role}.json"), "wb") as f: + with open(os.path.join(dest_dir, f"{role}.json"), "wb") as f: f.write(self._fetch_metadata(role)) + # pylint: 
disable-next=consider-iterating-dictionary for role in self.md_delegates.keys(): - with open(os.path.join(dir, f"{role}.json"), "wb") as f: + with open(os.path.join(dest_dir, f"{role}.json"), "wb") as f: f.write(self._fetch_metadata(role)) diff --git a/tests/test_api.py b/tests/test_api.py index 654b4b69b3..9ed486037f 100755 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -44,6 +44,8 @@ class TestMetadata(unittest.TestCase): + """Tests for public API of all classes in tuf/api/metadata.py""" + @classmethod def setUpClass(cls): # Create a temporary directory to store the repository, metadata, and @@ -155,8 +157,8 @@ def test_to_from_bytes(self): self.assertEqual(metadata_obj_2.to_bytes(), obj_bytes) def test_sign_verify(self): - root_path = os.path.join(self.repo_dir, "metadata", "root.json") - root = Metadata[Root].from_file(root_path).signed + path = os.path.join(self.repo_dir, "metadata", "root.json") + root = Metadata[Root].from_file(path).signed # Locate the public keys we need from root targets_keyid = next(iter(root.roles["targets"].keyids)) @@ -203,7 +205,8 @@ def test_sign_verify(self): with self.assertRaises(exceptions.UnsignedMetadataError): targets_key.verify_signature(md_obj) - # Test failure on unknown scheme (securesystemslib UnsupportedAlgorithmError) + # Test failure on unknown scheme (securesystemslib + # UnsupportedAlgorithmError) scheme = timestamp_key.scheme timestamp_key.scheme = "foo" with self.assertRaises(exceptions.UnsignedMetadataError): @@ -231,8 +234,8 @@ def test_sign_verify(self): sig.signature = correct_sig def test_metadata_base(self): - # Use of Snapshot is arbitrary, we're just testing the base class features - # with real data + # Use of Snapshot is arbitrary, we're just testing the base class + # features with real data snapshot_path = os.path.join(self.repo_dir, "metadata", "snapshot.json") md = Metadata.from_file(snapshot_path) @@ -279,7 +282,7 @@ def test_metadata_snapshot(self): # Create a MetaFile instance representing what we expect # the updated data to be. hashes = { - "sha256": "c2986576f5fdfd43944e2b19e775453b96748ec4fe2638a6d2f32f1310967095" + "sha256": "c2986576f5fdfd43944e2b19e775453b96748ec4fe2638a6d2f32f1310967095" # pylint: disable=line-too-long } fileinfo = MetaFile(2, 123, hashes) @@ -319,7 +322,7 @@ def test_metadata_timestamp(self): # Create a MetaFile instance representing what we expect # the updated data to be. hashes = { - "sha256": "0ae9664468150a9aa1e7f11feecb32341658eb84292851367fea2da88e8a58dc" + "sha256": "0ae9664468150a9aa1e7f11feecb32341658eb84292851367fea2da88e8a58dc" # pylint: disable=line-too-long } fileinfo = MetaFile(2, 520, hashes) @@ -472,6 +475,7 @@ def test_is_target_in_pathpattern(self): ] for targetpath, pathpattern in supported_use_cases: self.assertTrue( + # pylint: disable-next=protected-access DelegatedRole._is_target_in_pathpattern(targetpath, pathpattern) ) @@ -485,6 +489,7 @@ def test_is_target_in_pathpattern(self): ] for targetpath, pathpattern in invalid_use_cases: self.assertFalse( + # pylint: disable-next=protected-access DelegatedRole._is_target_in_pathpattern(targetpath, pathpattern) ) @@ -492,11 +497,11 @@ def test_metadata_targets(self): targets_path = os.path.join(self.repo_dir, "metadata", "targets.json") targets = Metadata[Targets].from_file(targets_path) - # Create a fileinfo dict representing what we expect the updated data to be + # Create a fileinfo dict representing the expected updated data. 
filename = "file2.txt" hashes = { - "sha256": "141f740f53781d1ca54b8a50af22cbf74e44c21a998fa2a8a05aaac2c002886b", - "sha512": "ef5beafa16041bcdd2937140afebd485296cd54f7348ecd5a4d035c09759608de467a7ac0eb58753d0242df873c305e8bffad2454aa48f44480f15efae1cacd0", + "sha256": "141f740f53781d1ca54b8a50af22cbf74e44c21a998fa2a8a05aaac2c002886b", # pylint: disable=line-too-long + "sha512": "ef5beafa16041bcdd2937140afebd485296cd54f7348ecd5a4d035c09759608de467a7ac0eb58753d0242df873c305e8bffad2454aa48f44480f15efae1cacd0", # pylint: disable=line-too-long } fileinfo = TargetFile(length=28, hashes=hashes, path=filename) @@ -531,7 +536,7 @@ def test_targets_key_api(self): key_dict = { "keytype": "ed25519", "keyval": { - "public": "edcd0a32a07dce33f7c7873aaffbff36d20ea30787574ead335eefd337e4dacd" + "public": "edcd0a32a07dce33f7c7873aaffbff36d20ea30787574ead335eefd337e4dacd" # pylint: disable=line-too-long }, "scheme": "ed25519", } @@ -628,7 +633,7 @@ def test_length_and_hash_validation(self): ) snapshot_metafile.hashes = { - "unsupported-alg": "8f88e2ba48b412c3843e9bb26e1b6f8fc9e98aceb0fbaa97ba37b4c98717d7ab" + "unsupported-alg": "8f88e2ba48b412c3843e9bb26e1b6f8fc9e98aceb0fbaa97ba37b4c98717d7ab" # pylint: disable=line-too-long } self.assertRaises( exceptions.LengthOrHashMismatchError, @@ -638,7 +643,7 @@ def test_length_and_hash_validation(self): # Test wrong algorithm format (sslib.FormatError) snapshot_metafile.hashes = { - 256: "8f88e2ba48b412c3843e9bb26e1b6f8fc9e98aceb0fbaa97ba37b4c98717d7ab" + 256: "8f88e2ba48b412c3843e9bb26e1b6f8fc9e98aceb0fbaa97ba37b4c98717d7ab" # pylint: disable=line-too-long } self.assertRaises( exceptions.LengthOrHashMismatchError, diff --git a/tests/test_fetcher.py b/tests/test_fetcher.py index 3bc1db6e7a..d03fab375e 100644 --- a/tests/test_fetcher.py +++ b/tests/test_fetcher.py @@ -17,13 +17,15 @@ import tuf import tuf.exceptions import tuf.requests_fetcher -import tuf.unittest_toolbox as unittest_toolbox from tests import utils +from tuf import unittest_toolbox logger = logging.getLogger(__name__) class TestFetcher(unittest_toolbox.Modified_TestCase): + """Unit tests for RequestFetcher""" + def setUp(self): """ Create a temporary file and launch a simple server in the @@ -35,7 +37,8 @@ def setUp(self): # Making a temporary file. current_dir = os.getcwd() target_filepath = self.make_temp_data_file(directory=current_dir) - self.target_fileobj = open(target_filepath, "r") + # pylint: disable-next=consider-using-with + self.target_fileobj = open(target_filepath, "r", encoding="utf8") self.file_contents = self.target_fileobj.read() self.file_length = len(self.file_contents) @@ -54,6 +57,7 @@ def setUp(self): # Create a temporary file where the target file chunks are written # during fetching + # pylint: disable-next=consider-using-with self.temp_file = tempfile.TemporaryFile() self.fetcher = tuf.requests_fetcher.RequestsFetcher() diff --git a/tests/test_fetcher_ng.py b/tests/test_fetcher_ng.py index b23d2de75e..4d10c30202 100644 --- a/tests/test_fetcher_ng.py +++ b/tests/test_fetcher_ng.py @@ -26,6 +26,8 @@ class TestFetcher(unittest_toolbox.Modified_TestCase): + """Test RequestsFetcher class""" + @classmethod def setUpClass(cls): # Launch a SimpleHTTPServer (serves files in the current dir). 
@@ -48,11 +50,16 @@ def setUp(self):
         current_dir = os.getcwd()
         target_filepath = self.make_temp_data_file(directory=current_dir)
 
-        self.target_fileobj = open(target_filepath, "r")
+        # pylint: disable-next=consider-using-with
+        self.target_fileobj = open(target_filepath, "r", encoding="utf8")
         self.file_contents = self.target_fileobj.read()
         self.file_length = len(self.file_contents)
         self.rel_target_filepath = os.path.basename(target_filepath)
-        self.url = f"http://{utils.TEST_HOST_ADDRESS}:{str(self.server_process_handler.port)}/{self.rel_target_filepath}"
+        self.url_prefix = (
+            f"http://{utils.TEST_HOST_ADDRESS}:"
+            f"{str(self.server_process_handler.port)}"
+        )
+        self.url = f"{self.url_prefix}/{self.rel_target_filepath}"
 
         # Instantiate a concrete instance of FetcherInterface
         self.fetcher = RequestsFetcher()
@@ -106,7 +113,7 @@ def test_url_parsing(self):
     # File not found error
     def test_http_error(self):
         with self.assertRaises(exceptions.FetcherHTTPError) as cm:
-            self.url = f"http://{utils.TEST_HOST_ADDRESS}:{str(self.server_process_handler.port)}/non-existing-path"
+            self.url = f"{self.url_prefix}/non-existing-path"
             self.fetcher.fetch(self.url)
         self.assertEqual(cm.exception.status_code, 404)
 
diff --git a/tests/test_metadata_serialization.py b/tests/test_metadata_serialization.py
index 43d1de256c..a14ec91684 100644
--- a/tests/test_metadata_serialization.py
+++ b/tests/test_metadata_serialization.py
@@ -35,6 +35,7 @@
 
 
 class TestSerialization(unittest.TestCase):
+    """Test serialization for all classes in tuf/api/metadata.py"""
 
     # Snapshot instances with meta = {} are valid, but for a full valid
     # repository it's required that meta has at least one element inside it.
diff --git a/tests/test_trusted_metadata_set.py b/tests/test_trusted_metadata_set.py
index 52a28ebb84..9a8708fcfa 100644
--- a/tests/test_trusted_metadata_set.py
+++ b/tests/test_trusted_metadata_set.py
@@ -1,3 +1,4 @@
+"""Unit tests for tuf/ngclient/_internal/trusted_metadata_set.py"""
 import logging
 import os
 import sys
@@ -26,16 +27,18 @@
 
 logger = logging.getLogger(__name__)
 
-
+# pylint: disable-next=too-many-public-methods
 class TestTrustedMetadataSet(unittest.TestCase):
+    """Tests for all public API of the TrustedMetadataSet class"""
+
     def modify_metadata(
-        self, rolename: str, modification_func: Callable[["Signed"], None]
+        self, rolename: str, modification_func: Callable[[Signed], None]
     ) -> bytes:
         """Instantiate metadata from rolename type, call modification_func
         and sign it again with self.keystore[rolename] signer.
 
         Attributes:
-            rolename: A denoting the name of the metadata which will be modified.
+            rolename: Denoting the name of the metadata which will be modified.
             modification_func: Function that will be called to modify the
                 signed portion of metadata bytes.
""" diff --git a/tests/test_updater_ng.py b/tests/test_updater_ng.py index a96ad9e6ba..5c781da09a 100644 --- a/tests/test_updater_ng.py +++ b/tests/test_updater_ng.py @@ -17,20 +17,22 @@ from securesystemslib.interface import import_rsa_privatekey_from_file from securesystemslib.signer import SSlibSigner -import tuf.unittest_toolbox as unittest_toolbox from tests import utils -from tuf import exceptions, ngclient +from tuf import exceptions, ngclient, unittest_toolbox from tuf.api.metadata import Metadata, TargetFile logger = logging.getLogger(__name__) class TestUpdater(unittest_toolbox.Modified_TestCase): + """Test the Updater class from tuf.ngclient.updater.py""" + @classmethod def setUpClass(cls): - # Create a temporary directory to store the repository, metadata, and target - # files. 'temporary_directory' must be deleted in TearDownModule() so that - # temporary files are always removed, even when exceptions occur. + # Create a temporary directory to store the repository, metadata, and + # target files. 'temporary_directory' must be deleted in + # TearDownModule() so that temporary files are always removed, even when + # exceptions occur. cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd()) # Needed because in some tests simple_server.py cannot be found. @@ -51,8 +53,8 @@ def tearDownClass(cls): # Cleans the resources and flush the logged lines (if any). cls.server_process_handler.clean() - # Remove the temporary repository directory, which should contain all the - # metadata, targets, and key files generated for the test cases + # Remove the temporary repository directory, which should contain all + # the metadata, targets, and key files generated for the test cases shutil.rmtree(cls.temporary_directory) def setUp(self): @@ -60,15 +62,15 @@ def setUp(self): unittest_toolbox.Modified_TestCase.setUp(self) # Copy the original repository files provided in the test folder so that - # any modifications made to repository files are restricted to the copies. + # any modifications are restricted to the copies. # The 'repository_data' directory is expected to exist in 'tuf.tests/'. original_repository_files = os.path.join(os.getcwd(), "repository_data") temporary_repository_root = self.make_temp_directory( directory=self.temporary_directory ) - # The original repository, keystore, and client directories will be copied - # for each test case. + # The original repository, keystore, and client directories will be + # copied for each test case. original_repository = os.path.join( original_repository_files, "repository" ) @@ -183,6 +185,7 @@ def _assert_files(self, roles: List[str]): client_files = sorted(os.listdir(self.client_directory)) self.assertEqual(client_files, expected_files) + # pylint: disable=protected-access def test_refresh_on_consistent_targets(self): # Generate a new root version where consistent_snapshot is set to true def consistent_snapshot_modifier(root): @@ -228,7 +231,7 @@ def consistent_snapshot_modifier(root): self.assertEqual(path, os.path.join(self.dl_dir, info3.path)) def test_refresh_and_download(self): - # Test refresh without consistent targets - targets without hash prefixes. + # Test refresh without consistent targets - targets without hash prefix. 
# top-level targets are already in local cache (but remove others) os.remove(os.path.join(self.client_directory, "role1.json")) @@ -275,7 +278,7 @@ def test_refresh_with_only_local_root(self): self._assert_files(["root", "snapshot", "targets", "timestamp"]) # Get targetinfo for 'file3.txt' listed in the delegated role1 - targetinfo3 = self.updater.get_targetinfo("file3.txt") + self.updater.get_targetinfo("file3.txt") expected_files = ["role1", "root", "snapshot", "targets", "timestamp"] self._assert_files(expected_files) @@ -317,6 +320,7 @@ def test_length_hash_mismatch(self): targetinfo.hashes = {"sha256": "abcd"} self.updater.download_target(targetinfo) + # pylint: disable=protected-access def test_updating_root(self): # Bump root version, resign and refresh self._modify_repository_root(lambda root: None, bump_version=True) diff --git a/tests/test_updater_with_simulator.py b/tests/test_updater_with_simulator.py index 957d10b6b5..99c99454f9 100644 --- a/tests/test_updater_with_simulator.py +++ b/tests/test_updater_with_simulator.py @@ -20,10 +20,13 @@ class TestUpdater(unittest.TestCase): + """Test ngclient Updater using the repository simulator""" + # set dump_dir to trigger repository state dumps dump_dir: Optional[str] = None def setUp(self): + # pylint: disable-next=consider-using-with self.temp_dir = tempfile.TemporaryDirectory() self.metadata_dir = os.path.join(self.temp_dir.name, "metadata") self.targets_dir = os.path.join(self.temp_dir.name, "targets") @@ -48,6 +51,7 @@ def tearDown(self): self.temp_dir.cleanup() def _run_refresh(self) -> Updater: + """Creates a new updater and runs refresh.""" if self.sim.dump_dir is not None: self.sim.write() @@ -142,6 +146,7 @@ def test_fishy_rolenames(self): # Add new delegated targets, update the snapshot spec_version = ".".join(SPECIFICATION_VERSION) targets = Targets(1, spec_version, self.sim.safe_expiry, {}, None) + # pylint: disable-next=consider-iterating-dictionary for role in roles_to_filenames.keys(): self.sim.add_delegation( "targets", role, targets, False, ["*"], None @@ -214,7 +219,7 @@ def test_snapshot_rollback_with_local_snapshot_hash_mismatch(self): self.sim.update_snapshot() self._run_refresh() - # The new targets should have a lower version than the local trusted one. + # The new targets must have a lower version than the local trusted one. self.sim.targets.version = 1 self.sim.update_snapshot() From 1a5bde9180923d8b7f43b78e69722fe21d2079d3 Mon Sep 17 00:00:00 2001 From: Martin Vrachev Date: Thu, 21 Oct 2021 16:15:58 +0300 Subject: [PATCH 10/12] Pylint config: add _ as a good variable name _ is often used when a function returns multiple values and you need a sub-portion of them. Then, those values that are unnecessary can be named _. Currently, pylint warns us that this is not a good variable name, so fix that. Signed-off-by: Martin Vrachev --- tuf/api/pylintrc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tuf/api/pylintrc b/tuf/api/pylintrc index bb2db15d86..e745a377f9 100644 --- a/tuf/api/pylintrc +++ b/tuf/api/pylintrc @@ -53,7 +53,7 @@ disable=fixme, duplicate-code [BASIC] -good-names=i,j,k,v,e,f,fn,fp,_type +good-names=i,j,k,v,e,f,fn,fp,_type,_ # Regexes for allowed names are copied from the Google pylintrc # NOTE: Pylint captures regex name groups such as 'snake_case' or 'camel_case'. 
 # If there are multiple groups it enfoces the prevalent naming style inside

From 98f16c9574dec884e6cdab1162eed1cee7c9682f Mon Sep 17 00:00:00 2001
From: Martin Vrachev
Date: Mon, 25 Oct 2021 17:10:30 +0300
Subject: [PATCH 11/12] Move pylint config for new code in pyproject.toml

Move the configuration for the pylint linting tool, which covers the
refactored code, into pyproject.toml.

This is done in order to have one unified place for the 30+ line
exclusion list of old test files we don't want to lint.

Signed-off-by: Martin Vrachev
---
 pyproject.toml   | 96 ++++++++++++++++++++++++++++++++++++
 tox.ini          |  2 +-
 tuf/api/pylintrc | 90 ---------------------------------------
 3 files changed, 97 insertions(+), 91 deletions(-)
 delete mode 100644 tuf/api/pylintrc

diff --git a/pyproject.toml b/pyproject.toml
index 5ca0e890b4..1c97624cfd 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,7 +1,9 @@
+# Build-system section
 [build-system]
 requires = ["setuptools>=40.8.0", "wheel"]
 build-backend = "setuptools.build_meta"
 
+# Black section
 [tool.black]
 force-exclude="""
 (tests/aggregate_tests.py|
@@ -38,6 +40,7 @@ tests/test_updater_root_rotation_integration.py|
 tests/repository_data/generate_project_data.py|
 tests/repository_data/generate.py)"""
 
+# isort section
 [tool.isort]
 skip = [
     "tests/fast_server_exit.py",
@@ -73,3 +76,96 @@ skip = [
     "tests/repository_data/generate_project_data.py",
     "tests/repository_data/generate.py",
 ]
+
+# Pylint section
+
+# Minimal pylint configuration file for Secure Systems Lab Python Style Guide:
+# https://github.com/secure-systems-lab/code-style-guidelines
+#
+# Based on Google Python Style Guide pylintrc and pylint defaults:
+# https://google.github.io/styleguide/pylintrc
+# http://pylint.pycqa.org/en/latest/technical_reference/features.html
+
+[tool.pylint.master]
+ignore = [
+    "aggregate_tests.py",
+    "fast_server_exit.py",
+    "simple_https_server.py",
+    "simple_server.py",
+    "slow_retrieval_server.py",
+    "utils.py",
+    "test_utils.py",
+    "test_repository_lib.py",
+    "test_arbitrary_package_attack.py",
+    "test_developer_tool.py",
+    "test_download.py",
+    "test_endless_data_attack.py",
+    "test_extraneous_dependencies_attack.py",
+    "test_formats.py",
+    "test_indefinite_freeze_attack.py",
+    "test_key_revocation_integration.py",
+    "test_keydb.py",
+    "test_log.py",
+    "test_mirrors.py",
+    "test_mix_and_match_attack.py",
+    "test_multiple_repositories_integration.py",
+    "test_repository_tool.py",
+    "test_replay_attack.py",
+    "test_roledb.py",
+    "test_root_versioning_integration.py",
+    "test_sig.py",
+    "test_slow_retrieval_attack.py",
+    "test_tutorial.py",
+    "test_unittest_toolbox.py",
+    "test_updater.py",
+    "test_updater_root_rotation_integration.py",
+    "generate_project_data.py",
+    "generate.py",
+]
+
+[tool.pylint.message_control]
+# Disable the message, report, category or checker with the given id(s).
+# NOTE: To keep this config as short as possible we only disable checks that
+# are currently in conflict with our code. If new code displeases the linter
+# (for good reasons) consider updating this config file, or disable checks with 'pylint: disable=XYZ' comments.
+disable=[
+    "fixme",
+    "too-few-public-methods",
+    "too-many-arguments",
+    "duplicate-code"
+]
+
+[tool.pylint.basic]
+good-names = ["i","j","k","v","e","f","fn","fp","_type","_"]
+# Regexes for allowed names are copied from the Google pylintrc
+# NOTE: Pylint captures regex name groups such as 'snake_case' or 'camel_case'.
+# If there are multiple groups it enforces the prevalent naming style inside
+# each module. Names in the exempt capturing group are ignored.
+function-rgx="""^(?:(?P<exempt>setUp|tearDown|setUpModule|tearDownModule)|(?P<camel_case>_?[A-Z][a-zA-Z0-9]*)|(?P<snake_case>_?[a-z][a-z0-9_]*))$"""
+method-rgx="""(?x)^(?:(?P<exempt>_[a-z0-9_]+__|runTest|setUp|tearDown|setUpTestCase|tearDownTestCase|setupSelf|tearDownClass|setUpClass|(test|assert)_*[A-Z0-9][a-zA-Z0-9_]*|next)|(?P<camel_case>_{0,2}[A-Z][a-zA-Z0-9_]*)|(?P<snake_case>_{0,2}[a-z][a-z0-9_]*))$"""
+argument-rgx="""^[a-z][a-z0-9_]*$"""
+attr-rgx="""^_{0,2}[a-z][a-z0-9_]*$"""
+class-attribute-rgx="""^(_?[A-Z][A-Z0-9_]*|__[a-z0-9_]+__|_?[a-z][a-z0-9_]*)$"""
+class-rgx="^_?[A-Z][a-zA-Z0-9]*$"
+const-rgx="^(_?[A-Z][A-Z0-9_]*|__[a-z0-9_]+__|_?[a-z][a-z0-9_]*)$"
+inlinevar-rgx="^[a-z][a-z0-9_]*$"
+module-rgx="^(_?[a-z][a-z0-9_]*|__init__)$"
+no-docstring-rgx="(__.*__|main|test.*|.*test|.*Test)$"
+variable-rgx="^[a-z][a-z0-9_]*$"
+docstring-min-length=10
+
+[tool.pylint.format]
+ignore-long-lines='(?x)(^\s*(\#\ )?<?https?://\S+>?$|^\s*(from\s+\S+\s+)?import\s+.+$)'
+indent-string="    "
+indent-after-paren=4
+max-line-length=80
+single-line-if-stmt="yes"
+
+[tool.pylint.logging]
+logging-format-style="old"
+
+[tool.pylint.miscellaneous]
+notes="TODO"
+
+[tool.pylint.STRING]
+check-quote-consistency="yes"
diff --git a/tox.ini b/tox.ini
index 2334fe5da2..dbed7172fe 100644
--- a/tox.ini
+++ b/tox.ini
@@ -43,7 +43,7 @@ commands =
     # TODO: configure black and isort args in pyproject.toml (see #1161)
     black --check --diff --line-length 80 --config pyproject.toml tests/ tuf/api tuf/ngclient tests/
     isort --check --diff --line-length 80 --profile black --sp pyproject.toml -p tuf tuf/api tuf/ngclient tests/
-    pylint -j 0 tuf/api tuf/ngclient tests/ --rcfile=tuf/api/pylintrc
+    pylint -j 0 tuf/api tuf/ngclient tests/ --rcfile=pyproject.toml
 
 # NOTE: Contrary to what the pylint docs suggest, ignoring full paths does
 # work, unfortunately each subdirectory has to be ignored explicitly.
diff --git a/tuf/api/pylintrc b/tuf/api/pylintrc deleted file mode 100644 index e745a377f9..0000000000 --- a/tuf/api/pylintrc +++ /dev/null @@ -1,90 +0,0 @@ -# Minimal pylint configuration file for Secure Systems Lab Python Style Guide: -# https://github.com/secure-systems-lab/code-style-guidelines -# -# Based on Google Python Style Guide pylintrc and pylint defaults: -# https://google.github.io/styleguide/pylintrc -# http://pylint.pycqa.org/en/latest/technical_reference/features.html - -[MASTER] -ignore-paths=tests/aggregate_tests.py, - tests/fast_server_exit.py, - tests/simple_https_server.py, - tests/simple_server.py, - tests/slow_retrieval_server.py, - tests/utils.py, - tests/test_utils.py, - tests/test_log.py, - tests/test_unittest_toolbox.py, - tests/test_repository_lib.py, - tests/test_repository_tool.py, - tests/test_arbitrary_package_attack.py, - tests/test_endless_data_attack.py, - tests/test_developer_tool.py, - tests/test_sig.py, - tests/test_updater.py, - tests/test_download.py, - tests/test_extraneous_dependencies_attack.py, - tests/test_formats.py, - tests/test_indefinite_freeze_attack.py, - tests/test_key_revocation_integration.py, - tests/test_keydb.py, - tests/test_roledb.py, - tests/test_mirrors.py, - tests/test_mix_and_match_attack.py, - tests/test_replay_attack.py, - tests/test_multiple_repositories_integration.py, - tests/test_root_versioning_integration.py, - tests/test_slow_retrieval_attack.py, - tests/test_tutorial.py, - tests/test_updater_root_rotation_integration.py, - tests/repository_data/generate_project_data.py, - tests/repository_data/generate.py - - -[MESSAGES CONTROL] -# Disable the message, report, category or checker with the given id(s). -# NOTE: To keep this config as short as possible we only disable checks that -# are currently in conflict with our code. If new code displeases the linter -# (for good reasons) consider updating this config file, or disable checks with -# 'pylint: disable=XYZ' comments. -disable=fixme, - too-few-public-methods, - too-many-arguments, - duplicate-code - -[BASIC] -good-names=i,j,k,v,e,f,fn,fp,_type,_ -# Regexes for allowed names are copied from the Google pylintrc -# NOTE: Pylint captures regex name groups such as 'snake_case' or 'camel_case'. -# If there are multiple groups it enfoces the prevalent naming style inside -# each modules. Names in the exempt capturing group are ignored. 
-function-rgx=^(?:(?P<exempt>setUp|tearDown|setUpModule|tearDownModule)|(?P<camel_case>_?[A-Z][a-zA-Z0-9]*)|(?P<snake_case>_?[a-z][a-z0-9_]*))$
-method-rgx=(?x)^(?:(?P<exempt>_[a-z0-9_]+__|runTest|setUp|tearDown|setUpTestCase|tearDownTestCase|setupSelf|tearDownClass|setUpClass|(test|assert)_*[A-Z0-9][a-zA-Z0-9_]*|next)|(?P<camel_case>_{0,2}[A-Z][a-zA-Z0-9_]*)|(?P<snake_case>_{0,2}[a-z][a-z0-9_]*))$
-argument-rgx=^[a-z][a-z0-9_]*$
-attr-rgx=^_{0,2}[a-z][a-z0-9_]*$
-class-attribute-rgx=^(_?[A-Z][A-Z0-9_]*|__[a-z0-9_]+__|_?[a-z][a-z0-9_]*)$
-class-rgx=^_?[A-Z][a-zA-Z0-9]*$
-const-rgx=^(_?[A-Z][A-Z0-9_]*|__[a-z0-9_]+__|_?[a-z][a-z0-9_]*)$
-inlinevar-rgx=^[a-z][a-z0-9_]*$
-module-rgx=^(_?[a-z][a-z0-9_]*|__init__)$
-no-docstring-rgx=(__.*__|main|test.*|.*test|.*Test)$
-variable-rgx=^[a-z][a-z0-9_]*$
-docstring-min-length=10
-
-[FORMAT]
-ignore-long-lines=(?x)(
-  ^\s*(\#\ )?<?https?://\S+>?$|
-  ^\s*(from\s+\S+\s+)?import\s+.+$)
-indent-string="    "
-indent-after-paren=4
-max-line-length=80
-single-line-if-stmt=yes
-
-[LOGGING]
-logging-format-style=old
-
-[MISCELLANEOUS]
-notes=TODO
-
-[STRING]
-check-quote-consistency=yes

From 17fdcbe4d30c5d1cf004b777440be281815354e5 Mon Sep 17 00:00:00 2001
From: Martin Vrachev
Date: Mon, 25 Oct 2021 17:37:53 +0300
Subject: [PATCH 12/12] Move mypy config for new code in pyproject.toml

Move the configuration for the mypy linting tool, which covers the
refactored code, into pyproject.toml.

This is done in order to have one unified place for the configuration
of all our linting tools.

Signed-off-by: Martin Vrachev
---
 pyproject.toml | 23 +++++++++++++++++++++++
 setup.cfg      | 18 ------------------
 2 files changed, 23 insertions(+), 18 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index 1c97624cfd..ce695fcad0 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -169,3 +169,26 @@ notes="TODO"
 
 [tool.pylint.STRING]
 check-quote-consistency="yes"
+
+# mypy section
+# Read more here: https://mypy.readthedocs.io/en/stable/config_file.html#using-a-pyproject-toml-file
+[tool.mypy]
+warn_unused_configs = "True"
+warn_redundant_casts = "True"
+warn_unused_ignores = "True"
+warn_unreachable = "True"
+strict_equality = "True"
+disallow_untyped_defs = "True"
+disallow_untyped_calls = "True"
+files = [
+    "tuf/api/",
+    "tuf/ngclient",
+    "tuf/exceptions.py"
+]
+
+[[tool.mypy.overrides]]
+module = [
+    "securesystemslib.*",
+    "urllib3.*"
+]
+ignore_missing_imports = "True"
diff --git a/setup.cfg b/setup.cfg
index f880a3893d..64fc786f15 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -50,21 +50,3 @@ ignore =
     .fossa.yml
     .readthedocs.yaml
 
-[mypy]
-warn_unused_configs = True
-warn_redundant_casts = True
-warn_unused_ignores = True
-warn_unreachable = True
-strict_equality = True
-disallow_untyped_defs = True
-disallow_untyped_calls = True
-files =
-    tuf/api/,
-    tuf/ngclient,
-    tuf/exceptions.py
-
-[mypy-securesystemslib.*]
-ignore_missing_imports = True
-
-[mypy-urllib3.*]
-ignore_missing_imports = True
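For reference, a minimal sketch of what the strict mypy settings above
mean in practice (hypothetical helpers, not part of the series):
disallow_untyped_defs rejects any function in the listed files that
lacks complete annotations, and disallow_untyped_calls forbids calling
such functions from annotated code:

    from typing import Dict, Optional

    def find_version(role: str, versions: Dict[str, int]) -> Optional[int]:
        """Return the version recorded for 'role', or None if unknown."""
        # Fully annotated, so accepted under disallow_untyped_defs.
        return versions.get(role)

    def bump_root(versions):
        # No annotations: flagged by disallow_untyped_defs, and calls to
        # it from typed code are flagged by disallow_untyped_calls.
        versions["root"] += 1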