Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce a configurable default service environment #4861

Merged
merged 8 commits into from
Feb 26, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions _meta/beat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ apm-server:
# or the IP and User Agent of the real user (RUM requests).
#capture_personal_data: true

# If specified, APM Server will record this value in events which have no service environment
# defined, and add it to agent configuration queries to Kibana when none is specified in the
# request from the agent.
#default_service_environment:

# Enable APM Server Golang expvar support (https://golang.org/pkg/expvar/).
#expvar:
#enabled: false
Expand Down
5 changes: 5 additions & 0 deletions apm-server.docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ apm-server:
# or the IP and User Agent of the real user (RUM requests).
#capture_personal_data: true

# If specified, APM Server will record this value in events which have no service environment
# defined, and add it to agent configuration queries to Kibana when none is specified in the
# request from the agent.
#default_service_environment:

# Enable APM Server Golang expvar support (https://golang.org/pkg/expvar/).
#expvar:
#enabled: false
Expand Down
5 changes: 5 additions & 0 deletions apm-server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ apm-server:
# or the IP and User Agent of the real user (RUM requests).
#capture_personal_data: true

# If specified, APM Server will record this value in events which have no service environment
# defined, and add it to agent configuration queries to Kibana when none is specified in the
# request from the agent.
#default_service_environment:

# Enable APM Server Golang expvar support (https://golang.org/pkg/expvar/).
#expvar:
#enabled: false
Expand Down
1 change: 1 addition & 0 deletions apmpackage/apm/0.1.0/agent/input/template.yml.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ apm-server:
secret_token: {{secret_token}}
max_event_size: {{max_event_bytes}}
capture_personal_data: {{capture_personal_data}}
default_service_environment: {{default_service_environment}}
kibana:
api_key: {{kibana_api_key}}
rum:
Expand Down
6 changes: 6 additions & 0 deletions apmpackage/apm/0.1.0/manifest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ policy_templates:
required: false
description: Enter as <Id>:<API Key>
show_user: true
- name: default_service_environment
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure about this, shouldn't we use the namespace instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer to keep them separate for now, so users have the flexibility to have two dimensions:

  • a namespace is a collection of APM services, e.g. split by business applications, business unit, etc.
  • typical environments for those services: production, testing, development

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah ok, that was news to me 👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also update the README accordingly, it refers to using namespace for environment.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to be clear, setting namespace to environment is still a reasonable choice. Perhaps we should just soften the language a little, to point it out as an option but not necessarily state that it is recommended.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, that's what I meant, the additional use case for namespace tha you explained above, should be added to the README.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

type: string
title: Default Service Environment
description: Default service environment to record in events which have no service environment defined.
required: false
show_user: false
- name: api_key_limit
type: int
title: Maximum number of API Keys
Expand Down
5 changes: 4 additions & 1 deletion beater/api/config/agent/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ var (
)

