Skip to content

Commit

Permalink
Introduce a configurable default service environment (#4861)
Browse files Browse the repository at this point in the history
* Add transform.Processor interface

* beater: add default service environment

* model/modelprocessor: update PprofProfile too

* apmpackage: update README about namespace
  • Loading branch information
axw authored Feb 26, 2021
1 parent acc9d81 commit 992699d
Show file tree
Hide file tree
Showing 21 changed files with 396 additions and 88 deletions.
5 changes: 5 additions & 0 deletions _meta/beat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ apm-server:
# or the IP and User Agent of the real user (RUM requests).
#capture_personal_data: true

# If specified, APM Server will record this value in events which have no service environment
# defined, and add it to agent configuration queries to Kibana when none is specified in the
# request from the agent.
#default_service_environment:

# Enable APM Server Golang expvar support (https://golang.org/pkg/expvar/).
#expvar:
#enabled: false
Expand Down
5 changes: 5 additions & 0 deletions apm-server.docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ apm-server:
# or the IP and User Agent of the real user (RUM requests).
#capture_personal_data: true

# If specified, APM Server will record this value in events which have no service environment
# defined, and add it to agent configuration queries to Kibana when none is specified in the
# request from the agent.
#default_service_environment:

# Enable APM Server Golang expvar support (https://golang.org/pkg/expvar/).
#expvar:
#enabled: false
Expand Down
5 changes: 5 additions & 0 deletions apm-server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ apm-server:
# or the IP and User Agent of the real user (RUM requests).
#capture_personal_data: true

# If specified, APM Server will record this value in events which have no service environment
# defined, and add it to agent configuration queries to Kibana when none is specified in the
# request from the agent.
#default_service_environment:

# Enable APM Server Golang expvar support (https://golang.org/pkg/expvar/).
#expvar:
#enabled: false
Expand Down
6 changes: 5 additions & 1 deletion apmpackage/apm/0.1.0/_dev/docs/README.template.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ If you need additional pipelines, override ILM policies, etc; you must do it ext
#### Data Streams

When using the APM integration, apm events are indexed into data streams. Data stream names contain the event type,
the service name, and a user configurable namespace.
the service name, and a user configurable namespace.

There is no specific recommendaton for what to use as a namespace; it is intentionally flexible.
You might use the environment (production, testing, development) as the namespace,
or alternatively you could namespace data by business unit. It is your choice.

## Compatibility and limitations

Expand Down
1 change: 1 addition & 0 deletions apmpackage/apm/0.1.0/agent/input/template.yml.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ apm-server:
secret_token: {{secret_token}}
max_event_size: {{max_event_bytes}}
capture_personal_data: {{capture_personal_data}}
default_service_environment: {{default_service_environment}}
kibana:
api_key: {{kibana_api_key}}
rum:
Expand Down
6 changes: 5 additions & 1 deletion apmpackage/apm/0.1.0/docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ If you need additional pipelines, override ILM policies, etc; you must do it ext
#### Data Streams

When using the APM integration, apm events are indexed into data streams. Data stream names contain the event type,
the service name, and a user configurable namespace.
the service name, and a user configurable namespace.

There is no specific recommendaton for what to use as a namespace; it is intentionally flexible.
You might use the environment (production, testing, development) as the namespace,
or alternatively you could namespace data by business unit. It is your choice.

## Compatibility and limitations

Expand Down
6 changes: 6 additions & 0 deletions apmpackage/apm/0.1.0/manifest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ policy_templates:
required: true
show_user: true
default: false
- name: default_service_environment
type: string
title: Default Service Environment
description: Default service environment to record in events which have no service environment defined.
required: false
show_user: false
- name: rum_allow_origins
type: string
title: RUM - Origin Headers
Expand Down
5 changes: 4 additions & 1 deletion beater/api/config/agent/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ var (
)

// Handler returns a request.Handler for managing agent central configuration requests.
func Handler(client kibana.Client, config *config.AgentConfig) request.Handler {
func Handler(client kibana.Client, config *config.AgentConfig, defaultServiceEnvironment string) request.Handler {
cacheControl := fmt.Sprintf("max-age=%v, must-revalidate", config.Cache.Expiration.Seconds())
fetcher := agentcfg.NewFetcher(client, config.Cache.Expiration)

Expand All @@ -88,6 +88,9 @@ func Handler(client kibana.Client, config *config.AgentConfig) request.Handler {
c.Write()
return
}
if query.Service.Environment == "" {
query.Service.Environment = defaultServiceEnvironment
}

result, err := fetcher.Fetch(c.Request.Context(), query)
if err != nil {
Expand Down
111 changes: 71 additions & 40 deletions beater/api/config/agent/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -181,14 +182,12 @@ func TestAgentConfigHandler(t *testing.T) {
for name, tc := range testcases {

runTest := func(t *testing.T, expectedBody map[string]string, auth authorization.Authorization) {
h := Handler(tc.kbClient, &cfg)
w := httptest.NewRecorder()
h := Handler(tc.kbClient, &cfg, "")
r := httptest.NewRequest(tc.method, target(tc.queryParams), nil)
for k, v := range tc.requestHeader {
r.Header.Set(k, v)
}
ctx := request.NewContext()
ctx.Reset(w, r)
ctx, w := newRequestContext(r)
ctx.Authorization = auth
h(ctx)

Expand All @@ -214,13 +213,9 @@ func TestAgentConfigHandler(t *testing.T) {

func TestAgentConfigHandler_NoKibanaClient(t *testing.T) {
cfg := config.AgentConfig{Cache: &config.Cache{Expiration: time.Nanosecond}}
h := Handler(nil, &cfg)

w := httptest.NewRecorder()
ctx := request.NewContext()
ctx.Reset(w, httptest.NewRequest(http.MethodGet, "/config", nil))
h(ctx)
h := Handler(nil, &cfg, "")

w := sendRequest(h, httptest.NewRequest(http.MethodGet, "/config", nil))
assert.Equal(t, http.StatusServiceUnavailable, w.Code, w.Body.String())
}

Expand All @@ -236,25 +231,43 @@ func TestAgentConfigHandler_PostOk(t *testing.T) {
}, mockVersion, true)

var cfg = config.AgentConfig{Cache: &config.Cache{Expiration: time.Nanosecond}}
h := Handler(kb, &cfg)

w := httptest.NewRecorder()
r := httptest.NewRequest(http.MethodPost, "/config", convert.ToReader(m{
"service": m{"name": "opbeans-node"}}))
ctx := request.NewContext()
ctx.Reset(w, r)
h(ctx)
h := Handler(kb, &cfg, "")

w := sendRequest(h, httptest.NewRequest(http.MethodPost, "/config", convert.ToReader(m{
"service": m{"name": "opbeans-node"}})))
assert.Equal(t, http.StatusOK, w.Code, w.Body.String())
}

func TestAgentConfigHandler_DefaultServiceEnvironment(t *testing.T) {
kb := &recordingKibanaClient{
Client: tests.MockKibana(http.StatusOK, m{
"_id": "1",
"_source": m{
"settings": m{
"sampling_rate": 0.5,
},
},
}, mockVersion, true),
}

var cfg = config.AgentConfig{Cache: &config.Cache{Expiration: time.Nanosecond}}
h := Handler(kb, &cfg, "default")

sendRequest(h, httptest.NewRequest(http.MethodPost, "/config", convert.ToReader(m{"service": m{"name": "opbeans-node", "environment": "specified"}})))
sendRequest(h, httptest.NewRequest(http.MethodPost, "/config", convert.ToReader(m{"service": m{"name": "opbeans-node"}})))
require.Len(t, kb.requests, 2)

body0, _ := ioutil.ReadAll(kb.requests[0].Body)
body1, _ := ioutil.ReadAll(kb.requests[1].Body)
assert.Equal(t, `{"service":{"name":"opbeans-node","environment":"specified"},"etag":""}`, string(body0))
assert.Equal(t, `{"service":{"name":"opbeans-node","environment":"default"},"etag":""}`, string(body1))
}

func TestAgentConfigRum(t *testing.T) {
h := getHandler("rum-js")
w := httptest.NewRecorder()
r := httptest.NewRequest(http.MethodPost, "/rum", convert.ToReader(m{
"service": m{"name": "opbeans"}}))
ctx := request.NewContext()
ctx.Reset(w, r)
ctx, w := newRequestContext(r)
ctx.IsRum = true
h(ctx)
var actual map[string]string
Expand All @@ -266,22 +279,18 @@ func TestAgentConfigRum(t *testing.T) {

func TestAgentConfigRumEtag(t *testing.T) {
h := getHandler("rum-js")
w := httptest.NewRecorder()
r := httptest.NewRequest(http.MethodGet, "/rum?ifnonematch=123&service.name=opbeans", nil)
ctx := request.NewContext()
ctx.Reset(w, r)
ctx, w := newRequestContext(r)
ctx.IsRum = true
h(ctx)
assert.Equal(t, http.StatusNotModified, w.Code, w.Body.String())
}

func TestAgentConfigNotRum(t *testing.T) {
h := getHandler("node-js")
w := httptest.NewRecorder()
r := httptest.NewRequest(http.MethodPost, "/backend", convert.ToReader(m{
"service": m{"name": "opbeans"}}))
ctx := request.NewContext()
ctx.Reset(w, r)
ctx, w := newRequestContext(r)
h(ctx)
var actual map[string]string
json.Unmarshal(w.Body.Bytes(), &actual)
Expand All @@ -291,11 +300,9 @@ func TestAgentConfigNotRum(t *testing.T) {

func TestAgentConfigNoLeak(t *testing.T) {
h := getHandler("node-js")
w := httptest.NewRecorder()
r := httptest.NewRequest(http.MethodPost, "/rum", convert.ToReader(m{
"service": m{"name": "opbeans"}}))
ctx := request.NewContext()
ctx.Reset(w, r)
ctx, w := newRequestContext(r)
ctx.IsRum = true
h(ctx)
var actual map[string]string
Expand All @@ -306,11 +313,9 @@ func TestAgentConfigNoLeak(t *testing.T) {

func TestAgentConfigRateLimit(t *testing.T) {
h := getHandler("rum-js")
w := httptest.NewRecorder()
r := httptest.NewRequest(http.MethodPost, "/rum", convert.ToReader(m{
"service": m{"name": "opbeans"}}))
ctx := request.NewContext()
ctx.Reset(w, r)
ctx, w := newRequestContext(r)
ctx.IsRum = true
ctx.RateLimiter = rate.NewLimiter(rate.Limit(0), 0)
h(ctx)
Expand All @@ -334,7 +339,7 @@ func getHandler(agent string) request.Handler {
}, mockVersion, true)

var cfg = config.AgentConfig{Cache: &config.Cache{Expiration: time.Nanosecond}}
return Handler(kb, &cfg)
return Handler(kb, &cfg, "")
}

func TestIfNoneMatch(t *testing.T) {
Expand All @@ -358,22 +363,31 @@ func TestAgentConfigTraceContext(t *testing.T) {
kibanaCfg := config.KibanaConfig{Enabled: true, ClientConfig: libkibana.DefaultClientConfig()}
kibanaCfg.Host = "testKibana:12345"
client := kibana.NewConnectingClient(&kibanaCfg)
handler := Handler(client, &config.AgentConfig{Cache: &config.Cache{Expiration: 5 * time.Minute}})
handler := Handler(client, &config.AgentConfig{Cache: &config.Cache{Expiration: 5 * time.Minute}}, "")
_, spans, _ := apmtest.WithTransaction(func(ctx context.Context) {
// When the handler is called with a context containing
// a transaction, the underlying Kibana query should create a span
w := httptest.NewRecorder()
r := httptest.NewRequest(http.MethodPost, "/backend", convert.ToReader(m{
"service": m{"name": "opbeans"}}))
r = r.WithContext(ctx)
c := request.NewContext()
c.Reset(w, r)
handler(c)
sendRequest(handler, r.WithContext(ctx))
})
require.Len(t, spans, 1)
assert.Equal(t, "app", spans[0].Type)
}

func sendRequest(h request.Handler, r *http.Request) *httptest.ResponseRecorder {
ctx, recorder := newRequestContext(r)
h(ctx)
return recorder
}

func newRequestContext(r *http.Request) (*request.Context, *httptest.ResponseRecorder) {
w := httptest.NewRecorder()
ctx := request.NewContext()
ctx.Reset(w, r)
return ctx, w
}

func target(params map[string]string) string {
t := "/config"
if len(params) == 0 {
Expand All @@ -385,3 +399,20 @@ func target(params map[string]string) string {
}
return t
}

type recordingKibanaClient struct {
kibana.Client
requests []*http.Request
}

func (c *recordingKibanaClient) Send(ctx context.Context, method string, path string, params url.Values, header http.Header, body io.Reader) (*http.Response, error) {
req := httptest.NewRequest(method, path, body)
req.URL.RawQuery = params.Encode()
for k, values := range header {
for _, v := range values {
req.Header.Add(k, v)
}
}
c.requests = append(c.requests, req.WithContext(ctx))
return c.Client.Send(ctx, method, path, params, header, body)
}
2 changes: 1 addition & 1 deletion beater/api/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func agentConfigHandler(cfg *config.Config, authHandler *authorization.Handler,
if cfg.Kibana.Enabled {
client = kibana.NewConnectingClient(&cfg.Kibana)
}
h := agent.Handler(client, cfg.AgentConfig)
h := agent.Handler(client, cfg.AgentConfig, cfg.DefaultServiceEnvironment)
msg := "Agent remote configuration is disabled. " +
"Configure the `apm-server.kibana` section in apm-server.yml to enable it. " +
"If you are using a RUM agent, you also need to configure the `apm-server.rum` section. " +
Expand Down
35 changes: 35 additions & 0 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/elastic/apm-server/elasticsearch"
"github.com/elastic/apm-server/ingest/pipeline"
logs "github.com/elastic/apm-server/log"
"github.com/elastic/apm-server/model/modelprocessor"
"github.com/elastic/apm-server/publish"
"github.com/elastic/apm-server/sampling"
"github.com/elastic/apm-server/sourcemap"
Expand Down Expand Up @@ -352,6 +353,7 @@ func (s *serverRunner) run() error {
// behaviour into the processing/reporting pipeline.
runServer = s.wrapRunServer(runServer)
}
runServer = s.wrapRunServerWithPreprocessors(runServer)

transformConfig, err := newTransformConfig(s.beat.Info, s.config)
if err != nil {
Expand Down Expand Up @@ -399,6 +401,16 @@ func (s *serverRunner) run() error {
return publisher.Stop(s.backgroundContext)
}

func (s *serverRunner) wrapRunServerWithPreprocessors(runServer RunServerFunc) RunServerFunc {
var processors []transform.Processor
if s.config.DefaultServiceEnvironment != "" {
processors = append(processors, &modelprocessor.SetDefaultServiceEnvironment{
DefaultServiceEnvironment: s.config.DefaultServiceEnvironment,
})
}
return WrapRunServerWithProcessors(runServer, processors...)
}

// checkConfig verifies the global configuration doesn't use unsupported settings
//
// TODO(axw) remove this, nobody expects dashboard setup from apm-server.
Expand Down Expand Up @@ -592,3 +604,26 @@ func newSourcemapStore(beatInfo beat.Info, cfg *config.SourceMapping) (*sourcema
index := strings.ReplaceAll(cfg.IndexPattern, "%{[observer.version]}", beatInfo.Version)
return sourcemap.NewStore(esClient, index, cfg.Cache.Expiration)
}

// WrapRunServerWithProcessors wraps runServer such that it wraps args.Reporter
// with a function that transformables are first passed through the given
// processors in order.
func WrapRunServerWithProcessors(runServer RunServerFunc, processors ...transform.Processor) RunServerFunc {
if len(processors) == 0 {
return runServer
}
return func(ctx context.Context, args ServerParams) error {
origReporter := args.Reporter
args.Reporter = func(ctx context.Context, req publish.PendingReq) error {
var err error
for _, p := range processors {
req.Transformables, err = p.ProcessTransformables(ctx, req.Transformables)
if err != nil {
return err
}
}
return origReporter(ctx, req)
}
return runServer(ctx, args)
}
}
Loading

0 comments on commit 992699d

Please sign in to comment.