1
+ import logging
1
2
import uuid
2
3
from datetime import datetime
4
+ from time import sleep
3
5
from typing import Callable , List , Literal , Sequence , Union
4
6
5
7
import yaml
6
8
from kubernetes import client
7
9
from kubernetes import config as k8s_config
8
10
from kubernetes import utils
11
+ from kubernetes .client .exceptions import ApiException
9
12
from kubernetes .utils import FailToCreateError
10
13
from pydantic import StrictStr
11
14
from tqdm import tqdm
16
19
from feast .infra .materialization .batch_materialization_engine import (
17
20
BatchMaterializationEngine ,
18
21
MaterializationJob ,
22
+ MaterializationJobStatus ,
19
23
MaterializationTask ,
20
24
)
21
25
from feast .infra .offline_stores .offline_store import OfflineStore
27
31
28
32
from .bytewax_materialization_job import BytewaxMaterializationJob
29
33
34
+ logger = logging .getLogger (__name__ )
35
+
30
36
31
37
class BytewaxMaterializationEngineConfig (FeastConfigBaseModel ):
32
38
"""Batch Materialization Engine config for Bytewax"""
@@ -65,11 +71,26 @@ class BytewaxMaterializationEngineConfig(FeastConfigBaseModel):
65
71
""" (optional) additional labels to append to kubernetes objects """
66
72
67
73
max_parallelism : int = 10
68
- """ (optional) Maximum number of pods (default 10) allowed to run in parallel per job"""
74
+ """ (optional) Maximum number of pods allowed to run in parallel"""
75
+
76
+ synchronous : bool = False
77
+ """ (optional) If true, wait for materialization for one feature to complete before moving to the next """
78
+
79
+ retry_limit : int = 2
80
+ """ (optional) Maximum number of times to retry a materialization worker pod"""
69
81
70
82
mini_batch_size : int = 1000
71
83
""" (optional) Number of rows to process per write operation (default 1000)"""
72
84
85
+ active_deadline_seconds : int = 86400
86
+ """ (optional) Maximum amount of time a materialization job is allowed to run"""
87
+
88
+ job_batch_size : int = 100
89
+ """ (optional) Maximum number of pods to process per job. Only applies to synchronous materialization"""
90
+
91
+ print_pod_logs_on_failure : bool = True
92
+ """(optional) Print pod logs on job failure. Only applies to synchronous materialization"""
93
+
73
94
74
95
class BytewaxMaterializationEngine (BatchMaterializationEngine ):
75
96
def __init__ (
@@ -173,8 +194,98 @@ def _materialize_one(
173
194
)
174
195
175
196
paths = offline_job .to_remote_storage ()
197
+ if self .batch_engine_config .synchronous :
198
+ offset = 0
199
+ total_pods = len (paths )
200
+ batch_size = self .batch_engine_config .job_batch_size
201
+ if batch_size < 1 :
202
+ raise ValueError ("job_batch_size must be a value greater than 0" )
203
+ if batch_size < self .batch_engine_config .max_parallelism :
204
+ logger .warning (
205
+ "job_batch_size is less than max_parallelism. Setting job_batch_size = max_parallelism"
206
+ )
207
+ batch_size = self .batch_engine_config .max_parallelism
208
+
209
+ while True :
210
+ next_offset = min (offset + batch_size , total_pods )
211
+ job = self ._await_path_materialization (
212
+ paths [offset :next_offset ],
213
+ feature_view ,
214
+ offset ,
215
+ next_offset ,
216
+ total_pods ,
217
+ )
218
+ offset += batch_size
219
+ if (
220
+ offset >= total_pods
221
+ or job .status () == MaterializationJobStatus .ERROR
222
+ ):
223
+ break
224
+ else :
225
+ job_id = str (uuid .uuid4 ())
226
+ job = self ._create_kubernetes_job (job_id , paths , feature_view )
227
+
228
+ return job
229
+
230
+ def _await_path_materialization (
231
+ self , paths , feature_view , batch_start , batch_end , total_pods
232
+ ):
176
233
job_id = str (uuid .uuid4 ())
177
- return self ._create_kubernetes_job (job_id , paths , feature_view )
234
+ job = self ._create_kubernetes_job (job_id , paths , feature_view )
235
+
236
+ try :
237
+ while job .status () in (
238
+ MaterializationJobStatus .WAITING ,
239
+ MaterializationJobStatus .RUNNING ,
240
+ ):
241
+ logger .info (
242
+ f"{ feature_view .name } materialization for pods { batch_start } -{ batch_end } "
243
+ f"(of { total_pods } ) running..."
244
+ )
245
+ sleep (30 )
246
+ logger .info (
247
+ f"{ feature_view .name } materialization for pods { batch_start } -{ batch_end } "
248
+ f"(of { total_pods } ) complete with status { job .status ()} "
249
+ )
250
+ except BaseException as e :
251
+ logger .info (f"Deleting job { job .job_id ()} " )
252
+ try :
253
+ self .batch_v1 .delete_namespaced_job (job .job_id (), self .namespace )
254
+ except ApiException as ae :
255
+ logger .warning (f"Could not delete job due to API Error: { ae .body } " )
256
+ raise e
257
+ finally :
258
+ logger .info (f"Deleting configmap { self ._configmap_name (job_id )} " )
259
+ try :
260
+ self .v1 .delete_namespaced_config_map (
261
+ self ._configmap_name (job_id ), self .namespace
262
+ )
263
+ except ApiException as ae :
264
+ logger .warning (
265
+ f"Could not delete configmap due to API Error: { ae .body } "
266
+ )
267
+
268
+ if (
269
+ job .status () == MaterializationJobStatus .ERROR
270
+ and self .batch_engine_config .print_pod_logs_on_failure
271
+ ):
272
+ self ._print_pod_logs (job .job_id (), feature_view , batch_start )
273
+
274
+ return job
275
+
276
+ def _print_pod_logs (self , job_id , feature_view , offset = 0 ):
277
+ pods_list = self .v1 .list_namespaced_pod (
278
+ namespace = self .namespace ,
279
+ label_selector = f"job-name={ job_id } " ,
280
+ ).items
281
+ for i , pod in enumerate (pods_list ):
282
+ logger .info (f"Logging output for { feature_view .name } pod { offset + i } " )
283
+ try :
284
+ logger .info (
285
+ self .v1 .read_namespaced_pod_log (pod .metadata .name , self .namespace )
286
+ )
287
+ except ApiException as e :
288
+ logger .warning (f"Could not retrieve pod logs due to: { e .body } " )
178
289
179
290
def _create_kubernetes_job (self , job_id , paths , feature_view ):
180
291
try :
@@ -210,7 +321,7 @@ def _create_configuration_map(self, job_id, paths, feature_view, namespace):
210
321
"kind" : "ConfigMap" ,
211
322
"apiVersion" : "v1" ,
212
323
"metadata" : {
213
- "name" : f"feast- { job_id } " ,
324
+ "name" : self . _configmap_name ( job_id ) ,
214
325
"labels" : {** labels , ** self .batch_engine_config .labels },
215
326
},
216
327
"data" : {
@@ -223,7 +334,10 @@ def _create_configuration_map(self, job_id, paths, feature_view, namespace):
223
334
body = configmap_manifest ,
224
335
)
225
336
226
- def _create_job_definition (self , job_id , namespace , pods , env ):
337
+ def _configmap_name (self , job_id ):
338
+ return f"feast-{ job_id } "
339
+
340
+ def _create_job_definition (self , job_id , namespace , pods , env , index_offset = 0 ):
227
341
"""Create a kubernetes job definition."""
228
342
job_env = [
229
343
{"name" : "RUST_BACKTRACE" , "value" : "full" },
@@ -284,8 +398,10 @@ def _create_job_definition(self, job_id, namespace, pods, env):
284
398
},
285
399
"spec" : {
286
400
"ttlSecondsAfterFinished" : 3600 ,
401
+ "backoffLimit" : self .batch_engine_config .retry_limit ,
287
402
"completions" : pods ,
288
403
"parallelism" : min (pods , self .batch_engine_config .max_parallelism ),
404
+ "activeDeadlineSeconds" : self .batch_engine_config .active_deadline_seconds ,
289
405
"completionMode" : "Indexed" ,
290
406
"template" : {
291
407
"metadata" : {
@@ -324,7 +440,7 @@ def _create_job_definition(self, job_id, namespace, pods, env):
324
440
},
325
441
{
326
442
"mountPath" : "/var/feast/" ,
327
- "name" : f"feast- { job_id } " ,
443
+ "name" : self . _configmap_name ( job_id ) ,
328
444
},
329
445
],
330
446
}
@@ -355,7 +471,7 @@ def _create_job_definition(self, job_id, namespace, pods, env):
355
471
{"mountPath" : "/etc/bytewax" , "name" : "hostfile" },
356
472
{
357
473
"mountPath" : "/var/feast/" ,
358
- "name" : f"feast- { job_id } " ,
474
+ "name" : self . _configmap_name ( job_id ) ,
359
475
},
360
476
],
361
477
}
@@ -365,13 +481,13 @@ def _create_job_definition(self, job_id, namespace, pods, env):
365
481
{
366
482
"configMap" : {
367
483
"defaultMode" : 420 ,
368
- "name" : f"feast- { job_id } " ,
484
+ "name" : self . _configmap_name ( job_id ) ,
369
485
},
370
486
"name" : "python-files" ,
371
487
},
372
488
{
373
- "configMap" : {"name" : f"feast- { job_id } " },
374
- "name" : f"feast- { job_id } " ,
489
+ "configMap" : {"name" : self . _configmap_name ( job_id ) },
490
+ "name" : self . _configmap_name ( job_id ) ,
375
491
},
376
492
],
377
493
},
0 commit comments