Skip to content

Commit

Permalink
Limit badger gc concurrency to 1 to avoid panic (#14340)
Browse files Browse the repository at this point in the history
Badger GC will panic when run concurrently. 2 TBS processors may run concurrently during a hot reload. Make TBS processor concurrent-safe by protecting badger gc using a mutex.

(cherry picked from commit 43e968f)
  • Loading branch information
carsonip authored and mergify[bot] committed Oct 21, 2024
1 parent 00175f1 commit 96acb93
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 1 deletion.
1 change: 1 addition & 0 deletions changelogs/8.16.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ https://github.com/elastic/apm-server/compare/v8.15.2\...v8.16.0[View commits]
- Track all bulk request response status codes {pull}13574[13574]
- Fix a concurrent map write panic in monitoring middleware {pull}14335[14335]
- Apply shutdown timeout to http server {pull}14339[14339]
- Tail-based sampling: Fix rare gc thread failure after EA hot reload causing storage not reclaimed and stuck with "storage limit reached" {pull}13574[13574]

[float]
==== Breaking Changes
Expand Down
28 changes: 27 additions & 1 deletion x-pack/apm-server/sampling/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ const (
shutdownGracePeriod = 5 * time.Second
)

var (
// gcCh works like a global mutex to protect gc from running concurrently when 2 TBS processors are active during a hot reload
gcCh = make(chan struct{}, 1)
)

// Processor is a tail-sampling event processor.
type Processor struct {
config Config
Expand Down Expand Up @@ -386,6 +391,16 @@ func (p *Processor) Run() error {
}
})
g.Go(func() error {
// Protect this goroutine from running concurrently when 2 TBS processors are active
// as badger GC is not concurrent safe.
select {
case <-p.stopping:
return nil
case gcCh <- struct{}{}:
}
defer func() {
<-gcCh
}()
// This goroutine is responsible for periodically garbage
// collecting the Badger value log, using the recommended
// discard ratio of 0.5.
Expand All @@ -411,7 +426,9 @@ func (p *Processor) Run() error {
})
g.Go(func() error {
// Subscribe to remotely sampled trace IDs. This is cancelled immediately when
// Stop is called. The next subscriber will pick up from the previous position.
// Stop is called. But it is possible that both old and new subscriber goroutines
// run concurrently, before the old one eventually receives the Stop call.
// The next subscriber will pick up from the previous position.
defer close(remoteSampledTraceIDs)
defer close(subscriberPositions)
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -558,7 +575,13 @@ func (p *Processor) Run() error {
return nil
}

// subscriberPositionFileMutex protects the subscriber file from concurrent RW, in case of hot reload.
var subscriberPositionFileMutex sync.Mutex

func readSubscriberPosition(logger *logp.Logger, storageDir string) (pubsub.SubscriberPosition, error) {
subscriberPositionFileMutex.Lock()
defer subscriberPositionFileMutex.Unlock()

var pos pubsub.SubscriberPosition
data, err := os.ReadFile(filepath.Join(storageDir, subscriberPositionFile))
if errors.Is(err, os.ErrNotExist) {
Expand All @@ -579,6 +602,9 @@ func writeSubscriberPosition(storageDir string, pos pubsub.SubscriberPosition) e
if err != nil {
return err
}

subscriberPositionFileMutex.Lock()
defer subscriberPositionFileMutex.Unlock()
return os.WriteFile(filepath.Join(storageDir, subscriberPositionFile), data, 0644)
}

Expand Down
26 changes: 26 additions & 0 deletions x-pack/apm-server/sampling/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/testing/protocmp"

"github.com/elastic/apm-data/model/modelpb"
Expand Down Expand Up @@ -668,6 +669,31 @@ func TestStorageGC(t *testing.T) {
t.Fatal("timed out waiting for value log garbage collection")
}

func TestStorageGCConcurrency(t *testing.T) {
// This test ensures that TBS processor does not return an error
// even when run concurrently e.g. in hot reload
if testing.Short() {
t.Skip("skipping slow test")
}

config := newTempdirConfig(t)
config.TTL = 10 * time.Millisecond
config.FlushInterval = 10 * time.Millisecond
config.StorageGCInterval = 10 * time.Millisecond

g := errgroup.Group{}
for i := 0; i < 2; i++ {
processor, err := sampling.NewProcessor(config)
require.NoError(t, err)
g.Go(processor.Run)
go func() {
time.Sleep(time.Second)
assert.NoError(t, processor.Stop(context.Background()))
}()
}
assert.NoError(t, g.Wait())
}

func TestStorageLimit(t *testing.T) {
// This test ensures that when tail sampling is configured with a hard
// storage limit, the limit is respected once the size is available.
Expand Down

0 comments on commit 96acb93

Please sign in to comment.