Skip to content

Commit

Permalink
fix: defined list protos for feast objects (#47)
Browse files Browse the repository at this point in the history
* fix: defined list protos for feast objects and use of them in Go
---------

Co-authored-by: Bhargav Dodla <[email protected]>
  • Loading branch information
EXPEbdodla and Bhargav Dodla authored Oct 3, 2023
1 parent 3bb2697 commit 7979543
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 30 deletions.
46 changes: 18 additions & 28 deletions go/internal/feast/registry/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,23 +93,13 @@ func (r *HttpRegistryStore) loadProtobufMessages(url string, messageProcessor fu
}
defer resp.Body.Close()

buffer := make([]byte, BUFFER_SIZE)

for {
n, err := resp.Body.Read(buffer)
if err != nil && err != io.EOF {
return err
}

if n > 0 {
if err := messageProcessor(buffer[:n]); err != nil {
return err
}
}
buffer, err := io.ReadAll(resp.Body)
if err != nil {
return err
}

if err == io.EOF {
break
}
if err := messageProcessor(buffer); err != nil {
return err
}

return nil
Expand All @@ -118,23 +108,23 @@ func (r *HttpRegistryStore) loadProtobufMessages(url string, messageProcessor fu
func (r *HttpRegistryStore) loadEntities(registry *core.Registry) error {
url := fmt.Sprintf("%s/projects/%s/entities?allow_cache=true", r.endpoint, r.project)
return r.loadProtobufMessages(url, func(data []byte) error {
entity := &core.Entity{}
if err := proto.Unmarshal(data, entity); err != nil {
entity_list := &core.EntityList{}
if err := proto.Unmarshal(data, entity_list); err != nil {
return err
}
registry.Entities = append(registry.Entities, entity)
registry.Entities = append(registry.Entities, entity_list.GetEntities()...)
return nil
})
}

func (r *HttpRegistryStore) loadDatasources(registry *core.Registry) error {
url := fmt.Sprintf("%s/projects/%s/data_sources?allow_cache=true", r.endpoint, r.project)
return r.loadProtobufMessages(url, func(data []byte) error {
data_source := &core.DataSource{}
if err := proto.Unmarshal(data, data_source); err != nil {
data_source_list := &core.DataSourceList{}
if err := proto.Unmarshal(data, data_source_list); err != nil {
return err
}
registry.DataSources = append(registry.DataSources, data_source)
registry.DataSources = append(registry.DataSources, data_source_list.GetDatasources()...)
return nil
})
}
Expand All @@ -154,23 +144,23 @@ func (r *HttpRegistryStore) loadFeatureViews(registry *core.Registry) error {
func (r *HttpRegistryStore) loadOnDemandFeatureViews(registry *core.Registry) error {
url := fmt.Sprintf("%s/projects/%s/on_demand_feature_views?allow_cache=true", r.endpoint, r.project)
return r.loadProtobufMessages(url, func(data []byte) error {
od_feature_view := &core.OnDemandFeatureView{}
if err := proto.Unmarshal(data, od_feature_view); err != nil {
od_feature_view_list := &core.OnDemandFeatureViewList{}
if err := proto.Unmarshal(data, od_feature_view_list); err != nil {
return err
}
registry.OnDemandFeatureViews = append(registry.OnDemandFeatureViews, od_feature_view)
registry.OnDemandFeatureViews = append(registry.OnDemandFeatureViews, od_feature_view_list.GetOndemandfeatureviews()...)
return nil
})
}

func (r *HttpRegistryStore) loadFeatureServices(registry *core.Registry) error {
url := fmt.Sprintf("%s/projects/%s/feature_services?allow_cache=true", r.endpoint, r.project)
return r.loadProtobufMessages(url, func(data []byte) error {
feature_service := &core.FeatureService{}
if err := proto.Unmarshal(data, feature_service); err != nil {
feature_service_list := &core.FeatureServiceList{}
if err := proto.Unmarshal(data, feature_service_list); err != nil {
return err
}
registry.FeatureServices = append(registry.FeatureServices, feature_service)
registry.FeatureServices = append(registry.FeatureServices, feature_service_list.GetFeatureservices()...)
return nil
})
}
Expand Down
6 changes: 4 additions & 2 deletions go/internal/feast/transformation/transformation.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/apache/arrow/go/v8/arrow/array"
"github.com/apache/arrow/go/v8/arrow/cdata"
"github.com/apache/arrow/go/v8/arrow/memory"
"github.com/rs/zerolog/log"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/feast-dev/feast/go/internal/feast/model"
Expand Down Expand Up @@ -74,7 +75,7 @@ func AugmentResponseWithOnDemandTransforms(
return nil, err
}
result = append(result, onDemandFeatures...)

// Release memory used by requestContextArrow
for _, arrowArray := range requestContextArrow {
arrowArray.Release()
Expand Down Expand Up @@ -132,7 +133,8 @@ func CallTransformations(
defer func() {
if e := recover(); e != nil {
ret = -1
err = e.(error)
log.Error().Err(err).Msg("")
err = fmt.Errorf("python transformation callback error: %v", e)
}
}()

Expand Down
4 changes: 4 additions & 0 deletions protos/feast/core/DataSource.proto
Original file line number Diff line number Diff line change
Expand Up @@ -268,3 +268,7 @@ message DataSource {
AthenaOptions athena_options = 35;
}
}

message DataSourceList {
repeated DataSource datasources = 1;
}
4 changes: 4 additions & 0 deletions protos/feast/core/Entity.proto
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,7 @@ message EntityMeta {
google.protobuf.Timestamp created_timestamp = 1;
google.protobuf.Timestamp last_updated_timestamp = 2;
}

message EntityList {
repeated Entity entities = 1;
}
4 changes: 4 additions & 0 deletions protos/feast/core/FeatureService.proto
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,7 @@ message LoggingConfig {
map<string, string> config = 2;
}
}

message FeatureServiceList {
repeated FeatureService featureservices = 1;
}
4 changes: 4 additions & 0 deletions protos/feast/core/FeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,8 @@ message FeatureViewMeta {
message MaterializationInterval {
google.protobuf.Timestamp start_time = 1;
google.protobuf.Timestamp end_time = 2;
}

message FeatureViewList {
repeated FeatureView featureviews = 1;
}
4 changes: 4 additions & 0 deletions protos/feast/core/OnDemandFeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,7 @@ message UserDefinedFunction {
// The string representation of the udf
string body_text = 3;
}

message OnDemandFeatureViewList {
repeated OnDemandFeatureView ondemandfeatureviews = 1;
}
1 change: 1 addition & 0 deletions sdk/python/feast/infra/registry/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ def __init__(
self.cached_registry_proto = self.proto(allow_cache=False)
self.stop_thread = False
self.refresh_cache_thread = threading.Thread(target=self._refresh_cache)
self.refresh_cache_thread.daemon = True
self.refresh_cache_thread.start()

def _refresh_cache(self):
Expand Down
1 change: 1 addition & 0 deletions sdk/python/feast/infra/registry/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ def __init__(
)
self.stop_thread = False
self.refresh_cache_thread = threading.Thread(target=self._refresh_cache)
self.refresh_cache_thread.daemon = True
self.refresh_cache_thread.start()

def _refresh_cache(self):
Expand Down

0 comments on commit 7979543

Please sign in to comment.