Skip to content

Commit

Permalink
create an isolated child process to run unsafe code and CFFI, handle …
Browse files Browse the repository at this point in the history
…any fatal errors within the process as panics.
  • Loading branch information
piket committed Oct 5, 2023
1 parent 6288924 commit 134956a
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 42 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ require (
)

require github.com/rs/zerolog v1.21.0
require github.com/jbenet/goprocess v0.1.4
require github.com/ianlancetaylor/cgosymbolizer v0.0.0-20230801000641-8736a9d41aaa

require (
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,13 @@ github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2p
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg=
github.com/ianlancetaylor/cgosymbolizer v0.0.0-20230801000641-8736a9d41aaa h1:FEZID0R3+pkWLvjmZJ2iL+SZTcb2+/PgVvoyQss/q/I=
github.com/ianlancetaylor/cgosymbolizer v0.0.0-20230801000641-8736a9d41aaa/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA=
github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o=
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
Expand Down
105 changes: 63 additions & 42 deletions go/internal/feast/transformation/transformation.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package transformation
import (
"errors"
"fmt"
"github.com/jbenet/goprocess"
"runtime"
"runtime/debug"
"strings"
"unsafe"

Expand All @@ -19,6 +21,8 @@ import (
"github.com/feast-dev/feast/go/protos/feast/serving"
prototypes "github.com/feast-dev/feast/go/protos/feast/types"
"github.com/feast-dev/feast/go/types"

_ "github.com/ianlancetaylor/cgosymbolizer"
)

/*
Expand All @@ -28,6 +32,10 @@ Each record batch is being passed as two pointers: pointer to array (data) and p
Python function is expected to return number of rows added to the output record batch.
*/
type TransformationCallback func(ODFVName string, inputArrPtr, inputSchemaPtr, outArrPtr, outSchemaPtr uintptr, fullFeatureNames bool) int
type TransformChannel struct {
outRecord arrow.Record
err error
}

func AugmentResponseWithOnDemandTransforms(
onDemandFeatureViews []*model.OnDemandFeatureView,
Expand Down Expand Up @@ -63,7 +71,6 @@ func AugmentResponseWithOnDemandTransforms(
for _, vector := range features {
retrievedFeatures[vector.Name] = vector.Values
}

onDemandFeatures, err := CallTransformations(
odfv,
retrievedFeatures,
Expand Down Expand Up @@ -95,40 +102,6 @@ func CallTransformations(
fullFeatureNames bool,
) ([]*onlineserving.FeatureVector, error) {

inputArr := cdata.CArrowArray{}
inputSchema := cdata.CArrowSchema{}

outArr := cdata.CArrowArray{}
outSchema := cdata.CArrowSchema{}

defer cdata.ReleaseCArrowArray(&inputArr)
defer cdata.ReleaseCArrowArray(&outArr)
defer cdata.ReleaseCArrowSchema(&inputSchema)
defer cdata.ReleaseCArrowSchema(&outSchema)

inputArrPtr := uintptr(unsafe.Pointer(&inputArr))
inputSchemaPtr := uintptr(unsafe.Pointer(&inputSchema))

outArrPtr := uintptr(unsafe.Pointer(&outArr))
outSchemaPtr := uintptr(unsafe.Pointer(&outSchema))

inputFields := make([]arrow.Field, 0)
inputColumns := make([]arrow.Array, 0)
for name, arr := range retrievedFeatures {
inputFields = append(inputFields, arrow.Field{Name: name, Type: arr.DataType()})
inputColumns = append(inputColumns, arr)
}
for name, arr := range requestContext {
inputFields = append(inputFields, arrow.Field{Name: name, Type: arr.DataType()})
inputColumns = append(inputColumns, arr)
}

inputRecord := array.NewRecord(arrow.NewSchema(inputFields, nil), inputColumns, int64(numRows))
defer inputRecord.Release()

cdata.ExportArrowRecordBatch(inputRecord, &inputArr, &inputSchema)

// Recover from a panic from FFI so the server doesn't crash
var err error
defer func() {
if e := recover(); e != nil {
Expand All @@ -146,19 +119,67 @@ func CallTransformations(
}
}
}()
ret := callback(featureView.Base.Name, inputArrPtr, inputSchemaPtr, outArrPtr, outSchemaPtr, fullFeatureNames)
transformResp := make(chan TransformChannel)
proc := goprocess.Background()
proc.Go(func(p goprocess.Process) {
debug.SetPanicOnFault(true)
inputArr := cdata.CArrowArray{}
inputSchema := cdata.CArrowSchema{}

outArr := cdata.CArrowArray{}
outSchema := cdata.CArrowSchema{}

defer cdata.ReleaseCArrowArray(&inputArr)
defer cdata.ReleaseCArrowArray(&outArr)
defer cdata.ReleaseCArrowSchema(&inputSchema)
defer cdata.ReleaseCArrowSchema(&outSchema)

inputArrPtr := uintptr(unsafe.Pointer(&inputArr))
inputSchemaPtr := uintptr(unsafe.Pointer(&inputSchema))

outArrPtr := uintptr(unsafe.Pointer(&outArr))
outSchemaPtr := uintptr(unsafe.Pointer(&outSchema))

inputFields := make([]arrow.Field, 0)
inputColumns := make([]arrow.Array, 0)
for name, arr := range retrievedFeatures {
inputFields = append(inputFields, arrow.Field{Name: name, Type: arr.DataType()})
inputColumns = append(inputColumns, arr)
}
for name, arr := range requestContext {
inputFields = append(inputFields, arrow.Field{Name: name, Type: arr.DataType()})
inputColumns = append(inputColumns, arr)
}

if ret != numRows {
return nil, errors.New("python transformation callback failed")
}
inputRecord := array.NewRecord(arrow.NewSchema(inputFields, nil), inputColumns, int64(numRows))
defer inputRecord.Release()

cdata.ExportArrowRecordBatch(inputRecord, &inputArr, &inputSchema)

ret := callback(featureView.Base.Name, inputArrPtr, inputSchemaPtr, outArrPtr, outSchemaPtr, fullFeatureNames)

outRecord, err := cdata.ImportCRecordBatch(&outArr, &outSchema)
if ret != numRows {
transformResp <- TransformChannel{nil, errors.New("python transformation callback failed")}
} else {
outRecord, err := cdata.ImportCRecordBatch(&outArr, &outSchema)
if err != nil {
transformResp <- TransformChannel{nil, err}
} else {
transformResp <- TransformChannel{outRecord, nil}
}
}
})
err = proc.Close()
if err != nil {
return nil, err
}
resp := <-transformResp
if resp.err != nil {
return nil, resp.err
}

result := make([]*onlineserving.FeatureVector, 0)
for idx, field := range outRecord.Schema().Fields() {
for idx, field := range resp.outRecord.Schema().Fields() {
dropFeature := true

if featureView.Base.Projection != nil {
Expand Down Expand Up @@ -192,7 +213,7 @@ func CallTransformations(

result = append(result, &onlineserving.FeatureVector{
Name: field.Name,
Values: outRecord.Column(idx),
Values: resp.outRecord.Column(idx),
Statuses: statuses,
Timestamps: timestamps,
})
Expand Down

0 comments on commit 134956a

Please sign in to comment.