Skip to content

Commit 484240c

Browse files
authored
fix: Initial commit targetting grpc registry server (feast-dev#4458)
* initial commit targetting grpc registry server Signed-off-by: Daniele Martinoli <dmartino@redhat.com> * refactor: Introduced base class FeastError for all Feast exceptions (feast-dev#4465) introduced base class FeastError for all Feast exceptions, with initial methods to map the grpc and HTTP status code Signed-off-by: Daniele Martinoli <dmartino@redhat.com> * initial commit targetting grpc registry server Signed-off-by: Daniele Martinoli <dmartino@redhat.com> * fixed merge error Signed-off-by: Daniele Martinoli <dmartino@redhat.com> * initial commit targetting grpc registry server Signed-off-by: Daniele Martinoli <dmartino@redhat.com> * fixed merge error Signed-off-by: Daniele Martinoli <dmartino@redhat.com> * integrated comment Signed-off-by: Daniele Martinoli <dmartino@redhat.com> * moved imports as per comment Signed-off-by: Daniele Martinoli <dmartino@redhat.com> --------- Signed-off-by: Daniele Martinoli <dmartino@redhat.com>
1 parent c365b4e commit 484240c

File tree

7 files changed

+197
-36
lines changed

7 files changed

+197
-36
lines changed

sdk/python/feast/errors.py

+52-4
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,69 @@
1-
from typing import Any, List, Set
1+
import importlib
2+
import json
3+
import logging
4+
from typing import Any, List, Optional, Set
25

36
from colorama import Fore, Style
47
from fastapi import status as HttpStatusCode
58
from grpc import StatusCode as GrpcStatusCode
69

710
from feast.field import Field
811

12+
logger = logging.getLogger(__name__)
13+
914

1015
class FeastError(Exception):
1116
pass
1217

13-
def rpc_status_code(self) -> GrpcStatusCode:
18+
def grpc_status_code(self) -> GrpcStatusCode:
1419
return GrpcStatusCode.INTERNAL
1520

1621
def http_status_code(self) -> int:
1722
return HttpStatusCode.HTTP_500_INTERNAL_SERVER_ERROR
1823

24+
def __str__(self) -> str:
25+
if hasattr(self, "__overridden_message__"):
26+
return str(getattr(self, "__overridden_message__"))
27+
return super().__str__()
28+
29+
def __repr__(self) -> str:
30+
if hasattr(self, "__overridden_message__"):
31+
return f"{type(self).__name__}('{getattr(self,'__overridden_message__')}')"
32+
return super().__repr__()
33+
34+
def to_error_detail(self) -> str:
35+
"""
36+
Returns a JSON representation of the error for serialization purposes.
37+
38+
Returns:
39+
str: a string representation of a JSON document including `module`, `class` and `message` fields.
40+
"""
41+
42+
m = {
43+
"module": f"{type(self).__module__}",
44+
"class": f"{type(self).__name__}",
45+
"message": f"{str(self)}",
46+
}
47+
return json.dumps(m)
48+
49+
@staticmethod
50+
def from_error_detail(detail: str) -> Optional["FeastError"]:
51+
try:
52+
m = json.loads(detail)
53+
if all(f in m for f in ["module", "class", "message"]):
54+
module_name = m["module"]
55+
class_name = m["class"]
56+
message = m["message"]
57+
module = importlib.import_module(module_name)
58+
class_reference = getattr(module, class_name)
59+
60+
instance = class_reference(message)
61+
setattr(instance, "__overridden_message__", message)
62+
return instance
63+
except Exception as e:
64+
logger.warning(f"Invalid error detail: {detail}: {e}")
65+
return None
66+
1967

2068
class DataSourceNotFoundException(FeastError):
2169
def __init__(self, path):
@@ -41,7 +89,7 @@ def __init__(self, ds_name: str):
4189
class FeastObjectNotFoundException(FeastError):
4290
pass
4391

44-
def rpc_status_code(self) -> GrpcStatusCode:
92+
def grpc_status_code(self) -> GrpcStatusCode:
4593
return GrpcStatusCode.NOT_FOUND
4694

4795
def http_status_code(self) -> int:
@@ -443,7 +491,7 @@ class FeastPermissionError(FeastError, PermissionError):
443491
def __init__(self, details: str):
444492
super().__init__(f"Permission error:\n{details}")
445493

446-
def rpc_status_code(self) -> GrpcStatusCode:
494+
def grpc_status_code(self) -> GrpcStatusCode:
447495
return GrpcStatusCode.PERMISSION_DENIED
448496

449497
def http_status_code(self) -> int:
+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import grpc
2+
3+
from feast.errors import FeastError
4+
5+
6+
def exception_wrapper(behavior, request, context):
7+
try:
8+
return behavior(request, context)
9+
except grpc.RpcError as e:
10+
context.abort(e.code(), e.details())
11+
except FeastError as e:
12+
context.abort(
13+
e.grpc_status_code(),
14+
e.to_error_detail(),
15+
)
16+
17+
18+
class ErrorInterceptor(grpc.ServerInterceptor):
19+
def intercept_service(self, continuation, handler_call_details):
20+
handler = continuation(handler_call_details)
21+
if handler is None:
22+
return None
23+
24+
if handler.unary_unary:
25+
return grpc.unary_unary_rpc_method_handler(
26+
lambda req, ctx: exception_wrapper(handler.unary_unary, req, ctx),
27+
request_deserializer=handler.request_deserializer,
28+
response_serializer=handler.response_serializer,
29+
)
30+
elif handler.unary_stream:
31+
return grpc.unary_stream_rpc_method_handler(
32+
lambda req, ctx: exception_wrapper(handler.unary_stream, req, ctx),
33+
request_deserializer=handler.request_deserializer,
34+
response_serializer=handler.response_serializer,
35+
)
36+
elif handler.stream_unary:
37+
return grpc.stream_unary_rpc_method_handler(
38+
lambda req, ctx: exception_wrapper(handler.stream_unary, req, ctx),
39+
request_deserializer=handler.request_deserializer,
40+
response_serializer=handler.response_serializer,
41+
)
42+
elif handler.stream_stream:
43+
return grpc.stream_stream_rpc_method_handler(
44+
lambda req, ctx: exception_wrapper(handler.stream_stream, req, ctx),
45+
request_deserializer=handler.request_deserializer,
46+
response_serializer=handler.response_serializer,
47+
)
48+
return handler

sdk/python/feast/permissions/client/grpc_client_auth_interceptor.py

+13-7
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import grpc
44

5+
from feast.errors import FeastError
56
from feast.permissions.auth_model import AuthConfig
67
from feast.permissions.client.auth_client_manager_factory import get_auth_token
78

@@ -20,26 +21,31 @@ def __init__(self, auth_type: AuthConfig):
2021
def intercept_unary_unary(
2122
self, continuation, client_call_details, request_iterator
2223
):
23-
client_call_details = self._append_auth_header_metadata(client_call_details)
24-
return continuation(client_call_details, request_iterator)
24+
return self._handle_call(continuation, client_call_details, request_iterator)
2525

2626
def intercept_unary_stream(
2727
self, continuation, client_call_details, request_iterator
2828
):
29-
client_call_details = self._append_auth_header_metadata(client_call_details)
30-
return continuation(client_call_details, request_iterator)
29+
return self._handle_call(continuation, client_call_details, request_iterator)
3130

3231
def intercept_stream_unary(
3332
self, continuation, client_call_details, request_iterator
3433
):
35-
client_call_details = self._append_auth_header_metadata(client_call_details)
36-
return continuation(client_call_details, request_iterator)
34+
return self._handle_call(continuation, client_call_details, request_iterator)
3735

3836
def intercept_stream_stream(
3937
self, continuation, client_call_details, request_iterator
4038
):
39+
return self._handle_call(continuation, client_call_details, request_iterator)
40+
41+
def _handle_call(self, continuation, client_call_details, request_iterator):
4142
client_call_details = self._append_auth_header_metadata(client_call_details)
42-
return continuation(client_call_details, request_iterator)
43+
result = continuation(client_call_details, request_iterator)
44+
if result.exception() is not None:
45+
mapped_error = FeastError.from_error_detail(result.exception().details())
46+
if mapped_error is not None:
47+
raise mapped_error
48+
return result
4349

4450
def _append_auth_header_metadata(self, client_call_details):
4551
logger.debug(

sdk/python/feast/permissions/server/grpc.py

-22
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,17 @@
11
import asyncio
22
import logging
3-
from typing import Optional
43

54
import grpc
65

76
from feast.permissions.auth.auth_manager import (
87
get_auth_manager,
98
)
109
from feast.permissions.security_manager import get_security_manager
11-
from feast.permissions.server.utils import (
12-
AuthManagerType,
13-
)
1410

1511
logger = logging.getLogger(__name__)
1612
logger.setLevel(logging.INFO)
1713

1814

19-
def grpc_interceptors(
20-
auth_type: AuthManagerType,
21-
) -> Optional[list[grpc.ServerInterceptor]]:
22-
"""
23-
A list of the authorization interceptors.
24-
25-
Args:
26-
auth_type: The type of authorization manager, from the feature store configuration.
27-
28-
Returns:
29-
list[grpc.ServerInterceptor]: Optional list of interceptors. If the authorization type is set to `NONE`, it returns `None`.
30-
"""
31-
if auth_type == AuthManagerType.NONE:
32-
return None
33-
34-
return [AuthInterceptor()]
35-
36-
3715
class AuthInterceptor(grpc.ServerInterceptor):
3816
def intercept_service(self, continuation, handler_call_details):
3917
sm = get_security_manager()

sdk/python/feast/registry_server.py

+23-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from concurrent import futures
22
from datetime import datetime, timezone
3-
from typing import Union, cast
3+
from typing import Optional, Union, cast
44

55
import grpc
66
from google.protobuf.empty_pb2 import Empty
@@ -13,6 +13,7 @@
1313
from feast.errors import FeatureViewNotFoundException
1414
from feast.feast_object import FeastObject
1515
from feast.feature_view import FeatureView
16+
from feast.grpc_error_interceptor import ErrorInterceptor
1617
from feast.infra.infra_object import Infra
1718
from feast.infra.registry.base_registry import BaseRegistry
1819
from feast.on_demand_feature_view import OnDemandFeatureView
@@ -23,8 +24,9 @@
2324
assert_permissions_to_update,
2425
permitted_resources,
2526
)
26-
from feast.permissions.server.grpc import grpc_interceptors
27+
from feast.permissions.server.grpc import AuthInterceptor
2728
from feast.permissions.server.utils import (
29+
AuthManagerType,
2830
ServerType,
2931
init_auth_manager,
3032
init_security_manager,
@@ -645,7 +647,7 @@ def start_server(store: FeatureStore, port: int, wait_for_termination: bool = Tr
645647

646648
server = grpc.server(
647649
futures.ThreadPoolExecutor(max_workers=10),
648-
interceptors=grpc_interceptors(auth_manager_type),
650+
interceptors=_grpc_interceptors(auth_manager_type),
649651
)
650652
RegistryServer_pb2_grpc.add_RegistryServerServicer_to_server(
651653
RegistryServer(store.registry), server
@@ -668,3 +670,21 @@ def start_server(store: FeatureStore, port: int, wait_for_termination: bool = Tr
668670
server.wait_for_termination()
669671
else:
670672
return server
673+
674+
675+
def _grpc_interceptors(
676+
auth_type: AuthManagerType,
677+
) -> Optional[list[grpc.ServerInterceptor]]:
678+
"""
679+
A list of the interceptors for the registry server.
680+
681+
Args:
682+
auth_type: The type of authorization manager, from the feature store configuration.
683+
684+
Returns:
685+
list[grpc.ServerInterceptor]: Optional list of interceptors. If the authorization type is set to `NONE`, it returns `None`.
686+
"""
687+
if auth_type == AuthManagerType.NONE:
688+
return [ErrorInterceptor()]
689+
690+
return [AuthInterceptor(), ErrorInterceptor()]

sdk/python/tests/unit/permissions/auth/server/test_auth_registry_server.py

+35
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,11 @@
88
from feast import (
99
FeatureStore,
1010
)
11+
from feast.errors import (
12+
EntityNotFoundException,
13+
FeastPermissionError,
14+
FeatureViewNotFoundException,
15+
)
1116
from feast.permissions.permission import Permission
1217
from feast.registry_server import start_server
1318
from feast.wait import wait_retry_backoff # noqa: E402
@@ -70,7 +75,9 @@ def test_registry_apis(
7075
print(f"Running for\n:{auth_config}")
7176
remote_feature_store = get_remote_registry_store(server_port, feature_store)
7277
permissions = _test_list_permissions(remote_feature_store, applied_permissions)
78+
_test_get_entity(remote_feature_store, applied_permissions)
7379
_test_list_entities(remote_feature_store, applied_permissions)
80+
_test_get_fv(remote_feature_store, applied_permissions)
7481
_test_list_fvs(remote_feature_store, applied_permissions)
7582

7683
if _permissions_exist_in_permission_list(
@@ -118,6 +125,20 @@ def _test_get_historical_features(client_fs: FeatureStore):
118125
assertpy.assert_that(training_df).is_not_none()
119126

120127

128+
def _test_get_entity(client_fs: FeatureStore, permissions: list[Permission]):
129+
if not _is_auth_enabled(client_fs) or _is_permission_enabled(
130+
client_fs, permissions, read_entities_perm
131+
):
132+
entity = client_fs.get_entity("driver")
133+
assertpy.assert_that(entity).is_not_none()
134+
assertpy.assert_that(entity.name).is_equal_to("driver")
135+
else:
136+
with pytest.raises(FeastPermissionError):
137+
client_fs.get_entity("driver")
138+
with pytest.raises(EntityNotFoundException):
139+
client_fs.get_entity("invalid-name")
140+
141+
121142
def _test_list_entities(client_fs: FeatureStore, permissions: list[Permission]):
122143
entities = client_fs.list_entities()
123144

@@ -188,6 +209,20 @@ def _is_auth_enabled(client_fs: FeatureStore) -> bool:
188209
return client_fs.config.auth_config.type != "no_auth"
189210

190211

212+
def _test_get_fv(client_fs: FeatureStore, permissions: list[Permission]):
213+
if not _is_auth_enabled(client_fs) or _is_permission_enabled(
214+
client_fs, permissions, read_fv_perm
215+
):
216+
fv = client_fs.get_feature_view("driver_hourly_stats")
217+
assertpy.assert_that(fv).is_not_none()
218+
assertpy.assert_that(fv.name).is_equal_to("driver_hourly_stats")
219+
else:
220+
with pytest.raises(FeastPermissionError):
221+
client_fs.get_feature_view("driver_hourly_stats")
222+
with pytest.raises(FeatureViewNotFoundException):
223+
client_fs.get_feature_view("invalid-name")
224+
225+
191226
def _test_list_fvs(client_fs: FeatureStore, permissions: list[Permission]):
192227
if _is_auth_enabled(client_fs) and _permissions_exist_in_permission_list(
193228
[invalid_list_entities_perm], permissions

sdk/python/tests/unit/test_errors.py

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import re
2+
3+
import assertpy
4+
5+
import feast.errors as errors
6+
7+
8+
def test_error_error_detail():
9+
e = errors.FeatureViewNotFoundException("abc")
10+
11+
d = e.to_error_detail()
12+
13+
assertpy.assert_that(d).is_not_none()
14+
assertpy.assert_that(d).contains('"module": "feast.errors"')
15+
assertpy.assert_that(d).contains('"class": "FeatureViewNotFoundException"')
16+
assertpy.assert_that(re.search(r"abc", d)).is_true()
17+
18+
converted_e = errors.FeastError.from_error_detail(d)
19+
assertpy.assert_that(converted_e).is_not_none()
20+
assertpy.assert_that(str(converted_e)).is_equal_to(str(e))
21+
assertpy.assert_that(repr(converted_e)).is_equal_to(repr(e))
22+
23+
24+
def test_invalid_error_error_detail():
25+
e = errors.FeastError.from_error_detail("invalid")
26+
assertpy.assert_that(e).is_none()

0 commit comments

Comments
 (0)