From aac7f3b6661d1060a403c9a1a75b59051567a96a Mon Sep 17 00:00:00 2001 From: Timon Pike Date: Fri, 6 Oct 2023 14:12:32 -0700 Subject: [PATCH 1/3] add test for transformation --- .../transformation/transformation_test.go | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 go/internal/feast/transformation/transformation_test.go diff --git a/go/internal/feast/transformation/transformation_test.go b/go/internal/feast/transformation/transformation_test.go new file mode 100644 index 0000000000..141ec52435 --- /dev/null +++ b/go/internal/feast/transformation/transformation_test.go @@ -0,0 +1,58 @@ +package transformation + +import ( + "github.com/apache/arrow/go/v8/arrow" + "github.com/feast-dev/feast/go/internal/feast/model" + "github.com/feast-dev/feast/go/protos/feast/core" + prototypes "github.com/feast-dev/feast/go/protos/feast/types" + "github.com/stretchr/testify/assert" + "testing" +) + +func createFeature(name string, valueType prototypes.ValueType_Enum) *core.FeatureSpecV2 { + return &core.FeatureSpecV2{ + Name: name, + ValueType: valueType, + } +} + +func createOnDemandFeatureView(name string, featureSources map[string][]*core.FeatureSpecV2, features ...*core.FeatureSpecV2) *model.OnDemandFeatureView { + sources := make(map[string]*core.OnDemandSource) + for viewName, features := range featureSources { + sources[viewName] = &core.OnDemandSource{ + Source: &core.OnDemandSource_FeatureViewProjection{ + FeatureViewProjection: &core.FeatureViewProjection{ + FeatureViewName: viewName, + FeatureColumns: features, + JoinKeyMap: map[string]string{}, + }, + }, + } + } + + proto := &core.OnDemandFeatureView{ + Spec: &core.OnDemandFeatureViewSpec{ + Name: name, + Sources: sources, + Features: features, + }, + } + return model.NewOnDemandFeatureViewFromProto(proto) +} + +func TestCallTransformations(t *testing.T) { + featASpec := createFeature("featA", prototypes.ValueType_INT32) + featBSpec := createFeature("featB", prototypes.ValueType_INT32) + onDemandFeature1 := createFeature("featC", prototypes.ValueType_FLOAT) + odfv := createOnDemandFeatureView("odfv", + map[string][]*core.FeatureSpecV2{"viewA": {featASpec}, "viewB": {featBSpec}}, + onDemandFeature1) + + retrievedFeatures := make(map[string]arrow.Array) + requestContextArrow := make(map[string]arrow.Array) + + _, err := CallTransformations(odfv, retrievedFeatures, requestContextArrow, func(ODFVName string, inputArrPtr, inputSchemaPtr, outArrPtr, outSchemaPtr uintptr, fullFeatureNames bool) int { + return 1 + }, 1, false) + assert.Error(t, err) +} From c8fefecd5bb684001961db9cfaa0be79b29f7a94 Mon Sep 17 00:00:00 2001 From: Timon Pike Date: Fri, 6 Oct 2023 14:38:45 -0700 Subject: [PATCH 2/3] error immediately and don't wait for close to respond --- go/internal/feast/transformation/transformation.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/go/internal/feast/transformation/transformation.go b/go/internal/feast/transformation/transformation.go index 882fd5a21b..74294442be 100644 --- a/go/internal/feast/transformation/transformation.go +++ b/go/internal/feast/transformation/transformation.go @@ -170,14 +170,14 @@ func CallTransformations( } } }) - err := proc.Close() - if err != nil { - return nil, err - } resp := <-transformResp if resp.err != nil { return nil, resp.err } + err := proc.Close() + if err != nil { + return nil, err + } result := make([]*onlineserving.FeatureVector, 0) for idx, field := range resp.outRecord.Schema().Fields() { From f1fe084a7d953fdd572e74cb919078eddf785723 Mon Sep 17 00:00:00 2001 From: Timon Pike Date: Fri, 6 Oct 2023 14:55:26 -0700 Subject: [PATCH 3/3] try closing the process inside the process definition, don't have the parent wait. --- go/internal/feast/transformation/transformation.go | 5 +---- go/internal/feast/transformation/transformation_test.go | 2 +- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/go/internal/feast/transformation/transformation.go b/go/internal/feast/transformation/transformation.go index 74294442be..01a18dd4d8 100644 --- a/go/internal/feast/transformation/transformation.go +++ b/go/internal/feast/transformation/transformation.go @@ -169,15 +169,12 @@ func CallTransformations( transformResp <- TransformChannel{outRecord, nil} } } + p.Close() }) resp := <-transformResp if resp.err != nil { return nil, resp.err } - err := proc.Close() - if err != nil { - return nil, err - } result := make([]*onlineserving.FeatureVector, 0) for idx, field := range resp.outRecord.Schema().Fields() { diff --git a/go/internal/feast/transformation/transformation_test.go b/go/internal/feast/transformation/transformation_test.go index 141ec52435..b70557cf4f 100644 --- a/go/internal/feast/transformation/transformation_test.go +++ b/go/internal/feast/transformation/transformation_test.go @@ -40,7 +40,7 @@ func createOnDemandFeatureView(name string, featureSources map[string][]*core.Fe return model.NewOnDemandFeatureViewFromProto(proto) } -func TestCallTransformations(t *testing.T) { +func TestCallTransformationsFailsWithError(t *testing.T) { featASpec := createFeature("featA", prototypes.ValueType_INT32) featBSpec := createFeature("featB", prototypes.ValueType_INT32) onDemandFeature1 := createFeature("featC", prototypes.ValueType_FLOAT)