Skip to content

Commit

Permalink
Merge branch 'hotrod-intergration-test-for-v1v2' of https://github.co…
Browse files Browse the repository at this point in the history
…m/Saumya40-codes/jaeger into hotrod-intergration-test-for-v1v2
  • Loading branch information
Saumya40-codes committed Oct 30, 2024
2 parents c056950 + 32a1c9a commit 69541a5
Show file tree
Hide file tree
Showing 13 changed files with 84 additions and 116 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/ci-lint-checks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ jobs:

- uses: ./.github/actions/block-pr-from-main-branch

- run: make lint-nocommit
- run: |
git fetch origin main
make lint-nocommit
dco-check:
runs-on: ubuntu-latest
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,9 @@ lint-license:

.PHONY: lint-nocommit
lint-nocommit:
@if git diff main | grep '@no''commit' ; then \
@if git diff origin/main | grep '@no''commit' ; then \
echo "❌ Cannot merge PR that contains @no""commit string" ; \
GIT_PAGER=cat git diff -G '@no''commit' main ; \
GIT_PAGER=cat git diff -G '@no''commit' origin/main ; \
false ; \
else \
echo "✅ Changes do not contain @no""commit string" ; \
Expand Down
23 changes: 9 additions & 14 deletions cmd/jaeger/internal/integration/e2e_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"os"
"os/exec"
"path/filepath"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -42,10 +41,10 @@ const otlpPort = 4317
type E2EStorageIntegration struct {
integration.StorageIntegration

SkipStorageCleaner bool
ConfigFile string
BinaryName string
HealthCheckEndpoint string
SkipStorageCleaner bool
ConfigFile string
BinaryName string
HealthCheckPort int // overridable for Kafka tests which run two binaries and need different ports

// EnvVarOverrides contains a map of environment variables to set.
// The key in the map is the environment variable to override and the value
Expand Down Expand Up @@ -160,10 +159,11 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) {
}

