diff --git a/go.mod b/go.mod index 65a667f729..6780e3b384 100644 --- a/go.mod +++ b/go.mod @@ -43,6 +43,8 @@ require ( ) require github.com/rs/zerolog v1.21.0 +require github.com/jbenet/goprocess v0.1.4 +require github.com/ianlancetaylor/cgosymbolizer v0.0.0-20230801000641-8736a9d41aaa require ( github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect diff --git a/go.sum b/go.sum index b9d2676f94..a37a6379b7 100644 --- a/go.sum +++ b/go.sum @@ -226,8 +226,13 @@ github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2p github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg= +github.com/ianlancetaylor/cgosymbolizer v0.0.0-20230801000641-8736a9d41aaa h1:FEZID0R3+pkWLvjmZJ2iL+SZTcb2+/PgVvoyQss/q/I= +github.com/ianlancetaylor/cgosymbolizer v0.0.0-20230801000641-8736a9d41aaa/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= +github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA= +github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o= +github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= diff --git a/go/internal/feast/transformation/transformation.go b/go/internal/feast/transformation/transformation.go index 161db62faf..bb40434d11 100644 --- a/go/internal/feast/transformation/transformation.go +++ b/go/internal/feast/transformation/transformation.go @@ -3,7 +3,9 @@ package transformation import ( "errors" "fmt" + "github.com/jbenet/goprocess" "runtime" + "runtime/debug" "strings" "unsafe" @@ -19,6 +21,8 @@ import ( "github.com/feast-dev/feast/go/protos/feast/serving" prototypes "github.com/feast-dev/feast/go/protos/feast/types" "github.com/feast-dev/feast/go/types" + + _ "github.com/ianlancetaylor/cgosymbolizer" ) /* @@ -28,6 +32,10 @@ Each record batch is being passed as two pointers: pointer to array (data) and p Python function is expected to return number of rows added to the output record batch. */ type TransformationCallback func(ODFVName string, inputArrPtr, inputSchemaPtr, outArrPtr, outSchemaPtr uintptr, fullFeatureNames bool) int +type TransformChannel struct { + outRecord arrow.Record + err error +} func AugmentResponseWithOnDemandTransforms( onDemandFeatureViews []*model.OnDemandFeatureView, @@ -63,7 +71,6 @@ func AugmentResponseWithOnDemandTransforms( for _, vector := range features { retrievedFeatures[vector.Name] = vector.Values } - onDemandFeatures, err := CallTransformations( odfv, retrievedFeatures, @@ -95,40 +102,6 @@ func CallTransformations( fullFeatureNames bool, ) ([]*onlineserving.FeatureVector, error) { - inputArr := cdata.CArrowArray{} - inputSchema := cdata.CArrowSchema{} - - outArr := cdata.CArrowArray{} - outSchema := cdata.CArrowSchema{} - - defer cdata.ReleaseCArrowArray(&inputArr) - defer cdata.ReleaseCArrowArray(&outArr) - defer cdata.ReleaseCArrowSchema(&inputSchema) - defer cdata.ReleaseCArrowSchema(&outSchema) - - inputArrPtr := uintptr(unsafe.Pointer(&inputArr)) - inputSchemaPtr := uintptr(unsafe.Pointer(&inputSchema)) - - outArrPtr := uintptr(unsafe.Pointer(&outArr)) - outSchemaPtr := uintptr(unsafe.Pointer(&outSchema)) - - inputFields := make([]arrow.Field, 0) - inputColumns := make([]arrow.Array, 0) - for name, arr := range retrievedFeatures { - inputFields = append(inputFields, arrow.Field{Name: name, Type: arr.DataType()}) - inputColumns = append(inputColumns, arr) - } - for name, arr := range requestContext { - inputFields = append(inputFields, arrow.Field{Name: name, Type: arr.DataType()}) - inputColumns = append(inputColumns, arr) - } - - inputRecord := array.NewRecord(arrow.NewSchema(inputFields, nil), inputColumns, int64(numRows)) - defer inputRecord.Release() - - cdata.ExportArrowRecordBatch(inputRecord, &inputArr, &inputSchema) - - // Recover from a panic from FFI so the server doesn't crash var err error defer func() { if e := recover(); e != nil { @@ -146,19 +119,67 @@ func CallTransformations( } } }() - ret := callback(featureView.Base.Name, inputArrPtr, inputSchemaPtr, outArrPtr, outSchemaPtr, fullFeatureNames) + transformResp := make(chan TransformChannel) + proc := goprocess.Background() + proc.Go(func(p goprocess.Process) { + debug.SetPanicOnFault(true) + inputArr := cdata.CArrowArray{} + inputSchema := cdata.CArrowSchema{} + + outArr := cdata.CArrowArray{} + outSchema := cdata.CArrowSchema{} + + defer cdata.ReleaseCArrowArray(&inputArr) + defer cdata.ReleaseCArrowArray(&outArr) + defer cdata.ReleaseCArrowSchema(&inputSchema) + defer cdata.ReleaseCArrowSchema(&outSchema) + + inputArrPtr := uintptr(unsafe.Pointer(&inputArr)) + inputSchemaPtr := uintptr(unsafe.Pointer(&inputSchema)) + + outArrPtr := uintptr(unsafe.Pointer(&outArr)) + outSchemaPtr := uintptr(unsafe.Pointer(&outSchema)) + + inputFields := make([]arrow.Field, 0) + inputColumns := make([]arrow.Array, 0) + for name, arr := range retrievedFeatures { + inputFields = append(inputFields, arrow.Field{Name: name, Type: arr.DataType()}) + inputColumns = append(inputColumns, arr) + } + for name, arr := range requestContext { + inputFields = append(inputFields, arrow.Field{Name: name, Type: arr.DataType()}) + inputColumns = append(inputColumns, arr) + } - if ret != numRows { - return nil, errors.New("python transformation callback failed") - } + inputRecord := array.NewRecord(arrow.NewSchema(inputFields, nil), inputColumns, int64(numRows)) + defer inputRecord.Release() + + cdata.ExportArrowRecordBatch(inputRecord, &inputArr, &inputSchema) + + ret := callback(featureView.Base.Name, inputArrPtr, inputSchemaPtr, outArrPtr, outSchemaPtr, fullFeatureNames) - outRecord, err := cdata.ImportCRecordBatch(&outArr, &outSchema) + if ret != numRows { + transformResp <- TransformChannel{nil, errors.New("python transformation callback failed")} + } else { + outRecord, err := cdata.ImportCRecordBatch(&outArr, &outSchema) + if err != nil { + transformResp <- TransformChannel{nil, err} + } else { + transformResp <- TransformChannel{outRecord, nil} + } + } + }) + err = proc.Close() if err != nil { return nil, err } + resp := <-transformResp + if resp.err != nil { + return nil, resp.err + } result := make([]*onlineserving.FeatureVector, 0) - for idx, field := range outRecord.Schema().Fields() { + for idx, field := range resp.outRecord.Schema().Fields() { dropFeature := true if featureView.Base.Projection != nil { @@ -192,7 +213,7 @@ func CallTransformations( result = append(result, &onlineserving.FeatureVector{ Name: field.Name, - Values: outRecord.Column(idx), + Values: resp.outRecord.Column(idx), Statuses: statuses, Timestamps: timestamps, })