Skip to content

Commit

Permalink
fix: Handle panic errors (#46)
Browse files Browse the repository at this point in the history
Co-authored-by: Bhargav Dodla <[email protected]>
  • Loading branch information
EXPEbdodla and Bhargav Dodla authored Oct 3, 2023
1 parent a096dd3 commit 3bb2697
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 35 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ require (
inet.af/netaddr v0.0.0-20220811202034-502d2d690317 // indirect
)

require github.com/rs/zerolog v1.21.0

require (
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
github.com/rs/zerolog v1.21.0 h1:Q3vdXlfLNT+OftyBHsU0Y445MD+8m8axjKgf2si0QcM=
github.com/rs/zerolog v1.21.0/go.mod h1:ZPhntP/xmq1nnND05hhpAh2QMhSsA4UN3MGZ6O2J3hM=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ruudk/golang-pdf417 v0.0.0-20181029194003-1af4ab5afa58/go.mod h1:6lfFZQK844Gfx8o5WFuvpxWRwnSoipWe/p622j1v06w=
Expand Down
24 changes: 22 additions & 2 deletions go/internal/feast/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Registry struct {
cachedRegistry *core.Registry
cachedRegistryProtoLastUpdated time.Time
cachedRegistryProtoTtl time.Duration
mu sync.Mutex
mu sync.RWMutex
}

func NewRegistry(registryConfig *RegistryConfig, repoPath string, project string) (*Registry, error) {
Expand Down Expand Up @@ -74,7 +74,7 @@ func (r *Registry) InitializeRegistry() error {
return err
}
registryProto := &core.Registry{RegistrySchemaVersion: REGISTRY_SCHEMA_VERSION}
r.registryStore.UpdateRegistryProto(registryProto)
r.registryStore.UpdateRegistryProto(registryProto)
}
go r.RefreshRegistryOnInterval()
return nil
Expand Down Expand Up @@ -187,6 +187,8 @@ func (r *Registry) loadOnDemandFeatureViews(registry *core.Registry) {
*/

func (r *Registry) ListEntities(project string) ([]*model.Entity, error) {
r.mu.RLock()
defer r.mu.RUnlock()
if cachedEntities, ok := r.cachedEntities[project]; !ok {
return []*model.Entity{}, nil
} else {
Expand All @@ -206,6 +208,8 @@ func (r *Registry) ListEntities(project string) ([]*model.Entity, error) {
*/

func (r *Registry) ListFeatureViews(project string) ([]*model.FeatureView, error) {
r.mu.RLock()
defer r.mu.RUnlock()
if cachedFeatureViews, ok := r.cachedFeatureViews[project]; !ok {
return []*model.FeatureView{}, nil
} else {
Expand All @@ -225,6 +229,8 @@ func (r *Registry) ListFeatureViews(project string) ([]*model.FeatureView, error
*/

func (r *Registry) ListStreamFeatureViews(project string) ([]*model.FeatureView, error) {
r.mu.RLock()
defer r.mu.RUnlock()
if cachedStreamFeatureViews, ok := r.cachedStreamFeatureViews[project]; !ok {
return []*model.FeatureView{}, nil
} else {
Expand All @@ -244,6 +250,8 @@ func (r *Registry) ListStreamFeatureViews(project string) ([]*model.FeatureView,
*/

func (r *Registry) ListFeatureServices(project string) ([]*model.FeatureService, error) {
r.mu.RLock()
defer r.mu.RUnlock()
if cachedFeatureServices, ok := r.cachedFeatureServices[project]; !ok {
return []*model.FeatureService{}, nil
} else {
Expand All @@ -263,6 +271,8 @@ func (r *Registry) ListFeatureServices(project string) ([]*model.FeatureService,
*/

func (r *Registry) ListOnDemandFeatureViews(project string) ([]*model.OnDemandFeatureView, error) {
r.mu.RLock()
defer r.mu.RUnlock()
if cachedOnDemandFeatureViews, ok := r.cachedOnDemandFeatureViews[project]; !ok {
return []*model.OnDemandFeatureView{}, nil
} else {
Expand All @@ -277,6 +287,8 @@ func (r *Registry) ListOnDemandFeatureViews(project string) ([]*model.OnDemandFe
}

func (r *Registry) GetEntity(project, entityName string) (*model.Entity, error) {
r.mu.RLock()
defer r.mu.RUnlock()
if cachedEntities, ok := r.cachedEntities[project]; !ok {
return nil, fmt.Errorf("no cached entities found for project %s", project)
} else {
Expand All @@ -289,6 +301,8 @@ func (r *Registry) GetEntity(project, entityName string) (*model.Entity, error)
}

func (r *Registry) GetFeatureView(project, featureViewName string) (*model.FeatureView, error) {
r.mu.RLock()
defer r.mu.RUnlock()
if cachedFeatureViews, ok := r.cachedFeatureViews[project]; !ok {
return nil, fmt.Errorf("no cached feature views found for project %s", project)
} else {
Expand All @@ -301,6 +315,8 @@ func (r *Registry) GetFeatureView(project, featureViewName string) (*model.Featu
}

func (r *Registry) GetStreamFeatureView(project, streamFeatureViewName string) (*model.FeatureView, error) {
r.mu.RLock()
defer r.mu.RUnlock()
if cachedStreamFeatureViews, ok := r.cachedStreamFeatureViews[project]; !ok {
return nil, fmt.Errorf("no cached stream feature views found for project %s", project)
} else {
Expand All @@ -313,6 +329,8 @@ func (r *Registry) GetStreamFeatureView(project, streamFeatureViewName string) (
}

func (r *Registry) GetFeatureService(project, featureServiceName string) (*model.FeatureService, error) {
r.mu.RLock()
defer r.mu.RUnlock()
if cachedFeatureServices, ok := r.cachedFeatureServices[project]; !ok {
return nil, fmt.Errorf("no cached feature services found for project %s", project)
} else {
Expand All @@ -325,6 +343,8 @@ func (r *Registry) GetFeatureService(project, featureServiceName string) (*model
}

func (r *Registry) GetOnDemandFeatureView(project, onDemandFeatureViewName string) (*model.OnDemandFeatureView, error) {
r.mu.RLock()
defer r.mu.RUnlock()
if cachedOnDemandFeatureViews, ok := r.cachedOnDemandFeatureViews[project]; !ok {
return nil, fmt.Errorf("no cached on demand feature views found for project %s", project)
} else {
Expand Down
59 changes: 43 additions & 16 deletions go/internal/feast/server/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"net/http"
"os"
"runtime"
"strconv"
"strings"
"time"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/feast-dev/feast/go/protos/feast/serving"
prototypes "github.com/feast-dev/feast/go/protos/feast/types"
"github.com/feast-dev/feast/go/types"
"github.com/rs/zerolog/log"
httptrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/net/http"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)
Expand Down Expand Up @@ -204,17 +206,17 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) {
featureNames = append(featureNames, vector.Name)
result := make(map[string]interface{})
if status {
var statuses []string
for _, status := range vector.Statuses {
statuses = append(statuses, status.String())
}
var timestamps []string
for _, timestamp := range vector.Timestamps {
timestamps = append(timestamps, timestamp.AsTime().Format(time.RFC3339))
}

result["statuses"] = statuses
result["event_timestamps"] = timestamps
var statuses []string
for _, status := range vector.Statuses {
statuses = append(statuses, status.String())
}
var timestamps []string
for _, timestamp := range vector.Timestamps {
timestamps = append(timestamps, timestamp.AsTime().Format(time.RFC3339))
}

result["statuses"] = statuses
result["event_timestamps"] = timestamps
}
// Note, that vector.Values is an Arrow Array, but this type implements JSON Marshaller.
// So, it's not necessary to pre-process it in any way.
Expand Down Expand Up @@ -275,9 +277,34 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) {
}

func releaseCGOMemory(featureVectors []*onlineserving.FeatureVector) {
for _, vector := range featureVectors {
vector.Values.Release()
}
for _, vector := range featureVectors {
vector.Values.Release()
}
}

func logStackTrace() {
// Create a buffer for storing the stack trace
const size = 4096
buf := make([]byte, size)

// Retrieve the stack trace and write it to the buffer
stackSize := runtime.Stack(buf, false)

// Log the stack trace using zerolog
log.Error().Str("stack_trace", string(buf[:stackSize])).Msg("")
}

func recoverMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer func() {
if r := recover(); r != nil {
log.Printf("Panic recovered: %v", r)
logStackTrace()
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
}
}()
next.ServeHTTP(w, r)
})
}

func (s *httpServer) Serve(host string, port int) error {
Expand All @@ -287,9 +314,9 @@ func (s *httpServer) Serve(host string, port int) error {
defer tracer.Stop()
}
mux := httptrace.NewServeMux()
mux.HandleFunc("/get-online-features", s.getOnlineFeatures)
mux.Handle("/get-online-features", recoverMiddleware(http.HandlerFunc(s.getOnlineFeatures)))
mux.HandleFunc("/health", healthCheckHandler)
s.server = &http.Server{Addr: fmt.Sprintf("%s:%d", host, port), Handler: mux}
s.server = &http.Server{Addr: fmt.Sprintf("%s:%d", host, port), Handler: mux, ReadTimeout: 5 * time.Second, WriteTimeout: 10 * time.Second, IdleTimeout: 15 * time.Second}
err := s.server.ListenAndServe()
// Don't return the error if it's caused by graceful shutdown using Stop()
if err == http.ErrServerClosed {
Expand Down
40 changes: 24 additions & 16 deletions sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -682,22 +682,30 @@ def serve_command(
keep_alive_timeout: int,
go: bool,
):
"""Start a feature server locally on a given port."""
store = create_feature_store(ctx)

if go:
# Turn on Go feature retrieval.
store.config.go_feature_serving = True

store.serve(
host=host,
port=port,
type_=type_,
no_access_log=no_access_log,
no_feature_log=no_feature_log,
workers=workers,
keep_alive_timeout=keep_alive_timeout,
)
try:
"""Start a feature server locally on a given port."""
store = create_feature_store(ctx)

if go:
# Turn on Go feature retrieval.
store.config.go_feature_serving = True

store.serve(
host=host,
port=port,
type_=type_,
no_access_log=no_access_log,
no_feature_log=no_feature_log,
workers=workers,
keep_alive_timeout=keep_alive_timeout,
)
except Exception as exception:
_logger.exception(
"Failed to start feature server with exception: %s", str(exception)
)
raise Exception(
"Failed to start feature server with exception: " + str(exception)
)


@cli.command("serve_transformations")
Expand Down
6 changes: 5 additions & 1 deletion sdk/python/feast/ui_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import importlib_resources
import uvicorn
from fastapi import FastAPI, Response
from fastapi import FastAPI, Response, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles

Expand Down Expand Up @@ -73,6 +73,10 @@ def read_registry():
media_type="application/octet-stream",
)

@app.get("/health")
def health():
return Response(status_code=status.HTTP_200_OK)

# For all other paths (such as paths that would otherwise be handled by react router), pass to React
@app.api_route("/p/{path_name:path}", methods=["GET"])
def catch_all():
Expand Down

0 comments on commit 3bb2697

Please sign in to comment.