diff --git a/input/elasticapm/processor.go b/input/elasticapm/processor.go index 9c8ee0f4..00d1d5c2 100644 --- a/input/elasticapm/processor.go +++ b/input/elasticapm/processor.go @@ -39,6 +39,8 @@ import ( var ( errUnrecognizedObject = errors.New("did not recognize object type") + errEmptyBody = errors.New("empty body") + // ErrQueueFull may be returned by HandleStream when the internal // queue is full. ErrQueueFull = errors.New("queue is full") @@ -114,6 +116,9 @@ func (p *Processor) readMetadata(reader *streamReader, out *modelpb.APMEvent) er body, err := reader.ReadAhead() if err != nil { if err == io.EOF { + if len(reader.LatestLine()) == 0 { + return errEmptyBody + } return &InvalidInputError{ Message: "EOF while reading metadata", Document: string(reader.LatestLine()), @@ -274,6 +279,9 @@ func (p *Processor) HandleStream( // The first item is the metadata object. if err := p.readMetadata(sr, baseEvent); err != nil { + if err == errEmptyBody { + return nil + } // no point in continuing if we couldn't read the metadata if _, ok := err.(*InvalidInputError); ok { return err diff --git a/input/elasticapm/processor_test.go b/input/elasticapm/processor_test.go index 7d96b3cb..12c6960e 100644 --- a/input/elasticapm/processor_test.go +++ b/input/elasticapm/processor_test.go @@ -169,6 +169,8 @@ func TestHandleStreamErrors(t *testing.T) { Document: invalidEventType, }, }, + }, { + name: "EmptyEvent", }, { name: "TooLargeEvent", payload: validMetadata + "\n" + tooLargeEvent + "\n",