Skip to content

Commit 3a7e025

Browse files
author
Ivana Atanasova
committed
Apply top-level rolenames constants in tests
This applies the use of constants of top-level rolenames in the tests instead of the previously hardcoded strings. Addresses #1648 Signed-off-by: Ivana Atanasova <iyovcheva@iyovcheva-a02.vmware.com>
1 parent df35943 commit 3a7e025

9 files changed

+212
-206
lines changed

tests/repository_simulator.py

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959

6060
from tuf.api.metadata import (
6161
SPECIFICATION_VERSION,
62-
TOP_LEVEL_ROLE_NAMES,
62+
RoleNames,
6363
DelegatedRole,
6464
Delegations,
6565
Key,
@@ -73,6 +73,7 @@
7373
Timestamp,
7474
)
7575
from tuf.api.serialization.json import JSONSerializer
76+
from tuf.api.metadata import RoleNames
7677
from tuf.exceptions import FetcherHTTPError
7778
from tuf.ngclient.fetcher import FetcherInterface
7879

@@ -140,7 +141,7 @@ def targets(self) -> Targets:
140141

141142
def all_targets(self) -> Iterator[Tuple[str, Targets]]:
142143
"""Yield role name and signed portion of targets one by one."""
143-
yield "targets", self.md_targets.signed
144+
yield RoleNames.TARGETS.value, self.md_targets.signed
144145
for role, md in self.md_delegates.items():
145146
yield role, md.signed
146147

@@ -168,21 +169,21 @@ def _initialize(self):
168169
timestamp = Timestamp(1, SPEC_VER, self.safe_expiry, snapshot_meta)
169170
self.md_timestamp = Metadata(timestamp, OrderedDict())
170171

171-
roles = {role_name: Role([], 1) for role_name in TOP_LEVEL_ROLE_NAMES}
172+
roles = {role_name.value: Role([], 1) for role_name in RoleNames}
172173
root = Root(1, SPEC_VER, self.safe_expiry, {}, roles, True)
173174

174-
for role in TOP_LEVEL_ROLE_NAMES:
175+
for role in RoleNames:
175176
key, signer = self.create_key()
176-
root.add_key(role, key)
177-
self.add_signer(role, signer)
177+
root.add_key(role.value, key)
178+
self.add_signer(role.value, signer)
178179

179180
self.md_root = Metadata(root, OrderedDict())
180181
self.publish_root()
181182

182183
def publish_root(self):
183184
"""Sign and store a new serialized version of root."""
184185
self.md_root.signatures.clear()
185-
for signer in self.signers["root"].values():
186+
for signer in self.signers[RoleNames.ROOT.value].values():
186187
self.md_root.sign(signer, append=True)
187188

