Skip to content

Commit 5f5caf0

Browse files
authored
feat: Return entity key in the retrieval document api (#4511)
* update entity retrieval and add duckdb Signed-off-by: cmuhao <sduxuhao@gmail.com> * lint Signed-off-by: cmuhao <sduxuhao@gmail.com> * fix lint Signed-off-by: cmuhao <sduxuhao@gmail.com> * fix lint Signed-off-by: cmuhao <sduxuhao@gmail.com> * fix lint Signed-off-by: cmuhao <sduxuhao@gmail.com> * fix lint Signed-off-by: cmuhao <sduxuhao@gmail.com> * fix lint Signed-off-by: cmuhao <sduxuhao@gmail.com> * fix lint Signed-off-by: cmuhao <sduxuhao@gmail.com> * fix lint Signed-off-by: cmuhao <sduxuhao@gmail.com> * fix lint Signed-off-by: cmuhao <sduxuhao@gmail.com> * fix typo Signed-off-by: cmuhao <sduxuhao@gmail.com> * fix typo Signed-off-by: cmuhao <sduxuhao@gmail.com> * fix typo Signed-off-by: cmuhao <sduxuhao@gmail.com> * fix typo Signed-off-by: cmuhao <sduxuhao@gmail.com> * fix typo Signed-off-by: cmuhao <sduxuhao@gmail.com> * fix typo Signed-off-by: cmuhao <sduxuhao@gmail.com> * fix typo Signed-off-by: cmuhao <sduxuhao@gmail.com> * fix typo Signed-off-by: cmuhao <sduxuhao@gmail.com> * fix lint Signed-off-by: cmuhao <sduxuhao@gmail.com> * fix test Signed-off-by: cmuhao <sduxuhao@gmail.com> * fix test Signed-off-by: cmuhao <sduxuhao@gmail.com> * fix test Signed-off-by: cmuhao <sduxuhao@gmail.com> * fix test Signed-off-by: cmuhao <sduxuhao@gmail.com> * fix test Signed-off-by: cmuhao <sduxuhao@gmail.com> * fix test Signed-off-by: cmuhao <sduxuhao@gmail.com> * fix test Signed-off-by: cmuhao <sduxuhao@gmail.com> --------- Signed-off-by: cmuhao <sduxuhao@gmail.com>
1 parent 4e2eacc commit 5f5caf0

File tree

9 files changed

+149
-61
lines changed

9 files changed

+149
-61
lines changed

sdk/python/feast/feature_store.py

+34-12
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,9 @@
7878
FieldStatus,
7979
GetOnlineFeaturesResponse,
8080
)
81+
from feast.protos.feast.types.EntityKey_pb2 import EntityKey
8182
from feast.protos.feast.types.Value_pb2 import RepeatedValue, Value
83+
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
8284
from feast.repo_config import RepoConfig, load_repo_config
8385
from feast.repo_contents import RepoContents
8486
from feast.saved_dataset import SavedDataset, SavedDatasetStorage, ValidationReference
@@ -1666,20 +1668,29 @@ def retrieve_online_documents(
16661668
distance_metric,
16671669
)
16681670

1669-
# TODO Refactor to better way of populating result
1670-
# TODO populate entity in the response after returning entity in document_features is supported
16711671
# TODO currently not return the vector value since it is same as feature value, if embedding is supported,
16721672
# the feature value can be raw text before embedded
1673-
document_feature_vals = [feature[2] for feature in document_features]
1674-
document_feature_distance_vals = [feature[4] for feature in document_features]
1673+
entity_key_vals = [feature[1] for feature in document_features]
1674+
join_key_values: Dict[str, List[ValueProto]] = {}
1675+
for entity_key_val in entity_key_vals:
1676+
if entity_key_val is not None:
1677+
for join_key, entity_value in zip(
1678+
entity_key_val.join_keys, entity_key_val.entity_values
1679+
):
1680+
if join_key not in join_key_values:
1681+
join_key_values[join_key] = []
1682+
join_key_values[join_key].append(entity_value)
1683+
1684+
document_feature_vals = [feature[4] for feature in document_features]
1685+
document_feature_distance_vals = [feature[5] for feature in document_features]
16751686
online_features_response = GetOnlineFeaturesResponse(results=[])
16761687
utils._populate_result_rows_from_columnar(
16771688
online_features_response=online_features_response,
1678-
data={requested_feature: document_feature_vals},
1679-
)
1680-
utils._populate_result_rows_from_columnar(
1681-
online_features_response=online_features_response,
1682-
data={"distance": document_feature_distance_vals},
1689+
data={
1690+
**join_key_values,
1691+
requested_feature: document_feature_vals,
1692+
"distance": document_feature_distance_vals,
1693+
},
16831694
)
16841695
return OnlineResponse(online_features_response)
16851696

