Skip to content

Commit

Permalink
add tests to token support in batch processor
Browse files Browse the repository at this point in the history
  • Loading branch information
dmolenda-sumo authored and pmm-sumo committed Feb 2, 2021
1 parent 694e572 commit 2024d26
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 3 deletions.
6 changes: 3 additions & 3 deletions processor/batchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,13 @@ func (bp *batchProcessor) startProcessingCycle() {

func (bp *batchProcessor) processItemIfTokenUnchanged(itemAndContext itemWithContext, currentToken string, currentContext context.Context) (string, context.Context) {
newToken := getTokenFromContext(itemAndContext.ctx)
if currentToken != newToken {
if currentToken != newToken && currentToken != "" {
bp.timer.Stop()
bp.sendItems(currentContext, statBatchSizeTriggerSend)
bp.resetTimer()
currentToken = newToken
currentContext = itemAndContext.ctx
}
currentToken = newToken
currentContext = itemAndContext.ctx
bp.processItem(itemAndContext)
return currentToken, currentContext
}
Expand Down
93 changes: 93 additions & 0 deletions processor/batchprocessor/batch_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"testing"
"time"

"go.opentelemetry.io/collector/client"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opencensus.io/stats/view"
Expand Down Expand Up @@ -115,6 +117,97 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) {
assert.Equal(t, (requestCount*spansPerRequest)%int(cfg.SendBatchSize), sink.AllTraces()[len(sink.AllTraces())-1].SpanCount())
}

func TestBatchProcessorSpansWithDifferentTokens(t *testing.T) {
sink := new(consumertest.TracesSink)
cfg := createDefaultConfig().(*Config)
cfg.SendBatchSize = 128
cfg.SendBatchMaxSize = 128
creationParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchTracesProcessor(creationParams, sink, cfg, configtelemetry.LevelBasic)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

requestCount := 50
spansPerRequest := 100
for requestNum := 0; requestNum < requestCount; requestNum++ {
tokenName := fmt.Sprintf("TOKEN%v", requestNum+1)
td := testdata.GenerateTraceDataManySpansSameResource(spansPerRequest)
spans := td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans()
for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ {
spans.At(spanIndex).SetName(getTestSpanName(requestNum, spanIndex))
}
ctx := client.NewContext(context.Background(), &client.Client{IP: "", Token: tokenName})
assert.NoError(t, batcher.ConsumeTraces(ctx, td))
}

// Added to test logic that check for empty resources.
td := testdata.GenerateTraceDataEmpty()
batcher.ConsumeTraces(context.Background(), td)

// wait for all spans to be reported
for {
if sink.SpansCount() == requestCount*spansPerRequest {
break
}
<-time.After(cfg.Timeout)
}

require.NoError(t, batcher.Shutdown(context.Background()))

require.Equal(t, requestCount*spansPerRequest, sink.SpansCount())
for i := 0; i < len(sink.AllTraces()); i++ {
assert.Equal(t, spansPerRequest, sink.AllTraces()[i].SpanCount())
}
}

func TestBatchProcessorMergingSpansWithTheSameTokens(t *testing.T) {
sink := new(consumertest.TracesSink)
cfg := createDefaultConfig().(*Config)
cfg.SendBatchSize = 128
cfg.SendBatchMaxSize = 128
creationParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchTracesProcessor(creationParams, sink, cfg, configtelemetry.LevelBasic)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

requestCount := 100
spansPerRequest := 50
numberOrRequestsWithTheSameToken := 5
for requestNum := 0; requestNum < requestCount/numberOrRequestsWithTheSameToken; requestNum++ {
tokenName := fmt.Sprintf("TOKEN%v", requestNum+1)
for sameTokenRequestNum := 0; sameTokenRequestNum < numberOrRequestsWithTheSameToken; sameTokenRequestNum++ {
td := testdata.GenerateTraceDataManySpansSameResource(spansPerRequest)
spans := td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans()
for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ {
spans.At(spanIndex).SetName(getTestSpanName(requestNum*numberOrRequestsWithTheSameToken+sameTokenRequestNum, spanIndex))
}
ctx := client.NewContext(context.Background(), &client.Client{IP: "", Token: tokenName})
assert.NoError(t, batcher.ConsumeTraces(ctx, td))
}
}

// Added to test logic that check for empty resources.
td := testdata.GenerateTraceDataEmpty()
batcher.ConsumeTraces(context.Background(), td)

// wait for all spans to be reported
for {
if sink.SpansCount() == requestCount*spansPerRequest {
break
}
<-time.After(cfg.Timeout)
}

require.NoError(t, batcher.Shutdown(context.Background()))

require.Equal(t, requestCount*spansPerRequest, sink.SpansCount())
numberOfFullBatches := spansPerRequest * numberOrRequestsWithTheSameToken / int(cfg.SendBatchMaxSize)
for i := 0; i < len(sink.AllTraces()); i = i + numberOfFullBatches + 1 {
for j := 0; j < numberOfFullBatches; j++ {
assert.Equal(t, int(cfg.SendBatchMaxSize), sink.AllTraces()[i+j].SpanCount())
}
assert.Equal(t, spansPerRequest*numberOrRequestsWithTheSameToken%int(cfg.SendBatchMaxSize), sink.AllTraces()[i+numberOfFullBatches].SpanCount())
}
}

func TestBatchProcessorSentBySize(t *testing.T) {
views := MetricViews()
require.NoError(t, view.Register(views...))
Expand Down

0 comments on commit 2024d26

Please sign in to comment.