diff --git a/tests/test_updater_rework.py b/tests/test_updater_rework.py
index b564fbf57e..3ea9d08622 100644
--- a/tests/test_updater_rework.py
+++ b/tests/test_updater_rework.py
@@ -79,8 +79,6 @@ def setUp(self):
     # We are inheriting from custom class.
    unittest_toolbox.Modified_TestCase.setUp(self)
 
-    self.repository_name = 'test_repository1'
-
     # Copy the original repository files provided in the test folder so that
     # any modifications made to repository files are restricted to the copies.
     # The 'repository_data' directory is expected to exist in 'tuf.tests/'.
@@ -92,7 +90,7 @@ def setUp(self):
     # for each test case.
     original_repository = os.path.join(original_repository_files, 'repository')
     original_keystore = os.path.join(original_repository_files, 'keystore')
-    original_client = os.path.join(original_repository_files, 'client')
+    original_client = os.path.join(original_repository_files, 'client', 'test_repository1', 'metadata', 'current')
 
     # Save references to the often-needed client repository directories.
     # Test cases need these references to access metadata and target files.
@@ -101,12 +99,7 @@ def setUp(self):
     self.keystore_directory = \
       os.path.join(temporary_repository_root, 'keystore')
 
-    self.client_directory = os.path.join(temporary_repository_root,
-        'client')
-    self.client_metadata = os.path.join(self.client_directory,
-        self.repository_name, 'metadata')
-    self.client_metadata_current = os.path.join(self.client_metadata,
-        'current')
+    self.client_directory = os.path.join(temporary_repository_root, 'client')
 
     # Copy the original 'repository', 'client', and 'keystore' directories
     # to the temporary repository the test cases can use.
@@ -119,25 +112,14 @@ def setUp(self):
     url_prefix = 'http://' + utils.TEST_HOST_ADDRESS + ':' \
         + str(self.server_process_handler.port) + repository_basepath
 
-    # Setting 'tuf.settings.repository_directory' with the temporary client
-    # directory copied from the original repository files.
-    tuf.settings.repositories_directory = self.client_directory
-
     metadata_url = f"{url_prefix}/metadata/"
     targets_url = f"{url_prefix}/targets/"
 
     # Creating a repository instance. The test cases will use this client
     # updater to refresh metadata, fetch target files, etc.
-    self.repository_updater = updater.Updater(self.repository_name,
+    self.repository_updater = updater.Updater(self.client_directory,
                                               metadata_url,
                                               targets_url)
 
-    # Metadata role keys are needed by the test cases to make changes to the
-    # repository (e.g., adding a new target file to 'targets.json' and then
-    # requesting a refresh()).
-    self.role_keys = _load_role_keys(self.keystore_directory)
-
-
-
   def tearDown(self):
     # We are inheriting from custom class.
     unittest_toolbox.Modified_TestCase.tearDown(self)
@@ -145,24 +127,10 @@ def tearDown(self):
     # Logs stdout and stderr from the server subprocess.
     self.server_process_handler.flush_log()
 
-
-
-  # UNIT TESTS.
   def test_refresh(self):
-
+    # All metadata is in local directory already
     self.repository_updater.refresh()
 
-    for role in ['root', 'timestamp', 'snapshot', 'targets']:
-      metadata_obj = metadata.Metadata.from_file(os.path.join(
-          self.client_metadata_current, role + '.json'))
-
-      metadata_obj_2 = metadata.Metadata.from_file(os.path.join(
-          self.repository_directory, 'metadata', role + '.json'))
-
-
-      self.assertDictEqual(metadata_obj.to_dict(),
-                           metadata_obj_2.to_dict())
-
     # Get targetinfo for 'file1.txt' listed in targets
     targetinfo1 = self.repository_updater.get_one_valid_targetinfo('file1.txt')
 
     # Get targetinfo for 'file3.txt' listed in the delegated role1
@@ -187,60 +155,16 @@ def test_refresh(self):
 
     self.assertListEqual(updated_targets, [])
 
+
+  def test_refresh_with_only_local_root(self):
+    os.remove(os.path.join(self.client_directory, "timestamp.json"))
+    os.remove(os.path.join(self.client_directory, "snapshot.json"))
+    os.remove(os.path.join(self.client_directory, "targets.json"))
+    os.remove(os.path.join(self.client_directory, "role1.json"))
 
-def _load_role_keys(keystore_directory):
-
-  # Populating 'self.role_keys' by importing the required public and private
-  # keys of 'tuf/tests/repository_data/'.  The role keys are needed when
-  # modifying the remote repository used by the test cases in this unit test.
-
-  # The pre-generated key files in 'repository_data/keystore' are all encrypted with
-  # a 'password' passphrase.
-  EXPECTED_KEYFILE_PASSWORD = 'password'
-
-  # Store and return the cryptography keys of the top-level roles, including 1
-  # delegated role.
-  role_keys = {}
-
-  root_key_file = os.path.join(keystore_directory, 'root_key')
-  targets_key_file = os.path.join(keystore_directory, 'targets_key')
-  snapshot_key_file = os.path.join(keystore_directory, 'snapshot_key')
-  timestamp_key_file = os.path.join(keystore_directory, 'timestamp_key')
-  delegation_key_file = os.path.join(keystore_directory, 'delegation_key')
-
-  role_keys = {'root': {}, 'targets': {}, 'snapshot': {}, 'timestamp': {},
-               'role1': {}}
-
-  # Import the top-level and delegated role public keys.
-  role_keys['root']['public'] = \
-    repo_tool.import_rsa_publickey_from_file(root_key_file+'.pub')
-  role_keys['targets']['public'] = \
-    repo_tool.import_ed25519_publickey_from_file(targets_key_file+'.pub')
-  role_keys['snapshot']['public'] = \
-    repo_tool.import_ed25519_publickey_from_file(snapshot_key_file+'.pub')
-  role_keys['timestamp']['public'] = \
-    repo_tool.import_ed25519_publickey_from_file(timestamp_key_file+'.pub')
-  role_keys['role1']['public'] = \
-    repo_tool.import_ed25519_publickey_from_file(delegation_key_file+'.pub')
-
-  # Import the private keys of the top-level and delegated roles.
-  role_keys['root']['private'] = \
-    repo_tool.import_rsa_privatekey_from_file(root_key_file,
-                                              EXPECTED_KEYFILE_PASSWORD)
-  role_keys['targets']['private'] = \
-    repo_tool.import_ed25519_privatekey_from_file(targets_key_file,
-                                                  EXPECTED_KEYFILE_PASSWORD)
-  role_keys['snapshot']['private'] = \
-    repo_tool.import_ed25519_privatekey_from_file(snapshot_key_file,
-                                                  EXPECTED_KEYFILE_PASSWORD)
-  role_keys['timestamp']['private'] = \
-    repo_tool.import_ed25519_privatekey_from_file(timestamp_key_file,
-                                                  EXPECTED_KEYFILE_PASSWORD)
-  role_keys['role1']['private'] = \
-    repo_tool.import_ed25519_privatekey_from_file(delegation_key_file,
-                                                  EXPECTED_KEYFILE_PASSWORD)
-
-  return role_keys
+    self.repository_updater.refresh()
+
+    # Get targetinfo for 'file3.txt' listed in the delegated role1
+    targetinfo3 = self.repository_updater.get_one_valid_targetinfo('file3.txt')
 
 
 if __name__ == '__main__':
  utils.configure_test_logging(sys.argv)
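
For reference, the reworked tests above drive the new client API roughly as follows. This is a minimal, illustrative sketch, not part of the diff: the directory and URLs are hypothetical placeholders, and per the new Updater contract below the metadata directory must already contain a trusted root.json.

    from tuf.client_rework.updater_rework import Updater

    # Hypothetical paths and URLs, for illustration only.
    client = Updater(
        "/path/to/client/metadata",        # must already hold trusted root.json
        "http://localhost:8001/metadata/",
        "http://localhost:8001/targets/")

    client.refresh()                       # update top-level roles first
    info = client.get_one_valid_targetinfo("file1.txt")
    stale = client.updated_targets([info], "/path/to/downloads")
    for target in stale:
        client.download_target(target, "/path/to/downloads")
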
diff --git a/tuf/client_rework/metadata_wrapper.py b/tuf/client_rework/metadata_wrapper.py
deleted file mode 100644
index fbc3335c3e..0000000000
--- a/tuf/client_rework/metadata_wrapper.py
+++ /dev/null
@@ -1,183 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright 2021, New York University and the TUF contributors
-# SPDX-License-Identifier: MIT OR Apache-2.0
-
-"""Metadata wrapper
-"""
-import time
-
-from securesystemslib.keys import format_metadata_to_key
-
-from tuf import exceptions, formats
-from tuf.api import metadata
-
-
-class MetadataWrapper:
-    """Helper classes extending or adding missing
-    functionality to metadata API
-    """
-
-    def __init__(self, meta):
-        self._meta = meta
-
-    @classmethod
-    def from_json_object(cls, raw_data):
-        """Loads JSON-formatted TUF metadata from a file object."""
-        # Use local scope import to avoid circular import errors
-        # pylint: disable=import-outside-toplevel
-        from tuf.api.serialization.json import JSONDeserializer
-
-        deserializer = JSONDeserializer()
-        meta = deserializer.deserialize(raw_data)
-        return cls(meta=meta)
-
-    @classmethod
-    def from_json_file(cls, filename):
-        """Loads JSON-formatted TUF metadata from a file."""
-        meta = metadata.Metadata.from_file(filename)
-        return cls(meta=meta)
-
-    @property
-    def signed(self):
-        """
-        TODO
-        """
-        return self._meta.signed
-
-    @property
-    def version(self):
-        """
-        TODO
-        """
-        return self._meta.signed.version
-
-    def verify(self, keys, threshold):
-        """
-        TODO
-        """
-        verified = 0
-        # 1.3. Check signatures
-        for key in keys:
-            self._meta.verify(key)
-            verified += 1
-
-        if verified < threshold:
-            raise exceptions.InsufficientKeysError
-
-    def persist(self, filename):
-        """
-        TODO
-        """
-        self._meta.to_file(filename)
-
-    def expires(self, reference_time=None):
-        """
-        TODO
-        """
-        if reference_time is None:
-            expires_timestamp = formats.datetime_to_unix_timestamp(
-                self._meta.signed.expires
-            )
-            reference_time = int(time.time())
-
-        if expires_timestamp < reference_time:
-            raise exceptions.ExpiredMetadataError
-
-
-class RootWrapper(MetadataWrapper):
-    """
-    TODO
-    """
-
-    def keys(self, role):
-        """
-        TODO
-        """
-        keys = []
-        for keyid in self._meta.signed.roles[role].keyids:
-            key_metadata = self._meta.signed.keys[keyid].to_dict()
-            key, dummy = format_metadata_to_key(key_metadata)
-            keys.append(key)
-
-        return keys
-
-    def threshold(self, role):
-        """
-        TODO
-        """
-        return self._meta.signed.roles[role].threshold
-
-
-class TimestampWrapper(MetadataWrapper):
-    """
-    TODO
-    """
-
-    @property
-    def snapshot(self):
-        """
-        TODO
-        """
-        return self._meta.signed.meta["snapshot.json"]
-
-
-class SnapshotWrapper(MetadataWrapper):
-    """
-    TODO
-    """
-
-    def role(self, name):
-        """
-        TODO
-        """
-        return self._meta.signed.meta[name + ".json"]
-
-
-class TargetsWrapper(MetadataWrapper):
-    """
-    TODO
-    """
-
-    @property
-    def targets(self):
-        """
-        TODO
-        """
-        return self._meta.signed.targets
-
-    @property
-    def delegations(self):
-        """
-        TODO
-        """
-        return self._meta.signed.delegations
-
-    def keys(self, role):
-        """
-        TODO
-        """
-        keys = []
-        if self._meta.signed.delegations is not None:
-            for delegation in self._meta.signed.delegations.roles:
-                if delegation.name == role:
-                    for keyid in delegation.keyids:
-                        key_metadata = self._meta.signed.delegations.keys[keyid]
-                        key, dummy = format_metadata_to_key(
-                            key_metadata.to_dict()
-                        )
-                        keys.append(key)
-                    return keys
-
-        return keys
-
-    def threshold(self, role):
-        """
-        TODO
-        """
-        if self._meta.signed.delegations is not None:
-            for delegation in self._meta.signed.delegations.roles:
-                if delegation.name == role:
-                    return delegation.threshold
-
-        return None
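
The wrapper classes deleted above are superseded by MetadataBundle (in tuf/client_rework/metadata_bundle.py, not shown in this diff), which now owns verification instead of the updater. The sketch below only collects the bundle calls as the updater below actually makes them; the variable names and the "role1" example are illustrative:

    bundle = metadata_bundle.MetadataBundle(root_data)  # trusted local root (bytes)

    bundle.update_root(new_root_data)         # verify and accept the next root version
    bundle.root_update_finished()             # final root checks (e.g. expiry)
    bundle.update_timestamp(timestamp_data)   # verified against trusted root
    bundle.update_snapshot(snapshot_data)     # verified against timestamp
    bundle.update_delegated_targets(data, "role1", "targets")  # (role, delegating parent)

    # Verified Metadata is then readable both as an attribute and by name:
    bundle.root.signed.version
    bundle["role1"].signed.targets
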
diff --git a/tuf/client_rework/updater_rework.py b/tuf/client_rework/updater_rework.py
index fbf3778ee7..470c8a14f9 100644
--- a/tuf/client_rework/updater_rework.py
+++ b/tuf/client_rework/updater_rework.py
@@ -17,18 +17,18 @@
 from securesystemslib import hash as sslib_hash
 from securesystemslib import util as sslib_util
 
-from tuf import exceptions, settings
+from tuf import exceptions
 from tuf.client.fetcher import FetcherInterface
-from tuf.client_rework import download, requests_fetcher
-
-from .metadata_wrapper import (
-    RootWrapper,
-    SnapshotWrapper,
-    TargetsWrapper,
-    TimestampWrapper,
-)
+from tuf.client_rework import download, metadata_bundle, requests_fetcher
 
 # Globals
+MAX_ROOT_ROTATIONS = 32
+MAX_DELEGATIONS = 32
+DEFAULT_ROOT_MAX_LENGTH = 512000  # bytes
+DEFAULT_TIMESTAMP_MAX_LENGTH = 16384  # bytes
+DEFAULT_SNAPSHOT_MAX_LENGTH = 2000000  # bytes
+DEFAULT_TARGETS_MAX_LENGTH = 5000000  # bytes
+
 logger = logging.getLogger(__name__)
 
 # Classes
@@ -41,29 +41,35 @@ class Updater:
 
     def __init__(
         self,
-        repository_name: str,
+        repository_dir: str,
         metadata_base_url: str,
         target_base_url: Optional[str] = None,
         fetcher: Optional[FetcherInterface] = None,
     ):
         """
         Args:
-            repository_name: directory name (within a local directory
-                defined by 'tuf.settings.repositories_directory')
+            repository_dir: Local metadata directory. Directory must be
+                writable and it must contain at least a root.json file.
             metadata_base_url: Base URL for all remote metadata downloads
             target_base_url: Optional; Default base URL for all remote target
                 downloads. Can be individually set in download_target()
             fetcher: Optional; FetcherInterface implementation used to
                 download both metadata and targets. Default is RequestsFetcher
+
+        Raises:
+            OSError: Local root.json cannot be read
+            RepositoryError: Local root.json is invalid
         """
-        self._repository_name = repository_name
+        self._dir = repository_dir
         self._metadata_base_url = _ensure_trailing_slash(metadata_base_url)
         if target_base_url is None:
             self._target_base_url = None
         else:
             self._target_base_url = _ensure_trailing_slash(target_base_url)
-        self._consistent_snapshot = False
-        self._metadata = {}
+
+        # Read trusted local root metadata
+        data = self._load_local_metadata("root")
+        self._bundle = metadata_bundle.MetadataBundle(data)
 
         if fetcher is None:
             self._fetcher = requests_fetcher.RequestsFetcher()
@@ -82,6 +88,11 @@ def refresh(self) -> None:
 
         The refresh() method should be called by the client before any target
         requests.
+
+        Raises:
+            OSError: New metadata could not be written to disk
+            RepositoryError: Metadata failed to verify in some way
+            TODO: download-related errors
         """
 
         self._load_root()
@@ -101,6 +112,11 @@ def get_one_valid_targetinfo(self, target_path: str) -> Dict:
             (https://url.spec.whatwg.org/#path-relative-url-string).
             Typically this is also the unix file path of the eventually
             downloaded file.
+
+        Raises:
+            OSError: New metadata could not be written to disk
+            RepositoryError: Metadata failed to verify in some way
+            TODO: download-related errors
         """
         return self._preorder_depth_first_walk(target_path)
@@ -171,6 +187,10 @@ def download_target(
             destination_directory as required.
             target_base_url: Optional; Base URL used to form the final target
                 download URL. Default is the value provided in Updater()
+
+        Raises:
+            TODO: download-related errors
+            TODO: file write errors
         """
         if target_base_url is None and self._target_base_url is None:
             raise ValueError(
@@ -195,318 +215,113 @@ def download_target(
         )
         sslib_util.persist_temp_file(target_file, filepath)
 
-    def _get_full_meta_name(
-        self, role: str, extension: str = ".json", version: int = None
-    ) -> str:
-        """
-        Helper method returning full metadata file path given the role name
-        and file extension.
-        """
+    def _download_metadata(
+        self, rolename: str, length: int, version: Optional[int] = None
+    ) -> bytes:
+        """download a metadata file and return it as bytes"""
         if version is None:
-            filename = role + extension
+            filename = f"{rolename}.json"
         else:
-            filename = str(version) + "." + role + extension
-        return os.path.join(
-            settings.repositories_directory,
-            self._repository_name,
-            "metadata",
-            "current",
-            filename,
+            filename = f"{version}.{rolename}.json"
+        url = parse.urljoin(self._metadata_base_url, filename)
+        return download.download_bytes(
+            url,
+            length,
+            self._fetcher,
+            strict_required_length=False,
         )
 
+    def _load_local_metadata(self, rolename: str) -> bytes:
+        with open(os.path.join(self._dir, f"{rolename}.json"), "rb") as f:
+            return f.read()
+
+    def _persist_metadata(self, rolename: str, data: bytes):
+        with open(os.path.join(self._dir, f"{rolename}.json"), "wb") as f:
+            f.write(data)
+
     def _load_root(self) -> None:
-        """
-        If metadata file for 'root' role does not exist locally, download it
-        over a network, verify it and store it permanently.
-        """
+        """Load remote root metadata.
 
-        # Load trusted root metadata
-        # TODO: this should happen much earlier, on Updater.__init__
-        self._metadata["root"] = RootWrapper.from_json_file(
-            self._get_full_meta_name("root")
-        )
+        Sequentially load and persist on local disk every newer root metadata
+        version available on the remote.
+        """
 
         # Update the root role
-        # 1.1. Let N denote the version number of the trusted
-        # root metadata file.
-        lower_bound = self._metadata["root"].version
-        upper_bound = lower_bound + settings.MAX_NUMBER_ROOT_ROTATIONS
-        intermediate_root = None
+        lower_bound = self._bundle.root.signed.version + 1
+        upper_bound = lower_bound + MAX_ROOT_ROTATIONS
 
         for next_version in range(lower_bound, upper_bound):
             try:
-                root_url = parse.urljoin(
-                    self._metadata_base_url, f"{next_version}.root.json"
+                data = self._download_metadata(
+                    "root", DEFAULT_ROOT_MAX_LENGTH, next_version
                 )
-                # For each version of root iterate over the list of mirrors
-                # until an intermediate root is successfully downloaded and
-                # verified.
-                data = download.download_bytes(
-                    root_url,
-                    settings.DEFAULT_ROOT_REQUIRED_LENGTH,
-                    self._fetcher,
-                    strict_required_length=False,
-                )
-
-                intermediate_root = self._verify_root(data)
-                # TODO: persist should happen here for each intermediate
-                # root according to the spec
+                self._bundle.update_root(data)
+                self._persist_metadata("root", data)
 
             except exceptions.FetcherHTTPError as exception:
                 if exception.status_code not in {403, 404}:
                     raise
-                # Stop looking for a bigger version if "File not found"
-                # error is received
+                # 404/403 means current root is newest available
                 break
 
-        if intermediate_root:
-            # Check for a freeze attack. The latest known time MUST be lower
-            # than the expiration timestamp in the trusted root metadata file
-            # TODO define which exceptions are part of the public API
-            intermediate_root.expires()
-
-            # 1.9. If the timestamp and / or snapshot keys have been rotated,
-            # then delete the trusted timestamp and snapshot metadata files.
-            if self._metadata["root"].keys(
-                "timestamp"
-            ) != intermediate_root.keys("timestamp"):
-                # FIXME: use abstract storage
-                os.remove(self._get_full_meta_name("timestamp"))
-                self._metadata["timestamp"] = {}
-
-            if self._metadata["root"].keys(
-                "snapshot"
-            ) != intermediate_root.keys("snapshot"):
-                # FIXME: use abstract storage
-                os.remove(self._get_full_meta_name("snapshot"))
-                self._metadata["snapshot"] = {}
-
-            # Set the trusted root metadata file to the new root
-            # metadata file
-            self._metadata["root"] = intermediate_root
-            # Persist root metadata. The client MUST write the file to
-            # non-volatile storage as FILENAME.EXT (e.g. root.json).
-            self._metadata["root"].persist(self._get_full_meta_name("root"))
-
-            # 1.10. Set whether consistent snapshots are used as per
-            # the trusted root metadata file
-            self._consistent_snapshot = self._metadata[
-                "root"
-            ].signed.consistent_snapshot
+        # Verify final root
+        self._bundle.root_update_finished()
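
A worked trace of the rotation loop above, with illustrative version numbers: if the trusted local root is at version 2, lower_bound is 3 and the client walks forward one version at a time.

    # Trusted local root: version 2  ->  range(3, 3 + MAX_ROOT_ROTATIONS)
    # GET 3.root.json  -> update_root() verifies it -> persisted as root.json
    # GET 4.root.json  -> update_root() verifies it -> persisted as root.json
    # GET 5.root.json  -> 404 -> break: version 4 is the newest available
    # root_update_finished() then runs the final checks (e.g. expiry) on v4
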
 
     def _load_timestamp(self) -> None:
-        """
-        TODO
-        """
-        # TODO Check if timestamp exists locally
-        timestamp_url = parse.urljoin(self._metadata_base_url, "timestamp.json")
-        data = download.download_bytes(
-            timestamp_url,
-            settings.DEFAULT_TIMESTAMP_REQUIRED_LENGTH,
-            self._fetcher,
-            strict_required_length=False,
-        )
-        self._metadata["timestamp"] = self._verify_timestamp(data)
-        self._metadata["timestamp"].persist(
-            self._get_full_meta_name("timestamp.json")
+        """Load local and remote timestamp metadata"""
+        try:
+            data = self._load_local_metadata("timestamp")
+            self._bundle.update_timestamp(data)
+        except (OSError, exceptions.RepositoryError) as e:
+            # Local timestamp does not exist or is invalid
+            logger.debug("Failed to load local timestamp %s", e)
+
+        # Load from remote (whether local load succeeded or not)
+        data = self._download_metadata(
+            "timestamp", DEFAULT_TIMESTAMP_MAX_LENGTH
         )
+        self._bundle.update_timestamp(data)
+        self._persist_metadata("timestamp", data)
 
     def _load_snapshot(self) -> None:
-        """
-        TODO
-        """
+        """Load local (and if needed remote) snapshot metadata"""
         try:
-            length = self._metadata["timestamp"].snapshot["length"]
-        except KeyError:
-            length = settings.DEFAULT_SNAPSHOT_REQUIRED_LENGTH
-
-        # Uncomment when implementing consistent_snapshot
-        # if self._consistent_snapshot:
-        #     version = self._metadata["timestamp"].snapshot["version"]
-        # else:
-        #     version = None
-
-        # TODO: Check if exists locally
-        snapshot_url = parse.urljoin(self._metadata_base_url, "snapshot.json")
-        data = download.download_bytes(
-            snapshot_url,
-            length,
-            self._fetcher,
-            strict_required_length=False,
-        )
-
-        self._metadata["snapshot"] = self._verify_snapshot(data)
-        self._metadata["snapshot"].persist(
-            self._get_full_meta_name("snapshot.json")
-        )
-
-    def _load_targets(self, targets_role: str, parent_role: str) -> None:
-        """
-        TODO
-        """
+            data = self._load_local_metadata("snapshot")
+            self._bundle.update_snapshot(data)
+            logger.debug("Local snapshot is valid: not downloading new one")
+        except (OSError, exceptions.RepositoryError) as e:
+            # Local snapshot does not exist or is invalid: update from remote
+            logger.debug("Failed to load local snapshot %s", e)
+
+            metainfo = self._bundle.timestamp.signed.meta["snapshot.json"]
+            length = metainfo.get("length") or DEFAULT_SNAPSHOT_MAX_LENGTH
+            version = None
+            if self._bundle.root.signed.consistent_snapshot:
+                version = metainfo["version"]
+
+            data = self._download_metadata("snapshot", length, version)
+            self._bundle.update_snapshot(data)
+            self._persist_metadata("snapshot", data)
+
+    def _load_targets(self, role: str, parent_role: str) -> None:
+        """Load local (and if needed remote) metadata for 'role'."""
         try:
-            length = self._metadata["snapshot"].role(targets_role)["length"]
-        except KeyError:
-            length = settings.DEFAULT_TARGETS_REQUIRED_LENGTH
-
-        # Uncomment when implementing consistent_snapshot
-        # if self._consistent_snapshot:
-        #     version = self._metadata["snapshot"].role(targets_role)["version"]
-        # else:
-        #     version = None
-
-        # TODO: Check if exists locally
-
-        targets_url = parse.urljoin(
-            self._metadata_base_url, f"{targets_role}.json"
-        )
-        data = download.download_bytes(
-            targets_url,
-            length,
-            self._fetcher,
-            strict_required_length=False,
-        )
-
-        self._metadata[targets_role] = self._verify_targets(
-            data, targets_role, parent_role
-        )
-        self._metadata[targets_role].persist(
-            self._get_full_meta_name(targets_role, extension=".json")
-        )
-
-    def _verify_root(self, file_content: bytes) -> RootWrapper:
-        """
-        TODO
-        """
-
-        intermediate_root = RootWrapper.from_json_object(file_content)
-
-        # Check for an arbitrary software attack
-        trusted_root = self._metadata["root"]
-        intermediate_root.verify(
-            trusted_root.keys("root"), trusted_root.threshold("root")
-        )
-        intermediate_root.verify(
-            intermediate_root.keys("root"), intermediate_root.threshold("root")
-        )
-
-        # Check for a rollback attack.
-        if intermediate_root.version < trusted_root.version:
-            raise exceptions.ReplayedMetadataError(
-                "root", intermediate_root.version(), trusted_root.version()
-            )
-        # Note that the expiration of the new (intermediate) root metadata
-        # file does not matter yet, because we will check for it in step 1.8.
-
-        return intermediate_root
-
-    def _verify_timestamp(self, file_content: bytes) -> TimestampWrapper:
-        """
-        TODO
-        """
-        intermediate_timestamp = TimestampWrapper.from_json_object(file_content)
-
-        # Check for an arbitrary software attack
-        trusted_root = self._metadata["root"]
-        intermediate_timestamp.verify(
-            trusted_root.keys("timestamp"), trusted_root.threshold("timestamp")
-        )
-
-        # Check for a rollback attack.
-        if self._metadata.get("timestamp"):
-            if (
-                intermediate_timestamp.signed.version
-                <= self._metadata["timestamp"].version
-            ):
-                raise exceptions.ReplayedMetadataError(
-                    "root",
-                    intermediate_timestamp.version(),
-                    self._metadata["timestamp"].version(),
-                )
-
-        if self._metadata.get("snapshot"):
-            if (
-                intermediate_timestamp.snapshot.version
-                <= self._metadata["timestamp"].snapshot["version"]
-            ):
-                raise exceptions.ReplayedMetadataError(
-                    "root",
-                    intermediate_timestamp.snapshot.version(),
-                    self._metadata["snapshot"].version(),
-                )
-
-        intermediate_timestamp.expires()
-
-        return intermediate_timestamp
-
-    def _verify_snapshot(self, file_content: bytes) -> SnapshotWrapper:
-        """
-        TODO
-        """
-
-        # Check against timestamp metadata
-        if self._metadata["timestamp"].snapshot.get("hash"):
-            _check_hashes(
-                file_content, self._metadata["timestamp"].snapshot.get("hash")
-            )
-
-        intermediate_snapshot = SnapshotWrapper.from_json_object(file_content)
-
-        if (
-            intermediate_snapshot.version
-            != self._metadata["timestamp"].snapshot["version"]
-        ):
-            raise exceptions.BadVersionNumberError
-
-        # Check for an arbitrary software attack
-        trusted_root = self._metadata["root"]
-        intermediate_snapshot.verify(
-            trusted_root.keys("snapshot"), trusted_root.threshold("snapshot")
-        )
-
-        # Check for a rollback attack
-        if self._metadata.get("snapshot"):
-            for target_role in intermediate_snapshot.signed.meta:
-                if (
-                    target_role["version"]
-                    != self._metadata["snapshot"].meta[target_role]["version"]
-                ):
-                    raise exceptions.BadVersionNumberError
-
-        intermediate_snapshot.expires()
-
-        return intermediate_snapshot
-
-    def _verify_targets(
-        self, file_content: bytes, filename: str, parent_role: str
-    ) -> TargetsWrapper:
-        """
-        TODO
-        """
-
-        # Check against timestamp metadata
-        if self._metadata["snapshot"].role(filename).get("hash"):
-            _check_hashes(
-                file_content, self._metadata["snapshot"].targets.get("hash")
-            )
-
-        intermediate_targets = TargetsWrapper.from_json_object(file_content)
-        if (
-            intermediate_targets.version
-            != self._metadata["snapshot"].role(filename)["version"]
-        ):
-            raise exceptions.BadVersionNumberError
-
-        # Check for an arbitrary software attack
-        parent_role = self._metadata[parent_role]
-
-        intermediate_targets.verify(
-            parent_role.keys(filename), parent_role.threshold(filename)
-        )
-
-        intermediate_targets.expires()
-
-        return intermediate_targets
+            data = self._load_local_metadata(role)
+            self._bundle.update_delegated_targets(data, role, parent_role)
+            logger.debug("Local %s is valid: not downloading new one", role)
+        except (OSError, exceptions.RepositoryError) as e:
+            # Local 'role' does not exist or is invalid: update from remote
+            logger.debug("Failed to load local %s: %s", role, e)
+
+            metainfo = self._bundle.snapshot.signed.meta[f"{role}.json"]
+            length = metainfo.get("length") or DEFAULT_TARGETS_MAX_LENGTH
+            version = None
+            if self._bundle.root.signed.consistent_snapshot:
+                version = metainfo["version"]
+
+            data = self._download_metadata(role, length, version)
+            self._bundle.update_delegated_targets(data, role, parent_role)
+            self._persist_metadata(role, data)
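
The delegation walk below consumes _load_targets one role at a time. As orientation, a compressed, illustrative sketch of its core loop (simplified: the real method also handles path matching and terminating delegations, and this is not its verbatim control flow):

    target = None
    role_names = [("targets", "root")]            # (role, delegating parent)
    visited_role_names = set()
    number_of_delegations = MAX_DELEGATIONS
    while role_names and number_of_delegations > 0 and target is None:
        role_name, parent_role = role_names.pop(0)
        self._load_targets(role_name, parent_role)    # local first, else remote
        role_metadata = self._bundle[role_name].signed
        target = role_metadata.targets.get(target_filepath)
        visited_role_names.add(role_name)
        number_of_delegations -= 1
        if target is None and role_metadata.delegations is not None:
            # Prepend children to keep the traversal depth-first (preorder).
            child_roles = [(d.name, role_name)
                           for d in role_metadata.delegations.roles
                           if d.name not in visited_role_names]
            role_names = child_roles + role_names
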
 
     def _preorder_depth_first_walk(self, target_filepath) -> Dict:
         """
@@ -516,15 +331,7 @@ def _preorder_depth_first_walk(self, target_filepath) -> Dict:
         target = None
         role_names = [("targets", "root")]
         visited_role_names = set()
-        number_of_delegations = settings.MAX_NUMBER_OF_DELEGATIONS
-
-        # Ensure the client has the most up-to-date version of 'targets.json'.
-        # Raise 'exceptions.NoWorkingMirrorError' if the changed metadata
-        # cannot be successfully downloaded and
-        # 'exceptions.RepositoryError' if the referenced metadata is
-        # missing. Target methods such as this one are called after the
-        # top-level metadata have been refreshed (i.e., updater.refresh()).
-        # self._update_metadata_if_changed('targets')
+        number_of_delegations = MAX_DELEGATIONS
 
         # Preorder depth-first traversal of the graph of target delegations.
         while (
@@ -542,14 +349,8 @@ def _preorder_depth_first_walk(self, target_filepath) -> Dict:
 
             # The metadata for 'role_name' must be downloaded/updated before
             # its targets, delegations, and child roles can be inspected.
-            # self._metadata['current'][role_name] is currently missing.
-            # _refresh_targets_metadata() does not refresh 'targets.json', it
-            # expects _update_metadata_if_changed() to have already refreshed
-            # it, which this function has checked above.
-            # self._refresh_targets_metadata(role_name,
-            #                                refresh_all_delegated_roles=False)
-
-            role_metadata = self._metadata[role_name]
+
+            role_metadata = self._bundle[role_name].signed
             target = role_metadata.targets.get(target_filepath)
 
             # After preorder check, add current role to set of visited roles.
@@ -610,10 +411,8 @@ def _preorder_depth_first_walk(self, target_filepath) -> Dict:
             and len(role_names) > 0
         ):
             msg = (
-                f"{len(role_names)} roles left to visit, ",
-                "but allowed to visit at most ",
-                f"{settings.MAX_NUMBER_OF_DELEGATIONS}",
-                " delegations.",
+                f"{len(role_names)} roles left to visit, but allowed to "
+                f"visit at most {MAX_DELEGATIONS} delegations."
             )
             logger.debug(msg)
@@ -749,27 +548,6 @@ def _check_hashes_obj(file_object, trusted_hashes):
         )
 
 
-def _check_hashes(file_content, trusted_hashes):
-    """
-    TODO
-    """
-    # Verify each trusted hash of 'trusted_hashes'. If all are valid, simply
-    # return.
-    for algorithm, trusted_hash in trusted_hashes.items():
-        digest_object = sslib_hash.digest(algorithm)
-
-        digest_object.update(file_content)
-        computed_hash = digest_object.hexdigest()
-
-        # Raise an exception if any of the hashes are incorrect.
-        if trusted_hash != computed_hash:
-            raise exceptions.BadHashError(trusted_hash, computed_hash)
-
-        logger.info(
-            "The file's " + algorithm + " hash is" " correct: " + trusted_hash
-        )
-
-
 def _get_filepath_hash(target_filepath, hash_function="sha256"):
     """
     TODO