From 575343a4e632b6cf23f36597f61a11683f446d1a Mon Sep 17 00:00:00 2001 From: Bhargav Dodla Date: Thu, 14 Mar 2024 00:43:33 -0700 Subject: [PATCH] Add logging with span context to getOnlineFeatures function --- go/internal/feast/server/http_server.go | 34 ++++++++++++++----------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/go/internal/feast/server/http_server.go b/go/internal/feast/server/http_server.go index e885acbd57..6b540c1cb8 100644 --- a/go/internal/feast/server/http_server.go +++ b/go/internal/feast/server/http_server.go @@ -18,6 +18,7 @@ import ( "github.com/feast-dev/feast/go/protos/feast/serving" prototypes "github.com/feast-dev/feast/go/protos/feast/types" "github.com/feast-dev/feast/go/types" + "github.com/rs/zerolog" "github.com/rs/zerolog/log" httptrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/net/http" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" @@ -145,14 +146,22 @@ type getOnlineFeaturesRequest struct { func NewHttpServer(fs *feast.FeatureStore, loggingService *logging.LoggingService) *httpServer { return &httpServer{fs: fs, loggingService: loggingService} } +func logWithSpanContext(span tracer.Span) zerolog.Logger { + logger := zerolog.New(os.Stderr).With(). + Int64("trace_id", int64(span.Context().TraceID())). + Int64("span_id", int64(span.Context().SpanID())). + Logger() + + return logger +} func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) { var err error - // if strings.ToLower(os.Getenv("ENABLE_DATADOG_TRACING")) == "true" { span, ctx := tracer.StartSpanFromContext(r.Context(), "getOnlineFeatures", tracer.ResourceName("/get-online-features")) defer span.Finish(tracer.WithError(err)) - // } + + logSpanContext := logWithSpanContext(span) if r.Method != "POST" { http.NotFound(w, r) @@ -166,7 +175,7 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) { var err error status, err = strconv.ParseBool(statusQuery) if err != nil { - log.Error().Err(err).Msg("Error parsing status query parameter") + logSpanContext.Error().Err(err).Msg("Error parsing status query parameter") writeJSONError(w, fmt.Errorf("Error parsing status query parameter: %+v", err), http.StatusBadRequest) return } @@ -176,7 +185,7 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) { var request getOnlineFeaturesRequest err = decoder.Decode(&request) if err != nil { - log.Error().Err(err).Msg("Error decoding JSON request data") + logSpanContext.Error().Err(err).Msg("Error decoding JSON request data") writeJSONError(w, fmt.Errorf("Error decoding JSON request data: %+v", err), http.StatusInternalServerError) return } @@ -184,7 +193,7 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) { if request.FeatureService != nil { featureService, err = s.fs.GetFeatureService(*request.FeatureService) if err != nil { - log.Error().Err(err).Msg("Error getting feature service from registry") + logSpanContext.Error().Err(err).Msg("Error getting feature service from registry") writeJSONError(w, fmt.Errorf("Error getting feature service from registry: %+v", err), http.StatusInternalServerError) return } @@ -207,7 +216,7 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) { request.FullFeatureNames) if err != nil { - log.Error().Err(err).Msg("Error getting feature vector") + logSpanContext.Error().Err(err).Msg("Error getting feature vector") writeJSONError(w, fmt.Errorf("Error getting feature vector: %+v", err), http.StatusInternalServerError) return } @@ -249,7 +258,7 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) { err = json.NewEncoder(w).Encode(response) if err != nil { - log.Error().Err(err).Msg("Error encoding response") + logSpanContext.Error().Err(err).Msg("Error encoding response") writeJSONError(w, fmt.Errorf("Error encoding response: %+v", err), http.StatusInternalServerError) return } @@ -257,7 +266,7 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) { if featureService != nil && featureService.LoggingConfig != nil && s.loggingService != nil { logger, err := s.loggingService.GetOrCreateLogger(featureService) if err != nil { - log.Error().Err(err).Msgf("Couldn't instantiate logger for feature service %s", featureService.Name) + logSpanContext.Error().Err(err).Msgf("Couldn't instantiate logger for feature service %s", featureService.Name) writeJSONError(w, fmt.Errorf("Couldn't instantiate logger for feature service %s: %+v", featureService.Name, err), http.StatusInternalServerError) return } @@ -270,7 +279,7 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) { for _, vector := range featureVectors[len(request.Entities):] { values, err := types.ArrowValuesToProtoValues(vector.Values) if err != nil { - log.Error().Err(err).Msg("Couldn't convert arrow values into protobuf") + logSpanContext.Error().Err(err).Msg("Couldn't convert arrow values into protobuf") writeJSONError(w, fmt.Errorf("Couldn't convert arrow values into protobuf: %+v", err), http.StatusInternalServerError) return } @@ -341,12 +350,7 @@ func recoverMiddleware(next http.Handler) http.Handler { func (s *httpServer) Serve(host string, port int) error { if strings.ToLower(os.Getenv("ENABLE_DATADOG_TRACING")) == "true" { - enableMetrics, exists := os.LookupEnv("ENABLE_RUNTIME_METRICS") - if exists && strings.ToLower(enableMetrics) == "true" { - tracer.Start(tracer.WithRuntimeMetrics()) - } else { - tracer.Start() - } + tracer.Start() defer tracer.Stop() } mux := httptrace.NewServeMux()