diff --git a/systemtest/containers.go b/systemtest/containers.go index 697ca431eb1..fd231687560 100644 --- a/systemtest/containers.go +++ b/systemtest/containers.go @@ -54,7 +54,7 @@ var ( systemtestDir string ) -func init() { +func initContainers() { _, filename, _, ok := runtime.Caller(0) if !ok { panic("could not locate systemtest directory") diff --git a/systemtest/elasticsearch.go b/systemtest/elasticsearch.go index 0a861755308..5bb6c4f48a5 100644 --- a/systemtest/elasticsearch.go +++ b/systemtest/elasticsearch.go @@ -44,7 +44,7 @@ var ( Elasticsearch *espoll.Client ) -func init() { +func initElasticSearch() { cfg := newElasticsearchConfig() cfg.Username = adminElasticsearchUser cfg.Password = adminElasticsearchPass diff --git a/systemtest/intake_test.go b/systemtest/intake_test.go index 8f271800688..6f5e6c263c7 100644 --- a/systemtest/intake_test.go +++ b/systemtest/intake_test.go @@ -18,7 +18,12 @@ package systemtest_test import ( + "context" + "strings" "testing" + "time" + + "github.com/stretchr/testify/require" "github.com/elastic/apm-server/systemtest" "github.com/elastic/apm-server/systemtest/apmservertest" @@ -73,3 +78,28 @@ func TestIntake(t *testing.T) { } } + +func TestIntakeMalformed(t *testing.T) { + // Setup a custom ingest pipeline to test a malformed data ingestion. + r, err := systemtest.Elasticsearch.Ingest.PutPipeline( + "traces-apm@custom", + strings.NewReader(`{"processors":[{"set":{"field":"span.duration.us","value":"poison"}}]}`), + ) + require.NoError(t, err) + require.False(t, r.IsError()) + defer systemtest.Elasticsearch.Ingest.DeletePipeline("traces-apm@custom") + // Test malformed intake data. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + srv := apmservertest.NewServerTB(t) + systemtest.CleanupElasticsearch(t) + response := systemtest.SendBackendEventsPayload(t, srv.URL, "../testdata/intake-v2/spans.ndjson") + _, err = systemtest.Elasticsearch.SearchIndexMinDocs( + ctx, + response.Accepted, + "traces-apm*", + nil, + espoll.WithTimeout(10*time.Second), + ) + require.Error(t, err, "No traces should be indexed due to traces-apm@custom pipeline") +} diff --git a/systemtest/kibana.go b/systemtest/kibana.go index 3c73121808c..5384e875109 100644 --- a/systemtest/kibana.go +++ b/systemtest/kibana.go @@ -63,7 +63,7 @@ var ( IntegrationPackage *fleettest.Package ) -func init() { +func initKibana() { kibanaConfig := apmservertest.DefaultConfig().Kibana u, err := url.Parse(kibanaConfig.Host) if err != nil { diff --git a/systemtest/main_test.go b/systemtest/main_test.go index c2a0bbbd937..584b83203c7 100644 --- a/systemtest/main_test.go +++ b/systemtest/main_test.go @@ -25,10 +25,14 @@ import ( func TestMain(m *testing.M) { log.Println("INFO: starting stack containers...") + initContainers() if err := StartStackContainers(); err != nil { log.Fatalf("failed to start stack containers: %v", err) } - + initElasticSearch() + initKibana() + initSettings() + initOTEL() log.Println("INFO: running system tests...") os.Exit(m.Run()) } diff --git a/systemtest/otlp.go b/systemtest/otlp.go new file mode 100644 index 00000000000..803c93eb9d1 --- /dev/null +++ b/systemtest/otlp.go @@ -0,0 +1,35 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package systemtest + +import "go.opentelemetry.io/otel" + +var OtelErrors = make(chan error, 1) + +func initOTEL() { + // otel.SetErrorHandler can only be called once per process. + otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) { + if err == nil { + return + } + select { + case OtelErrors <- err: + default: + } + })) +} diff --git a/systemtest/otlp_test.go b/systemtest/otlp_test.go index 37996c01f3f..73e8f3f75c3 100644 --- a/systemtest/otlp_test.go +++ b/systemtest/otlp_test.go @@ -34,7 +34,6 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" semconv "go.opentelemetry.io/collector/semconv/v1.5.0" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" @@ -58,21 +57,6 @@ import ( "github.com/elastic/apm-tools/pkg/espoll" ) -var otelErrors = make(chan error, 1) - -func init() { - // otel.SetErrorHandler can only be called once per process. - otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) { - if err == nil { - return - } - select { - case otelErrors <- err: - default: - } - })) -} - func TestOTLPGRPCTraces(t *testing.T) { systemtest.CleanupElasticsearch(t) srv := apmservertest.NewServerTB(t) @@ -623,7 +607,7 @@ func flushTracerProvider(ctx context.Context, tracerProvider *sdktrace.TracerPro return err } select { - case err := <-otelErrors: + case err := <-systemtest.OtelErrors: return err default: return nil @@ -653,7 +637,7 @@ func sendOTLPMetrics( return err } select { - case err := <-otelErrors: + case err := <-systemtest.OtelErrors: return err default: return nil diff --git a/systemtest/settings.go b/systemtest/settings.go new file mode 100644 index 00000000000..8d1384b8d8d --- /dev/null +++ b/systemtest/settings.go @@ -0,0 +1,55 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package systemtest + +import ( + "fmt" + "log" + "strings" +) + +func initSettings() { + // Proactively test with more strict + // "ignore_malformed" mode by default. + for _, t := range []string{ + "traces", + "metrics", + "logs-apm.error", + "logs-apm.app", + } { + if err := DisableIgnoreMalformed(t); err != nil { + log.Fatalf("failed to configure ignore_malformed %v", err) + } + } +} + +// DisableIgnoreMalformed updates component template index setting +// to disable "ignore_malformed" inside mappings. +func DisableIgnoreMalformed(componentTemplate string) error { + r, err := Elasticsearch.Cluster.PutComponentTemplate( + fmt.Sprintf("%s@custom", componentTemplate), + strings.NewReader(`{"template":{"settings":{"index":{"mapping":{"ignore_malformed":"false"}}}}}`), + ) + if err != nil { + return err + } + if r.IsError() { + return fmt.Errorf(`request to update "ignore_malformed":"false" failed for %s`, componentTemplate) + } + return nil +}