1
1
import json
2
+ import threading
2
3
import traceback
3
4
import warnings
4
5
from typing import List , Optional
@@ -44,14 +45,37 @@ class MaterializeIncrementalRequest(BaseModel):
44
45
feature_views : Optional [List [str ]] = None
45
46
46
47
47
- def get_app (store : "feast.FeatureStore" ):
48
+ def get_app (store : "feast.FeatureStore" , registry_ttl_sec : int = 5 ):
48
49
proto_json .patch ()
49
50
50
51
app = FastAPI ()
52
+ # Asynchronously refresh registry, notifying shutdown and canceling the active timer if the app is shutting down
53
+ registry_proto = None
54
+ shutting_down = False
55
+ active_timer : Optional [threading .Timer ] = None
51
56
52
57
async def get_body (request : Request ):
53
58
return await request .body ()
54
59
60
+ def async_refresh ():
61
+ store .refresh_registry ()
62
+ nonlocal registry_proto
63
+ registry_proto = store .registry .proto ()
64
+ if shutting_down :
65
+ return
66
+ nonlocal active_timer
67
+ active_timer = threading .Timer (registry_ttl_sec , async_refresh )
68
+ active_timer .start ()
69
+
70
+ @app .on_event ("shutdown" )
71
+ def shutdown_event ():
72
+ nonlocal shutting_down
73
+ shutting_down = True
74
+ if active_timer :
75
+ active_timer .cancel ()
76
+
77
+ async_refresh ()
78
+
55
79
@app .post ("/get-online-features" )
56
80
def get_online_features (body = Depends (get_body )):
57
81
try :
@@ -180,7 +204,10 @@ def materialize_incremental(body=Depends(get_body)):
180
204
181
205
class FeastServeApplication (gunicorn .app .base .BaseApplication ):
182
206
def __init__ (self , store : "feast.FeatureStore" , ** options ):
183
- self ._app = get_app (store = store )
207
+ self ._app = get_app (
208
+ store = store ,
209
+ registry_ttl_sec = options .get ("registry_ttl_sec" , 5 ),
210
+ )
184
211
self ._options = options
185
212
super ().__init__ ()
186
213
@@ -202,11 +229,13 @@ def start_server(
202
229
no_access_log : bool ,
203
230
workers : int ,
204
231
keep_alive_timeout : int ,
232
+ registry_ttl_sec : int = 5 ,
205
233
):
206
234
FeastServeApplication (
207
235
store = store ,
208
236
bind = f"{ host } :{ port } " ,
209
237
accesslog = None if no_access_log else "-" ,
210
238
workers = workers ,
211
239
keepalive = keep_alive_timeout ,
240
+ registry_ttl_sec = registry_ttl_sec ,
212
241
).run ()
0 commit comments