From bcc01f37ba9d3966d6da1691434c3fdb14d4577d Mon Sep 17 00:00:00 2001 From: piket Date: Mon, 9 Oct 2023 09:42:46 -0700 Subject: [PATCH] Revert "Remove blocking during closing the chuld process" --- .../feast/transformation/transformation.go | 5 +- .../transformation/transformation_test.go | 58 ------------------- 2 files changed, 4 insertions(+), 59 deletions(-) delete mode 100644 go/internal/feast/transformation/transformation_test.go diff --git a/go/internal/feast/transformation/transformation.go b/go/internal/feast/transformation/transformation.go index 01a18dd4d8..882fd5a21b 100644 --- a/go/internal/feast/transformation/transformation.go +++ b/go/internal/feast/transformation/transformation.go @@ -169,8 +169,11 @@ func CallTransformations( transformResp <- TransformChannel{outRecord, nil} } } - p.Close() }) + err := proc.Close() + if err != nil { + return nil, err + } resp := <-transformResp if resp.err != nil { return nil, resp.err diff --git a/go/internal/feast/transformation/transformation_test.go b/go/internal/feast/transformation/transformation_test.go deleted file mode 100644 index b70557cf4f..0000000000 --- a/go/internal/feast/transformation/transformation_test.go +++ /dev/null @@ -1,58 +0,0 @@ -package transformation - -import ( - "github.com/apache/arrow/go/v8/arrow" - "github.com/feast-dev/feast/go/internal/feast/model" - "github.com/feast-dev/feast/go/protos/feast/core" - prototypes "github.com/feast-dev/feast/go/protos/feast/types" - "github.com/stretchr/testify/assert" - "testing" -) - -func createFeature(name string, valueType prototypes.ValueType_Enum) *core.FeatureSpecV2 { - return &core.FeatureSpecV2{ - Name: name, - ValueType: valueType, - } -} - -func createOnDemandFeatureView(name string, featureSources map[string][]*core.FeatureSpecV2, features ...*core.FeatureSpecV2) *model.OnDemandFeatureView { - sources := make(map[string]*core.OnDemandSource) - for viewName, features := range featureSources { - sources[viewName] = &core.OnDemandSource{ - Source: &core.OnDemandSource_FeatureViewProjection{ - FeatureViewProjection: &core.FeatureViewProjection{ - FeatureViewName: viewName, - FeatureColumns: features, - JoinKeyMap: map[string]string{}, - }, - }, - } - } - - proto := &core.OnDemandFeatureView{ - Spec: &core.OnDemandFeatureViewSpec{ - Name: name, - Sources: sources, - Features: features, - }, - } - return model.NewOnDemandFeatureViewFromProto(proto) -} - -func TestCallTransformationsFailsWithError(t *testing.T) { - featASpec := createFeature("featA", prototypes.ValueType_INT32) - featBSpec := createFeature("featB", prototypes.ValueType_INT32) - onDemandFeature1 := createFeature("featC", prototypes.ValueType_FLOAT) - odfv := createOnDemandFeatureView("odfv", - map[string][]*core.FeatureSpecV2{"viewA": {featASpec}, "viewB": {featBSpec}}, - onDemandFeature1) - - retrievedFeatures := make(map[string]arrow.Array) - requestContextArrow := make(map[string]arrow.Array) - - _, err := CallTransformations(odfv, retrievedFeatures, requestContextArrow, func(ODFVName string, inputArrPtr, inputSchemaPtr, outArrPtr, outSchemaPtr uintptr, fullFeatureNames bool) int { - return 1 - }, 1, false) - assert.Error(t, err) -}