Skip to content

Commit 61abf89

Browse files
authored
perf: Make /push async (feast-dev#4650)
1 parent 651ef71 commit 61abf89

File tree

7 files changed

+311
-40
lines changed

7 files changed

+311
-40
lines changed

sdk/python/feast/feature_server.py

+11-2
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ async def get_online_features(body=Depends(get_body)):
165165
)
166166

167167
@app.post("/push", dependencies=[Depends(inject_user_details)])
168-
def push(body=Depends(get_body)):
168+
async def push(body=Depends(get_body)):
169169
request = PushFeaturesRequest(**json.loads(body))
170170
df = pd.DataFrame(request.df)
171171
actions = []
@@ -201,13 +201,22 @@ def push(body=Depends(get_body)):
201201
for feature_view in fvs_with_push_sources:
202202
assert_permissions(resource=feature_view, actions=actions)
203203

204-
store.push(
204+
push_params = dict(
205205
push_source_name=request.push_source_name,
206206
df=df,
207207
allow_registry_cache=request.allow_registry_cache,
208208
to=to,
209209
)
210210

211+
should_push_async = (
212+
store._get_provider().async_supported.online.write
213+
and to in [PushMode.ONLINE, PushMode.ONLINE_AND_OFFLINE]
214+
)
215+
if should_push_async:
216+
await store.push_async(**push_params)
217+
else:
218+
store.push(**push_params)
219+
211220
@app.post("/write-to-online-store", dependencies=[Depends(inject_user_details)])
212221
def write_to_online_store(body=Depends(get_body)):
213222
request = WriteToFeatureStoreRequest(**json.loads(body))

sdk/python/feast/feature_store.py

+108-29
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14+
import asyncio
1415
import itertools
1516
import os
1617
import warnings
@@ -33,6 +34,7 @@
3334
import pandas as pd
3435
import pyarrow as pa
3536
from colorama import Fore, Style
37+
from fastapi.concurrency import run_in_threadpool
3638
from google.protobuf.timestamp_pb2 import Timestamp
3739
from tqdm import tqdm
3840

@@ -1423,26 +1425,13 @@ def tqdm_builder(length):
14231425
end_date,
14241426
)
14251427

1426-
def push(
1427-
self,
1428-
push_source_name: str,
1429-
df: pd.DataFrame,
1430-
allow_registry_cache: bool = True,
1431-
to: PushMode = PushMode.ONLINE,
1432-
):
1433-
"""
1434-
Push features to a push source. This updates all the feature views that have the push source as stream source.
1435-
1436-
Args:
1437-
push_source_name: The name of the push source we want to push data to.
1438-
df: The data being pushed.
1439-
allow_registry_cache: Whether to allow cached versions of the registry.
1440-
to: Whether to push to online or offline store. Defaults to online store only.
1441-
"""
1428+
def _fvs_for_push_source_or_raise(
1429+
self, push_source_name: str, allow_cache: bool
1430+
) -> set[FeatureView]:
14421431
from feast.data_source import PushSource
14431432

1444-
all_fvs = self.list_feature_views(allow_cache=allow_registry_cache)
1445-
all_fvs += self.list_stream_feature_views(allow_cache=allow_registry_cache)
1433+
all_fvs = self.list_feature_views(allow_cache=allow_cache)
1434+
all_fvs += self.list_stream_feature_views(allow_cache=allow_cache)
14461435

