Skip to content

Commit

Permalink
Merge pull request #54 from ExpediaGroup/tipike/odfv_testing
Browse files Browse the repository at this point in the history
Remove blocking during closing the chuld process
  • Loading branch information
piket authored Oct 6, 2023
2 parents f676400 + f1fe084 commit 0894c2d
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 4 deletions.
5 changes: 1 addition & 4 deletions go/internal/feast/transformation/transformation.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,8 @@ func CallTransformations(
transformResp <- TransformChannel{outRecord, nil}
}
}
p.Close()
})
err := proc.Close()
if err != nil {
return nil, err
}
resp := <-transformResp
if resp.err != nil {
return nil, resp.err
Expand Down
58 changes: 58 additions & 0 deletions go/internal/feast/transformation/transformation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package transformation

import (
"github.com/apache/arrow/go/v8/arrow"
"github.com/feast-dev/feast/go/internal/feast/model"
"github.com/feast-dev/feast/go/protos/feast/core"
prototypes "github.com/feast-dev/feast/go/protos/feast/types"
"github.com/stretchr/testify/assert"
"testing"
)

func createFeature(name string, valueType prototypes.ValueType_Enum) *core.FeatureSpecV2 {
return &core.FeatureSpecV2{
Name: name,
ValueType: valueType,
}
}

func createOnDemandFeatureView(name string, featureSources map[string][]*core.FeatureSpecV2, features ...*core.FeatureSpecV2) *model.OnDemandFeatureView {
sources := make(map[string]*core.OnDemandSource)
for viewName, features := range featureSources {
sources[viewName] = &core.OnDemandSource{
Source: &core.OnDemandSource_FeatureViewProjection{
FeatureViewProjection: &core.FeatureViewProjection{
FeatureViewName: viewName,
FeatureColumns: features,
JoinKeyMap: map[string]string{},
},
},
}
}

proto := &core.OnDemandFeatureView{
Spec: &core.OnDemandFeatureViewSpec{
Name: name,
Sources: sources,
Features: features,
},
}
return model.NewOnDemandFeatureViewFromProto(proto)
}

func TestCallTransformationsFailsWithError(t *testing.T) {
featASpec := createFeature("featA", prototypes.ValueType_INT32)
featBSpec := createFeature("featB", prototypes.ValueType_INT32)
onDemandFeature1 := createFeature("featC", prototypes.ValueType_FLOAT)
odfv := createOnDemandFeatureView("odfv",
map[string][]*core.FeatureSpecV2{"viewA": {featASpec}, "viewB": {featBSpec}},
onDemandFeature1)

retrievedFeatures := make(map[string]arrow.Array)
requestContextArrow := make(map[string]arrow.Array)

_, err := CallTransformations(odfv, retrievedFeatures, requestContextArrow, func(ODFVName string, inputArrPtr, inputSchemaPtr, outArrPtr, outSchemaPtr uintptr, fullFeatureNames bool) int {
return 1
}, 1, false)
assert.Error(t, err)
}

0 comments on commit 0894c2d

Please sign in to comment.