Skip to content

Commit

Permalink
expand Result struct to include accepted signal type details (#409)
Browse files Browse the repository at this point in the history
  • Loading branch information
rubvs authored Dec 11, 2024
1 parent 4855547 commit 209af03
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 35 deletions.
22 changes: 15 additions & 7 deletions input/elasticapm/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,19 +296,27 @@ func (p *Processor) handleStream(
if n == 0 {
return readErr
}

if err := p.processBatch(ctx, processor, batch); err != nil {
if err := processor.ProcessBatch(ctx, batch); err != nil {
return fmt.Errorf("cannot process batch: %w", err)
}
for _, v := range *batch {
switch v.Type() {
case modelpb.ErrorEventType:
result.AcceptedDetails.Error++
case modelpb.SpanEventType:
result.AcceptedDetails.Span++
case modelpb.TransactionEventType:
result.AcceptedDetails.Transaction++
case modelpb.MetricEventType:
result.AcceptedDetails.Metric++
case modelpb.LogEventType:
result.AcceptedDetails.Log++
}
}
result.Accepted += n
return readErr
}

// processBatch processes the batch and returns the events to the pool after it's been processed.
func (p *Processor) processBatch(ctx context.Context, processor modelpb.BatchProcessor, batch *modelpb.Batch) error {
return processor.ProcessBatch(ctx, batch)
}

// getStreamReader returns a streamReader that reads ND-JSON lines from r.
func (p *Processor) getStreamReader(r io.Reader) *streamReader {
return &streamReader{
Expand Down
106 changes: 79 additions & 27 deletions input/elasticapm/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,17 @@ import (
"github.com/elastic/apm-data/model/modelpb"
)

const batchSize = 10

type readerFunc func([]byte) (int, error)

func (f readerFunc) Read(p []byte) (int, error) {
return f(p)
}

func TestHandleStreamReaderError(t *testing.T) {
readErr := errors.New("read failed")
cnt := 5
var calls int
var reader readerFunc = func(p []byte) (int, error) {
calls++
Expand All @@ -43,7 +52,7 @@ func TestHandleStreamReaderError(t *testing.T) {
}
buf := bytes.NewBuffer(nil)
buf.WriteString(validMetadata + "\n")
for i := 0; i < 5; i++ {
for i := 0; i < cnt; i++ {
buf.WriteString(validTransaction + "\n")
}
return copy(p, buf.Bytes()), nil
Expand All @@ -56,17 +65,20 @@ func TestHandleStreamReaderError(t *testing.T) {

var actualResult Result
err := sp.HandleStream(
context.Background(), &modelpb.APMEvent{},
reader, 10, nopBatchProcessor{}, &actualResult,
context.Background(),
&modelpb.APMEvent{},
reader,
batchSize,
nopBatchProcessor{},
&actualResult,
)
assert.ErrorIs(t, err, readErr)
assert.Equal(t, Result{Accepted: 5}, actualResult)
}

type readerFunc func([]byte) (int, error)

func (f readerFunc) Read(p []byte) (int, error) {
return f(p)
assert.Equal(t, Result{
Accepted: cnt,
AcceptedDetails: AcceptedDetails{
Transaction: cnt,
},
}, actualResult)
}

func TestHandleStreamBatchProcessorError(t *testing.T) {
Expand All @@ -85,11 +97,14 @@ func TestHandleStreamBatchProcessorError(t *testing.T) {
processor := modelpb.ProcessBatchFunc(func(context.Context, *modelpb.Batch) error {
return test.err
})

var actualResult Result
err := sp.HandleStream(
context.Background(), &modelpb.APMEvent{},
strings.NewReader(payload), 10, processor, &actualResult,
context.Background(),
&modelpb.APMEvent{},
strings.NewReader(payload),
batchSize,
processor,
&actualResult,
)
assert.ErrorIs(t, err, test.err)
assert.Zero(t, actualResult)
Expand Down Expand Up @@ -186,9 +201,12 @@ func TestHandleStreamErrors(t *testing.T) {
Semaphore: semaphore.NewWeighted(1),
})
err := p.HandleStream(
context.Background(), &modelpb.APMEvent{},
strings.NewReader(test.payload), 10,
nopBatchProcessor{}, &actualResult,
context.Background(),
&modelpb.APMEvent{},
strings.NewReader(test.payload),
batchSize,
nopBatchProcessor{},
&actualResult,
)
assert.Equal(t, test.err, err)
assert.Zero(t, actualResult.Accepted)
Expand Down Expand Up @@ -220,13 +238,31 @@ func TestHandleStream(t *testing.T) {
MaxEventSize: 100 * 1024,
Semaphore: semaphore.NewWeighted(1),
})

var actualResult Result
err := p.HandleStream(
context.Background(), &modelpb.APMEvent{},
strings.NewReader(payload), 10, batchProcessor,
&Result{},
context.Background(),
&modelpb.APMEvent{},
strings.NewReader(payload),
batchSize,
batchProcessor,
&actualResult,
)
require.NoError(t, err)

// Assert that batch result is properly populated.
assert.Equal(t, Result{
Accepted: 5,
AcceptedDetails: AcceptedDetails{
Transaction: 1,
Span: 1,
Metric: 1,
Log: 1,
Error: 1,
},
}, actualResult)

// Assert that processor is properly executed.
processors := make([]modelpb.APMEventType, len(events))
for i, event := range events {
processors[i] = event.Type()
Expand Down Expand Up @@ -260,8 +296,11 @@ func TestHandleStreamRUMv3(t *testing.T) {
})
var result Result
err := p.HandleStream(
context.Background(), &modelpb.APMEvent{},
strings.NewReader(payload), 10, batchProcessor,
context.Background(),
&modelpb.APMEvent{},
strings.NewReader(payload),
batchSize,
batchProcessor,
&result,
)
require.NoError(t, err)
Expand Down Expand Up @@ -311,8 +350,11 @@ func TestHandleStreamBaseEvent(t *testing.T) {
Semaphore: semaphore.NewWeighted(1),
})
err := p.HandleStream(
context.Background(), &baseEvent,
strings.NewReader(payload), 10, batchProcessor,
context.Background(),
&baseEvent,
strings.NewReader(payload),
batchSize,
batchProcessor,
&Result{},
)
require.NoError(t, err)
Expand Down Expand Up @@ -347,12 +389,18 @@ func TestLabelLeak(t *testing.T) {
MaxEventSize: 100 * 1024,
Semaphore: semaphore.NewWeighted(1),
})
var actualResult Result
err := p.HandleStream(context.Background(), baseEvent, strings.NewReader(payload), 10, batchProcessor, &actualResult)
err := p.HandleStream(
context.Background(),
baseEvent,
strings.NewReader(payload),
batchSize,
batchProcessor,
&Result{})
require.NoError(t, err)

txs := processed
assert.Len(t, txs, 2)

// Assert first tx
assert.Equal(t, modelpb.NumericLabels{
"time_set": {Value: 1652185276},
Expand All @@ -365,8 +413,12 @@ func TestLabelLeak(t *testing.T) {
}, modelpb.Labels(txs[0].Labels))

// Assert second tx
assert.Equal(t, modelpb.NumericLabels{"numeric": {Global: true, Value: 1}}, modelpb.NumericLabels(txs[1].NumericLabels))
assert.Equal(t, modelpb.Labels{"ci_commit": {Global: true, Value: "unknown"}}, modelpb.Labels(txs[1].Labels))
assert.Equal(t, modelpb.NumericLabels{
"numeric": {Global: true, Value: 1},
}, modelpb.NumericLabels(txs[1].NumericLabels))
assert.Equal(t, modelpb.Labels{
"ci_commit": {Global: true, Value: "unknown"},
}, modelpb.Labels(txs[1].Labels))
}

type nopBatchProcessor struct{}
Expand Down
19 changes: 18 additions & 1 deletion input/elasticapm/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,37 @@ import (

type Result struct {
errorsSpace [5]error

// Errors holds a limited number of errors that occurred while
// processing the event stream. If the limit is reached, the
// counters above are still incremented.
Errors []error
// Accepted holds the number of valid events accepted.

// Accepted holds the total number of valid events accepted.
Accepted int

// AcceptedDetails provides a detailed breakdown of the count
// of valid events categorized by signal type.
AcceptedDetails AcceptedDetails

// TooLarge holds the number of events that were rejected due
// to exceeding the event size limit.
TooLarge int

// Invalid holds the number of events that were rejected due
// to being invalid, excluding those that are counted by TooLarge.
Invalid int
}

// ProcessedDetail holds the number of events processed for each type.
type AcceptedDetails struct {
Transaction int
Span int
Metric int
Log int
Error int
}

func (r *Result) addError(err error) {
var invalid *InvalidInputError
if errors.As(err, &invalid) {
Expand Down

0 comments on commit 209af03

Please sign in to comment.