Skip to content

Commit d793c77

Browse files
EXPEbdodlaBhargav Dodla
and
Bhargav Dodla
authored
feat: Added support for reading from Reader Endpoints for AWS Aurora use cases (#4494)
fix: Resovled merge conflicts associated to new changes Signed-off-by: Bhargav Dodla <bdodla@expediagroup.com> Co-authored-by: Bhargav Dodla <bdodla@expediagroup.com>
1 parent 87e7ca4 commit d793c77

File tree

2 files changed

+74
-20
lines changed

2 files changed

+74
-20
lines changed

sdk/python/feast/infra/registry/sql.py

+31-20
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,10 @@ class SqlRegistryConfig(RegistryConfig):
202202
""" str: Path to metadata store.
203203
If registry_type is 'sql', then this is a database URL as expected by SQLAlchemy """
204204

205+
read_path: Optional[StrictStr] = None
206+
""" str: Read Path to metadata store if different from path.
207+
If registry_type is 'sql', then this is a Read Endpoint for database URL. If not set, path will be used for read and write. """
208+
205209
sqlalchemy_config_kwargs: Dict[str, Any] = {"echo": False}
206210
""" Dict[str, Any]: Extra arguments to pass to SQLAlchemy.create_engine. """
207211

@@ -223,13 +227,20 @@ def __init__(
223227
registry_config, SqlRegistryConfig
224228
), "SqlRegistry needs a valid registry_config"
225229

226-
self.engine: Engine = create_engine(
230+
self.write_engine: Engine = create_engine(
227231
registry_config.path, **registry_config.sqlalchemy_config_kwargs
228232
)
233+
if registry_config.read_path:
234+
self.read_engine: Engine = create_engine(
235+
registry_config.read_path,
236+
**registry_config.sqlalchemy_config_kwargs,
237+
)
238+
else:
239+
self.read_engine = self.write_engine
240+
metadata.create_all(self.write_engine)
229241
self.thread_pool_executor_worker_count = (
230242
registry_config.thread_pool_executor_worker_count
231243
)
232-
metadata.create_all(self.engine)
233244
self.purge_feast_metadata = registry_config.purge_feast_metadata
234245
# Sync feast_metadata to projects table
235246
# when purge_feast_metadata is set to True, Delete data from
@@ -246,7 +257,7 @@ def __init__(
246257
def _sync_feast_metadata_to_projects_table(self):
247258
feast_metadata_projects: set = []
248259
projects_set: set = []
249-
with self.engine.begin() as conn:
260+
with self.write_engine.begin() as conn:
250261
stmt = select(feast_metadata).where(
251262
feast_metadata.c.metadata_key == FeastMetadataKeys.PROJECT_UUID.value
252263
)
@@ -255,7 +266,7 @@ def _sync_feast_metadata_to_projects_table(self):
255266
feast_metadata_projects.append(row._mapping["project_id"])
256267

257268
if len(feast_metadata_projects) > 0:
258-
with self.engine.begin() as conn:
269+
with self.write_engine.begin() as conn:
259270
stmt = select(projects)
260271
rows = conn.execute(stmt).all()
261272
for row in rows:
@@ -267,7 +278,7 @@ def _sync_feast_metadata_to_projects_table(self):
267278
self.apply_project(Project(name=project_name), commit=True)
268279

269280
if self.purge_feast_metadata:
270-
with self.engine.begin() as conn:
281+
with self.write_engine.begin() as conn:
271282
for project_name in feast_metadata_projects:
272283
stmt = delete(feast_metadata).where(
273284
feast_metadata.c.project_id == project_name
@@ -285,7 +296,7 @@ def teardown(self):
285296
validation_references,
286297
permissions,
287298
}:
288-
with self.engine.begin() as conn:
299+
with self.write_engine.begin() as conn:
289300
stmt = delete(t)
290301
conn.execute(stmt)
291302

@@ -549,7 +560,7 @@ def apply_feature_service(
549560
)
550561

551562
def delete_data_source(self, name: str, project: str, commit: bool = True):
552-
with self.engine.begin() as conn:
563+
with self.write_engine.begin() as conn:
553564
stmt = delete(data_sources).where(
554565
data_sources.c.data_source_name == name,
555566
data_sources.c.project_id == project,
@@ -607,7 +618,7 @@ def _list_on_demand_feature_views(
607618
)
608619

609620
def _list_project_metadata(self, project: str) -> List[ProjectMetadata]:
610-
with self.engine.begin() as conn:
621+
with self.read_engine.begin() as conn:
611622
stmt = select(feast_metadata).where(
612623
feast_metadata.c.project_id == project,
613624
)
@@ -726,7 +737,7 @@ def apply_user_metadata(
726737
table = self._infer_fv_table(feature_view)
727738

728739
name = feature_view.name
729-
with self.engine.begin() as conn:
740+
with self.write_engine.begin() as conn:
730741
stmt = select(table).where(
731742
getattr(table.c, "feature_view_name") == name,
732743
table.c.project_id == project,
@@ -781,7 +792,7 @@ def get_user_metadata(
781792
table = self._infer_fv_table(feature_view)
782793

783794
name = feature_view.name
784-
with self.engine.begin() as conn:
795+
with self.read_engine.begin() as conn:
785796
stmt = select(table).where(getattr(table.c, "feature_view_name") == name)
786797
row = conn.execute(stmt).first()
787798
if row:
@@ -885,7 +896,7 @@ def _apply_object(
885896
name = name or (obj.name if hasattr(obj, "name") else None)
886897
assert name, f"name needs to be provided for {obj}"
887898

888-
with self.engine.begin() as conn:
899+
with self.write_engine.begin() as conn:
889900
update_datetime = _utc_now()
890901
update_time = int(update_datetime.timestamp())
891902
stmt = select(table).where(
@@ -961,7 +972,7 @@ def _apply_object(
961972

962973
def _maybe_init_project_metadata(self, project):
963974
# Initialize project metadata if needed
964-
with self.engine.begin() as conn:
975+
with self.write_engine.begin() as conn:
965976
update_datetime = _utc_now()
966977
update_time = int(update_datetime.timestamp())
967978
stmt = select(feast_metadata).where(
@@ -988,7 +999,7 @@ def _delete_object(
988999
id_field_name: str,
9891000
not_found_exception: Optional[Callable],
9901001
):
991-
with self.engine.begin() as conn:
1002+
with self.write_engine.begin() as conn:
9921003
stmt = delete(table).where(
9931004
getattr(table.c, id_field_name) == name, table.c.project_id == project
9941005
)
@@ -1014,7 +1025,7 @@ def _get_object(
10141025
proto_field_name: str,
10151026
not_found_exception: Optional[Callable],
10161027
):
1017-
with self.engine.begin() as conn:
1028+
with self.read_engine.begin() as conn:
10181029
stmt = select(table).where(
10191030
getattr(table.c, id_field_name) == name, table.c.project_id == project
10201031
)
@@ -1036,7 +1047,7 @@ def _list_objects(
10361047
proto_field_name: str,
10371048
tags: Optional[dict[str, str]] = None,
10381049
):
1039-
with self.engine.begin() as conn:
1050+
with self.read_engine.begin() as conn:
10401051
stmt = select(table).where(table.c.project_id == project)
10411052
rows = conn.execute(stmt).all()
10421053
if rows:
@@ -1051,7 +1062,7 @@ def _list_objects(
10511062
return []
10521063

10531064
def _set_last_updated_metadata(self, last_updated: datetime, project: str):
1054-
with self.engine.begin() as conn:
1065+
with self.write_engine.begin() as conn:
10551066
stmt = select(feast_metadata).where(
10561067
feast_metadata.c.metadata_key
10571068
== FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value,
@@ -1085,7 +1096,7 @@ def _set_last_updated_metadata(self, last_updated: datetime, project: str):
10851096
conn.execute(insert_stmt)
10861097

10871098
def _get_last_updated_metadata(self, project: str):
1088-
with self.engine.begin() as conn:
1099+
with self.read_engine.begin() as conn:
10891100
stmt = select(feast_metadata).where(
10901101
feast_metadata.c.metadata_key
10911102
== FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value,
@@ -1130,7 +1141,7 @@ def apply_permission(
11301141
)
11311142

11321143
def delete_permission(self, name: str, project: str, commit: bool = True):
1133-
with self.engine.begin() as conn:
1144+
with self.write_engine.begin() as conn:
11341145
stmt = delete(permissions).where(
11351146
permissions.c.permission_name == name,
11361147
permissions.c.project_id == project,
@@ -1143,7 +1154,7 @@ def _list_projects(
11431154
self,
11441155
tags: Optional[dict[str, str]],
11451156
) -> List[Project]:
1146-
with self.engine.begin() as conn:
1157+
with self.read_engine.begin() as conn:
11471158
stmt = select(projects)
11481159
rows = conn.execute(stmt).all()
11491160
if rows:
@@ -1188,7 +1199,7 @@ def delete_project(
11881199
):
11891200
project = self.get_project(name, allow_cache=False)
11901201
if project:
1191-
with self.engine.begin() as conn:
1202+
with self.write_engine.begin() as conn:
11921203
for t in {
11931204
managed_infra,
11941205
saved_datasets,

sdk/python/tests/integration/registration/test_universal_registry.py

+43
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,43 @@ def minio_registry(minio_server):
118118
yield Registry("project", registry_config, None)
119119

120120

121+
POSTGRES_READONLY_USER = "read_only_user"
122+
POSTGRES_READONLY_PASSWORD = "readonly_password"
123+
121124
logger = logging.getLogger(__name__)
122125

123126

127+
def add_pg_read_only_user(
128+
container_host, container_port, db_name, postgres_user, postgres_password
129+
):
130+
# Connect to PostgreSQL as an admin
131+
import psycopg
132+
133+
conn_string = f"dbname={db_name} user={postgres_user} password={postgres_password} host={container_host} port={container_port}"
134+
135+
with psycopg.connect(conn_string) as conn:
136+
user_exists = conn.execute(
137+
f"SELECT 1 FROM pg_catalog.pg_user WHERE usename = '{POSTGRES_READONLY_USER}'"
138+
).fetchone()
139+
if not user_exists:
140+
conn.execute(
141+
f"CREATE USER {POSTGRES_READONLY_USER} WITH PASSWORD '{POSTGRES_READONLY_PASSWORD}';"
142+
)
143+
144+
conn.execute(
145+
f"REVOKE ALL PRIVILEGES ON DATABASE {db_name} FROM {POSTGRES_READONLY_USER};"
146+
)
147+
conn.execute(
148+
f"GRANT CONNECT ON DATABASE {db_name} TO {POSTGRES_READONLY_USER};"
149+
)
150+
conn.execute(
151+
f"GRANT SELECT ON ALL TABLES IN SCHEMA public TO {POSTGRES_READONLY_USER};"
152+
)
153+
conn.execute(
154+
f"ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO {POSTGRES_READONLY_USER};"
155+
)
156+
157+
124158
@pytest.fixture(scope="function")
125159
def pg_registry(postgres_server):
126160
db_name = "".join(random.choices(string.ascii_lowercase, k=10))
@@ -130,13 +164,22 @@ def pg_registry(postgres_server):
130164
container_port = postgres_server.get_exposed_port(5432)
131165
container_host = postgres_server.get_container_host_ip()
132166

167+
add_pg_read_only_user(
168+
container_host,
169+
container_port,
170+
db_name,
171+
postgres_server.username,
172+
postgres_server.password,
173+
)
174+
133175
registry_config = SqlRegistryConfig(
134176
registry_type="sql",
135177
cache_ttl_seconds=2,
136178
cache_mode="sync",
137179
# The `path` must include `+psycopg` in order for `sqlalchemy.create_engine()`
138180
# to understand that we are using psycopg3.
139181
path=f"postgresql+psycopg://{postgres_server.username}:{postgres_server.password}@{container_host}:{container_port}/{db_name}",
182+
read_path=f"postgresql+psycopg://{POSTGRES_READONLY_USER}:{POSTGRES_READONLY_PASSWORD}@{container_host}:{container_port}/{db_name}",
140183
sqlalchemy_config_kwargs={"echo": False, "pool_pre_ping": True},
141184
thread_pool_executor_worker_count=0,
142185
purge_feast_metadata=False,

0 commit comments

Comments
 (0)