72
72
Timestamp ,
73
73
)
74
74
from tuf .api .serialization .json import JSONSerializer
75
- from tuf .exceptions import FetcherHTTPError , RepositoryError
75
+ from tuf .exceptions import FetcherHTTPError
76
76
from tuf .ngclient .fetcher import FetcherInterface
77
77
78
78
logger = logging .getLogger (__name__ )
@@ -89,6 +89,9 @@ class RepositoryTarget:
89
89
90
90
91
91
class RepositorySimulator (FetcherInterface ):
92
+ """Simulates a repository that can be used for testing"""
93
+
94
+ # pylint: disable=too-many-instance-attributes
92
95
def __init__ (self ):
93
96
self .md_root : Metadata [Root ] = None
94
97
self .md_timestamp : Metadata [Timestamp ] = None
@@ -134,11 +137,14 @@ def targets(self) -> Targets:
134
137
return self .md_targets .signed
135
138
136
139
def all_targets (self ) -> Iterator [Tuple [str , Targets ]]:
140
+ """Yield role name and signed portion of targets one by one"""
137
141
yield "targets" , self .md_targets .signed
138
142
for role , md in self .md_delegates .items ():
139
143
yield role , md .signed
140
144
145
+ # pylint: disable-next=no-self-use
141
146
def create_key (self ) -> Tuple [Key , SSlibSigner ]:
147
+ """Create ed25519 key"""
142
148
sslib_key = generate_ed25519_key ()
143
149
return Key .from_securesystemslib_key (sslib_key ), SSlibSigner (sslib_key )
144
150
@@ -178,6 +184,7 @@ def publish_root(self):
178
184
logger .debug ("Published root v%d" , self .root .version )
179
185
180
186
def fetch (self , url : str ) -> Iterator [bytes ]:
187
+ """Fetches data from the given url and yeilds an Iterator"""
181
188
if not self .root .consistent_snapshot :
182
189
raise NotImplementedError ("non-consistent snapshot not supported" )
183
190
path = parse .urlparse (url ).path
@@ -188,20 +195,24 @@ def fetch(self, url: str) -> Iterator[bytes]:
188
195
version = None
189
196
role = "timestamp"
190
197
else :
198
+ # pylint: disable-next=invalid-name
191
199
version , _ , role = ver_and_name .partition ("." )
192
200
version = int (version )
193
201
yield self ._fetch_metadata (role , version )
194
202
elif path .startswith ("/targets/" ):
195
203
# figure out target path and hash prefix
196
204
target_path = path [len ("/targets/" ) :]
197
205
dir_parts , sep , prefixed_filename = target_path .rpartition ("/" )
206
+ # pylint: disable-next=invalid-name
198
207
prefix , _ , filename = prefixed_filename .partition ("." )
199
208
target_path = f"{ dir_parts } { sep } { filename } "
200
209
201
210
yield self ._fetch_target (target_path , prefix )
202
211
else :
203
212
raise FetcherHTTPError (f"Unknown path '{ path } '" , 404 )
204
213
214
+ # Disable the redefined-builtin warning raised because of the hash func.
215
+ # pylint: disable-next=redefined-builtin
205
216
def _fetch_target (self , target_path : str , hash : Optional [str ]) -> bytes :
206
217
"""Return data for 'target_path', checking 'hash' if it is given.
207
218
@@ -227,35 +238,32 @@ def _fetch_metadata(
227
238
# return a version previously serialized in publish_root()
228
239
if version is None or version > len (self .signed_roots ):
229
240
raise FetcherHTTPError (f"Unknown root version { version } " , 404 )
230
- logger .debug ("fetched root version %d" , role , version )
241
+ logger .debug ("fetched root version %d" , version )
231
242
return self .signed_roots [version - 1 ]
243
+
244
+ # sign and serialize the requested metadata
245
+ if role == "timestamp" :
246
+ md : Metadata = self .md_timestamp
247
+ elif role == "snapshot" :
248
+ md = self .md_snapshot
249
+ elif role == "targets" :
250
+ md = self .md_targets
232
251
else :
233
- # sign and serialize the requested metadata
234
- if role == "timestamp" :
235
- md : Metadata = self .md_timestamp
236
- elif role == "snapshot" :
237
- md = self .md_snapshot
238
- elif role == "targets" :
239
- md = self .md_targets
240
- else :
241
- md = self .md_delegates [role ]
242
-
243
- if md is None :
244
- raise FetcherHTTPError (f"Unknown role { role } " , 404 )
245
- if version is not None and version != md .signed .version :
246
- raise FetcherHTTPError (f"Unknown { role } version { version } " , 404 )
247
-
248
- md .signatures .clear ()
249
- for signer in self .signers [role ]:
250
- md .sign (signer , append = True )
251
-
252
- logger .debug (
253
- "fetched %s v%d with %d sigs" ,
254
- role ,
255
- md .signed .version ,
256
- len (self .signers [role ]),
257
- )
258
- return md .to_bytes (JSONSerializer ())
252
+ md = self .md_delegates [role ]
253
+
254
+ if md is None :
255
+ raise FetcherHTTPError (f"Unknown role { role } " , 404 )
256
+ if version is not None and version != md .signed .version :
257
+ raise FetcherHTTPError (f"Unknown { role } version { version } " , 404 )
258
+
259
+ md .signatures .clear ()
260
+ for signer in self .signers [role ]:
261
+ md .sign (signer , append = True )
262
+
263
+ num_signers = len (self .signers [role ])
264
+ msg = f"fetched { role } { md .signed .version } with { num_signers } sigs"
265
+ logger .debug (msg )
266
+ return md .to_bytes (JSONSerializer ())
259
267
260
268
def _compute_hashes_and_length (
261
269
self , role : str
@@ -267,6 +275,7 @@ def _compute_hashes_and_length(
267
275
return hashes , len (data )
268
276
269
277
def update_timestamp (self ):
278
+ """Update timestamp and assign snapshot ver to snapshot_meta ver"""
270
279
self .timestamp .snapshot_meta .version = self .snapshot .version
271
280
272
281
if self .compute_metafile_hashes_length :
@@ -277,6 +286,7 @@ def update_timestamp(self):
277
286
self .timestamp .version += 1
278
287
279
288
def update_snapshot (self ):
289
+ """Update snapshot, assign targets versions and update timestamp"""
280
290
for role , delegate in self .all_targets ():
281
291
hashes = None
282
292
length = None
@@ -291,6 +301,7 @@ def update_snapshot(self):
291
301
self .update_timestamp ()
292
302
293
303
def add_target (self , role : str , data : bytes , path : str ):
304
+ """Create a target from data and add it to the target_files"""
294
305
if role == "targets" :
295
306
targets = self .targets
296
307
else :
@@ -309,6 +320,7 @@ def add_delegation(
309
320
paths : Optional [List [str ]],
310
321
hash_prefixes : Optional [List [str ]],
311
322
):
323
+ """Add delegated target role to the repository"""
312
324
if delegator_name == "targets" :
313
325
delegator = self .targets
314
326
else :
@@ -342,17 +354,18 @@ def write(self):
342
354
print (f"Repository Simulator dumps in { self .dump_dir } " )
343
355
344
356
self .dump_version += 1
345
- dir = os .path .join (self .dump_dir , str (self .dump_version ))
346
- os .makedirs (dir )
357
+ dest_dir = os .path .join (self .dump_dir , str (self .dump_version ))
358
+ os .makedirs (dest_dir )
347
359
348
360
for ver in range (1 , len (self .signed_roots ) + 1 ):
349
- with open (os .path .join (dir , f"{ ver } .root.json" ), "wb" ) as f :
361
+ with open (os .path .join (dest_dir , f"{ ver } .root.json" ), "wb" ) as f :
350
362
f .write (self ._fetch_metadata ("root" , ver ))
351
363
352
364
for role in ["timestamp" , "snapshot" , "targets" ]:
353
- with open (os .path .join (dir , f"{ role } .json" ), "wb" ) as f :
365
+ with open (os .path .join (dest_dir , f"{ role } .json" ), "wb" ) as f :
354
366
f .write (self ._fetch_metadata (role ))
355
367
368
+ # pylint: disable-next=consider-iterating-dictionary
356
369
for role in self .md_delegates .keys ():
357
- with open (os .path .join (dir , f"{ role } .json" ), "wb" ) as f :
370
+ with open (os .path .join (dest_dir , f"{ role } .json" ), "wb" ) as f :
358
371
f .write (self ._fetch_metadata (role ))
0 commit comments