188189
self.signed_roots.append(self.md_root.to_bytes(JSONSerializer()))
@@ -198,9 +199,9 @@ def fetch(self, url: str) -> Iterator[bytes]:
198199
if path.startswith("/metadata/") and path.endswith(".json"):
199200
ver_and_name = path[len("/metadata/") :][: -len(".json")]
200201
# only consistent_snapshot supported ATM: timestamp is special case
201-
if ver_and_name == "timestamp":
202+
if ver_and_name == RoleNames.TIMESTAMP.value:
202203
version = None
203-
role = "timestamp"
204+
role = RoleNames.TIMESTAMP.value
204205
else:
205206
version, _, role = ver_and_name.partition(".")
206207
version = int(version)
@@ -242,19 +243,19 @@ def _fetch_metadata(
242243
243244
If version is None, non-versioned metadata is being requested.
244245
"""
245-
if role == "root":
246+
if role == RoleNames.ROOT.value:
246247
# return a version previously serialized in publish_root()
247248
if version is None or version > len(self.signed_roots):
248249
raise FetcherHTTPError(f"Unknown root version {version}", 404)
249250
logger.debug("fetched root version %d", version)
250251
return self.signed_roots[version - 1]
251252

252253
# sign and serialize the requested metadata
253-
if role == "timestamp":
254+
if role == RoleNames.TIMESTAMP.value:
254255
md: Metadata = self.md_timestamp
255-
elif role == "snapshot":
256+
elif role == RoleNames.SNAPSHOT.value:
256257
md = self.md_snapshot
257-
elif role == "targets":
258+
elif role == RoleNames.TARGETS.value:
258259
md = self.md_targets
259260
else:
260261
md = self.md_delegates[role]
@@ -290,7 +291,7 @@ def update_timestamp(self):
290291
self.timestamp.snapshot_meta.version = self.snapshot.version
291292

292293
if self.compute_metafile_hashes_length:
293-
hashes, length = self._compute_hashes_and_length("snapshot")
294+
hashes, length = self._compute_hashes_and_length(RoleNames.SNAPSHOT.value)
294295
self.timestamp.snapshot_meta.hashes = hashes
295296
self.timestamp.snapshot_meta.length = length
296297

@@ -313,7 +314,7 @@ def update_snapshot(self):
313314

314315
def add_target(self, role: str, data: bytes, path: str):
315316
"""Create a target from data and add it to the target_files."""
316-
if role == "targets":
317+
if role == RoleNames.TARGETS.value:
317318
targets = self.targets
318319
else:
319320
targets = self.md_delegates[role].signed
@@ -332,7 +333,7 @@ def add_delegation(
332333
hash_prefixes: Optional[List[str]],
333334
):
334335
"""Add delegated target role to the repository."""
335-
if delegator_name == "targets":
336+
if delegator_name == RoleNames.TARGETS.value:
336337
delegator = self.targets
337338
else:
338339
delegator = self.md_delegates[delegator_name].signed
@@ -368,9 +369,9 @@ def write(self):
368369

369370
for ver in range(1, len(self.signed_roots) + 1):
370371
with open(os.path.join(dest_dir, f"{ver}.root.json"), "wb") as f:
371-
f.write(self._fetch_metadata("root", ver))
372+
f.write(self._fetch_metadata(RoleNames.ROOT.value, ver))
372373

373-
for role in ["timestamp", "snapshot", "targets"]:
374+
for role in [RoleNames.TIMESTAMP.value, RoleNames.SNAPSHOT.value, RoleNames.TARGETS.value]:
374375
with open(os.path.join(dest_dir, f"{role}.json"), "wb") as f:
375376
f.write(self._fetch_metadata(role))
376377

tests/test_api.py

Lines changed: 45 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
)
4040
from tuf.api.serialization import DeserializationError
4141
from tuf.api.serialization.json import CanonicalJSONSerializer, JSONSerializer
42+
from tuf.api.metadata import RoleNames
4243

4344
logger = logging.getLogger(__name__)
4445

@@ -70,7 +71,7 @@ def setUpClass(cls):
7071

7172
# Load keys into memory
7273
cls.keystore = {}
73-
for role in ["delegation", "snapshot", "targets", "timestamp"]:
74+
for role in ["delegation", RoleNames.SNAPSHOT.value, RoleNames.TARGETS.value, RoleNames.TIMESTAMP.value]:
7475
cls.keystore[role] = import_ed25519_privatekey_from_file(
7576
os.path.join(cls.keystore_dir, role + "_key"),
7677
password="password",
@@ -84,10 +85,10 @@ def tearDownClass(cls):
8485

8586
def test_generic_read(self):
8687
for metadata, inner_metadata_cls in [
87-
("root", Root),
88-
("snapshot", Snapshot),
89-
("timestamp", Timestamp),
90-
("targets", Targets),
88+
(RoleNames.ROOT.value, Root),
89+
(RoleNames.SNAPSHOT.value, Snapshot),
90+
(RoleNames.TIMESTAMP.value, Timestamp),
91+
(RoleNames.TARGETS.value, Targets),
9192
]:
9293

9394
# Load JSON-formatted metdata of each supported type from file
@@ -128,7 +129,7 @@ def test_compact_json(self):
128129
)
129130

130131
def test_read_write_read_compare(self):
131-
for metadata in ["root", "snapshot", "timestamp", "targets"]:
132+
for metadata in [RoleNames.ROOT.value, RoleNames.SNAPSHOT.value, RoleNames.TIMESTAMP.value, RoleNames.TARGETS.value]:
132133
path = os.path.join(self.repo_dir, "metadata", metadata + ".json")
133134
md_obj = Metadata.from_file(path)
134135

@@ -140,7 +141,7 @@ def test_read_write_read_compare(self):
140141
os.remove(path_2)
141142

142143
def test_to_from_bytes(self):
143-
for metadata in ["root", "snapshot", "timestamp", "targets"]:
144+
for metadata in [RoleNames.ROOT.value, RoleNames.SNAPSHOT.value, RoleNames.TIMESTAMP.value, RoleNames.TARGETS.value]:
144145
path = os.path.join(self.repo_dir, "metadata", metadata + ".json")
145146
with open(path, "rb") as f:
146147
metadata_bytes = f.read()
@@ -161,11 +162,11 @@ def test_sign_verify(self):
161162
root = Metadata[Root].from_file(root_path).signed
162163

163164
# Locate the public keys we need from root
164-
targets_keyid = next(iter(root.roles["targets"].keyids))
165+
targets_keyid = next(iter(root.roles[RoleNames.TARGETS.value].keyids))
165166
targets_key = root.keys[targets_keyid]
166-
snapshot_keyid = next(iter(root.roles["snapshot"].keyids))
167+
snapshot_keyid = next(iter(root.roles[RoleNames.SNAPSHOT.value].keyids))
167168
snapshot_key = root.keys[snapshot_keyid]
168-
timestamp_keyid = next(iter(root.roles["timestamp"].keyids))
169+
timestamp_keyid = next(iter(root.roles[RoleNames.TIMESTAMP.value].keyids))
169170
timestamp_key = root.keys[timestamp_keyid]
170171

171172
# Load sample metadata (targets) and assert ...
@@ -184,7 +185,7 @@ def test_sign_verify(self):
184185
with self.assertRaises(exceptions.UnsignedMetadataError):
185186
targets_key.verify_signature(md_obj, JSONSerializer())
186187

187-
sslib_signer = SSlibSigner(self.keystore["snapshot"])
188+
sslib_signer = SSlibSigner(self.keystore[RoleNames.SNAPSHOT.value])
188189
# Append a new signature with the unrelated key and assert that ...
189190
sig = md_obj.sign(sslib_signer, append=True)
190191
# ... there are now two signatures, and
@@ -195,7 +196,7 @@ def test_sign_verify(self):
195196
# ... the returned (appended) signature is for snapshot key
196197
self.assertEqual(sig.keyid, snapshot_keyid)
197198

198-
sslib_signer = SSlibSigner(self.keystore["timestamp"])
199+
sslib_signer = SSlibSigner(self.keystore[RoleNames.TIMESTAMP.value])
199200
# Create and assign (don't append) a new signature and assert that ...
200201
md_obj.sign(sslib_signer, append=False)
201202
# ... there now is only one signature,
@@ -210,7 +211,7 @@ def test_verify_failures(self):
210211
root = Metadata[Root].from_file(root_path).signed
211212

212213
# Locate the timestamp public key we need from root
213-
timestamp_keyid = next(iter(root.roles["timestamp"].keyids))
214+
timestamp_keyid = next(iter(root.roles[RoleNames.TIMESTAMP.value].keyids))
214215
timestamp_key = root.keys[timestamp_keyid]
215216

216217
# Load sample metadata (timestamp)
@@ -361,20 +362,20 @@ def test_metadata_verify_delegate(self):
361362
role2 = Metadata[Targets].from_file(role2_path)
362363

363364
# test the expected delegation tree
364-
root.verify_delegate("root", root)
365-
root.verify_delegate("snapshot", snapshot)
366-
root.verify_delegate("targets", targets)
365+
root.verify_delegate(RoleNames.ROOT.value, root)
366+
root.verify_delegate(RoleNames.SNAPSHOT.value, snapshot)
367+
root.verify_delegate(RoleNames.TARGETS.value, targets)
367368
targets.verify_delegate("role1", role1)
368369
role1.verify_delegate("role2", role2)
369370

370371
# only root and targets can verify delegates
371372
with self.assertRaises(TypeError):
372-
snapshot.verify_delegate("snapshot", snapshot)
373+
snapshot.verify_delegate(RoleNames.SNAPSHOT.value, snapshot)
373374
# verify fails for roles that are not delegated by delegator
374375
with self.assertRaises(ValueError):
375376
root.verify_delegate("role1", role1)
376377
with self.assertRaises(ValueError):
377-
targets.verify_delegate("targets", targets)
378+
targets.verify_delegate(RoleNames.TARGETS.value, targets)
378379
# verify fails when delegator has no delegations
379380
with self.assertRaises(ValueError):
380381
role2.verify_delegate("role1", role1)
@@ -383,31 +384,31 @@ def test_metadata_verify_delegate(self):
383384
expires = snapshot.signed.expires
384385
snapshot.signed.bump_expiration()
385386
with self.assertRaises(exceptions.UnsignedMetadataError):
386-
root.verify_delegate("snapshot", snapshot)
387+
root.verify_delegate(RoleNames.SNAPSHOT.value, snapshot)
387388
snapshot.signed.expires = expires
388389

389390
# verify fails if roles keys do not sign the metadata
390391
with self.assertRaises(exceptions.UnsignedMetadataError):
391-
root.verify_delegate("timestamp", snapshot)
392+
root.verify_delegate(RoleNames.TIMESTAMP.value, snapshot)
392393

393394
# Add a key to snapshot role, make sure the new sig fails to verify
394-
ts_keyid = next(iter(root.signed.roles["timestamp"].keyids))
395-
root.signed.add_key("snapshot", root.signed.keys[ts_keyid])
395+
ts_keyid = next(iter(root.signed.roles[RoleNames.TIMESTAMP.value].keyids))
396+
root.signed.add_key(RoleNames.SNAPSHOT.value, root.signed.keys[ts_keyid])
396397
snapshot.signatures[ts_keyid] = Signature(ts_keyid, "ff" * 64)
397398

398399
# verify succeeds if threshold is reached even if some signatures
399400
# fail to verify
400-
root.verify_delegate("snapshot", snapshot)
401+
root.verify_delegate(RoleNames.SNAPSHOT.value, snapshot)
401402

402403
# verify fails if threshold of signatures is not reached
403-
root.signed.roles["snapshot"].threshold = 2
404+
root.signed.roles[RoleNames.SNAPSHOT.value].threshold = 2
404405
with self.assertRaises(exceptions.UnsignedMetadataError):
405-
root.verify_delegate("snapshot", snapshot)
406+
root.verify_delegate(RoleNames.SNAPSHOT.value, snapshot)
406407

407408
# verify succeeds when we correct the new signature and reach the
408409
# threshold of 2 keys
409-
snapshot.sign(SSlibSigner(self.keystore["timestamp"]), append=True)
410-
root.verify_delegate("snapshot", snapshot)
410+
snapshot.sign(SSlibSigner(self.keystore[RoleNames.TIMESTAMP.value]), append=True)
411+
root.verify_delegate(RoleNames.SNAPSHOT.value, snapshot)
411412

412413
def test_key_class(self):
413414
# Test if from_securesystemslib_key removes the private key from keyval
@@ -433,44 +434,44 @@ def test_root_add_key_and_remove_key(self):
433434
)
434435

435436
# Assert that root does not contain the new key
436-
self.assertNotIn(keyid, root.signed.roles["root"].keyids)
437+
self.assertNotIn(keyid, root.signed.roles[RoleNames.ROOT.value].keyids)
437438
self.assertNotIn(keyid, root.signed.keys)
438439

439440
# Add new root key
440-
root.signed.add_key("root", key_metadata)
441+
root.signed.add_key(RoleNames.ROOT.value, key_metadata)
441442

442443
# Assert that key is added
443-
self.assertIn(keyid, root.signed.roles["root"].keyids)
444+
self.assertIn(keyid, root.signed.roles[RoleNames.ROOT.value].keyids)
444445
self.assertIn(keyid, root.signed.keys)
445446

446447
# Confirm that the newly added key does not break
447448
# the object serialization
448449
root.to_dict()
449450

450451
# Try adding the same key again and assert its ignored.
451-
pre_add_keyid = root.signed.roles["root"].keyids.copy()
452-
root.signed.add_key("root", key_metadata)
453-
self.assertEqual(pre_add_keyid, root.signed.roles["root"].keyids)
452+
pre_add_keyid = root.signed.roles[RoleNames.ROOT.value].keyids.copy()
453+
root.signed.add_key(RoleNames.ROOT.value, key_metadata)
454+
self.assertEqual(pre_add_keyid, root.signed.roles[RoleNames.ROOT.value].keyids)
454455

455456
# Add the same key to targets role as well
456-
root.signed.add_key("targets", key_metadata)
457+
root.signed.add_key(RoleNames.TARGETS.value, key_metadata)
457458

458459
# Add the same key to a nonexistent role.
459460
with self.assertRaises(ValueError):
460461
root.signed.add_key("nosuchrole", key_metadata)
461462

462463
# Remove the key from root role (targets role still uses it)
463-
root.signed.remove_key("root", keyid)
464-
self.assertNotIn(keyid, root.signed.roles["root"].keyids)
464+
root.signed.remove_key(RoleNames.ROOT.value, keyid)
465+
self.assertNotIn(keyid, root.signed.roles[RoleNames.ROOT.value].keyids)
465466
self.assertIn(keyid, root.signed.keys)
466467

467468
# Remove the key from targets as well
468-
root.signed.remove_key("targets", keyid)
469-
self.assertNotIn(keyid, root.signed.roles["targets"].keyids)
469+
root.signed.remove_key(RoleNames.TARGETS.value, keyid)
470+
self.assertNotIn(keyid, root.signed.roles[RoleNames.TARGETS.value].keyids)
470471
self.assertNotIn(keyid, root.signed.keys)
471472

472473
with self.assertRaises(ValueError):
473-
root.signed.remove_key("root", "nosuchkey")
474+
root.signed.remove_key(RoleNames.ROOT.value, "nosuchkey")
474475
with self.assertRaises(ValueError):
475476
root.signed.remove_key("nosuchrole", keyid)
476477

@@ -661,7 +662,7 @@ def test_length_and_hash_validation(self):
661662
targets_path = os.path.join(self.repo_dir, "metadata", "targets.json")
662663
targets = Metadata[Targets].from_file(targets_path)
663664
file1_targetfile = targets.signed.targets["file1.txt"]
664-
filepath = os.path.join(self.repo_dir, "targets", "file1.txt")
665+
filepath = os.path.join(self.repo_dir, RoleNames.TARGETS.value, "file1.txt")
665666

666667
with open(filepath, "rb") as file1:
667668
file1_targetfile.verify_length_and_hashes(file1)
@@ -679,7 +680,7 @@ def test_length_and_hash_validation(self):
679680

680681
def test_targetfile_from_file(self):
681682
# Test with an existing file and valid hash algorithm
682-
file_path = os.path.join(self.repo_dir, "targets", "file1.txt")
683+
file_path = os.path.join(self.repo_dir, RoleNames.TARGETS.value, "file1.txt")
683684
targetfile_from_file = TargetFile.from_file(
684685
file_path, file_path, ["sha256"]
685686
)
@@ -688,20 +689,20 @@ def test_targetfile_from_file(self):
688689
targetfile_from_file.verify_length_and_hashes(file)
689690

690691
# Test with a non-existing file
691-
file_path = os.path.join(self.repo_dir, "targets", "file123.txt")
692+
file_path = os.path.join(self.repo_dir, RoleNames.TARGETS.value, "file123.txt")
692693
with self.assertRaises(FileNotFoundError):
693694
TargetFile.from_file(
694695
file_path, file_path, [sslib_hash.DEFAULT_HASH_ALGORITHM]
695696
)
696697

697698
# Test with an unsupported algorithm
698-
file_path = os.path.join(self.repo_dir, "targets", "file1.txt")
699+
file_path = os.path.join(self.repo_dir, RoleNames.TARGETS.value, "file1.txt")
699700
with self.assertRaises(exceptions.UnsupportedAlgorithmError):
700701
TargetFile.from_file(file_path, file_path, ["123"])
701702

702703
def test_targetfile_from_data(self):
703704
data = b"Inline test content"
704-
target_file_path = os.path.join(self.repo_dir, "targets", "file1.txt")
705+
target_file_path = os.path.join(self.repo_dir, RoleNames.TARGETS.value, "file1.txt")
705706

706707
# Test with a valid hash algorithm
707708
targetfile_from_data = TargetFile.from_data(

0 commit comments

Comments
 (0)