diff --git a/beater/beater.go b/beater/beater.go index c9de6beb85c..95aa36c0b5a 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -592,3 +592,26 @@ func newSourcemapStore(beatInfo beat.Info, cfg *config.SourceMapping) (*sourcema index := strings.ReplaceAll(cfg.IndexPattern, "%{[observer.version]}", beatInfo.Version) return sourcemap.NewStore(esClient, index, cfg.Cache.Expiration) } + +// WrapRunServerWithProcessors wraps runServer such that it wraps args.Reporter +// with a function that transformables are first passed through the given +// processors in order. +func WrapRunServerWithProcessors(runServer RunServerFunc, processors ...transform.Processor) RunServerFunc { + if len(processors) == 0 { + return runServer + } + return func(ctx context.Context, args ServerParams) error { + origReporter := args.Reporter + args.Reporter = func(ctx context.Context, req publish.PendingReq) error { + var err error + for _, p := range processors { + req.Transformables, err = p.ProcessTransformables(ctx, req.Transformables) + if err != nil { + return err + } + } + return origReporter(ctx, req) + } + return runServer(ctx, args) + } +} diff --git a/transform/transform.go b/transform/transform.go index 459bc171985..0a3aae7026b 100644 --- a/transform/transform.go +++ b/transform/transform.go @@ -26,6 +26,14 @@ import ( "github.com/elastic/apm-server/sourcemap" ) +// Processor can be used to process a set of Tramsformables, giving +// the opportunity to add or remove by returning a new slice. +type Processor interface { + ProcessTransformables(context.Context, []Transformable) ([]Transformable, error) +} + +// Transformable is an interface implemented by all top-level model objects for +// translating to beat.Events. type Transformable interface { Transform(context.Context, *Config) []beat.Event } diff --git a/x-pack/apm-server/main.go b/x-pack/apm-server/main.go index 91e02c81b6b..f1f0e6f133b 100644 --- a/x-pack/apm-server/main.go +++ b/x-pack/apm-server/main.go @@ -18,7 +18,6 @@ import ( "github.com/elastic/apm-server/beater" "github.com/elastic/apm-server/elasticsearch" - "github.com/elastic/apm-server/publish" "github.com/elastic/apm-server/transform" "github.com/elastic/apm-server/x-pack/apm-server/aggregation/spanmetrics" "github.com/elastic/apm-server/x-pack/apm-server/aggregation/txmetrics" @@ -41,7 +40,7 @@ type namedProcessor struct { } type processor interface { - ProcessTransformables(context.Context, []transform.Transformable) ([]transform.Transformable, error) + transform.Processor Run() error Stop(context.Context) error } @@ -176,17 +175,11 @@ func runServerWithProcessors(ctx context.Context, runServer beater.RunServerFunc return runServer(ctx, args) } - origReport := args.Reporter - args.Reporter = func(ctx context.Context, req publish.PendingReq) error { - var err error - for _, p := range processors { - req.Transformables, err = p.ProcessTransformables(ctx, req.Transformables) - if err != nil { - return err - } - } - return origReport(ctx, req) + transformProcessors := make([]transform.Processor, len(processors)) + for i, p := range processors { + transformProcessors[i] = p } + runServer = beater.WrapRunServerWithProcessors(runServer, transformProcessors...) g, ctx := errgroup.WithContext(ctx) for _, p := range processors {