1
1
import json
2
2
import traceback
3
3
import warnings
4
+ from typing import List , Optional
4
5
5
6
import gunicorn .app .base
6
7
import pandas as pd
8
+ from dateutil import parser
7
9
from fastapi import FastAPI , HTTPException , Request , Response , status
8
10
from fastapi .logger import logger
9
11
from fastapi .params import Depends
10
12
from google .protobuf .json_format import MessageToDict , Parse
11
13
from pydantic import BaseModel
12
14
13
15
import feast
14
- from feast import proto_json
16
+ from feast import proto_json , utils
15
17
from feast .data_source import PushMode
16
18
from feast .errors import PushSourceNotFoundException
17
19
from feast .protos .feast .serving .ServingService_pb2 import GetOnlineFeaturesRequest
@@ -31,6 +33,17 @@ class PushFeaturesRequest(BaseModel):
31
33
to : str = "online"
32
34
33
35
36
+ class MaterializeRequest (BaseModel ):
37
+ start_ts : str
38
+ end_ts : str
39
+ feature_views : Optional [List [str ]] = None
40
+
41
+
42
+ class MaterializeIncrementalRequest (BaseModel ):
43
+ end_ts : str
44
+ feature_views : Optional [List [str ]] = None
45
+
46
+
34
47
def get_app (store : "feast.FeatureStore" ):
35
48
proto_json .patch ()
36
49
@@ -134,6 +147,34 @@ def write_to_online_store(body=Depends(get_body)):
134
147
def health ():
135
148
return Response (status_code = status .HTTP_200_OK )
136
149
150
+ @app .post ("/materialize" )
151
+ def materialize (body = Depends (get_body )):
152
+ try :
153
+ request = MaterializeRequest (** json .loads (body ))
154
+ store .materialize (
155
+ utils .make_tzaware (parser .parse (request .start_ts )),
156
+ utils .make_tzaware (parser .parse (request .end_ts )),
157
+ request .feature_views ,
158
+ )
159
+ except Exception as e :
160
+ # Print the original exception on the server side
161
+ logger .exception (traceback .format_exc ())
162
+ # Raise HTTPException to return the error message to the client
163
+ raise HTTPException (status_code = 500 , detail = str (e ))
164
+
165
+ @app .post ("/materialize-incremental" )
166
+ def materialize_incremental (body = Depends (get_body )):
167
+ try :
168
+ request = MaterializeIncrementalRequest (** json .loads (body ))
169
+ store .materialize_incremental (
170
+ utils .make_tzaware (parser .parse (request .end_ts )), request .feature_views
171
+ )
172
+ except Exception as e :
173
+ # Print the original exception on the server side
174
+ logger .exception (traceback .format_exc ())
175
+ # Raise HTTPException to return the error message to the client
176
+ raise HTTPException (status_code = 500 , detail = str (e ))
177
+
137
178
return app
138
179
139
180
0 commit comments