diff --git a/go/internal/feast/registry/http.go b/go/internal/feast/registry/http.go index 440bc83f54..5b611500b3 100644 --- a/go/internal/feast/registry/http.go +++ b/go/internal/feast/registry/http.go @@ -93,23 +93,13 @@ func (r *HttpRegistryStore) loadProtobufMessages(url string, messageProcessor fu } defer resp.Body.Close() - buffer := make([]byte, BUFFER_SIZE) - - for { - n, err := resp.Body.Read(buffer) - if err != nil && err != io.EOF { - return err - } - - if n > 0 { - if err := messageProcessor(buffer[:n]); err != nil { - return err - } - } + buffer, err := io.ReadAll(resp.Body) + if err != nil { + return err + } - if err == io.EOF { - break - } + if err := messageProcessor(buffer); err != nil { + return err } return nil @@ -118,11 +108,11 @@ func (r *HttpRegistryStore) loadProtobufMessages(url string, messageProcessor fu func (r *HttpRegistryStore) loadEntities(registry *core.Registry) error { url := fmt.Sprintf("%s/projects/%s/entities?allow_cache=true", r.endpoint, r.project) return r.loadProtobufMessages(url, func(data []byte) error { - entity := &core.Entity{} - if err := proto.Unmarshal(data, entity); err != nil { + entity_list := &core.EntityList{} + if err := proto.Unmarshal(data, entity_list); err != nil { return err } - registry.Entities = append(registry.Entities, entity) + registry.Entities = append(registry.Entities, entity_list.GetEntities()...) return nil }) } @@ -130,11 +120,11 @@ func (r *HttpRegistryStore) loadEntities(registry *core.Registry) error { func (r *HttpRegistryStore) loadDatasources(registry *core.Registry) error { url := fmt.Sprintf("%s/projects/%s/data_sources?allow_cache=true", r.endpoint, r.project) return r.loadProtobufMessages(url, func(data []byte) error { - data_source := &core.DataSource{} - if err := proto.Unmarshal(data, data_source); err != nil { + data_source_list := &core.DataSourceList{} + if err := proto.Unmarshal(data, data_source_list); err != nil { return err } - registry.DataSources = append(registry.DataSources, data_source) + registry.DataSources = append(registry.DataSources, data_source_list.GetDatasources()...) return nil }) } @@ -154,11 +144,11 @@ func (r *HttpRegistryStore) loadFeatureViews(registry *core.Registry) error { func (r *HttpRegistryStore) loadOnDemandFeatureViews(registry *core.Registry) error { url := fmt.Sprintf("%s/projects/%s/on_demand_feature_views?allow_cache=true", r.endpoint, r.project) return r.loadProtobufMessages(url, func(data []byte) error { - od_feature_view := &core.OnDemandFeatureView{} - if err := proto.Unmarshal(data, od_feature_view); err != nil { + od_feature_view_list := &core.OnDemandFeatureViewList{} + if err := proto.Unmarshal(data, od_feature_view_list); err != nil { return err } - registry.OnDemandFeatureViews = append(registry.OnDemandFeatureViews, od_feature_view) + registry.OnDemandFeatureViews = append(registry.OnDemandFeatureViews, od_feature_view_list.GetOndemandfeatureviews()...) return nil }) } @@ -166,11 +156,11 @@ func (r *HttpRegistryStore) loadOnDemandFeatureViews(registry *core.Registry) er func (r *HttpRegistryStore) loadFeatureServices(registry *core.Registry) error { url := fmt.Sprintf("%s/projects/%s/feature_services?allow_cache=true", r.endpoint, r.project) return r.loadProtobufMessages(url, func(data []byte) error { - feature_service := &core.FeatureService{} - if err := proto.Unmarshal(data, feature_service); err != nil { + feature_service_list := &core.FeatureServiceList{} + if err := proto.Unmarshal(data, feature_service_list); err != nil { return err } - registry.FeatureServices = append(registry.FeatureServices, feature_service) + registry.FeatureServices = append(registry.FeatureServices, feature_service_list.GetFeatureservices()...) return nil }) } diff --git a/go/internal/feast/transformation/transformation.go b/go/internal/feast/transformation/transformation.go index ac7140364e..3d9c0f49fa 100644 --- a/go/internal/feast/transformation/transformation.go +++ b/go/internal/feast/transformation/transformation.go @@ -10,6 +10,7 @@ import ( "github.com/apache/arrow/go/v8/arrow/array" "github.com/apache/arrow/go/v8/arrow/cdata" "github.com/apache/arrow/go/v8/arrow/memory" + "github.com/rs/zerolog/log" "google.golang.org/protobuf/types/known/timestamppb" "github.com/feast-dev/feast/go/internal/feast/model" @@ -74,7 +75,7 @@ func AugmentResponseWithOnDemandTransforms( return nil, err } result = append(result, onDemandFeatures...) - + // Release memory used by requestContextArrow for _, arrowArray := range requestContextArrow { arrowArray.Release() @@ -132,7 +133,8 @@ func CallTransformations( defer func() { if e := recover(); e != nil { ret = -1 - err = e.(error) + log.Error().Err(err).Msg("") + err = fmt.Errorf("python transformation callback error: %v", e) } }() diff --git a/protos/feast/core/DataSource.proto b/protos/feast/core/DataSource.proto index d129086f45..9c31851823 100644 --- a/protos/feast/core/DataSource.proto +++ b/protos/feast/core/DataSource.proto @@ -268,3 +268,7 @@ message DataSource { AthenaOptions athena_options = 35; } } + +message DataSourceList { + repeated DataSource datasources = 1; +} \ No newline at end of file diff --git a/protos/feast/core/Entity.proto b/protos/feast/core/Entity.proto index d8d8bedc5e..915402804f 100644 --- a/protos/feast/core/Entity.proto +++ b/protos/feast/core/Entity.proto @@ -58,3 +58,7 @@ message EntityMeta { google.protobuf.Timestamp created_timestamp = 1; google.protobuf.Timestamp last_updated_timestamp = 2; } + +message EntityList { + repeated Entity entities = 1; +} diff --git a/protos/feast/core/FeatureService.proto b/protos/feast/core/FeatureService.proto index 80d32eb4de..b143ba73f4 100644 --- a/protos/feast/core/FeatureService.proto +++ b/protos/feast/core/FeatureService.proto @@ -96,3 +96,7 @@ message LoggingConfig { map config = 2; } } + +message FeatureServiceList { + repeated FeatureService featureservices = 1; +} \ No newline at end of file diff --git a/protos/feast/core/FeatureView.proto b/protos/feast/core/FeatureView.proto index f4351b6d07..f48e2676a7 100644 --- a/protos/feast/core/FeatureView.proto +++ b/protos/feast/core/FeatureView.proto @@ -97,4 +97,8 @@ message FeatureViewMeta { message MaterializationInterval { google.protobuf.Timestamp start_time = 1; google.protobuf.Timestamp end_time = 2; +} + +message FeatureViewList { + repeated FeatureView featureviews = 1; } \ No newline at end of file diff --git a/protos/feast/core/OnDemandFeatureView.proto b/protos/feast/core/OnDemandFeatureView.proto index 50bf8b6f55..af3d7c3488 100644 --- a/protos/feast/core/OnDemandFeatureView.proto +++ b/protos/feast/core/OnDemandFeatureView.proto @@ -87,3 +87,7 @@ message UserDefinedFunction { // The string representation of the udf string body_text = 3; } + +message OnDemandFeatureViewList { + repeated OnDemandFeatureView ondemandfeatureviews = 1; +} \ No newline at end of file diff --git a/sdk/python/feast/infra/registry/http.py b/sdk/python/feast/infra/registry/http.py index 3b8c24d12f..03e0ef5955 100644 --- a/sdk/python/feast/infra/registry/http.py +++ b/sdk/python/feast/infra/registry/http.py @@ -93,6 +93,7 @@ def __init__( self.cached_registry_proto = self.proto(allow_cache=False) self.stop_thread = False self.refresh_cache_thread = threading.Thread(target=self._refresh_cache) + self.refresh_cache_thread.daemon = True self.refresh_cache_thread.start() def _refresh_cache(self): diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index f19c357acb..372a344c8a 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -227,6 +227,7 @@ def __init__( ) self.stop_thread = False self.refresh_cache_thread = threading.Thread(target=self._refresh_cache) + self.refresh_cache_thread.daemon = True self.refresh_cache_thread.start() def _refresh_cache(self):