Skip to content

Commit c62377b

Browse files
authored
fix: Make transformation_service_endpoint configuration optional (feast-dev#4880)
* Make transformation_service_endpoint configuration optional Signed-off-by: Dimitris Stafylarakis <xanias@gmail.com> * Add custom error for transformation service, implement featurestore unit tests Signed-off-by: Dimitris Stafylarakis <xanias@gmail.com> --------- Signed-off-by: Dimitris Stafylarakis <xanias@gmail.com>
1 parent e04d7d5 commit c62377b

File tree

4 files changed

+264
-93
lines changed

4 files changed

+264
-93
lines changed

go/internal/feast/errors.go

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package feast
2+
3+
import (
4+
"google.golang.org/genproto/googleapis/rpc/errdetails"
5+
"google.golang.org/grpc/codes"
6+
"google.golang.org/grpc/status"
7+
)
8+
9+
type FeastTransformationServiceNotConfigured struct{}
10+
11+
func (FeastTransformationServiceNotConfigured) GRPCStatus() *status.Status {
12+
errorStatus := status.New(codes.Internal, "No transformation service configured")
13+
ds, err := errorStatus.WithDetails(&errdetails.LocalizedMessage{Message: "No transformation service configured, required for on-demand feature transformations"})
14+
if err != nil {
15+
return errorStatus
16+
}
17+
return ds
18+
}
19+
20+
func (e FeastTransformationServiceNotConfigured) Error() string {
21+
return e.GRPCStatus().Err().Error()
22+
}

go/internal/feast/featurestore.go

+12-10
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ package feast
33
import (
44
"context"
55
"errors"
6-
"fmt"
6+
77
"github.com/apache/arrow/go/v17/arrow/memory"
8+
89
//"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
910

1011
"github.com/feast-dev/feast/go/internal/feast/model"
@@ -60,17 +61,14 @@ func NewFeatureStore(config *registry.RepoConfig, callback transformation.Transf
6061
return nil, err
6162
}
6263

63-
// Use a scalable transformation service like Python Transformation Service.
64-
// Assume the user will define the "transformation_service_endpoint" in the feature_store.yaml file
65-
// under the "feature_server" section.
66-
transformationServerEndpoint, ok := config.FeatureServer["transformation_service_endpoint"]
67-
if !ok {
68-
fmt.Println("Errors while reading transformation_service_endpoint info")
69-
panic("No transformation service endpoint provided in the feature_store.yaml file.")
64+
var transformationService *transformation.GrpcTransformationService
65+
if transformationServerEndpoint, ok := config.FeatureServer["transformation_service_endpoint"]; ok {
66+
// Use a scalable transformation service like Python Transformation Service.
67+
// Assume the user will define the "transformation_service_endpoint" in the feature_store.yaml file
68+
// under the "feature_server" section.
69+
transformationService, _ = transformation.NewGrpcTransformationService(config, transformationServerEndpoint.(string))
7070
}
7171

72-
transformationService, _ := transformation.NewGrpcTransformationService(config, transformationServerEndpoint.(string))
73-
7472
return &FeatureStore{
7573
config: config,
7674
registry: registry,
@@ -112,6 +110,10 @@ func (fs *FeatureStore) GetOnlineFeatures(
112110
return nil, err
113111
}
114112

113+
if len(requestedOnDemandFeatureViews) > 0 && fs.transformationService == nil {
114+
return nil, FeastTransformationServiceNotConfigured{}
115+
}
116+
115117
entityNameToJoinKeyMap, expectedJoinKeysSet, err := onlineserving.GetEntityMaps(requestedFeatureViews, entities)
116118
if err != nil {
117119
return nil, err

go/internal/feast/featurestore_test.go

+197-80
Original file line numberDiff line numberDiff line change
@@ -2,124 +2,241 @@ package feast
22

33
import (
44
"context"
5+
"log"
6+
"os"
57
"path/filepath"
68
"runtime"
79
"testing"
810

911
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/mock"
13+
"github.com/stretchr/testify/require"
1014

1115
"github.com/feast-dev/feast/go/internal/feast/onlinestore"
1216
"github.com/feast-dev/feast/go/internal/feast/registry"
17+
"github.com/feast-dev/feast/go/internal/test"
18+
"github.com/feast-dev/feast/go/protos/feast/serving"
1319
"github.com/feast-dev/feast/go/protos/feast/types"
1420
)
1521

16-
// Return absolute path to the test_repo registry regardless of the working directory
17-
func getRegistryPath() map[string]interface{} {
22+
var featureRepoBasePath string
23+
var featureRepoRegistryFile string
24+
25+
func TestMain(m *testing.M) {
1826
// Get the file path of this source file, regardless of the working directory
1927
_, filename, _, ok := runtime.Caller(0)
2028
if !ok {
21-
panic("couldn't find file path of the test file")
29+
log.Print("couldn't find file path of the test file")
30+
os.Exit(1)
2231
}
23-
registry := map[string]interface{}{
24-
"path": filepath.Join(filename, "..", "..", "..", "feature_repo/data/registry.db"),
32+
featureRepoBasePath = filepath.Join(filename, "..", "..", "test")
33+
featureRepoRegistryFile = filepath.Join(featureRepoBasePath, "feature_repo", "data", "registry.db")
34+
if err := test.SetupInitializedRepo(featureRepoBasePath); err != nil {
35+
log.Print("Could not initialize test repo: ", err)
36+
os.Exit(1)
2537
}
26-
return registry
38+
os.Exit(m.Run())
2739
}
2840

2941
func TestNewFeatureStore(t *testing.T) {
30-
t.Skip("@todo(achals): feature_repo isn't checked in yet")
31-
config := registry.RepoConfig{
32-
Project: "feature_repo",
33-
Registry: getRegistryPath(),
34-
Provider: "local",
35-
OnlineStore: map[string]interface{}{
36-
"type": "redis",
37-
},
38-
}
39-
fs, err := NewFeatureStore(&config, nil)
40-
assert.Nil(t, err)
41-
assert.IsType(t, &onlinestore.RedisOnlineStore{}, fs.onlineStore)
42-
43-
t.Run("valid config", func(t *testing.T) {
44-
config := &registry.RepoConfig{
45-
Project: "feature_repo",
46-
Registry: getRegistryPath(),
47-
Provider: "local",
48-
OnlineStore: map[string]interface{}{
49-
"type": "redis",
42+
tests := []struct {
43+
name string
44+
config *registry.RepoConfig
45+
expectOnlineStoreType interface{}
46+
errMessage string
47+
}{
48+
{
49+
name: "valid config",
50+
config: &registry.RepoConfig{
51+
Project: "feature_repo",
52+
Registry: map[string]interface{}{
53+
"path": featureRepoRegistryFile,
54+
},
55+
Provider: "local",
56+
OnlineStore: map[string]interface{}{
57+
"type": "redis",
58+
},
5059
},
51-
FeatureServer: map[string]interface{}{
52-
"transformation_service_endpoint": "localhost:50051",
60+
expectOnlineStoreType: &onlinestore.RedisOnlineStore{},
61+
},
62+
{
63+
name: "valid config with transformation service endpoint",
64+
config: &registry.RepoConfig{
65+
Project: "feature_repo",
66+
Registry: map[string]interface{}{
67+
"path": featureRepoRegistryFile,
68+
},
69+
Provider: "local",
70+
OnlineStore: map[string]interface{}{
71+
"type": "redis",
72+
},
73+
FeatureServer: map[string]interface{}{
74+
"transformation_service_endpoint": "localhost:50051",
75+
},
5376
},
54-
}
55-
fs, err := NewFeatureStore(config, nil)
56-
assert.Nil(t, err)
57-
assert.NotNil(t, fs)
58-
assert.IsType(t, &onlinestore.RedisOnlineStore{}, fs.onlineStore)
59-
assert.NotNil(t, fs.transformationService)
60-
})
61-
62-
t.Run("missing transformation service endpoint", func(t *testing.T) {
63-
config := &registry.RepoConfig{
64-
Project: "feature_repo",
65-
Registry: getRegistryPath(),
66-
Provider: "local",
67-
OnlineStore: map[string]interface{}{
68-
"type": "redis",
77+
expectOnlineStoreType: &onlinestore.RedisOnlineStore{},
78+
},
79+
{
80+
name: "invalid online store config",
81+
config: &registry.RepoConfig{
82+
Project: "feature_repo",
83+
Registry: map[string]interface{}{
84+
"path": featureRepoRegistryFile,
85+
},
86+
Provider: "local",
87+
OnlineStore: map[string]interface{}{
88+
"type": "invalid_store",
89+
},
6990
},
70-
}
71-
defer func() {
72-
if r := recover(); r == nil {
73-
t.Errorf("The code did not panic")
91+
errMessage: "invalid_store online store type is currently not supported",
92+
},
93+
}
94+
for _, test := range tests {
95+
t.Run(test.name, func(t *testing.T) {
96+
got, err := NewFeatureStore(test.config, nil)
97+
if test.errMessage != "" {
98+
assert.Nil(t, got)
99+
require.Error(t, err)
100+
assert.ErrorContains(t, err, test.errMessage)
101+
102+
} else {
103+
require.NoError(t, err)
104+
assert.NotNil(t, got)
105+
assert.IsType(t, test.expectOnlineStoreType, got.onlineStore)
74106
}
75-
}()
76-
NewFeatureStore(config, nil)
77-
})
78-
79-
t.Run("invalid online store config", func(t *testing.T) {
80-
config := &registry.RepoConfig{
81-
Project: "feature_repo",
82-
Registry: getRegistryPath(),
83-
Provider: "local",
84-
OnlineStore: map[string]interface{}{
85-
"type": "invalid_store",
107+
})
108+
}
109+
110+
}
111+
112+
type MockRedis struct {
113+
mock.Mock
114+
}
115+
116+
func (m *MockRedis) Destruct() {}
117+
func (m *MockRedis) OnlineRead(ctx context.Context, entityKeys []*types.EntityKey, featureViewNames []string, featureNames []string) ([][]onlinestore.FeatureData, error) {
118+
args := m.Called(ctx, entityKeys, featureViewNames, featureNames)
119+
var fd [][]onlinestore.FeatureData
120+
if args.Get(0) != nil {
121+
fd = args.Get(0).([][]onlinestore.FeatureData)
122+
}
123+
return fd, args.Error(1)
124+
}
125+
126+
func TestGetOnlineFeatures(t *testing.T) {
127+
tests := []struct {
128+
name string
129+
config *registry.RepoConfig
130+
fn func(*testing.T, *FeatureStore)
131+
}{
132+
{
133+
name: "redis with simple features",
134+
config: &registry.RepoConfig{
135+
Project: "feature_repo",
136+
Registry: map[string]interface{}{
137+
"path": featureRepoRegistryFile,
138+
},
139+
Provider: "local",
140+
OnlineStore: map[string]interface{}{
141+
"type": "redis",
142+
"connection_string": "localhost:6379",
143+
},
86144
},
87-
FeatureServer: map[string]interface{}{
88-
"transformation_service_endpoint": "localhost:50051",
145+
fn: testRedisSimpleFeatures,
146+
},
147+
{
148+
name: "redis with On-demand feature views, no transformation service endpoint",
149+
config: &registry.RepoConfig{
150+
Project: "feature_repo",
151+
Registry: map[string]interface{}{
152+
"path": featureRepoRegistryFile,
153+
},
154+
Provider: "local",
155+
OnlineStore: map[string]interface{}{
156+
"type": "redis",
157+
"connection_string": "localhost:6379",
158+
},
89159
},
90-
}
91-
fs, err := NewFeatureStore(config, nil)
92-
assert.NotNil(t, err)
93-
assert.Nil(t, fs)
94-
})
160+
fn: testRedisODFVNoTransformationService,
161+
},
162+
}
163+
for _, test := range tests {
164+
t.Run(test.name, func(t *testing.T) {
165+
166+
fs, err := NewFeatureStore(test.config, nil)
167+
require.Nil(t, err)
168+
fs.onlineStore = new(MockRedis)
169+
test.fn(t, fs)
170+
})
171+
172+
}
95173
}
96174

97-
func TestGetOnlineFeaturesRedis(t *testing.T) {
98-
t.Skip("@todo(achals): feature_repo isn't checked in yet")
99-
config := registry.RepoConfig{
100-
Project: "feature_repo",
101-
Registry: getRegistryPath(),
102-
Provider: "local",
103-
OnlineStore: map[string]interface{}{
104-
"type": "redis",
105-
"connection_string": "localhost:6379",
175+
func testRedisSimpleFeatures(t *testing.T, fs *FeatureStore) {
176+
177+
featureNames := []string{"driver_hourly_stats:conv_rate",
178+
"driver_hourly_stats:acc_rate",
179+
"driver_hourly_stats:avg_daily_trips",
180+
}
181+
entities := map[string]*types.RepeatedValue{"driver_id": {Val: []*types.Value{{Val: &types.Value_Int64Val{Int64Val: 1001}},
182+
{Val: &types.Value_Int64Val{Int64Val: 1002}},
183+
}}}
184+
185+
results := [][]onlinestore.FeatureData{
186+
{
187+
{
188+
Reference: serving.FeatureReferenceV2{FeatureViewName: "driver_hourly_stats", FeatureName: "conv_rate"},
189+
Value: types.Value{Val: &types.Value_FloatVal{FloatVal: 12.0}},
190+
},
191+
{
192+
Reference: serving.FeatureReferenceV2{FeatureViewName: "driver_hourly_stats", FeatureName: "acc_rate"},
193+
Value: types.Value{Val: &types.Value_FloatVal{FloatVal: 1.0}},
194+
},
195+
{
196+
Reference: serving.FeatureReferenceV2{FeatureViewName: "driver_hourly_stats", FeatureName: "avg_daily_trips"},
197+
Value: types.Value{Val: &types.Value_Int64Val{Int64Val: 100}},
198+
},
199+
},
200+
{
201+
202+
{
203+
Reference: serving.FeatureReferenceV2{FeatureViewName: "driver_hourly_stats", FeatureName: "conv_rate"},
204+
Value: types.Value{Val: &types.Value_FloatVal{FloatVal: 24.0}},
205+
},
206+
{
207+
Reference: serving.FeatureReferenceV2{FeatureViewName: "driver_hourly_stats", FeatureName: "acc_rate"},
208+
Value: types.Value{Val: &types.Value_FloatVal{FloatVal: 2.0}},
209+
},
210+
{
211+
Reference: serving.FeatureReferenceV2{FeatureViewName: "driver_hourly_stats", FeatureName: "avg_daily_trips"},
212+
Value: types.Value{Val: &types.Value_Int64Val{Int64Val: 130}},
213+
},
106214
},
107215
}
216+
ctx := context.Background()
217+
mr := fs.onlineStore.(*MockRedis)
218+
mr.On("OnlineRead", ctx, mock.Anything, mock.Anything, mock.Anything).Return(results, nil)
219+
response, err := fs.GetOnlineFeatures(ctx, featureNames, nil, entities, map[string]*types.RepeatedValue{}, true)
220+
require.Nil(t, err)
221+
assert.Len(t, response, 4) // 3 Features + 1 entity = 4 columns (feature vectors) in response
222+
}
108223

224+
func testRedisODFVNoTransformationService(t *testing.T, fs *FeatureStore) {
109225
featureNames := []string{"driver_hourly_stats:conv_rate",
110226
"driver_hourly_stats:acc_rate",
111227
"driver_hourly_stats:avg_daily_trips",
228+
"transformed_conv_rate:conv_rate_plus_val1",
112229
}
113230
entities := map[string]*types.RepeatedValue{"driver_id": {Val: []*types.Value{{Val: &types.Value_Int64Val{Int64Val: 1001}},
114231
{Val: &types.Value_Int64Val{Int64Val: 1002}},
115232
{Val: &types.Value_Int64Val{Int64Val: 1003}}}},
116233
}
117234

118-
fs, err := NewFeatureStore(&config, nil)
119-
assert.Nil(t, err)
120235
ctx := context.Background()
121-
response, err := fs.GetOnlineFeatures(
122-
ctx, featureNames, nil, entities, map[string]*types.RepeatedValue{}, true)
123-
assert.Nil(t, err)
124-
assert.Len(t, response, 4) // 3 Features + 1 entity = 4 columns (feature vectors) in response
236+
mr := fs.onlineStore.(*MockRedis)
237+
mr.On("OnlineRead", ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
238+
response, err := fs.GetOnlineFeatures(ctx, featureNames, nil, entities, map[string]*types.RepeatedValue{}, true)
239+
assert.Nil(t, response)
240+
assert.ErrorAs(t, err, &FeastTransformationServiceNotConfigured{})
241+
125242
}

0 commit comments

Comments
 (0)