Skip to content

Commit

Permalink
feat: compute tbs entry size before writing and flush to avoid limit …
Browse files Browse the repository at this point in the history
…error
  • Loading branch information
kruskall committed Nov 29, 2023
1 parent 84c49e7 commit 1d22c37
Showing 1 changed file with 28 additions and 20 deletions.
48 changes: 28 additions & 20 deletions x-pack/apm-server/sampling/eventstorage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ const (
entryMetaTraceSampled = 's'
entryMetaTraceUnsampled = 'u'
entryMetaTraceEvent = 'e'

defaultPendingSize = 1024
)

var (
Expand Down Expand Up @@ -65,8 +67,9 @@ func (s *Storage) NewShardedReadWriter() *ShardedReadWriter {
// The returned ReadWriter must be closed when it is no longer needed.
func (s *Storage) NewReadWriter() *ReadWriter {
	return &ReadWriter{
		s: s,
		// Open the transaction in update mode (true): writeEntry calls
		// SetEntry on it, so it must accept writes.
		txn: s.db.NewTransaction(true),
		// Start from the same baseline that Flush restores after every
		// commit, so size accounting is consistent from the first write.
		pendingSize: defaultPendingSize,
	}
}

Expand Down Expand Up @@ -105,6 +108,7 @@ type ReadWriter struct {
// be unmodified until the end of a transaction.
readKeyBuf []byte
pendingWrites int
pendingSize int64
}

// Close closes the writer. Any writes that have not been flushed may be lost.
Expand All @@ -126,19 +130,10 @@ func (rw *ReadWriter) Close() {
// files exceeds the configured threshold.
func (rw *ReadWriter) Flush(limit int64) error {
const flushErrFmt = "failed to flush pending writes: %w"
if current, limitReached := rw.s.limitReached(limit); limitReached {
// Discard the txn and re-create it if the soft limit has been reached.
rw.txn.Discard()
rw.txn = rw.s.db.NewTransaction(true)
rw.pendingWrites = 0
return fmt.Errorf(
flushErrFmt+" (current: %d, limit: %d)",
ErrLimitReached, current, limit,
)
}
err := rw.txn.Commit()
rw.txn = rw.s.db.NewTransaction(true)
rw.pendingWrites = 0
rw.pendingSize = defaultPendingSize
if err != nil {
return fmt.Errorf(flushErrFmt, err)
}
Expand Down Expand Up @@ -185,18 +180,26 @@ func (rw *ReadWriter) WriteTraceEvent(traceID string, id string, event *modelpb.

func (rw *ReadWriter) writeEntry(e *badger.Entry, opts WriterOpts) error {
rw.pendingWrites++
err := rw.txn.SetEntry(e.WithTTL(opts.TTL))
// Attempt to flush if there are 200 or more uncommitted writes.
// This ensures calls to ReadTraceEvents are not slowed down;
// ReadTraceEvents uses an iterator, which must sort all keys
// of uncommitted writes.
// The 200 value yielded a good balance between read and write speed:
// https://github.com/elastic/apm-server/pull/8407#issuecomment-1162994643
if rw.pendingWrites >= 200 {
entrySize := int64(estimateSize(e)) + 10
lsm, vlog := rw.s.db.Size()

if rw.pendingSize+entrySize+lsm+vlog >= opts.StorageLimitInBytes {
if err := rw.Flush(opts.StorageLimitInBytes); err != nil {
return err
}
} else if rw.pendingWrites >= 200 {
// Attempt to flush if there are 200 or more uncommitted writes.
// This ensures calls to ReadTraceEvents are not slowed down;
// ReadTraceEvents uses an iterator, which must sort all keys
// of uncommitted writes.
// The 200 value yielded a good balance between read and write speed:
// https://github.com/elastic/apm-server/pull/8407#issuecomment-1162994643
if err := rw.Flush(opts.StorageLimitInBytes); err != nil {
return err
}
}
err := rw.txn.SetEntry(e.WithTTL(opts.TTL))

// If the transaction is already too big to accommodate the new entry, flush
// the existing transaction and set the entry on a new one, otherwise,
// returns early.
Expand All @@ -206,9 +209,14 @@ func (rw *ReadWriter) writeEntry(e *badger.Entry, opts WriterOpts) error {
if err := rw.Flush(opts.StorageLimitInBytes); err != nil {
return err
}
rw.pendingSize += entrySize
return rw.txn.SetEntry(e)
}

// estimateSize returns an approximate size in bytes for the entry e: the
// length of its key plus the length of its value plus a fixed 14 bytes.
//
// NOTE(review): the 12 and 2 presumably mirror badger's own per-entry size
// estimate (value pointer and meta bytes) — confirm against badger's
// transaction code before relying on the exact figure.
func estimateSize(e *badger.Entry) int {
	const overhead = 12 + 2
	payload := len(e.Key) + len(e.Value)
	return payload + overhead
}

// DeleteTraceEvent deletes the trace event from storage.
func (rw *ReadWriter) DeleteTraceEvent(traceID, id string) error {
key := append(append([]byte(traceID), ':'), id...)
Expand Down

0 comments on commit 1d22c37

Please sign in to comment.