Skip to content

Commit 2118719

Browse files
authored
fix: Added Online Store REST client errors handler (feast-dev#4488)
* Added Online Store rest client errors handler Signed-off-by: Theodor Mihalache <tmihalac@redhat.com> * Added Online Store rest client errors handler - Small refactor to from_error_detail and FeastErrors - Fixed tests Signed-off-by: Theodor Mihalache <tmihalac@redhat.com> * Added Online Store rest client errors handler - Fixed linter Signed-off-by: Theodor Mihalache <tmihalac@redhat.com> --------- Signed-off-by: Theodor Mihalache <tmihalac@redhat.com>
1 parent 2bd03fa commit 2118719

File tree

6 files changed

+299
-171
lines changed

6 files changed

+299
-171
lines changed

sdk/python/feast/errors.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ def from_error_detail(detail: str) -> Optional["FeastError"]:
5757
module = importlib.import_module(module_name)
5858
class_reference = getattr(module, class_name)
5959

60-
instance = class_reference(message)
60+
instance = class_reference.__new__(class_reference)
6161
setattr(instance, "__overridden_message__", message)
6262
return instance
6363
except Exception as e:
@@ -451,6 +451,9 @@ class PushSourceNotFoundException(FeastError):
451451
def __init__(self, push_source_name: str):
452452
super().__init__(f"Unable to find push source '{push_source_name}'.")
453453

454+
def http_status_code(self) -> int:
455+
return HttpStatusCode.HTTP_422_UNPROCESSABLE_ENTITY
456+
454457

455458
class ReadOnlyRegistryException(FeastError):
456459
def __init__(self):

sdk/python/feast/feature_server.py

+129-149
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@
99
import pandas as pd
1010
import psutil
1111
from dateutil import parser
12-
from fastapi import Depends, FastAPI, HTTPException, Request, Response, status
12+
from fastapi import Depends, FastAPI, Request, Response, status
1313
from fastapi.logger import logger
14+
from fastapi.responses import JSONResponse
1415
from google.protobuf.json_format import MessageToDict
1516
from prometheus_client import Gauge, start_http_server
1617
from pydantic import BaseModel
@@ -19,7 +20,10 @@
1920
from feast import proto_json, utils
2021
from feast.constants import DEFAULT_FEATURE_SERVER_REGISTRY_TTL
2122
from feast.data_source import PushMode
22-
from feast.errors import FeatureViewNotFoundException, PushSourceNotFoundException
23+
from feast.errors import (
24+
FeastError,
25+
FeatureViewNotFoundException,
26+
)
2327
from feast.permissions.action import WRITE, AuthzedAction
2428
from feast.permissions.security_manager import assert_permissions
2529
from feast.permissions.server.rest import inject_user_details
@@ -101,187 +105,163 @@ async def lifespan(app: FastAPI):
101105
async def get_body(request: Request):
102106
return await request.body()
103107

104-
# TODO RBAC: complete the dependencies for the other endpoints
105108
@app.post(
106109
"/get-online-features",
107110
dependencies=[Depends(inject_user_details)],
108111
)
109112
def get_online_features(body=Depends(get_body)):
110-
try:
111-
body = json.loads(body)
112-
full_feature_names = body.get("full_feature_names", False)
113-
entity_rows = body["entities"]
114-
# Initialize parameters for FeatureStore.get_online_features(...) call
115-
if "feature_service" in body:
116-
feature_service = store.get_feature_service(
117-
body["feature_service"], allow_cache=True
113+
body = json.loads(body)
114+
full_feature_names = body.get("full_feature_names", False)
115+
entity_rows = body["entities"]
116+
# Initialize parameters for FeatureStore.get_online_features(...) call
117+
if "feature_service" in body:
118+
feature_service = store.get_feature_service(
119+
body["feature_service"], allow_cache=True
120+
)
121+
assert_permissions(
122+
resource=feature_service, actions=[AuthzedAction.READ_ONLINE]
123+
)
124+
features = feature_service
125+
else:
126+
features = body["features"]
127+
all_feature_views, all_on_demand_feature_views = (
128+
utils._get_feature_views_to_use(
129+
store.registry,
130+
store.project,
131+
features,
132+
allow_cache=True,
133+
hide_dummy_entity=False,
118134
)
135+
)
136+
for feature_view in all_feature_views:
119137
assert_permissions(
120-
resource=feature_service, actions=[AuthzedAction.READ_ONLINE]
138+
resource=feature_view, actions=[AuthzedAction.READ_ONLINE]
121139
)
122-
features = feature_service
123-
else:
124-
features = body["features"]
125-
all_feature_views, all_on_demand_feature_views = (
126-
utils._get_feature_views_to_use(
127-
store.registry,
128-
store.project,
129-
features,
130-
allow_cache=True,
131-
hide_dummy_entity=False,
132-
)
140+
for od_feature_view in all_on_demand_feature_views:
141+
assert_permissions(
142+
resource=od_feature_view, actions=[AuthzedAction.READ_ONLINE]
133143
)
134-
for feature_view in all_feature_views:
135-
assert_permissions(
136-
resource=feature_view, actions=[AuthzedAction.READ_ONLINE]
137-
)
138-
for od_feature_view in all_on_demand_feature_views:
139-
assert_permissions(
140-
resource=od_feature_view, actions=[AuthzedAction.READ_ONLINE]
141-
)
142-
143-
response_proto = store.get_online_features(
144-
features=features,
145-
entity_rows=entity_rows,
146-
full_feature_names=full_feature_names,
147-
).proto
148-
149-
# Convert the Protobuf object to JSON and return it
150-
return MessageToDict(
151-
response_proto, preserving_proto_field_name=True, float_precision=18
152-
)
153-
except Exception as e:
154-
# Print the original exception on the server side
155-
logger.exception(traceback.format_exc())
156-
# Raise HTTPException to return the error message to the client
157-
raise HTTPException(status_code=500, detail=str(e))
144+
145+
response_proto = store.get_online_features(
146+
features=features,
147+
entity_rows=entity_rows,
148+
full_feature_names=full_feature_names,
149+
).proto
150+
151+
# Convert the Protobuf object to JSON and return it
152+
return MessageToDict(
153+
response_proto, preserving_proto_field_name=True, float_precision=18
154+
)
158155

159156
@app.post("/push", dependencies=[Depends(inject_user_details)])
160157
def push(body=Depends(get_body)):
161-
try:
162-
request = PushFeaturesRequest(**json.loads(body))
163-
df = pd.DataFrame(request.df)
164-
actions = []
165-
if request.to == "offline":
166-
to = PushMode.OFFLINE
167-
actions = [AuthzedAction.WRITE_OFFLINE]
168-
elif request.to == "online":
169-
to = PushMode.ONLINE
170-
actions = [AuthzedAction.WRITE_ONLINE]
171-
elif request.to == "online_and_offline":
172-
to = PushMode.ONLINE_AND_OFFLINE
173-
actions = WRITE
174-
else:
175-
raise ValueError(
176-
f"{request.to} is not a supported push format. Please specify one of these ['online', 'offline', 'online_and_offline']."
177-
)
178-
179-
from feast.data_source import PushSource
158+
request = PushFeaturesRequest(**json.loads(body))
159+
df = pd.DataFrame(request.df)
160+
actions = []
161+
if request.to == "offline":
162+
to = PushMode.OFFLINE
163+
actions = [AuthzedAction.WRITE_OFFLINE]
164+
elif request.to == "online":
165+
to = PushMode.ONLINE
166+
actions = [AuthzedAction.WRITE_ONLINE]
167+
elif request.to == "online_and_offline":
168+
to = PushMode.ONLINE_AND_OFFLINE
169+
actions = WRITE
170+
else:
171+
raise ValueError(
172+
f"{request.to} is not a supported push format. Please specify one of these ['online', 'offline', 'online_and_offline']."
173+
)
180174

181-
all_fvs = store.list_feature_views(
182-
allow_cache=request.allow_registry_cache
183-
) + store.list_stream_feature_views(
184-
allow_cache=request.allow_registry_cache
175+
from feast.data_source import PushSource
176+
177+
all_fvs = store.list_feature_views(
178+
allow_cache=request.allow_registry_cache
179+
) + store.list_stream_feature_views(allow_cache=request.allow_registry_cache)
180+
fvs_with_push_sources = {
181+
fv
182+
for fv in all_fvs
183+
if (
184+
fv.stream_source is not None
185+
and isinstance(fv.stream_source, PushSource)
186+
and fv.stream_source.name == request.push_source_name
185187
)
186-
fvs_with_push_sources = {
187-
fv
188-
for fv in all_fvs
189-
if (
190-
fv.stream_source is not None
191-
and isinstance(fv.stream_source, PushSource)
192-
and fv.stream_source.name == request.push_source_name
193-
)
194-
}
188+
}
195189

196-
for feature_view in fvs_with_push_sources:
197-
assert_permissions(resource=feature_view, actions=actions)
190+
for feature_view in fvs_with_push_sources:
191+
assert_permissions(resource=feature_view, actions=actions)
198192

199-
store.push(
200-
push_source_name=request.push_source_name,
201-
df=df,
202-
allow_registry_cache=request.allow_registry_cache,
203-
to=to,
204-
)
205-
except PushSourceNotFoundException as e:
206-
# Print the original exception on the server side
207-
logger.exception(traceback.format_exc())
208-
# Raise HTTPException to return the error message to the client
209-
raise HTTPException(status_code=422, detail=str(e))
210-
except Exception as e:
211-
# Print the original exception on the server side
212-
logger.exception(traceback.format_exc())
213-
# Raise HTTPException to return the error message to the client
214-
raise HTTPException(status_code=500, detail=str(e))
193+
store.push(
194+
push_source_name=request.push_source_name,
195+
df=df,
196+
allow_registry_cache=request.allow_registry_cache,
197+
to=to,
198+
)
215199

216200
@app.post("/write-to-online-store", dependencies=[Depends(inject_user_details)])
217201
def write_to_online_store(body=Depends(get_body)):
202+
request = WriteToFeatureStoreRequest(**json.loads(body))
203+
df = pd.DataFrame(request.df)
204+
feature_view_name = request.feature_view_name
205+
allow_registry_cache = request.allow_registry_cache
218206
try:
219-
request = WriteToFeatureStoreRequest(**json.loads(body))
220-
df = pd.DataFrame(request.df)
221-
feature_view_name = request.feature_view_name
222-
allow_registry_cache = request.allow_registry_cache
223-
try:
224-
feature_view = store.get_stream_feature_view(
225-
feature_view_name, allow_registry_cache=allow_registry_cache
226-
)
227-
except FeatureViewNotFoundException:
228-
feature_view = store.get_feature_view(
229-
feature_view_name, allow_registry_cache=allow_registry_cache
230-
)
231-
232-
assert_permissions(
233-
resource=feature_view, actions=[AuthzedAction.WRITE_ONLINE]
207+
feature_view = store.get_stream_feature_view(
208+
feature_view_name, allow_registry_cache=allow_registry_cache
234209
)
235-
store.write_to_online_store(
236-
feature_view_name=feature_view_name,
237-
df=df,
238-
allow_registry_cache=allow_registry_cache,
210+
except FeatureViewNotFoundException:
211+
feature_view = store.get_feature_view(
212+
feature_view_name, allow_registry_cache=allow_registry_cache
239213
)
240-
except Exception as e:
241-
# Print the original exception on the server side
242-
logger.exception(traceback.format_exc())
243-
# Raise HTTPException to return the error message to the client
244-
raise HTTPException(status_code=500, detail=str(e))
214+
215+
assert_permissions(resource=feature_view, actions=[AuthzedAction.WRITE_ONLINE])
216+
store.write_to_online_store(
217+
feature_view_name=feature_view_name,
218+
df=df,
219+
allow_registry_cache=allow_registry_cache,
220+
)
245221

246222
@app.get("/health")
247223
def health():
248224
return Response(status_code=status.HTTP_200_OK)
249225

250226
@app.post("/materialize", dependencies=[Depends(inject_user_details)])
251227
def materialize(body=Depends(get_body)):
252-
try:
253-
request = MaterializeRequest(**json.loads(body))
254-
for feature_view in request.feature_views:
255-
assert_permissions(
256-
resource=feature_view, actions=[AuthzedAction.WRITE_ONLINE]
257-
)
258-
store.materialize(
259-
utils.make_tzaware(parser.parse(request.start_ts)),
260-
utils.make_tzaware(parser.parse(request.end_ts)),
261-
request.feature_views,
228+
request = MaterializeRequest(**json.loads(body))
229+
for feature_view in request.feature_views:
230+
assert_permissions(
231+
resource=feature_view, actions=[AuthzedAction.WRITE_ONLINE]
262232
)
263-
except Exception as e:
264-
# Print the original exception on the server side
265-
logger.exception(traceback.format_exc())
266-
# Raise HTTPException to return the error message to the client
267-
raise HTTPException(status_code=500, detail=str(e))
233+
store.materialize(
234+
utils.make_tzaware(parser.parse(request.start_ts)),
235+
utils.make_tzaware(parser.parse(request.end_ts)),
236+
request.feature_views,
237+
)
268238

269239
@app.post("/materialize-incremental", dependencies=[Depends(inject_user_details)])
270240
def materialize_incremental(body=Depends(get_body)):
271-
try:
272-
request = MaterializeIncrementalRequest(**json.loads(body))
273-
for feature_view in request.feature_views:
274-
assert_permissions(
275-
resource=feature_view, actions=[AuthzedAction.WRITE_ONLINE]
276-
)
277-
store.materialize_incremental(
278-
utils.make_tzaware(parser.parse(request.end_ts)), request.feature_views
241+
request = MaterializeIncrementalRequest(**json.loads(body))
242+
for feature_view in request.feature_views:
243+
assert_permissions(
244+
resource=feature_view, actions=[AuthzedAction.WRITE_ONLINE]
245+
)
246+
store.materialize_incremental(
247+
utils.make_tzaware(parser.parse(request.end_ts)), request.feature_views
248+
)
249+
250+
@app.exception_handler(Exception)
251+
async def rest_exception_handler(request: Request, exc: Exception):
252+
# Print the original exception on the server side
253+
logger.exception(traceback.format_exc())
254+
255+
if isinstance(exc, FeastError):
256+
return JSONResponse(
257+
status_code=exc.http_status_code(),
258+
content=exc.to_error_detail(),
259+
)
260+
else:
261+
return JSONResponse(
262+
status_code=500,
263+
content=str(exc),
279264
)
280-
except Exception as e:
281-
# Print the original exception on the server side
282-
logger.exception(traceback.format_exc())
283-
# Raise HTTPException to return the error message to the client
284-
raise HTTPException(status_code=500, detail=str(e))
285265

286266
return app
287267

sdk/python/feast/infra/online_stores/remote.py

+12-6
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,15 @@
1616
from datetime import datetime
1717
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple
1818

19+
import requests
1920
from pydantic import StrictStr
2021

2122
from feast import Entity, FeatureView, RepoConfig
2223
from feast.infra.online_stores.online_store import OnlineStore
23-
from feast.permissions.client.http_auth_requests_wrapper import (
24-
get_http_auth_requests_session,
25-
)
2624
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
2725
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
2826
from feast.repo_config import FeastConfigBaseModel
27+
from feast.rest_error_handler import rest_error_handling_decorator
2928
from feast.type_map import python_values_to_proto_values
3029
from feast.value_type import ValueType
3130

@@ -72,9 +71,7 @@ def online_read(
7271
req_body = self._construct_online_read_api_json_request(
7372
entity_keys, table, requested_features
7473
)
75-
response = get_http_auth_requests_session(config.auth_config).post(
76-
f"{config.online_store.path}/get-online-features", data=req_body
77-
)
74+
response = get_remote_online_features(config=config, req_body=req_body)
7875
if response.status_code == 200:
7976
logger.debug("Able to retrieve the online features from feature server.")
8077
response_json = json.loads(response.text)
@@ -167,3 +164,12 @@ def teardown(
167164
entities: Sequence[Entity],
168165
):
169166
pass
167+
168+
169+
@rest_error_handling_decorator
170+
def get_remote_online_features(
171+
session: requests.Session, config: RepoConfig, req_body: str
172+
) -> requests.Response:
173+
return session.post(
174+
f"{config.online_store.path}/get-online-features", data=req_body
175+
)

0 commit comments

Comments
 (0)