diff --git a/tests/test_api.py b/tests/test_api.py new file mode 100644 index 0000000000..b2329bf780 --- /dev/null +++ b/tests/test_api.py @@ -0,0 +1,261 @@ +#!/usr/bin/env python + +# Copyright 2020, New York University and the TUF contributors +# SPDX-License-Identifier: MIT OR Apache-2.0 +""" Unit tests for api/metadata.py + +""" + +import json +import sys +import logging +import os +import shutil +import tempfile +import unittest + +from datetime import datetime, timedelta +from dateutil.relativedelta import relativedelta + +# TODO: Remove case handling when fully dropping support for versions >= 3.6 +IS_PY_VERSION_SUPPORTED = sys.version_info >= (3, 6) + +# Use setUpModule to tell unittest runner to skip this test module gracefully. +def setUpModule(): + if not IS_PY_VERSION_SUPPORTED: + raise unittest.SkipTest('requires Python 3.6 or higher') + +# Since setUpModule is called after imports we need to import conditionally. +if IS_PY_VERSION_SUPPORTED: + import tuf.exceptions + from tuf.api.metadata import ( + Metadata, + Snapshot, + Timestamp, + Targets + ) + + from securesystemslib.interface import ( + import_ed25519_publickey_from_file, + import_ed25519_privatekey_from_file + ) + +logger = logging.getLogger(__name__) + + +class TestMetadata(unittest.TestCase): + + @classmethod + def setUpClass(cls): + # Create a temporary directory to store the repository, metadata, and + # target files. 'temporary_directory' must be deleted in + # TearDownClass() so that temporary files are always removed, even when + # exceptions occur. + cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd()) + + test_repo_data = os.path.join( + os.path.dirname(os.path.realpath(__file__)), 'repository_data') + + cls.repo_dir = os.path.join(cls.temporary_directory, 'repository') + shutil.copytree( + os.path.join(test_repo_data, 'repository'), cls.repo_dir) + + cls.keystore_dir = os.path.join(cls.temporary_directory, 'keystore') + shutil.copytree( + os.path.join(test_repo_data, 'keystore'), cls.keystore_dir) + + # Load keys into memory + cls.keystore = {} + for role in ['delegation', 'snapshot', 'targets', 'timestamp']: + cls.keystore[role] = { + 'private': import_ed25519_privatekey_from_file( + os.path.join(cls.keystore_dir, role + '_key'), + password="password"), + 'public': import_ed25519_publickey_from_file( + os.path.join(cls.keystore_dir, role + '_key.pub')) + } + + + @classmethod + def tearDownClass(cls): + # Remove the temporary repository directory, which should contain all + # the metadata, targets, and key files generated for the test cases. + shutil.rmtree(cls.temporary_directory) + + + def test_generic_read(self): + for metadata, inner_metadata_cls in [ + ('snapshot', Snapshot), + ('timestamp', Timestamp), + ('targets', Targets)]: + + # Load JSON-formatted metdata of each supported type from file + # and from out-of-band read JSON string + path = os.path.join(self.repo_dir, 'metadata', metadata + '.json') + metadata_obj = Metadata.from_json_file(path) + with open(path, 'rb') as f: + metadata_str = f.read() + metadata_obj2 = Metadata.from_json(metadata_str) + + # Assert that both methods instantiate the right inner class for + # each metadata type and ... + self.assertTrue( + isinstance(metadata_obj.signed, inner_metadata_cls)) + self.assertTrue( + isinstance(metadata_obj2.signed, inner_metadata_cls)) + + # ... and return the same object (compared by dict representation) + self.assertDictEqual( + metadata_obj.to_dict(), metadata_obj2.to_dict()) + + + # Assert that it chokes correctly on an unknown metadata type + bad_metadata_path = 'bad-metadata.json' + bad_metadata = {'signed': {'_type': 'bad-metadata'}} + with open(bad_metadata_path, 'wb') as f: + f.write(json.dumps(bad_metadata).encode('utf-8')) + + with self.assertRaises(ValueError): + Metadata.from_json_file(bad_metadata_path) + + os.remove(bad_metadata_path) + + + def test_compact_json(self): + path = os.path.join(self.repo_dir, 'metadata', 'targets.json') + metadata_obj = Metadata.from_json_file(path) + self.assertTrue( + len(metadata_obj.to_json(compact=True)) < + len(metadata_obj.to_json())) + + + def test_read_write_read_compare(self): + for metadata in ['snapshot', 'timestamp', 'targets']: + path = os.path.join(self.repo_dir, 'metadata', metadata + '.json') + metadata_obj = Metadata.from_json_file(path) + + path_2 = path + '.tmp' + metadata_obj.to_json_file(path_2) + metadata_obj_2 = Metadata.from_json_file(path_2) + + self.assertDictEqual( + metadata_obj.to_dict(), + metadata_obj_2.to_dict()) + + os.remove(path_2) + + + def test_sign_verify(self): + # Load sample metadata (targets) and assert ... + path = os.path.join(self.repo_dir, 'metadata', 'targets.json') + metadata_obj = Metadata.from_json_file(path) + + # ... it has a single existing signature, + self.assertTrue(len(metadata_obj.signatures) == 1) + # ... which is valid for the correct key. + self.assertTrue(metadata_obj.verify( + self.keystore['targets']['public'])) + + # Append a new signature with the unrelated key and assert that ... + metadata_obj.sign(self.keystore['snapshot']['private'], append=True) + # ... there are now two signatures, and + self.assertTrue(len(metadata_obj.signatures) == 2) + # ... both are valid for the corresponding keys. + self.assertTrue(metadata_obj.verify( + self.keystore['targets']['public'])) + self.assertTrue(metadata_obj.verify( + self.keystore['snapshot']['public'])) + + # Create and assign (don't append) a new signature and assert that ... + metadata_obj.sign(self.keystore['timestamp']['private'], append=False) + # ... there now is only one signature, + self.assertTrue(len(metadata_obj.signatures) == 1) + # ... valid for that key. + self.assertTrue(metadata_obj.verify( + self.keystore['timestamp']['public'])) + + # Assert exception if there are more than one signatures for a key + metadata_obj.sign(self.keystore['timestamp']['private'], append=True) + with self.assertRaises(tuf.exceptions.Error) as ctx: + metadata_obj.verify(self.keystore['timestamp']['public']) + self.assertTrue( + '2 signatures for key' in str(ctx.exception), + str(ctx.exception)) + + # Assert exception if there is no signature for a key + with self.assertRaises(tuf.exceptions.Error) as ctx: + metadata_obj.verify(self.keystore['targets']['public']) + self.assertTrue( + 'no signature for' in str(ctx.exception), + str(ctx.exception)) + + + def test_metadata_base(self): + # Use of Snapshot is arbitrary, we're just testing the base class features + # with real data + snapshot_path = os.path.join( + self.repo_dir, 'metadata', 'snapshot.json') + md = Metadata.from_json_file(snapshot_path) + + self.assertEqual(md.signed.version, 1) + md.signed.bump_version() + self.assertEqual(md.signed.version, 2) + self.assertEqual(md.signed.expires, datetime(2030, 1, 1, 0, 0)) + md.signed.bump_expiration() + self.assertEqual(md.signed.expires, datetime(2030, 1, 2, 0, 0)) + md.signed.bump_expiration(timedelta(days=365)) + self.assertEqual(md.signed.expires, datetime(2031, 1, 2, 0, 0)) + + + def test_metadata_snapshot(self): + snapshot_path = os.path.join( + self.repo_dir, 'metadata', 'snapshot.json') + snapshot = Metadata.from_json_file(snapshot_path) + + # Create a dict representing what we expect the updated data to be + fileinfo = snapshot.signed.meta + hashes = {'sha256': 'c2986576f5fdfd43944e2b19e775453b96748ec4fe2638a6d2f32f1310967095'} + fileinfo['role1.json']['version'] = 2 + fileinfo['role1.json']['hashes'] = hashes + fileinfo['role1.json']['length'] = 123 + + snapshot.signed.update('role1', 2, 123, hashes) + self.assertEqual(snapshot.signed.meta, fileinfo) + + + def test_metadata_timestamp(self): + timestamp_path = os.path.join( + self.repo_dir, 'metadata', 'timestamp.json') + timestamp = Metadata.from_json_file(timestamp_path) + + self.assertEqual(timestamp.signed.version, 1) + timestamp.signed.bump_version() + self.assertEqual(timestamp.signed.version, 2) + + self.assertEqual(timestamp.signed.expires, datetime(2030, 1, 1, 0, 0)) + timestamp.signed.bump_expiration() + self.assertEqual(timestamp.signed.expires, datetime(2030, 1, 2, 0, 0)) + timestamp.signed.bump_expiration(timedelta(days=365)) + self.assertEqual(timestamp.signed.expires, datetime(2031, 1, 2, 0, 0)) + + # Test whether dateutil.relativedelta works, this provides a much + # easier to use interface for callers + delta = relativedelta(days=1) + timestamp.signed.bump_expiration(delta) + self.assertEqual(timestamp.signed.expires, datetime(2031, 1, 3, 0, 0)) + delta = relativedelta(years=5) + timestamp.signed.bump_expiration(delta) + self.assertEqual(timestamp.signed.expires, datetime(2036, 1, 3, 0, 0)) + + hashes = {'sha256': '0ae9664468150a9aa1e7f11feecb32341658eb84292851367fea2da88e8a58dc'} + fileinfo = timestamp.signed.meta['snapshot.json'] + fileinfo['hashes'] = hashes + fileinfo['version'] = 2 + fileinfo['length'] = 520 + timestamp.signed.update(2, 520, hashes) + self.assertEqual(timestamp.signed.meta['snapshot.json'], fileinfo) + + +# Run unit test. +if __name__ == '__main__': + unittest.main() diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py new file mode 100644 index 0000000000..ee5ab8eed1 --- /dev/null +++ b/tuf/api/metadata.py @@ -0,0 +1,521 @@ +"""TUF role metadata model. + +This module provides container classes for TUF role metadata, including methods +to read/serialize/write from and to JSON, perform TUF-compliant metadata +updates, and create and verify signatures. + +""" +# Imports +from datetime import datetime, timedelta +from typing import Any, Dict, Optional + +import json +import logging +import tempfile + +from securesystemslib.formats import encode_canonical +from securesystemslib.util import ( + load_json_file, + load_json_string, + persist_temp_file +) +from securesystemslib.storage import StorageBackendInterface +from securesystemslib.keys import create_signature, verify_signature + +import tuf.formats +import tuf.exceptions + + +# Types +JsonDict = Dict[str, Any] + + +# Classes. +class Metadata(): + """A container for signed TUF metadata. + + Provides methods to (de-)serialize JSON metadata from and to file + storage, and to create and verify signatures. + + Attributes: + signed: A subclass of Signed, which has the actual metadata payload, + i.e. one of Targets, Snapshot, Timestamp or Root. + + signatures: A list of signatures over the canonical JSON representation + of the value of the signed attribute:: + + [ + { + 'keyid': '', + 'sig':' '' + }, + ... + ] + + """ + def __init__(self, signed: 'Signed', signatures: list) -> None: + self.signed = signed + self.signatures = signatures + + + # Deserialization (factories). + @classmethod + def from_dict(cls, metadata: JsonDict) -> 'Metadata': + """Creates Metadata object from its JSON/dict representation. + + Calls 'from_dict' for any complex metadata attribute represented by a + class also that has a 'from_dict' factory method. (Currently this is + only the signed attribute.) + + Arguments: + metadata: TUF metadata in JSON/dict representation, as e.g. + returned by 'json.loads'. + + Raises: + KeyError: The metadata dict format is invalid. + ValueError: The metadata has an unrecognized signed._type field. + + Returns: + A TUF Metadata object. + + """ + # Dispatch to contained metadata class on metadata _type field. + _type = metadata['signed']['_type'] + + if _type == 'targets': + inner_cls = Targets + elif _type == 'snapshot': + inner_cls = Snapshot + elif _type == 'timestamp': + inner_cls = Timestamp + elif _type == 'root': + # TODO: implement Root class + raise NotImplementedError('Root not yet implemented') + else: + raise ValueError(f'unrecognized metadata type "{_type}"') + + # NOTE: If Signature becomes a class, we should iterate over + # metadata['signatures'], call Signature.from_dict for each item, and + # pass a list of Signature objects to the Metadata constructor intead. + return cls( + signed=inner_cls.from_dict(metadata['signed']), + signatures=metadata['signatures']) + + + @classmethod + def from_json(cls, metadata_json: str) -> 'Metadata': + """Loads JSON-formatted TUF metadata from a string. + + Arguments: + metadata_json: TUF metadata in JSON-string representation. + + Raises: + securesystemslib.exceptions.Error, ValueError, KeyError: The + metadata cannot be parsed. + + Returns: + A TUF Metadata object. + + """ + return cls.from_dict(load_json_string(metadata_json)) + + + @classmethod + def from_json_file( + cls, filename: str, + storage_backend: Optional[StorageBackendInterface] = None + ) -> 'Metadata': + """Loads JSON-formatted TUF metadata from file storage. + + Arguments: + filename: The path to read the file from. + storage_backend: An object that implements + securesystemslib.storage.StorageBackendInterface. Per default + a (local) FilesystemBackend is used. + + Raises: + securesystemslib.exceptions.StorageError: The file cannot be read. + securesystemslib.exceptions.Error, ValueError, KeyError: The + metadata cannot be parsed. + + Returns: + A TUF Metadata object. + + """ + return cls.from_dict(load_json_file(filename, storage_backend)) + + + # Serialization. + def to_dict(self) -> JsonDict: + """Returns the JSON-serializable dictionary representation of self. """ + return { + 'signatures': self.signatures, + 'signed': self.signed.to_dict() + } + + + def to_json(self, compact: bool = False) -> None: + """Returns the optionally compacted JSON representation of self. """ + return json.dumps( + self.to_dict(), + indent=(None if compact else 1), + separators=((',', ':') if compact else (',', ': ')), + sort_keys=True) + + + def to_json_file( + self, filename: str, compact: bool = False, + storage_backend: StorageBackendInterface = None) -> None: + """Writes the JSON representation of self to file storage. + + Arguments: + filename: The path to write the file to. + compact: A boolean indicating if the JSON string should be compact + by excluding whitespace. + storage_backend: An object that implements + securesystemslib.storage.StorageBackendInterface. Per default + a (local) FilesystemBackend is used. + Raises: + securesystemslib.exceptions.StorageError: + The file cannot be written. + + """ + with tempfile.TemporaryFile() as f: + f.write(self.to_json(compact).encode('utf-8')) + persist_temp_file(f, filename, storage_backend) + + + # Signatures. + def sign(self, key: JsonDict, append: bool = False) -> JsonDict: + """Creates signature over 'signed' and assigns it to 'signatures'. + + Arguments: + key: A securesystemslib-style private key object used for signing. + append: A boolean indicating if the signature should be appended to + the list of signatures or replace any existing signatures. The + default behavior is to replace signatures. + + Raises: + securesystemslib.exceptions.FormatError: Key argument is malformed. + securesystemslib.exceptions.CryptoError, \ + securesystemslib.exceptions.UnsupportedAlgorithmError: + Signing errors. + + Returns: + A securesystemslib-style signature object. + + """ + signature = create_signature(key, self.signed.to_canonical_bytes()) + + if append: + self.signatures.append(signature) + else: + self.signatures = [signature] + + return signature + + + def verify(self, key: JsonDict) -> bool: + """Verifies 'signatures' over 'signed' that match the passed key by id. + + Arguments: + key: A securesystemslib-style public key object. + + Raises: + # TODO: Revise exception taxonomy + tuf.exceptions.Error: None or multiple signatures found for key. + securesystemslib.exceptions.FormatError: Key argument is malformed. + securesystemslib.exceptions.CryptoError, \ + securesystemslib.exceptions.UnsupportedAlgorithmError: + Signing errors. + + Returns: + A boolean indicating if the signature is valid for the passed key. + + """ + signatures_for_keyid = list(filter( + lambda sig: sig['keyid'] == key['keyid'], self.signatures)) + + if not signatures_for_keyid: + raise tuf.exceptions.Error( + f'no signature for key {key["keyid"]}.') + + elif len(signatures_for_keyid) > 1: + raise tuf.exceptions.Error( + f'{len(signatures_for_keyid)} signatures for key ' + f'{key["keyid"]}, not sure which one to verify.') + else: + return verify_signature( + key, signatures_for_keyid[0], + self.signed.to_canonical_bytes()) + + + +class Signed: + """A base class for the signed part of TUF metadata. + + Objects with base class Signed are usually included in a Metadata object + on the signed attribute. This class provides attributes and methods that + are common for all TUF metadata types (roles). + + Attributes: + _type: The metadata type string. + version: The metadata version number. + spec_version: The TUF specification version number (semver) the + metadata format adheres to. + expires: The metadata expiration datetime object. + + + """ + # NOTE: Signed is a stupid name, because this might not be signed yet, but + # we keep it to match spec terminology (I often refer to this as "payload", + # or "inner metadata") + + def __init__( + self, _type: str, version: int, spec_version: str, + expires: datetime) -> None: + + self._type = _type + self.version = version + self.spec_version = spec_version + self.expires = expires + + # TODO: Should we separate data validation from constructor? + if version < 0: + raise ValueError(f'version must be < 0, got {version}') + self.version = version + + + # Deserialization (factories). + @classmethod + def from_dict(cls, signed_dict: JsonDict) -> 'Signed': + """Creates Signed object from its JSON/dict representation. """ + + # Convert 'expires' TUF metadata string to a datetime object, which is + # what the constructor expects and what we store. The inverse operation + # is implemented in 'to_dict'. + signed_dict['expires'] = datetime.strptime( + signed_dict['expires'], + "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=None) + # NOTE: We write the converted 'expires' back into 'signed_dict' above + # so that we can pass it to the constructor as '**signed_dict' below, + # along with other fields that belong to Signed subclasses. + # Any 'from_dict'(-like) conversions of fields that correspond to a + # subclass should be performed in the 'from_dict' method of that + # subclass and also be written back into 'signed_dict' before calling + # super().from_dict. + + # NOTE: cls might be a subclass of Signed, if 'from_dict' was called on + # that subclass (see e.g. Metadata.from_dict). + return cls(**signed_dict) + + + # Serialization. + def to_canonical_bytes(self) -> bytes: + """Returns the UTF-8 encoded canonical JSON representation of self. """ + return encode_canonical(self.to_dict()).encode('UTF-8') + + + def to_dict(self) -> JsonDict: + """Returns the JSON-serializable dictionary representation of self. """ + return { + '_type': self._type, + 'version': self.version, + 'spec_version': self.spec_version, + 'expires': self.expires.isoformat() + 'Z' + } + + + # Modification. + def bump_expiration(self, delta: timedelta = timedelta(days=1)) -> None: + """Increments the expires attribute by the passed timedelta. """ + self.expires += delta + + + def bump_version(self) -> None: + """Increments the metadata version number by 1.""" + self.version += 1 + + +class Timestamp(Signed): + """A container for the signed part of timestamp metadata. + + Attributes: + meta: A dictionary that contains information about snapshot metadata:: + + { + 'snapshot.json': { + 'version': , + 'length': , // optional + 'hashes': { + '': '', + '': '', + ... + } + } + } + + """ + def __init__( + self, _type: str, version: int, spec_version: str, + expires: datetime, meta: JsonDict) -> None: + super().__init__(_type, version, spec_version, expires) + # TODO: Add class for meta + self.meta = meta + + + # Serialization. + def to_dict(self) -> JsonDict: + """Returns the JSON-serializable dictionary representation of self. """ + json_dict = super().to_dict() + json_dict.update({ + 'meta': self.meta + }) + return json_dict + + + # Modification. + def update(self, version: int, length: int, hashes: JsonDict) -> None: + """Assigns passed info about snapshot metadata to meta dict. """ + self.meta['snapshot.json'] = { + 'version': version, + 'length': length, + 'hashes': hashes + } + + +class Snapshot(Signed): + """A container for the signed part of snapshot metadata. + + Attributes: + meta: A dictionary that contains information about targets metadata:: + + { + 'targets.json': { + 'version': , + 'length': , // optional + 'hashes': { + '': '', + '': '', + ... + } // optional + }, + '.json': { + ... + }, + '.json': { + ... + }, + ... + } + + """ + def __init__( + self, _type: str, version: int, spec_version: str, + expires: datetime, meta: JsonDict) -> None: + super().__init__(_type, version, spec_version, expires) + # TODO: Add class for meta + self.meta = meta + + # Serialization. + def to_dict(self) -> JsonDict: + """Returns the JSON-serializable dictionary representation of self. """ + json_dict = super().to_dict() + json_dict.update({ + 'meta': self.meta + }) + return json_dict + + + # Modification. + def update( + self, rolename: str, version: int, length: Optional[int] = None, + hashes: Optional[JsonDict] = None) -> None: + """Assigns passed (delegated) targets role info to meta dict. """ + metadata_fn = f'{rolename}.json' + + self.meta[metadata_fn] = {'version': version} + if length is not None: + self.meta[metadata_fn]['length'] = length + + if hashes is not None: + self.meta[metadata_fn]['hashes'] = hashes + + +class Targets(Signed): + """A container for the signed part of targets metadata. + + Attributes: + targets: A dictionary that contains information about target files:: + + { + '': { + 'length': , + 'hashes': { + '': '', + '': '', + ... + }, + 'custom': // optional + }, + ... + } + + delegations: A dictionary that contains a list of delegated target + roles and public key store used to verify their metadata + signatures:: + + { + 'keys' : { + '': { + 'keytype': '', + 'scheme': '', + 'keyid_hash_algorithms': [ + '', + '' + ... + ], + 'keyval': { + 'public': '' + } + }, + ... + }, + 'roles': [ + { + 'name': '', + 'keyids': ['', ...], + 'threshold': , + 'terminating': , + 'path_hash_prefixes': ['', ... ], // or + 'paths' : ['PATHPATTERN', ... ], + }, + ... + ] + } + + """ + def __init__( + self, _type: str, version: int, spec_version: str, + expires: datetime, targets: JsonDict, delegations: JsonDict + ) -> None: + super().__init__(_type, version, spec_version, expires) + # TODO: Add class for meta + self.targets = targets + self.delegations = delegations + + + # Serialization. + def to_dict(self) -> JsonDict: + """Returns the JSON-serializable dictionary representation of self. """ + json_dict = super().to_dict() + json_dict.update({ + 'targets': self.targets, + 'delegations': self.delegations, + }) + return json_dict + + # Modification. + def update(self, filename: str, fileinfo: JsonDict) -> None: + """Assigns passed target file info to meta dict. """ + self.targets[filename] = fileinfo