Skip to content

Commit

Permalink
Add transform.Processor interface
Browse files Browse the repository at this point in the history
  • Loading branch information
axw committed Feb 23, 2021
1 parent e1324c2 commit 3f406ae
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 12 deletions.
23 changes: 23 additions & 0 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,3 +592,26 @@ func newSourcemapStore(beatInfo beat.Info, cfg *config.SourceMapping) (*sourcema
index := strings.ReplaceAll(cfg.IndexPattern, "%{[observer.version]}", beatInfo.Version)
return sourcemap.NewStore(esClient, index, cfg.Cache.Expiration)
}

// WrapRunServerWithProcessors wraps runServer such that it wraps args.Reporter
// with a function that transformables are first passed through the given
// processors in order.
func WrapRunServerWithProcessors(runServer RunServerFunc, processors ...transform.Processor) RunServerFunc {
if len(processors) == 0 {
return runServer
}
return func(ctx context.Context, args ServerParams) error {
origReporter := args.Reporter
args.Reporter = func(ctx context.Context, req publish.PendingReq) error {
var err error
for _, p := range processors {
req.Transformables, err = p.ProcessTransformables(ctx, req.Transformables)
if err != nil {
return err
}
}
return origReporter(ctx, req)
}
return runServer(ctx, args)
}
}
8 changes: 8 additions & 0 deletions transform/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ import (
"github.com/elastic/apm-server/sourcemap"
)

// Processor can be used to process a set of Tramsformables, giving
// the opportunity to add or remove by returning a new slice.
type Processor interface {
ProcessTransformables(context.Context, []Transformable) ([]Transformable, error)
}

// Transformable is an interface implemented by all top-level model objects for
// translating to beat.Events.
type Transformable interface {
Transform(context.Context, *Config) []beat.Event
}
Expand Down
17 changes: 5 additions & 12 deletions x-pack/apm-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (

"github.com/elastic/apm-server/beater"
"github.com/elastic/apm-server/elasticsearch"
"github.com/elastic/apm-server/publish"
"github.com/elastic/apm-server/transform"
"github.com/elastic/apm-server/x-pack/apm-server/aggregation/spanmetrics"
"github.com/elastic/apm-server/x-pack/apm-server/aggregation/txmetrics"
Expand All @@ -41,7 +40,7 @@ type namedProcessor struct {
}

type processor interface {
ProcessTransformables(context.Context, []transform.Transformable) ([]transform.Transformable, error)
transform.Processor
Run() error
Stop(context.Context) error
}
Expand Down Expand Up @@ -176,17 +175,11 @@ func runServerWithProcessors(ctx context.Context, runServer beater.RunServerFunc
return runServer(ctx, args)
}

origReport := args.Reporter
args.Reporter = func(ctx context.Context, req publish.PendingReq) error {
var err error
for _, p := range processors {
req.Transformables, err = p.ProcessTransformables(ctx, req.Transformables)
if err != nil {
return err
}
}
return origReport(ctx, req)
transformProcessors := make([]transform.Processor, len(processors))
for i, p := range processors {
transformProcessors[i] = p
}
runServer = beater.WrapRunServerWithProcessors(runServer, transformProcessors...)

g, ctx := errgroup.WithContext(ctx)
for _, p := range processors {
Expand Down

0 comments on commit 3f406ae

Please sign in to comment.