Skip to content

Commit

Permalink
Fix race condition in custom libbeat instrumentation (#8900)
Browse files Browse the repository at this point in the history
  • Loading branch information
lahsivjar authored Aug 22, 2022
1 parent b7d5eb0 commit 7494a95
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 12 deletions.
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ https://github.com/elastic/apm-server/compare/8.5\...main[View commits]
==== Bug fixes
- Set `message` instead of `labels.event` for Jaeger span events {pull}8765[8765]
- Fix event loss during reload of TBS processor {pull}8809[8809]
- Fix sporadically missing custom libbeat metrics {pull}8900[8900]

[float]
==== Intake API Changes
Expand Down
39 changes: 27 additions & 12 deletions internal/beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,10 @@ func (bt *beater) run(ctx context.Context, cancelContext context.CancelFunc, b *
if b.Config != nil {
reloader.outputConfig = b.Config.Output
}
if err := reloader.reload(); err != nil {
reloader.mu.Lock()
err := reloader.reload()
reloader.mu.Unlock()
if err != nil {
return err
}
}
Expand Down Expand Up @@ -289,6 +292,7 @@ func (r *reloader) Reload(configs []*reload.ConfigWithMeta) error {
}

r.mu.Lock()
defer r.mu.Unlock()
r.rawConfig = integrationConfig.APMServer
// Merge in datastream namespace passed in from apm integration
if integrationConfig.DataStream != nil && integrationConfig.DataStream.Namespace != "" {
Expand All @@ -300,7 +304,6 @@ func (r *reloader) Reload(configs []*reload.ConfigWithMeta) error {
}
}
r.fleetConfig = &integrationConfig.Fleet
r.mu.Unlock()
return r.reload()
}

Expand All @@ -312,14 +315,16 @@ func (r *reloader) reloadOutput(config *reload.ConfigWithMeta) error {
}
}
r.mu.Lock()
defer r.mu.Unlock()
r.outputConfig = outputConfig
r.mu.Unlock()
return r.reload()
}

// reload creates a new serverRunner, launches it in a new goroutine, waits
// for it to have successfully started and returns after waiting for the previous
// serverRunner (if any) to exit. Calls to reload must be synchronized explicitly
// by callers, by acquiring reloader#mu.
func (r *reloader) reload() error {
r.mu.Lock()
defer r.mu.Unlock()
if r.rawConfig == nil {
// APM Server config not loaded yet.
return nil
Expand All @@ -345,6 +350,17 @@ func (r *reloader) reload() error {
r.args.Logger.Error(err)
}
}()

// Wait for the new runner to start; this avoids the race condition in updating
// the monitoring#Default global registry inside the runner due to two reloads,
// one for the inputs and the other for the elasticsearch output
select {
case <-runner.done:
return errors.New("runner exited unexpectedly")
case <-runner.started:
// runner has started
}

// If the old runner exists, cancel it
if r.runner != nil {
r.runner.cancelRunServerContext()
Expand All @@ -364,6 +380,7 @@ type serverRunner struct {
// immediately when the Stop method is invoked.
runServerContext context.Context
cancelRunServerContext context.CancelFunc
started chan struct{}
done chan struct{}

pipeline beat.PipelineConnector
Expand Down Expand Up @@ -416,6 +433,7 @@ func newServerRunner(ctx context.Context, args serverRunnerParams) (*serverRunne
runServerContext: runServerContext,
cancelRunServerContext: cancel,
done: make(chan struct{}),
started: make(chan struct{}),

config: cfg,
rawConfig: args.RawConfig,
Expand Down Expand Up @@ -581,6 +599,10 @@ func (s *serverRunner) run(listener net.Listener) error {
NewElasticsearchClient: newElasticsearchClient,
})
})

// Signal that the runner has started
close(s.started)

result := g.Wait()
if err := closeFinalBatchProcessor(s.backgroundContext); err != nil {
result = multierror.Append(result, err)
Expand Down Expand Up @@ -664,19 +686,12 @@ func (s *serverRunner) waitReady(ctx context.Context, kibanaClient kibana.Client
return waitReady(ctx, s.config.WaitReadyInterval, s.tracer, s.logger, check)
}

// This mutex must be held when updating the libbeat monitoring registry,
// as there may be multiple servers running concurrently.
var monitoringRegistryMu sync.Mutex

// newFinalBatchProcessor returns the final model.BatchProcessor that publishes events,
// and a cleanup function which should be called on server shutdown. If the output is
// "elasticsearch", then we use modelindexer; otherwise we use the libbeat publisher.
func (s *serverRunner) newFinalBatchProcessor(
newElasticsearchClient func(cfg *elasticsearch.Config) (elasticsearch.Client, error),
) (model.BatchProcessor, func(context.Context) error, error) {
monitoringRegistryMu.Lock()
defer monitoringRegistryMu.Unlock()

if s.elasticsearchOutputConfig == nil {
// When the publisher stops cleanly it will close its pipeline client,
// calling the acker's Close method. We need to call Open for each new
Expand Down

0 comments on commit 7494a95

Please sign in to comment.