Skip to content

Per default refetch the chain tip at most every 20 seconds #211

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pycardano/backend/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from pycardano.address import Address
from pycardano.network import Network
from pycardano.plutus import ExecutionUnits
from pycardano.transaction import UTxO, Transaction
from pycardano.transaction import Transaction, UTxO

__all__ = [
"GenesisParameters",
Expand Down
20 changes: 19 additions & 1 deletion pycardano/backend/ogmios.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import time
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple, Union
Expand Down Expand Up @@ -45,6 +46,7 @@ class OgmiosChainContext(ChainContext):
_service_name: str
_kupo_url: Optional[str]
_last_known_block_slot: int
_last_chain_tip_fetch: float
_genesis_param: Optional[GenesisParameters]
_protocol_param: Optional[ProtocolParameters]

Expand All @@ -54,14 +56,26 @@ def __init__(
network: Network,
compact_result=True,
kupo_url=None,
refetch_chain_tip_interval: Optional[float] = None,
):
self._ws_url = ws_url
self._network = network
self._service_name = "ogmios.v1:compact" if compact_result else "ogmios"
self._kupo_url = kupo_url
self._last_known_block_slot = 0
self._refetch_chain_tip_interval = (
refetch_chain_tip_interval
if refetch_chain_tip_interval is not None
else 1000
)
self._last_chain_tip_fetch = 0
self._genesis_param = None
self._protocol_param = None
if refetch_chain_tip_interval is None:
self._refetch_chain_tip_interval = (
self.genesis_param.slot_length
/ self.genesis_param.active_slots_coefficient
)

def _request(self, method: OgmiosQueryType, args: JsonDict) -> Any:
ws = websocket.WebSocket()
Expand Down Expand Up @@ -110,8 +124,12 @@ def _query_utxos_by_tx_id(self, tx_id: str, index: int) -> List[List[JsonDict]]:
return self._request(OgmiosQueryType.Query, args)

def _is_chain_tip_updated(self):
# fetch at most every twenty seconds!
if time.time() - self._last_chain_tip_fetch < self._refetch_chain_tip_interval:
return False
self._last_chain_tip_fetch = time.time()
slot = self.last_block_slot
if self._last_known_block_slot != slot:
if self._last_known_block_slot < slot:
self._last_known_block_slot = slot
return True
else:
Expand Down
47 changes: 25 additions & 22 deletions test/pycardano/backend/test_ogmios.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from unittest.mock import patch

from pycardano.backend.base import GenesisParameters, ProtocolParameters
from pycardano.backend.ogmios import OgmiosChainContext
from pycardano.network import Network
Expand Down Expand Up @@ -80,30 +82,31 @@
]


class TestOgmiosChainContext:
chain_context = OgmiosChainContext("", Network.TESTNET)

def override_request(method, args):
if args["query"] == "currentProtocolParameters":
return PROTOCOL_RESULT
elif args["query"] == "genesisConfig":
return GENESIS_RESULT
elif "utxo" in args["query"]:
query = args["query"]["utxo"][0]
if isinstance(query, dict):
for utxo in UTXOS:
if (
utxo[0]["txId"] == query["txId"]
and utxo[0]["index"] == query["index"]
):
return [utxo]
return []
else:
return UTXOS
def override_request(method, args):
if args["query"] == "currentProtocolParameters":
return PROTOCOL_RESULT
elif args["query"] == "genesisConfig":
return GENESIS_RESULT
elif "utxo" in args["query"]:
query = args["query"]["utxo"][0]
if isinstance(query, dict):
for utxo in UTXOS:
if (
utxo[0]["txId"] == query["txId"]
and utxo[0]["index"] == query["index"]
):
return [utxo]
return []
else:
return None
return UTXOS
else:
return None

chain_context._request = override_request

class TestOgmiosChainContext:
def __init__(self):
with patch("OgmiosChainContext._request", return_value=override_request):
self.chain_context = OgmiosChainContext("", Network.TESTNET)

def test_protocol_param(self):
assert (
Expand Down