Skip to content

Commit

Permalink
[8.4] Fix race condition in custom libbeat instrumentation (backport #…
Browse files Browse the repository at this point in the history
…8900) (#8916)

* Fix race condition in custom libbeat instrumentation (#8900)

(cherry picked from commit 7494a95)

# Conflicts:
#	changelogs/head.asciidoc

* Fix conflicts

Co-authored-by: Vishal Raj <[email protected]>
  • Loading branch information
mergify[bot] and lahsivjar authored Aug 25, 2022
1 parent 53f4eab commit f49bff8
Showing 1 changed file with 27 additions and 12 deletions.
39 changes: 27 additions & 12 deletions internal/beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,10 @@ func (bt *beater) run(ctx context.Context, cancelContext context.CancelFunc, b *
if b.Config != nil {
reloader.outputConfig = b.Config.Output
}
if err := reloader.reload(); err != nil {
reloader.mu.Lock()
err := reloader.reload()
reloader.mu.Unlock()
if err != nil {
return err
}
}
Expand Down Expand Up @@ -289,6 +292,7 @@ func (r *reloader) Reload(configs []*reload.ConfigWithMeta) error {
}

r.mu.Lock()
defer r.mu.Unlock()
r.rawConfig = integrationConfig.APMServer
// Merge in datastream namespace passed in from apm integration
if integrationConfig.DataStream != nil && integrationConfig.DataStream.Namespace != "" {
Expand All @@ -300,7 +304,6 @@ func (r *reloader) Reload(configs []*reload.ConfigWithMeta) error {
}
}
r.fleetConfig = &integrationConfig.Fleet
r.mu.Unlock()
return r.reload()
}

Expand All @@ -312,14 +315,16 @@ func (r *reloader) reloadOutput(config *reload.ConfigWithMeta) error {
}
}
r.mu.Lock()
defer r.mu.Unlock()
r.outputConfig = outputConfig
r.mu.Unlock()
return r.reload()
}

// reload creates a new serverRunner, launches it in a new goroutine, waits
// for it to have successfully started and returns after waiting for the previous
// serverRunner (if any) to exit. Calls to reload must be synchronized explicitly
// by callers, which must hold reloader#mu for the duration of the call.
func (r *reloader) reload() error {
r.mu.Lock()
defer r.mu.Unlock()
if r.rawConfig == nil {
// APM Server config not loaded yet.
return nil
Expand All @@ -345,6 +350,17 @@ func (r *reloader) reload() error {
r.args.Logger.Error(err)
}
}()

// Wait for the new runner to start; this avoids the race condition in updating
// the monitoring#Default global registry inside the runner due to two reloads,
// one for the inputs and the other for the elasticsearch output.
select {
case <-runner.done:
return errors.New("runner exited unexpectedly")
case <-runner.started:
// runner has started
}

// If the old runner exists, cancel it
if r.runner != nil {
r.runner.cancelRunServerContext()
Expand All @@ -364,6 +380,7 @@ type serverRunner struct {
// immediately when the Stop method is invoked.
runServerContext context.Context
cancelRunServerContext context.CancelFunc
started chan struct{}
done chan struct{}

pipeline beat.PipelineConnector
Expand Down Expand Up @@ -416,6 +433,7 @@ func newServerRunner(ctx context.Context, args serverRunnerParams) (*serverRunne
runServerContext: runServerContext,
cancelRunServerContext: cancel,
done: make(chan struct{}),
started: make(chan struct{}),

config: cfg,
rawConfig: args.RawConfig,
Expand Down Expand Up @@ -581,6 +599,10 @@ func (s *serverRunner) run(listener net.Listener) error {
NewElasticsearchClient: newElasticsearchClient,
})
})

// Signal that the runner has started
close(s.started)

result := g.Wait()
if err := closeFinalBatchProcessor(s.backgroundContext); err != nil {
result = multierror.Append(result, err)
Expand Down Expand Up @@ -664,19 +686,12 @@ func (s *serverRunner) waitReady(ctx context.Context, kibanaClient kibana.Client
return waitReady(ctx, s.config.WaitReadyInterval, s.tracer, s.logger, check)
}

// This mutex must be held when updating the libbeat monitoring registry,
// as there may be multiple servers running concurrently.
var monitoringRegistryMu sync.Mutex

// newFinalBatchProcessor returns the final model.BatchProcessor that publishes events,
// and a cleanup function which should be called on server shutdown. If the output is
// "elasticsearch", then we use modelindexer; otherwise we use the libbeat publisher.
func (s *serverRunner) newFinalBatchProcessor(
newElasticsearchClient func(cfg *elasticsearch.Config) (elasticsearch.Client, error),
) (model.BatchProcessor, func(context.Context) error, error) {
monitoringRegistryMu.Lock()
defer monitoringRegistryMu.Unlock()

if s.elasticsearchOutputConfig == nil {
// When the publisher stops cleanly it will close its pipeline client,
// calling the acker's Close method. We need to call Open for each new
Expand Down

0 comments on commit f49bff8

Please sign in to comment.