Commit 417b16b
fix: Redundant feature materialization and premature incremental materialization timestamp updates (feast-dev#3789)
* SAASMLOPS-767 wait for jobs to complete
* SAASMLOPS-805 stopgap change to fix duplicate materialization of data
* SAASMLOPS-805 save BYTEWAX_REPLICAS=1
* SAASMLOPS-809 fix bytewax workers so they only process a single file (feast-dev#6)
  * SAASMLOPS-809 fix newlines
* SAASMLOPS-833 add configurable job timeout (feast-dev#7)
  * SAASMLOPS-833 fix whitespace
* run large materializations in batches of pods
* set job_batch_size at least equal to max_parallelism
* clarify max_parallelism description
* resolve bug that causes materialization to continue after job error
* resolve bug causing pod logs to not be printed

Signed-off-by: James Crabtree <james.crabtree@sailpoint.com>
1 parent b3852bf commit 417b16b

File tree

5 files changed: +139 −19 lines


sdk/python/feast/infra/materialization/contrib/bytewax/Dockerfile

+1 −1
@@ -25,5 +25,5 @@ COPY README.md README.md
 # git dir to infer the version of feast we're installing.
 # https://github.com/pypa/setuptools_scm#usage-from-docker
 # I think it also assumes that this dockerfile is being built from the root of the directory.
-RUN --mount=source=.git,target=.git,type=bind pip3 install --no-cache-dir -e '.[aws,gcp,bytewax]'
+RUN --mount=source=.git,target=.git,type=bind pip3 install --no-cache-dir '.[aws,gcp,bytewax,snowflake]'

sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py

+4 −6
@@ -5,7 +5,7 @@
 import pyarrow.parquet as pq
 from bytewax.dataflow import Dataflow  # type: ignore
 from bytewax.execution import cluster_main
-from bytewax.inputs import ManualInputConfig, distribute
+from bytewax.inputs import ManualInputConfig
 from bytewax.outputs import ManualOutputConfig
 from tqdm import tqdm

@@ -21,11 +21,13 @@ def __init__(
         config: RepoConfig,
         feature_view: FeatureView,
         paths: List[str],
+        worker_index: int,
     ):
         self.config = config
         self.feature_store = FeatureStore(config=config)
 
         self.feature_view = feature_view
+        self.worker_index = worker_index
         self.paths = paths
 
         self._run_dataflow()
@@ -40,11 +42,7 @@ def process_path(self, path):
         return batches
 
     def input_builder(self, worker_index, worker_count, _state):
-        worker_paths = distribute(self.paths, worker_index, worker_count)
-        for path in worker_paths:
-            yield None, path
-
-        return
+        return [(None, self.paths[self.worker_index])]
 
     def output_builder(self, worker_index, worker_count):
         def yield_batch(iterable, batch_size):
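Note: previously each Bytewax worker took a slice of paths via the distribute helper, so a rescheduled pod could re-ingest files already handled by its replacement. With this change pod i owns exactly paths[i]. A minimal standalone sketch of the new per-worker contract (select_input is a hypothetical name, not part of the change):

import os

def select_input(paths, worker_index=None):
    # Indexed Kubernetes Jobs expose each pod's ordinal via JOB_COMPLETION_INDEX,
    # so pod i deterministically owns paths[i] and no file is processed twice.
    if worker_index is None:
        worker_index = int(os.environ["JOB_COMPLETION_INDEX"])
    # Bytewax's ManualInputConfig consumes (state, payload) pairs; state is unused here.
    return [(None, paths[worker_index])]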

sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py

+125 −9
@@ -1,11 +1,14 @@
+import logging
 import uuid
 from datetime import datetime
+from time import sleep
 from typing import Callable, List, Literal, Sequence, Union
 
 import yaml
 from kubernetes import client
 from kubernetes import config as k8s_config
 from kubernetes import utils
+from kubernetes.client.exceptions import ApiException
 from kubernetes.utils import FailToCreateError
 from pydantic import StrictStr
 from tqdm import tqdm
@@ -16,6 +19,7 @@
 from feast.infra.materialization.batch_materialization_engine import (
     BatchMaterializationEngine,
     MaterializationJob,
+    MaterializationJobStatus,
     MaterializationTask,
 )
 from feast.infra.offline_stores.offline_store import OfflineStore
@@ -27,6 +31,8 @@
 
 from .bytewax_materialization_job import BytewaxMaterializationJob
 
+logger = logging.getLogger(__name__)
+
 
 class BytewaxMaterializationEngineConfig(FeastConfigBaseModel):
     """Batch Materialization Engine config for Bytewax"""
6571
""" (optional) additional labels to append to kubernetes objects """
6672

6773
max_parallelism: int = 10
68-
""" (optional) Maximum number of pods (default 10) allowed to run in parallel per job"""
74+
""" (optional) Maximum number of pods allowed to run in parallel"""
75+
76+
synchronous: bool = False
77+
""" (optional) If true, wait for materialization for one feature to complete before moving to the next """
78+
79+
retry_limit: int = 2
80+
""" (optional) Maximum number of times to retry a materialization worker pod"""
6981

7082
mini_batch_size: int = 1000
7183
""" (optional) Number of rows to process per write operation (default 1000)"""
7284

85+
active_deadline_seconds: int = 86400
86+
""" (optional) Maximum amount of time a materialization job is allowed to run"""
87+
88+
job_batch_size: int = 100
89+
""" (optional) Maximum number of pods to process per job. Only applies to synchronous materialization"""
90+
91+
print_pod_logs_on_failure: bool = True
92+
"""(optional) Print pod logs on job failure. Only applies to synchronous materialization"""
93+
7394

7495
class BytewaxMaterializationEngine(BatchMaterializationEngine):
7596
def __init__(
@@ -173,8 +194,98 @@ def _materialize_one(
173194
)
174195

175196
paths = offline_job.to_remote_storage()
197+
if self.batch_engine_config.synchronous:
198+
offset = 0
199+
total_pods = len(paths)
200+
batch_size = self.batch_engine_config.job_batch_size
201+
if batch_size < 1:
202+
raise ValueError("job_batch_size must be a value greater than 0")
203+
if batch_size < self.batch_engine_config.max_parallelism:
204+
logger.warning(
205+
"job_batch_size is less than max_parallelism. Setting job_batch_size = max_parallelism"
206+
)
207+
batch_size = self.batch_engine_config.max_parallelism
208+
209+
while True:
210+
next_offset = min(offset + batch_size, total_pods)
211+
job = self._await_path_materialization(
212+
paths[offset:next_offset],
213+
feature_view,
214+
offset,
215+
next_offset,
216+
total_pods,
217+
)
218+
offset += batch_size
219+
if (
220+
offset >= total_pods
221+
or job.status() == MaterializationJobStatus.ERROR
222+
):
223+
break
224+
else:
225+
job_id = str(uuid.uuid4())
226+
job = self._create_kubernetes_job(job_id, paths, feature_view)
227+
228+
return job
229+
230+
def _await_path_materialization(
231+
self, paths, feature_view, batch_start, batch_end, total_pods
232+
):
176233
job_id = str(uuid.uuid4())
177-
return self._create_kubernetes_job(job_id, paths, feature_view)
234+
job = self._create_kubernetes_job(job_id, paths, feature_view)
235+
236+
try:
237+
while job.status() in (
238+
MaterializationJobStatus.WAITING,
239+
MaterializationJobStatus.RUNNING,
240+
):
241+
logger.info(
242+
f"{feature_view.name} materialization for pods {batch_start}-{batch_end} "
243+
f"(of {total_pods}) running..."
244+
)
245+
sleep(30)
246+
logger.info(
247+
f"{feature_view.name} materialization for pods {batch_start}-{batch_end} "
248+
f"(of {total_pods}) complete with status {job.status()}"
249+
)
250+
except BaseException as e:
251+
logger.info(f"Deleting job {job.job_id()}")
252+
try:
253+
self.batch_v1.delete_namespaced_job(job.job_id(), self.namespace)
254+
except ApiException as ae:
255+
logger.warning(f"Could not delete job due to API Error: {ae.body}")
256+
raise e
257+
finally:
258+
logger.info(f"Deleting configmap {self._configmap_name(job_id)}")
259+
try:
260+
self.v1.delete_namespaced_config_map(
261+
self._configmap_name(job_id), self.namespace
262+
)
263+
except ApiException as ae:
264+
logger.warning(
265+
f"Could not delete configmap due to API Error: {ae.body}"
266+
)
267+
268+
if (
269+
job.status() == MaterializationJobStatus.ERROR
270+
and self.batch_engine_config.print_pod_logs_on_failure
271+
):
272+
self._print_pod_logs(job.job_id(), feature_view, batch_start)
273+
274+
return job
275+
276+
def _print_pod_logs(self, job_id, feature_view, offset=0):
277+
pods_list = self.v1.list_namespaced_pod(
278+
namespace=self.namespace,
279+
label_selector=f"job-name={job_id}",
280+
).items
281+
for i, pod in enumerate(pods_list):
282+
logger.info(f"Logging output for {feature_view.name} pod {offset+i}")
283+
try:
284+
logger.info(
285+
self.v1.read_namespaced_pod_log(pod.metadata.name, self.namespace)
286+
)
287+
except ApiException as e:
288+
logger.warning(f"Could not retrieve pod logs due to: {e.body}")
178289

179290
def _create_kubernetes_job(self, job_id, paths, feature_view):
180291
try:
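Note: the synchronous branch above advances offset in job_batch_size steps and stops early on ERROR. The slice arithmetic in isolation (a standalone sketch, not code from the change):

def batch_ranges(total_pods, batch_size):
    # Yields the (start, end) slices the synchronous loop submits as separate jobs.
    offset = 0
    while offset < total_pods:
        yield offset, min(offset + batch_size, total_pods)
        offset += batch_size

# e.g. 250 paths with job_batch_size=100 -> jobs of 100, 100 and 50 pods
assert list(batch_ranges(250, 100)) == [(0, 100), (100, 200), (200, 250)]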
@@ -210,7 +321,7 @@ def _create_configuration_map(self, job_id, paths, feature_view, namespace):
             "kind": "ConfigMap",
             "apiVersion": "v1",
             "metadata": {
-                "name": f"feast-{job_id}",
+                "name": self._configmap_name(job_id),
                 "labels": {**labels, **self.batch_engine_config.labels},
             },
             "data": {
@@ -223,7 +334,10 @@ def _create_configuration_map(self, job_id, paths, feature_view, namespace):
             body=configmap_manifest,
         )
 
-    def _create_job_definition(self, job_id, namespace, pods, env):
+    def _configmap_name(self, job_id):
+        return f"feast-{job_id}"
+
+    def _create_job_definition(self, job_id, namespace, pods, env, index_offset=0):
         """Create a kubernetes job definition."""
         job_env = [
             {"name": "RUST_BACKTRACE", "value": "full"},
@@ -284,8 +398,10 @@ def _create_job_definition(self, job_id, namespace, pods, env):
             },
             "spec": {
                 "ttlSecondsAfterFinished": 3600,
+                "backoffLimit": self.batch_engine_config.retry_limit,
                 "completions": pods,
                 "parallelism": min(pods, self.batch_engine_config.max_parallelism),
+                "activeDeadlineSeconds": self.batch_engine_config.active_deadline_seconds,
                 "completionMode": "Indexed",
                 "template": {
                     "metadata": {
@@ -324,7 +440,7 @@ def _create_job_definition(self, job_id, namespace, pods, env):
                             },
                             {
                                 "mountPath": "/var/feast/",
-                                "name": f"feast-{job_id}",
+                                "name": self._configmap_name(job_id),
                             },
                         ],
                     }
@@ -355,7 +471,7 @@ def _create_job_definition(self, job_id, namespace, pods, env):
                             {"mountPath": "/etc/bytewax", "name": "hostfile"},
                             {
                                 "mountPath": "/var/feast/",
-                                "name": f"feast-{job_id}",
+                                "name": self._configmap_name(job_id),
                             },
                         ],
                     }
@@ -365,13 +481,13 @@ def _create_job_definition(self, job_id, namespace, pods, env):
                     {
                         "configMap": {
                             "defaultMode": 420,
-                            "name": f"feast-{job_id}",
+                            "name": self._configmap_name(job_id),
                         },
                         "name": "python-files",
                     },
                     {
-                        "configMap": {"name": f"feast-{job_id}"},
-                        "name": f"feast-{job_id}",
+                        "configMap": {"name": self._configmap_name(job_id)},
+                        "name": self._configmap_name(job_id),
                     },
                 ],
             },

sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_job.py

+6 −3
@@ -36,10 +36,13 @@ def status(self):
             if job_status.completion_time is None:
                 return MaterializationJobStatus.RUNNING
         elif job_status.failed is not None:
+            self._error = Exception(f"Job {self.job_id()} failed")
             return MaterializationJobStatus.ERROR
-        elif job_status.active is None and job_status.succeeded is not None:
-            if job_status.conditions[0].type == "Complete":
-                return MaterializationJobStatus.SUCCEEDED
+        elif job_status.active is None:
+            if job_status.completion_time is not None:
+                if job_status.conditions[0].type == "Complete":
+                    return MaterializationJobStatus.SUCCEEDED
+        return MaterializationJobStatus.WAITING
 
     def should_be_retried(self):
         return False
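Note: the practical effect of the explicit WAITING fallthrough is that a freshly created job (no active pods, nothing failed or succeeded, no completion_time) now reports WAITING instead of falling off the end of the method. A simplified sketch of the corrected branch (enum replaced by strings, RUNNING branch omitted):

from types import SimpleNamespace

# Illustrative stand-in for kubernetes V1JobStatus on a just-created job.
fresh = SimpleNamespace(active=None, failed=None, succeeded=None,
                        completion_time=None, conditions=None)

def simplified_status(s):
    if s.failed is not None:
        return "ERROR"
    if s.active is None and s.completion_time is not None \
            and s.conditions[0].type == "Complete":
        return "SUCCEEDED"
    return "WAITING"

assert simplified_status(fresh) == "WAITING"  # previously: implicit None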

sdk/python/feast/infra/materialization/contrib/bytewax/dataflow.py

+3
@@ -1,3 +1,5 @@
+import os
+
 import yaml
 
 from feast import FeatureStore, RepoConfig
@@ -19,4 +21,5 @@
     config,
     store.get_feature_view(bytewax_config["feature_view"]),
     bytewax_config["paths"],
+    int(os.environ["JOB_COMPLETION_INDEX"]),
 )
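Note: Kubernetes injects JOB_COMPLETION_INDEX into every pod of an Indexed Job; the entrypoint above converts it into the worker_index the dataflow uses to pick its single file. A hypothetical out-of-cluster simulation of that wiring:

import os

os.environ["JOB_COMPLETION_INDEX"] = "3"  # set automatically by Kubernetes in-cluster
paths = [f"s3://bucket/feast/part-{i}.parquet" for i in range(5)]  # hypothetical paths
worker_index = int(os.environ["JOB_COMPLETION_INDEX"])
assert paths[worker_index] == "s3://bucket/feast/part-3.parquet"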