@@ -1691,7 +1702,11 @@ def _retrieve_from_online_store(
16911702
query: List[float],
16921703
top_k: int,
16931704
distance_metric: Optional[str],
1694-
) -> List[Tuple[Timestamp, "FieldStatus.ValueType", Value, Value, Value]]:
1705+
) -> List[
1706+
Tuple[
1707+
Timestamp, Optional[EntityKey], "FieldStatus.ValueType", Value, Value, Value
1708+
]
1709+
]:
16951710
"""
16961711
Search and return document features from the online document store.
16971712
"""
@@ -1707,7 +1722,7 @@ def _retrieve_from_online_store(
17071722
read_row_protos = []
17081723
row_ts_proto = Timestamp()
17091724

1710-
for row_ts, feature_val, vector_value, distance_val in documents:
1725+
for row_ts, entity_key, feature_val, vector_value, distance_val in documents:
17111726
# Reset timestamp to default or update if row_ts is not None
17121727
if row_ts is not None:
17131728
row_ts_proto.FromDatetime(row_ts)
@@ -1721,7 +1736,14 @@ def _retrieve_from_online_store(
17211736
status = FieldStatus.PRESENT
17221737

17231738
read_row_protos.append(
1724-
(row_ts_proto, status, feature_val, vector_value, distance_val)
1739+
(
1740+
row_ts_proto,
1741+
entity_key,
1742+
status,
1743+
feature_val,
1744+
vector_value,
1745+
distance_val,
1746+
)
17251747
)
17261748
return read_row_protos
17271749

sdk/python/feast/infra/online_stores/contrib/elasticsearch.py

+14-11
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,15 @@
99
from elasticsearch import Elasticsearch, helpers
1010

1111
from feast import Entity, FeatureView, RepoConfig
12-
from feast.infra.key_encoding_utils import get_list_val_str, serialize_entity_key
12+
from feast.infra.key_encoding_utils import (
13+
get_list_val_str,
14+
serialize_entity_key,
15+
)
1316
from feast.infra.online_stores.online_store import OnlineStore
1417
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
1518
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
1619
from feast.repo_config import FeastConfigBaseModel
17-
from feast.utils import to_naive_utc
20+
from feast.utils import _build_retrieve_online_document_record, to_naive_utc
1821

1922

2023
class ElasticSearchOnlineStoreConfig(FeastConfigBaseModel):
@@ -224,6 +227,7 @@ def retrieve_online_documents(
224227
) -> List[
225228
Tuple[
226229
Optional[datetime],
230+
Optional[EntityKeyProto],
227231
Optional[ValueProto],
228232
Optional[ValueProto],
229233
Optional[ValueProto],
@@ -232,6 +236,7 @@ def retrieve_online_documents(
232236
result: List[
233237
Tuple[
234238
Optional[datetime],
239+
Optional[EntityKeyProto],
235240
Optional[ValueProto],
236241
Optional[ValueProto],
237242
Optional[ValueProto],
@@ -247,23 +252,21 @@ def retrieve_online_documents(
247252
)
248253
rows = response["hits"]["hits"][0:top_k]
249254
for row in rows:
255+
entity_key = row["_source"]["entity_key"]
250256
feature_value = row["_source"]["feature_value"]
251257
vector_value = row["_source"]["vector_value"]
252258
timestamp = row["_source"]["timestamp"]
253259
distance = row["_score"]
254260
timestamp = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f")
255261

256-
feature_value_proto = ValueProto()
257-
feature_value_proto.ParseFromString(base64.b64decode(feature_value))
258-
259-
vector_value_proto = ValueProto(string_val=str(vector_value))
260-
distance_value_proto = ValueProto(float_val=distance)
261262
result.append(
262-
(
263+
_build_retrieve_online_document_record(
264+
entity_key,
265+
base64.b64decode(feature_value),
266+
str(vector_value),
267+
distance,
263268
timestamp,
264-
feature_value_proto,
265-
vector_value_proto,
266-
distance_value_proto,
269+
config.entity_key_serialization_version,
267270
)
268271
)
269272
return result

sdk/python/feast/infra/online_stores/contrib/postgres.py

+15-22
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
3838
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
3939
from feast.repo_config import RepoConfig
40+
from feast.utils import _build_retrieve_online_document_record
4041

4142
SUPPORTED_DISTANCE_METRICS_DICT = {
4243
"cosine": "<=>",
@@ -360,6 +361,7 @@ def retrieve_online_documents(
360361
) -> List[
361362
Tuple[
362363
Optional[datetime],
364+
Optional[EntityKeyProto],
363365
Optional[ValueProto],
364366
Optional[ValueProto],
365367
Optional[ValueProto],
@@ -391,12 +393,11 @@ def retrieve_online_documents(
391393
)
392394

393395
distance_metric_sql = SUPPORTED_DISTANCE_METRICS_DICT[distance_metric]
394-
# Convert the embedding to a string to be used in postgres vector search
395-
query_embedding_str = f"[{','.join(str(el) for el in embedding)}]"
396396

397397
result: List[
398398
Tuple[
399399
Optional[datetime],
400+
Optional[EntityKeyProto],
400401
Optional[ValueProto],
401402
Optional[ValueProto],
402403
Optional[ValueProto],
@@ -415,45 +416,37 @@ def retrieve_online_documents(
415416
feature_name,
416417
value,
417418
vector_value,
418-
vector_value {distance_metric_sql} %s as distance,
419+
vector_value {distance_metric_sql} %s::vector as distance,
419420
event_ts FROM {table_name}
420421
WHERE feature_name = {feature_name}
421422
ORDER BY distance
422423
LIMIT {top_k};
423424
"""
424425
).format(
425-
distance_metric_sql=distance_metric_sql,
426+
distance_metric_sql=sql.SQL(distance_metric_sql),
426427
table_name=sql.Identifier(table_name),
427428
feature_name=sql.Literal(requested_feature),
428429
top_k=sql.Literal(top_k),
429430
),
430-
(query_embedding_str,),
431+
(embedding,),
431432
)
432433
rows = cur.fetchall()
433-
434434
for (
435435
entity_key,
436-
feature_name,
437-
value,
436+
_,
437+
feature_val,
438438
vector_value,
439-
distance,
439+
distance_val,
440440
event_ts,
441441
) in rows:
442-
# TODO Deserialize entity_key to return the entity in response
443-
# entity_key_proto = EntityKeyProto()
444-
# entity_key_proto_bin = bytes(entity_key)
445-
446-
feature_value_proto = ValueProto()
447-
feature_value_proto.ParseFromString(bytes(value))
448-
449-
vector_value_proto = ValueProto(string_val=vector_value)
450-
distance_value_proto = ValueProto(float_val=distance)
451442
result.append(
452-
(
443+
_build_retrieve_online_document_record(
444+
entity_key,
445+
feature_val,
446+
vector_value,
447+
distance_val,
453448
event_ts,
454-
feature_value_proto,
455-
vector_value_proto,
456-
distance_value_proto,
449+
config.entity_key_serialization_version,
457450
)
458451
)
459452

sdk/python/feast/infra/online_stores/online_store.py

+1
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,7 @@ def retrieve_online_documents(
349349
) -> List[
350350
Tuple[
351351
Optional[datetime],
352+
Optional[EntityKeyProto],
352353
Optional[ValueProto],
353354
Optional[ValueProto],
354355
Optional[ValueProto],

sdk/python/feast/infra/online_stores/sqlite.py

+9-13
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,9 @@
3333
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
3434
from feast.protos.feast.core.SqliteTable_pb2 import SqliteTable as SqliteTableProto
3535
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
36-
from feast.protos.feast.types.Value_pb2 import FloatList as FloatListProto
3736
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
3837
from feast.repo_config import FeastConfigBaseModel, RepoConfig
39-
from feast.utils import to_naive_utc
38+
from feast.utils import _build_retrieve_online_document_record, to_naive_utc
4039

4140

4241
class SqliteOnlineStoreConfig(FeastConfigBaseModel):
@@ -303,6 +302,7 @@ def retrieve_online_documents(
303302
) -> List[
304303
Tuple[
305304
Optional[datetime],
305+
Optional[EntityKeyProto],
306306
Optional[ValueProto],
307307
Optional[ValueProto],
308308
Optional[ValueProto],
@@ -385,26 +385,22 @@ def retrieve_online_documents(
385385
result: List[
386386
Tuple[
387387
Optional[datetime],
388+
Optional[EntityKeyProto],
388389
Optional[ValueProto],
389390
Optional[ValueProto],
390391
Optional[ValueProto],
391392
]
392393
] = []
393394

394395
for entity_key, _, string_value, distance, event_ts in rows:
395-
feature_value_proto = ValueProto()
396-
feature_value_proto.ParseFromString(string_value if string_value else b"")
397-
vector_value_proto = ValueProto(
398-
float_list_val=FloatListProto(val=embedding)
399-
)
400-
distance_value_proto = ValueProto(float_val=distance)
401-
402396
result.append(
403-
(
397+
_build_retrieve_online_document_record(
398+
entity_key,
399+
string_value if string_value else b"",
400+
embedding,
401+
distance,
404402
event_ts,
405-
feature_value_proto,
406-
vector_value_proto,
407-
distance_value_proto,
403+
config.entity_key_serialization_version,
408404
)
409405
)
410406

sdk/python/feast/infra/provider.py

+1
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,7 @@ def retrieve_online_documents(
364364
) -> List[
365365
Tuple[
366366
Optional[datetime],
367+
Optional[EntityKeyProto],
367368
Optional[ValueProto],
368369
Optional[ValueProto],
369370
Optional[ValueProto],

sdk/python/feast/utils.py

+50-1
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,13 @@
3333
FeatureViewNotFoundException,
3434
RequestDataNotFoundInEntityRowsException,
3535
)
36+
from feast.infra.key_encoding_utils import deserialize_entity_key
3637
from feast.protos.feast.serving.ServingService_pb2 import (
3738
FieldStatus,
3839
GetOnlineFeaturesResponse,
3940
)
4041
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
42+
from feast.protos.feast.types.Value_pb2 import FloatList as FloatListProto
4143
from feast.protos.feast.types.Value_pb2 import RepeatedValue as RepeatedValueProto
4244
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
4345
from feast.type_map import python_values_to_proto_values
@@ -49,7 +51,6 @@
4951
from feast.feature_view import FeatureView
5052
from feast.on_demand_feature_view import OnDemandFeatureView
5153

52-
5354
APPLICATION_NAME = "feast-dev/feast"
5455
USER_AGENT = "{}/{}".format(APPLICATION_NAME, get_version())
5556

@@ -1050,3 +1051,51 @@ def tags_str_to_dict(tags: str = "") -> dict[str, str]:
10501051

10511052
def _utc_now() -> datetime:
10521053
return datetime.now(tz=timezone.utc)
1054+
1055+
1056+
def _build_retrieve_online_document_record(
1057+
entity_key: Union[str, bytes],
1058+
feature_value: Union[str, bytes],
1059+
vector_value: Union[str, List[float]],
1060+
distance_value: float,
1061+
event_timestamp: datetime,
1062+
entity_key_serialization_version: int,
1063+
) -> Tuple[
1064+
Optional[datetime],
1065+
Optional[EntityKeyProto],
1066+
Optional[ValueProto],
1067+
Optional[ValueProto],
1068+
Optional[ValueProto],
1069+
]:
1070+
if entity_key_serialization_version < 3:
1071+
entity_key_proto = None
1072+
else:
1073+
if isinstance(entity_key, str):
1074+
entity_key_proto_bin = entity_key.encode("utf-8")
1075+
else:
1076+
entity_key_proto_bin = entity_key
1077+
entity_key_proto = deserialize_entity_key(
1078+
entity_key_proto_bin,
1079+
entity_key_serialization_version=entity_key_serialization_version,
1080+
)
1081+
1082+
feature_value_proto = ValueProto()
1083+
1084+
if isinstance(feature_value, str):
1085+
feature_value_proto.ParseFromString(feature_value.encode("utf-8"))
1086+
else:
1087+
feature_value_proto.ParseFromString(feature_value)
1088+
1089+
if isinstance(vector_value, str):
1090+
vector_value_proto = ValueProto(string_val=vector_value)
1091+
else:
1092+
vector_value_proto = ValueProto(float_list_val=FloatListProto(val=vector_value))
1093+
1094+
distance_value_proto = ValueProto(float_val=distance_value)
1095+
return (
1096+
event_timestamp,
1097+
entity_key_proto,
1098+
feature_value_proto,
1099+
vector_value_proto,
1100+
distance_value_proto,
1101+
)

0 commit comments

Comments
 (0)