diff --git a/_meta/beat.yml b/_meta/beat.yml index c87b03e0b01..22976c221ff 100644 --- a/_meta/beat.yml +++ b/_meta/beat.yml @@ -35,6 +35,11 @@ apm-server: # or the IP and User Agent of the real user (RUM requests). #capture_personal_data: true + # If specified, APM Server will record this value in events which have no service environment + # defined, and add it to agent configuration queries to Kibana when none is specified in the + # request from the agent. + #default_service_environment: + # Enable APM Server Golang expvar support (https://golang.org/pkg/expvar/). #expvar: #enabled: false diff --git a/apm-server.docker.yml b/apm-server.docker.yml index 2a0c8c65930..cf04f81dcdb 100644 --- a/apm-server.docker.yml +++ b/apm-server.docker.yml @@ -35,6 +35,11 @@ apm-server: # or the IP and User Agent of the real user (RUM requests). #capture_personal_data: true + # If specified, APM Server will record this value in events which have no service environment + # defined, and add it to agent configuration queries to Kibana when none is specified in the + # request from the agent. + #default_service_environment: + # Enable APM Server Golang expvar support (https://golang.org/pkg/expvar/). #expvar: #enabled: false diff --git a/apm-server.yml b/apm-server.yml index bf1586c0e4d..65e9c5445fe 100644 --- a/apm-server.yml +++ b/apm-server.yml @@ -35,6 +35,11 @@ apm-server: # or the IP and User Agent of the real user (RUM requests). #capture_personal_data: true + # If specified, APM Server will record this value in events which have no service environment + # defined, and add it to agent configuration queries to Kibana when none is specified in the + # request from the agent. + #default_service_environment: + # Enable APM Server Golang expvar support (https://golang.org/pkg/expvar/). #expvar: #enabled: false diff --git a/apmpackage/apm/0.1.0/_dev/docs/README.template.md b/apmpackage/apm/0.1.0/_dev/docs/README.template.md index 7f96e743c99..9714613aeed 100644 --- a/apmpackage/apm/0.1.0/_dev/docs/README.template.md +++ b/apmpackage/apm/0.1.0/_dev/docs/README.template.md @@ -18,7 +18,11 @@ If you need additional pipelines, override ILM policies, etc; you must do it ext #### Data Streams When using the APM integration, apm events are indexed into data streams. Data stream names contain the event type, - the service name, and a user configurable namespace. +the service name, and a user configurable namespace. + +There is no specific recommendaton for what to use as a namespace; it is intentionally flexible. +You might use the environment (production, testing, development) as the namespace, +or alternatively you could namespace data by business unit. It is your choice. ## Compatibility and limitations diff --git a/apmpackage/apm/0.1.0/agent/input/template.yml.hbs b/apmpackage/apm/0.1.0/agent/input/template.yml.hbs index 246560be3b9..85e3e507bf2 100644 --- a/apmpackage/apm/0.1.0/agent/input/template.yml.hbs +++ b/apmpackage/apm/0.1.0/agent/input/template.yml.hbs @@ -3,6 +3,7 @@ apm-server: secret_token: {{secret_token}} max_event_size: {{max_event_bytes}} capture_personal_data: {{capture_personal_data}} + default_service_environment: {{default_service_environment}} kibana: api_key: {{kibana_api_key}} rum: diff --git a/apmpackage/apm/0.1.0/docs/README.md b/apmpackage/apm/0.1.0/docs/README.md index a82d1b0e3ef..f85de1f4e0e 100644 --- a/apmpackage/apm/0.1.0/docs/README.md +++ b/apmpackage/apm/0.1.0/docs/README.md @@ -18,7 +18,11 @@ If you need additional pipelines, override ILM policies, etc; you must do it ext #### Data Streams When using the APM integration, apm events are indexed into data streams. Data stream names contain the event type, - the service name, and a user configurable namespace. +the service name, and a user configurable namespace. + +There is no specific recommendaton for what to use as a namespace; it is intentionally flexible. +You might use the environment (production, testing, development) as the namespace, +or alternatively you could namespace data by business unit. It is your choice. ## Compatibility and limitations diff --git a/apmpackage/apm/0.1.0/manifest.yml b/apmpackage/apm/0.1.0/manifest.yml index c6db9541b55..b49d8a6e526 100644 --- a/apmpackage/apm/0.1.0/manifest.yml +++ b/apmpackage/apm/0.1.0/manifest.yml @@ -56,6 +56,12 @@ policy_templates: required: true show_user: true default: false + - name: default_service_environment + type: string + title: Default Service Environment + description: Default service environment to record in events which have no service environment defined. + required: false + show_user: false - name: rum_allow_origins type: string title: RUM - Origin Headers diff --git a/beater/api/config/agent/handler.go b/beater/api/config/agent/handler.go index 8d49407aefa..351244dae21 100644 --- a/beater/api/config/agent/handler.go +++ b/beater/api/config/agent/handler.go @@ -62,7 +62,7 @@ var ( ) // Handler returns a request.Handler for managing agent central configuration requests. -func Handler(client kibana.Client, config *config.AgentConfig) request.Handler { +func Handler(client kibana.Client, config *config.AgentConfig, defaultServiceEnvironment string) request.Handler { cacheControl := fmt.Sprintf("max-age=%v, must-revalidate", config.Cache.Expiration.Seconds()) fetcher := agentcfg.NewFetcher(client, config.Cache.Expiration) @@ -88,6 +88,9 @@ func Handler(client kibana.Client, config *config.AgentConfig) request.Handler { c.Write() return } + if query.Service.Environment == "" { + query.Service.Environment = defaultServiceEnvironment + } result, err := fetcher.Fetch(c.Request.Context(), query) if err != nil { diff --git a/beater/api/config/agent/handler_test.go b/beater/api/config/agent/handler_test.go index 6f6883994ec..993525f8c20 100644 --- a/beater/api/config/agent/handler_test.go +++ b/beater/api/config/agent/handler_test.go @@ -21,6 +21,7 @@ import ( "context" "encoding/json" "fmt" + "io" "io/ioutil" "net/http" "net/http/httptest" @@ -181,14 +182,12 @@ func TestAgentConfigHandler(t *testing.T) { for name, tc := range testcases { runTest := func(t *testing.T, expectedBody map[string]string, auth authorization.Authorization) { - h := Handler(tc.kbClient, &cfg) - w := httptest.NewRecorder() + h := Handler(tc.kbClient, &cfg, "") r := httptest.NewRequest(tc.method, target(tc.queryParams), nil) for k, v := range tc.requestHeader { r.Header.Set(k, v) } - ctx := request.NewContext() - ctx.Reset(w, r) + ctx, w := newRequestContext(r) ctx.Authorization = auth h(ctx) @@ -214,13 +213,9 @@ func TestAgentConfigHandler(t *testing.T) { func TestAgentConfigHandler_NoKibanaClient(t *testing.T) { cfg := config.AgentConfig{Cache: &config.Cache{Expiration: time.Nanosecond}} - h := Handler(nil, &cfg) - - w := httptest.NewRecorder() - ctx := request.NewContext() - ctx.Reset(w, httptest.NewRequest(http.MethodGet, "/config", nil)) - h(ctx) + h := Handler(nil, &cfg, "") + w := sendRequest(h, httptest.NewRequest(http.MethodGet, "/config", nil)) assert.Equal(t, http.StatusServiceUnavailable, w.Code, w.Body.String()) } @@ -236,25 +231,43 @@ func TestAgentConfigHandler_PostOk(t *testing.T) { }, mockVersion, true) var cfg = config.AgentConfig{Cache: &config.Cache{Expiration: time.Nanosecond}} - h := Handler(kb, &cfg) - - w := httptest.NewRecorder() - r := httptest.NewRequest(http.MethodPost, "/config", convert.ToReader(m{ - "service": m{"name": "opbeans-node"}})) - ctx := request.NewContext() - ctx.Reset(w, r) - h(ctx) + h := Handler(kb, &cfg, "") + w := sendRequest(h, httptest.NewRequest(http.MethodPost, "/config", convert.ToReader(m{ + "service": m{"name": "opbeans-node"}}))) assert.Equal(t, http.StatusOK, w.Code, w.Body.String()) } +func TestAgentConfigHandler_DefaultServiceEnvironment(t *testing.T) { + kb := &recordingKibanaClient{ + Client: tests.MockKibana(http.StatusOK, m{ + "_id": "1", + "_source": m{ + "settings": m{ + "sampling_rate": 0.5, + }, + }, + }, mockVersion, true), + } + + var cfg = config.AgentConfig{Cache: &config.Cache{Expiration: time.Nanosecond}} + h := Handler(kb, &cfg, "default") + + sendRequest(h, httptest.NewRequest(http.MethodPost, "/config", convert.ToReader(m{"service": m{"name": "opbeans-node", "environment": "specified"}}))) + sendRequest(h, httptest.NewRequest(http.MethodPost, "/config", convert.ToReader(m{"service": m{"name": "opbeans-node"}}))) + require.Len(t, kb.requests, 2) + + body0, _ := ioutil.ReadAll(kb.requests[0].Body) + body1, _ := ioutil.ReadAll(kb.requests[1].Body) + assert.Equal(t, `{"service":{"name":"opbeans-node","environment":"specified"},"etag":""}`, string(body0)) + assert.Equal(t, `{"service":{"name":"opbeans-node","environment":"default"},"etag":""}`, string(body1)) +} + func TestAgentConfigRum(t *testing.T) { h := getHandler("rum-js") - w := httptest.NewRecorder() r := httptest.NewRequest(http.MethodPost, "/rum", convert.ToReader(m{ "service": m{"name": "opbeans"}})) - ctx := request.NewContext() - ctx.Reset(w, r) + ctx, w := newRequestContext(r) ctx.IsRum = true h(ctx) var actual map[string]string @@ -266,10 +279,8 @@ func TestAgentConfigRum(t *testing.T) { func TestAgentConfigRumEtag(t *testing.T) { h := getHandler("rum-js") - w := httptest.NewRecorder() r := httptest.NewRequest(http.MethodGet, "/rum?ifnonematch=123&service.name=opbeans", nil) - ctx := request.NewContext() - ctx.Reset(w, r) + ctx, w := newRequestContext(r) ctx.IsRum = true h(ctx) assert.Equal(t, http.StatusNotModified, w.Code, w.Body.String()) @@ -277,11 +288,9 @@ func TestAgentConfigRumEtag(t *testing.T) { func TestAgentConfigNotRum(t *testing.T) { h := getHandler("node-js") - w := httptest.NewRecorder() r := httptest.NewRequest(http.MethodPost, "/backend", convert.ToReader(m{ "service": m{"name": "opbeans"}})) - ctx := request.NewContext() - ctx.Reset(w, r) + ctx, w := newRequestContext(r) h(ctx) var actual map[string]string json.Unmarshal(w.Body.Bytes(), &actual) @@ -291,11 +300,9 @@ func TestAgentConfigNotRum(t *testing.T) { func TestAgentConfigNoLeak(t *testing.T) { h := getHandler("node-js") - w := httptest.NewRecorder() r := httptest.NewRequest(http.MethodPost, "/rum", convert.ToReader(m{ "service": m{"name": "opbeans"}})) - ctx := request.NewContext() - ctx.Reset(w, r) + ctx, w := newRequestContext(r) ctx.IsRum = true h(ctx) var actual map[string]string @@ -306,11 +313,9 @@ func TestAgentConfigNoLeak(t *testing.T) { func TestAgentConfigRateLimit(t *testing.T) { h := getHandler("rum-js") - w := httptest.NewRecorder() r := httptest.NewRequest(http.MethodPost, "/rum", convert.ToReader(m{ "service": m{"name": "opbeans"}})) - ctx := request.NewContext() - ctx.Reset(w, r) + ctx, w := newRequestContext(r) ctx.IsRum = true ctx.RateLimiter = rate.NewLimiter(rate.Limit(0), 0) h(ctx) @@ -334,7 +339,7 @@ func getHandler(agent string) request.Handler { }, mockVersion, true) var cfg = config.AgentConfig{Cache: &config.Cache{Expiration: time.Nanosecond}} - return Handler(kb, &cfg) + return Handler(kb, &cfg, "") } func TestIfNoneMatch(t *testing.T) { @@ -358,22 +363,31 @@ func TestAgentConfigTraceContext(t *testing.T) { kibanaCfg := config.KibanaConfig{Enabled: true, ClientConfig: libkibana.DefaultClientConfig()} kibanaCfg.Host = "testKibana:12345" client := kibana.NewConnectingClient(&kibanaCfg) - handler := Handler(client, &config.AgentConfig{Cache: &config.Cache{Expiration: 5 * time.Minute}}) + handler := Handler(client, &config.AgentConfig{Cache: &config.Cache{Expiration: 5 * time.Minute}}, "") _, spans, _ := apmtest.WithTransaction(func(ctx context.Context) { // When the handler is called with a context containing // a transaction, the underlying Kibana query should create a span - w := httptest.NewRecorder() r := httptest.NewRequest(http.MethodPost, "/backend", convert.ToReader(m{ "service": m{"name": "opbeans"}})) - r = r.WithContext(ctx) - c := request.NewContext() - c.Reset(w, r) - handler(c) + sendRequest(handler, r.WithContext(ctx)) }) require.Len(t, spans, 1) assert.Equal(t, "app", spans[0].Type) } +func sendRequest(h request.Handler, r *http.Request) *httptest.ResponseRecorder { + ctx, recorder := newRequestContext(r) + h(ctx) + return recorder +} + +func newRequestContext(r *http.Request) (*request.Context, *httptest.ResponseRecorder) { + w := httptest.NewRecorder() + ctx := request.NewContext() + ctx.Reset(w, r) + return ctx, w +} + func target(params map[string]string) string { t := "/config" if len(params) == 0 { @@ -385,3 +399,20 @@ func target(params map[string]string) string { } return t } + +type recordingKibanaClient struct { + kibana.Client + requests []*http.Request +} + +func (c *recordingKibanaClient) Send(ctx context.Context, method string, path string, params url.Values, header http.Header, body io.Reader) (*http.Response, error) { + req := httptest.NewRequest(method, path, body) + req.URL.RawQuery = params.Encode() + for k, values := range header { + for _, v := range values { + req.Header.Add(k, v) + } + } + c.requests = append(c.requests, req.WithContext(ctx)) + return c.Client.Send(ctx, method, path, params, header, body) +} diff --git a/beater/api/mux.go b/beater/api/mux.go index c1f8663d482..e3700b49a9f 100644 --- a/beater/api/mux.go +++ b/beater/api/mux.go @@ -170,7 +170,7 @@ func agentConfigHandler(cfg *config.Config, authHandler *authorization.Handler, if cfg.Kibana.Enabled { client = kibana.NewConnectingClient(&cfg.Kibana) } - h := agent.Handler(client, cfg.AgentConfig) + h := agent.Handler(client, cfg.AgentConfig, cfg.DefaultServiceEnvironment) msg := "Agent remote configuration is disabled. " + "Configure the `apm-server.kibana` section in apm-server.yml to enable it. " + "If you are using a RUM agent, you also need to configure the `apm-server.rum` section. " + diff --git a/beater/beater.go b/beater/beater.go index c9de6beb85c..36d126d17e9 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -46,6 +46,7 @@ import ( "github.com/elastic/apm-server/elasticsearch" "github.com/elastic/apm-server/ingest/pipeline" logs "github.com/elastic/apm-server/log" + "github.com/elastic/apm-server/model/modelprocessor" "github.com/elastic/apm-server/publish" "github.com/elastic/apm-server/sampling" "github.com/elastic/apm-server/sourcemap" @@ -352,6 +353,7 @@ func (s *serverRunner) run() error { // behaviour into the processing/reporting pipeline. runServer = s.wrapRunServer(runServer) } + runServer = s.wrapRunServerWithPreprocessors(runServer) transformConfig, err := newTransformConfig(s.beat.Info, s.config) if err != nil { @@ -399,6 +401,16 @@ func (s *serverRunner) run() error { return publisher.Stop(s.backgroundContext) } +func (s *serverRunner) wrapRunServerWithPreprocessors(runServer RunServerFunc) RunServerFunc { + var processors []transform.Processor + if s.config.DefaultServiceEnvironment != "" { + processors = append(processors, &modelprocessor.SetDefaultServiceEnvironment{ + DefaultServiceEnvironment: s.config.DefaultServiceEnvironment, + }) + } + return WrapRunServerWithProcessors(runServer, processors...) +} + // checkConfig verifies the global configuration doesn't use unsupported settings // // TODO(axw) remove this, nobody expects dashboard setup from apm-server. @@ -592,3 +604,26 @@ func newSourcemapStore(beatInfo beat.Info, cfg *config.SourceMapping) (*sourcema index := strings.ReplaceAll(cfg.IndexPattern, "%{[observer.version]}", beatInfo.Version) return sourcemap.NewStore(esClient, index, cfg.Cache.Expiration) } + +// WrapRunServerWithProcessors wraps runServer such that it wraps args.Reporter +// with a function that transformables are first passed through the given +// processors in order. +func WrapRunServerWithProcessors(runServer RunServerFunc, processors ...transform.Processor) RunServerFunc { + if len(processors) == 0 { + return runServer + } + return func(ctx context.Context, args ServerParams) error { + origReporter := args.Reporter + args.Reporter = func(ctx context.Context, req publish.PendingReq) error { + var err error + for _, p := range processors { + req.Transformables, err = p.ProcessTransformables(ctx, req.Transformables) + if err != nil { + return err + } + } + return origReporter(ctx, req) + } + return runServer(ctx, args) + } +} diff --git a/beater/config/config.go b/beater/config/config.go index 69b0c61f114..1e3802b11fd 100644 --- a/beater/config/config.go +++ b/beater/config/config.go @@ -64,30 +64,31 @@ func defaultKibanaConfig() KibanaConfig { // Config holds configuration information nested under the key `apm-server` type Config struct { - Host string `config:"host"` - MaxHeaderSize int `config:"max_header_size"` - IdleTimeout time.Duration `config:"idle_timeout"` - ReadTimeout time.Duration `config:"read_timeout"` - WriteTimeout time.Duration `config:"write_timeout"` - MaxEventSize int `config:"max_event_size"` - ShutdownTimeout time.Duration `config:"shutdown_timeout"` - TLS *tlscommon.ServerConfig `config:"ssl"` - MaxConnections int `config:"max_connections"` - ResponseHeaders map[string][]string `config:"response_headers"` - Expvar *ExpvarConfig `config:"expvar"` - AugmentEnabled bool `config:"capture_personal_data"` - SelfInstrumentation *InstrumentationConfig `config:"instrumentation"` - RumConfig *RumConfig `config:"rum"` - Register *RegisterConfig `config:"register"` - Mode Mode `config:"mode"` - Kibana KibanaConfig `config:"kibana"` - AgentConfig *AgentConfig `config:"agent.config"` - SecretToken string `config:"secret_token"` - APIKeyConfig *APIKeyConfig `config:"api_key"` - JaegerConfig JaegerConfig `config:"jaeger"` - Aggregation AggregationConfig `config:"aggregation"` - Sampling SamplingConfig `config:"sampling"` - DataStreams DataStreamsConfig `config:"data_streams"` + Host string `config:"host"` + MaxHeaderSize int `config:"max_header_size"` + IdleTimeout time.Duration `config:"idle_timeout"` + ReadTimeout time.Duration `config:"read_timeout"` + WriteTimeout time.Duration `config:"write_timeout"` + MaxEventSize int `config:"max_event_size"` + ShutdownTimeout time.Duration `config:"shutdown_timeout"` + TLS *tlscommon.ServerConfig `config:"ssl"` + MaxConnections int `config:"max_connections"` + ResponseHeaders map[string][]string `config:"response_headers"` + Expvar *ExpvarConfig `config:"expvar"` + AugmentEnabled bool `config:"capture_personal_data"` + SelfInstrumentation *InstrumentationConfig `config:"instrumentation"` + RumConfig *RumConfig `config:"rum"` + Register *RegisterConfig `config:"register"` + Mode Mode `config:"mode"` + Kibana KibanaConfig `config:"kibana"` + AgentConfig *AgentConfig `config:"agent.config"` + SecretToken string `config:"secret_token"` + APIKeyConfig *APIKeyConfig `config:"api_key"` + JaegerConfig JaegerConfig `config:"jaeger"` + Aggregation AggregationConfig `config:"aggregation"` + Sampling SamplingConfig `config:"sampling"` + DataStreams DataStreamsConfig `config:"data_streams"` + DefaultServiceEnvironment string `config:"default_service_environment"` Pipeline string } diff --git a/beater/config/config_test.go b/beater/config/config_test.go index cc4bdd7bc1f..4399e8c91c5 100644 --- a/beater/config/config_test.go +++ b/beater/config/config_test.go @@ -136,6 +136,7 @@ func TestUnpackConfig(t *testing.T) { "max_groups": 456, }, }, + "default_service_environment": "overridden", }, outCfg: &Config{ Host: "localhost:3000", @@ -245,6 +246,7 @@ func TestUnpackConfig(t *testing.T) { TTL: 30 * time.Minute, }, }, + DefaultServiceEnvironment: "overridden", }, }, "merge config with default": { diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc index 34742ecd28d..d0c8cd4fc66 100644 --- a/changelogs/head.asciidoc +++ b/changelogs/head.asciidoc @@ -16,6 +16,7 @@ https://github.com/elastic/apm-server/compare/7.12\...master[View commits] ==== Added * Add support for Node.js wall time profiles {pull}4728[4728] * Add metricset.name field to metric docs {pull}4857[4857] +* Add `apm-server.default_service_environment` config {pull}4861[4861] [float] ==== Deprecated diff --git a/docs/configuration-process.asciidoc b/docs/configuration-process.asciidoc index c3f8feafdaa..ed4f795a20b 100644 --- a/docs/configuration-process.asciidoc +++ b/docs/configuration-process.asciidoc @@ -106,6 +106,11 @@ If true, APM Server captures the IP of the instrumented service and its User Agent if any. Enabled by default. +[[default_service_environment]] +[float] +==== `default_service_environment` +Sets the default service environment to associate with data and requests received from agents which have no service environment defined. + [[expvar.enabled]] [float] ==== `expvar.enabled` diff --git a/model/modelprocessor/environment.go b/model/modelprocessor/environment.go new file mode 100644 index 00000000000..06423f35ae3 --- /dev/null +++ b/model/modelprocessor/environment.go @@ -0,0 +1,62 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package modelprocessor + +import ( + "context" + + "github.com/elastic/apm-server/model" + "github.com/elastic/apm-server/transform" +) + +// SetDefaultServiceEnvironment is a transform.Processor that sets a default +// service.environment value for events without one already set. +type SetDefaultServiceEnvironment struct { + // DefaultServiceEnvironment is the default service.environment value + // to set for events without one already set. + DefaultServiceEnvironment string +} + +// ProcessTransformables sets a default service.value for events without one already set. +func (s *SetDefaultServiceEnvironment) ProcessTransformables(ctx context.Context, in []transform.Transformable) ([]transform.Transformable, error) { + for _, t := range in { + switch t := t.(type) { + case *model.Transaction: + if t.Metadata.Service.Environment == "" { + t.Metadata.Service.Environment = s.DefaultServiceEnvironment + } + case *model.Span: + if t.Metadata.Service.Environment == "" { + t.Metadata.Service.Environment = s.DefaultServiceEnvironment + } + case *model.Metricset: + if t.Metadata.Service.Environment == "" { + t.Metadata.Service.Environment = s.DefaultServiceEnvironment + } + case *model.Error: + if t.Metadata.Service.Environment == "" { + t.Metadata.Service.Environment = s.DefaultServiceEnvironment + } + case *model.PprofProfile: + if t.Metadata.Service.Environment == "" { + t.Metadata.Service.Environment = s.DefaultServiceEnvironment + } + } + } + return in, nil +} diff --git a/model/modelprocessor/environment_test.go b/model/modelprocessor/environment_test.go new file mode 100644 index 00000000000..56f78a75ede --- /dev/null +++ b/model/modelprocessor/environment_test.go @@ -0,0 +1,71 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package modelprocessor_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/apm-server/model" + "github.com/elastic/apm-server/model/modelprocessor" + "github.com/elastic/apm-server/transform" +) + +func TestSetDefaultServiceEnvironment(t *testing.T) { + nonEmptyMetadata := model.Metadata{Service: model.Service{Environment: "nonempty"}} + in := []transform.Transformable{ + // Should be left alone. + &model.Transaction{Metadata: nonEmptyMetadata}, + &model.Span{Metadata: nonEmptyMetadata}, + &model.Metricset{Metadata: nonEmptyMetadata}, + &model.Error{Metadata: nonEmptyMetadata}, + &model.PprofProfile{Metadata: nonEmptyMetadata}, + + // Should be updated. + &model.Transaction{}, + &model.Span{}, + &model.Metricset{}, + &model.Error{}, + &model.PprofProfile{}, + } + + processor := modelprocessor.SetDefaultServiceEnvironment{ + DefaultServiceEnvironment: "default", + } + out, err := processor.ProcessTransformables(context.Background(), in) + assert.NoError(t, err) + + defaultMetadata := model.Metadata{Service: model.Service{Environment: "default"}} + assert.Equal(t, []transform.Transformable{ + // Should be left alone. + &model.Transaction{Metadata: nonEmptyMetadata}, + &model.Span{Metadata: nonEmptyMetadata}, + &model.Metricset{Metadata: nonEmptyMetadata}, + &model.Error{Metadata: nonEmptyMetadata}, + &model.PprofProfile{Metadata: nonEmptyMetadata}, + + // Should be updated. + &model.Transaction{Metadata: defaultMetadata}, + &model.Span{Metadata: defaultMetadata}, + &model.Metricset{Metadata: defaultMetadata}, + &model.Error{Metadata: defaultMetadata}, + &model.PprofProfile{Metadata: defaultMetadata}, + }, out) +} diff --git a/systemtest/apmservertest/config.go b/systemtest/apmservertest/config.go index e5d496fc5c0..bb4c8c19dc1 100644 --- a/systemtest/apmservertest/config.go +++ b/systemtest/apmservertest/config.go @@ -42,14 +42,15 @@ const ( // Config holds APM Server configuration. type Config struct { - SecretToken string `json:"apm-server.secret_token,omitempty"` - Jaeger *JaegerConfig `json:"apm-server.jaeger,omitempty"` - Kibana *KibanaConfig `json:"apm-server.kibana,omitempty"` - Aggregation *AggregationConfig `json:"apm-server.aggregation,omitempty"` - Sampling *SamplingConfig `json:"apm-server.sampling,omitempty"` - RUM *RUMConfig `json:"apm-server.rum,omitempty"` - DataStreams *DataStreamsConfig `json:"apm-server.data_streams,omitempty"` - APIKey *APIKeyConfig `json:"apm-server.api_key,omitempty"` + SecretToken string `json:"apm-server.secret_token,omitempty"` + Jaeger *JaegerConfig `json:"apm-server.jaeger,omitempty"` + Kibana *KibanaConfig `json:"apm-server.kibana,omitempty"` + Aggregation *AggregationConfig `json:"apm-server.aggregation,omitempty"` + Sampling *SamplingConfig `json:"apm-server.sampling,omitempty"` + RUM *RUMConfig `json:"apm-server.rum,omitempty"` + DataStreams *DataStreamsConfig `json:"apm-server.data_streams,omitempty"` + APIKey *APIKeyConfig `json:"apm-server.api_key,omitempty"` + DefaultServiceEnvironment string `json:"apm-server.default_service_environment,omitempty"` // ResponseHeaders holds headers to add to all APM Server HTTP responses. ResponseHeaders http.Header `json:"apm-server.response_headers,omitempty"` diff --git a/systemtest/environment_test.go b/systemtest/environment_test.go new file mode 100644 index 00000000000..5bc4ce4a0a7 --- /dev/null +++ b/systemtest/environment_test.go @@ -0,0 +1,65 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package systemtest_test + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tidwall/gjson" + + "github.com/elastic/apm-server/systemtest" + "github.com/elastic/apm-server/systemtest/apmservertest" + "github.com/elastic/apm-server/systemtest/estest" +) + +func TestDefaultServiceEnvironment(t *testing.T) { + systemtest.CleanupElasticsearch(t) + srv := apmservertest.NewUnstartedServer(t) + srv.Config.DefaultServiceEnvironment = "default" + err := srv.Start() + require.NoError(t, err) + + defer os.Unsetenv("ELASTIC_APM_ENVIRONMENT") + tracerDefaultEnvironment := srv.Tracer() + + os.Setenv("ELASTIC_APM_ENVIRONMENT", "specified") + tracerSpecifiedEnvironment := srv.Tracer() + + tracerDefaultEnvironment.StartTransaction("default_environment", "type").End() + tracerDefaultEnvironment.Flush(nil) + + tracerSpecifiedEnvironment.StartTransaction("specified_environment", "type").End() + tracerSpecifiedEnvironment.Flush(nil) + + result := systemtest.Elasticsearch.ExpectMinDocs(t, 2, "apm-*", + estest.TermQuery{Field: "processor.event", Value: "transaction"}, + ) + environments := make(map[string]string) + for _, hit := range result.Hits.Hits { + transactionName := gjson.GetBytes(hit.RawSource, "transaction.name").String() + serviceEnvironment := gjson.GetBytes(hit.RawSource, "service.environment").String() + environments[transactionName] = serviceEnvironment + } + assert.Equal(t, map[string]string{ + "default_environment": "default", + "specified_environment": "specified", + }, environments) +} diff --git a/transform/transform.go b/transform/transform.go index 459bc171985..0a3aae7026b 100644 --- a/transform/transform.go +++ b/transform/transform.go @@ -26,6 +26,14 @@ import ( "github.com/elastic/apm-server/sourcemap" ) +// Processor can be used to process a set of Tramsformables, giving +// the opportunity to add or remove by returning a new slice. +type Processor interface { + ProcessTransformables(context.Context, []Transformable) ([]Transformable, error) +} + +// Transformable is an interface implemented by all top-level model objects for +// translating to beat.Events. type Transformable interface { Transform(context.Context, *Config) []beat.Event } diff --git a/x-pack/apm-server/main.go b/x-pack/apm-server/main.go index 91e02c81b6b..f1f0e6f133b 100644 --- a/x-pack/apm-server/main.go +++ b/x-pack/apm-server/main.go @@ -18,7 +18,6 @@ import ( "github.com/elastic/apm-server/beater" "github.com/elastic/apm-server/elasticsearch" - "github.com/elastic/apm-server/publish" "github.com/elastic/apm-server/transform" "github.com/elastic/apm-server/x-pack/apm-server/aggregation/spanmetrics" "github.com/elastic/apm-server/x-pack/apm-server/aggregation/txmetrics" @@ -41,7 +40,7 @@ type namedProcessor struct { } type processor interface { - ProcessTransformables(context.Context, []transform.Transformable) ([]transform.Transformable, error) + transform.Processor Run() error Stop(context.Context) error } @@ -176,17 +175,11 @@ func runServerWithProcessors(ctx context.Context, runServer beater.RunServerFunc return runServer(ctx, args) } - origReport := args.Reporter - args.Reporter = func(ctx context.Context, req publish.PendingReq) error { - var err error - for _, p := range processors { - req.Transformables, err = p.ProcessTransformables(ctx, req.Transformables) - if err != nil { - return err - } - } - return origReport(ctx, req) + transformProcessors := make([]transform.Processor, len(processors)) + for i, p := range processors { + transformProcessors[i] = p } + runServer = beater.WrapRunServerWithProcessors(runServer, transformProcessors...) g, ctx := errgroup.WithContext(ctx) for _, p := range processors {