From 1d22c3778bf8be15c10eaffc89058992b2946679 Mon Sep 17 00:00:00 2001
From: kruskal <99559985+kruskall@users.noreply.github.com>
Date: Wed, 29 Nov 2023 02:06:55 +0100
Subject: [PATCH] feat: compute tbs entry size before writing and flush to
 avoid limit error

---
 .../sampling/eventstorage/storage.go | 48 +++++++++++--------
 1 file changed, 28 insertions(+), 20 deletions(-)

diff --git a/x-pack/apm-server/sampling/eventstorage/storage.go b/x-pack/apm-server/sampling/eventstorage/storage.go
index 872d5ec948f..b2e5124782b 100644
--- a/x-pack/apm-server/sampling/eventstorage/storage.go
+++ b/x-pack/apm-server/sampling/eventstorage/storage.go
@@ -20,6 +20,8 @@ const (
 	entryMetaTraceSampled   = 's'
 	entryMetaTraceUnsampled = 'u'
 	entryMetaTraceEvent     = 'e'
+
+	defaultPendingSize = 1024
 )
 
 var (
@@ -65,8 +67,9 @@ func (s *Storage) NewShardedReadWriter() *ShardedReadWriter {
 // The returned ReadWriter must be closed when it is no longer needed.
 func (s *Storage) NewReadWriter() *ReadWriter {
 	return &ReadWriter{
-		s:   s,
-		txn: s.db.NewTransaction(true),
+		s:           s,
+		txn:         s.db.NewTransaction(true),
+		pendingSize: defaultPendingSize,
 	}
 }
 
@@ -105,6 +108,7 @@ type ReadWriter struct {
 	// be unmodified until the end of a transaction.
 	readKeyBuf    []byte
 	pendingWrites int
+	pendingSize   int64
 }
 
 // Close closes the writer. Any writes that have not been flushed may be lost.
@@ -126,19 +130,10 @@ func (rw *ReadWriter) Close() {
 // files exceeds the configured threshold.
 func (rw *ReadWriter) Flush(limit int64) error {
 	const flushErrFmt = "failed to flush pending writes: %w"
-	if current, limitReached := rw.s.limitReached(limit); limitReached {
-		// Discard the txn and re-create it if the soft limit has been reached.
-		rw.txn.Discard()
-		rw.txn = rw.s.db.NewTransaction(true)
-		rw.pendingWrites = 0
-		return fmt.Errorf(
-			flushErrFmt+" (current: %d, limit: %d)",
-			ErrLimitReached, current, limit,
-		)
-	}
 	err := rw.txn.Commit()
 	rw.txn = rw.s.db.NewTransaction(true)
 	rw.pendingWrites = 0
+	rw.pendingSize = defaultPendingSize
 	if err != nil {
 		return fmt.Errorf(flushErrFmt, err)
 	}
@@ -185,18 +180,26 @@ func (rw *ReadWriter) WriteTraceEvent(traceID string, id string, event *modelpb.
 
 func (rw *ReadWriter) writeEntry(e *badger.Entry, opts WriterOpts) error {
 	rw.pendingWrites++
-	err := rw.txn.SetEntry(e.WithTTL(opts.TTL))
-	// Attempt to flush if there are 200 or more uncommitted writes.
-	// This ensures calls to ReadTraceEvents are not slowed down;
-	// ReadTraceEvents uses an iterator, which must sort all keys
-	// of uncommitted writes.
-	// The 200 value yielded a good balance between read and write speed:
-	// https://github.com/elastic/apm-server/pull/8407#issuecomment-1162994643
-	if rw.pendingWrites >= 200 {
+	entrySize := int64(estimateSize(e)) + 10
+	lsm, vlog := rw.s.db.Size()
+
+	if rw.pendingSize+entrySize+lsm+vlog >= opts.StorageLimitInBytes {
+		if err := rw.Flush(opts.StorageLimitInBytes); err != nil {
+			return err
+		}
+	} else if rw.pendingWrites >= 200 {
+		// Attempt to flush if there are 200 or more uncommitted writes.
+		// This ensures calls to ReadTraceEvents are not slowed down;
+		// ReadTraceEvents uses an iterator, which must sort all keys
+		// of uncommitted writes.
+		// The 200 value yielded a good balance between read and write speed:
+		// https://github.com/elastic/apm-server/pull/8407#issuecomment-1162994643
 		if err := rw.Flush(opts.StorageLimitInBytes); err != nil {
 			return err
 		}
 	}
+
+	err := rw.txn.SetEntry(e.WithTTL(opts.TTL))
+
 	// If the transaction is already too big to accommodate the new entry, flush
 	// the existing transaction and set the entry on a new one, otherwise,
 	// returns early.
@@ -206,9 +209,14 @@ func (rw *ReadWriter) writeEntry(e *badger.Entry, opts WriterOpts) error {
 	if err := rw.Flush(opts.StorageLimitInBytes); err != nil {
 		return err
 	}
+	rw.pendingSize += entrySize
 	return rw.txn.SetEntry(e)
 }
 
+func estimateSize(e *badger.Entry) int {
+	return len(e.Key) + len(e.Value) + 12 + 2
+}
+
// DeleteTraceEvent deletes the trace event from storage.
func (rw *ReadWriter) DeleteTraceEvent(traceID, id string) error {
	key := append(append([]byte(traceID), ':'), id...)
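
A minimal standalone sketch of the flush decision the patch introduces: estimate an
entry's storage cost before writing, and flush once that estimate plus the tracked
pending bytes plus badger's reported LSM/vlog sizes would reach the limit. Here
estimateSize mirrors the helper added above; shouldFlush, the sample byte sizes, and
the comments on the fixed 10-byte allowance are illustrative assumptions for this
sketch, not APIs from the PR.

package main

import "fmt"

// estimateSize mirrors the helper added in the patch: a pessimistic
// per-entry estimate of key plus value length plus a fixed overhead
// (12 + 2 bytes in the patch).
func estimateSize(key, value []byte) int64 {
	return int64(len(key) + len(value) + 12 + 2)
}

// shouldFlush is a hypothetical predicate for illustration: flush before
// writing when pending bytes + the entry estimate + badger's reported
// LSM and vlog sizes would reach the storage limit.
func shouldFlush(pendingSize, entrySize, lsm, vlog, limit int64) bool {
	return pendingSize+entrySize+lsm+vlog >= limit
}

func main() {
	pending := int64(1024) // baseline, as defaultPendingSize above
	// The extra 10 bytes mirror the fixed allowance in the patch's
	// entrySize computation.
	entry := estimateSize([]byte("trace-1:span-1"), make([]byte, 512)) + 10
	fmt.Println(shouldFlush(pending, entry, 40<<20, 100<<20, 200<<20)) // false: well under the limit
	fmt.Println(shouldFlush(pending, entry, 90<<20, 110<<20, 200<<20)) // true: the write would cross the limit
}

Compared with the old behavior, where Flush returned ErrLimitReached after the
database had already grown past the soft limit, this accounting flushes
proactively, so writes no longer fail with a limit error mid-transaction.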