// Handler returns a request.Handler for managing agent central configuration requests.
func Handler(client kibana.Client, config *config.AgentConfig) request.Handler {
func Handler(client kibana.Client, config *config.AgentConfig, defaultServiceEnvironment string) request.Handler {
cacheControl := fmt.Sprintf("max-age=%v, must-revalidate", config.Cache.Expiration.Seconds())
fetcher := agentcfg.NewFetcher(client, config.Cache.Expiration)

Expand All @@ -88,6 +88,9 @@ func Handler(client kibana.Client, config *config.AgentConfig) request.Handler {
c.Write()
return
}
if query.Service.Environment == "" {
query.Service.Environment = defaultServiceEnvironment
}

result, err := fetcher.Fetch(c.Request.Context(), query)
if err != nil {
Expand Down
111 changes: 71 additions & 40 deletions beater/api/config/agent/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -181,14 +182,12 @@ func TestAgentConfigHandler(t *testing.T) {
for name, tc := range testcases {

runTest := func(t *testing.T, expectedBody map[string]string, auth authorization.Authorization) {
h := Handler(tc.kbClient, &cfg)
w := httptest.NewRecorder()
h := Handler(tc.kbClient, &cfg, "")
r := httptest.NewRequest(tc.method, target(tc.queryParams), nil)
for k, v := range tc.requestHeader {
r.Header.Set(k, v)
}
ctx := request.NewContext()
ctx.Reset(w, r)
ctx, w := newRequestContext(r)
ctx.Authorization = auth
h(ctx)

Expand All @@ -214,13 +213,9 @@ func TestAgentConfigHandler(t *testing.T) {

func TestAgentConfigHandler_NoKibanaClient(t *testing.T) {
cfg := config.AgentConfig{Cache: &config.Cache{Expiration: time.Nanosecond}}
h := Handler(nil, &cfg)

w := httptest.NewRecorder()
ctx := request.NewContext()
ctx.Reset(w, httptest.NewRequest(http.MethodGet, "/config", nil))
h(ctx)
h := Handler(nil, &cfg, "")

w := sendRequest(h, httptest.NewRequest(http.MethodGet, "/config", nil))
assert.Equal(t, http.StatusServiceUnavailable, w.Code, w.Body.String())
}

Expand All @@ -236,25 +231,43 @@ func TestAgentConfigHandler_PostOk(t *testing.T) {
}, mockVersion, true)

var cfg = config.AgentConfig{Cache: &config.Cache{Expiration: time.Nanosecond}}
h := Handler(kb, &cfg)

w := httptest.NewRecorder()
r := httptest.NewRequest(http.MethodPost, "/config", convert.ToReader(m{
"service": m{"name": "opbeans-node"}}))
ctx := request.NewContext()
ctx.Reset(w, r)
h(ctx)
h := Handler(kb, &cfg, "")

w := sendRequest(h, httptest.NewRequest(http.MethodPost, "/config", convert.ToReader(m{
"service": m{"name": "opbeans-node"}})))
assert.Equal(t, http.StatusOK, w.Code, w.Body.String())
}

func TestAgentConfigHandler_DefaultServiceEnvironment(t *testing.T) {
kb := &recordingKibanaClient{
Client: tests.MockKibana(http.StatusOK, m{
"_id": "1",
"_source": m{
"settings": m{
"sampling_rate": 0.5,
},
},
}, mockVersion, true),
}

var cfg = config.AgentConfig{Cache: &config.Cache{Expiration: time.Nanosecond}}
h := Handler(kb, &cfg, "default")

sendRequest(h, httptest.NewRequest(http.MethodPost, "/config", convert.ToReader(m{"service": m{"name": "opbeans-node", "environment": "specified"}})))
sendRequest(h, httptest.NewRequest(http.MethodPost, "/config", convert.ToReader(m{"service": m{"name": "opbeans-node"}})))
require.Len(t, kb.requests, 2)

body0, _ := ioutil.ReadAll(kb.requests[0].Body)
body1, _ := ioutil.ReadAll(kb.requests[1].Body)
assert.Equal(t, `{"service":{"name":"opbeans-node","environment":"specified"},"etag":""}`, string(body0))
assert.Equal(t, `{"service":{"name":"opbeans-node","environment":"default"},"etag":""}`, string(body1))
}

func TestAgentConfigRum(t *testing.T) {
h := getHandler("rum-js")
w := httptest.NewRecorder()
r := httptest.NewRequest(http.MethodPost, "/rum", convert.ToReader(m{
"service": m{"name": "opbeans"}}))
ctx := request.NewContext()
ctx.Reset(w, r)
ctx, w := newRequestContext(r)
ctx.IsRum = true
h(ctx)
var actual map[string]string
Expand All @@ -266,22 +279,18 @@ func TestAgentConfigRum(t *testing.T) {

func TestAgentConfigRumEtag(t *testing.T) {
h := getHandler("rum-js")
w := httptest.NewRecorder()
r := httptest.NewRequest(http.MethodGet, "/rum?ifnonematch=123&service.name=opbeans", nil)
ctx := request.NewContext()
ctx.Reset(w, r)
ctx, w := newRequestContext(r)
ctx.IsRum = true
h(ctx)
assert.Equal(t, http.StatusNotModified, w.Code, w.Body.String())
}

func TestAgentConfigNotRum(t *testing.T) {
h := getHandler("node-js")
w := httptest.NewRecorder()
r := httptest.NewRequest(http.MethodPost, "/backend", convert.ToReader(m{
"service": m{"name": "opbeans"}}))
ctx := request.NewContext()
ctx.Reset(w, r)
ctx, w := newRequestContext(r)
h(ctx)
var actual map[string]string
json.Unmarshal(w.Body.Bytes(), &actual)
Expand All @@ -291,11 +300,9 @@ func TestAgentConfigNotRum(t *testing.T) {

func TestAgentConfigNoLeak(t *testing.T) {
h := getHandler("node-js")
w := httptest.NewRecorder()
r := httptest.NewRequest(http.MethodPost, "/rum", convert.ToReader(m{
"service": m{"name": "opbeans"}}))
ctx := request.NewContext()
ctx.Reset(w, r)
ctx, w := newRequestContext(r)
ctx.IsRum = true
h(ctx)
var actual map[string]string
Expand All @@ -306,11 +313,9 @@ func TestAgentConfigNoLeak(t *testing.T) {

func TestAgentConfigRateLimit(t *testing.T) {
h := getHandler("rum-js")
w := httptest.NewRecorder()
r := httptest.NewRequest(http.MethodPost, "/rum", convert.ToReader(m{
"service": m{"name": "opbeans"}}))
ctx := request.NewContext()
ctx.Reset(w, r)
ctx, w := newRequestContext(r)
ctx.IsRum = true
ctx.RateLimiter = rate.NewLimiter(rate.Limit(0), 0)
h(ctx)
Expand All @@ -334,7 +339,7 @@ func getHandler(agent string) request.Handler {
}, mockVersion, true)

var cfg = config.AgentConfig{Cache: &config.Cache{Expiration: time.Nanosecond}}
return Handler(kb, &cfg)
return Handler(kb, &cfg, "")
}

func TestIfNoneMatch(t *testing.T) {
Expand All @@ -358,22 +363,31 @@ func TestAgentConfigTraceContext(t *testing.T) {
kibanaCfg := config.KibanaConfig{Enabled: true, ClientConfig: libkibana.DefaultClientConfig()}
kibanaCfg.Host = "testKibana:12345"
client := kibana.NewConnectingClient(&kibanaCfg)
handler := Handler(client, &config.AgentConfig{Cache: &config.Cache{Expiration: 5 * time.Minute}})
handler := Handler(client, &config.AgentConfig{Cache: &config.Cache{Expiration: 5 * time.Minute}}, "")
_, spans, _ := apmtest.WithTransaction(func(ctx context.Context) {
// When the handler is called with a context containing
// a transaction, the underlying Kibana query should create a span
w := httptest.NewRecorder()
r := httptest.NewRequest(http.MethodPost, "/backend", convert.ToReader(m{
"service": m{"name": "opbeans"}}))
r = r.WithContext(ctx)
c := request.NewContext()
c.Reset(w, r)
handler(c)
sendRequest(handler, r.WithContext(ctx))
})
require.Len(t, spans, 1)
assert.Equal(t, "app", spans[0].Type)
}

func sendRequest(h request.Handler, r *http.Request) *httptest.ResponseRecorder {
ctx, recorder := newRequestContext(r)
h(ctx)
return recorder
}

func newRequestContext(r *http.Request) (*request.Context, *httptest.ResponseRecorder) {
w := httptest.NewRecorder()
ctx := request.NewContext()
ctx.Reset(w, r)
return ctx, w
}

func target(params map[string]string) string {
t := "/config"
if len(params) == 0 {
Expand All @@ -385,3 +399,20 @@ func target(params map[string]string) string {
}
return t
}

type recordingKibanaClient struct {
kibana.Client
requests []*http.Request
}

func (c *recordingKibanaClient) Send(ctx context.Context, method string, path string, params url.Values, header http.Header, body io.Reader) (*http.Response, error) {
req := httptest.NewRequest(method, path, body)
req.URL.RawQuery = params.Encode()
for k, values := range header {
for _, v := range values {
req.Header.Add(k, v)
}
}
c.requests = append(c.requests, req.WithContext(ctx))
return c.Client.Send(ctx, method, path, params, header, body)
}
2 changes: 1 addition & 1 deletion beater/api/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func agentConfigHandler(cfg *config.Config, authHandler *authorization.Handler,
if cfg.Kibana.Enabled {
client = kibana.NewConnectingClient(&cfg.Kibana)
}
h := agent.Handler(client, cfg.AgentConfig)
h := agent.Handler(client, cfg.AgentConfig, cfg.DefaultServiceEnvironment)
msg := "Agent remote configuration is disabled. " +
"Configure the `apm-server.kibana` section in apm-server.yml to enable it. " +
"If you are using a RUM agent, you also need to configure the `apm-server.rum` section. " +
Expand Down
35 changes: 35 additions & 0 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/elastic/apm-server/elasticsearch"
"github.com/elastic/apm-server/ingest/pipeline"
logs "github.com/elastic/apm-server/log"
"github.com/elastic/apm-server/model/modelprocessor"
"github.com/elastic/apm-server/publish"
"github.com/elastic/apm-server/sampling"
"github.com/elastic/apm-server/sourcemap"
Expand Down Expand Up @@ -352,6 +353,7 @@ func (s *serverRunner) run() error {
// behaviour into the processing/reporting pipeline.
runServer = s.wrapRunServer(runServer)
}
runServer = s.wrapRunServerWithPreprocessors(runServer)

transformConfig, err := newTransformConfig(s.beat.Info, s.config)
if err != nil {
Expand Down Expand Up @@ -399,6 +401,16 @@ func (s *serverRunner) run() error {
return publisher.Stop(s.backgroundContext)
}

func (s *serverRunner) wrapRunServerWithPreprocessors(runServer RunServerFunc) RunServerFunc {
var processors []transform.Processor
if s.config.DefaultServiceEnvironment != "" {
processors = append(processors, &modelprocessor.SetDefaultServiceEnvironment{
DefaultServiceEnvironment: s.config.DefaultServiceEnvironment,
})
}
return WrapRunServerWithProcessors(runServer, processors...)
}

// checkConfig verifies the global configuration doesn't use unsupported settings
//
// TODO(axw) remove this, nobody expects dashboard setup from apm-server.
Expand Down Expand Up @@ -592,3 +604,26 @@ func newSourcemapStore(beatInfo beat.Info, cfg *config.SourceMapping) (*sourcema
index := strings.ReplaceAll(cfg.IndexPattern, "%{[observer.version]}", beatInfo.Version)
return sourcemap.NewStore(esClient, index, cfg.Cache.Expiration)
}

// WrapRunServerWithProcessors wraps runServer such that it wraps args.Reporter
// with a function that transformables are first passed through the given
// processors in order.
func WrapRunServerWithProcessors(runServer RunServerFunc, processors ...transform.Processor) RunServerFunc {
if len(processors) == 0 {
return runServer
}
return func(ctx context.Context, args ServerParams) error {
origReporter := args.Reporter
args.Reporter = func(ctx context.Context, req publish.PendingReq) error {
var err error
for _, p := range processors {
req.Transformables, err = p.ProcessTransformables(ctx, req.Transformables)
if err != nil {
return err
}
}
return origReporter(ctx, req)
}
return runServer(ctx, args)
}
}
Loading