Skip to content

Commit

Permalink
feat: remove async handling code
Browse files Browse the repository at this point in the history
  • Loading branch information
kruskall committed Dec 29, 2023
1 parent dca9298 commit 06cce55
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 217 deletions.
81 changes: 11 additions & 70 deletions input/elasticapm/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ var (

errEmptyBody = errors.New("empty body")

// ErrQueueFull may be returned by HandleStream when the internal
// queue is full.
ErrQueueFull = errors.New("queue is full")

batchPool sync.Pool
)

Expand Down Expand Up @@ -243,7 +239,6 @@ func (p *Processor) readBatch(
// Callers must not access result concurrently with HandleStream.
func (p *Processor) HandleStream(
ctx context.Context,
async bool,
baseEvent *modelpb.APMEvent,
reader io.Reader,
batchSize int,
Expand All @@ -256,25 +251,15 @@ func (p *Processor) HandleStream(
//
// The semaphore defaults to 200 (N), only allowing N requests to read
// and cache Y events (determined by batchSize) from the batch.
//
// Clients can set async to true which makes the processor process the
// events in the background. Returns with an error `ErrQueueFull`
// if the semaphore is full. When asynchronous processing is requested,
// the batches are decoded synchronously, but the batch is processed
// asynchronously.
if err := p.semAcquire(ctx, async); err != nil {
if err := p.semAcquire(ctx); err != nil {
return fmt.Errorf("cannot acquire semaphore: %w", err)
}
sr := p.getStreamReader(reader)

// Release the semaphore on early exit; this will be set to false
// for asynchronous requests once we may no longer exit early.
shouldReleaseSemaphore := true
// Release the semaphore on early exit
defer func() {
sr.release()
if shouldReleaseSemaphore {
p.sem.Release(1)
}
p.sem.Release(1)
}()

// The first item is the metadata object.
Expand All @@ -292,80 +277,42 @@ func (p *Processor) HandleStream(
}
}

if async {
// The semaphore is released by handleStream
shouldReleaseSemaphore = false
}
first := true
for {
err := p.handleStream(ctx, async, baseEvent, batchSize, sr, processor, result, first)
err := p.handleStream(ctx, baseEvent, batchSize, sr, processor, result)
if err != nil {
if errors.Is(err, io.EOF) {
return nil
}
return fmt.Errorf("cannot handle stream: %w", err)
}
if first {
first = false
}
}
}

func (p *Processor) handleStream(
ctx context.Context,
async bool,
baseEvent *modelpb.APMEvent,
batchSize int,
sr *streamReader,
processor modelpb.BatchProcessor,
result *Result,
first bool,
) (readErr error) {
// Async requests will re-acquire the semaphore if it has more events than
// `batchSize`. In that event, the semaphore will be acquired again. If
// the semaphore is full, `ErrQueueFull` is returned.
) error {
// The first iteration will not acquire the semaphore since it's already
// acquired in the caller function.
var n int
if async {
if !first {
if err := p.semAcquire(ctx, async); err != nil {
return fmt.Errorf("cannot re-acquire semaphore: %w", err)
}
}
defer func() {
// If no events have been read on an asynchronous request, release
// the semaphore since the processing goroutine isn't scheduled.
if n == 0 {
p.sem.Release(1)
}
}()
}
var batch modelpb.Batch
if b, ok := batchPool.Get().(*modelpb.Batch); ok {
batch = (*b)[:0]
}
n, readErr = p.readBatch(ctx, baseEvent, batchSize, &batch, sr, result)
n, readErr := p.readBatch(ctx, baseEvent, batchSize, &batch, sr, result)
if n == 0 {
// No events to process, return the batch to the pool.
batchPool.Put(&batch)
return readErr
}
// Async requests are processed in the background and once the batch has
// been processed, the semaphore is released.
if async {
go func() {
defer p.sem.Release(1)
if err := p.processBatch(ctx, processor, &batch); err != nil {
p.logger.Error("failed handling async request", zap.Error(err))
}
}()
} else {
if err := p.processBatch(ctx, processor, &batch); err != nil {
return fmt.Errorf("cannot process batch: %w", err)
}
result.Accepted += n

if err := p.processBatch(ctx, processor, &batch); err != nil {
return fmt.Errorf("cannot process batch: %w", err)
}
result.Accepted += n
return readErr
}

Expand All @@ -392,16 +339,10 @@ func (p *Processor) getStreamReader(r io.Reader) *streamReader {
}
}

func (p *Processor) semAcquire(ctx context.Context, async bool) error {
func (p *Processor) semAcquire(ctx context.Context) error {
sp, ctx := apm.StartSpan(ctx, "Semaphore.Acquire", "Reporter")
defer sp.End()

if async {
if ok := p.sem.TryAcquire(1); !ok {
return ErrQueueFull
}
return nil
}
return p.sem.Acquire(ctx, 1)
}

Expand Down
154 changes: 7 additions & 147 deletions input/elasticapm/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -58,7 +57,7 @@ func TestHandleStreamReaderError(t *testing.T) {

var actualResult Result
err := sp.HandleStream(
context.Background(), false, &modelpb.APMEvent{},
context.Background(), &modelpb.APMEvent{},
reader, 10, nopBatchProcessor{}, &actualResult,
)
assert.ErrorIs(t, err, readErr)
Expand All @@ -79,9 +78,6 @@ func TestHandleStreamBatchProcessorError(t *testing.T) {
}{{
name: "NotQueueFull",
err: errors.New("queue is not full, something else is wrong"),
}, {
name: "QueueFull",
err: ErrQueueFull,
}} {
sp := NewProcessor(Config{
MaxEventSize: 100 * 1024,
Expand All @@ -93,7 +89,7 @@ func TestHandleStreamBatchProcessorError(t *testing.T) {

var actualResult Result
err := sp.HandleStream(
context.Background(), false, &modelpb.APMEvent{},
context.Background(), &modelpb.APMEvent{},
strings.NewReader(payload), 10, processor, &actualResult,
)
assert.ErrorIs(t, err, test.err)
Expand Down Expand Up @@ -191,7 +187,7 @@ func TestHandleStreamErrors(t *testing.T) {
Semaphore: semaphore.NewWeighted(1),
})
err := p.HandleStream(
context.Background(), false, &modelpb.APMEvent{},
context.Background(), &modelpb.APMEvent{},
strings.NewReader(test.payload), 10,
nopBatchProcessor{}, &actualResult,
)
Expand Down Expand Up @@ -226,7 +222,7 @@ func TestHandleStream(t *testing.T) {
Semaphore: semaphore.NewWeighted(1),
})
err := p.HandleStream(
context.Background(), false, &modelpb.APMEvent{},
context.Background(), &modelpb.APMEvent{},
strings.NewReader(payload), 10, batchProcessor,
&Result{},
)
Expand Down Expand Up @@ -265,7 +261,7 @@ func TestHandleStreamRUMv3(t *testing.T) {
})
var result Result
err := p.HandleStream(
context.Background(), false, &modelpb.APMEvent{},
context.Background(), &modelpb.APMEvent{},
strings.NewReader(payload), 10, batchProcessor,
&result,
)
Expand Down Expand Up @@ -316,7 +312,7 @@ func TestHandleStreamBaseEvent(t *testing.T) {
Semaphore: semaphore.NewWeighted(1),
})
err := p.HandleStream(
context.Background(), false, &baseEvent,
context.Background(), &baseEvent,
strings.NewReader(payload), 10, batchProcessor,
&Result{},
)
Expand Down Expand Up @@ -353,7 +349,7 @@ func TestLabelLeak(t *testing.T) {
Semaphore: semaphore.NewWeighted(1),
})
var actualResult Result
err := p.HandleStream(context.Background(), false, baseEvent, strings.NewReader(payload), 10, batchProcessor, &actualResult)
err := p.HandleStream(context.Background(), baseEvent, strings.NewReader(payload), 10, batchProcessor, &actualResult)
require.NoError(t, err)

txs := processed
Expand All @@ -374,142 +370,6 @@ func TestLabelLeak(t *testing.T) {
assert.Equal(t, modelpb.Labels{"ci_commit": {Global: true, Value: "unknown"}}, modelpb.Labels(txs[1].Labels))
}

// TestConcurrentAsync calls HandleStream with async set to true from several
// goroutines that share a single Processor, and checks the number of accepted
// events and the errors returned for different semaphore sizes and payloads.
func TestConcurrentAsync(t *testing.T) {
// smallBatch is a metadata line followed by a single transaction.
smallBatch := validMetadata + "\n" + validTransaction + "\n"
// bigBatch is a metadata line followed by 2000 transactions, far more than
// the batch size of 10 passed to HandleStream below.
bigBatch := validMetadata + "\n" + strings.Repeat(validTransaction+"\n", 2000)

// testCase describes one scenario run by the test helper below.
type testCase struct {
// payload is the request body handed to every HandleStream call.
payload string
// sem is the weight of the semaphore given to the Processor.
sem int64
// requests is the number of concurrent HandleStream calls to start.
requests int
// fullSem, when true, acquires the whole semaphore before any request
// is started.
fullSem bool
}

// test runs a single testCase and returns a Result whose Errors are the
// errors collected from all requests and whose Accepted is read from the
// batch processor's processed counter.
test := func(tc testCase) (pResult Result) {
var wg sync.WaitGroup
// mu guards pResult.Errors, which is appended to from every goroutine.
var mu sync.Mutex
p := NewProcessor(Config{
MaxEventSize: 100 * 1024,
Semaphore: semaphore.NewWeighted(tc.sem),
})
if tc.fullSem {
// Take every slot of the semaphore up front.
// NOTE(review): the error returned by semAcquire is discarded here.
for i := int64(0); i < tc.sem; i++ {
p.semAcquire(context.Background(), false)
}
}
// handleStream starts one asynchronous HandleStream request in its own
// goroutine and records any resulting errors in pResult.
handleStream := func(ctx context.Context, bp *accountProcessor) {
wg.Add(1)
go func() {
defer wg.Done()
var result Result
base := &modelpb.APMEvent{
Host: &modelpb.Host{
Ip: []*modelpb.IP{
modelpb.MustParseIP("192.0.0.1"),
},
},
}
err := p.HandleStream(ctx, true, base, strings.NewReader(tc.payload), 10, bp, &result)
if err != nil {
result.addError(err)
}
if !tc.fullSem {
// Wait for something to arrive on bp.batch or for the context to
// end. Presumably accountProcessor sends each processed batch on
// this channel; its definition is not visible here.
select {
case <-bp.batch:
case <-ctx.Done():
}
}
mu.Lock()
if len(result.Errors) > 0 {
pResult.Errors = append(pResult.Errors, result.Errors...)
}
mu.Unlock()
}()
}
// The channel is buffered with one slot per request.
batchProcessor := &accountProcessor{batch: make(chan *modelpb.Batch, tc.requests)}
// All requests share one context that expires after 200ms.
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
for i := 0; i < tc.requests; i++ {
handleStream(ctx, batchProcessor)
}
wg.Wait()
if !tc.fullSem {
// Try to acquire the lock to make sure all the requests have been handled
// and the locks have been released.
for i := int64(0); i < tc.sem; i++ {
p.semAcquire(context.Background(), false)
}
}
// Accepted comes from the batch processor's counter rather than from the
// per-request Result values.
processed := batchProcessor.processed.Load()
pResult.Accepted += int(processed)
return
}

// The semaphore is fully held before the requests start, so every request
// is expected to fail with ErrQueueFull and nothing is accepted.
t.Run("semaphore_full", func(t *testing.T) {
res := test(testCase{
sem: 2,
requests: 3,
fullSem: true,
payload: smallBatch,
})
assert.Equal(t, 0, res.Accepted)
assert.Equal(t, 3, len(res.Errors))
for _, err := range res.Errors {
assert.ErrorIs(t, err, ErrQueueFull)
}
})
// 100 large requests compete for a semaphore of weight 2.
t.Run("semaphore_undersized", func(t *testing.T) {
res := test(testCase{
sem: 2,
requests: 100,
payload: bigBatch,
})
// When the semaphore is full, `ErrQueueFull` is returned.
assert.Greater(t, len(res.Errors), 0)
for _, err := range res.Errors {
assert.ErrorIs(t, err, ErrQueueFull)
}
})
// The semaphore weight equals the number of requests.
t.Run("semaphore_empty", func(t *testing.T) {
res := test(testCase{
sem: 5,
requests: 5,
payload: smallBatch,
})
assert.Equal(t, 5, res.Accepted)
assert.Equal(t, 0, len(res.Errors))

res = test(testCase{
sem: 5,
requests: 5,
payload: bigBatch,
})
assert.GreaterOrEqual(t, res.Accepted, 5)
// All the requests will return with an error since only 50 events of
// each (5 requests * batch size) will be processed.
assert.Equal(t, 5, len(res.Errors))
})
// Invalid payloads: every request is expected to report an error and no
// event is accepted.
t.Run("semaphore_empty_incorrect_metadata", func(t *testing.T) {
res := test(testCase{
sem: 5,
requests: 5,
payload: `{"metadata": {"siervice":{}}}`,
})
assert.Equal(t, 0, res.Accepted)
assert.Len(t, res.Errors, 5)

// A metadata line followed by an event of an unknown type.
incorrectEvent := `{"metadata": {"service": {"name": "testsvc", "environment": "staging", "version": null, "agent": {"name": "python", "version": "6.9.1"}, "language": {"name": "python", "version": "3.10.4"}, "runtime": {"name": "CPython", "version": "3.10.4"}, "framework": {"name": "flask", "version": "2.1.1"}}, "process": {"pid": 2112739, "ppid": 2112738, "argv": ["/home/stuart/workspace/sdh/581/venv/lib/python3.10/site-packages/flask/__main__.py", "run"], "title": null}, "system": {"hostname": "slaptop", "architecture": "x86_64", "platform": "linux"}, "labels": {"ci_commit": "unknown", "numeric": 1}}}
{"some_incorrect_event": {}}`
res = test(testCase{
sem: 5,
requests: 2,
payload: incorrectEvent,
})
assert.Equal(t, 0, res.Accepted)
assert.Len(t, res.Errors, 2)
})
}

type nopBatchProcessor struct{}

func (nopBatchProcessor) ProcessBatch(context.Context, *modelpb.Batch) error {
Expand Down

0 comments on commit 06cce55

Please sign in to comment.