Skip to content

Commit 9cf9d96

Browse files
sudohainguyen authored and achals committed
feat: Optimize bytewax pod resource with zero-copy
Signed-off-by: Hai Nguyen <quanghai.ng1512@gmail.com>
1 parent 1f91fc6 commit 9cf9d96

File tree

3 files changed

+31
-27
lines changed

3 files changed

+31
-27
lines changed

sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py

+17-24
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,4 @@
1+
import logging
12
import os
23
from typing import List
34

@@ -7,11 +8,11 @@
78
from bytewax.execution import cluster_main
89
from bytewax.inputs import ManualInputConfig
910
from bytewax.outputs import ManualOutputConfig
10-
from tqdm import tqdm
1111

1212
from feast import FeatureStore, FeatureView, RepoConfig
1313
from feast.utils import _convert_arrow_to_proto, _run_pyarrow_field_mapping
1414

15+
logger = logging.getLogger(__name__)
1516
DEFAULT_BATCH_SIZE = 1000
1617

1718

@@ -29,14 +30,20 @@ def __init__(
2930
self.feature_view = feature_view
3031
self.worker_index = worker_index
3132
self.paths = paths
33+
self.mini_batch_size = int(
34+
os.getenv("BYTEWAX_MINI_BATCH_SIZE", DEFAULT_BATCH_SIZE)
35+
)
3236

3337
self._run_dataflow()
3438

3539
def process_path(self, path):
40+
logger.info(f"Processing path {path}")
3641
dataset = pq.ParquetDataset(path, use_legacy_dataset=False)
3742
batches = []
3843
for fragment in dataset.fragments:
39-
for batch in fragment.to_table().to_batches():
44+
for batch in fragment.to_table().to_batches(
45+
max_chunksize=self.mini_batch_size
46+
):
4047
batches.append(batch)
4148

4249
return batches
@@ -45,40 +52,26 @@ def input_builder(self, worker_index, worker_count, _state):
4552
return [(None, self.paths[self.worker_index])]
4653

4754
def output_builder(self, worker_index, worker_count):
48-
def yield_batch(iterable, batch_size):
49-
"""Yield mini-batches from an iterable."""
50-
for i in range(0, len(iterable), batch_size):
51-
yield iterable[i : i + batch_size]
52-
53-
def output_fn(batch):
54-
table = pa.Table.from_batches([batch])
55+
def output_fn(mini_batch):
56+
table: pa.Table = pa.Table.from_batches([mini_batch])
5557

5658
if self.feature_view.batch_source.field_mapping is not None:
5759
table = _run_pyarrow_field_mapping(
5860
table, self.feature_view.batch_source.field_mapping
5961
)
60-
6162
join_key_to_value_type = {
6263
entity.name: entity.dtype.to_value_type()
6364
for entity in self.feature_view.entity_columns
6465
}
65-
6666
rows_to_write = _convert_arrow_to_proto(
6767
table, self.feature_view, join_key_to_value_type
6868
)
69-
provider = self.feature_store._get_provider()
70-
with tqdm(total=len(rows_to_write)) as progress:
71-
# break rows_to_write to mini-batches
72-
batch_size = int(
73-
os.getenv("BYTEWAX_MINI_BATCH_SIZE", DEFAULT_BATCH_SIZE)
74-
)
75-
for mini_batch in yield_batch(rows_to_write, batch_size):
76-
provider.online_write_batch(
77-
config=self.config,
78-
table=self.feature_view,
79-
data=mini_batch,
80-
progress=progress.update,
81-
)
69+
self.feature_store._get_provider().online_write_batch(
70+
config=self.config,
71+
table=self.feature_view,
72+
data=rows_to_write,
73+
progress=None,
74+
)
8275

8376
return output_fn
8477

sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py

+11-3
Original file line number | Diff line number | Diff line change
@@ -82,6 +82,12 @@ class BytewaxMaterializationEngineConfig(FeastConfigBaseModel):
8282
mini_batch_size: int = 1000
8383
""" (optional) Number of rows to process per write operation (default 1000)"""
8484

85+
bytewax_replicas: int = 5
86+
""" (optional) Number of process to spawn in each pods to handle a file in parallel"""
87+
88+
bytewax_worker_per_process: int = 1
89+
""" (optional) Number of threads as worker per bytewax process"""
90+
8591
active_deadline_seconds: int = 86400
8692
""" (optional) Maximum amount of time a materialization job is allowed to run"""
8793

@@ -111,7 +117,6 @@ def __init__(
111117
self.offline_store = offline_store
112118
self.online_store = online_store
113119

114-
# TODO: Configure k8s here
115120
k8s_config.load_config()
116121

117122
self.k8s_client = client.api_client.ApiClient()
@@ -299,6 +304,9 @@ def _create_kubernetes_job(self, job_id, paths, feature_view):
299304
len(paths), # Create a pod for each parquet file
300305
self.batch_engine_config.env,
301306
)
307+
logger.info(
308+
f"Created job `dataflow-{job_id}` on namespace `{self.namespace}`"
309+
)
302310
except FailToCreateError as failures:
303311
return BytewaxMaterializationJob(job_id, self.namespace, error=failures)
304312

@@ -345,7 +353,7 @@ def _create_job_definition(self, job_id, namespace, pods, env, index_offset=0):
345353
{"name": "BYTEWAX_WORKDIR", "value": "/bytewax"},
346354
{
347355
"name": "BYTEWAX_WORKERS_PER_PROCESS",
348-
"value": "1",
356+
"value": f"{self.batch_engine_config.bytewax_worker_per_process}",
349357
},
350358
{
351359
"name": "BYTEWAX_POD_NAME",
@@ -358,7 +366,7 @@ def _create_job_definition(self, job_id, namespace, pods, env, index_offset=0):
358366
},
359367
{
360368
"name": "BYTEWAX_REPLICAS",
361-
"value": f"{pods}",
369+
"value": f"{self.batch_engine_config.bytewax_replicas}",
362370
},
363371
{
364372
"name": "BYTEWAX_KEEP_CONTAINER_ALIVE",

sdk/python/feast/infra/materialization/contrib/bytewax/dataflow.py

+3
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,4 @@
1+
import logging
12
import os
23

34
import yaml
@@ -8,6 +9,8 @@
89
)
910

1011
if __name__ == "__main__":
12+
logging.basicConfig(level=logging.INFO)
13+
1114
with open("/var/feast/feature_store.yaml") as f:
1215
feast_config = yaml.safe_load(f)
1316

0 commit comments

Comments
 (0)