From 23ac871745649eb3a62d5121f7953c43a14b9216 Mon Sep 17 00:00:00 2001 From: kruskall <99559985+kruskall@users.noreply.github.com> Date: Tue, 6 Feb 2024 18:54:09 +0100 Subject: [PATCH] fix: do not stop sampling processor when failing to delete trace events (#12509) * fix: do not stop sampling processor when failing to delete trace events The sampling processor should never stop when apm-server is running. Instead log an error on Warn level and skip the current event. * fix: handle ErrTxnTooBig when deleting trace events (cherry picked from commit 6f0be721473125d0e1e219e79e8d5c656b901704) --- x-pack/apm-server/sampling/eventstorage/storage.go | 11 +++++++++++ x-pack/apm-server/sampling/processor.go | 4 ++-- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/x-pack/apm-server/sampling/eventstorage/storage.go b/x-pack/apm-server/sampling/eventstorage/storage.go index 91fa71b6060..816f22a2d1b 100644 --- a/x-pack/apm-server/sampling/eventstorage/storage.go +++ b/x-pack/apm-server/sampling/eventstorage/storage.go @@ -243,6 +243,17 @@ func estimateSize(e *badger.Entry) int64 { // DeleteTraceEvent deletes the trace event from storage. func (rw *ReadWriter) DeleteTraceEvent(traceID, id string) error { key := append(append([]byte(traceID), ':'), id...) + err := rw.txn.Delete(key) + // If the transaction is already too big to accommodate the delete, flush + // the existing transaction and retry the delete; otherwise return the + // result of the first attempt as-is. 
+ if err != badger.ErrTxnTooBig { + return err + } + if err := rw.Flush(); err != nil { + return err + } + return rw.txn.Delete(key) } diff --git a/x-pack/apm-server/sampling/processor.go b/x-pack/apm-server/sampling/processor.go index d91160ef4bb..10e464948f7 100644 --- a/x-pack/apm-server/sampling/processor.go +++ b/x-pack/apm-server/sampling/processor.go @@ -529,11 +529,11 @@ func (p *Processor) Run() error { switch event.Type() { case modelpb.TransactionEventType: if err := p.eventStore.DeleteTraceEvent(event.Trace.Id, event.Transaction.Id); err != nil { - return errors.Wrap(err, "failed to delete transaction from local storage") + p.logger.With(logp.Error(err)).Warn("failed to delete transaction from local storage") } case modelpb.SpanEventType: if err := p.eventStore.DeleteTraceEvent(event.Trace.Id, event.Span.Id); err != nil { - return errors.Wrap(err, "failed to delete span from local storage") + p.logger.With(logp.Error(err)).Warn("failed to delete span from local storage") } } }