8
8
9
9
from feast import Entity
10
10
from feast .feature_view import FeatureView
11
- from feast .infra .key_encoding_utils import serialize_entity_key
11
+ from feast .infra .online_stores . helpers import compute_entity_id
12
12
from feast .infra .online_stores .online_store import OnlineStore
13
13
from feast .infra .utils .hbase_utils import HbaseConstants , HbaseUtils
14
14
from feast .protos .feast .types .EntityKey_pb2 import EntityKey as EntityKeyProto
@@ -104,14 +104,15 @@ def online_write_batch(
104
104
105
105
hbase = HbaseUtils (self ._get_conn (config ))
106
106
project = config .project
107
- table_name = _table_id (project , table )
107
+ table_name = self . _table_id (project , table )
108
108
109
109
b = hbase .batch (table_name )
110
110
for entity_key , values , timestamp , created_ts in data :
111
- row_key = serialize_entity_key (
111
+ row_key = self . _hbase_row_key (
112
112
entity_key ,
113
- entity_key_serialization_version = config .entity_key_serialization_version ,
114
- ).hex ()
113
+ feature_view_name = table .name ,
114
+ config = config ,
115
+ )
115
116
values_dict = {}
116
117
for feature_name , val in values .items ():
117
118
values_dict [
@@ -133,6 +134,9 @@ def online_write_batch(
133
134
b .put (row_key , values_dict )
134
135
b .send ()
135
136
137
+ if progress :
138
+ progress (len (data ))
139
+
136
140
@log_exceptions_and_usage (online_store = "hbase" )
137
141
def online_read (
138
142
self ,
@@ -152,15 +156,16 @@ def online_read(
152
156
"""
153
157
hbase = HbaseUtils (self ._get_conn (config ))
154
158
project = config .project
155
- table_name = _table_id (project , table )
159
+ table_name = self . _table_id (project , table )
156
160
157
161
result : List [Tuple [Optional [datetime ], Optional [Dict [str , ValueProto ]]]] = []
158
162
159
163
row_keys = [
160
- serialize_entity_key (
164
+ self . _hbase_row_key (
161
165
entity_key ,
162
- entity_key_serialization_version = config .entity_key_serialization_version ,
163
- ).hex ()
166
+ feature_view_name = table .name ,
167
+ config = config ,
168
+ )
164
169
for entity_key in entity_keys
165
170
]
166
171
rows = hbase .rows (table_name , row_keys = row_keys )
@@ -206,12 +211,12 @@ def update(
206
211
207
212
# We don't create any special state for the entites in this implementation.
208
213
for table in tables_to_keep :
209
- table_name = _table_id (project , table )
214
+ table_name = self . _table_id (project , table )
210
215
if not hbase .check_if_table_exist (table_name ):
211
216
hbase .create_table_with_default_cf (table_name )
212
217
213
218
for table in tables_to_delete :
214
- table_name = _table_id (project , table )
219
+ table_name = self . _table_id (project , table )
215
220
hbase .delete_table (table_name )
216
221
217
222
def teardown (
@@ -231,16 +236,43 @@ def teardown(
231
236
project = config .project
232
237
233
238
for table in tables :
234
- table_name = _table_id (project , table )
239
+ table_name = self . _table_id (project , table )
235
240
hbase .delete_table (table_name )
236
241
242
+ def _hbase_row_key (
243
+ self ,
244
+ entity_key : EntityKeyProto ,
245
+ feature_view_name : str ,
246
+ config : RepoConfig ,
247
+ ) -> bytes :
248
+ """
249
+ Computes the HBase row key for a given entity key and feature view name.
237
250
238
- def _table_id (project : str , table : FeatureView ) -> str :
239
- """
240
- Returns table name given the project_name and the feature_view.
251
+ Args:
252
+ entity_key (EntityKeyProto): The entity key to compute the row key for.
253
+ feature_view_name (str): The name of the feature view to compute the row key for.
254
+ config (RepoConfig): The configuration for the Feast repository.
241
255
242
- Args:
243
- project: Name of the feast project.
244
- table: Feast FeatureView.
245
- """
246
- return f"{ project } _{ table .name } "
256
+ Returns:
257
+ bytes: The HBase row key for the given entity key and feature view name.
258
+ """
259
+ entity_id = compute_entity_id (
260
+ entity_key ,
261
+ entity_key_serialization_version = config .entity_key_serialization_version ,
262
+ )
263
+ # Even though `entity_id` uniquely identifies an entity, we use the same table
264
+ # for multiple feature_views with the same set of entities.
265
+ # To uniquely identify the row for a feature_view, we suffix the name of the feature_view itself.
266
+ # This also ensures that features for entities from various feature_views are
267
+ # colocated.
268
+ return f"{ entity_id } #{ feature_view_name } " .encode ()
269
+
270
+ def _table_id (self , project : str , table : FeatureView ) -> str :
271
+ """
272
+ Returns table name given the project_name and the feature_view.
273
+
274
+ Args:
275
+ project: Name of the feast project.
276
+ table: Feast FeatureView.
277
+ """
278
+ return f"{ project } :{ table .name } "
0 commit comments