From 5a3dd8dc71c05a26a142589629d046f5d0f56c42 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Fri, 3 Jan 2025 19:51:30 +0000 Subject: [PATCH] TBS: refactor to encapsulate badger DB (#15112) Introduce StorageManager to encapsulate badger DB access. There is a minor difference in how gc mutex and subscriber position file mutex are no longer global variables, but they are per StorageManager, but there should be no apm-server behavior change, only possibly subtle difference in concurrent testing where there are 2 storage managers. apm-server has only 1 global storage manager created by x-pack main. --- x-pack/apm-server/main.go | 29 +-- x-pack/apm-server/sampling/config.go | 5 +- x-pack/apm-server/sampling/config_test.go | 5 +- .../sampling/eventstorage/storage_manager.go | 173 ++++++++++++++++++ x-pack/apm-server/sampling/processor.go | 91 +++------ x-pack/apm-server/sampling/processor_test.go | 66 +++---- 6 files changed, 234 insertions(+), 135 deletions(-) create mode 100644 x-pack/apm-server/sampling/eventstorage/storage_manager.go diff --git a/x-pack/apm-server/main.go b/x-pack/apm-server/main.go index caab7b061e2..1bf6f94c9b7 100644 --- a/x-pack/apm-server/main.go +++ b/x-pack/apm-server/main.go @@ -11,7 +11,6 @@ import ( "os" "sync" - "github.com/dgraph-io/badger/v2" "github.com/gofrs/uuid/v5" "golang.org/x/sync/errgroup" @@ -43,10 +42,10 @@ var ( // badgerDB holds the badger database to use when tail-based sampling is configured. badgerMu sync.Mutex - badgerDB *badger.DB + badgerDB *eventstorage.StorageManager storageMu sync.Mutex - storage *eventstorage.ShardedReadWriter + storage *eventstorage.ManagedReadWriter // samplerUUID is a UUID used to identify sampled trace ID documents // published by this process. @@ -122,7 +121,7 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er if err != nil { return nil, fmt.Errorf("failed to get Badger database: %w", err) } - readWriters := getStorage(badgerDB) + readWriter := getStorage(badgerDB) policies := make([]sampling.Policy, len(tailSamplingConfig.Policies)) for i, in := range tailSamplingConfig.Policies { @@ -157,7 +156,7 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er }, StorageConfig: sampling.StorageConfig{ DB: badgerDB, - Storage: readWriters, + Storage: readWriter, StorageDir: storageDir, StorageGCInterval: tailSamplingConfig.StorageGCInterval, StorageLimit: tailSamplingConfig.StorageLimitParsed, @@ -166,25 +165,24 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er }) } -func getBadgerDB(storageDir string) (*badger.DB, error) { +func getBadgerDB(storageDir string) (*eventstorage.StorageManager, error) { badgerMu.Lock() defer badgerMu.Unlock() if badgerDB == nil { - db, err := eventstorage.OpenBadger(storageDir, -1) + sm, err := eventstorage.NewStorageManager(storageDir) if err != nil { return nil, err } - badgerDB = db + badgerDB = sm } return badgerDB, nil } -func getStorage(db *badger.DB) *eventstorage.ShardedReadWriter { +func getStorage(sm *eventstorage.StorageManager) *eventstorage.ManagedReadWriter { storageMu.Lock() defer storageMu.Unlock() if storage == nil { - eventCodec := eventstorage.ProtobufCodec{} - storage = eventstorage.New(db, eventCodec).NewShardedReadWriter() + storage = sm.NewReadWriter() } return storage } @@ -261,16 +259,7 @@ func closeBadger() error { return nil } -func closeStorage() { - if storage != nil { - storage.Close() - } -} - func cleanup() error { - // Close the underlying storage, the storage will be flushed on processor stop. - closeStorage() - return closeBadger() } diff --git a/x-pack/apm-server/sampling/config.go b/x-pack/apm-server/sampling/config.go index 43d5c34d96e..24aac06ad9a 100644 --- a/x-pack/apm-server/sampling/config.go +++ b/x-pack/apm-server/sampling/config.go @@ -7,7 +7,6 @@ package sampling import ( "time" - "github.com/dgraph-io/badger/v2" "github.com/pkg/errors" "github.com/elastic/apm-data/model/modelpb" @@ -99,13 +98,13 @@ type StorageConfig struct { // DB holds the badger database in which event storage will be maintained. // // DB will not be closed when the processor is closed. - DB *badger.DB + DB *eventstorage.StorageManager // Storage holds the read writers which provide sharded, locked access to storage. // // Storage lives outside processor lifecycle and will not be closed when processor // is closed - Storage *eventstorage.ShardedReadWriter + Storage *eventstorage.ManagedReadWriter // StorageDir holds the directory in which event storage will be maintained. StorageDir string diff --git a/x-pack/apm-server/sampling/config_test.go b/x-pack/apm-server/sampling/config_test.go index f926711be7d..697cafec588 100644 --- a/x-pack/apm-server/sampling/config_test.go +++ b/x-pack/apm-server/sampling/config_test.go @@ -7,7 +7,6 @@ package sampling_test import ( "testing" - "github.com/dgraph-io/badger/v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -72,10 +71,10 @@ func TestNewProcessorConfigInvalid(t *testing.T) { config.UUID = "server" assertInvalidConfigError("invalid storage config: DB unspecified") - config.DB = &badger.DB{} + config.DB = &eventstorage.StorageManager{} assertInvalidConfigError("invalid storage config: Storage unspecified") - config.Storage = &eventstorage.ShardedReadWriter{} + config.Storage = &eventstorage.ManagedReadWriter{} assertInvalidConfigError("invalid storage config: StorageDir unspecified") config.StorageDir = "tbs" diff --git a/x-pack/apm-server/sampling/eventstorage/storage_manager.go b/x-pack/apm-server/sampling/eventstorage/storage_manager.go new file mode 100644 index 00000000000..cf4ddbe4a23 --- /dev/null +++ b/x-pack/apm-server/sampling/eventstorage/storage_manager.go @@ -0,0 +1,173 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package eventstorage + +import ( + "os" + "path/filepath" + "sync" + "time" + + "github.com/dgraph-io/badger/v2" + + "github.com/elastic/apm-data/model/modelpb" + "github.com/elastic/apm-server/internal/logs" + "github.com/elastic/elastic-agent-libs/logp" +) + +const ( + // subscriberPositionFile holds the file name used for persisting + // the subscriber position across server restarts. + subscriberPositionFile = "subscriber_position.json" +) + +// StorageManager encapsulates badger.DB. +// It is to provide file system access, simplify synchronization and enable underlying db swaps. +// It assumes exclusive access to badger DB at storageDir. +type StorageManager struct { + storageDir string + logger *logp.Logger + + db *badger.DB + storage *Storage + rw *ShardedReadWriter + + // subscriberPosMu protects the subscriber file from concurrent RW. + subscriberPosMu sync.Mutex + + // gcLoopCh acts as a mutex to ensure only 1 gc loop is running per StorageManager. + // as it is possible that 2 separate RunGCLoop are created by 2 TBS processors during a hot reload. + gcLoopCh chan struct{} +} + +// NewStorageManager returns a new StorageManager with badger DB at storageDir. +func NewStorageManager(storageDir string) (*StorageManager, error) { + sm := &StorageManager{ + storageDir: storageDir, + gcLoopCh: make(chan struct{}, 1), + logger: logp.NewLogger(logs.Sampling), + } + err := sm.reset() + if err != nil { + return nil, err + } + return sm, nil +} + +// reset initializes db, storage, and rw. +func (s *StorageManager) reset() error { + db, err := OpenBadger(s.storageDir, -1) + if err != nil { + return err + } + s.db = db + s.storage = New(db, ProtobufCodec{}) + s.rw = s.storage.NewShardedReadWriter() + return nil +} + +// RunGCLoop runs a loop that calls badger DB RunValueLogGC every gcInterval. +// The loop stops when it receives from stopping. +func (s *StorageManager) RunGCLoop(stopping <-chan struct{}, gcInterval time.Duration) error { + select { + case <-stopping: + return nil + case s.gcLoopCh <- struct{}{}: + } + defer func() { + <-s.gcLoopCh + }() + // This goroutine is responsible for periodically garbage + // collecting the Badger value log, using the recommended + // discard ratio of 0.5. + ticker := time.NewTicker(gcInterval) + defer ticker.Stop() + for { + select { + case <-stopping: + return nil + case <-ticker.C: + const discardRatio = 0.5 + var err error + for err == nil { + // Keep garbage collecting until there are no more rewrites, + // or garbage collection fails. + err = s.runValueLogGC(discardRatio) + } + if err != nil && err != badger.ErrNoRewrite { + return err + } + } + } +} + +func (s *StorageManager) runValueLogGC(discardRatio float64) error { + return s.db.RunValueLogGC(discardRatio) +} + +func (s *StorageManager) Close() error { + s.rw.Close() + return s.db.Close() +} + +// Size returns the db size +func (s *StorageManager) Size() (lsm, vlog int64) { + return s.db.Size() +} + +func (s *StorageManager) ReadSubscriberPosition() ([]byte, error) { + s.subscriberPosMu.Lock() + defer s.subscriberPosMu.Unlock() + return os.ReadFile(filepath.Join(s.storageDir, subscriberPositionFile)) +} + +func (s *StorageManager) WriteSubscriberPosition(data []byte) error { + s.subscriberPosMu.Lock() + defer s.subscriberPosMu.Unlock() + return os.WriteFile(filepath.Join(s.storageDir, subscriberPositionFile), data, 0644) +} + +func (s *StorageManager) NewReadWriter() *ManagedReadWriter { + return &ManagedReadWriter{ + sm: s, + } +} + +// ManagedReadWriter is a read writer that is transparent to badger DB changes done by StorageManager. +// It is a wrapper of the ShardedReadWriter under StorageManager. +type ManagedReadWriter struct { + sm *StorageManager +} + +func (s *ManagedReadWriter) ReadTraceEvents(traceID string, out *modelpb.Batch) error { + return s.sm.rw.ReadTraceEvents(traceID, out) +} + +func (s *ManagedReadWriter) WriteTraceEvent(traceID, id string, event *modelpb.APMEvent, opts WriterOpts) error { + return s.sm.rw.WriteTraceEvent(traceID, id, event, opts) +} + +func (s *ManagedReadWriter) WriteTraceSampled(traceID string, sampled bool, opts WriterOpts) error { + return s.sm.rw.WriteTraceSampled(traceID, sampled, opts) +} + +func (s *ManagedReadWriter) IsTraceSampled(traceID string) (bool, error) { + return s.sm.rw.IsTraceSampled(traceID) +} + +func (s *ManagedReadWriter) DeleteTraceEvent(traceID, id string) error { + return s.sm.rw.DeleteTraceEvent(traceID, id) +} + +func (s *ManagedReadWriter) Flush() error { + return s.sm.rw.Flush() +} + +// NewBypassReadWriter returns a ReadWriter directly reading and writing to the database, +// bypassing any wrapper e.g. ShardedReadWriter. +// This should be used for testing only, useful to check if data is actually persisted to the DB. +func (s *StorageManager) NewBypassReadWriter() *ReadWriter { + return s.storage.NewReadWriter() +} diff --git a/x-pack/apm-server/sampling/processor.go b/x-pack/apm-server/sampling/processor.go index 9402408b299..f414673f8f6 100644 --- a/x-pack/apm-server/sampling/processor.go +++ b/x-pack/apm-server/sampling/processor.go @@ -9,12 +9,10 @@ import ( "encoding/json" "fmt" "os" - "path/filepath" "sync" "sync/atomic" "time" - "github.com/dgraph-io/badger/v2" "github.com/pkg/errors" "golang.org/x/sync/errgroup" @@ -27,10 +25,6 @@ import ( ) const ( - // subscriberPositionFile holds the file name used for persisting - // the subscriber position across server restarts. - subscriberPositionFile = "subscriber_position.json" - // loggerRateLimit is the maximum frequency at which "too many groups" and // "write failure" log messages are logged. loggerRateLimit = time.Minute @@ -40,11 +34,6 @@ const ( shutdownGracePeriod = 5 * time.Second ) -var ( - // gcCh works like a global mutex to protect gc from running concurrently when 2 TBS processors are active during a hot reload - gcCh = make(chan struct{}, 1) -) - // Processor is a tail-sampling event processor. type Processor struct { config Config @@ -343,7 +332,7 @@ func (p *Processor) Run() error { bulkIndexerFlushInterval = p.config.FlushInterval } - initialSubscriberPosition, err := readSubscriberPosition(p.logger, p.config.StorageDir) + initialSubscriberPosition, err := readSubscriberPosition(p.logger, p.config.DB) if err != nil { return err } @@ -382,7 +371,7 @@ func (p *Processor) Run() error { time.AfterFunc(shutdownGracePeriod, cancelGracefulContext) return context.Canceled case pos := <-subscriberPositions: - if err := writeSubscriberPosition(p.config.StorageDir, pos); err != nil { + if err := writeSubscriberPosition(p.config.DB, pos); err != nil { p.rateLimitedLogger.With(logp.Error(err)).With(logp.Reflect("position", pos)).Warn( "failed to write subscriber position: %s", err, ) @@ -391,38 +380,7 @@ func (p *Processor) Run() error { } }) g.Go(func() error { - // Protect this goroutine from running concurrently when 2 TBS processors are active - // as badger GC is not concurrent safe. - select { - case <-p.stopping: - return nil - case gcCh <- struct{}{}: - } - defer func() { - <-gcCh - }() - // This goroutine is responsible for periodically garbage - // collecting the Badger value log, using the recommended - // discard ratio of 0.5. - ticker := time.NewTicker(p.config.StorageGCInterval) - defer ticker.Stop() - for { - select { - case <-p.stopping: - return nil - case <-ticker.C: - const discardRatio = 0.5 - var err error - for err == nil { - // Keep garbage collecting until there are no more rewrites, - // or garbage collection fails. - err = p.config.DB.RunValueLogGC(discardRatio) - } - if err != nil && err != badger.ErrNoRewrite { - return err - } - } - } + return p.config.DB.RunGCLoop(p.stopping, p.config.StorageGCInterval) }) g.Go(func() error { // Subscribe to remotely sampled trace IDs. This is cancelled immediately when @@ -575,15 +533,9 @@ func (p *Processor) Run() error { return nil } -// subscriberPositionFileMutex protects the subscriber file from concurrent RW, in case of hot reload. -var subscriberPositionFileMutex sync.Mutex - -func readSubscriberPosition(logger *logp.Logger, storageDir string) (pubsub.SubscriberPosition, error) { - subscriberPositionFileMutex.Lock() - defer subscriberPositionFileMutex.Unlock() - +func readSubscriberPosition(logger *logp.Logger, s *eventstorage.StorageManager) (pubsub.SubscriberPosition, error) { var pos pubsub.SubscriberPosition - data, err := os.ReadFile(filepath.Join(storageDir, subscriberPositionFile)) + data, err := s.ReadSubscriberPosition() if errors.Is(err, os.ErrNotExist) { return pos, nil } else if err != nil { @@ -597,15 +549,13 @@ func readSubscriberPosition(logger *logp.Logger, storageDir string) (pubsub.Subs return pos, nil } -func writeSubscriberPosition(storageDir string, pos pubsub.SubscriberPosition) error { +func writeSubscriberPosition(s *eventstorage.StorageManager, pos pubsub.SubscriberPosition) error { data, err := json.Marshal(pos) if err != nil { return err } - subscriberPositionFileMutex.Lock() - defer subscriberPositionFileMutex.Unlock() - return os.WriteFile(filepath.Join(storageDir, subscriberPositionFile), data, 0644) + return s.WriteSubscriberPosition(data) } func sendTraceIDs(ctx context.Context, out chan<- string, traceIDs []string) error { @@ -623,9 +573,18 @@ const ( storageLimitThreshold = 0.90 // Allow 90% of the quota to be used. ) -// wrappedRW wraps configurable write options for global ShardedReadWriter +type rw interface { + ReadTraceEvents(traceID string, out *modelpb.Batch) error + WriteTraceEvent(traceID, id string, event *modelpb.APMEvent, opts eventstorage.WriterOpts) error + WriteTraceSampled(traceID string, sampled bool, opts eventstorage.WriterOpts) error + IsTraceSampled(traceID string) (bool, error) + DeleteTraceEvent(traceID, id string) error + Flush() error +} + +// wrappedRW wraps configurable write options for global rw type wrappedRW struct { - rw *eventstorage.ShardedReadWriter + rw rw writerOpts eventstorage.WriterOpts } @@ -634,7 +593,7 @@ type wrappedRW struct { // limit value greater than zero. The hard limit on storage is set to 90% of // the limit to account for delay in the size reporting by badger. // https://github.com/dgraph-io/badger/blob/82b00f27e3827022082225221ae05c03f0d37620/db.go#L1302-L1319. -func newWrappedRW(rw *eventstorage.ShardedReadWriter, ttl time.Duration, limit int64) *wrappedRW { +func newWrappedRW(rw rw, ttl time.Duration, limit int64) *wrappedRW { if limit > 1 { limit = int64(float64(limit) * storageLimitThreshold) } @@ -647,32 +606,32 @@ func newWrappedRW(rw *eventstorage.ShardedReadWriter, ttl time.Duration, limit i } } -// ReadTraceEvents calls ShardedReadWriter.ReadTraceEvents +// ReadTraceEvents calls rw.ReadTraceEvents func (s *wrappedRW) ReadTraceEvents(traceID string, out *modelpb.Batch) error { return s.rw.ReadTraceEvents(traceID, out) } -// WriteTraceEvents calls ShardedReadWriter.WriteTraceEvents using the configured WriterOpts +// WriteTraceEvent calls rw.WriteTraceEvent using the configured WriterOpts func (s *wrappedRW) WriteTraceEvent(traceID, id string, event *modelpb.APMEvent) error { return s.rw.WriteTraceEvent(traceID, id, event, s.writerOpts) } -// WriteTraceSampled calls ShardedReadWriter.WriteTraceSampled using the configured WriterOpts +// WriteTraceSampled calls rw.WriteTraceSampled using the configured WriterOpts func (s *wrappedRW) WriteTraceSampled(traceID string, sampled bool) error { return s.rw.WriteTraceSampled(traceID, sampled, s.writerOpts) } -// IsTraceSampled calls ShardedReadWriter.IsTraceSampled +// IsTraceSampled calls rw.IsTraceSampled func (s *wrappedRW) IsTraceSampled(traceID string) (bool, error) { return s.rw.IsTraceSampled(traceID) } -// DeleteTraceEvent calls ShardedReadWriter.DeleteTraceEvent +// DeleteTraceEvent calls rw.DeleteTraceEvent func (s *wrappedRW) DeleteTraceEvent(traceID, id string) error { return s.rw.DeleteTraceEvent(traceID, id) } -// Flush calls ShardedReadWriter.Flush +// Flush calls rw.Flush func (s *wrappedRW) Flush() error { return s.rw.Flush() } diff --git a/x-pack/apm-server/sampling/processor_test.go b/x-pack/apm-server/sampling/processor_test.go index 72be94bf8af..9dbd620371e 100644 --- a/x-pack/apm-server/sampling/processor_test.go +++ b/x-pack/apm-server/sampling/processor_test.go @@ -63,8 +63,7 @@ func TestProcessAlreadyTailSampled(t *testing.T) { // subsequent events in the trace will be reported immediately. trace1 := modelpb.Trace{Id: "0102030405060708090a0b0c0d0e0f10"} trace2 := modelpb.Trace{Id: "0102030405060708090a0b0c0d0e0f11"} - storage := eventstorage.New(config.DB, eventstorage.ProtobufCodec{}) - writer := storage.NewReadWriter() + writer := config.DB.NewBypassReadWriter() wOpts := eventstorage.WriterOpts{ TTL: time.Minute, StorageLimitInBytes: 0, @@ -74,8 +73,7 @@ func TestProcessAlreadyTailSampled(t *testing.T) { writer.Close() wOpts.TTL = -1 // expire immediately - storage = eventstorage.New(config.DB, eventstorage.ProtobufCodec{}) - writer = storage.NewReadWriter() + writer = config.DB.NewBypassReadWriter() assert.NoError(t, writer.WriteTraceSampled(trace2.Id, true, wOpts)) assert.NoError(t, writer.Flush()) writer.Close() @@ -143,7 +141,7 @@ func TestProcessAlreadyTailSampled(t *testing.T) { // Stop the processor and flush global storage so we can access the database. assert.NoError(t, processor.Stop(context.Background())) assert.NoError(t, config.Storage.Flush()) - reader := storage.NewReadWriter() + reader := config.DB.NewBypassReadWriter() defer reader.Close() batch = nil @@ -262,8 +260,7 @@ func TestProcessLocalTailSampling(t *testing.T) { // Stop the processor and flush global storage so we can access the database. assert.NoError(t, processor.Stop(context.Background())) assert.NoError(t, config.Storage.Flush()) - storage := eventstorage.New(config.DB, eventstorage.ProtobufCodec{}) - reader := storage.NewReadWriter() + reader := config.DB.NewBypassReadWriter() defer reader.Close() sampled, err := reader.IsTraceSampled(sampledTraceID) @@ -327,8 +324,7 @@ func TestProcessLocalTailSamplingUnsampled(t *testing.T) { // Stop the processor so we can access the database. assert.NoError(t, processor.Stop(context.Background())) assert.NoError(t, config.Storage.Flush()) - storage := eventstorage.New(config.DB, eventstorage.ProtobufCodec{}) - reader := storage.NewReadWriter() + reader := config.DB.NewBypassReadWriter() defer reader.Close() var anyUnsampled bool @@ -494,8 +490,7 @@ func TestProcessRemoteTailSampling(t *testing.T) { assert.Empty(t, cmp.Diff(trace1Events, events, protocmp.Transform())) - storage := eventstorage.New(config.DB, eventstorage.ProtobufCodec{}) - reader := storage.NewReadWriter() + reader := config.DB.NewBypassReadWriter() defer reader.Close() sampled, err := reader.IsTraceSampled(traceID1) @@ -597,31 +592,23 @@ func TestStorageGC(t *testing.T) { config.TTL = 10 * time.Millisecond config.FlushInterval = 10 * time.Millisecond - // Create a new badger DB with smaller value log files so we can test GC. - config.DB.Close() - badgerDB, err := eventstorage.OpenBadger(config.StorageDir, 1024*1024) - require.NoError(t, err) - t.Cleanup(func() { badgerDB.Close() }) - config.DB = badgerDB - config.Storage = eventstorage. - New(config.DB, eventstorage.ProtobufCodec{}). - NewShardedReadWriter() - t.Cleanup(func() { config.Storage.Close() }) - writeBatch := func(n int) { - config.StorageGCInterval = time.Minute // effectively disable + config.StorageGCInterval = time.Hour // effectively disable processor, err := sampling.NewProcessor(config) require.NoError(t, err) go processor.Run() defer processor.Stop(context.Background()) for i := 0; i < n; i++ { traceID := uuid.Must(uuid.NewV4()).String() + // Create a larger event to fill up the vlog faster, especially when it is above ValueThreshold batch := modelpb.Batch{{ Trace: &modelpb.Trace{Id: traceID}, Event: &modelpb.Event{Duration: uint64(123 * time.Millisecond)}, Span: &modelpb.Span{ - Type: "type", - Id: traceID, + Type: strings.Repeat("a", 1000), + Subtype: strings.Repeat("b", 1000), + Id: traceID, + Name: strings.Repeat("c", 1000), }, }} err := processor.ProcessBatch(context.Background(), &batch) @@ -647,7 +634,7 @@ func TestStorageGC(t *testing.T) { // Process spans until value log files have been created. // Garbage collection is disabled at this time. for len(vlogFilenames()) < 3 { - writeBatch(500) + writeBatch(2000) } config.StorageGCInterval = 10 * time.Millisecond @@ -657,16 +644,11 @@ func TestStorageGC(t *testing.T) { defer processor.Stop(context.Background()) // Wait for the first value log file to be garbage collected. - deadline := time.Now().Add(10 * time.Second) - for time.Now().Before(deadline) { - vlogs := vlogFilenames() - if len(vlogs) == 0 || vlogs[0] != "000000.vlog" { - // garbage collected - return - } - time.Sleep(10 * time.Millisecond) - } - t.Fatal("timed out waiting for value log garbage collection") + var vlogs []string + assert.Eventually(t, func() bool { + vlogs = vlogFilenames() + return len(vlogs) == 0 || vlogs[0] != "000000.vlog" + }, 10*time.Second, 100*time.Millisecond, vlogs) } func TestStorageGCConcurrency(t *testing.T) { @@ -734,12 +716,11 @@ func TestStorageLimit(t *testing.T) { assert.Empty(t, b, fmt.Sprintf("expected empty but size is %d", len(b))) }) assert.NoError(t, config.Storage.Flush()) - config.Storage.Close() assert.NoError(t, config.DB.Close()) // Open a new instance of the badgerDB and check the size. var err error - config.DB, err = eventstorage.OpenBadger(config.StorageDir, 1024*1024) + config.DB, err = eventstorage.NewStorageManager(config.StorageDir) require.NoError(t, err) t.Cleanup(func() { config.DB.Close() }) @@ -825,7 +806,8 @@ func TestGracefulShutdown(t *testing.T) { assert.NoError(t, processor.Stop(context.Background())) assert.NoError(t, config.Storage.Flush()) - reader := eventstorage.New(config.DB, eventstorage.ProtobufCodec{}).NewReadWriter() + reader := config.DB.NewBypassReadWriter() + defer reader.Close() var count int for i := 0; i < totalTraces; i++ { @@ -841,13 +823,11 @@ func newTempdirConfig(tb testing.TB) sampling.Config { require.NoError(tb, err) tb.Cleanup(func() { os.RemoveAll(tempdir) }) - badgerDB, err := eventstorage.OpenBadger(tempdir, 0) + badgerDB, err := eventstorage.NewStorageManager(tempdir) require.NoError(tb, err) tb.Cleanup(func() { badgerDB.Close() }) - eventCodec := eventstorage.ProtobufCodec{} - storage := eventstorage.New(badgerDB, eventCodec).NewShardedReadWriter() - tb.Cleanup(func() { storage.Close() }) + storage := badgerDB.NewReadWriter() return sampling.Config{ BatchProcessor: modelpb.ProcessBatchFunc(func(context.Context, *modelpb.Batch) error { return nil }),