Skip to content

Create constants for top-level rolenames #1672

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions tests/repository_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def targets(self) -> Targets:

def all_targets(self) -> Iterator[Tuple[str, Targets]]:
"""Yield role name and signed portion of targets one by one."""
yield "targets", self.md_targets.signed
yield Targets.type, self.md_targets.signed
for role, md in self.md_delegates.items():
yield role, md.signed

Expand Down Expand Up @@ -181,7 +181,7 @@ def _initialize(self) -> None:
def publish_root(self) -> None:
"""Sign and store a new serialized version of root."""
self.md_root.signatures.clear()
for signer in self.signers["root"].values():
for signer in self.signers[Root.type].values():
self.md_root.sign(signer, append=True)

self.signed_roots.append(self.md_root.to_bytes(JSONSerializer()))
Expand All @@ -197,8 +197,8 @@ def fetch(self, url: str) -> Iterator[bytes]:
ver_and_name = path[len("/metadata/") :][: -len(".json")]
version_str, _, role = ver_and_name.partition(".")
# root is always version-prefixed while timestamp is always NOT
if role == "root" or (
self.root.consistent_snapshot and ver_and_name != "timestamp"
if role == Root.type or (
self.root.consistent_snapshot and ver_and_name != Timestamp.type
):
version: Optional[int] = int(version_str)
else:
Expand Down Expand Up @@ -248,7 +248,7 @@ def _fetch_metadata(

If version is None, non-versioned metadata is being requested.
"""
if role == "root":
if role == Root.type:
# return a version previously serialized in publish_root()
if version is None or version > len(self.signed_roots):
raise FetcherHTTPError(f"Unknown root version {version}", 404)
Expand All @@ -257,11 +257,11 @@ def _fetch_metadata(

# sign and serialize the requested metadata
md: Optional[Metadata]
if role == "timestamp":
if role == Timestamp.type:
md = self.md_timestamp
elif role == "snapshot":
elif role == Snapshot.type:
md = self.md_snapshot
elif role == "targets":
elif role == Targets.type:
md = self.md_targets
else:
md = self.md_delegates.get(role)
Expand Down Expand Up @@ -297,7 +297,7 @@ def update_timestamp(self) -> None:
self.timestamp.snapshot_meta.version = self.snapshot.version

if self.compute_metafile_hashes_length:
hashes, length = self._compute_hashes_and_length("snapshot")
hashes, length = self._compute_hashes_and_length(Snapshot.type)
self.timestamp.snapshot_meta.hashes = hashes
self.timestamp.snapshot_meta.length = length

Expand All @@ -320,7 +320,7 @@ def update_snapshot(self) -> None:

def add_target(self, role: str, data: bytes, path: str) -> None:
"""Create a target from data and add it to the target_files."""
if role == "targets":
if role == Targets.type:
targets = self.targets
else:
targets = self.md_delegates[role].signed
Expand All @@ -339,7 +339,7 @@ def add_delegation(
hash_prefixes: Optional[List[str]],
) -> None:
"""Add delegated target role to the repository."""
if delegator_name == "targets":
if delegator_name == Targets.type:
delegator = self.targets
else:
delegator = self.md_delegates[delegator_name].signed
Expand Down Expand Up @@ -375,9 +375,9 @@ def write(self) -> None:

for ver in range(1, len(self.signed_roots) + 1):
with open(os.path.join(dest_dir, f"{ver}.root.json"), "wb") as f:
f.write(self._fetch_metadata("root", ver))
f.write(self._fetch_metadata(Root.type, ver))

for role in ["timestamp", "snapshot", "targets"]:
for role in [Timestamp.type, Snapshot.type, Targets.type]:
with open(os.path.join(dest_dir, f"{role}.json"), "wb") as f:
f.write(self._fetch_metadata(role))

Expand Down
88 changes: 44 additions & 44 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def setUpClass(cls) -> None:

# Load keys into memory
cls.keystore = {}
for role in ["delegation", "snapshot", "targets", "timestamp"]:
for role in ["delegation", Snapshot.type, Targets.type, Timestamp.type]:
cls.keystore[role] = import_ed25519_privatekey_from_file(
os.path.join(cls.keystore_dir, role + "_key"),
password="password",
Expand All @@ -92,10 +92,10 @@ def tearDownClass(cls) -> None:

def test_generic_read(self) -> None:
for metadata, inner_metadata_cls in [
("root", Root),
("snapshot", Snapshot),
("timestamp", Timestamp),
("targets", Targets),
(Root.type, Root),
(Snapshot.type, Snapshot),
(Timestamp.type, Timestamp),
(Targets.type, Targets),
]:

# Load JSON-formatted metdata of each supported type from file
Expand Down Expand Up @@ -136,7 +136,7 @@ def test_compact_json(self) -> None:
)

def test_read_write_read_compare(self) -> None:
for metadata in ["root", "snapshot", "timestamp", "targets"]:
for metadata in [Root.type, Snapshot.type, Timestamp.type, Targets.type]:
path = os.path.join(self.repo_dir, "metadata", metadata + ".json")
md_obj = Metadata.from_file(path)

Expand All @@ -148,7 +148,7 @@ def test_read_write_read_compare(self) -> None:
os.remove(path_2)

def test_to_from_bytes(self) -> None:
for metadata in ["root", "snapshot", "timestamp", "targets"]:
for metadata in [Root.type, Snapshot.type, Timestamp.type, Targets.type]:
path = os.path.join(self.repo_dir, "metadata", metadata + ".json")
with open(path, "rb") as f:
metadata_bytes = f.read()
Expand All @@ -169,11 +169,11 @@ def test_sign_verify(self) -> None:
root = Metadata[Root].from_file(root_path).signed

# Locate the public keys we need from root
targets_keyid = next(iter(root.roles["targets"].keyids))
targets_keyid = next(iter(root.roles[Targets.type].keyids))
targets_key = root.keys[targets_keyid]
snapshot_keyid = next(iter(root.roles["snapshot"].keyids))
snapshot_keyid = next(iter(root.roles[Snapshot.type].keyids))
snapshot_key = root.keys[snapshot_keyid]
timestamp_keyid = next(iter(root.roles["timestamp"].keyids))
timestamp_keyid = next(iter(root.roles[Timestamp.type].keyids))
timestamp_key = root.keys[timestamp_keyid]

# Load sample metadata (targets) and assert ...
Expand All @@ -192,7 +192,7 @@ def test_sign_verify(self) -> None:
with self.assertRaises(exceptions.UnsignedMetadataError):
targets_key.verify_signature(md_obj, JSONSerializer()) # type: ignore[arg-type]

sslib_signer = SSlibSigner(self.keystore["snapshot"])
sslib_signer = SSlibSigner(self.keystore[Snapshot.type])
# Append a new signature with the unrelated key and assert that ...
sig = md_obj.sign(sslib_signer, append=True)
# ... there are now two signatures, and
Expand All @@ -203,7 +203,7 @@ def test_sign_verify(self) -> None:
# ... the returned (appended) signature is for snapshot key
self.assertEqual(sig.keyid, snapshot_keyid)

sslib_signer = SSlibSigner(self.keystore["timestamp"])
sslib_signer = SSlibSigner(self.keystore[Timestamp.type])
# Create and assign (don't append) a new signature and assert that ...
md_obj.sign(sslib_signer, append=False)
# ... there now is only one signature,
Expand All @@ -218,7 +218,7 @@ def test_verify_failures(self) -> None:
root = Metadata[Root].from_file(root_path).signed

# Locate the timestamp public key we need from root
timestamp_keyid = next(iter(root.roles["timestamp"].keyids))
timestamp_keyid = next(iter(root.roles[Timestamp.type].keyids))
timestamp_key = root.keys[timestamp_keyid]

# Load sample metadata (timestamp)
Expand Down Expand Up @@ -369,20 +369,20 @@ def test_metadata_verify_delegate(self) -> None:
role2 = Metadata[Targets].from_file(role2_path)

# test the expected delegation tree
root.verify_delegate("root", root)
root.verify_delegate("snapshot", snapshot)
root.verify_delegate("targets", targets)
root.verify_delegate(Root.type, root)
root.verify_delegate(Snapshot.type, snapshot)
root.verify_delegate(Targets.type, targets)
targets.verify_delegate("role1", role1)
role1.verify_delegate("role2", role2)

# only root and targets can verify delegates
with self.assertRaises(TypeError):
snapshot.verify_delegate("snapshot", snapshot)
snapshot.verify_delegate(Snapshot.type, snapshot)
# verify fails for roles that are not delegated by delegator
with self.assertRaises(ValueError):
root.verify_delegate("role1", role1)
with self.assertRaises(ValueError):
targets.verify_delegate("targets", targets)
targets.verify_delegate(Targets.type, targets)
# verify fails when delegator has no delegations
with self.assertRaises(ValueError):
role2.verify_delegate("role1", role1)
Expand All @@ -391,31 +391,31 @@ def test_metadata_verify_delegate(self) -> None:
expires = snapshot.signed.expires
snapshot.signed.bump_expiration()
with self.assertRaises(exceptions.UnsignedMetadataError):
root.verify_delegate("snapshot", snapshot)
root.verify_delegate(Snapshot.type, snapshot)
snapshot.signed.expires = expires

# verify fails if roles keys do not sign the metadata
with self.assertRaises(exceptions.UnsignedMetadataError):
root.verify_delegate("timestamp", snapshot)
root.verify_delegate(Timestamp.type, snapshot)

# Add a key to snapshot role, make sure the new sig fails to verify
ts_keyid = next(iter(root.signed.roles["timestamp"].keyids))
root.signed.add_key("snapshot", root.signed.keys[ts_keyid])
ts_keyid = next(iter(root.signed.roles[Timestamp.type].keyids))
root.signed.add_key(Snapshot.type, root.signed.keys[ts_keyid])
snapshot.signatures[ts_keyid] = Signature(ts_keyid, "ff" * 64)

# verify succeeds if threshold is reached even if some signatures
# fail to verify
root.verify_delegate("snapshot", snapshot)
root.verify_delegate(Snapshot.type, snapshot)

# verify fails if threshold of signatures is not reached
root.signed.roles["snapshot"].threshold = 2
root.signed.roles[Snapshot.type].threshold = 2
with self.assertRaises(exceptions.UnsignedMetadataError):
root.verify_delegate("snapshot", snapshot)
root.verify_delegate(Snapshot.type, snapshot)

# verify succeeds when we correct the new signature and reach the
# threshold of 2 keys
snapshot.sign(SSlibSigner(self.keystore["timestamp"]), append=True)
root.verify_delegate("snapshot", snapshot)
snapshot.sign(SSlibSigner(self.keystore[Timestamp.type]), append=True)
root.verify_delegate(Snapshot.type, snapshot)

def test_key_class(self) -> None:
# Test if from_securesystemslib_key removes the private key from keyval
Expand All @@ -441,44 +441,44 @@ def test_root_add_key_and_remove_key(self) -> None:
)

# Assert that root does not contain the new key
self.assertNotIn(keyid, root.signed.roles["root"].keyids)
self.assertNotIn(keyid, root.signed.roles[Root.type].keyids)
self.assertNotIn(keyid, root.signed.keys)

# Add new root key
root.signed.add_key("root", key_metadata)
root.signed.add_key(Root.type, key_metadata)

# Assert that key is added
self.assertIn(keyid, root.signed.roles["root"].keyids)
self.assertIn(keyid, root.signed.roles[Root.type].keyids)
self.assertIn(keyid, root.signed.keys)

# Confirm that the newly added key does not break
# the object serialization
root.to_dict()

# Try adding the same key again and assert its ignored.
pre_add_keyid = root.signed.roles["root"].keyids.copy()
root.signed.add_key("root", key_metadata)
self.assertEqual(pre_add_keyid, root.signed.roles["root"].keyids)
pre_add_keyid = root.signed.roles[Root.type].keyids.copy()
root.signed.add_key(Root.type, key_metadata)
self.assertEqual(pre_add_keyid, root.signed.roles[Root.type].keyids)

# Add the same key to targets role as well
root.signed.add_key("targets", key_metadata)
root.signed.add_key(Targets.type, key_metadata)

# Add the same key to a nonexistent role.
with self.assertRaises(ValueError):
root.signed.add_key("nosuchrole", key_metadata)

# Remove the key from root role (targets role still uses it)
root.signed.remove_key("root", keyid)
self.assertNotIn(keyid, root.signed.roles["root"].keyids)
root.signed.remove_key(Root.type, keyid)
self.assertNotIn(keyid, root.signed.roles[Root.type].keyids)
self.assertIn(keyid, root.signed.keys)

# Remove the key from targets as well
root.signed.remove_key("targets", keyid)
self.assertNotIn(keyid, root.signed.roles["targets"].keyids)
root.signed.remove_key(Targets.type, keyid)
self.assertNotIn(keyid, root.signed.roles[Targets.type].keyids)
self.assertNotIn(keyid, root.signed.keys)

with self.assertRaises(ValueError):
root.signed.remove_key("root", "nosuchkey")
root.signed.remove_key(Root.type, "nosuchkey")
with self.assertRaises(ValueError):
root.signed.remove_key("nosuchrole", keyid)

Expand Down Expand Up @@ -670,7 +670,7 @@ def test_length_and_hash_validation(self) -> None:
targets_path = os.path.join(self.repo_dir, "metadata", "targets.json")
targets = Metadata[Targets].from_file(targets_path)
file1_targetfile = targets.signed.targets["file1.txt"]
filepath = os.path.join(self.repo_dir, "targets", "file1.txt")
filepath = os.path.join(self.repo_dir, Targets.type, "file1.txt")

with open(filepath, "rb") as file1:
file1_targetfile.verify_length_and_hashes(file1)
Expand All @@ -688,7 +688,7 @@ def test_length_and_hash_validation(self) -> None:

def test_targetfile_from_file(self) -> None:
# Test with an existing file and valid hash algorithm
file_path = os.path.join(self.repo_dir, "targets", "file1.txt")
file_path = os.path.join(self.repo_dir, Targets.type, "file1.txt")
targetfile_from_file = TargetFile.from_file(
file_path, file_path, ["sha256"]
)
Expand All @@ -697,20 +697,20 @@ def test_targetfile_from_file(self) -> None:
targetfile_from_file.verify_length_and_hashes(file)

# Test with a non-existing file
file_path = os.path.join(self.repo_dir, "targets", "file123.txt")
file_path = os.path.join(self.repo_dir, Targets.type, "file123.txt")
with self.assertRaises(FileNotFoundError):
TargetFile.from_file(
file_path, file_path, [sslib_hash.DEFAULT_HASH_ALGORITHM]
)

# Test with an unsupported algorithm
file_path = os.path.join(self.repo_dir, "targets", "file1.txt")
file_path = os.path.join(self.repo_dir, Targets.type, "file1.txt")
with self.assertRaises(exceptions.UnsupportedAlgorithmError):
TargetFile.from_file(file_path, file_path, ["123"])

def test_targetfile_from_data(self) -> None:
data = b"Inline test content"
target_file_path = os.path.join(self.repo_dir, "targets", "file1.txt")
target_file_path = os.path.join(self.repo_dir, Targets.type, "file1.txt")

# Test with a valid hash algorithm
targetfile_from_data = TargetFile.from_data(
Expand Down
Loading