Skip to content

Commit 47dc04d

Browse files
authored
perf: Add init and cleanup of long lived resources (#4642)
* rebase Signed-off-by: Rob Howley <howley.robert@gmail.com> * offline store init doesnt make sense Signed-off-by: Rob Howley <howley.robert@gmail.com> * dont init or close Signed-off-by: Rob Howley <howley.robert@gmail.com> * update test to handle event loop for dynamo case Signed-off-by: Rob Howley <howley.robert@gmail.com> * use run util complete Signed-off-by: Rob Howley <howley.robert@gmail.com> * fix: spelling sigh Signed-off-by: Rob Howley <howley.robert@gmail.com> * run integration test as async since that is default for read Signed-off-by: Rob Howley <howley.robert@gmail.com> * add pytest async to ci reqs Signed-off-by: Rob Howley <howley.robert@gmail.com> * be safe w cleanup in test fixture Signed-off-by: Rob Howley <howley.robert@gmail.com> * be safe w cleanup in test fixture Signed-off-by: Rob Howley <howley.robert@gmail.com> * update pytest ini Signed-off-by: Rob Howley <howley.robert@gmail.com> * not in a finally Signed-off-by: Rob Howley <howley.robert@gmail.com> * remove close Signed-off-by: Rob Howley <howley.robert@gmail.com> * test client is a lifespan aware context manager Signed-off-by: Rob Howley <howley.robert@gmail.com> --------- Signed-off-by: Rob Howley <howley.robert@gmail.com>
1 parent e726c09 commit 47dc04d

13 files changed

+583
-113
lines changed

sdk/python/feast/feature_server.py

+2
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,11 @@ def async_refresh():
100100

101101
@asynccontextmanager
102102
async def lifespan(app: FastAPI):
103+
await store.initialize()
103104
async_refresh()
104105
yield
105106
stop_refresh()
107+
await store.close()
106108

107109
app = FastAPI(lifespan=lifespan)
108110

sdk/python/feast/feature_store.py

+8
Original file line numberDiff line numberDiff line change
@@ -2167,6 +2167,14 @@ def list_saved_datasets(
21672167
self.project, allow_cache=allow_cache, tags=tags
21682168
)
21692169

2170+
async def initialize(self) -> None:
2171+
"""Initialize long-lived clients and/or resources needed for accessing datastores"""
2172+
await self._get_provider().initialize(self.config)
2173+
2174+
async def close(self) -> None:
2175+
"""Cleanup any long-lived clients and/or resources"""
2176+
await self._get_provider().close()
2177+
21702178

21712179
def _print_materialization_log(
21722180
start_date, end_date, num_feature_views: int, online_store: str

sdk/python/feast/infra/online_stores/dynamodb.py

+56-18
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
import asyncio
15+
import contextlib
1516
import itertools
1617
import logging
1718
from datetime import datetime
1819
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union
1920

21+
from aiobotocore.config import AioConfig
2022
from pydantic import StrictBool, StrictStr
2123

2224
from feast import Entity, FeatureView, utils
@@ -75,6 +77,9 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
7577
session_based_auth: bool = False
7678
"""AWS session based client authentication"""
7779

80+
max_pool_connections: int = 10
81+
"""Max number of connections for async Dynamodb operations"""
82+
7883

7984
class DynamoDBOnlineStore(OnlineStore):
8085
"""
@@ -87,7 +92,14 @@ class DynamoDBOnlineStore(OnlineStore):
8792

8893
_dynamodb_client = None
8994
_dynamodb_resource = None
90-
_aioboto_session = None
95+
96+
async def initialize(self, config: RepoConfig):
97+
await _get_aiodynamodb_client(
98+
config.online_store.region, config.online_store.max_pool_connections
99+
)
100+
101+
async def close(self):
102+
await _aiodynamodb_close()
91103

92104
@property
93105
def async_supported(self) -> SupportedAsyncMethods:
@@ -326,15 +338,17 @@ def to_tbl_resp(raw_client_response):
326338
batches.append(batch)
327339
entity_id_batches.append(entity_id_batch)
328340

329-
async with self._get_aiodynamodb_client(online_config.region) as client:
330-
response_batches = await asyncio.gather(
331-
*[
332-
client.batch_get_item(
333-
RequestItems=entity_id_batch,
334-
)
335-
for entity_id_batch in entity_id_batches
336-
]
337-
)
341+
client = await _get_aiodynamodb_client(
342+
online_config.region, online_config.max_pool_connections
343+
)
344+
response_batches = await asyncio.gather(
345+
*[
346+
client.batch_get_item(
347+
RequestItems=entity_id_batch,
348+
)
349+
for entity_id_batch in entity_id_batches
350+
]
351+
)
338352

339353
result_batches = []
340354
for batch, response in zip(batches, response_batches):
@@ -349,14 +363,6 @@ def to_tbl_resp(raw_client_response):
349363

350364
return list(itertools.chain(*result_batches))
351365

352-
def _get_aioboto_session(self):
353-
if self._aioboto_session is None:
354-
self._aioboto_session = session.get_session()
355-
return self._aioboto_session
356-
357-
def _get_aiodynamodb_client(self, region: str):
358-
return self._get_aioboto_session().create_client("dynamodb", region_name=region)
359-
360366
def _get_dynamodb_client(
361367
self,
362368
region: str,
@@ -489,6 +495,38 @@ def _to_client_batch_get_payload(online_config, table_name, batch):
489495
}
490496

491497

498+
_aioboto_session = None
499+
_aioboto_client = None
500+
501+
502+
def _get_aioboto_session():
503+
global _aioboto_session
504+
if _aioboto_session is None:
505+
logger.debug("initializing the aiobotocore session")
506+
_aioboto_session = session.get_session()
507+
return _aioboto_session
508+
509+
510+
async def _get_aiodynamodb_client(region: str, max_pool_connections: int):
511+
global _aioboto_client
512+
if _aioboto_client is None:
513+
logger.debug("initializing the aiobotocore dynamodb client")
514+
client_context = _get_aioboto_session().create_client(
515+
"dynamodb",
516+
region_name=region,
517+
config=AioConfig(max_pool_connections=max_pool_connections),
518+
)
519+
context_stack = contextlib.AsyncExitStack()
520+
_aioboto_client = await context_stack.enter_async_context(client_context)
521+
return _aioboto_client
522+
523+
524+
async def _aiodynamodb_close():
525+
global _aioboto_client
526+
if _aioboto_client:
527+
await _aioboto_client.close()
528+
529+
492530
def _initialize_dynamodb_client(
493531
region: str,
494532
endpoint_url: Optional[str] = None,

sdk/python/feast/infra/online_stores/online_store.py

+6
Original file line numberDiff line numberDiff line change
@@ -422,3 +422,9 @@ def retrieve_online_documents(
422422
raise NotImplementedError(
423423
f"Online store {self.__class__.__name__} does not support online retrieval"
424424
)
425+
426+
async def initialize(self, config: RepoConfig) -> None:
427+
pass
428+
429+
async def close(self) -> None:
430+
pass

sdk/python/feast/infra/passthrough_provider.py

+6
Original file line numberDiff line numberDiff line change
@@ -518,3 +518,9 @@ def get_table_column_names_and_types_from_data_source(
518518
return self.offline_store.get_table_column_names_and_types_from_data_source(
519519
config=config, data_source=data_source
520520
)
521+
522+
async def initialize(self, config: RepoConfig) -> None:
523+
await self.online_store.initialize(config)
524+
525+
async def close(self) -> None:
526+
await self.online_store.close()

sdk/python/feast/infra/provider.py

+8
Original file line numberDiff line numberDiff line change
@@ -476,6 +476,14 @@ def get_table_column_names_and_types_from_data_source(
476476
"""
477477
pass
478478

479+
@abstractmethod
480+
async def initialize(self, config: RepoConfig) -> None:
481+
pass
482+
483+
@abstractmethod
484+
async def close(self) -> None:
485+
pass
486+
479487

480488
def get_provider(config: RepoConfig) -> Provider:
481489
if "." not in config.provider:

sdk/python/pytest.ini

+4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
[pytest]
2+
asyncio_mode = auto
3+
24
markers =
35
universal_offline_stores: mark a test as using all offline stores.
46
universal_online_stores: mark a test as using all online stores.
@@ -7,6 +9,8 @@ env =
79
IS_TEST=True
810

911
filterwarnings =
12+
error::_pytest.warning_types.PytestConfigWarning
13+
error::_pytest.warning_types.PytestUnhandledCoroutineWarning
1014
ignore::DeprecationWarning:pyspark.sql.pandas.*:
1115
ignore::DeprecationWarning:pyspark.sql.connect.*:
1216
ignore::DeprecationWarning:httpx.*:

0 commit comments

Comments
 (0)