diff --git a/go.mod b/go.mod index 83eb788e1a..65a667f729 100644 --- a/go.mod +++ b/go.mod @@ -42,6 +42,8 @@ require ( inet.af/netaddr v0.0.0-20220811202034-502d2d690317 // indirect ) +require github.com/rs/zerolog v1.21.0 + require ( github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect github.com/andybalholm/brotli v1.0.5 // indirect diff --git a/go.sum b/go.sum index b97c0b0850..b9d2676f94 100644 --- a/go.sum +++ b/go.sum @@ -352,6 +352,7 @@ github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= +github.com/rs/zerolog v1.21.0 h1:Q3vdXlfLNT+OftyBHsU0Y445MD+8m8axjKgf2si0QcM= github.com/rs/zerolog v1.21.0/go.mod h1:ZPhntP/xmq1nnND05hhpAh2QMhSsA4UN3MGZ6O2J3hM= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ruudk/golang-pdf417 v0.0.0-20181029194003-1af4ab5afa58/go.mod h1:6lfFZQK844Gfx8o5WFuvpxWRwnSoipWe/p622j1v06w= diff --git a/go/internal/feast/registry/registry.go b/go/internal/feast/registry/registry.go index 13a6512c11..1f7bbbaab7 100644 --- a/go/internal/feast/registry/registry.go +++ b/go/internal/feast/registry/registry.go @@ -38,7 +38,7 @@ type Registry struct { cachedRegistry *core.Registry cachedRegistryProtoLastUpdated time.Time cachedRegistryProtoTtl time.Duration - mu sync.Mutex + mu sync.RWMutex } func NewRegistry(registryConfig *RegistryConfig, repoPath string, project string) (*Registry, error) { @@ -74,7 +74,7 @@ func (r *Registry) InitializeRegistry() error { return err } registryProto := &core.Registry{RegistrySchemaVersion: REGISTRY_SCHEMA_VERSION} - r.registryStore.UpdateRegistryProto(registryProto) + r.registryStore.UpdateRegistryProto(registryProto) } go r.RefreshRegistryOnInterval() return nil @@ -187,6 +187,8 @@ func (r *Registry) loadOnDemandFeatureViews(registry *core.Registry) { */ func (r *Registry) ListEntities(project string) ([]*model.Entity, error) { + r.mu.RLock() + defer r.mu.RUnlock() if cachedEntities, ok := r.cachedEntities[project]; !ok { return []*model.Entity{}, nil } else { @@ -206,6 +208,8 @@ func (r *Registry) ListEntities(project string) ([]*model.Entity, error) { */ func (r *Registry) ListFeatureViews(project string) ([]*model.FeatureView, error) { + r.mu.RLock() + defer r.mu.RUnlock() if cachedFeatureViews, ok := r.cachedFeatureViews[project]; !ok { return []*model.FeatureView{}, nil } else { @@ -225,6 +229,8 @@ func (r *Registry) ListFeatureViews(project string) ([]*model.FeatureView, error */ func (r *Registry) ListStreamFeatureViews(project string) ([]*model.FeatureView, error) { + r.mu.RLock() + defer r.mu.RUnlock() if cachedStreamFeatureViews, ok := r.cachedStreamFeatureViews[project]; !ok { return []*model.FeatureView{}, nil } else { @@ -244,6 +250,8 @@ func (r *Registry) ListStreamFeatureViews(project string) ([]*model.FeatureView, */ func (r *Registry) ListFeatureServices(project string) ([]*model.FeatureService, error) { + r.mu.RLock() + defer r.mu.RUnlock() if cachedFeatureServices, ok := r.cachedFeatureServices[project]; !ok { return []*model.FeatureService{}, nil } else { @@ -263,6 +271,8 @@ func (r *Registry) ListFeatureServices(project string) ([]*model.FeatureService, */ func (r *Registry) ListOnDemandFeatureViews(project string) ([]*model.OnDemandFeatureView, error) { + r.mu.RLock() + defer r.mu.RUnlock() if cachedOnDemandFeatureViews, ok := r.cachedOnDemandFeatureViews[project]; !ok { return []*model.OnDemandFeatureView{}, nil } else { @@ -277,6 +287,8 @@ func (r *Registry) ListOnDemandFeatureViews(project string) ([]*model.OnDemandFe } func (r *Registry) GetEntity(project, entityName string) (*model.Entity, error) { + r.mu.RLock() + defer r.mu.RUnlock() if cachedEntities, ok := r.cachedEntities[project]; !ok { return nil, fmt.Errorf("no cached entities found for project %s", project) } else { @@ -289,6 +301,8 @@ func (r *Registry) GetEntity(project, entityName string) (*model.Entity, error) } func (r *Registry) GetFeatureView(project, featureViewName string) (*model.FeatureView, error) { + r.mu.RLock() + defer r.mu.RUnlock() if cachedFeatureViews, ok := r.cachedFeatureViews[project]; !ok { return nil, fmt.Errorf("no cached feature views found for project %s", project) } else { @@ -301,6 +315,8 @@ func (r *Registry) GetFeatureView(project, featureViewName string) (*model.Featu } func (r *Registry) GetStreamFeatureView(project, streamFeatureViewName string) (*model.FeatureView, error) { + r.mu.RLock() + defer r.mu.RUnlock() if cachedStreamFeatureViews, ok := r.cachedStreamFeatureViews[project]; !ok { return nil, fmt.Errorf("no cached stream feature views found for project %s", project) } else { @@ -313,6 +329,8 @@ func (r *Registry) GetStreamFeatureView(project, streamFeatureViewName string) ( } func (r *Registry) GetFeatureService(project, featureServiceName string) (*model.FeatureService, error) { + r.mu.RLock() + defer r.mu.RUnlock() if cachedFeatureServices, ok := r.cachedFeatureServices[project]; !ok { return nil, fmt.Errorf("no cached feature services found for project %s", project) } else { @@ -325,6 +343,8 @@ func (r *Registry) GetFeatureService(project, featureServiceName string) (*model } func (r *Registry) GetOnDemandFeatureView(project, onDemandFeatureViewName string) (*model.OnDemandFeatureView, error) { + r.mu.RLock() + defer r.mu.RUnlock() if cachedOnDemandFeatureViews, ok := r.cachedOnDemandFeatureViews[project]; !ok { return nil, fmt.Errorf("no cached on demand feature views found for project %s", project) } else { diff --git a/go/internal/feast/server/http_server.go b/go/internal/feast/server/http_server.go index 8e4f344f58..451b16dd52 100644 --- a/go/internal/feast/server/http_server.go +++ b/go/internal/feast/server/http_server.go @@ -6,6 +6,7 @@ import ( "fmt" "net/http" "os" + "runtime" "strconv" "strings" "time" @@ -17,6 +18,7 @@ import ( "github.com/feast-dev/feast/go/protos/feast/serving" prototypes "github.com/feast-dev/feast/go/protos/feast/types" "github.com/feast-dev/feast/go/types" + "github.com/rs/zerolog/log" httptrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/net/http" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" ) @@ -204,17 +206,17 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) { featureNames = append(featureNames, vector.Name) result := make(map[string]interface{}) if status { - var statuses []string - for _, status := range vector.Statuses { - statuses = append(statuses, status.String()) - } - var timestamps []string - for _, timestamp := range vector.Timestamps { - timestamps = append(timestamps, timestamp.AsTime().Format(time.RFC3339)) - } - - result["statuses"] = statuses - result["event_timestamps"] = timestamps + var statuses []string + for _, status := range vector.Statuses { + statuses = append(statuses, status.String()) + } + var timestamps []string + for _, timestamp := range vector.Timestamps { + timestamps = append(timestamps, timestamp.AsTime().Format(time.RFC3339)) + } + + result["statuses"] = statuses + result["event_timestamps"] = timestamps } // Note, that vector.Values is an Arrow Array, but this type implements JSON Marshaller. // So, it's not necessary to pre-process it in any way. @@ -275,9 +277,34 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) { } func releaseCGOMemory(featureVectors []*onlineserving.FeatureVector) { - for _, vector := range featureVectors { - vector.Values.Release() - } + for _, vector := range featureVectors { + vector.Values.Release() + } +} + +func logStackTrace() { + // Create a buffer for storing the stack trace + const size = 4096 + buf := make([]byte, size) + + // Retrieve the stack trace and write it to the buffer + stackSize := runtime.Stack(buf, false) + + // Log the stack trace using zerolog + log.Error().Str("stack_trace", string(buf[:stackSize])).Msg("") +} + +func recoverMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer func() { + if r := recover(); r != nil { + log.Printf("Panic recovered: %v", r) + logStackTrace() + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + } + }() + next.ServeHTTP(w, r) + }) } func (s *httpServer) Serve(host string, port int) error { @@ -287,9 +314,9 @@ func (s *httpServer) Serve(host string, port int) error { defer tracer.Stop() } mux := httptrace.NewServeMux() - mux.HandleFunc("/get-online-features", s.getOnlineFeatures) + mux.Handle("/get-online-features", recoverMiddleware(http.HandlerFunc(s.getOnlineFeatures))) mux.HandleFunc("/health", healthCheckHandler) - s.server = &http.Server{Addr: fmt.Sprintf("%s:%d", host, port), Handler: mux} + s.server = &http.Server{Addr: fmt.Sprintf("%s:%d", host, port), Handler: mux, ReadTimeout: 5 * time.Second, WriteTimeout: 10 * time.Second, IdleTimeout: 15 * time.Second} err := s.server.ListenAndServe() // Don't return the error if it's caused by graceful shutdown using Stop() if err == http.ErrServerClosed { diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index f9db3e88f4..56d6a97b43 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -682,22 +682,30 @@ def serve_command( keep_alive_timeout: int, go: bool, ): - """Start a feature server locally on a given port.""" - store = create_feature_store(ctx) - - if go: - # Turn on Go feature retrieval. - store.config.go_feature_serving = True - - store.serve( - host=host, - port=port, - type_=type_, - no_access_log=no_access_log, - no_feature_log=no_feature_log, - workers=workers, - keep_alive_timeout=keep_alive_timeout, - ) + try: + """Start a feature server locally on a given port.""" + store = create_feature_store(ctx) + + if go: + # Turn on Go feature retrieval. + store.config.go_feature_serving = True + + store.serve( + host=host, + port=port, + type_=type_, + no_access_log=no_access_log, + no_feature_log=no_feature_log, + workers=workers, + keep_alive_timeout=keep_alive_timeout, + ) + except Exception as exception: + _logger.exception( + "Failed to start feature server with exception: %s", str(exception) + ) + raise Exception( + "Failed to start feature server with exception: " + str(exception) + ) @cli.command("serve_transformations") diff --git a/sdk/python/feast/ui_server.py b/sdk/python/feast/ui_server.py index 6e1dce86de..c195d4cc6b 100644 --- a/sdk/python/feast/ui_server.py +++ b/sdk/python/feast/ui_server.py @@ -4,7 +4,7 @@ import importlib_resources import uvicorn -from fastapi import FastAPI, Response +from fastapi import FastAPI, Response, status from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles @@ -73,6 +73,10 @@ def read_registry(): media_type="application/octet-stream", ) + @app.get("/health") + def health(): + return Response(status_code=status.HTTP_200_OK) + # For all other paths (such as paths that would otherwise be handled by react router), pass to React @app.api_route("/p/{path_name:path}", methods=["GET"]) def catch_all():