Skip to content

Commit 043eff1

Browse files
authored
perf: Parallelize read calls by table and batch (feast-dev#4619)
1 parent 7f00b16 commit 043eff1

File tree

2 files changed

+48
-20
lines changed

2 files changed

+48
-20
lines changed

sdk/python/feast/infra/online_stores/dynamodb.py

+34 -18
Original file line number | Diff line number | Diff line change
@@ -11,6 +11,7 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14+
import asyncio
1415
import itertools
1516
import logging
1617
from datetime import datetime
@@ -297,7 +298,6 @@ async def online_read_async(
297298
batch_size = online_config.batch_size
298299
entity_ids = self._to_entity_ids(config, entity_keys)
299300
entity_ids_iter = iter(entity_ids)
300-
result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
301301
table_name = _get_table_name(online_config, config, table)
302302

303303
deserialize = TypeDeserializer().deserialize
@@ -309,24 +309,40 @@ def to_tbl_resp(raw_client_response):
309309
"values": deserialize(raw_client_response["values"]),
310310
}
311311

312+
batches = []
313+
entity_id_batches = []
314+
while True:
315+
batch = list(itertools.islice(entity_ids_iter, batch_size))
316+
if not batch:
317+
break
318+
entity_id_batch = self._to_client_batch_get_payload(
319+
online_config, table_name, batch
320+
)
321+
batches.append(batch)
322+
entity_id_batches.append(entity_id_batch)
323+
312324
async with self._get_aiodynamodb_client(online_config.region) as client:
313-
while True:
314-
batch = list(itertools.islice(entity_ids_iter, batch_size))
315-
316-
# No more items to insert
317-
if len(batch) == 0:
318-
break
319-
batch_entity_ids = self._to_client_batch_get_payload(
320-
online_config, table_name, batch
321-
)
322-
response = await client.batch_get_item(
323-
RequestItems=batch_entity_ids,
324-
)
325-
batch_result = self._process_batch_get_response(
326-
table_name, response, entity_ids, batch, to_tbl_response=to_tbl_resp
327-
)
328-
result.extend(batch_result)
329-
return result
325+
response_batches = await asyncio.gather(
326+
*[
327+
client.batch_get_item(
328+
RequestItems=entity_id_batch,
329+
)
330+
for entity_id_batch in entity_id_batches
331+
]
332+
)
333+
334+
result_batches = []
335+
for batch, response in zip(batches, response_batches):
336+
result_batch = self._process_batch_get_response(
337+
table_name,
338+
response,
339+
entity_ids,
340+
batch,
341+
to_tbl_response=to_tbl_resp,
342+
)
343+
result_batches.append(result_batch)
344+
345+
return list(itertools.chain(*result_batches))
330346

331347
def _get_aioboto_session(self):
332348
if self._aioboto_session is None:

sdk/python/feast/infra/online_stores/online_store.py

+14 -2
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,7 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
14+
import asyncio
1515
from abc import ABC, abstractmethod
1616
from datetime import datetime
1717
from typing import Any, Callable, Dict, List, Mapping, Optional, Sequence, Tuple, Union
@@ -240,7 +240,7 @@ async def get_online_features_async(
240240
native_entity_values=True,
241241
)
242242

243-
for table, requested_features in grouped_refs:
243+
async def query_table(table, requested_features):
244244
# Get the correct set of entity values with the correct join keys.
245245
table_entity_values, idxs = utils._get_unique_entities(
246246
table,
@@ -258,6 +258,18 @@ async def get_online_features_async(
258258
requested_features=requested_features,
259259
)
260260

261+
return idxs, read_rows
262+
263+
all_responses = await asyncio.gather(
264+
*[
265+
query_table(table, requested_features)
266+
for table, requested_features in grouped_refs
267+
]
268+
)
269+
270+
for (idxs, read_rows), (table, requested_features) in zip(
271+
all_responses, grouped_refs
272+
):
261273
feature_data = utils._convert_rows_to_protobuf(
262274
requested_features, read_rows
263275
)

0 commit comments

Comments
 (0)