Skip to content

Commit 4e2eacc

Browse files
EXPEbdodlaBhargav Dodla
and
Bhargav Dodla
authored
fix: Fix for SQL registry initialization fails feast-dev#4543 (feast-dev#4544)
* fix: Fix for SQL registry initialization fails feast-dev#4543 Signed-off-by: Bhargav Dodla <bdodla@expediagroup.com> * fix: Removed combined_sql_fixtures Signed-off-by: Bhargav Dodla <bdodla@expediagroup.com> * fix: Added protobuf dependency to pyproject.toml Signed-off-by: Bhargav Dodla <bdodla@expediagroup.com> --------- Signed-off-by: Bhargav Dodla <bdodla@expediagroup.com> Co-authored-by: Bhargav Dodla <bdodla@expediagroup.com>
1 parent 1b92803 commit 4e2eacc

File tree

4 files changed

+89
-17
lines changed

4 files changed

+89
-17
lines changed

pyproject.toml

+1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
[build-system]
22
requires = [
3+
"protobuf<5",
34
"grpcio-tools>=1.56.2,<2",
45
"mypy-protobuf>=3.1",
56
"pybindgen==0.22.0",

sdk/python/feast/infra/registry/caching_registry.py

+23-15
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from feast.permissions.permission import Permission
2020
from feast.project import Project
2121
from feast.project_metadata import ProjectMetadata
22+
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
2223
from feast.saved_dataset import SavedDataset, ValidationReference
2324
from feast.stream_feature_view import StreamFeatureView
2425
from feast.utils import _utc_now
@@ -28,13 +29,14 @@
2829

2930
class CachingRegistry(BaseRegistry):
3031
def __init__(self, project: str, cache_ttl_seconds: int, cache_mode: str):
31-
self.cached_registry_proto = self.proto()
32-
self.cached_registry_proto_created = _utc_now()
32+
self.cache_mode = cache_mode
33+
self.cached_registry_proto = RegistryProto()
3334
self._refresh_lock = Lock()
3435
self.cached_registry_proto_ttl = timedelta(
3536
seconds=cache_ttl_seconds if cache_ttl_seconds is not None else 0
3637
)
37-
self.cache_mode = cache_mode
38+
self.cached_registry_proto = self.proto()
39+
self.cached_registry_proto_created = _utc_now()
3840
if cache_mode == "thread":
3941
self._start_thread_async_refresh(cache_ttl_seconds)
4042
atexit.register(self._exit_handler)
@@ -429,20 +431,26 @@ def refresh(self, project: Optional[str] = None):
429431
def _refresh_cached_registry_if_necessary(self):
430432
if self.cache_mode == "sync":
431433
with self._refresh_lock:
432-
expired = (
433-
self.cached_registry_proto is None
434-
or self.cached_registry_proto_created is None
435-
) or (
436-
self.cached_registry_proto_ttl.total_seconds()
437-
> 0 # 0 ttl means infinity
438-
and (
439-
_utc_now()
440-
> (
441-
self.cached_registry_proto_created
442-
+ self.cached_registry_proto_ttl
434+
if self.cached_registry_proto == RegistryProto():
435+
# Avoids the need to refresh the registry when cache is not populated yet
436+
# Specially during the __init__ phase
437+
# proto() will populate the cache with project metadata if no objects are registered
438+
expired = False
439+
else:
440+
expired = (
441+
self.cached_registry_proto is None
442+
or self.cached_registry_proto_created is None
443+
) or (
444+
self.cached_registry_proto_ttl.total_seconds()
445+
> 0 # 0 ttl means infinity
446+
and (
447+
_utc_now()
448+
> (
449+
self.cached_registry_proto_created
450+
+ self.cached_registry_proto_ttl
451+
)
443452
)
444453
)
445-
)
446454
if expired:
447455
logger.info("Registry cache expired, so refreshing")
448456
self.refresh()

sdk/python/feast/infra/registry/sql.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,8 @@ def __init__(
251251
registry_config, SqlRegistryConfig
252252
), "SqlRegistry needs a valid registry_config"
253253

254+
self.registry_config = registry_config
255+
254256
self.write_engine: Engine = create_engine(
255257
registry_config.path, **registry_config.sqlalchemy_config_kwargs
256258
)
@@ -281,7 +283,7 @@ def __init__(
281283
def _sync_feast_metadata_to_projects_table(self):
282284
feast_metadata_projects: set = []
283285
projects_set: set = []
284-
with self.write_engine.begin() as conn:
286+
with self.read_engine.begin() as conn:
285287
stmt = select(feast_metadata).where(
286288
feast_metadata.c.metadata_key == FeastMetadataKeys.PROJECT_UUID.value
287289
)
@@ -290,7 +292,7 @@ def _sync_feast_metadata_to_projects_table(self):
290292
feast_metadata_projects.append(row._mapping["project_id"])
291293

292294
if len(feast_metadata_projects) > 0:
293-
with self.write_engine.begin() as conn:
295+
with self.read_engine.begin() as conn:
294296
stmt = select(projects)
295297
rows = conn.execute(stmt).all()
296298
for row in rows:

sdk/python/tests/integration/registration/test_universal_registry.py

+61
Original file line numberDiff line numberDiff line change
@@ -1767,3 +1767,64 @@ def test_apply_entity_success_with_purge_feast_metadata(test_registry):
17671767
assert len(entities) == 0
17681768

17691769
test_registry.teardown()
1770+
1771+
1772+
@pytest.mark.integration
1773+
@pytest.mark.parametrize(
1774+
"test_registry",
1775+
sql_fixtures + async_sql_fixtures,
1776+
)
1777+
def test_apply_entity_to_sql_registry_and_reinitialize_sql_registry(test_registry):
1778+
entity = Entity(
1779+
name="driver_car_id",
1780+
description="Car driver id",
1781+
tags={"team": "matchmaking"},
1782+
)
1783+
1784+
project = "project"
1785+
1786+
# Register Entity
1787+
test_registry.apply_entity(entity, project)
1788+
assert_project(project, test_registry)
1789+
1790+
entities = test_registry.list_entities(project, tags=entity.tags)
1791+
assert_project(project, test_registry)
1792+
1793+
entity = entities[0]
1794+
assert (
1795+
len(entities) == 1
1796+
and entity.name == "driver_car_id"
1797+
and entity.description == "Car driver id"
1798+
and "team" in entity.tags
1799+
and entity.tags["team"] == "matchmaking"
1800+
)
1801+
1802+
entity = test_registry.get_entity("driver_car_id", project)
1803+
assert (
1804+
entity.name == "driver_car_id"
1805+
and entity.description == "Car driver id"
1806+
and "team" in entity.tags
1807+
and entity.tags["team"] == "matchmaking"
1808+
)
1809+
1810+
# After the first apply, the created_timestamp should be the same as the last_update_timestamp.
1811+
assert entity.created_timestamp == entity.last_updated_timestamp
1812+
updated_test_registry = SqlRegistry(test_registry.registry_config, "project", None)
1813+
1814+
# Update entity
1815+
updated_entity = Entity(
1816+
name="driver_car_id",
1817+
description="Car driver Id",
1818+
tags={"team": "matchmaking"},
1819+
)
1820+
updated_test_registry.apply_entity(updated_entity, project)
1821+
1822+
updated_entity = updated_test_registry.get_entity("driver_car_id", project)
1823+
updated_test_registry.delete_entity("driver_car_id", project)
1824+
assert_project(project, updated_test_registry)
1825+
entities = updated_test_registry.list_entities(project)
1826+
assert_project(project, updated_test_registry)
1827+
assert len(entities) == 0
1828+
1829+
updated_test_registry.teardown()
1830+
test_registry.teardown()

0 commit comments

Comments
 (0)