8
8
import os
9
9
import sys
10
10
import tempfile
11
- from typing import List , Optional
12
11
import unittest
13
12
from datetime import datetime , timedelta
13
+ from typing import Iterable , Optional
14
14
15
15
from tests import utils
16
16
from tests .repository_simulator import RepositorySimulator
17
- from tuf .api .metadata import Metadata , TOP_LEVEL_ROLE_NAMES
17
+ from tuf .api .metadata import TOP_LEVEL_ROLE_NAMES , Metadata
18
18
from tuf .exceptions import (
19
19
BadVersionNumberError ,
20
20
ExpiredMetadataError ,
24
24
)
25
25
from tuf .ngclient import Updater
26
26
27
+
27
28
class TestRefresh (unittest .TestCase ):
28
29
"""Test update of top-level metadata following
29
30
'Detailed client workflow' in the specification."""
30
31
31
32
past_datetime = datetime .utcnow ().replace (microsecond = 0 ) - timedelta (days = 5 )
32
33
33
- def setUp (self ):
34
+ def setUp (self ) -> None :
34
35
self .temp_dir = tempfile .TemporaryDirectory ()
35
36
self .metadata_dir = os .path .join (self .temp_dir .name , "metadata" )
36
37
self .targets_dir = os .path .join (self .temp_dir .name , "targets" )
@@ -45,7 +46,7 @@ def setUp(self):
45
46
)
46
47
f .write (root )
47
48
48
- def tearDown (self ):
49
+ def tearDown (self ) -> None :
49
50
self .temp_dir .cleanup ()
50
51
51
52
def _run_refresh (self ) -> Updater :
@@ -68,13 +69,15 @@ def _init_updater(self) -> Updater:
68
69
fetcher = self .sim ,
69
70
)
70
71
71
- def _assert_files_exist (self , roles : List [str ]) -> None :
72
+ def _assert_files_exist (self , roles : Iterable [str ]) -> None :
72
73
"""Assert that local metadata files exist for 'roles'"""
73
74
expected_files = sorted ([f"{ role } .json" for role in roles ])
74
75
local_metadata_files = sorted (os .listdir (self .metadata_dir ))
75
76
self .assertListEqual (local_metadata_files , expected_files )
76
77
77
- def _assert_content_equals (self , role : str , version : Optional [int ]= None ) -> None :
78
+ def _assert_content_equals (
79
+ self , role : str , version : Optional [int ] = None
80
+ ) -> None :
78
81
"""Assert that local file content is the expected"""
79
82
expected_content = self .sim ._fetch_metadata (role , version )
80
83
with open (os .path .join (self .metadata_dir , f"{ role } .json" ), "rb" ) as f :
@@ -166,7 +169,8 @@ def test_max_root_rotations(self) -> None:
166
169
167
170
updater .refresh ()
168
171
169
- # Assert that root version was increased with no more than 'max_root_rotations'
172
+ # Assert that root version was increased with no more
173
+ # than 'max_root_rotations'
170
174
md_root = Metadata .from_file (root_path )
171
175
self .assertEqual (
172
176
md_root .signed .version ,
@@ -195,7 +199,6 @@ def test_intermediate_root_incorrectly_signed(self) -> None:
195
199
self ._assert_files_exist (["root" ])
196
200
self ._assert_content_equals ("root" , 1 )
197
201
198
-
199
202
def test_intermediate_root_expired (self ) -> None :
200
203
# The expiration of the new (intermediate) root metadata file
201
204
# does not matter yet
@@ -218,7 +221,7 @@ def test_intermediate_root_expired(self) -> None:
218
221
219
222
def test_final_root_incorrectly_signed (self ) -> None :
220
223
# Check for an arbitrary software attack
221
- self .sim .root .version += 1 # root v2
224
+ self .sim .root .version += 1 # root v2
222
225
self .sim .signers ["root" ].clear ()
223
226
self .sim .publish_root ()
224
227
@@ -282,23 +285,27 @@ def test_new_timestamp_version_rollback(self) -> None:
282
285
with self .assertRaises (ReplayedMetadataError ):
283
286
self ._run_refresh ()
284
287
285
- md_timestamp = Metadata .from_file (os .path .join (self .metadata_dir , "timestamp.json" ))
288
+ md_timestamp = Metadata .from_file (
289
+ os .path .join (self .metadata_dir , "timestamp.json" )
290
+ )
286
291
self .assertEqual (md_timestamp .signed .version , 2 )
287
292
288
293
def test_new_timestamp_snapshot_rollback (self ) -> None :
289
294
# Check for a rollback attack.
290
295
self .sim .snapshot .version = 2
291
- self .sim .update_timestamp () # timestamp v2
296
+ self .sim .update_timestamp () # timestamp v2
292
297
self ._run_refresh ()
293
298
294
299
# Snapshot meta version is smaller than previous
295
300
self .sim .timestamp .snapshot_meta .version = 1
296
- self .sim .timestamp .version += 1 # timestamp v3
301
+ self .sim .timestamp .version += 1 # timestamp v3
297
302
298
303
with self .assertRaises (ReplayedMetadataError ):
299
304
self ._run_refresh ()
300
305
301
- md_timestamp = Metadata .from_file (os .path .join (self .metadata_dir , "timestamp.json" ))
306
+ md_timestamp = Metadata .from_file (
307
+ os .path .join (self .metadata_dir , "timestamp.json" )
308
+ )
302
309
self .assertEqual (md_timestamp .signed .version , 2 )
303
310
304
311
def test_new_timestamp_expired (self ) -> None :
@@ -316,24 +323,28 @@ def test_new_snapshot_hash_mismatch(self) -> None:
316
323
317
324
# Update timestamp with snapshot's hashes
318
325
self .sim .compute_metafile_hashes_length = True
319
- self .sim .update_timestamp () # timestamp v2
326
+ self .sim .update_timestamp () # timestamp v2
320
327
self ._run_refresh ()
321
328
322
329
# Modify snapshot contents without updating
323
330
# timestamp's snapshot hash
324
331
self .sim .snapshot .expires += timedelta (days = 1 )
325
332
self .sim .snapshot .version += 1 # snapshot v2
326
333
self .sim .timestamp .snapshot_meta .version = self .sim .snapshot .version
327
- self .sim .timestamp .version += 1 # timestamp v3
334
+ self .sim .timestamp .version += 1 # timestamp v3
328
335
329
336
# Hash mismatch error
330
337
with self .assertRaises (RepositoryError ):
331
338
self ._run_refresh ()
332
339
333
- md_timestamp = Metadata .from_file (os .path .join (self .metadata_dir , "timestamp.json" ))
340
+ md_timestamp = Metadata .from_file (
341
+ os .path .join (self .metadata_dir , "timestamp.json" )
342
+ )
334
343
self .assertEqual (md_timestamp .signed .version , 3 )
335
344
336
- md_snapshot = Metadata .from_file (os .path .join (self .metadata_dir , "snapshot.json" ))
345
+ md_snapshot = Metadata .from_file (
346
+ os .path .join (self .metadata_dir , "snapshot.json" )
347
+ )
337
348
self .assertEqual (md_snapshot .signed .version , 1 )
338
349
339
350
def test_new_snapshot_unsigned (self ) -> None :
@@ -370,7 +381,9 @@ def test_new_snapshot_version_rollback(self) -> None:
370
381
with self .assertRaises (ReplayedMetadataError ):
371
382
self ._run_refresh ()
372
383
373
- md_snapshot = Metadata .from_file (os .path .join (self .metadata_dir , "snapshot.json" ))
384
+ md_snapshot = Metadata .from_file (
385
+ os .path .join (self .metadata_dir , "snapshot.json" )
386
+ )
374
387
self .assertEqual (md_snapshot .signed .version , 2 )
375
388
376
389
def test_new_snapshot_expired (self ) -> None :
@@ -383,7 +396,6 @@ def test_new_snapshot_expired(self) -> None:
383
396
384
397
self ._assert_files_exist (["root" , "timestamp" ])
385
398
386
-
387
399
def test_new_targets_hash_mismatch (self ) -> None :
388
400
# Check against snapshot role’s targets hashes
389
401
@@ -395,17 +407,23 @@ def test_new_targets_hash_mismatch(self) -> None:
395
407
# Modify targets contents without updating
396
408
# snapshot's targets hashes
397
409
self .sim .targets .version += 1
398
- self .sim .snapshot .meta ["targets.json" ].version = self .sim .targets .version
410
+ self .sim .snapshot .meta [
411
+ "targets.json"
412
+ ].version = self .sim .targets .version
399
413
self .sim .snapshot .version += 1
400
414
self .sim .update_timestamp ()
401
415
402
416
with self .assertRaises (RepositoryError ):
403
417
self ._run_refresh ()
404
418
405
- md_snapshot = Metadata .from_file (os .path .join (self .metadata_dir , "snapshot.json" ))
419
+ md_snapshot = Metadata .from_file (
420
+ os .path .join (self .metadata_dir , "snapshot.json" )
421
+ )
406
422
self .assertEqual (md_snapshot .signed .version , 3 )
407
423
408
- md_targets = Metadata .from_file (os .path .join (self .metadata_dir , "targets.json" ))
424
+ md_targets = Metadata .from_file (
425
+ os .path .join (self .metadata_dir , "targets.json" )
426
+ )
409
427
self .assertEqual (md_targets .signed .version , 1 )
410
428
411
429
def test_new_targets_unsigned (self ) -> None :
0 commit comments