diff --git a/changelogs/8.16.asciidoc b/changelogs/8.16.asciidoc index aef70522e8b..96e1a0a0cfc 100644 --- a/changelogs/8.16.asciidoc +++ b/changelogs/8.16.asciidoc @@ -14,6 +14,7 @@ https://github.com/elastic/apm-server/compare/v8.15.2\...v8.16.0[View commits] - Track all bulk request response status codes {pull}13574[13574] - Fix a concurrent map write panic in monitoring middleware {pull}14335[14335] - Apply shutdown timeout to http server {pull}14339[14339] +- Tail-based sampling: Fix rare gc thread failure after EA hot reload causing storage not reclaimed and stuck with "storage limit reached" {pull}13574[13574] [float] ==== Breaking Changes diff --git a/x-pack/apm-server/sampling/processor.go b/x-pack/apm-server/sampling/processor.go index 82dc2df59aa..9402408b299 100644 --- a/x-pack/apm-server/sampling/processor.go +++ b/x-pack/apm-server/sampling/processor.go @@ -40,6 +40,11 @@ const ( shutdownGracePeriod = 5 * time.Second ) +var ( + // gcCh works like a global mutex to protect gc from running concurrently when 2 TBS processors are active during a hot reload + gcCh = make(chan struct{}, 1) +) + // Processor is a tail-sampling event processor. type Processor struct { config Config @@ -386,6 +391,16 @@ func (p *Processor) Run() error { } }) g.Go(func() error { + // Protect this goroutine from running concurrently when 2 TBS processors are active + // as badger GC is not concurrent safe. + select { + case <-p.stopping: + return nil + case gcCh <- struct{}{}: + } + defer func() { + <-gcCh + }() // This goroutine is responsible for periodically garbage // collecting the Badger value log, using the recommended // discard ratio of 0.5. @@ -411,7 +426,9 @@ func (p *Processor) Run() error { }) g.Go(func() error { // Subscribe to remotely sampled trace IDs. This is cancelled immediately when - // Stop is called. The next subscriber will pick up from the previous position. + // Stop is called. 
However, both the old and new subscriber goroutines + // may run concurrently until the old one eventually receives the Stop call. + // The next subscriber will pick up from the previous position. defer close(remoteSampledTraceIDs) defer close(subscriberPositions) ctx, cancel := context.WithCancel(context.Background()) @@ -558,7 +575,13 @@ func (p *Processor) Run() error { return nil } +// subscriberPositionFileMutex protects the subscriber position file from concurrent reads and writes, e.g. during a hot reload. +var subscriberPositionFileMutex sync.Mutex + func readSubscriberPosition(logger *logp.Logger, storageDir string) (pubsub.SubscriberPosition, error) { + subscriberPositionFileMutex.Lock() + defer subscriberPositionFileMutex.Unlock() + var pos pubsub.SubscriberPosition data, err := os.ReadFile(filepath.Join(storageDir, subscriberPositionFile)) if errors.Is(err, os.ErrNotExist) { @@ -579,6 +602,9 @@ func writeSubscriberPosition(storageDir string, pos pubsub.SubscriberPosition) e if err != nil { return err } + + subscriberPositionFileMutex.Lock() + defer subscriberPositionFileMutex.Unlock() return os.WriteFile(filepath.Join(storageDir, subscriberPositionFile), data, 0644) } diff --git a/x-pack/apm-server/sampling/processor_test.go b/x-pack/apm-server/sampling/processor_test.go index f17500da501..e0bf38f77b2 100644 --- a/x-pack/apm-server/sampling/processor_test.go +++ b/x-pack/apm-server/sampling/processor_test.go @@ -22,6 +22,7 @@ import ( "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" "google.golang.org/protobuf/testing/protocmp" "github.com/elastic/apm-data/model/modelpb" @@ -668,6 +669,31 @@ func TestStorageGC(t *testing.T) { t.Fatal("timed out waiting for value log garbage collection") } +func TestStorageGCConcurrency(t *testing.T) { + // This test ensures that TBS processor does not return an error + // even when run concurrently e.g. 
during a hot reload + if testing.Short() { + t.Skip("skipping slow test") + } + + config := newTempdirConfig(t) + config.TTL = 10 * time.Millisecond + config.FlushInterval = 10 * time.Millisecond + config.StorageGCInterval = 10 * time.Millisecond + + g := errgroup.Group{} + for i := 0; i < 2; i++ { + processor, err := sampling.NewProcessor(config) + require.NoError(t, err) + g.Go(processor.Run) + go func() { + time.Sleep(time.Second) + assert.NoError(t, processor.Stop(context.Background())) + }() + } + assert.NoError(t, g.Wait()) +} + func TestStorageLimit(t *testing.T) { // This test ensures that when tail sampling is configured with a hard // storage limit, the limit is respected once the size is available.