Skip to content

Commit 0795496

Browse files
feat: Updating FeatureViewProjection and OnDemandFeatureView to add batch_source and entities (feast-dev#4530)
* feat: Updating protos for Projections to include more info Signed-off-by: Francisco Javier Arceo <farceo@redhat.com> * adding unit test Signed-off-by: Francisco Javier Arceo <farceo@redhat.com> * adding type checking where batch source is already serialized into protobuf Signed-off-by: Francisco Javier Arceo <farceo@redhat.com> * almost got everything working and type validation behaving Signed-off-by: Francisco Javier Arceo <farceo@redhat.com> * cleaned up and have tests behaving Signed-off-by: Francisco Javier Arceo <farceo@redhat.com> * removed comment Signed-off-by: Francisco Javier Arceo <farceo@redhat.com> * updated FeatureViewProjection batch_source serialization Signed-off-by: Francisco Javier Arceo <farceo@redhat.com> * trying to debug a test Signed-off-by: Francisco Javier Arceo <farceo@redhat.com> * handling snowflake issue, cant confirm why it is happening so just going to put a workaround Signed-off-by: Francisco Javier Arceo <farceo@redhat.com> * linter Signed-off-by: Francisco Javier Arceo <farceo@redhat.com> * trying to handle it correctly Signed-off-by: Francisco Javier Arceo <farceo@redhat.com> * handling the else case for from_feature_view_definition Signed-off-by: Francisco Javier Arceo <farceo@redhat.com> * adding print Signed-off-by: Francisco Javier Arceo <farceo@redhat.com> * adding test of issue Signed-off-by: Francisco Javier Arceo <farceo@redhat.com> * think i got everything working now Signed-off-by: Francisco Javier Arceo <farceo@redhat.com> * removing print Signed-off-by: Francisco Javier Arceo <farceo@redhat.com> --------- Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
1 parent d5ef57e commit 0795496

12 files changed

+363
-34
lines changed

protos/feast/core/FeatureViewProjection.proto

+10
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ option java_outer_classname = "FeatureReferenceProto";
66
option java_package = "feast.proto.core";
77

88
import "feast/core/Feature.proto";
9+
import "feast/core/DataSource.proto";
910

1011

1112
// A projection to be applied on top of a FeatureView.
@@ -22,4 +23,13 @@ message FeatureViewProjection {
2223

2324
// Map for entity join_key overrides of feature data entity join_key to entity data join_key
2425
map<string,string> join_key_map = 4;
26+
27+
string timestamp_field = 5;
28+
string date_partition_column = 6;
29+
string created_timestamp_column = 7;
30+
// Batch/Offline DataSource where this view can retrieve offline feature data.
31+
DataSource batch_source = 8;
32+
// Streaming DataSource from where this view can consume "online" feature data.
33+
DataSource stream_source = 9;
34+
2535
}

protos/feast/core/OnDemandFeatureView.proto

+6
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,12 @@ message OnDemandFeatureViewSpec {
6363
// Owner of the on demand feature view.
6464
string owner = 8;
6565
string mode = 11;
66+
bool write_to_online_store = 12;
67+
68+
// List of names of entities associated with this feature view.
69+
repeated string entities = 13;
70+
// List of specifications for each entity defined as part of this feature view.
71+
repeated FeatureSpecV2 entity_columns = 14;
6672
}
6773

6874
message OnDemandFeatureViewMeta {

sdk/python/feast/base_feature_view.py

+11-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from google.protobuf.json_format import MessageToJson
1919
from google.protobuf.message import Message
2020

21+
from feast.data_source import DataSource
2122
from feast.feature_view_projection import FeatureViewProjection
2223
from feast.field import Field
2324
from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
@@ -65,6 +66,7 @@ def __init__(
6566
description: str = "",
6667
tags: Optional[Dict[str, str]] = None,
6768
owner: str = "",
69+
source: Optional[DataSource] = None,
6870
):
6971
"""
7072
Creates a BaseFeatureView object.
@@ -76,7 +78,8 @@ def __init__(
7678
tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
7779
owner (optional): The owner of the base feature view, typically the email of the
7880
primary maintainer.
79-
81+
source (optional): The source of data for this group of features. May be a stream source, or a batch source.
82+
If a stream source, the source should contain a batch_source for backfills & batch materialization.
8083
Raises:
8184
ValueError: A field mapping conflicts with an Entity or a Feature.
8285
"""
@@ -90,6 +93,9 @@ def __init__(
9093
self.created_timestamp = None
9194
self.last_updated_timestamp = None
9295

96+
if source:
97+
self.source = source
98+
9399
@property
94100
@abstractmethod
95101
def proto_class(self) -> Type[Message]:
@@ -156,6 +162,10 @@ def __eq__(self, other):
156162
or self.tags != other.tags
157163
or self.owner != other.owner
158164
):
165+
# This is meant to ignore the File Source change to Push Source
166+
if isinstance(type(self.source), type(other.source)):
167+
if self.source != other.source:
168+
return False
159169
return False
160170

161171
return True

sdk/python/feast/feature_view.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ def __init__(
206206
description=description,
207207
tags=tags,
208208
owner=owner,
209+
source=source,
209210
)
210211
self.online = online
211212
self.materialization_intervals = []
@@ -429,7 +430,9 @@ def from_proto(cls, feature_view_proto: FeatureViewProto):
429430

430431
# FeatureViewProjections are not saved in the FeatureView proto.
431432
# Create the default projection.
432-
feature_view.projection = FeatureViewProjection.from_definition(feature_view)
433+
feature_view.projection = FeatureViewProjection.from_feature_view_definition(
434+
feature_view
435+
)
433436

434437
if feature_view_proto.meta.HasField("created_timestamp"):
435438
feature_view.created_timestamp = (

sdk/python/feast/feature_view_projection.py

+79-7
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@
22

33
from attr import dataclass
44

5+
from feast.data_source import DataSource
56
from feast.field import Field
67
from feast.protos.feast.core.FeatureViewProjection_pb2 import (
78
FeatureViewProjection as FeatureViewProjectionProto,
89
)
910

1011
if TYPE_CHECKING:
1112
from feast.base_feature_view import BaseFeatureView
13+
from feast.feature_view import FeatureView
1214

1315

1416
@dataclass
@@ -27,50 +29,120 @@ class FeatureViewProjection:
2729
is not ready to be projected, i.e. still needs to go through feature inference.
2830
join_key_map: A map to modify join key columns during retrieval of this feature
2931
view projection.
32+
timestamp_field: The timestamp field of the feature view projection.
33+
date_partition_column: The date partition column of the feature view projection.
34+
created_timestamp_column: The created timestamp column of the feature view projection.
35+
batch_source: The batch source of data where this group of features
36+
is stored. This is optional ONLY if a push source is specified as the
37+
stream_source, since push sources contain their own batch sources.
38+
3039
"""
3140

3241
name: str
3342
name_alias: Optional[str]
3443
desired_features: List[str]
3544
features: List[Field]
3645
join_key_map: Dict[str, str] = {}
46+
timestamp_field: Optional[str] = None
47+
date_partition_column: Optional[str] = None
48+
created_timestamp_column: Optional[str] = None
49+
batch_source: Optional[DataSource] = None
3750

3851
def name_to_use(self):
    """Return the alias of this projection when one is set, otherwise its name."""
    if self.name_alias:
        return self.name_alias
    return self.name
4053

4154
def to_proto(self) -> FeatureViewProjectionProto:
    """Serialize this projection into a FeatureViewProjectionProto.

    Unset optional string attributes are written as empty strings. The
    batch source is converted with ``DataSource.to_proto`` when it is a
    ``DataSource`` instance; any other truthy value is passed through as-is.

    Returns:
        The protobuf message describing this projection, including one
        feature column per entry in ``self.features``.
    """
    source = getattr(self, "batch_source", None)
    if not source:
        source_proto = None
    elif isinstance(source, DataSource):
        source_proto = source.to_proto()
    else:
        # Not a DataSource instance: hand the value to the proto unchanged.
        source_proto = source

    projection_proto = FeatureViewProjectionProto(
        feature_view_name=self.name,
        feature_view_name_alias=self.name_alias or "",
        join_key_map=self.join_key_map,
        timestamp_field=self.timestamp_field or "",
        date_partition_column=self.date_partition_column or "",
        created_timestamp_column=self.created_timestamp_column or "",
        batch_source=source_proto,
    )
    projection_proto.feature_columns.extend(
        [field.to_proto() for field in self.features]
    )
    return projection_proto
5174

5275
@staticmethod
def from_proto(proto: FeatureViewProjectionProto) -> "FeatureViewProjection":
    """Build a FeatureViewProjection from its protobuf representation.

    Args:
        proto: The serialized projection.

    Returns:
        A projection carrying the proto's name, join key map and feature
        columns. Empty string fields (alias, timestamp field, date partition
        column, created timestamp column) are mapped to ``None``, and
        ``desired_features`` starts out empty.
    """
    # The batch source is treated as present only when the sub-message
    # renders to a non-empty string.
    if str(proto.batch_source):
        source = DataSource.from_proto(proto.batch_source)
    else:
        source = None

    return FeatureViewProjection(
        name=proto.feature_view_name,
        name_alias=proto.feature_view_name_alias or None,
        features=[Field.from_proto(column) for column in proto.feature_columns],
        join_key_map=dict(proto.join_key_map),
        desired_features=[],
        timestamp_field=proto.timestamp_field or None,
        date_partition_column=proto.date_partition_column or None,
        created_timestamp_column=proto.created_timestamp_column or None,
        batch_source=source,
    )
6597

98+
@staticmethod
def from_feature_view_definition(feature_view: "FeatureView"):
    """Create the default projection for a feature view.

    Args:
        feature_view: The feature view to project. Its ``batch_source``
            attribute, when present and truthy, supplies the timestamp
            metadata copied onto the projection.

    Returns:
        A FeatureViewProjection with no alias and no desired features. When
        the view has a batch source, the projection also carries that source
        and its timestamp field, created timestamp column and date partition
        column (empty values become ``None``).
    """
    # TODO need to implement this for StreamFeatureViews
    batch_source = getattr(feature_view, "batch_source", None)
    if not batch_source:
        return FeatureViewProjection(
            name=feature_view.name,
            name_alias=None,
            features=feature_view.features,
            desired_features=[],
        )

    return FeatureViewProjection(
        name=feature_view.name,
        name_alias=None,
        features=feature_view.features,
        desired_features=[],
        # Read the event timestamp from the source's own timestamp_field;
        # the created timestamp column is a separate attribute set below.
        timestamp_field=batch_source.timestamp_field or None,
        created_timestamp_column=batch_source.created_timestamp_column or None,
        date_partition_column=batch_source.date_partition_column or None,
        batch_source=batch_source,
    )
122+
66123
@staticmethod
def from_definition(base_feature_view: "BaseFeatureView"):
    """Create the default projection for any base feature view.

    Args:
        base_feature_view: The view to project. Views that do not expose a
            truthy ``batch_source`` attribute yield a projection without any
            source or timestamp metadata.

    Returns:
        A FeatureViewProjection with no alias and no desired features. When
        the view has a batch source, the projection also carries that source
        and its timestamp field, created timestamp column and date partition
        column (empty values become ``None``).
    """
    # Looked up via getattr because a batch source is not guaranteed to exist
    # on every base feature view.
    batch_source = getattr(base_feature_view, "batch_source", None)
    if not batch_source:
        return FeatureViewProjection(
            name=base_feature_view.name,
            name_alias=None,
            features=base_feature_view.features,
            desired_features=[],
        )

    return FeatureViewProjection(
        name=base_feature_view.name,
        name_alias=None,
        features=base_feature_view.features,
        desired_features=[],
        # Read the event timestamp from the source's own timestamp_field;
        # the created timestamp column is a separate attribute set below.
        timestamp_field=batch_source.timestamp_field or None,
        created_timestamp_column=batch_source.created_timestamp_column or None,
        date_partition_column=batch_source.date_partition_column or None,
        batch_source=batch_source,
    )
74146

75147
def get_feature(self, feature_name: str) -> Field:
76148
try:

0 commit comments

Comments
 (0)