@@ -1039,6 +1039,105 @@ async def create_storage_key(
1039
1039
metadata = self .runtime .metadata ,
1040
1040
)
1041
1041
1042
+ async def subscribe_storage (
1043
+ self ,
1044
+ storage_keys : list [StorageKey ],
1045
+ subscription_handler : Callable [
1046
+ [StorageKey , Any , str ],
1047
+ Awaitable [Any ]
1048
+ ],
1049
+ ):
1050
+ """
1051
+
1052
+ Subscribe to provided storage_keys and keep tracking until `subscription_handler` returns a value
1053
+
1054
+ Example of a StorageKey:
1055
+ ```
1056
+ StorageKey.create_from_storage_function(
1057
+ "System", "Account", ["5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY"]
1058
+ )
1059
+ ```
1060
+
1061
+ Example of a subscription handler:
1062
+ ```
1063
+ async def subscription_handler(storage_key, obj, subscription_id):
1064
+ if obj is not None:
1065
+ # the subscription will run until your subscription_handler returns something other than `None`
1066
+ return obj
1067
+ ```
1068
+
1069
+ Args:
1070
+ storage_keys: StorageKey list of storage keys to subscribe to
1071
+ subscription_handler: coroutine function to handle value changes of subscription
1072
+
1073
+ """
1074
+ await self .init_runtime ()
1075
+
1076
+ storage_key_map = {s .to_hex (): s for s in storage_keys }
1077
+
1078
+ async def result_handler (message : dict , subscription_id : str ) -> tuple [bool , Optional [ScaleType ]]:
1079
+ result_found = False
1080
+ subscription_result = None
1081
+ if "params" in message :
1082
+ # Process changes
1083
+ for change_storage_key , change_data in message ["params" ]["result" ][
1084
+ "changes"
1085
+ ]:
1086
+ # Check for target storage key
1087
+ storage_key = storage_key_map [change_storage_key ]
1088
+
1089
+ if change_data is not None :
1090
+ change_scale_type = storage_key .value_scale_type
1091
+ result_found = True
1092
+ elif (
1093
+ storage_key .metadata_storage_function .value ["modifier" ]
1094
+ == "Default"
1095
+ ):
1096
+ # Fallback to default value of storage function if no result
1097
+ change_scale_type = storage_key .value_scale_type
1098
+ change_data = (
1099
+ storage_key .metadata_storage_function .value_object [
1100
+ "default"
1101
+ ].value_object
1102
+ )
1103
+ else :
1104
+ # No result is interpreted as an Option<...> result
1105
+ change_scale_type = f"Option<{ storage_key .value_scale_type } >"
1106
+ change_data = (
1107
+ storage_key .metadata_storage_function .value_object [
1108
+ "default"
1109
+ ].value_object
1110
+ )
1111
+
1112
+ # Decode SCALE result data
1113
+ updated_obj = await self .decode_scale (
1114
+ type_string = change_scale_type ,
1115
+ scale_bytes = hex_to_bytes (change_data )
1116
+ )
1117
+
1118
+ subscription_result = await subscription_handler (
1119
+ storage_key , updated_obj , subscription_id
1120
+ )
1121
+
1122
+ if subscription_result is not None :
1123
+ # Handler returned end result: unsubscribe from further updates
1124
+ self ._forgettable_task = asyncio .create_task (
1125
+ self .rpc_request (
1126
+ "state_unsubscribeStorage" , [subscription_id ]
1127
+ )
1128
+ )
1129
+
1130
+ return result_found , subscription_result
1131
+
1132
+ if not callable (subscription_handler ):
1133
+ raise ValueError ("Provided `subscription_handler` is not callable" )
1134
+
1135
+ return await self .rpc_request (
1136
+ "state_subscribeStorage" ,
1137
+ [[s .to_hex () for s in storage_keys ]],
1138
+ result_handler = result_handler ,
1139
+ )
1140
+
1042
1141
async def get_metadata_storage_functions (self , block_hash = None ) -> list :
1043
1142
"""
1044
1143
Retrieves a list of all storage functions in metadata active at given block_hash (or chaintip if block_hash is
0 commit comments