diff --git a/.github/workflows/ci-lint-checks.yaml b/.github/workflows/ci-lint-checks.yaml index 30523637c64..b665fd533ea 100644 --- a/.github/workflows/ci-lint-checks.yaml +++ b/.github/workflows/ci-lint-checks.yaml @@ -47,7 +47,9 @@ jobs: - uses: ./.github/actions/block-pr-from-main-branch - - run: make lint-nocommit + - run: | + git fetch origin main + make lint-nocommit dco-check: runs-on: ubuntu-latest diff --git a/Makefile b/Makefile index 3229e2583e6..50d19d2ea7d 100644 --- a/Makefile +++ b/Makefile @@ -153,9 +153,9 @@ lint-license: .PHONY: lint-nocommit lint-nocommit: - @if git diff main | grep '@no''commit' ; then \ + @if git diff origin/main | grep '@no''commit' ; then \ echo "❌ Cannot merge PR that contains @no""commit string" ; \ - GIT_PAGER=cat git diff -G '@no''commit' main ; \ + GIT_PAGER=cat git diff -G '@no''commit' origin/main ; \ false ; \ else \ echo "✅ Changes do not contain @no""commit string" ; \ diff --git a/cmd/jaeger/internal/integration/e2e_integration.go b/cmd/jaeger/internal/integration/e2e_integration.go index 9e4843f030c..c8681e4ed7e 100644 --- a/cmd/jaeger/internal/integration/e2e_integration.go +++ b/cmd/jaeger/internal/integration/e2e_integration.go @@ -13,7 +13,6 @@ import ( "os" "os/exec" "path/filepath" - "strings" "testing" "time" @@ -42,10 +41,10 @@ const otlpPort = 4317 type E2EStorageIntegration struct { integration.StorageIntegration - SkipStorageCleaner bool - ConfigFile string - BinaryName string - HealthCheckEndpoint string + SkipStorageCleaner bool + ConfigFile string + BinaryName string + HealthCheckPort int // overridable for Kafka tests which run two binaries and need different ports // EnvVarOverrides contains a map of environment variables to set. // The key in the map is the environment variable to override and the value @@ -160,10 +159,11 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) { } func (s *E2EStorageIntegration) doHealthCheck(t *testing.T) bool { - healthCheckEndpoint := s.HealthCheckEndpoint - if healthCheckEndpoint == "" { - healthCheckEndpoint = "http://localhost:13133/status" + healthCheckPort := s.HealthCheckPort + if healthCheckPort == 0 { + healthCheckPort = ports.CollectorV2HealthChecks } + healthCheckEndpoint := fmt.Sprintf("http://localhost:%d/status", healthCheckPort) t.Logf("Checking if %s is available on %s", s.BinaryName, healthCheckEndpoint) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() @@ -187,11 +187,6 @@ func (s *E2EStorageIntegration) doHealthCheck(t *testing.T) bool { t.Logf("HTTP response not OK: %v", string(body)) return false } - // for backwards compatibility with other healthchecks - if !strings.HasSuffix(healthCheckEndpoint, "/status") { - t.Logf("OK HTTP from endpoint that is not healthcheckv2") - return true - } var healthResponse struct { Status string `json:"status"` @@ -203,7 +198,7 @@ func (s *E2EStorageIntegration) doHealthCheck(t *testing.T) bool { // Check if the status field in the JSON is "StatusOK" if healthResponse.Status != "StatusOK" { - t.Logf("Received non-K status %s: %s", healthResponse.Status, string(body)) + t.Logf("Received non-OK status %s: %s", healthResponse.Status, string(body)) return false } return true diff --git a/cmd/jaeger/internal/integration/kafka_test.go b/cmd/jaeger/internal/integration/kafka_test.go index 218624788f0..6e9e4d810ef 100644 --- a/cmd/jaeger/internal/integration/kafka_test.go +++ b/cmd/jaeger/internal/integration/kafka_test.go @@ -52,9 +52,9 @@ func TestKafkaStorage(t *testing.T) { t.Log("Collector initialized") ingester := &E2EStorageIntegration{ - BinaryName: "jaeger-v2-ingester", - ConfigFile: "../../config-kafka-ingester.yaml", - HealthCheckEndpoint: "http://localhost:14133/status", + BinaryName: "jaeger-v2-ingester", + ConfigFile: "../../config-kafka-ingester.yaml", + HealthCheckPort: 14133, StorageIntegration: integration.StorageIntegration{ CleanUp: purge, GetDependenciesReturnsSource: true, diff --git a/cmd/query/app/apiv3/http_gateway.go b/cmd/query/app/apiv3/http_gateway.go index a573b74d952..1abbcece1f8 100644 --- a/cmd/query/app/apiv3/http_gateway.go +++ b/cmd/query/app/apiv3/http_gateway.go @@ -60,18 +60,16 @@ func (h *HTTPGateway) RegisterRoutes(router *mux.Router) { // addRoute adds a new endpoint to the router with given path and handler function. // This code is mostly copied from ../http_handler. -func (h *HTTPGateway) addRoute( +func (*HTTPGateway) addRoute( router *mux.Router, f func(http.ResponseWriter, *http.Request), route string, _ ...any, /* args */ ) *mux.Route { var handler http.Handler = http.HandlerFunc(f) - traceMiddleware := otelhttp.NewHandler( - otelhttp.WithRouteTag(route, handler), - route, - otelhttp.WithTracerProvider(h.Tracer)) - return router.HandleFunc(route, traceMiddleware.ServeHTTP) + handler = otelhttp.WithRouteTag(route, handler) + handler = spanNameHandler(route, handler) + return router.HandleFunc(route, handler.ServeHTTP) } // tryHandleError checks if the passed error is not nil and handles it by writing @@ -242,3 +240,11 @@ func (h *HTTPGateway) getOperations(w http.ResponseWriter, r *http.Request) { } h.marshalResponse(&api_v3.GetOperationsResponse{Operations: apiOperations}, w) } + +func spanNameHandler(spanName string, handler http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + span := trace.SpanFromContext(r.Context()) + span.SetName(spanName) + handler.ServeHTTP(w, r) + }) +} diff --git a/cmd/query/app/http_handler.go b/cmd/query/app/http_handler.go index 49d8edff4e8..e00472babb5 100644 --- a/cmd/query/app/http_handler.go +++ b/cmd/query/app/http_handler.go @@ -132,11 +132,10 @@ func (aH *APIHandler) handleFunc( ) *mux.Route { route := aH.formatRoute(routeFmt, args...) var handler http.Handler = http.HandlerFunc(f) - traceMiddleware := otelhttp.NewHandler( - otelhttp.WithRouteTag(route, traceResponseHandler(handler)), - route, - otelhttp.WithTracerProvider(aH.tracer)) - return router.HandleFunc(route, traceMiddleware.ServeHTTP) + handler = traceResponseHandler(handler) + handler = otelhttp.WithRouteTag(route, handler) + handler = spanNameHandler(route, handler) + return router.HandleFunc(route, handler.ServeHTTP) } func (aH *APIHandler) formatRoute(route string, args ...any) string { @@ -532,3 +531,11 @@ func traceResponseHandler(handler http.Handler) http.Handler { handler.ServeHTTP(w, r) }) } + +func spanNameHandler(spanName string, handler http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + span := trace.SpanFromContext(r.Context()) + span.SetName(spanName) + handler.ServeHTTP(w, r) + }) +} diff --git a/cmd/query/app/http_handler_test.go b/cmd/query/app/http_handler_test.go index 1aee673085a..54ebd5ea732 100644 --- a/cmd/query/app/http_handler_test.go +++ b/cmd/query/app/http_handler_test.go @@ -329,9 +329,6 @@ func TestGetTrace(t *testing.T) { require.NoError(t, err) assert.Empty(t, response.Errors) - assert.Len(t, exporter.GetSpans(), 1, "HTTP request was traced and span reported") - assert.Equal(t, "/api/traces/{traceID}", exporter.GetSpans()[0].Name) - traces := extractTraces(t, &response) assert.Len(t, traces[0].Spans, 2) assert.Len(t, traces[0].Spans[1].References, testCase.numSpanRefs) diff --git a/cmd/query/app/server.go b/cmd/query/app/server.go index f1f7b4284b7..a26508e4b15 100644 --- a/cmd/query/app/server.go +++ b/cmd/query/app/server.go @@ -12,9 +12,7 @@ import ( "net/http" "strings" "sync" - "time" - "github.com/gorilla/handlers" "github.com/soheilhy/cmux" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" @@ -23,7 +21,6 @@ import ( "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/noop" "go.uber.org/zap" - "go.uber.org/zap/zapcore" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/health" @@ -91,7 +88,6 @@ func NewServer( return nil, err } registerGRPCHandlers(grpcServer, querySvc, metricsQuerySvc, telset) - httpServer, err := createHTTPServer(ctx, querySvc, metricsQuerySvc, options, tm, telset) if err != nil { return nil, err @@ -176,13 +172,23 @@ func createGRPCServerOTEL( telset telemetery.Setting, ) (*grpc.Server, error) { var grpcOpts []configgrpc.ToServerOption + unaryInterceptors := []grpc.UnaryServerInterceptor{ + bearertoken.NewUnaryServerInterceptor(), + } + streamInterceptors := []grpc.StreamServerInterceptor{ + bearertoken.NewStreamServerInterceptor(), + } + + //nolint:contextcheck if tm.Enabled { - //nolint:contextcheck - grpcOpts = append(grpcOpts, - configgrpc.WithGrpcServerOption(grpc.StreamInterceptor(tenancy.NewGuardingStreamInterceptor(tm))), - configgrpc.WithGrpcServerOption(grpc.UnaryInterceptor(tenancy.NewGuardingUnaryInterceptor(tm))), - ) + unaryInterceptors = append(unaryInterceptors, tenancy.NewGuardingUnaryInterceptor(tm)) + streamInterceptors = append(streamInterceptors, tenancy.NewGuardingStreamInterceptor(tm)) } + + grpcOpts = append(grpcOpts, + configgrpc.WithGrpcServerOption(grpc.ChainUnaryInterceptor(unaryInterceptors...)), + configgrpc.WithGrpcServerOption(grpc.ChainStreamInterceptor(streamInterceptors...)), + ) return options.GRPC.ToServer( ctx, telset.Host, @@ -252,20 +258,28 @@ func createHTTPServer( telset telemetery.Setting, ) (*httpServer, error) { handler, staticHandlerCloser := initRouter(querySvc, metricsQuerySvc, queryOpts, tm, telset) - handler = responseHeadersHandler(handler, queryOpts.HTTP.ResponseHeaders) - handler = handlers.CompressHandler(handler) - recoveryHandler := recoveryhandler.NewRecoveryHandler(telset.Logger, true) - - errorLog, _ := zap.NewStdLogAt(telset.Logger, zapcore.ErrorLevel) - server := &httpServer{ - Server: &http.Server{ - Handler: recoveryHandler(handler), - ErrorLog: errorLog, - ReadHeaderTimeout: 2 * time.Second, + handler = recoveryhandler.NewRecoveryHandler(telset.Logger, true)(handler) + hs, err := queryOpts.HTTP.ToServer( + ctx, + telset.Host, + component.TelemetrySettings{ + Logger: telset.Logger, + TracerProvider: telset.TracerProvider, + LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider { + return noop.NewMeterProvider() + }, }, + handler, + ) + if err != nil { + return nil, errors.Join(err, staticHandlerCloser.Close()) + } + server := &httpServer{ + Server: hs, staticHandlerCloser: staticHandlerCloser, } + // TODO why doesn't OTEL helper do that already? if queryOpts.HTTP.TLSSetting != nil { tlsCfg, err := queryOpts.HTTP.TLSSetting.LoadTLSConfig(ctx) // This checks if the certificates are correctly provided if err != nil { @@ -293,7 +307,7 @@ func (s *Server) initListener(ctx context.Context) (cmux.CMux, error) { return nil, err } - s.httpConn, err = net.Listen("tcp", s.queryOptions.HTTP.Endpoint) + s.httpConn, err = s.queryOptions.HTTP.ToListener(ctx) if err != nil { return nil, err } diff --git a/plugin/metrics/prometheus/metricsstore/reader.go b/plugin/metrics/prometheus/metricsstore/reader.go index 86cb6922e3d..df9fddd6f23 100644 --- a/plugin/metrics/prometheus/metricsstore/reader.go +++ b/plugin/metrics/prometheus/metricsstore/reader.go @@ -342,7 +342,7 @@ func getHTTPRoundTripper(c *config.Configuration, logger *zap.Logger) (rt http.R } token = tokenFromFile } - return bearertoken.RoundTripper{ + return &bearertoken.RoundTripper{ Transport: httpTransport, OverrideFromCtx: c.TokenOverrideFromContext, StaticToken: token, diff --git a/plugin/storage/grpc/factory.go b/plugin/storage/grpc/factory.go index 0f08667e194..b5ceac8a3aa 100644 --- a/plugin/storage/grpc/factory.go +++ b/plugin/storage/grpc/factory.go @@ -123,13 +123,13 @@ func (f *Factory) newRemoteStorage( if c.Auth != nil { return nil, fmt.Errorf("authenticator is not supported") } - tenancyMgr := tenancy.NewManager(&c.Tenancy) unaryInterceptors := []grpc.UnaryClientInterceptor{ bearertoken.NewUnaryClientInterceptor(), } streamInterceptors := []grpc.StreamClientInterceptor{ - tenancy.NewClientStreamInterceptor(tenancyMgr), + bearertoken.NewStreamClientInterceptor(), } + tenancyMgr := tenancy.NewManager(&c.Tenancy) if tenancyMgr.Enabled { unaryInterceptors = append(unaryInterceptors, tenancy.NewClientUnaryInterceptor(tenancyMgr)) streamInterceptors = append(streamInterceptors, tenancy.NewClientStreamInterceptor(tenancyMgr)) diff --git a/plugin/storage/grpc/shared/archive.go b/plugin/storage/grpc/shared/archive.go index cff3aacd635..2bd14685d27 100644 --- a/plugin/storage/grpc/shared/archive.go +++ b/plugin/storage/grpc/shared/archive.go @@ -33,7 +33,7 @@ type archiveWriter struct { // GetTrace takes a traceID and returns a Trace associated with that traceID from Archive Storage func (r *archiveReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { - stream, err := r.client.GetArchiveTrace(upgradeContext(ctx), &storage_v1.GetTraceRequest{ + stream, err := r.client.GetArchiveTrace(ctx, &storage_v1.GetTraceRequest{ TraceID: traceID, }) if status.Code(err) == codes.NotFound { diff --git a/plugin/storage/grpc/shared/grpc_client.go b/plugin/storage/grpc/shared/grpc_client.go index 99f42bde006..4469e9ce2b7 100644 --- a/plugin/storage/grpc/shared/grpc_client.go +++ b/plugin/storage/grpc/shared/grpc_client.go @@ -12,11 +12,9 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "github.com/jaegertracing/jaeger/model" - "github.com/jaegertracing/jaeger/pkg/bearertoken" _ "github.com/jaegertracing/jaeger/pkg/gogocodec" // force gogo codec registration "github.com/jaegertracing/jaeger/proto-gen/storage_v1" "github.com/jaegertracing/jaeger/storage/dependencystore" @@ -30,9 +28,6 @@ var ( _ StoragePlugin = (*GRPCClient)(nil) _ ArchiveStoragePlugin = (*GRPCClient)(nil) _ PluginCapabilities = (*GRPCClient)(nil) - - // upgradeContext composites several steps of upgrading context - upgradeContext = composeContextUpgradeFuncs(upgradeContextWithBearerToken) ) // GRPCClient implements shared.StoragePlugin and reads/writes spans and dependencies @@ -58,36 +53,6 @@ func NewGRPCClient(tracedConn *grpc.ClientConn, untracedConn *grpc.ClientConn) * } } -// ContextUpgradeFunc is a functional type that can be composed to upgrade context -type ContextUpgradeFunc func(ctx context.Context) context.Context - -// composeContextUpgradeFuncs composes ContextUpgradeFunc and returns a composed function -// to run the given func in strict order. -func composeContextUpgradeFuncs(funcs ...ContextUpgradeFunc) ContextUpgradeFunc { - return func(ctx context.Context) context.Context { - for _, fun := range funcs { - ctx = fun(ctx) - } - return ctx - } -} - -// upgradeContextWithBearerToken turns the context into a gRPC outgoing context with bearer token -// in the request metadata, if the original context has bearer token attached. -// Otherwise returns original context. -func upgradeContextWithBearerToken(ctx context.Context) context.Context { - bearerToken, hasToken := bearertoken.GetBearerToken(ctx) - if hasToken { - md, ok := metadata.FromOutgoingContext(ctx) - if !ok { - md = metadata.New(nil) - } - md.Set(BearerTokenKey, bearerToken) - return metadata.NewOutgoingContext(ctx, md) - } - return ctx -} - // DependencyReader implements shared.StoragePlugin. func (c *GRPCClient) DependencyReader() dependencystore.Reader { return c @@ -117,7 +82,7 @@ func (c *GRPCClient) ArchiveSpanWriter() spanstore.Writer { // GetTrace takes a traceID and returns a Trace associated with that traceID func (c *GRPCClient) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { - stream, err := c.readerClient.GetTrace(upgradeContext(ctx), &storage_v1.GetTraceRequest{ + stream, err := c.readerClient.GetTrace(ctx, &storage_v1.GetTraceRequest{ TraceID: traceID, }) if status.Code(err) == codes.NotFound { @@ -132,7 +97,7 @@ func (c *GRPCClient) GetTrace(ctx context.Context, traceID model.TraceID) (*mode // GetServices returns a list of all known services func (c *GRPCClient) GetServices(ctx context.Context) ([]string, error) { - resp, err := c.readerClient.GetServices(upgradeContext(ctx), &storage_v1.GetServicesRequest{}) + resp, err := c.readerClient.GetServices(ctx, &storage_v1.GetServicesRequest{}) if err != nil { return nil, fmt.Errorf("plugin error: %w", err) } @@ -145,7 +110,7 @@ func (c *GRPCClient) GetOperations( ctx context.Context, query spanstore.OperationQueryParameters, ) ([]spanstore.Operation, error) { - resp, err := c.readerClient.GetOperations(upgradeContext(ctx), &storage_v1.GetOperationsRequest{ + resp, err := c.readerClient.GetOperations(ctx, &storage_v1.GetOperationsRequest{ Service: query.ServiceName, SpanKind: query.SpanKind, }) @@ -173,7 +138,7 @@ func (c *GRPCClient) GetOperations( // FindTraces retrieves traces that match the traceQuery func (c *GRPCClient) FindTraces(ctx context.Context, query *spanstore.TraceQueryParameters) ([]*model.Trace, error) { - stream, err := c.readerClient.FindTraces(upgradeContext(ctx), &storage_v1.FindTracesRequest{ + stream, err := c.readerClient.FindTraces(ctx, &storage_v1.FindTracesRequest{ Query: &storage_v1.TraceQueryParameters{ ServiceName: query.ServiceName, OperationName: query.OperationName, @@ -212,7 +177,7 @@ func (c *GRPCClient) FindTraces(ctx context.Context, query *spanstore.TraceQuery // FindTraceIDs retrieves traceIDs that match the traceQuery func (c *GRPCClient) FindTraceIDs(ctx context.Context, query *spanstore.TraceQueryParameters) ([]model.TraceID, error) { - resp, err := c.readerClient.FindTraceIDs(upgradeContext(ctx), &storage_v1.FindTraceIDsRequest{ + resp, err := c.readerClient.FindTraceIDs(ctx, &storage_v1.FindTraceIDsRequest{ Query: &storage_v1.TraceQueryParameters{ ServiceName: query.ServiceName, OperationName: query.OperationName, diff --git a/plugin/storage/grpc/shared/grpc_client_test.go b/plugin/storage/grpc/shared/grpc_client_test.go index da8d82ce4a4..a8bb88fcf34 100644 --- a/plugin/storage/grpc/shared/grpc_client_test.go +++ b/plugin/storage/grpc/shared/grpc_client_test.go @@ -15,11 +15,9 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "github.com/jaegertracing/jaeger/model" - "github.com/jaegertracing/jaeger/pkg/bearertoken" "github.com/jaegertracing/jaeger/proto-gen/storage_v1" grpcMocks "github.com/jaegertracing/jaeger/proto-gen/storage_v1/mocks" "github.com/jaegertracing/jaeger/storage/spanstore" @@ -116,22 +114,6 @@ func TestNewGRPCClient(t *testing.T) { assert.Implements(t, (*storage_v1.StreamingSpanWriterPluginClient)(nil), client.streamWriterClient) } -func TestContextUpgradeWithToken(t *testing.T) { - testBearerToken := "test-bearer-token" - ctx := bearertoken.ContextWithBearerToken(context.Background(), testBearerToken) - upgradedToken := upgradeContextWithBearerToken(ctx) - md, ok := metadata.FromOutgoingContext(upgradedToken) - assert.Truef(t, ok, "Expected metadata in context") - bearerTokenFromMetadata := md.Get(BearerTokenKey) - assert.Equal(t, []string{testBearerToken}, bearerTokenFromMetadata) -} - -func TestContextUpgradeWithoutToken(t *testing.T) { - upgradedToken := upgradeContextWithBearerToken(context.Background()) - _, ok := metadata.FromOutgoingContext(upgradedToken) - assert.Falsef(t, ok, "Expected no metadata in context") -} - func TestGRPCClientGetServices(t *testing.T) { withGRPCClient(func(r *grpcClientTest) { r.spanReader.On("GetServices", mock.Anything, &storage_v1.GetServicesRequest{}).