Skip to content

Commit c1f1912

Browse files
authored
perf: Default to async endpoints, use threadpool for sync (feast-dev#4647)
1 parent 0a2bb47 commit c1f1912

File tree

6 files changed

+44
-4
lines changed

6 files changed

+44
-4
lines changed

sdk/python/feast/feature_server.py

+12-4
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import psutil
1111
from dateutil import parser
1212
from fastapi import Depends, FastAPI, Request, Response, status
13+
from fastapi.concurrency import run_in_threadpool
1314
from fastapi.logger import logger
1415
from fastapi.responses import JSONResponse
1516
from google.protobuf.json_format import MessageToDict
@@ -112,7 +113,7 @@ async def get_body(request: Request):
112113
"/get-online-features",
113114
dependencies=[Depends(inject_user_details)],
114115
)
115-
def get_online_features(body=Depends(get_body)):
116+
async def get_online_features(body=Depends(get_body)):
116117
body = json.loads(body)
117118
full_feature_names = body.get("full_feature_names", False)
118119
entity_rows = body["entities"]
@@ -145,15 +146,22 @@ def get_online_features(body=Depends(get_body)):
145146
resource=od_feature_view, actions=[AuthzedAction.READ_ONLINE]
146147
)
147148

148-
response_proto = store.get_online_features(
149+
read_params = dict(
149150
features=features,
150151
entity_rows=entity_rows,
151152
full_feature_names=full_feature_names,
152-
).proto
153+
)
154+
155+
if store._get_provider().async_supported.online.read:
156+
response = await store.get_online_features_async(**read_params)
157+
else:
158+
response = await run_in_threadpool(
159+
lambda: store.get_online_features(**read_params)
160+
)
153161

154162
# Convert the Protobuf object to JSON and return it
155163
return MessageToDict(
156-
response_proto, preserving_proto_field_name=True, float_precision=18
164+
response.proto, preserving_proto_field_name=True, float_precision=18
157165
)
158166

159167
@app.post("/push", dependencies=[Depends(inject_user_details)])

sdk/python/feast/infra/online_stores/dynamodb.py

+5
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from feast.infra.infra_object import DYNAMODB_INFRA_OBJECT_CLASS_TYPE, InfraObject
2424
from feast.infra.online_stores.helpers import compute_entity_id
2525
from feast.infra.online_stores.online_store import OnlineStore
26+
from feast.infra.supported_async_methods import SupportedAsyncMethods
2627
from feast.protos.feast.core.DynamoDBTable_pb2 import (
2728
DynamoDBTable as DynamoDBTableProto,
2829
)
@@ -88,6 +89,10 @@ class DynamoDBOnlineStore(OnlineStore):
8889
_dynamodb_resource = None
8990
_aioboto_session = None
9091

92+
@property
93+
def async_supported(self) -> SupportedAsyncMethods:
94+
return SupportedAsyncMethods(read=True)
95+
9196
def update(
9297
self,
9398
config: RepoConfig,

sdk/python/feast/infra/online_stores/online_store.py

+5
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from feast.feature_view import FeatureView
2323
from feast.infra.infra_object import InfraObject
2424
from feast.infra.registry.base_registry import BaseRegistry
25+
from feast.infra.supported_async_methods import SupportedAsyncMethods
2526
from feast.online_response import OnlineResponse
2627
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
2728
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
@@ -36,6 +37,10 @@ class OnlineStore(ABC):
3637
The interface that Feast uses to interact with the storage system that handles online features.
3738
"""
3839

40+
@property
41+
def async_supported(self) -> SupportedAsyncMethods:
42+
return SupportedAsyncMethods()
43+
3944
@abstractmethod
4045
def online_write_batch(
4146
self,

sdk/python/feast/infra/passthrough_provider.py

+7
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
from feast.infra.online_stores.helpers import get_online_store_from_config
3636
from feast.infra.provider import Provider
3737
from feast.infra.registry.base_registry import BaseRegistry
38+
from feast.infra.supported_async_methods import ProviderAsyncMethods
3839
from feast.online_response import OnlineResponse
3940
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
4041
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
@@ -79,6 +80,12 @@ def offline_store(self):
7980
)
8081
return self._offline_store
8182

83+
@property
84+
def async_supported(self) -> ProviderAsyncMethods:
85+
return ProviderAsyncMethods(
86+
online=self.online_store.async_supported,
87+
)
88+
8289
@property
8390
def batch_engine(self) -> BatchMaterializationEngine:
8491
if self._batch_engine:

sdk/python/feast/infra/provider.py

+5
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from feast.infra.infra_object import Infra
2828
from feast.infra.offline_stores.offline_store import RetrievalJob
2929
from feast.infra.registry.base_registry import BaseRegistry
30+
from feast.infra.supported_async_methods import ProviderAsyncMethods
3031
from feast.on_demand_feature_view import OnDemandFeatureView
3132
from feast.online_response import OnlineResponse
3233
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
@@ -55,6 +56,10 @@ class Provider(ABC):
5556
def __init__(self, config: RepoConfig):
5657
pass
5758

59+
@property
60+
def async_supported(self) -> ProviderAsyncMethods:
61+
return ProviderAsyncMethods()
62+
5863
@abstractmethod
5964
def update_infra(
6065
self,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from pydantic import BaseModel, Field
2+
3+
4+
class SupportedAsyncMethods(BaseModel):
5+
read: bool = Field(default=False)
6+
write: bool = Field(default=False)
7+
8+
9+
class ProviderAsyncMethods(BaseModel):
10+
online: SupportedAsyncMethods = Field(default_factory=SupportedAsyncMethods)

0 commit comments

Comments
 (0)