Skip to content

Commit 58e850f

Browse files
committed
subscript_storage added to sync
1 parent 045b22b commit 58e850f

File tree

2 files changed

+99
-6
lines changed

2 files changed

+99
-6
lines changed

async_substrate_interface/async_substrate.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1042,10 +1042,7 @@ async def create_storage_key(
10421042
async def subscribe_storage(
10431043
self,
10441044
storage_keys: list[StorageKey],
1045-
subscription_handler: Callable[
1046-
[StorageKey, Any, str],
1047-
Awaitable[Any]
1048-
],
1045+
subscription_handler: Callable[[StorageKey, Any, str], Awaitable[Any]],
10491046
):
10501047
"""
10511048
@@ -1075,7 +1072,9 @@ async def subscription_handler(storage_key, obj, subscription_id):
10751072

10761073
storage_key_map = {s.to_hex(): s for s in storage_keys}
10771074

1078-
async def result_handler(message: dict, subscription_id: str) -> tuple[bool, Optional[ScaleType]]:
1075+
async def result_handler(
1076+
message: dict, subscription_id: str
1077+
) -> tuple[bool, Optional[Any]]:
10791078
result_found = False
10801079
subscription_result = None
10811080
if "params" in message:
@@ -1112,7 +1111,7 @@ async def result_handler(message: dict, subscription_id: str) -> tuple[bool, Opt
11121111
# Decode SCALE result data
11131112
updated_obj = await self.decode_scale(
11141113
type_string=change_scale_type,
1115-
scale_bytes=hex_to_bytes(change_data)
1114+
scale_bytes=hex_to_bytes(change_data),
11161115
)
11171116

11181117
subscription_result = await subscription_handler(

async_substrate_interface/sync_substrate.py

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -796,6 +796,100 @@ def create_storage_key(
796796
metadata=self.runtime.metadata,
797797
)
798798

799+
def subscribe_storage(
800+
self,
801+
storage_keys: list[StorageKey],
802+
subscription_handler: Callable[[StorageKey, Any, str], Any],
803+
):
804+
"""
805+
806+
Subscribe to provided storage_keys and keep tracking until `subscription_handler` returns a value
807+
808+
Example of a StorageKey:
809+
```
810+
StorageKey.create_from_storage_function(
811+
"System", "Account", ["5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY"]
812+
)
813+
```
814+
815+
Example of a subscription handler:
816+
```
817+
def subscription_handler(storage_key, obj, subscription_id):
818+
if obj is not None:
819+
# the subscription will run until your subscription_handler returns something other than `None`
820+
return obj
821+
```
822+
823+
Args:
824+
storage_keys: StorageKey list of storage keys to subscribe to
825+
subscription_handler: coroutine function to handle value changes of subscription
826+
827+
"""
828+
self.init_runtime()
829+
830+
storage_key_map = {s.to_hex(): s for s in storage_keys}
831+
832+
def result_handler(
833+
message: dict, subscription_id: str
834+
) -> tuple[bool, Optional[Any]]:
835+
result_found = False
836+
subscription_result = None
837+
if "params" in message:
838+
# Process changes
839+
for change_storage_key, change_data in message["params"]["result"][
840+
"changes"
841+
]:
842+
# Check for target storage key
843+
storage_key = storage_key_map[change_storage_key]
844+
845+
if change_data is not None:
846+
change_scale_type = storage_key.value_scale_type
847+
result_found = True
848+
elif (
849+
storage_key.metadata_storage_function.value["modifier"]
850+
== "Default"
851+
):
852+
# Fallback to default value of storage function if no result
853+
change_scale_type = storage_key.value_scale_type
854+
change_data = (
855+
storage_key.metadata_storage_function.value_object[
856+
"default"
857+
].value_object
858+
)
859+
else:
860+
# No result is interpreted as an Option<...> result
861+
change_scale_type = f"Option<{storage_key.value_scale_type}>"
862+
change_data = (
863+
storage_key.metadata_storage_function.value_object[
864+
"default"
865+
].value_object
866+
)
867+
868+
# Decode SCALE result data
869+
updated_obj = self.decode_scale(
870+
type_string=change_scale_type,
871+
scale_bytes=hex_to_bytes(change_data),
872+
)
873+
874+
subscription_result = subscription_handler(
875+
storage_key, updated_obj, subscription_id
876+
)
877+
878+
if subscription_result is not None:
879+
# Handler returned end result: unsubscribe from further updates
880+
self.rpc_request("state_unsubscribeStorage", [subscription_id])
881+
882+
return result_found, subscription_result
883+
884+
if not callable(subscription_handler):
885+
raise ValueError("Provided `subscription_handler` is not callable")
886+
887+
return self.rpc_request(
888+
"state_subscribeStorage",
889+
[[s.to_hex() for s in storage_keys]],
890+
result_handler=result_handler,
891+
)
892+
799893
def get_metadata_storage_functions(self, block_hash=None) -> list:
800894
"""
801895
Retrieves a list of all storage functions in metadata active at given block_hash (or chaintip if block_hash is

0 commit comments

Comments
 (0)