func (s *E2EStorageIntegration) doHealthCheck(t *testing.T) bool {
healthCheckEndpoint := s.HealthCheckEndpoint
if healthCheckEndpoint == "" {
healthCheckEndpoint = "http://localhost:13133/status"
healthCheckPort := s.HealthCheckPort
if healthCheckPort == 0 {
healthCheckPort = ports.CollectorV2HealthChecks
}
healthCheckEndpoint := fmt.Sprintf("http://localhost:%d/status", healthCheckPort)
t.Logf("Checking if %s is available on %s", s.BinaryName, healthCheckEndpoint)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
Expand All @@ -187,11 +187,6 @@ func (s *E2EStorageIntegration) doHealthCheck(t *testing.T) bool {
t.Logf("HTTP response not OK: %v", string(body))
return false
}
// for backwards compatibility with other healthchecks
if !strings.HasSuffix(healthCheckEndpoint, "/status") {
t.Logf("OK HTTP from endpoint that is not healthcheckv2")
return true
}

var healthResponse struct {
Status string `json:"status"`
Expand All @@ -203,7 +198,7 @@ func (s *E2EStorageIntegration) doHealthCheck(t *testing.T) bool {

// Check if the status field in the JSON is "StatusOK"
if healthResponse.Status != "StatusOK" {
t.Logf("Received non-K status %s: %s", healthResponse.Status, string(body))
t.Logf("Received non-OK status %s: %s", healthResponse.Status, string(body))
return false
}
return true
Expand Down
6 changes: 3 additions & 3 deletions cmd/jaeger/internal/integration/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ func TestKafkaStorage(t *testing.T) {
t.Log("Collector initialized")

ingester := &E2EStorageIntegration{
BinaryName: "jaeger-v2-ingester",
ConfigFile: "../../config-kafka-ingester.yaml",
HealthCheckEndpoint: "http://localhost:14133/status",
BinaryName: "jaeger-v2-ingester",
ConfigFile: "../../config-kafka-ingester.yaml",
HealthCheckPort: 14133,
StorageIntegration: integration.StorageIntegration{
CleanUp: purge,
GetDependenciesReturnsSource: true,
Expand Down
18 changes: 12 additions & 6 deletions cmd/query/app/apiv3/http_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,16 @@ func (h *HTTPGateway) RegisterRoutes(router *mux.Router) {

// addRoute adds a new endpoint to the router with given path and handler function.
// This code is mostly copied from ../http_handler.
func (h *HTTPGateway) addRoute(
func (*HTTPGateway) addRoute(
router *mux.Router,
f func(http.ResponseWriter, *http.Request),
route string,
_ ...any, /* args */
) *mux.Route {
var handler http.Handler = http.HandlerFunc(f)
traceMiddleware := otelhttp.NewHandler(
otelhttp.WithRouteTag(route, handler),
route,
otelhttp.WithTracerProvider(h.Tracer))
return router.HandleFunc(route, traceMiddleware.ServeHTTP)
handler = otelhttp.WithRouteTag(route, handler)
handler = spanNameHandler(route, handler)
return router.HandleFunc(route, handler.ServeHTTP)
}

// tryHandleError checks if the passed error is not nil and handles it by writing
Expand Down Expand Up @@ -242,3 +240,11 @@ func (h *HTTPGateway) getOperations(w http.ResponseWriter, r *http.Request) {
}
h.marshalResponse(&api_v3.GetOperationsResponse{Operations: apiOperations}, w)
}

func spanNameHandler(spanName string, handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
span := trace.SpanFromContext(r.Context())
span.SetName(spanName)
handler.ServeHTTP(w, r)
})
}
17 changes: 12 additions & 5 deletions cmd/query/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,10 @@ func (aH *APIHandler) handleFunc(
) *mux.Route {
route := aH.formatRoute(routeFmt, args...)
var handler http.Handler = http.HandlerFunc(f)
traceMiddleware := otelhttp.NewHandler(
otelhttp.WithRouteTag(route, traceResponseHandler(handler)),
route,
otelhttp.WithTracerProvider(aH.tracer))
return router.HandleFunc(route, traceMiddleware.ServeHTTP)
handler = traceResponseHandler(handler)
handler = otelhttp.WithRouteTag(route, handler)
handler = spanNameHandler(route, handler)
return router.HandleFunc(route, handler.ServeHTTP)
}

func (aH *APIHandler) formatRoute(route string, args ...any) string {
Expand Down Expand Up @@ -532,3 +531,11 @@ func traceResponseHandler(handler http.Handler) http.Handler {
handler.ServeHTTP(w, r)
})
}

func spanNameHandler(spanName string, handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
span := trace.SpanFromContext(r.Context())
span.SetName(spanName)
handler.ServeHTTP(w, r)
})
}
3 changes: 0 additions & 3 deletions cmd/query/app/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,9 +329,6 @@ func TestGetTrace(t *testing.T) {
require.NoError(t, err)
assert.Empty(t, response.Errors)

assert.Len(t, exporter.GetSpans(), 1, "HTTP request was traced and span reported")
assert.Equal(t, "/api/traces/{traceID}", exporter.GetSpans()[0].Name)

traces := extractTraces(t, &response)
assert.Len(t, traces[0].Spans, 2)
assert.Len(t, traces[0].Spans[1].References, testCase.numSpanRefs)
Expand Down
54 changes: 34 additions & 20 deletions cmd/query/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ import (
"net/http"
"strings"
"sync"
"time"

"github.com/gorilla/handlers"
"github.com/soheilhy/cmux"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
Expand All @@ -23,7 +21,6 @@ import (
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/health"
Expand Down Expand Up @@ -91,7 +88,6 @@ func NewServer(
return nil, err
}
registerGRPCHandlers(grpcServer, querySvc, metricsQuerySvc, telset)

httpServer, err := createHTTPServer(ctx, querySvc, metricsQuerySvc, options, tm, telset)
if err != nil {
return nil, err
Expand Down Expand Up @@ -176,13 +172,23 @@ func createGRPCServerOTEL(
telset telemetery.Setting,
) (*grpc.Server, error) {
var grpcOpts []configgrpc.ToServerOption
unaryInterceptors := []grpc.UnaryServerInterceptor{
bearertoken.NewUnaryServerInterceptor(),
}
streamInterceptors := []grpc.StreamServerInterceptor{
bearertoken.NewStreamServerInterceptor(),
}

//nolint:contextcheck
if tm.Enabled {
//nolint:contextcheck
grpcOpts = append(grpcOpts,
configgrpc.WithGrpcServerOption(grpc.StreamInterceptor(tenancy.NewGuardingStreamInterceptor(tm))),
configgrpc.WithGrpcServerOption(grpc.UnaryInterceptor(tenancy.NewGuardingUnaryInterceptor(tm))),
)
unaryInterceptors = append(unaryInterceptors, tenancy.NewGuardingUnaryInterceptor(tm))
streamInterceptors = append(streamInterceptors, tenancy.NewGuardingStreamInterceptor(tm))
}

grpcOpts = append(grpcOpts,
configgrpc.WithGrpcServerOption(grpc.ChainUnaryInterceptor(unaryInterceptors...)),
configgrpc.WithGrpcServerOption(grpc.ChainStreamInterceptor(streamInterceptors...)),
)
return options.GRPC.ToServer(
ctx,
telset.Host,
Expand Down Expand Up @@ -252,20 +258,28 @@ func createHTTPServer(
telset telemetery.Setting,
) (*httpServer, error) {
handler, staticHandlerCloser := initRouter(querySvc, metricsQuerySvc, queryOpts, tm, telset)
handler = responseHeadersHandler(handler, queryOpts.HTTP.ResponseHeaders)
handler = handlers.CompressHandler(handler)
recoveryHandler := recoveryhandler.NewRecoveryHandler(telset.Logger, true)

errorLog, _ := zap.NewStdLogAt(telset.Logger, zapcore.ErrorLevel)
server := &httpServer{
Server: &http.Server{
Handler: recoveryHandler(handler),
ErrorLog: errorLog,
ReadHeaderTimeout: 2 * time.Second,
handler = recoveryhandler.NewRecoveryHandler(telset.Logger, true)(handler)
hs, err := queryOpts.HTTP.ToServer(
ctx,
telset.Host,
component.TelemetrySettings{
Logger: telset.Logger,
TracerProvider: telset.TracerProvider,
LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider {
return noop.NewMeterProvider()
},
},
handler,
)
if err != nil {
return nil, errors.Join(err, staticHandlerCloser.Close())
}
server := &httpServer{
Server: hs,
staticHandlerCloser: staticHandlerCloser,
}

// TODO why doesn't OTEL helper do that already?
if queryOpts.HTTP.TLSSetting != nil {
tlsCfg, err := queryOpts.HTTP.TLSSetting.LoadTLSConfig(ctx) // This checks if the certificates are correctly provided
if err != nil {
Expand Down Expand Up @@ -293,7 +307,7 @@ func (s *Server) initListener(ctx context.Context) (cmux.CMux, error) {
return nil, err
}

s.httpConn, err = net.Listen("tcp", s.queryOptions.HTTP.Endpoint)
s.httpConn, err = s.queryOptions.HTTP.ToListener(ctx)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/metrics/prometheus/metricsstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func getHTTPRoundTripper(c *config.Configuration, logger *zap.Logger) (rt http.R
}
token = tokenFromFile
}
return bearertoken.RoundTripper{
return &bearertoken.RoundTripper{
Transport: httpTransport,
OverrideFromCtx: c.TokenOverrideFromContext,
StaticToken: token,
Expand Down
4 changes: 2 additions & 2 deletions plugin/storage/grpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,13 @@ func (f *Factory) newRemoteStorage(
if c.Auth != nil {
return nil, fmt.Errorf("authenticator is not supported")
}
tenancyMgr := tenancy.NewManager(&c.Tenancy)
unaryInterceptors := []grpc.UnaryClientInterceptor{
bearertoken.NewUnaryClientInterceptor(),
}
streamInterceptors := []grpc.StreamClientInterceptor{
tenancy.NewClientStreamInterceptor(tenancyMgr),
bearertoken.NewStreamClientInterceptor(),
}
tenancyMgr := tenancy.NewManager(&c.Tenancy)
if tenancyMgr.Enabled {
unaryInterceptors = append(unaryInterceptors, tenancy.NewClientUnaryInterceptor(tenancyMgr))
streamInterceptors = append(streamInterceptors, tenancy.NewClientStreamInterceptor(tenancyMgr))
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/grpc/shared/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type archiveWriter struct {

// GetTrace takes a traceID and returns a Trace associated with that traceID from Archive Storage
func (r *archiveReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {
stream, err := r.client.GetArchiveTrace(upgradeContext(ctx), &storage_v1.GetTraceRequest{
stream, err := r.client.GetArchiveTrace(ctx, &storage_v1.GetTraceRequest{
TraceID: traceID,
})
if status.Code(err) == codes.NotFound {
Expand Down
Loading

0 comments on commit 69541a5

Please sign in to comment.