diff --git a/cmd/jaeger/internal/integration/receivers/storagereceiver/README.md b/cmd/jaeger/internal/integration/receivers/storagereceiver/README.md deleted file mode 100644 index 30931adaf0f..00000000000 --- a/cmd/jaeger/internal/integration/receivers/storagereceiver/README.md +++ /dev/null @@ -1,23 +0,0 @@ -# Storage Receiver - -`storagereceiver` is a fake receiver that creates an artificial stream of traces by: - -- repeatedly querying one of Jaeger storage backends for all traces (by service). -- tracking new traces / spans and passing them to the next component in the pipeline. - -# Getting Started - -The following settings are required: - -- `trace_storage` (no default): name of a storage backend defined in `jaegerstorage` extension - -The following settings can be optionally configured: - -- `pull_interval` (default = 0s): The delay between each iteration of pulling traces. - -```yaml -receivers: - jaeger_storage_receiver: - trace_storage: external-storage - pull_interval: 0s -``` diff --git a/cmd/jaeger/internal/integration/receivers/storagereceiver/config.go b/cmd/jaeger/internal/integration/receivers/storagereceiver/config.go deleted file mode 100644 index e9319b8991d..00000000000 --- a/cmd/jaeger/internal/integration/receivers/storagereceiver/config.go +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright (c) 2024 The Jaeger Authors. -// SPDX-License-Identifier: Apache-2.0 - -package storagereceiver - -import ( - "time" - - "github.com/asaskevich/govalidator" -) - -type Config struct { - TraceStorage string `valid:"required" mapstructure:"trace_storage"` - PullInterval time.Duration `mapstructure:"pull_interval"` -} - -func (cfg *Config) Validate() error { - _, err := govalidator.ValidateStruct(cfg) - return err -} diff --git a/cmd/jaeger/internal/integration/receivers/storagereceiver/config_test.go b/cmd/jaeger/internal/integration/receivers/storagereceiver/config_test.go deleted file mode 100644 index 98435b3e2cf..00000000000 --- a/cmd/jaeger/internal/integration/receivers/storagereceiver/config_test.go +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright (c) 2024 The Jaeger Authors. -// SPDX-License-Identifier: Apache-2.0 - -package storagereceiver - -import ( - "errors" - "path/filepath" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/confmap/confmaptest" -) - -func TestLoadConfig(t *testing.T) { - t.Parallel() - - cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) - require.NoError(t, err) - - tests := []struct { - id component.ID - expected component.Config - expectedErr error - }{ - { - id: component.NewIDWithName(componentType, ""), - expectedErr: errors.New("non zero value required"), - }, - { - id: component.NewIDWithName(componentType, "defaults"), - expected: &Config{ - TraceStorage: "storage", - PullInterval: 0, - }, - }, - { - id: component.NewIDWithName(componentType, "filled"), - expected: &Config{ - TraceStorage: "storage", - PullInterval: 2 * time.Second, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.id.String(), func(t *testing.T) { - factory := NewFactory() - cfg := factory.CreateDefaultConfig() - - sub, err := cm.Sub(tt.id.String()) - require.NoError(t, err) - require.NoError(t, sub.Unmarshal(cfg)) - - if tt.expectedErr != nil { - require.ErrorContains(t, component.ValidateConfig(cfg), tt.expectedErr.Error()) - } else { - require.NoError(t, component.ValidateConfig(cfg)) - assert.Equal(t, tt.expected, cfg) - } - }) - } -} diff --git a/cmd/jaeger/internal/integration/receivers/storagereceiver/factory.go b/cmd/jaeger/internal/integration/receivers/storagereceiver/factory.go deleted file mode 100644 index 09abc498e72..00000000000 --- a/cmd/jaeger/internal/integration/receivers/storagereceiver/factory.go +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright (c) 2024 The Jaeger Authors. -// SPDX-License-Identifier: Apache-2.0 - -package storagereceiver - -import ( - "context" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/receiver" -) - -// componentType is the name of this extension in configuration. -var componentType = component.MustNewType("jaeger_storage_receiver") - -// ID is the identifier of this extension. -var ID = component.NewID(componentType) - -func NewFactory() receiver.Factory { - return receiver.NewFactory( - componentType, - createDefaultConfig, - receiver.WithTraces(createTracesReceiver, component.StabilityLevelDevelopment), - ) -} - -func createDefaultConfig() component.Config { - return &Config{} -} - -func createTracesReceiver(_ context.Context, set receiver.Settings, config component.Config, nextConsumer consumer.Traces) (receiver.Traces, error) { - cfg := config.(*Config) - - return newTracesReceiver(cfg, set, nextConsumer) -} diff --git a/cmd/jaeger/internal/integration/receivers/storagereceiver/factory_test.go b/cmd/jaeger/internal/integration/receivers/storagereceiver/factory_test.go deleted file mode 100644 index 8b94fa2304b..00000000000 --- a/cmd/jaeger/internal/integration/receivers/storagereceiver/factory_test.go +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright (c) 2024 The Jaeger Authors. -// SPDX-License-Identifier: Apache-2.0 - -package storagereceiver - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/receiver/receivertest" -) - -func TestCreateDefaultConfig(t *testing.T) { - cfg := createDefaultConfig().(*Config) - require.NotNil(t, cfg, "failed to create default config") - require.NoError(t, componenttest.CheckConfigStruct(cfg)) -} - -func TestCreateTracesReceiver(t *testing.T) { - cfg := createDefaultConfig().(*Config) - f := NewFactory() - r, err := f.CreateTracesReceiver(context.Background(), receivertest.NewNopSettings(), cfg, nil) - require.NoError(t, err) - assert.NotNil(t, r) -} diff --git a/cmd/jaeger/internal/integration/receivers/storagereceiver/package_test.go b/cmd/jaeger/internal/integration/receivers/storagereceiver/package_test.go deleted file mode 100644 index 4dbecd011d3..00000000000 --- a/cmd/jaeger/internal/integration/receivers/storagereceiver/package_test.go +++ /dev/null @@ -1,14 +0,0 @@ -// Copyright (c) 2024 The Jaeger Authors. -// SPDX-License-Identifier: Apache-2.0 - -package storagereceiver - -import ( - "testing" - - "github.com/jaegertracing/jaeger/pkg/testutils" -) - -func TestMain(m *testing.M) { - testutils.VerifyGoLeaks(m) -} diff --git a/cmd/jaeger/internal/integration/receivers/storagereceiver/receiver.go b/cmd/jaeger/internal/integration/receivers/storagereceiver/receiver.go deleted file mode 100644 index d6f93ef5ccd..00000000000 --- a/cmd/jaeger/internal/integration/receivers/storagereceiver/receiver.go +++ /dev/null @@ -1,141 +0,0 @@ -// Copyright (c) 2024 The Jaeger Authors. -// SPDX-License-Identifier: Apache-2.0 - -package storagereceiver - -import ( - "context" - "fmt" - "time" - - jaeger2otlp "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/receiver" - "go.uber.org/zap" - - "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" - "github.com/jaegertracing/jaeger/model" - "github.com/jaegertracing/jaeger/storage/spanstore" -) - -type storageReceiver struct { - cancelConsumeLoop context.CancelFunc - config *Config - settings receiver.Settings - consumedTraces map[model.TraceID]*consumedTrace - nextConsumer consumer.Traces - spanReader spanstore.Reader -} - -type consumedTrace struct { - spanIDs map[model.SpanID]struct{} -} - -func newTracesReceiver(config *Config, set receiver.Settings, nextConsumer consumer.Traces) (*storageReceiver, error) { - return &storageReceiver{ - config: config, - settings: set, - consumedTraces: make(map[model.TraceID]*consumedTrace), - nextConsumer: nextConsumer, - }, nil -} - -func (r *storageReceiver) Start(ctx context.Context, host component.Host) error { - f, err := jaegerstorage.GetStorageFactory(r.config.TraceStorage, host) - if err != nil { - return fmt.Errorf("cannot find storage factory: %w", err) - } - - if r.spanReader, err = f.CreateSpanReader(); err != nil { - return fmt.Errorf("cannot create span reader: %w", err) - } - - ctx, cancel := context.WithCancel(ctx) - r.cancelConsumeLoop = cancel - - go func() { - if err := r.consumeLoop(ctx); err != nil { - r.settings.ReportStatus(component.NewFatalErrorEvent(err)) - } - }() - - return nil -} - -func (r *storageReceiver) consumeLoop(ctx context.Context) error { - for { - services, err := r.spanReader.GetServices(ctx) - if err != nil { - r.settings.Logger.Error("Failed to get services from consumer", zap.Error(err)) - return err - } - - for _, svc := range services { - if err := r.consumeTraces(ctx, svc); err != nil { - r.settings.Logger.Error("Failed to consume traces from consumer", zap.Error(err)) - } - } - - select { - case <-ctx.Done(): - r.settings.Logger.Info("Consumer stopped") - return nil - default: - time.Sleep(r.config.PullInterval) - } - } -} - -func (r *storageReceiver) consumeTraces(ctx context.Context, serviceName string) error { - endTime := time.Now() - traces, err := r.spanReader.FindTraces(ctx, &spanstore.TraceQueryParameters{ - ServiceName: serviceName, - StartTimeMin: endTime.Add(-1 * time.Hour), - StartTimeMax: endTime, - }) - if err != nil { - return err - } - - for _, trace := range traces { - traceID := trace.Spans[0].TraceID - if _, ok := r.consumedTraces[traceID]; !ok { - r.consumedTraces[traceID] = &consumedTrace{ - spanIDs: make(map[model.SpanID]struct{}), - } - } - r.consumeSpans(ctx, r.consumedTraces[traceID], trace.Spans) - } - - return nil -} - -func (r *storageReceiver) consumeSpans(ctx context.Context, tc *consumedTrace, spans []*model.Span) error { - // Spans are consumed one at a time because we don't know whether all spans - // in a trace have been completely exported - for _, span := range spans { - if _, ok := tc.spanIDs[span.SpanID]; !ok { - tc.spanIDs[span.SpanID] = struct{}{} - td, err := jaeger2otlp.ProtoToTraces([]*model.Batch{ - { - Spans: []*model.Span{span}, - Process: span.Process, - }, - }) - if err != nil { - return err - } - r.nextConsumer.ConsumeTraces(ctx, td) - } - } - - return nil -} - -func (r *storageReceiver) Shutdown(_ context.Context) error { - if r.cancelConsumeLoop != nil { - r.cancelConsumeLoop() - } - return nil -} diff --git a/cmd/jaeger/internal/integration/receivers/storagereceiver/receiver_test.go b/cmd/jaeger/internal/integration/receivers/storagereceiver/receiver_test.go deleted file mode 100644 index 794cfdbbfd2..00000000000 --- a/cmd/jaeger/internal/integration/receivers/storagereceiver/receiver_test.go +++ /dev/null @@ -1,282 +0,0 @@ -// Copyright (c) 2024 The Jaeger Authors. -// SPDX-License-Identifier: Apache-2.0 - -package storagereceiver - -import ( - "context" - "errors" - "testing" - "time" - - "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/storagetest" - jaeger2otlp "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer/consumertest" - "go.opentelemetry.io/collector/pdata/ptrace" - "go.opentelemetry.io/collector/receiver/receivertest" - - "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" - "github.com/jaegertracing/jaeger/model" - "github.com/jaegertracing/jaeger/storage" - factoryMocks "github.com/jaegertracing/jaeger/storage/mocks" - spanStoreMocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" -) - -var _ jaegerstorage.Extension = (*mockStorageExt)(nil) - -type mockStorageExt struct { - name string - factory *factoryMocks.Factory -} - -func (*mockStorageExt) Start(context.Context, component.Host) error { - panic("not implemented") -} - -func (*mockStorageExt) Shutdown(context.Context) error { - panic("not implemented") -} - -func (m *mockStorageExt) Factory(name string) (storage.Factory, bool) { - if m.name == name { - return m.factory, true - } - return nil, false -} - -type receiverTest struct { - storageName string - receiveName string - receiveInterval time.Duration - reportStatus func(*component.StatusEvent) - - reader *spanStoreMocks.Reader - factory *factoryMocks.Factory - host *storagetest.StorageHost - receiver *storageReceiver -} - -func withReceiver( - r *receiverTest, - fn func(r *receiverTest), -) { - reader := new(spanStoreMocks.Reader) - factory := new(factoryMocks.Factory) - host := storagetest.NewStorageHost() - host.WithExtension(jaegerstorage.ID, &mockStorageExt{ - name: r.storageName, - factory: factory, - }) - cfg := createDefaultConfig().(*Config) - cfg.TraceStorage = r.receiveName - cfg.PullInterval = r.receiveInterval - receiver, _ := newTracesReceiver( - cfg, - receivertest.NewNopSettings(), - consumertest.NewNop(), - ) - receiver.settings.ReportStatus = func(_ *component.StatusEvent) {} - - r.reader = reader - r.factory = factory - r.host = host - r.receiver = receiver - fn(r) -} - -var ( - services = []string{"example-service-1", "example-service-2"} - spans = []*model.Span{ - { - TraceID: model.NewTraceID(0, 1), - SpanID: model.NewSpanID(1), - Process: &model.Process{ - ServiceName: services[0], - }, - }, - { - TraceID: model.NewTraceID(0, 1), - SpanID: model.NewSpanID(2), - Process: &model.Process{ - ServiceName: services[0], - }, - }, - { - TraceID: model.NewTraceID(0, 2), - SpanID: model.NewSpanID(3), - Process: &model.Process{ - ServiceName: services[1], - }, - }, - { - TraceID: model.NewTraceID(0, 2), - SpanID: model.NewSpanID(4), - Process: &model.Process{ - ServiceName: services[1], - }, - }, - } -) - -func TestReceiver_NoStorageError(t *testing.T) { - r := &receiverTest{ - storageName: "", - receiveName: "foo", - } - withReceiver(r, func(r *receiverTest) { - err := r.receiver.Start(context.Background(), r.host) - require.ErrorContains(t, err, "cannot find storage factory") - }) -} - -func TestReceiver_CreateSpanReaderError(t *testing.T) { - r := &receiverTest{ - storageName: "foo", - receiveName: "foo", - } - withReceiver(r, func(r *receiverTest) { - r.factory.On("CreateSpanReader").Return(nil, errors.New("mocked error")) - - err := r.receiver.Start(context.Background(), r.host) - require.ErrorContains(t, err, "cannot create span reader") - }) -} - -func TestReceiver_GetServiceError(t *testing.T) { - r := &receiverTest{ - storageName: "external-storage", - receiveName: "external-storage", - } - withReceiver(r, func(r *receiverTest) { - r.reader.On("GetServices", mock.AnythingOfType("*context.cancelCtx")).Return([]string{}, errors.New("mocked error")) - r.factory.On("CreateSpanReader").Return(r.reader, nil) - r.receiver.spanReader = r.reader - r.reportStatus = func(se *component.StatusEvent) { - require.ErrorContains(t, se.Err(), "mocked error") - } - - require.NoError(t, r.receiver.Start(context.Background(), r.host)) - }) -} - -func TestReceiver_Shutdown(t *testing.T) { - withReceiver(&receiverTest{}, func(r *receiverTest) { - require.NoError(t, r.receiver.Shutdown(context.Background())) - }) -} - -func TestReceiver_Start(t *testing.T) { - r := &receiverTest{ - storageName: "external-storage", - receiveName: "external-storage", - receiveInterval: 50 * time.Millisecond, - } - withReceiver(r, func(r *receiverTest) { - r.reader.On("GetServices", mock.AnythingOfType("*context.cancelCtx")).Return([]string{}, nil) - r.factory.On("CreateSpanReader").Return(r.reader, nil) - - require.NoError(t, r.receiver.Start(context.Background(), r.host)) - // let the consumeLoop to reach the end of iteration and sleep - time.Sleep(100 * time.Millisecond) - require.NoError(t, r.receiver.Shutdown(context.Background())) - }) -} - -func TestReceiver_StartConsume(t *testing.T) { - tests := []struct { - name string - services []string - traces []*model.Trace - tracesErr error - expectedTraces []*model.Trace - }{ - { - name: "empty service", - }, - { - name: "find traces error", - services: []string{"example-service"}, - tracesErr: errors.New("failed to find traces"), - }, - { - name: "consume first trace", - services: []string{services[0]}, - traces: []*model.Trace{ - {Spans: []*model.Span{spans[0]}}, - }, - expectedTraces: []*model.Trace{ - {Spans: []*model.Span{spans[0]}}, - }, - }, - { - name: "consume second trace", - services: services, - traces: []*model.Trace{ - {Spans: []*model.Span{spans[0]}}, - {Spans: []*model.Span{spans[2], spans[3]}}, - }, - expectedTraces: []*model.Trace{ - {Spans: []*model.Span{spans[0]}}, - {Spans: []*model.Span{spans[2]}}, - {Spans: []*model.Span{spans[3]}}, - }, - }, - { - name: "re-consume first trace with new spans", - services: services, - traces: []*model.Trace{ - {Spans: []*model.Span{spans[0], spans[1]}}, - {Spans: []*model.Span{spans[2], spans[3]}}, - }, - expectedTraces: []*model.Trace{ - {Spans: []*model.Span{spans[0]}}, - {Spans: []*model.Span{spans[2]}}, - {Spans: []*model.Span{spans[3]}}, - // span at index 1 is consumed last - {Spans: []*model.Span{spans[1]}}, - }, - }, - } - - withReceiver(&receiverTest{}, func(r *receiverTest) { - sink := &consumertest.TracesSink{} - r.receiver.nextConsumer = sink - - ctx, cancelFunc := context.WithCancel(context.Background()) - r.receiver.cancelConsumeLoop = cancelFunc - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - reader := new(spanStoreMocks.Reader) - reader.On("GetServices", mock.AnythingOfType("*context.cancelCtx")).Return(test.services, nil) - reader.On( - "FindTraces", - mock.AnythingOfType("*context.cancelCtx"), - mock.AnythingOfType("*spanstore.TraceQueryParameters"), - ).Return(test.traces, test.tracesErr) - r.receiver.spanReader = reader - - require.NoError(t, r.receiver.Shutdown(ctx)) - require.NoError(t, r.receiver.consumeLoop(ctx)) - - expectedTraces := make([]ptrace.Traces, 0) - for _, trace := range test.expectedTraces { - td, err := jaeger2otlp.ProtoToTraces([]*model.Batch{ - { - Spans: []*model.Span{trace.Spans[0]}, - Process: trace.Spans[0].Process, - }, - }) - require.NoError(t, err) - expectedTraces = append(expectedTraces, td) - } - actualTraces := sink.AllTraces() - assert.Equal(t, expectedTraces, actualTraces) - }) - } - }) -} diff --git a/cmd/jaeger/internal/integration/receivers/storagereceiver/testdata/config.yaml b/cmd/jaeger/internal/integration/receivers/storagereceiver/testdata/config.yaml deleted file mode 100644 index e590e8f1694..00000000000 --- a/cmd/jaeger/internal/integration/receivers/storagereceiver/testdata/config.yaml +++ /dev/null @@ -1,6 +0,0 @@ -jaeger_storage_receiver: -jaeger_storage_receiver/defaults: - trace_storage: storage -jaeger_storage_receiver/filled: - trace_storage: storage - pull_interval: 2s