1
1
import json
2
2
import time
3
- from copy import deepcopy
4
3
from datetime import datetime , timezone
5
4
from enum import Enum
6
5
from typing import Any , Dict , List , Optional , Tuple , Union
7
6
8
7
import cbor2
9
8
import requests
10
9
import websocket
11
- from cachetools import Cache , LRUCache , TTLCache
10
+ from cachetools import Cache , LRUCache , TTLCache , func
12
11
13
12
from pycardano .address import Address
14
13
from pycardano .backend .base import (
@@ -214,9 +213,12 @@ def _parse_cost_models(self, ogmios_result: JsonDict) -> Dict[str, Dict[str, int
214
213
@property
215
214
def genesis_param (self ) -> GenesisParameters :
216
215
"""Get chain genesis parameters"""
217
- if not self ._genesis_param or self ._is_chain_tip_updated ():
218
- self ._genesis_param = self ._fetch_genesis_param ()
219
- return self ._genesis_param
216
+
217
+ @func .lru_cache (maxsize = 10 )
218
+ def _genesis_param_cache (slot ) -> GenesisParameters :
219
+ return self ._fetch_genesis_param ()
220
+
221
+ return _genesis_param_cache (self .last_block_slot )
220
222
221
223
def _fetch_genesis_param (self ) -> GenesisParameters :
222
224
result = self ._query_genesis_config ()
@@ -250,10 +252,11 @@ def epoch(self) -> int:
250
252
return self ._query_current_epoch ()
251
253
252
254
@property
255
+ @func .ttl_cache (ttl = 1 )
253
256
def last_block_slot (self ) -> int :
254
- """Slot number of last block"""
255
257
result = self ._query_chain_tip ()
256
- return result ["slot" ]
258
+ slot = result ["slot" ]
259
+ return slot
257
260
258
261
def _utxos (self , address : str ) -> List [UTxO ]:
259
262
"""Get all UTxOs associated with an address.
@@ -264,15 +267,16 @@ def _utxos(self, address: str) -> List[UTxO]:
264
267
Returns:
265
268
List[UTxO]: A list of UTxOs.
266
269
"""
267
- if (self .last_block_slot , address ) in self ._utxo_cache :
268
- return deepcopy (self ._utxo_cache [(self .last_block_slot , address )])
270
+ key = (self .last_block_slot , address )
271
+ if key in self ._utxo_cache :
272
+ return self ._utxo_cache [key ]
269
273
270
274
if self ._kupo_url :
271
275
utxos = self ._utxos_kupo (address )
272
276
else :
273
277
utxos = self ._utxos_ogmios (address )
274
278
275
- self ._utxo_cache [( self . last_block_slot , address ) ] = deepcopy ( utxos )
279
+ self ._utxo_cache [key ] = utxos
276
280
277
281
return utxos
278
282
@@ -287,9 +291,7 @@ def _get_datum_from_kupo(self, datum_hash: str) -> Optional[RawCBOR]:
287
291
"""
288
292
datum = self ._datum_cache .get (datum_hash , None )
289
293
290
- if datum is not None or (
291
- datum_hash in self ._datum_cache and not self ._is_chain_tip_updated ()
292
- ):
294
+ if datum is not None :
293
295
return datum
294
296
295
297
if self ._kupo_url is None :
0 commit comments