@@ -61,6 +61,12 @@ class BytewaxMaterializationEngineConfig(FeastConfigBaseModel):
61
61
include_security_context_capabilities : bool = True
62
62
""" (optional) Include security context capabilities in the init and job container spec """
63
63
64
+ labels : dict = {}
65
+ """ (optional) additional labels to append to kubernetes objects """
66
+
67
+ max_parallelism : int = 10
68
+ """ (optional) Maximum number of pods (default 10) allowed to run in parallel per job"""
69
+
64
70
65
71
class BytewaxMaterializationEngine (BatchMaterializationEngine ):
66
72
def __init__ (
@@ -82,7 +88,7 @@ def __init__(
82
88
self .online_store = online_store
83
89
84
90
# TODO: Configure k8s here
85
- k8s_config .load_kube_config ()
91
+ k8s_config .load_config ()
86
92
87
93
self .k8s_client = client .api_client .ApiClient ()
88
94
self .v1 = client .CoreV1Api (self .k8s_client )
@@ -196,14 +202,13 @@ def _create_configuration_map(self, job_id, paths, feature_view, namespace):
196
202
{"paths" : paths , "feature_view" : feature_view .name }
197
203
)
198
204
205
+ labels = {"feast-bytewax-materializer" : "configmap" }
199
206
configmap_manifest = {
200
207
"kind" : "ConfigMap" ,
201
208
"apiVersion" : "v1" ,
202
209
"metadata" : {
203
210
"name" : f"feast-{ job_id } " ,
204
- "labels" : {
205
- "feast-bytewax-materializer" : "configmap" ,
206
- },
211
+ "labels" : {** labels , ** self .batch_engine_config .labels },
207
212
},
208
213
"data" : {
209
214
"feature_store.yaml" : feature_store_configuration ,
@@ -260,27 +265,25 @@ def _create_job_definition(self, job_id, namespace, pods, env):
260
265
"drop" : ["ALL" ],
261
266
}
262
267
268
+ job_labels = {"feast-bytewax-materializer" : "job" }
269
+ pod_labels = {"feast-bytewax-materializer" : "pod" }
263
270
job_definition = {
264
271
"apiVersion" : "batch/v1" ,
265
272
"kind" : "Job" ,
266
273
"metadata" : {
267
274
"name" : f"dataflow-{ job_id } " ,
268
275
"namespace" : namespace ,
269
- "labels" : {
270
- "feast-bytewax-materializer" : "job" ,
271
- },
276
+ "labels" : {** job_labels , ** self .batch_engine_config .labels },
272
277
},
273
278
"spec" : {
274
279
"ttlSecondsAfterFinished" : 3600 ,
275
280
"completions" : pods ,
276
- "parallelism" : pods ,
281
+ "parallelism" : min ( pods , self . batch_engine_config . max_parallelism ) ,
277
282
"completionMode" : "Indexed" ,
278
283
"template" : {
279
284
"metadata" : {
280
285
"annotations" : self .batch_engine_config .annotations ,
281
- "labels" : {
282
- "feast-bytewax-materializer" : "pod" ,
283
- },
286
+ "labels" : {** pod_labels , ** self .batch_engine_config .labels },
284
287
},
285
288
"spec" : {
286
289
"restartPolicy" : "Never" ,
0 commit comments