14471436
fvs_with_push_sources = {
14481437
fv
@@ -1457,7 +1446,27 @@ def push(
14571446
if not fvs_with_push_sources:
14581447
raise PushSourceNotFoundException(push_source_name)
14591448

1460-
for fv in fvs_with_push_sources:
1449+
return fvs_with_push_sources
1450+
1451+
def push(
1452+
self,
1453+
push_source_name: str,
1454+
df: pd.DataFrame,
1455+
allow_registry_cache: bool = True,
1456+
to: PushMode = PushMode.ONLINE,
1457+
):
1458+
"""
1459+
Push features to a push source. This updates all the feature views that have the push source as stream source.
1460+
1461+
Args:
1462+
push_source_name: The name of the push source we want to push data to.
1463+
df: The data being pushed.
1464+
allow_registry_cache: Whether to allow cached versions of the registry.
1465+
to: Whether to push to online or offline store. Defaults to online store only.
1466+
"""
1467+
for fv in self._fvs_for_push_source_or_raise(
1468+
push_source_name, allow_registry_cache
1469+
):
14611470
if to == PushMode.ONLINE or to == PushMode.ONLINE_AND_OFFLINE:
14621471
self.write_to_online_store(
14631472
fv.name, df, allow_registry_cache=allow_registry_cache
@@ -1467,22 +1476,42 @@ def push(
14671476
fv.name, df, allow_registry_cache=allow_registry_cache
14681477
)
14691478

1470-
def write_to_online_store(
1479+
async def push_async(
1480+
self,
1481+
push_source_name: str,
1482+
df: pd.DataFrame,
1483+
allow_registry_cache: bool = True,
1484+
to: PushMode = PushMode.ONLINE,
1485+
):
1486+
fvs = self._fvs_for_push_source_or_raise(push_source_name, allow_registry_cache)
1487+
1488+
if to == PushMode.ONLINE or to == PushMode.ONLINE_AND_OFFLINE:
1489+
_ = await asyncio.gather(
1490+
*[
1491+
self.write_to_online_store_async(
1492+
fv.name, df, allow_registry_cache=allow_registry_cache
1493+
)
1494+
for fv in fvs
1495+
]
1496+
)
1497+
1498+
if to == PushMode.OFFLINE or to == PushMode.ONLINE_AND_OFFLINE:
1499+
1500+
def _offline_write():
1501+
for fv in fvs:
1502+
self.write_to_offline_store(
1503+
fv.name, df, allow_registry_cache=allow_registry_cache
1504+
)
1505+
1506+
await run_in_threadpool(_offline_write)
1507+
1508+
def _get_feature_view_and_df_for_online_write(
14711509
self,
14721510
feature_view_name: str,
14731511
df: Optional[pd.DataFrame] = None,
14741512
inputs: Optional[Union[Dict[str, List[Any]], pd.DataFrame]] = None,
14751513
allow_registry_cache: bool = True,
14761514
):
1477-
"""
1478-
Persists a dataframe to the online store.
1479-
1480-
Args:
1481-
feature_view_name: The feature view to which the dataframe corresponds.
1482-
df: The dataframe to be persisted.
1483-
inputs: Optional the dictionary object to be written
1484-
allow_registry_cache (optional): Whether to allow retrieving feature views from a cached registry.
1485-
"""
14861515
feature_view_dict = {
14871516
fv_proto.name: fv_proto
14881517
for fv_proto in self.list_all_feature_views(allow_registry_cache)
@@ -1509,10 +1538,60 @@ def write_to_online_store(
15091538
df = pd.DataFrame(df)
15101539
except Exception as _:
15111540
raise DataFrameSerializationError(df)
1541+
return feature_view, df
1542+
1543+
def write_to_online_store(
1544+
self,
1545+
feature_view_name: str,
1546+
df: Optional[pd.DataFrame] = None,
1547+
inputs: Optional[Union[Dict[str, List[Any]], pd.DataFrame]] = None,
1548+
allow_registry_cache: bool = True,
1549+
):
1550+
"""
1551+
Persists a dataframe to the online store.
15121552
1553+
Args:
1554+
feature_view_name: The feature view to which the dataframe corresponds.
1555+
df: The dataframe to be persisted.
1556+
inputs: Optional the dictionary object to be written
1557+
allow_registry_cache (optional): Whether to allow retrieving feature views from a cached registry.
1558+
"""
1559+
1560+
feature_view, df = self._get_feature_view_and_df_for_online_write(
1561+
feature_view_name=feature_view_name,
1562+
df=df,
1563+
inputs=inputs,
1564+
allow_registry_cache=allow_registry_cache,
1565+
)
15131566
provider = self._get_provider()
15141567
provider.ingest_df(feature_view, df)
15151568

1569+
async def write_to_online_store_async(
1570+
self,
1571+
feature_view_name: str,
1572+
df: Optional[pd.DataFrame] = None,
1573+
inputs: Optional[Union[Dict[str, List[Any]], pd.DataFrame]] = None,
1574+
allow_registry_cache: bool = True,
1575+
):
1576+
"""
1577+
Persists a dataframe to the online store asynchronously.
1578+
1579+
Args:
1580+
feature_view_name: The feature view to which the dataframe corresponds.
1581+
df: The dataframe to be persisted.
1582+
inputs: Optional the dictionary object to be written
1583+
allow_registry_cache (optional): Whether to allow retrieving feature views from a cached registry.
1584+
"""
1585+
1586+
feature_view, df = self._get_feature_view_and_df_for_online_write(
1587+
feature_view_name=feature_view_name,
1588+
df=df,
1589+
inputs=inputs,
1590+
allow_registry_cache=allow_registry_cache,
1591+
)
1592+
provider = self._get_provider()
1593+
await provider.ingest_df_async(feature_view, df)
1594+
15161595
def write_to_offline_store(
15171596
self,
15181597
feature_view_name: str,

sdk/python/feast/infra/online_stores/online_store.py

+27
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,33 @@ def online_write_batch(
6767
"""
6868
pass
6969

70+
async def online_write_batch_async(
71+
self,
72+
config: RepoConfig,
73+
table: FeatureView,
74+
data: List[
75+
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
76+
],
77+
progress: Optional[Callable[[int], Any]],
78+
) -> None:
79+
"""
80+
Writes a batch of feature rows to the online store asynchronously.
81+
82+
If a tz-naive timestamp is passed to this method, it is assumed to be UTC.
83+
84+
Args:
85+
config: The config for the current feature store.
86+
table: Feature view to which these feature rows correspond.
87+
data: A list of quadruplets containing feature data. Each quadruplet contains an entity
88+
key, a dict containing feature values, an event timestamp for the row, and the created
89+
timestamp for the row if it exists.
90+
progress: Function to be called once a batch of rows is written to the online store, used
91+
to show progress.
92+
"""
93+
raise NotImplementedError(
94+
f"Online store {self.__class__.__name__} does not support online write batch async"
95+
)
96+
7097
@abstractmethod
7198
def online_read(
7299
self,

sdk/python/feast/infra/passthrough_provider.py

+47-9
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,20 @@ def online_write_batch(
188188
if self.online_store:
189189
self.online_store.online_write_batch(config, table, data, progress)
190190

191+
async def online_write_batch_async(
192+
self,
193+
config: RepoConfig,
194+
table: Union[FeatureView, BaseFeatureView, OnDemandFeatureView],
195+
data: List[
196+
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
197+
],
198+
progress: Optional[Callable[[int], Any]],
199+
) -> None:
200+
if self.online_store:
201+
await self.online_store.online_write_batch_async(
202+
config, table, data, progress
203+
)
204+
191205
def offline_write_batch(
192206
self,
193207
config: RepoConfig,
@@ -291,8 +305,8 @@ def retrieve_online_documents(
291305
)
292306
return result
293307

294-
def ingest_df(
295-
self,
308+
@staticmethod
309+
def _prep_rows_to_write_for_ingestion(
296310
feature_view: Union[BaseFeatureView, FeatureView, OnDemandFeatureView],
297311
df: pd.DataFrame,
298312
field_mapping: Optional[Dict] = None,
@@ -307,10 +321,6 @@ def ingest_df(
307321
for entity in feature_view.entity_columns
308322
}
309323
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)
310-
311-
self.online_write_batch(
312-
self.repo_config, feature_view, rows_to_write, progress=None
313-
)
314324
else:
315325
if hasattr(feature_view, "entity_columns"):
316326
join_keys = {
@@ -336,9 +346,37 @@ def ingest_df(
336346
join_keys[entity.name] = entity.dtype.to_value_type()
337347
rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)
338348

339-
self.online_write_batch(
340-
self.repo_config, feature_view, rows_to_write, progress=None
341-
)
349+
return rows_to_write
350+
351+
def ingest_df(
352+
self,
353+
feature_view: Union[BaseFeatureView, FeatureView, OnDemandFeatureView],
354+
df: pd.DataFrame,
355+
field_mapping: Optional[Dict] = None,
356+
):
357+
rows_to_write = self._prep_rows_to_write_for_ingestion(
358+
feature_view=feature_view,
359+
df=df,
360+
field_mapping=field_mapping,
361+
)
362+
self.online_write_batch(
363+
self.repo_config, feature_view, rows_to_write, progress=None
364+
)
365+
366+
async def ingest_df_async(
367+
self,
368+
feature_view: Union[BaseFeatureView, FeatureView, OnDemandFeatureView],
369+
df: pd.DataFrame,
370+
field_mapping: Optional[Dict] = None,
371+
):
372+
rows_to_write = self._prep_rows_to_write_for_ingestion(
373+
feature_view=feature_view,
374+
df=df,
375+
field_mapping=field_mapping,
376+
)
377+
await self.online_write_batch_async(
378+
self.repo_config, feature_view, rows_to_write, progress=None
379+
)
342380

343381
def ingest_df_to_offline_store(self, feature_view: FeatureView, table: pa.Table):
344382
if feature_view.batch_source.field_mapping is not None:

sdk/python/feast/infra/provider.py

+42
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,32 @@ def online_write_batch(
141141
"""
142142
pass
143143

144+
@abstractmethod
145+
async def online_write_batch_async(
146+
self,
147+
config: RepoConfig,
148+
table: FeatureView,
149+
data: List[
150+
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
151+
],
152+
progress: Optional[Callable[[int], Any]],
153+
) -> None:
154+
"""
155+
Writes a batch of feature rows to the online store asynchronously.
156+
157+
If a tz-naive timestamp is passed to this method, it is assumed to be UTC.
158+
159+
Args:
160+
config: The config for the current feature store.
161+
table: Feature view to which these feature rows correspond.
162+
data: A list of quadruplets containing feature data. Each quadruplet contains an entity
163+
key, a dict containing feature values, an event timestamp for the row, and the created
164+
timestamp for the row if it exists.
165+
progress: Function to be called once a batch of rows is written to the online store, used
166+
to show progress.
167+
"""
168+
pass
169+
144170
def ingest_df(
145171
self,
146172
feature_view: Union[BaseFeatureView, FeatureView, OnDemandFeatureView],
@@ -157,6 +183,22 @@ def ingest_df(
157183
"""
158184
pass
159185

186+
async def ingest_df_async(
187+
self,
188+
feature_view: Union[BaseFeatureView, FeatureView, OnDemandFeatureView],
189+
df: pd.DataFrame,
190+
field_mapping: Optional[Dict] = None,
191+
):
192+
"""
193+
Persists a dataframe to the online store asynchronously.
194+
195+
Args:
196+
feature_view: The feature view to which the dataframe corresponds.
197+
df: The dataframe to be persisted.
198+
field_mapping: A dictionary mapping dataframe column names to feature names.
199+
"""
200+
pass
201+
160202
def ingest_df_to_offline_store(
161203
self,
162204
feature_view: FeatureView,

0 commit comments

Comments
 (0)