Skip to content

Commit

Permalink
beater: add default service environment
Browse files Browse the repository at this point in the history
  • Loading branch information
axw committed Feb 22, 2021
1 parent 22659fc commit 7f8c1c6
Show file tree
Hide file tree
Showing 11 changed files with 316 additions and 74 deletions.
5 changes: 4 additions & 1 deletion beater/api/config/agent/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ var (
)

// Handler returns a request.Handler for managing agent central configuration requests.
func Handler(client kibana.Client, config *config.AgentConfig) request.Handler {
func Handler(client kibana.Client, config *config.AgentConfig, defaultServiceEnvironment string) request.Handler {
cacheControl := fmt.Sprintf("max-age=%v, must-revalidate", config.Cache.Expiration.Seconds())
fetcher := agentcfg.NewFetcher(client, config.Cache.Expiration)

Expand All @@ -88,6 +88,9 @@ func Handler(client kibana.Client, config *config.AgentConfig) request.Handler {
c.Write()
return
}
if query.Service.Environment == "" {
query.Service.Environment = defaultServiceEnvironment
}

result, err := fetcher.Fetch(c.Request.Context(), query)
if err != nil {
Expand Down
111 changes: 71 additions & 40 deletions beater/api/config/agent/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -181,14 +182,12 @@ func TestAgentConfigHandler(t *testing.T) {
for name, tc := range testcases {

runTest := func(t *testing.T, expectedBody map[string]string, auth authorization.Authorization) {
h := Handler(tc.kbClient, &cfg)
w := httptest.NewRecorder()
h := Handler(tc.kbClient, &cfg, "")
r := httptest.NewRequest(tc.method, target(tc.queryParams), nil)
for k, v := range tc.requestHeader {
r.Header.Set(k, v)
}
ctx := request.NewContext()
ctx.Reset(w, r)
ctx, w := newRequestContext(r)
ctx.Authorization = auth
h(ctx)

Expand All @@ -214,13 +213,9 @@ func TestAgentConfigHandler(t *testing.T) {

func TestAgentConfigHandler_NoKibanaClient(t *testing.T) {
cfg := config.AgentConfig{Cache: &config.Cache{Expiration: time.Nanosecond}}
h := Handler(nil, &cfg)

w := httptest.NewRecorder()
ctx := request.NewContext()
ctx.Reset(w, httptest.NewRequest(http.MethodGet, "/config", nil))
h(ctx)
h := Handler(nil, &cfg, "")

w := sendRequest(h, httptest.NewRequest(http.MethodGet, "/config", nil))
assert.Equal(t, http.StatusServiceUnavailable, w.Code, w.Body.String())
}

Expand All @@ -236,25 +231,43 @@ func TestAgentConfigHandler_PostOk(t *testing.T) {
}, mockVersion, true)

var cfg = config.AgentConfig{Cache: &config.Cache{Expiration: time.Nanosecond}}
h := Handler(kb, &cfg)

w := httptest.NewRecorder()
r := httptest.NewRequest(http.MethodPost, "/config", convert.ToReader(m{
"service": m{"name": "opbeans-node"}}))
ctx := request.NewContext()
ctx.Reset(w, r)
h(ctx)
h := Handler(kb, &cfg, "")

w := sendRequest(h, httptest.NewRequest(http.MethodPost, "/config", convert.ToReader(m{
"service": m{"name": "opbeans-node"}})))
assert.Equal(t, http.StatusOK, w.Code, w.Body.String())
}

func TestAgentConfigHandler_DefaultServiceEnvironment(t *testing.T) {
kb := &recordingKibanaClient{
Client: tests.MockKibana(http.StatusOK, m{
"_id": "1",
"_source": m{
"settings": m{
"sampling_rate": 0.5,
},
},
}, mockVersion, true),
}

var cfg = config.AgentConfig{Cache: &config.Cache{Expiration: time.Nanosecond}}
h := Handler(kb, &cfg, "default")

sendRequest(h, httptest.NewRequest(http.MethodPost, "/config", convert.ToReader(m{"service": m{"name": "opbeans-node", "environment": "specified"}})))
sendRequest(h, httptest.NewRequest(http.MethodPost, "/config", convert.ToReader(m{"service": m{"name": "opbeans-node"}})))
require.Len(t, kb.requests, 2)

body0, _ := ioutil.ReadAll(kb.requests[0].Body)
body1, _ := ioutil.ReadAll(kb.requests[1].Body)
assert.Equal(t, `{"service":{"name":"opbeans-node","environment":"specified"},"etag":""}`, string(body0))
assert.Equal(t, `{"service":{"name":"opbeans-node","environment":"default"},"etag":""}`, string(body1))
}

func TestAgentConfigRum(t *testing.T) {
h := getHandler("rum-js")
w := httptest.NewRecorder()
r := httptest.NewRequest(http.MethodPost, "/rum", convert.ToReader(m{
"service": m{"name": "opbeans"}}))
ctx := request.NewContext()
ctx.Reset(w, r)
ctx, w := newRequestContext(r)
ctx.IsRum = true
h(ctx)
var actual map[string]string
Expand All @@ -266,22 +279,18 @@ func TestAgentConfigRum(t *testing.T) {

func TestAgentConfigRumEtag(t *testing.T) {
h := getHandler("rum-js")
w := httptest.NewRecorder()
r := httptest.NewRequest(http.MethodGet, "/rum?ifnonematch=123&service.name=opbeans", nil)
ctx := request.NewContext()
ctx.Reset(w, r)
ctx, w := newRequestContext(r)
ctx.IsRum = true
h(ctx)
assert.Equal(t, http.StatusNotModified, w.Code, w.Body.String())
}

func TestAgentConfigNotRum(t *testing.T) {
h := getHandler("node-js")
w := httptest.NewRecorder()
r := httptest.NewRequest(http.MethodPost, "/backend", convert.ToReader(m{
"service": m{"name": "opbeans"}}))
ctx := request.NewContext()
ctx.Reset(w, r)
ctx, w := newRequestContext(r)
h(ctx)
var actual map[string]string
json.Unmarshal(w.Body.Bytes(), &actual)
Expand All @@ -291,11 +300,9 @@ func TestAgentConfigNotRum(t *testing.T) {

func TestAgentConfigNoLeak(t *testing.T) {
h := getHandler("node-js")
w := httptest.NewRecorder()
r := httptest.NewRequest(http.MethodPost, "/rum", convert.ToReader(m{
"service": m{"name": "opbeans"}}))
ctx := request.NewContext()
ctx.Reset(w, r)
ctx, w := newRequestContext(r)
ctx.IsRum = true
h(ctx)
var actual map[string]string
Expand All @@ -306,11 +313,9 @@ func TestAgentConfigNoLeak(t *testing.T) {

func TestAgentConfigRateLimit(t *testing.T) {
h := getHandler("rum-js")
w := httptest.NewRecorder()
r := httptest.NewRequest(http.MethodPost, "/rum", convert.ToReader(m{
"service": m{"name": "opbeans"}}))
ctx := request.NewContext()
ctx.Reset(w, r)
ctx, w := newRequestContext(r)
ctx.IsRum = true
ctx.RateLimiter = rate.NewLimiter(rate.Limit(0), 0)
h(ctx)
Expand All @@ -334,7 +339,7 @@ func getHandler(agent string) request.Handler {
}, mockVersion, true)

var cfg = config.AgentConfig{Cache: &config.Cache{Expiration: time.Nanosecond}}
return Handler(kb, &cfg)
return Handler(kb, &cfg, "")
}

func TestIfNoneMatch(t *testing.T) {
Expand All @@ -358,22 +363,31 @@ func TestAgentConfigTraceContext(t *testing.T) {
kibanaCfg := config.KibanaConfig{Enabled: true, ClientConfig: libkibana.DefaultClientConfig()}
kibanaCfg.Host = "testKibana:12345"
client := kibana.NewConnectingClient(&kibanaCfg)
handler := Handler(client, &config.AgentConfig{Cache: &config.Cache{Expiration: 5 * time.Minute}})
handler := Handler(client, &config.AgentConfig{Cache: &config.Cache{Expiration: 5 * time.Minute}}, "")
_, spans, _ := apmtest.WithTransaction(func(ctx context.Context) {
// When the handler is called with a context containing
// a transaction, the underlying Kibana query should create a span
w := httptest.NewRecorder()
r := httptest.NewRequest(http.MethodPost, "/backend", convert.ToReader(m{
"service": m{"name": "opbeans"}}))
r = r.WithContext(ctx)
c := request.NewContext()
c.Reset(w, r)
handler(c)
sendRequest(handler, r.WithContext(ctx))
})
require.Len(t, spans, 1)
assert.Equal(t, "app", spans[0].Type)
}

func sendRequest(h request.Handler, r *http.Request) *httptest.ResponseRecorder {
ctx, recorder := newRequestContext(r)
h(ctx)
return recorder
}

func newRequestContext(r *http.Request) (*request.Context, *httptest.ResponseRecorder) {
w := httptest.NewRecorder()
ctx := request.NewContext()
ctx.Reset(w, r)
return ctx, w
}

func target(params map[string]string) string {
t := "/config"
if len(params) == 0 {
Expand All @@ -385,3 +399,20 @@ func target(params map[string]string) string {
}
return t
}

type recordingKibanaClient struct {
kibana.Client
requests []*http.Request
}

func (c *recordingKibanaClient) Send(ctx context.Context, method string, path string, params url.Values, header http.Header, body io.Reader) (*http.Response, error) {
req := httptest.NewRequest(method, path, body)
req.URL.RawQuery = params.Encode()
for k, values := range header {
for _, v := range values {
req.Header.Add(k, v)
}
}
c.requests = append(c.requests, req.WithContext(ctx))
return c.Client.Send(ctx, method, path, params, header, body)
}
2 changes: 1 addition & 1 deletion beater/api/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func agentConfigHandler(cfg *config.Config, authHandler *authorization.Handler,
if cfg.Kibana.Enabled {
client = kibana.NewConnectingClient(&cfg.Kibana)
}
h := agent.Handler(client, cfg.AgentConfig)
h := agent.Handler(client, cfg.AgentConfig, cfg.DefaultServiceEnvironment)
msg := "Agent remote configuration is disabled. " +
"Configure the `apm-server.kibana` section in apm-server.yml to enable it. " +
"If you are using a RUM agent, you also need to configure the `apm-server.rum` section. " +
Expand Down
13 changes: 13 additions & 0 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/elastic/apm-server/elasticsearch"
"github.com/elastic/apm-server/ingest/pipeline"
logs "github.com/elastic/apm-server/log"
"github.com/elastic/apm-server/model/modelprocessor"
"github.com/elastic/apm-server/publish"
"github.com/elastic/apm-server/sampling"
"github.com/elastic/apm-server/sourcemap"
Expand Down Expand Up @@ -352,6 +353,7 @@ func (s *serverRunner) run() error {
// behaviour into the processing/reporting pipeline.
runServer = s.wrapRunServer(runServer)
}
runServer = s.wrapRunServerWithPreprocessors(runServer)

transformConfig, err := newTransformConfig(s.beat.Info, s.config)
if err != nil {
Expand Down Expand Up @@ -399,6 +401,17 @@ func (s *serverRunner) run() error {
return publisher.Stop(s.backgroundContext)
}

func (s *serverRunner) wrapRunServerWithPreprocessors(runServer RunServerFunc) RunServerFunc {
var processors []transform.Processor
s.logger.Info("DefaultServiceEnvironment " + s.config.DefaultServiceEnvironment)
if s.config.DefaultServiceEnvironment != "" {
processors = append(processors, &modelprocessor.SetDefaultServiceEnvironment{
DefaultServiceEnvironment: s.config.DefaultServiceEnvironment,
})
}
return WrapRunServerWithProcessors(runServer, processors...)
}

// checkConfig verifies the global configuration doesn't use unsupported settings
//
// TODO(axw) remove this, nobody expects dashboard setup from apm-server.
Expand Down
49 changes: 25 additions & 24 deletions beater/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,30 +64,31 @@ func defaultKibanaConfig() KibanaConfig {

// Config holds configuration information nested under the key `apm-server`
type Config struct {
Host string `config:"host"`
MaxHeaderSize int `config:"max_header_size"`
IdleTimeout time.Duration `config:"idle_timeout"`
ReadTimeout time.Duration `config:"read_timeout"`
WriteTimeout time.Duration `config:"write_timeout"`
MaxEventSize int `config:"max_event_size"`
ShutdownTimeout time.Duration `config:"shutdown_timeout"`
TLS *tlscommon.ServerConfig `config:"ssl"`
MaxConnections int `config:"max_connections"`
ResponseHeaders map[string][]string `config:"response_headers"`
Expvar *ExpvarConfig `config:"expvar"`
AugmentEnabled bool `config:"capture_personal_data"`
SelfInstrumentation *InstrumentationConfig `config:"instrumentation"`
RumConfig *RumConfig `config:"rum"`
Register *RegisterConfig `config:"register"`
Mode Mode `config:"mode"`
Kibana KibanaConfig `config:"kibana"`
AgentConfig *AgentConfig `config:"agent.config"`
SecretToken string `config:"secret_token"`
APIKeyConfig *APIKeyConfig `config:"api_key"`
JaegerConfig JaegerConfig `config:"jaeger"`
Aggregation AggregationConfig `config:"aggregation"`
Sampling SamplingConfig `config:"sampling"`
DataStreams DataStreamsConfig `config:"data_streams"`
Host string `config:"host"`
MaxHeaderSize int `config:"max_header_size"`
IdleTimeout time.Duration `config:"idle_timeout"`
ReadTimeout time.Duration `config:"read_timeout"`
WriteTimeout time.Duration `config:"write_timeout"`
MaxEventSize int `config:"max_event_size"`
ShutdownTimeout time.Duration `config:"shutdown_timeout"`
TLS *tlscommon.ServerConfig `config:"ssl"`
MaxConnections int `config:"max_connections"`
ResponseHeaders map[string][]string `config:"response_headers"`
Expvar *ExpvarConfig `config:"expvar"`
AugmentEnabled bool `config:"capture_personal_data"`
SelfInstrumentation *InstrumentationConfig `config:"instrumentation"`
RumConfig *RumConfig `config:"rum"`
Register *RegisterConfig `config:"register"`
Mode Mode `config:"mode"`
Kibana KibanaConfig `config:"kibana"`
AgentConfig *AgentConfig `config:"agent.config"`
SecretToken string `config:"secret_token"`
APIKeyConfig *APIKeyConfig `config:"api_key"`
JaegerConfig JaegerConfig `config:"jaeger"`
Aggregation AggregationConfig `config:"aggregation"`
Sampling SamplingConfig `config:"sampling"`
DataStreams DataStreamsConfig `config:"data_streams"`
DefaultServiceEnvironment string `config:"default_service_environment"`

Pipeline string
}
Expand Down
2 changes: 2 additions & 0 deletions beater/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func TestUnpackConfig(t *testing.T) {
"max_groups": 456,
},
},
"default_service_environment": "overridden",
},
outCfg: &Config{
Host: "localhost:3000",
Expand Down Expand Up @@ -245,6 +246,7 @@ func TestUnpackConfig(t *testing.T) {
TTL: 30 * time.Minute,
},
},
DefaultServiceEnvironment: "overridden",
},
},
"merge config with default": {
Expand Down
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ https://github.com/elastic/apm-server/compare/7.11\...master[View commits]
* Upgrade Go to 1.15.8 {pull}4733[4733]
* Add support for Node.js wall time profiles {pull}4728[4728]
* Add metricset.name field to metric docs {pull}4857[4857]
* Add `apm-server.default_service_environment` config {pull}4861[4861]

[float]
==== Deprecated
Expand Down
Loading

0 comments on commit 7f8c1c6

Please sign in to comment.