Skip to content

Commit 9b0e5ce

Browse files
feat: Make bytewax job write as mini-batches (#3777)
Signed-off-by: Hai Nguyen <quanghai.ng1512@gmail.com>
1 parent fb6b807 commit 9b0e5ce

File tree

2 files changed

+25
-5
lines changed

2 files changed

+25
-5
lines changed

sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py

+18-5
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import os
12
from typing import List
23

34
import pyarrow as pa
@@ -11,6 +12,8 @@
1112
from feast import FeatureStore, FeatureView, RepoConfig
1213
from feast.utils import _convert_arrow_to_proto, _run_pyarrow_field_mapping
1314

15+
DEFAULT_BATCH_SIZE = 1000
16+
1417

1518
class BytewaxMaterializationDataflow:
1619
def __init__(
@@ -44,6 +47,11 @@ def input_builder(self, worker_index, worker_count, _state):
4447
return
4548

4649
def output_builder(self, worker_index, worker_count):
50+
def yield_batch(iterable, batch_size):
    """Yield consecutive mini-batches sliced from a sequence.

    Args:
        iterable: The items to split. Despite the name, this must be a
            sized, sliceable sequence (e.g. a list), because the
            implementation relies on ``len()`` and slicing.
        batch_size: Maximum number of items per mini-batch; must be >= 1.

    Yields:
        Slices of ``iterable`` holding ``batch_size`` items each, in order;
        the final slice may be shorter. Nothing is yielded for an empty
        sequence.

    Raises:
        ValueError: If ``batch_size`` is less than 1. Because this is a
            generator, the error surfaces when iteration starts.
    """
    if batch_size < 1:
        # A negative step makes range() empty, which would silently drop
        # every row instead of writing it; fail loudly instead.
        raise ValueError(
            f"batch_size must be a positive integer, got {batch_size}"
        )
    for start in range(0, len(iterable), batch_size):
        yield iterable[start : start + batch_size]
54+
4755
def output_fn(batch):
4856
table = pa.Table.from_batches([batch])
4957

@@ -62,12 +70,17 @@ def output_fn(batch):
6270
)
6371
provider = self.feature_store._get_provider()
6472
with tqdm(total=len(rows_to_write)) as progress:
65-
provider.online_write_batch(
66-
config=self.config,
67-
table=self.feature_view,
68-
data=rows_to_write,
69-
progress=progress.update,
73+
# break rows_to_write to mini-batches
74+
batch_size = int(
75+
os.getenv("BYTEWAX_MINI_BATCH_SIZE", DEFAULT_BATCH_SIZE)
7076
)
77+
for mini_batch in yield_batch(rows_to_write, batch_size):
78+
provider.online_write_batch(
79+
config=self.config,
80+
table=self.feature_view,
81+
data=mini_batch,
82+
progress=progress.update,
83+
)
7184

7285
return output_fn
7386

sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py

+7
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ class BytewaxMaterializationEngineConfig(FeastConfigBaseModel):
6767
max_parallelism: int = 10
6868
""" (optional) Maximum number of pods (default 10) allowed to run in parallel per job"""
6969

70+
mini_batch_size: int = 1000
71+
""" (optional) Number of rows to process per write operation (default 1000)"""
72+
7073

7174
class BytewaxMaterializationEngine(BatchMaterializationEngine):
7275
def __init__(
@@ -254,6 +257,10 @@ def _create_job_definition(self, job_id, namespace, pods, env):
254257
"name": "BYTEWAX_STATEFULSET_NAME",
255258
"value": f"dataflow-{job_id}",
256259
},
260+
{
261+
"name": "BYTEWAX_MINI_BATCH_SIZE",
262+
"value": str(self.batch_engine_config.mini_batch_size),
263+
},
257264
]
258265
# Add any Feast configured environment variables
259266
job_env.extend(env)

0 commit comments

Comments
 (0)