diff --git a/go/internal/feast/transformation/transformation.go b/go/internal/feast/transformation/transformation.go index 882fd5a21b..01a18dd4d8 100644 --- a/go/internal/feast/transformation/transformation.go +++ b/go/internal/feast/transformation/transformation.go @@ -169,11 +169,8 @@ func CallTransformations( transformResp <- TransformChannel{outRecord, nil} } } + p.Close() }) - err := proc.Close() - if err != nil { - return nil, err - } resp := <-transformResp if resp.err != nil { return nil, resp.err diff --git a/go/internal/feast/transformation/transformation_test.go b/go/internal/feast/transformation/transformation_test.go new file mode 100644 index 0000000000..b70557cf4f --- /dev/null +++ b/go/internal/feast/transformation/transformation_test.go @@ -0,0 +1,58 @@ +package transformation + +import ( + "github.com/apache/arrow/go/v8/arrow" + "github.com/feast-dev/feast/go/internal/feast/model" + "github.com/feast-dev/feast/go/protos/feast/core" + prototypes "github.com/feast-dev/feast/go/protos/feast/types" + "github.com/stretchr/testify/assert" + "testing" +) + +func createFeature(name string, valueType prototypes.ValueType_Enum) *core.FeatureSpecV2 { + return &core.FeatureSpecV2{ + Name: name, + ValueType: valueType, + } +} + +func createOnDemandFeatureView(name string, featureSources map[string][]*core.FeatureSpecV2, features ...*core.FeatureSpecV2) *model.OnDemandFeatureView { + sources := make(map[string]*core.OnDemandSource) + for viewName, features := range featureSources { + sources[viewName] = &core.OnDemandSource{ + Source: &core.OnDemandSource_FeatureViewProjection{ + FeatureViewProjection: &core.FeatureViewProjection{ + FeatureViewName: viewName, + FeatureColumns: features, + JoinKeyMap: map[string]string{}, + }, + }, + } + } + + proto := &core.OnDemandFeatureView{ + Spec: &core.OnDemandFeatureViewSpec{ + Name: name, + Sources: sources, + Features: features, + }, + } + return model.NewOnDemandFeatureViewFromProto(proto) +} + +func TestCallTransformationsFailsWithError(t *testing.T) { + featASpec := createFeature("featA", prototypes.ValueType_INT32) + featBSpec := createFeature("featB", prototypes.ValueType_INT32) + onDemandFeature1 := createFeature("featC", prototypes.ValueType_FLOAT) + odfv := createOnDemandFeatureView("odfv", + map[string][]*core.FeatureSpecV2{"viewA": {featASpec}, "viewB": {featBSpec}}, + onDemandFeature1) + + retrievedFeatures := make(map[string]arrow.Array) + requestContextArrow := make(map[string]arrow.Array) + + _, err := CallTransformations(odfv, retrievedFeatures, requestContextArrow, func(ODFVName string, inputArrPtr, inputSchemaPtr, outArrPtr, outSchemaPtr uintptr, fullFeatureNames bool) int { + return 1 + }, 1, false) + assert.Error(t, err) +}