Commit 49497ec

SAASMLOPS-809 fix bytewax workers so they only process a single file (#6)
* SAASMLOPS-809 fix bytewax workers so they only process a single file
* SAASMLOPS-809 fix newlines

Signed-off-by: James Crabtree <james.crabtree@sailpoint.com>
1 parent 21a5f05 commit 49497ec

3 files changed, +15 -12 lines changed


sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py

+8 -7

@@ -5,32 +5,37 @@
 import s3fs
 from bytewax.dataflow import Dataflow  # type: ignore
 from bytewax.execution import cluster_main
-from bytewax.inputs import ManualInputConfig, distribute
+from bytewax.inputs import ManualInputConfig
 from bytewax.outputs import ManualOutputConfig
 from tqdm import tqdm
+import logging

 from feast import FeatureStore, FeatureView, RepoConfig
 from feast.utils import _convert_arrow_to_proto, _run_pyarrow_field_mapping

+logger = logging.getLogger(__name__)
+

 class BytewaxMaterializationDataflow:
     def __init__(
         self,
         config: RepoConfig,
         feature_view: FeatureView,
         paths: List[str],
+        worker_index: int
     ):
         self.config = config
         self.feature_store = FeatureStore(config=config)

         self.feature_view = feature_view
+        self.worker_index = worker_index
         self.paths = paths

         self._run_dataflow()

     def process_path(self, path):
         fs = s3fs.S3FileSystem()
-        print(f"Processing path {path}")
+        logger.info(f"Processing path {path}")
         dataset = pq.ParquetDataset(path, filesystem=fs, use_legacy_dataset=False)
         batches = []
         for fragment in dataset.fragments:
@@ -40,11 +45,7 @@ def process_path(self, path):
         return batches

     def input_builder(self, worker_index, worker_count, _state):
-        worker_paths = distribute(self.paths, worker_index, worker_count)
-        for path in worker_paths:
-            yield None, path
-
-        return
+        return [(None, self.paths[self.worker_index])]

     def output_builder(self, worker_index, worker_count):
         def output_fn(batch):
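
To make the new input_builder behaviour concrete, here is a small self-contained sketch. It is not the actual Feast module: the SingleFileInput class and the example paths are hypothetical stand-ins showing that each worker now emits exactly one (state, path) pair selected by its worker_index, instead of a distribute()-based share of the path list.

# Hypothetical stand-in for the dataflow's input side: the pod is told its
# worker_index up front, and input_builder returns a single-item list, so the
# Bytewax worker materializes exactly one parquet file.
from typing import List


class SingleFileInput:
    def __init__(self, paths: List[str], worker_index: int):
        self.paths = paths
        self.worker_index = worker_index

    def input_builder(self, worker_index, worker_count, _state):
        # Exactly one (resume_state, path) pair, regardless of worker_count.
        return [(None, self.paths[self.worker_index])]


# With three paths and worker_index=1, only the second path is handed out.
paths = [f"s3://bucket/feature-data/part-{i}.parquet" for i in range(3)]
assert SingleFileInput(paths, worker_index=1).input_builder(0, 1, None) == [
    (None, paths[1])
]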

sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py

+5 -5

@@ -202,7 +202,7 @@ def _materialize_one(
             try:
                 self.batch_v1.delete_namespaced_job(job.job_id(), self.namespace)
             except ApiException as de:
-                logger.warning(f"Could not delete job due to API Error: {ae.body}")
+                logger.warning(f"Could not delete job due to API Error: {de.body}")
             raise e
         self._print_pod_logs(job.job_id(), feature_view)
         return job
@@ -290,7 +290,7 @@ def _create_job_definition(self, job_id, namespace, pods, env):
                            },
                            {
                                "name": "BYTEWAX_REPLICAS",
-                                "value": "1",
+                                "value": f"{pods}",
                            },
                            {
                                "name": "BYTEWAX_KEEP_CONTAINER_ALIVE",
@@ -324,8 +324,8 @@ def _create_job_definition(self, job_id, namespace, pods, env):
            "spec": {
                "ttlSecondsAfterFinished": 3600,
                "backoffLimit": self.batch_engine_config.retry_limit,
-                "completions": 1,
-                "parallelism": 1,
+                "completions": pods,
+                "parallelism": min(pods, self.batch_engine_config.max_parallelism),
                "completionMode": "Indexed",
                "template": {
                    "metadata": {
@@ -342,7 +342,7 @@ def _create_job_definition(self, job_id, namespace, pods, env):
                        "env": [
                            {
                                "name": "BYTEWAX_REPLICAS",
-                                "value": "1",
+                                "value": f"{pods}",
                            }
                        ],
                        "image": "busybox",

sdk/python/feast/infra/materialization/contrib/bytewax/dataflow.py

+2 -0

@@ -1,3 +1,4 @@
+import os
 import yaml

 from feast import FeatureStore, RepoConfig
@@ -19,4 +20,5 @@
         config,
         store.get_feature_view(bytewax_config["feature_view"]),
         bytewax_config["paths"],
+        int(os.environ["JOB_COMPLETION_INDEX"])
     )
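
The entrypoint reads the worker index from JOB_COMPLETION_INDEX, which Kubernetes sets automatically in each pod of an Indexed Job. A slightly more defensive read might look like the sketch below; the fallback default is illustrative and not part of this commit.

import os

# JOB_COMPLETION_INDEX is injected by Kubernetes for Indexed Jobs; when the
# script runs outside such a Job (e.g. locally), the variable is absent, so a
# default of 0 keeps the dataflow pointed at the first path.
worker_index = int(os.environ.get("JOB_COMPLETION_INDEX", "0"))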
