diff --git a/go.mod b/go.mod index b6f4d255361..7657acc631a 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.23.0 require ( github.com/KimMachineGun/automemlimit v0.7.0-pre.3 github.com/cespare/xxhash/v2 v2.3.0 + github.com/cockroachdb/pebble v1.1.2 github.com/dgraph-io/badger/v2 v2.2007.4 github.com/dustin/go-humanize v1.0.1 github.com/elastic/apm-aggregation v1.2.0 @@ -66,9 +67,7 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash v1.1.0 // indirect github.com/cockroachdb/errors v1.11.3 // indirect - github.com/cockroachdb/fifo v0.0.0-20240816210425-c5d0cb0b6fc0 // indirect github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect - github.com/cockroachdb/pebble v1.1.2 // indirect github.com/cockroachdb/redact v1.1.5 // indirect github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect @@ -159,3 +158,5 @@ require ( ) replace github.com/dop251/goja => github.com/elastic/goja v0.0.0-20190128172624-dd2ac4456e20 // pin to version used by beats + +replace github.com/cockroachdb/pebble => github.com/carsonip/pebble v0.0.0-20250114162318-fa34738bbef0 diff --git a/go.sum b/go.sum index bb3ad3ed4c6..bdf93fc0ce9 100644 --- a/go.sum +++ b/go.sum @@ -60,6 +60,8 @@ github.com/axiomhq/hyperloglog v0.2.0/go.mod h1:GcgMjz9gaDKZ3G0UMS6Fq/VkZ4l7uGgc github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/carsonip/pebble v0.0.0-20250114162318-fa34738bbef0 h1:FSsCMsR/nTCbTsfbxQu2Xy5VArWxzgjBXRe0uEJiMMI= +github.com/carsonip/pebble v0.0.0-20250114162318-fa34738bbef0/go.mod h1:sEHm5NOXxyiAoKWhoFxT8xMgd/f3RA6qUqQ1BXKrh2E= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= @@ -71,12 +73,8 @@ github.com/cockroachdb/datadriven v1.0.3-0.20230413201302-be42291fc80f h1:otljaY github.com/cockroachdb/datadriven v1.0.3-0.20230413201302-be42291fc80f/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU= github.com/cockroachdb/errors v1.11.3 h1:5bA+k2Y6r+oz/6Z/RFlNeVCesGARKuC6YymtcDrbC/I= github.com/cockroachdb/errors v1.11.3/go.mod h1:m4UIW4CDjx+R5cybPsNrRbreomiFqt8o1h1wUVazSd8= -github.com/cockroachdb/fifo v0.0.0-20240816210425-c5d0cb0b6fc0 h1:pU88SPhIFid6/k0egdR5V6eALQYq2qbSmukrkgIh/0A= -github.com/cockroachdb/fifo v0.0.0-20240816210425-c5d0cb0b6fc0/go.mod h1:9/y3cnZ5GKakj/H4y9r9GTjCvAFta7KLgSHPJJYc52M= github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZeQy818SGhaone5OnYfxFR/+AzdY3sf5aE= github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs= -github.com/cockroachdb/pebble v1.1.2 h1:CUh2IPtR4swHlEj48Rhfzw6l/d0qA31fItcIszQVIsA= -github.com/cockroachdb/pebble v1.1.2/go.mod h1:4exszw1r40423ZsmkG/09AFEG83I0uDgfujJdbL6kYU= github.com/cockroachdb/redact v1.1.5 h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwPJ30= github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 h1:zuQyyAKVxetITBuuhv3BI9cMrmStnpT18zmgmTxunpo= diff --git a/x-pack/apm-server/main.go b/x-pack/apm-server/main.go index f9a6a2a85e7..60e74a5f04f 100644 --- a/x-pack/apm-server/main.go +++ b/x-pack/apm-server/main.go @@ -40,9 +40,9 @@ var ( // will hopefully disappear in the future, when agents no longer send unsampled transactions. samplingMonitoringRegistry = monitoring.Default.GetRegistry("apm-server.sampling") - // badgerDB holds the badger database to use when tail-based sampling is configured. - badgerMu sync.Mutex - badgerDB *eventstorage.StorageManager + // db holds the database to use when tail-based sampling is configured. + dbMu sync.Mutex + db *eventstorage.StorageManager storageMu sync.Mutex storage *eventstorage.ManagedReadWriter @@ -117,11 +117,11 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er } storageDir := paths.Resolve(paths.Data, tailSamplingStorageDir) - badgerDB, err = getBadgerDB(storageDir) + db, err := getDB(storageDir) if err != nil { return nil, fmt.Errorf("failed to get Badger database: %w", err) } - readWriter := getStorage(badgerDB) + readWriter := getStorage(db) policies := make([]sampling.Policy, len(tailSamplingConfig.Policies)) for i, in := range tailSamplingConfig.Policies { @@ -155,7 +155,7 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er UUID: samplerUUID.String(), }, StorageConfig: sampling.StorageConfig{ - DB: badgerDB, + DB: db, Storage: readWriter, StorageDir: storageDir, StorageGCInterval: tailSamplingConfig.StorageGCInterval, @@ -166,17 +166,17 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er }) } -func getBadgerDB(storageDir string) (*eventstorage.StorageManager, error) { - badgerMu.Lock() - defer badgerMu.Unlock() - if badgerDB == nil { +func getDB(storageDir string) (*eventstorage.StorageManager, error) { + dbMu.Lock() + defer dbMu.Unlock() + if db == nil { sm, err := eventstorage.NewStorageManager(storageDir) if err != nil { return nil, err } - badgerDB = sm + db = sm } - return badgerDB, nil + return db, nil } func getStorage(sm *eventstorage.StorageManager) *eventstorage.ManagedReadWriter { @@ -251,11 +251,11 @@ func wrapServer(args beater.ServerParams, runServer beater.RunServerFunc) (beate // closeBadger is called at process exit time to close the badger.DB opened // by the tail-based sampling processor constructor, if any. This is never -// called concurrently with opening badger.DB/accessing the badgerDB global, -// so it does not need to hold badgerMu. +// called concurrently with opening badger.DB/accessing the db global, +// so it does not need to hold dbMu. func closeBadger() error { - if badgerDB != nil { - return badgerDB.Close() + if db != nil { + return db.Close() } return nil } diff --git a/x-pack/apm-server/sampling/eventstorage/badger.go b/x-pack/apm-server/sampling/eventstorage/badger.go deleted file mode 100644 index 99219262d4f..00000000000 --- a/x-pack/apm-server/sampling/eventstorage/badger.go +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License 2.0; -// you may not use this file except in compliance with the Elastic License 2.0. - -package eventstorage - -import ( - "github.com/dgraph-io/badger/v2" - - "github.com/elastic/apm-server/internal/logs" - "github.com/elastic/elastic-agent-libs/logp" -) - -const ( - defaultValueLogFileSize = 64 * 1024 * 1024 -) - -// OpenBadger creates or opens a Badger database with the specified location -// and value log file size. If the value log file size is <= 0, the default -// of 64MB will be used. -// -// NOTE(axw) only one badger.DB for a given storage directory may be open at any given time. -func OpenBadger(storageDir string, valueLogFileSize int64) (*badger.DB, error) { - logger := logp.NewLogger(logs.Sampling) - // Tunable memory options: - // - NumMemtables - default 5 in-mem tables (MaxTableSize default) - // - NumLevelZeroTables - default 5 - number of L0 tables before compaction starts. - // - NumLevelZeroTablesStall - number of L0 tables before writing stalls (waiting for compaction). - // - IndexCacheSize - default all in mem, Each table has its own bloom filter and each bloom filter is approximately of 5 MB. - // - MaxTableSize - Default 64MB - if valueLogFileSize <= 0 { - valueLogFileSize = defaultValueLogFileSize - } - const tableLimit = 4 - badgerOpts := badger.DefaultOptions(storageDir). - WithLogger(&LogpAdaptor{Logger: logger}). - WithTruncate(true). // Truncate unreadable files which cannot be read. - WithNumMemtables(tableLimit). // in-memory tables. - WithNumLevelZeroTables(tableLimit). // L0 tables. - WithNumLevelZeroTablesStall(tableLimit * 3). // Maintain the default 1-to-3 ratio before stalling. - WithMaxTableSize(int64(16 << 20)). // Max LSM table or file size. - WithValueLogFileSize(valueLogFileSize) // vlog file size. - - return badger.Open(badgerOpts) -} diff --git a/x-pack/apm-server/sampling/eventstorage/logger.go b/x-pack/apm-server/sampling/eventstorage/logger.go deleted file mode 100644 index 695a68e8a78..00000000000 --- a/x-pack/apm-server/sampling/eventstorage/logger.go +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License 2.0; -// you may not use this file except in compliance with the Elastic License 2.0. - -package eventstorage - -import ( - "fmt" - "sync" - - "github.com/elastic/elastic-agent-libs/logp" -) - -// LogpAdaptor adapts logp.Logger to the badger.Logger interface. -type LogpAdaptor struct { - *logp.Logger - - mu sync.RWMutex - last string -} - -// Errorf prints the log message when the current message isn't the same as the -// previously logged message. -func (a *LogpAdaptor) Errorf(format string, args ...interface{}) { - msg := fmt.Sprintf(format, args...) - if a.setLast(msg) { - a.Logger.Errorf(format, args...) - } -} - -func (a *LogpAdaptor) setLast(msg string) bool { - a.mu.RLock() - if msg != a.last { - a.mu.RUnlock() - return false - } - a.mu.RUnlock() - a.mu.Lock() - defer a.mu.Unlock() - shouldSet := msg != a.last - if shouldSet { - a.last = msg - } - return shouldSet -} - -// Warningf adapts badger.Logger.Warningf to logp.Logger.Warngf. -func (a *LogpAdaptor) Warningf(format string, args ...interface{}) { - a.Warnf(format, args...) -} diff --git a/x-pack/apm-server/sampling/eventstorage/pebble.go b/x-pack/apm-server/sampling/eventstorage/pebble.go new file mode 100644 index 00000000000..95aa4ae13b5 --- /dev/null +++ b/x-pack/apm-server/sampling/eventstorage/pebble.go @@ -0,0 +1,60 @@ +package eventstorage + +import ( + "github.com/cockroachdb/pebble" + "github.com/cockroachdb/pebble/bloom" + + "github.com/elastic/apm-server/internal/logs" + "github.com/elastic/elastic-agent-libs/logp" +) + +const ( + // Batch grows in multiples of 2 based on the initial size. For + // example, if the initial size is 1MB then the batch will grow as + // {2, 4, 8, 16, ...}. If a batch of size greater than 4MBs is + // consistently committed then that batch will never be retained + // if the max retained size is smaller than 8MBs as the batch capacity + // will always grow to 8MB. + initialPebbleBatchSize = 64 << 10 // 64KB + maxRetainedPebbleBatchSize = 8 << 20 // 8MB + + // pebbleMemTableSize defines the max stead state size of a memtable. + // There can be more than 1 memtable in memory at a time as it takes + // time for old memtable to flush. The memtable size also defines + // the size for large batches. A large batch is a batch which will + // take atleast half of the memtable size. Note that the Batch#Len + // is not the same as the memtable size that the batch will occupy + // as data in batches are encoded differently. In general, the + // memtable size of the batch will be higher than the length of the + // batch data. + // + // On commit, data in the large batch maybe kept by pebble and thus + // large batches will need to be reallocated. Note that large batch + // classification uses the memtable size that a batch will occupy + // rather than the length of data slice backing the batch. + pebbleMemTableSize = 32 << 20 // 32MB + + // dbCommitThresholdBytes is a soft limit and the batch is committed + // to the DB as soon as it crosses this threshold. To make sure that + // the commit threshold plays will with the max retained batch size + // the threshold should be kept smaller than the sum of max retained + // batch size and encoded size of aggregated data to be committed. + dbCommitThresholdBytes = 8000 << 10 // 8000KB +) + +func OpenPebble(storageDir string) (*pebble.DB, error) { + return pebble.Open(storageDir, &pebble.Options{ + // FIXME: Specify FormatMajorVersion to use value blocks? + FormatMajorVersion: pebble.FormatNewest, + Logger: logp.NewLogger(logs.Sampling), + MemTableSize: pebbleMemTableSize, + Levels: []pebble.LevelOptions{ + { + BlockSize: 16 << 10, + Compression: pebble.SnappyCompression, + FilterPolicy: bloom.FilterPolicy(10), + FilterType: pebble.TableFilter, + }, + }, + }) +} diff --git a/x-pack/apm-server/sampling/eventstorage/storage.go b/x-pack/apm-server/sampling/eventstorage/storage.go index c11c89d647e..f7f4c586d3c 100644 --- a/x-pack/apm-server/sampling/eventstorage/storage.go +++ b/x-pack/apm-server/sampling/eventstorage/storage.go @@ -11,21 +11,18 @@ import ( "sync/atomic" "time" - "github.com/dgraph-io/badger/v2" + "github.com/cockroachdb/pebble" "github.com/elastic/apm-data/model/modelpb" ) const ( + prefixSamplingDecision string = "!" // NOTE(axw) these values (and their meanings) must remain stable // over time, to avoid misinterpreting historical data. - entryMetaTraceSampled = 's' - entryMetaTraceUnsampled = 'u' - entryMetaTraceEvent = 'e' - - // Initial transaction size - // len(txnKey) + 10 - baseTransactionSize = 10 + 11 + entryMetaTraceSampled byte = 's' + entryMetaTraceUnsampled byte = 'u' + entryMetaTraceEvent byte = 'e' ) var ( @@ -39,7 +36,7 @@ var ( ) type db interface { - NewTransaction(update bool) *badger.Txn + NewIndexedBatch(opts ...pebble.BatchOption) *pebble.Batch Size() (lsm, vlog int64) Close() error } @@ -78,11 +75,11 @@ func (s *Storage) NewShardedReadWriter() *ShardedReadWriter { // // The returned ReadWriter must be closed when it is no longer needed. func (s *Storage) NewReadWriter() *ReadWriter { - s.pendingSize.Add(baseTransactionSize) + //s.pendingSize.Add(baseTransactionSize) return &ReadWriter{ - s: s, - txn: nil, // lazy init to avoid deadlock in storage manager - pendingSize: baseTransactionSize, + s: s, + //txn: nil, // lazy init to avoid deadlock in storage manager + //pendingSize: baseTransactionSize, } } @@ -100,8 +97,8 @@ type WriterOpts struct { // avoid conflicts, e.g. by using consistent hashing to distribute to one of // a set of ReadWriters, such as implemented by ShardedReadWriter. type ReadWriter struct { - s *Storage - txn *badger.Txn + s *Storage + batch *pebble.Batch // readKeyBuf is a reusable buffer for keys used in read operations. // This must not be used in write operations, as keys are expected to @@ -113,8 +110,11 @@ type ReadWriter struct { } func (rw *ReadWriter) lazyInit() { - if rw.txn == nil { - rw.txn = rw.s.db.NewTransaction(true) + if rw.batch == nil { + rw.batch = rw.s.db.NewIndexedBatch( + pebble.WithInitialSizeBytes(initialPebbleBatchSize), + pebble.WithMaxRetainedSizeBytes(maxRetainedPebbleBatchSize), + ) } } @@ -123,8 +123,8 @@ func (rw *ReadWriter) lazyInit() { // This must be called when the writer is no longer needed, in order to reclaim // resources. func (rw *ReadWriter) Close() { - if rw.txn != nil { - rw.txn.Discard() + if rw.batch != nil { + rw.batch.Close() } } @@ -137,12 +137,14 @@ func (rw *ReadWriter) Flush() error { rw.lazyInit() const flushErrFmt = "failed to flush pending writes: %w" - err := rw.txn.Commit() - rw.txn = rw.s.db.NewTransaction(true) - rw.s.pendingSize.Add(-rw.pendingSize) + err := rw.batch.Commit(pebble.NoSync) + rw.batch.Close() + rw.batch = nil + rw.lazyInit() // FIXME: this shouldn't be needed + //rw.s.pendingSize.Add(-rw.pendingSize) rw.pendingWrites = 0 - rw.pendingSize = baseTransactionSize - rw.s.pendingSize.Add(baseTransactionSize) + //rw.pendingSize = baseTransactionSize + //rw.s.pendingSize.Add(baseTransactionSize) if err != nil { return fmt.Errorf(flushErrFmt, err) } @@ -153,12 +155,12 @@ func (rw *ReadWriter) Flush() error { func (rw *ReadWriter) WriteTraceSampled(traceID string, sampled bool, opts WriterOpts) error { rw.lazyInit() - key := []byte(traceID) - var meta uint8 = entryMetaTraceUnsampled + key := []byte(prefixSamplingDecision + traceID) + meta := entryMetaTraceUnsampled if sampled { meta = entryMetaTraceSampled } - return rw.writeEntry(badger.NewEntry(key[:], nil).WithMeta(meta), opts) + return rw.batch.Set(key, []byte{meta}, pebble.NoSync) } // IsTraceSampled reports whether traceID belongs to a trace that is sampled @@ -167,15 +169,16 @@ func (rw *ReadWriter) WriteTraceSampled(traceID string, sampled bool, opts Write func (rw *ReadWriter) IsTraceSampled(traceID string) (bool, error) { rw.lazyInit() - rw.readKeyBuf = append(rw.readKeyBuf[:0], traceID...) - item, err := rw.txn.Get(rw.readKeyBuf) - if err != nil { - if err == badger.ErrKeyNotFound { - return false, ErrNotFound - } - return false, err + // FIXME: this needs to be fast, as it is in the hot path + // It should minimize disk IO on miss due to + // 1. (pubsub) remote sampling decision + // 2. (hot path) sampling decision not made yet + item, closer, err := rw.batch.Get([]byte(prefixSamplingDecision + traceID)) + if err == pebble.ErrNotFound { + return false, ErrNotFound } - return item.UserMeta() == entryMetaTraceSampled, nil + defer closer.Close() + return item[0] == entryMetaTraceSampled, nil } // WriteTraceEvent writes a trace event to storage. @@ -195,77 +198,22 @@ func (rw *ReadWriter) WriteTraceEvent(traceID string, id string, event *modelpb. buf.WriteByte(':') buf.WriteString(id) key := buf.Bytes() - return rw.writeEntry(badger.NewEntry(key, data).WithMeta(entryMetaTraceEvent), opts) + return rw.writeEntry(key, data) } -func (rw *ReadWriter) writeEntry(e *badger.Entry, opts WriterOpts) error { +func (rw *ReadWriter) writeEntry(key, data []byte) error { rw.pendingWrites++ - entrySize := estimateSize(e) - // The badger database has an async size reconciliation, with a 1 minute - // ticker that keeps the lsm and vlog sizes updated in an in-memory map. - // It's OK to call call s.db.Size() on the hot path, since the memory - // lookup is cheap. - lsm, vlog := rw.s.db.Size() - - // there are multiple ReadWriters writing to the same storage so add - // the entry size and consider the new value to avoid TOCTOU issues. - pendingSize := rw.s.pendingSize.Add(entrySize) - rw.pendingSize += entrySize - - if current := pendingSize + lsm + vlog; opts.StorageLimitInBytes != 0 && current >= opts.StorageLimitInBytes { - // flush what we currently have and discard the current entry - if err := rw.Flush(); err != nil { - return err - } - return fmt.Errorf("%w (current: %d, limit: %d)", ErrLimitReached, current, opts.StorageLimitInBytes) + // FIXME: possibly change key structure, because the append is going to be expensive + if err := rw.batch.Set(key, data, pebble.NoSync); err != nil { + return err } - if rw.pendingWrites >= 200 { - // Attempt to flush if there are 200 or more uncommitted writes. - // This ensures calls to ReadTraceEvents are not slowed down; - // ReadTraceEvents uses an iterator, which must sort all keys - // of uncommitted writes. - // The 200 value yielded a good balance between read and write speed: - // https://github.com/elastic/apm-server/pull/8407#issuecomment-1162994643 + if rw.batch.Len() >= dbCommitThresholdBytes { if err := rw.Flush(); err != nil { return err } - - // the current ReadWriter flushed the transaction and reset the pendingSize so add - // the entrySize again. - rw.pendingSize += entrySize - rw.s.pendingSize.Add(entrySize) - } - - err := rw.txn.SetEntry(e.WithTTL(opts.TTL)) - - // If the transaction is already too big to accommodate the new entry, flush - // the existing transaction and set the entry on a new one, otherwise, - // returns early. - if err != badger.ErrTxnTooBig { - return err - } - if err := rw.Flush(); err != nil { - return err } - rw.pendingSize += entrySize - rw.s.pendingSize.Add(entrySize) - return rw.txn.SetEntry(e.WithTTL(opts.TTL)) -} - -func estimateSize(e *badger.Entry) int64 { - // See badger WithValueThreshold option - // An storage usage of an entry depends on its size - // - // if len(e.Value) < threshold { - // return len(e.Key) + len(e.Value) + 2 // Meta, UserMeta - // } - // return len(e.Key) + 12 + 2 // 12 for ValuePointer, 2 for metas. - // - // Make a good estimate by reserving more space - estimate := len(e.Key) + len(e.Value) + 12 + 2 - // Extra bytes for the version in key. - return int64(estimate) + 10 + return nil } // DeleteTraceEvent deletes the trace event from storage. @@ -279,67 +227,40 @@ func (rw *ReadWriter) DeleteTraceEvent(traceID, id string) error { buf.WriteString(id) key := buf.Bytes() - err := rw.txn.Delete(key) - // If the transaction is already too big to accommodate the new entry, flush - // the existing transaction and set the entry on a new one, otherwise, - // returns early. - if err != badger.ErrTxnTooBig { - return err - } - if err := rw.Flush(); err != nil { + err := rw.batch.Delete(key, pebble.NoSync) + if err != nil { return err } - - return rw.txn.Delete(key) + //if rw.batch.Len() > flushThreshold { + // if err := rw.Flush(); err != nil { + // return err + // } + //} + return nil } // ReadTraceEvents reads trace events with the given trace ID from storage into out. func (rw *ReadWriter) ReadTraceEvents(traceID string, out *modelpb.Batch) error { rw.lazyInit() - opts := badger.DefaultIteratorOptions - rw.readKeyBuf = append(append(rw.readKeyBuf[:0], traceID...), ':') - opts.Prefix = rw.readKeyBuf - - // 1st pass: check whether there exist keys matching the prefix. - // Do not prefetch values so that the check is done in-memory. - // This is to optimize for cases when it is a miss. - opts.PrefetchValues = false - iter := rw.txn.NewIterator(opts) - iter.Rewind() - if !iter.Valid() { - iter.Close() - return nil + iter, err := rw.batch.NewIter(&pebble.IterOptions{ + LowerBound: append([]byte(traceID), ':'), + UpperBound: append([]byte(traceID), ';'), // This is a hack to stop before next ID + }) + if err != nil { + return err } - iter.Close() - - // 2nd pass: this is only done when there exist keys matching the prefix. - // Fetch the events with PrefetchValues for performance. - // This is to optimize for cases when it is a hit. - opts.PrefetchValues = true - iter = rw.txn.NewIterator(opts) defer iter.Close() - for iter.Rewind(); iter.Valid(); iter.Next() { - item := iter.Item() - if item.IsDeletedOrExpired() { - continue + for iter.First(); iter.Valid(); iter.Next() { + event := &modelpb.APMEvent{} + data, err := iter.ValueAndErr() + if err != nil { + return err } - switch item.UserMeta() { - case entryMetaTraceEvent: - event := &modelpb.APMEvent{} - if err := item.Value(func(data []byte) error { - if err := rw.s.codec.DecodeEvent(data, event); err != nil { - return fmt.Errorf("codec failed to decode event: %w", err) - } - return nil - }); err != nil { - return err - } - *out = append(*out, event) - default: - // Unknown entry meta: ignore. - continue + if err := rw.s.codec.DecodeEvent(data, event); err != nil { + return fmt.Errorf("codec failed to decode event: %w", err) } + *out = append(*out, event) } return nil } diff --git a/x-pack/apm-server/sampling/eventstorage/storage_manager.go b/x-pack/apm-server/sampling/eventstorage/storage_manager.go index 8446f8fbf89..911feb81a62 100644 --- a/x-pack/apm-server/sampling/eventstorage/storage_manager.go +++ b/x-pack/apm-server/sampling/eventstorage/storage_manager.go @@ -6,16 +6,12 @@ package eventstorage import ( "errors" - "fmt" - "io/fs" "os" "path/filepath" - "strings" "sync" "time" - "github.com/dgraph-io/badger/v2" - "golang.org/x/sync/errgroup" + "github.com/cockroachdb/pebble" "github.com/elastic/apm-data/model/modelpb" "github.com/elastic/apm-server/internal/logs" @@ -32,14 +28,14 @@ var ( errDropAndRecreateInProgress = errors.New("db drop and recreate in progress") ) -// StorageManager encapsulates badger.DB. +// StorageManager encapsulates pebble.DB. // It is to provide file system access, simplify synchronization and enable underlying db swaps. -// It assumes exclusive access to badger DB at storageDir. +// It assumes exclusive access to pebble DB at storageDir. type StorageManager struct { storageDir string logger *logp.Logger - db *badger.DB + db *pebble.DB storage *Storage rw *ShardedReadWriter @@ -53,7 +49,7 @@ type StorageManager struct { runCh chan struct{} } -// NewStorageManager returns a new StorageManager with badger DB at storageDir. +// NewStorageManager returns a new StorageManager with pebble DB at storageDir. func NewStorageManager(storageDir string) (*StorageManager, error) { sm := &StorageManager{ storageDir: storageDir, @@ -69,7 +65,7 @@ func NewStorageManager(storageDir string) (*StorageManager, error) { // reset initializes db, storage, and rw. func (s *StorageManager) reset() error { - db, err := OpenBadger(s.storageDir, -1) + db, err := OpenPebble(s.storageDir) if err != nil { return err } @@ -91,181 +87,20 @@ func (s *StorageManager) Close() error { func (s *StorageManager) Size() (lsm, vlog int64) { s.mu.RLock() defer s.mu.RUnlock() - return s.db.Size() + return int64(s.db.Metrics().DiskSpaceUsage()), 0 // FIXME } -func (s *StorageManager) NewTransaction(update bool) *badger.Txn { +func (s *StorageManager) NewIndexedBatch(opts ...pebble.BatchOption) *pebble.Batch { s.mu.RLock() defer s.mu.RUnlock() - return s.db.NewTransaction(update) + return s.db.NewIndexedBatch(opts...) } // Run has the same lifecycle as the TBS processor as opposed to StorageManager to facilitate EA hot reload. func (s *StorageManager) Run(stopping <-chan struct{}, gcInterval time.Duration, ttl time.Duration, storageLimit uint64, storageLimitThreshold float64) error { - select { - case <-stopping: - return nil - case s.runCh <- struct{}{}: - } - defer func() { - <-s.runCh - }() - - g := errgroup.Group{} - g.Go(func() error { - return s.runGCLoop(stopping, gcInterval) - }) - g.Go(func() error { - return s.runDropLoop(stopping, ttl, storageLimit, storageLimitThreshold) - }) - return g.Wait() -} - -// runGCLoop runs a loop that calls badger DB RunValueLogGC every gcInterval. -func (s *StorageManager) runGCLoop(stopping <-chan struct{}, gcInterval time.Duration) error { - // This goroutine is responsible for periodically garbage - // collecting the Badger value log, using the recommended - // discard ratio of 0.5. - ticker := time.NewTicker(gcInterval) - defer ticker.Stop() - for { - select { - case <-stopping: - return nil - case <-ticker.C: - const discardRatio = 0.5 - var err error - for err == nil { - // Keep garbage collecting until there are no more rewrites, - // or garbage collection fails. - err = s.runValueLogGC(discardRatio) - } - if err != nil && err != badger.ErrNoRewrite { - return err - } - } - } -} - -func (s *StorageManager) runValueLogGC(discardRatio float64) error { - s.mu.RLock() - defer s.mu.RUnlock() - return s.db.RunValueLogGC(discardRatio) -} - -// runDropLoop runs a loop that detects if storage limit has been exceeded for at least ttl. -// If so, it drops and recreates the underlying badger DB. -// This is a mitigation for issue https://github.com/elastic/apm-server/issues/14923 -func (s *StorageManager) runDropLoop(stopping <-chan struct{}, ttl time.Duration, storageLimitInBytes uint64, storageLimitThreshold float64) error { - if storageLimitInBytes == 0 { - return nil - } - - var firstExceeded time.Time - checkAndFix := func() error { - lsm, vlog := s.Size() - // add buffer to avoid edge case storageLimitInBytes-lsm-vlog < buffer, when writes are still always rejected - buffer := int64(baseTransactionSize * len(s.rw.readWriters)) - if uint64(lsm+vlog+buffer) >= storageLimitInBytes { - now := time.Now() - if firstExceeded.IsZero() { - firstExceeded = now - s.logger.Warnf( - "badger db size (%d+%d=%d) has exceeded storage limit (%d*%.1f=%d); db will be dropped and recreated if problem persists for `sampling.tail.ttl` (%s)", - lsm, vlog, lsm+vlog, storageLimitInBytes, storageLimitThreshold, int64(float64(storageLimitInBytes)*storageLimitThreshold), ttl.String()) - } - if now.Sub(firstExceeded) >= ttl { - s.logger.Warnf("badger db size has exceeded storage limit for over `sampling.tail.ttl` (%s), please consider increasing `sampling.tail.storage_limit`; dropping and recreating badger db to recover", ttl.String()) - err := s.dropAndRecreate() - if err != nil { - s.logger.With(logp.Error(err)).Error("error dropping and recreating badger db to recover storage space") - } else { - s.logger.Info("badger db dropped and recreated") - } - firstExceeded = time.Time{} - } - } else { - firstExceeded = time.Time{} - } - return nil - } - - timer := time.NewTicker(time.Minute) // Eval db size every minute as badger reports them with 1m lag - defer timer.Stop() - for { - if err := checkAndFix(); err != nil { - return err - } - - select { - case <-stopping: - return nil - case <-timer.C: - continue - } - } -} - -// dropAndRecreate deletes the underlying badger DB files at the file system level, and replaces it with a new badger DB. -func (s *StorageManager) dropAndRecreate() (retErr error) { - s.mu.Lock() - defer s.mu.Unlock() - - defer func() { - // In any case (errors or not), reset StorageManager while lock is held - err := s.reset() - if err != nil { - retErr = errors.Join(retErr, fmt.Errorf("error reopening badger db: %w", err)) - } - }() - - // Intentionally not flush rw, as storage is full. - s.rw.Close() - err := s.db.Close() - if err != nil { - return fmt.Errorf("error closing badger db: %w", err) - } - - err = s.deleteBadgerFiles() - if err != nil { - return fmt.Errorf("error deleting badger db files: %w", err) - } - return nil } -func (s *StorageManager) deleteBadgerFiles() error { - // Although removing the files in place can be slower, it is less error-prone than rename-and-delete. - // Delete every file except subscriber position file - var ( - rootVisited bool - sstFiles, vlogFiles int - otherFilenames []string - ) - err := filepath.WalkDir(s.storageDir, func(path string, d fs.DirEntry, _ error) error { - if !rootVisited { - rootVisited = true - return nil - } - filename := filepath.Base(path) - if filename == subscriberPositionFile { - return nil - } - switch ext := filepath.Ext(filename); ext { - case ".sst": - sstFiles++ - case ".vlog": - vlogFiles++ - default: - otherFilenames = append(otherFilenames, filename) - } - return os.RemoveAll(path) - }) - s.logger.Infof("deleted badger files: %d SST files, %d VLOG files, %d other files: [%s]", - sstFiles, vlogFiles, len(otherFilenames), strings.Join(otherFilenames, ", ")) - return err -} - func (s *StorageManager) ReadSubscriberPosition() ([]byte, error) { s.subscriberPosMu.Lock() defer s.subscriberPosMu.Unlock() diff --git a/x-pack/apm-server/sampling/eventstorage/storage_manager_test.go b/x-pack/apm-server/sampling/eventstorage/storage_manager_test.go deleted file mode 100644 index 3dccac1ed38..00000000000 --- a/x-pack/apm-server/sampling/eventstorage/storage_manager_test.go +++ /dev/null @@ -1,70 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License 2.0; -// you may not use this file except in compliance with the Elastic License 2.0. - -package eventstorage - -import ( - "fmt" - "os" - "path/filepath" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func badgerModTime(dir string) time.Time { - oldest := time.Now() - filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { - ext := filepath.Ext(path) - if (ext == ".vlog" || ext == ".sst") && info.ModTime().Before(oldest) { - oldest = info.ModTime() - } - return nil - }) - return oldest -} - -func TestDropAndRecreate_filesRecreated(t *testing.T) { - tempDir := t.TempDir() - sm, err := NewStorageManager(tempDir) - require.NoError(t, err) - defer sm.Close() - - oldModTime := badgerModTime(tempDir) - - err = sm.dropAndRecreate() - assert.NoError(t, err) - - newModTime := badgerModTime(tempDir) - - assert.Greater(t, newModTime, oldModTime) -} - -func TestDropAndRecreate_subscriberPositionFile(t *testing.T) { - for _, exists := range []bool{true, false} { - t.Run(fmt.Sprintf("exists=%t", exists), func(t *testing.T) { - tempDir := t.TempDir() - sm, err := NewStorageManager(tempDir) - require.NoError(t, err) - defer sm.Close() - - if exists { - err := sm.WriteSubscriberPosition([]byte("{}")) - require.NoError(t, err) - } - - err = sm.dropAndRecreate() - assert.NoError(t, err) - - data, err := sm.ReadSubscriberPosition() - if exists { - assert.Equal(t, "{}", string(data)) - } else { - assert.ErrorIs(t, err, os.ErrNotExist) - } - }) - } -} diff --git a/x-pack/apm-server/sampling/processor_test.go b/x-pack/apm-server/sampling/processor_test.go index 5a301e208cf..9acf9dad5c8 100644 --- a/x-pack/apm-server/sampling/processor_test.go +++ b/x-pack/apm-server/sampling/processor_test.go @@ -11,7 +11,6 @@ import ( "os" "path" "path/filepath" - "runtime" "sort" "strings" "testing" @@ -22,7 +21,6 @@ import ( "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" "google.golang.org/protobuf/testing/protocmp" "github.com/elastic/apm-data/model/modelpb" @@ -57,6 +55,8 @@ func TestProcessUnsampled(t *testing.T) { } func TestProcessAlreadyTailSampled(t *testing.T) { + t.FailNow() // Expected to fail as TTL is not handled as strictly in pebble + config := newTempdirConfig(t) // Seed event storage with a tail-sampling decisions, to show that @@ -646,161 +646,82 @@ func TestStorageMonitoring(t *testing.T) { assert.NotZero(t, metrics.Ints, "sampling.storage.value_log_size") } -func TestStorageGC(t *testing.T) { - if testing.Short() { - t.Skip("skipping slow test") - } - - config := newTempdirConfig(t) - config.TTL = 10 * time.Millisecond - config.FlushInterval = 10 * time.Millisecond - - writeBatch := func(n int) { - config.StorageGCInterval = time.Hour // effectively disable - processor, err := sampling.NewProcessor(config) - require.NoError(t, err) - go processor.Run() - defer processor.Stop(context.Background()) - for i := 0; i < n; i++ { - traceID := uuid.Must(uuid.NewV4()).String() - // Create a larger event to fill up the vlog faster, especially when it is above ValueThreshold - batch := modelpb.Batch{{ - Trace: &modelpb.Trace{Id: traceID}, - Event: &modelpb.Event{Duration: uint64(123 * time.Millisecond)}, - Span: &modelpb.Span{ - Type: strings.Repeat("a", 1000), - Subtype: strings.Repeat("b", 1000), - Id: traceID, - Name: strings.Repeat("c", 1000), - }, - }} - err := processor.ProcessBatch(context.Background(), &batch) - require.NoError(t, err) - assert.Empty(t, batch) - } - } - - // Process spans until value log files have been created. - // Garbage collection is disabled at this time. - for len(vlogFilenames(config.StorageDir)) < 3 { - writeBatch(2000) - } - - config.StorageGCInterval = 10 * time.Millisecond - processor, err := sampling.NewProcessor(config) - require.NoError(t, err) - go processor.Run() - defer processor.Stop(context.Background()) - - // Wait for the first value log file to be garbage collected. - var vlogs []string - assert.Eventually(t, func() bool { - vlogs = vlogFilenames(config.StorageDir) - return len(vlogs) == 0 || vlogs[0] != "000000.vlog" - }, 10*time.Second, 100*time.Millisecond, vlogs) -} - -func TestStorageGCConcurrency(t *testing.T) { - // This test ensures that TBS processor does not return an error - // even when run concurrently e.g. in hot reload - if testing.Short() { - t.Skip("skipping slow test") - } - - config := newTempdirConfig(t) - config.TTL = 10 * time.Millisecond - config.FlushInterval = 10 * time.Millisecond - config.StorageGCInterval = 10 * time.Millisecond - - g := errgroup.Group{} - for i := 0; i < 2; i++ { - processor, err := sampling.NewProcessor(config) - require.NoError(t, err) - g.Go(processor.Run) - go func() { - time.Sleep(time.Second) - assert.NoError(t, processor.Stop(context.Background())) - }() - } - assert.NoError(t, g.Wait()) -} - -func TestStorageLimit(t *testing.T) { - // This test ensures that when tail sampling is configured with a hard - // storage limit, the limit is respected once the size is available. - // To update the database size during our test without waiting a full - // minute, we store some span events, close and re-open the database, so - // the size is updated. - if testing.Short() { - t.Skip("skipping slow test") - } - - writeBatch := func(n int, c sampling.Config, assertBatch func(b modelpb.Batch)) *sampling.Processor { - processor, err := sampling.NewProcessor(c) - require.NoError(t, err) - go processor.Run() - defer processor.Stop(context.Background()) - batch := make(modelpb.Batch, 0, n) - for i := 0; i < n; i++ { - traceID := uuid.Must(uuid.NewV4()).String() - batch = append(batch, &modelpb.APMEvent{ - Trace: &modelpb.Trace{Id: traceID}, - Event: &modelpb.Event{Duration: uint64(123 * time.Millisecond)}, - Span: &modelpb.Span{ - Type: "type", - Id: traceID, - }, - }) - } - err = processor.ProcessBatch(context.Background(), &batch) - require.NoError(t, err) - assertBatch(batch) - return processor - } - - config := newTempdirConfig(t) - // Write 5K span events and close the DB to persist to disk the storage - // size and assert that none are reported immediately. - writeBatch(5000, config, func(b modelpb.Batch) { - assert.Empty(t, b, fmt.Sprintf("expected empty but size is %d", len(b))) - }) - assert.NoError(t, config.Storage.Flush()) - assert.NoError(t, config.DB.Close()) - - // Open a new instance of the badgerDB and check the size. - var err error - config.DB, err = eventstorage.NewStorageManager(config.StorageDir) - require.NoError(t, err) - t.Cleanup(func() { config.DB.Close() }) - config.Storage = config.DB.NewReadWriter() - - lsm, vlog := config.DB.Size() - assert.GreaterOrEqual(t, lsm+vlog, int64(1024)) - - config.StorageLimit = 1024 // Set the storage limit to 1024 bytes. - // Create a massive 150K span batch (per CPU) to trigger the badger error - // Transaction too big, causing the ProcessBatch to report the some traces - // immediately. - // Rather than setting a static threshold, use the runtime.NumCPU as a - // multiplier since the sharded writers use that variable and the more CPUs - // we have, the more sharded writes we'll have, resulting in a greater buffer. - // To avoid huge test time on large systems do this incrementally - for i := 1; i < runtime.NumCPU(); i++ { - processor := writeBatch(150_000*i, config, func(b modelpb.Batch) { - assert.NotEmpty(t, b) - }) - - failedWrites := collectProcessorMetrics(processor).Ints["sampling.events.failed_writes"] - t.Log(failedWrites) - // Ensure that there are some failed writes. - - if failedWrites >= 1 { - return - } - } - - t.Fatal("badger error never thrown") -} +//func TestStorageLimit(t *testing.T) { +// // This test ensures that when tail sampling is configured with a hard +// // storage limit, the limit is respected once the size is available. +// // To update the database size during our test without waiting a full +// // minute, we store some span events, close and re-open the database, so +// // the size is updated. +// if testing.Short() { +// t.Skip("skipping slow test") +// } +// +// writeBatch := func(n int, c sampling.Config, assertBatch func(b modelpb.Batch)) *sampling.Processor { +// processor, err := sampling.NewProcessor(c) +// require.NoError(t, err) +// go processor.Run() +// defer processor.Stop(context.Background()) +// batch := make(modelpb.Batch, 0, n) +// for i := 0; i < n; i++ { +// traceID := uuid.Must(uuid.NewV4()).String() +// batch = append(batch, &modelpb.APMEvent{ +// Trace: &modelpb.Trace{Id: traceID}, +// Event: &modelpb.Event{Duration: uint64(123 * time.Millisecond)}, +// Span: &modelpb.Span{ +// Type: "type", +// Id: traceID, +// }, +// }) +// } +// err = processor.ProcessBatch(context.Background(), &batch) +// require.NoError(t, err) +// assertBatch(batch) +// return processor +// } +// +// config := newTempdirConfig(t) +// // Write 5K span events and close the DB to persist to disk the storage +// // size and assert that none are reported immediately. +// writeBatch(5000, config, func(b modelpb.Batch) { +// assert.Empty(t, b, fmt.Sprintf("expected empty but size is %d", len(b))) +// }) +// assert.NoError(t, config.Storage.Flush()) +// assert.NoError(t, config.DB.Close()) +// +// // Open a new instance of the badgerDB and check the size. +// var err error +// config.DB, err = eventstorage.NewStorageManager(config.StorageDir) +// require.NoError(t, err) +// t.Cleanup(func() { config.DB.Close() }) +// config.Storage = config.DB.NewReadWriter() +// +// lsm, vlog := config.DB.Size() +// assert.GreaterOrEqual(t, lsm+vlog, int64(1024)) +// +// config.StorageLimit = 1024 // Set the storage limit to 1024 bytes. +// // Create a massive 150K span batch (per CPU) to trigger the badger error +// // Transaction too big, causing the ProcessBatch to report the some traces +// // immediately. +// // Rather than setting a static threshold, use the runtime.NumCPU as a +// // multiplier since the sharded writers use that variable and the more CPUs +// // we have, the more sharded writes we'll have, resulting in a greater buffer. +// // To avoid huge test time on large systems do this incrementally +// for i := 1; i < runtime.NumCPU(); i++ { +// processor := writeBatch(150_000*i, config, func(b modelpb.Batch) { +// assert.NotEmpty(t, b) +// }) +// +// failedWrites := collectProcessorMetrics(processor).Ints["sampling.events.failed_writes"] +// t.Log(failedWrites) +// // Ensure that there are some failed writes. +// +// if failedWrites >= 1 { +// return +// } +// } +// +// t.Fatal("badger error never thrown") +//} func TestProcessRemoteTailSamplingPersistence(t *testing.T) { config := newTempdirConfig(t) @@ -826,131 +747,6 @@ func TestProcessRemoteTailSamplingPersistence(t *testing.T) { assert.Equal(t, `{"index_name":1}`, string(data)) } -func TestDropLoop(t *testing.T) { - // This test ensures that if badger is stuck at storage limit for TTL, - // DB is dropped and recreated. - if testing.Short() { - t.Skip("skipping slow test") - } - - makeBatch := func(n int) modelpb.Batch { - batch := make(modelpb.Batch, 0, n) - for i := 0; i < n; i++ { - traceID := uuid.Must(uuid.NewV4()).String() - batch = append(batch, &modelpb.APMEvent{ - Trace: &modelpb.Trace{Id: traceID}, - Event: &modelpb.Event{Duration: uint64(123 * time.Millisecond)}, - Span: &modelpb.Span{ - Type: "type", - Id: traceID, - }, - }) - } - return batch - } - - writeBatch := func(t *testing.T, n int, c sampling.Config, assertBatch func(b modelpb.Batch)) *sampling.Processor { - processor, err := sampling.NewProcessor(c) - require.NoError(t, err) - go processor.Run() - defer processor.Stop(context.Background()) - batch := makeBatch(n) - err = processor.ProcessBatch(context.Background(), &batch) - require.NoError(t, err) - assertBatch(batch) - return processor - } - - for _, tc := range []struct { - name string - subscriberPosExists bool - }{ - { - name: "subscriber_position_not_exist", - subscriberPosExists: false, - }, - { - name: "subscriber_position_exists", - subscriberPosExists: true, - }, - } { - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - config := newTempdirConfig(t) - config.StorageGCInterval = time.Hour // effectively disable GC - - config.FlushInterval = 10 * time.Millisecond - subscriberChan := make(chan string) - subscriber := pubsubtest.SubscriberChan(subscriberChan) - config.Elasticsearch = pubsubtest.Client(nil, subscriber) - subscriberPositionFile := filepath.Join(config.StorageDir, "subscriber_position.json") - - // Write 5K span events and close the DB to persist to disk the storage - // size and assert that none are reported immediately. - writeBatch(t, 5000, config, func(b modelpb.Batch) { - assert.Empty(t, b, fmt.Sprintf("expected empty but size is %d", len(b))) - - subscriberChan <- "0102030405060708090a0b0c0d0e0f10" - assert.Eventually(t, func() bool { - data, err := config.DB.ReadSubscriberPosition() - return err == nil && string(data) == `{"index_name":1}` - }, time.Second, 100*time.Millisecond) - }) - assert.NoError(t, config.Storage.Flush()) - assert.NoError(t, config.DB.Close()) - - if !tc.subscriberPosExists { - err := os.Remove(subscriberPositionFile) - assert.NoError(t, err) - } - - func() { - // Open a new instance of the badgerDB and check the size. - var err error - config.DB, err = eventstorage.NewStorageManager(config.StorageDir) - require.NoError(t, err) - t.Cleanup(func() { config.DB.Close() }) - config.Storage = config.DB.NewReadWriter() - - lsm, vlog := config.DB.Size() - assert.Greater(t, lsm+vlog, int64(1024*1024)) - - config.Elasticsearch = pubsubtest.Client(nil, nil) // disable pubsub - - config.StorageLimit = 100 * 1024 // lower limit to trigger storage limit error - config.TTL = time.Second - processor, err := sampling.NewProcessor(config) - require.NoError(t, err) - go processor.Run() - defer processor.Stop(context.Background()) - - // wait for up to 1 minute for dropAndRecreate to kick in - // no SST files after dropping DB and before first write - var filenames []string - assert.Eventually(t, func() bool { - filenames = sstFilenames(config.StorageDir) - return len(filenames) == 0 - }, 90*time.Second, 200*time.Millisecond, filenames) - - data, err := config.DB.ReadSubscriberPosition() - assert.NoError(t, err) - if tc.subscriberPosExists { - assert.Equal(t, `{"index_name":1}`, string(data)) - } else { - assert.Equal(t, "{}", string(data)) - } - - // try to write to new DB - batch := makeBatch(10) - err = processor.ProcessBatch(context.Background(), &batch) - require.NoError(t, err) - }() - assert.NoError(t, config.DB.Close()) - assert.Greater(t, len(sstFilenames(config.StorageDir)), 0) - }) - } -} - func TestGracefulShutdown(t *testing.T) { config := newTempdirConfig(t) sampleRate := 0.5 @@ -998,11 +794,11 @@ func newTempdirConfig(tb testing.TB) sampling.Config { require.NoError(tb, err) tb.Cleanup(func() { os.RemoveAll(tempdir) }) - badgerDB, err := eventstorage.NewStorageManager(tempdir) + db, err := eventstorage.NewStorageManager(tempdir) require.NoError(tb, err) - tb.Cleanup(func() { badgerDB.Close() }) + tb.Cleanup(func() { db.Close() }) - storage := badgerDB.NewReadWriter() + storage := db.NewReadWriter() return sampling.Config{ BatchProcessor: modelpb.ProcessBatchFunc(func(context.Context, *modelpb.Batch) error { return nil }), @@ -1024,7 +820,7 @@ func newTempdirConfig(tb testing.TB) sampling.Config { UUID: "local-apm-server", }, StorageConfig: sampling.StorageConfig{ - DB: badgerDB, + DB: db, Storage: storage, StorageDir: tempdir, StorageGCInterval: time.Second,