diff --git a/.github/workflows/data-prepper-trace-analytics-raw-span-compatibility-e2e-tests.yml b/.github/workflows/data-prepper-trace-analytics-raw-span-compatibility-e2e-tests.yml index 7707e2d61d..c76dbddbc6 100644 --- a/.github/workflows/data-prepper-trace-analytics-raw-span-compatibility-e2e-tests.yml +++ b/.github/workflows/data-prepper-trace-analytics-raw-span-compatibility-e2e-tests.yml @@ -24,5 +24,8 @@ jobs: java-version: ${{ matrix.java }} - name: Checkout Data-Prepper uses: actions/checkout@v2 - - name: Run raw-span compatibility end-to-end tests with Gradle - run: ./gradlew :e2e-test:trace:rawSpanCompatibilityEndToEndTest + # TODO: Event record type only in 2.0 (https://github.com/opensearch-project/data-prepper/issues/1272) + - name: Run raw-span OTLP record type latest release compatibility end-to-end tests with Gradle + run: ./gradlew :e2e-test:trace:rawSpanOTLPLatestReleaseCompatibilityEndToEndTest + - name: Run raw-span Event record type latest release compatibility end-to-end tests with Gradle + run: ./gradlew :e2e-test:trace:rawSpanEventLatestReleaseCompatibilityEndToEndTest diff --git a/.github/workflows/data-prepper-trace-analytics-raw-span-e2e-tests.yml b/.github/workflows/data-prepper-trace-analytics-raw-span-e2e-tests.yml index 61bf01de11..110bbb8e15 100644 --- a/.github/workflows/data-prepper-trace-analytics-raw-span-e2e-tests.yml +++ b/.github/workflows/data-prepper-trace-analytics-raw-span-e2e-tests.yml @@ -24,5 +24,8 @@ jobs: java-version: ${{ matrix.java }} - name: Checkout Data-Prepper uses: actions/checkout@v2 - - name: Run raw-span end-to-end tests with Gradle - run: ./gradlew :e2e-test:trace:rawSpanEndToEndTest \ No newline at end of file + # TODO: Event record type only in 2.0 (https://github.com/opensearch-project/data-prepper/issues/1272) + - name: Run raw-span OTLP record type end-to-end tests with Gradle + run: ./gradlew :e2e-test:trace:rawSpanOTLPEndToEndTest + - name: Run raw-span OTLP and Event end-to-end tests with Gradle + run: ./gradlew :e2e-test:trace:rawSpanOTLPAndEventEndToEndTest \ No newline at end of file diff --git a/.github/workflows/data-prepper-trace-analytics-service-map-e2e-tests.yml b/.github/workflows/data-prepper-trace-analytics-service-map-e2e-tests.yml index 5221cb2b07..9b28662f0a 100644 --- a/.github/workflows/data-prepper-trace-analytics-service-map-e2e-tests.yml +++ b/.github/workflows/data-prepper-trace-analytics-service-map-e2e-tests.yml @@ -24,5 +24,8 @@ jobs: java-version: ${{ matrix.java }} - name: Checkout Data-Prepper uses: actions/checkout@v2 - - name: Run service-map end-to-end tests with Gradle - run: ./gradlew :e2e-test:trace:serviceMapEndToEndTest \ No newline at end of file + # TODO: Event record type only in 2.0 (https://github.com/opensearch-project/data-prepper/issues/1272) + - name: Run service-map OTLP record type end-to-end tests with Gradle + run: ./gradlew :e2e-test:trace:serviceMapOTLPEndToEndTest + - name: Run service-map OTLP and Event end-to-end tests with Gradle + run: ./gradlew :e2e-test:trace:serviceMapOTLPAndEventEndToEndTest \ No newline at end of file diff --git a/e2e-test/trace/README.md b/e2e-test/trace/README.md index f8b30ba74f..93a6010002 100644 --- a/e2e-test/trace/README.md +++ b/e2e-test/trace/README.md @@ -4,33 +4,72 @@ This module includes e2e tests for trace data ingestion supported by data-preppe ## Raw Span Ingestion Pipeline End-to-end test +### OTLP record type + +Run from current directory +``` +./gradlew :rawSpanOTLPEndToEndTest +``` +or from project root directory +``` +./gradlew :e2e-test:trace:rawSpanOTLPEndToEndTest +``` + +### Event record type compatibility with OTLP record type + Run from current directory ``` -./gradlew :rawSpanEndToEndTest +./gradlew :rawSpanOTLPAndEventEndToEndTest ``` or from project root directory ``` -./gradlew :e2e-test:trace:rawSpanEndToEndTest +./gradlew :e2e-test:trace:rawSpanOTLPAndEventEndToEndTest ``` -## Raw Span Ingestion Pipelines Compatibility End-to-end test +## Raw Span Ingestion Pipelines Latest Release Compatibility End-to-end test + +### OTLP record type compatibility with latest release Run from current directory ``` -./gradlew :rawSpanCompatibilityEndToEndTest +./gradlew :rawSpanOTLPLatestReleaseCompatibilityEndToEndTest ``` or from project root directory ``` -./gradlew :e2e-test:trace:rawSpanCompatibilityEndToEndTest +./gradlew :e2e-test:trace:rawSpanOTLPLatestReleaseCompatibilityEndToEndTest +``` + +### Event record type compatibility with latest release + +Run from current directory +``` +./gradlew :rawSpanEventLatestReleaseCompatibilityEndToEndTest +``` +or from project root directory +``` +./gradlew :e2e-test:trace:rawSpanEventLatestReleaseCompatibilityEndToEndTest ``` ## Service Map Ingestion Pipelines End-to-end test +### OTLP record type + +Run from current directory +``` +./gradlew :serviceMapOTLPEndToEndTest +``` +or from project root directory +``` +./gradlew :e2e-test:trace:serviceMapOTLPEndToEndTest +``` + +### Event record type compatibility with OTLP record type + Run from current directory ``` -./gradlew :serviceMapEndToEndTest +./gradlew :serviceMapOTLPAndEventEndToEndTest ``` or from project root directory ``` -./gradlew :e2e-test:trace:serviceMapEndToEndTest +./gradlew :e2e-test:trace:serviceMapOTLPAndEventEndToEndTest ``` diff --git a/e2e-test/trace/build.gradle b/e2e-test/trace/build.gradle index 8aae1604cf..5ef393d1fe 100644 --- a/e2e-test/trace/build.gradle +++ b/e2e-test/trace/build.gradle @@ -30,8 +30,11 @@ task removeDataPrepperNetwork(type: DockerRemoveNetwork) { } def RAW_SPAN_PIPELINE_YAML = "raw-span-e2e-pipeline.yml" +def RAW_SPAN_PIPELINE_EVENT_TYPE_YAML = "raw-span-e2e-pipeline-event-type.yml" +def RAW_SPAN_PIPELINE_PEER_FORWARDER_EVENT_TYPE_YAML = "raw-span-e2e-pipeline-peer-forwarder-event-type.yml" def RAW_SPAN_PIPELINE_LATEST_RELEASE_YAML = "raw-span-e2e-pipeline-latest-release.yml" def SERVICE_MAP_PIPELINE_YAML = "service-map-e2e-pipeline.yml" +def SERVICE_MAP_PIPELINE_EVENT_TYPE_YAML = "service-map-e2e-pipeline-event-type.yml" /** * DataPrepper Docker tasks @@ -45,8 +48,11 @@ task createDataPrepperDockerFile(type: Dockerfile) { workingDir("/app") copyFile("${dataPrepperJarFilepath}", "/app/data-prepper.jar") copyFile("src/integrationTest/resources/${RAW_SPAN_PIPELINE_YAML}", "/app/${RAW_SPAN_PIPELINE_YAML}") + copyFile("src/integrationTest/resources/${RAW_SPAN_PIPELINE_EVENT_TYPE_YAML}", "/app/${RAW_SPAN_PIPELINE_EVENT_TYPE_YAML}") + copyFile("src/integrationTest/resources/${RAW_SPAN_PIPELINE_PEER_FORWARDER_EVENT_TYPE_YAML}", "/app/${RAW_SPAN_PIPELINE_PEER_FORWARDER_EVENT_TYPE_YAML}") copyFile("src/integrationTest/resources/${RAW_SPAN_PIPELINE_LATEST_RELEASE_YAML}", "/app/${RAW_SPAN_PIPELINE_LATEST_RELEASE_YAML}") copyFile("src/integrationTest/resources/${SERVICE_MAP_PIPELINE_YAML}", "/app/${SERVICE_MAP_PIPELINE_YAML}") + copyFile("src/integrationTest/resources/${SERVICE_MAP_PIPELINE_EVENT_TYPE_YAML}", "/app/${SERVICE_MAP_PIPELINE_EVENT_TYPE_YAML}") copyFile("src/integrationTest/resources/data_prepper.yml", "/app/data_prepper.yml") defaultCommand("java", "-jar", "data-prepper.jar", "/app/${RAW_SPAN_PIPELINE_YAML}", "/app/data_prepper.yml") } @@ -148,120 +154,104 @@ task stopOpenSearchDockerContainer(type: DockerStopContainer) { * End to end test. Spins up OpenSearch and DataPrepper docker containers, then runs the integ test * Stops the docker containers when finished */ -task rawSpanEndToEndTest(type: Test) { - dependsOn build - dependsOn startOpenSearchDockerContainer - def createDataPrepper1Task = createDataPrepperDockerContainer( - "rawSpanDataPrepper1", "dataprepper1", 21890, 4900, "/app/${RAW_SPAN_PIPELINE_YAML}") - def createDataPrepper2Task = createDataPrepperDockerContainer( - "rawSpanDataPrepper2", "dataprepper2", 21891, 4901, "/app/${RAW_SPAN_PIPELINE_YAML}") - def startDataPrepper1Task = startDataPrepperDockerContainer(createDataPrepper1Task as DockerCreateContainer) - def startDataPrepper2Task = startDataPrepperDockerContainer(createDataPrepper2Task as DockerCreateContainer) - dependsOn startDataPrepper1Task - dependsOn startDataPrepper2Task - startDataPrepper1Task.mustRunAfter 'startOpenSearchDockerContainer' - startDataPrepper2Task.mustRunAfter 'startOpenSearchDockerContainer' - // wait for data-preppers to be ready - doFirst { - sleep(10*1000) +def createEndToEndTest(final String testName, final String includeTestsMatchPattern, + final DockerCreateContainer createDataPrepper1Task, final DockerCreateContainer createDataPrepper2Task) { + return tasks.create(testName, Test) { + dependsOn build + dependsOn startOpenSearchDockerContainer + def startDataPrepper1Task = startDataPrepperDockerContainer(createDataPrepper1Task as DockerCreateContainer) + def startDataPrepper2Task = startDataPrepperDockerContainer(createDataPrepper2Task as DockerCreateContainer) + dependsOn startDataPrepper1Task + dependsOn startDataPrepper2Task + startDataPrepper1Task.mustRunAfter 'startOpenSearchDockerContainer' + startDataPrepper2Task.mustRunAfter 'startOpenSearchDockerContainer' + // wait for data-preppers to be ready + doFirst { + sleep(10*1000) + } + + description = 'Runs the raw span integration tests.' + group = 'verification' + testClassesDirs = sourceSets.integrationTest.output.classesDirs + classpath = sourceSets.integrationTest.runtimeClasspath + + filter { + includeTestsMatching includeTestsMatchPattern + } + + finalizedBy stopOpenSearchDockerContainer + def stopDataPrepper1Task = stopDataPrepperDockerContainer(startDataPrepper1Task as DockerStartContainer) + def stopDataPrepper2Task = stopDataPrepperDockerContainer(startDataPrepper2Task as DockerStartContainer) + finalizedBy stopDataPrepper1Task + finalizedBy stopDataPrepper2Task + finalizedBy removeDataPrepperDockerContainer(stopDataPrepper1Task as DockerStopContainer) + finalizedBy removeDataPrepperDockerContainer(stopDataPrepper2Task as DockerStopContainer) + finalizedBy removeDataPrepperNetwork } +} - description = 'Runs the raw span integration tests.' - group = 'verification' - testClassesDirs = sourceSets.integrationTest.output.classesDirs - classpath = sourceSets.integrationTest.runtimeClasspath +def includeRawSpanTestsMatchPattern = "com.amazon.dataprepper.integration.trace.EndToEndRawSpanTest.testPipelineEndToEnd*" - filter { - includeTestsMatching "com.amazon.dataprepper.integration.trace.EndToEndRawSpanTest.testPipelineEndToEnd*" - } +def createRawSpanDataPrepperOTLP1Task = createDataPrepperDockerContainer( + "rawSpanDataPrepper1", "dataprepper1", 21890, 4900, "/app/${RAW_SPAN_PIPELINE_YAML}") +def createRawSpanDataPrepperOTLP2Task = createDataPrepperDockerContainer( + "rawSpanDataPrepper2", "dataprepper2", 21891, 4901, "/app/${RAW_SPAN_PIPELINE_YAML}") - finalizedBy stopOpenSearchDockerContainer - def stopDataPrepper1Task = stopDataPrepperDockerContainer(startDataPrepper1Task as DockerStartContainer) - def stopDataPrepper2Task = stopDataPrepperDockerContainer(startDataPrepper2Task as DockerStartContainer) - finalizedBy stopDataPrepper1Task - finalizedBy stopDataPrepper2Task - finalizedBy removeDataPrepperDockerContainer(stopDataPrepper1Task as DockerStopContainer) - finalizedBy removeDataPrepperDockerContainer(stopDataPrepper2Task as DockerStopContainer) - finalizedBy removeDataPrepperNetwork -} +// TODO: replace with Event type in 2.0 (https://github.com/opensearch-project/data-prepper/issues/1272) +def rawSpanOTLPEndToEndTest = createEndToEndTest("rawSpanOTLPEndToEndTest", includeRawSpanTestsMatchPattern, + createRawSpanDataPrepperOTLP1Task, createRawSpanDataPrepperOTLP2Task) -task rawSpanCompatibilityEndToEndTest(type: Test) { - dependsOn build - dependsOn startOpenSearchDockerContainer - def createDataPrepper1Task = createDataPrepperDockerContainer( - "rawSpanDataPrepperFromBuild", "dataprepper1", 21890, 4900, "/app/${RAW_SPAN_PIPELINE_LATEST_RELEASE_YAML}") - def createDataPrepper2Task = createDataPrepperDockerContainerFromPullImage( - "rawSpanDataPrepperFromPull", "dataprepper2", 21891, 4901, "src/integrationTest/resources/${RAW_SPAN_PIPELINE_LATEST_RELEASE_YAML}") - def startDataPrepper1Task = startDataPrepperDockerContainer(createDataPrepper1Task as DockerCreateContainer) - def startDataPrepper2Task = startDataPrepperDockerContainer(createDataPrepper2Task as DockerCreateContainer) - dependsOn startDataPrepper1Task - dependsOn startDataPrepper2Task - startDataPrepper1Task.mustRunAfter 'startOpenSearchDockerContainer' - startDataPrepper2Task.mustRunAfter 'startOpenSearchDockerContainer' - // wait for data-preppers to be ready - doFirst { - sleep(10*1000) - } +def createRawSpanDataPrepperOTLPTask = createDataPrepperDockerContainer( + "rawSpanDataPrepperOTLP", "dataprepper1", 21890, 4900, "/app/${RAW_SPAN_PIPELINE_YAML}") +def createRawSpanDataPrepperEventTask = createDataPrepperDockerContainer( + "rawSpanDataPrepperEvent", "dataprepper2", 21891, 4901, "/app/${RAW_SPAN_PIPELINE_EVENT_TYPE_YAML}") - description = 'Runs the raw span compatibility integration tests.' - group = 'verification' - testClassesDirs = sourceSets.integrationTest.output.classesDirs - classpath = sourceSets.integrationTest.runtimeClasspath +// TODO: remove in 2.0 (https://github.com/opensearch-project/data-prepper/issues/1272) +def rawSpanOTLPAndEventEndToEndTest = createEndToEndTest("rawSpanOTLPAndEventEndToEndTest", includeRawSpanTestsMatchPattern, + createRawSpanDataPrepperOTLPTask, createRawSpanDataPrepperEventTask) - filter { - includeTestsMatching "com.amazon.dataprepper.integration.trace.EndToEndRawSpanTest.testPipelineEndToEnd*" - } +def rawSpanDataPrepperOTLPFromBuild = createDataPrepperDockerContainer( + "rawSpanDataPrepperOTLPFromBuild", "dataprepper1", 21890, 4900, "/app/${RAW_SPAN_PIPELINE_LATEST_RELEASE_YAML}") +def rawSpanDataPrepperOTLPFromPull = createDataPrepperDockerContainerFromPullImage( + "rawSpanDataPrepperOTLPFromPull", "dataprepper2", 21891, 4901, "src/integrationTest/resources/${RAW_SPAN_PIPELINE_LATEST_RELEASE_YAML}") - finalizedBy stopOpenSearchDockerContainer - def stopDataPrepper1Task = stopDataPrepperDockerContainer(startDataPrepper1Task as DockerStartContainer) - def stopDataPrepper2Task = stopDataPrepperDockerContainer(startDataPrepper2Task as DockerStartContainer) - finalizedBy stopDataPrepper1Task - finalizedBy stopDataPrepper2Task - finalizedBy removeDataPrepperDockerContainer(stopDataPrepper1Task as DockerStopContainer) - finalizedBy removeDataPrepperDockerContainer(stopDataPrepper2Task as DockerStopContainer) - finalizedBy removeDataPrepperNetwork -} +// TODO: remove in 2.0 (https://github.com/opensearch-project/data-prepper/issues/1272) +def rawSpanOTLPLatestReleaseCompatibilityEndToEndTest = createEndToEndTest("rawSpanOTLPLatestReleaseCompatibilityEndToEndTest", + includeRawSpanTestsMatchPattern, + rawSpanDataPrepperOTLPFromBuild, rawSpanDataPrepperOTLPFromPull) -task serviceMapEndToEndTest(type: Test) { - dependsOn build - dependsOn startOpenSearchDockerContainer - def createDataPrepper1Task = createDataPrepperDockerContainer( - "serviceMapDataPrepper1", "dataprepper1", 21890, 4900, "/app/${SERVICE_MAP_PIPELINE_YAML}") - def createDataPrepper2Task = createDataPrepperDockerContainer( - "serviceMapDataPrepper2", "dataprepper2", 21891, 4901, "/app/${SERVICE_MAP_PIPELINE_YAML}") - def startDataPrepper1Task = startDataPrepperDockerContainer(createDataPrepper1Task as DockerCreateContainer) - def startDataPrepper2Task = startDataPrepperDockerContainer(createDataPrepper2Task as DockerCreateContainer) - dependsOn startDataPrepper1Task - dependsOn startDataPrepper2Task - startDataPrepper1Task.mustRunAfter 'startOpenSearchDockerContainer' - startDataPrepper2Task.mustRunAfter 'startOpenSearchDockerContainer' - // wait for data-preppers to be ready - doFirst { - sleep(10*1000) - } +def rawSpanDataPrepperEventFromBuild = createDataPrepperDockerContainer( + "rawSpanDataPrepperEventFromBuild", "dataprepper1", 21890, 4900, "/app/${RAW_SPAN_PIPELINE_PEER_FORWARDER_EVENT_TYPE_YAML}") +def rawSpanDataPrepperLatestFromPull = createDataPrepperDockerContainerFromPullImage( + "rawSpanDataPrepperLatestFromPull", "dataprepper2", 21891, 4901, "src/integrationTest/resources/${RAW_SPAN_PIPELINE_LATEST_RELEASE_YAML}") - description = 'Runs the service-map integration tests.' - group = 'verification' - testClassesDirs = sourceSets.integrationTest.output.classesDirs - classpath = sourceSets.integrationTest.runtimeClasspath +def rawSpanEventLatestReleaseCompatibilityEndToEndTest = createEndToEndTest("rawSpanEventLatestReleaseCompatibilityEndToEndTest", + includeRawSpanTestsMatchPattern, + rawSpanDataPrepperEventFromBuild, rawSpanDataPrepperLatestFromPull) - filter { - includeTestsMatching "com.amazon.dataprepper.integration.trace.EndToEndServiceMapTest*" - } +def includeServiceMapTestsMatchPattern = "com.amazon.dataprepper.integration.trace.EndToEndServiceMapTest*" - finalizedBy stopOpenSearchDockerContainer - def stopDataPrepper1Task = stopDataPrepperDockerContainer(startDataPrepper1Task as DockerStartContainer) - def stopDataPrepper2Task = stopDataPrepperDockerContainer(startDataPrepper2Task as DockerStartContainer) - finalizedBy stopDataPrepper1Task - finalizedBy stopDataPrepper2Task - finalizedBy removeDataPrepperDockerContainer(stopDataPrepper1Task as DockerStopContainer) - finalizedBy removeDataPrepperDockerContainer(stopDataPrepper2Task as DockerStopContainer) - finalizedBy removeDataPrepperNetwork -} +def createServiceMapDataPrepperOTLP1Task = createDataPrepperDockerContainer( + "serviceMapDataPrepper1", "dataprepper1", 21890, 4900, "/app/${SERVICE_MAP_PIPELINE_YAML}") +def createServiceMapDataPrepperOTLP2Task = createDataPrepperDockerContainer( + "serviceMapDataPrepper2", "dataprepper2", 21891, 4901, "/app/${SERVICE_MAP_PIPELINE_YAML}") + +// TODO: replace with Event type in 2.0 (https://github.com/opensearch-project/data-prepper/issues/1272) +def serviceMapOTLPEndToEndTest = createEndToEndTest("serviceMapOTLPEndToEndTest", includeServiceMapTestsMatchPattern, + createServiceMapDataPrepperOTLP1Task, createServiceMapDataPrepperOTLP2Task) + +def serviceMapDataPrepperOTLP = createDataPrepperDockerContainer( + "serviceMapDataPrepperOTLP", "dataprepper1", 21890, 4900, "/app/${SERVICE_MAP_PIPELINE_YAML}") +def serviceMapDataPrepperEvent = createDataPrepperDockerContainer( + "serviceMapDataPrepperEvent", "dataprepper2", 21891, 4901, "/app/${SERVICE_MAP_PIPELINE_EVENT_TYPE_YAML}") + +// TODO: remove in 2.0 (https://github.com/opensearch-project/data-prepper/issues/1272) +def serviceMapOTLPAndEventEndToEndTest = createEndToEndTest("serviceMapOTLPAndEventEndToEndTest", includeServiceMapTestsMatchPattern, + serviceMapDataPrepperOTLP, serviceMapDataPrepperEvent) dependencies { integrationTestImplementation project(':data-prepper-plugins:opensearch') - integrationTestImplementation project(':data-prepper-plugins:otel-trace-group-prepper') + integrationTestImplementation project(':data-prepper-plugins:otel-trace-group-processor') integrationTestImplementation "org.awaitility:awaitility:4.1.1" integrationTestImplementation "io.opentelemetry:opentelemetry-proto:${versionMap.opentelemetryProto}" integrationTestImplementation 'com.google.protobuf:protobuf-java-util:3.19.4' diff --git a/e2e-test/trace/src/integrationTest/java/com/amazon/dataprepper/integration/trace/EndToEndRawSpanTest.java b/e2e-test/trace/src/integrationTest/java/com/amazon/dataprepper/integration/trace/EndToEndRawSpanTest.java index 4ef0a2a922..08e6a1bec4 100644 --- a/e2e-test/trace/src/integrationTest/java/com/amazon/dataprepper/integration/trace/EndToEndRawSpanTest.java +++ b/e2e-test/trace/src/integrationTest/java/com/amazon/dataprepper/integration/trace/EndToEndRawSpanTest.java @@ -6,10 +6,9 @@ package com.amazon.dataprepper.integration.trace; -import com.amazon.dataprepper.plugins.prepper.oteltracegroup.model.TraceGroup; +import com.amazon.dataprepper.model.trace.DefaultTraceGroupFields; +import com.amazon.dataprepper.plugins.processor.oteltracegroup.model.TraceGroup; import com.amazon.dataprepper.plugins.sink.opensearch.ConnectionConfiguration; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.protobuf.ByteString; import com.linecorp.armeria.client.Clients; import com.linecorp.armeria.client.retry.RetryRule; @@ -48,27 +47,30 @@ import static org.awaitility.Awaitility.await; public class EndToEndRawSpanTest { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final TypeReference> MAP_TYPE_REFERENCE = new TypeReference>() {}; private static final int DATA_PREPPER_PORT_1 = 21890; private static final int DATA_PREPPER_PORT_2 = 21891; private static final Map TEST_TRACEID_TO_TRACE_GROUP = new HashMap() {{ put(Hex.toHexString(EndToEndTestSpan.TRACE_1_ROOT_SPAN.traceId.getBytes()), - new TraceGroup( - EndToEndTestSpan.TRACE_1_ROOT_SPAN.name, - EndToEndTestSpan.TRACE_1_ROOT_SPAN.endTime, - EndToEndTestSpan.TRACE_1_ROOT_SPAN.durationInNanos, - EndToEndTestSpan.TRACE_1_ROOT_SPAN.statusCode - )); + new TraceGroup.TraceGroupBuilder() + .setTraceGroup(EndToEndTestSpan.TRACE_1_ROOT_SPAN.name) + .setTraceGroupFields(DefaultTraceGroupFields.builder() + .withEndTime(EndToEndTestSpan.TRACE_1_ROOT_SPAN.endTime) + .withDurationInNanos(EndToEndTestSpan.TRACE_1_ROOT_SPAN.durationInNanos) + .withStatusCode(EndToEndTestSpan.TRACE_1_ROOT_SPAN.statusCode) + .build()) + .build() + ); put(Hex.toHexString(EndToEndTestSpan.TRACE_2_ROOT_SPAN.traceId.getBytes()), - new TraceGroup( - EndToEndTestSpan.TRACE_2_ROOT_SPAN.name, - EndToEndTestSpan.TRACE_2_ROOT_SPAN.endTime, - EndToEndTestSpan.TRACE_2_ROOT_SPAN.durationInNanos, - EndToEndTestSpan.TRACE_2_ROOT_SPAN.statusCode - ) - ); + new TraceGroup.TraceGroupBuilder() + .setTraceGroup(EndToEndTestSpan.TRACE_2_ROOT_SPAN.name) + .setTraceGroupFields(DefaultTraceGroupFields.builder() + .withEndTime(EndToEndTestSpan.TRACE_2_ROOT_SPAN.endTime) + .withDurationInNanos(EndToEndTestSpan.TRACE_2_ROOT_SPAN.durationInNanos) + .withStatusCode(EndToEndTestSpan.TRACE_2_ROOT_SPAN.statusCode) + .build()) + .build() + ); }}; private static final List TEST_SPAN_SET_1_WITH_ROOT_SPAN = Arrays.asList( EndToEndTestSpan.TRACE_1_ROOT_SPAN, EndToEndTestSpan.TRACE_1_SPAN_2, EndToEndTestSpan.TRACE_1_SPAN_3, @@ -112,15 +114,17 @@ public void testPipelineEndToEnd() throws InterruptedException { builder.withUsername("admin"); builder.withPassword("admin"); final RestHighLevelClient restHighLevelClient = builder.build().createClient(); - // Wait for otel-trace-raw-prepper by at least trace_flush_interval - Thread.sleep(6000); // Wait for data to flow through pipeline and be indexed by ES - await().atMost(10, TimeUnit.SECONDS).untilAsserted( + await().atLeast(6, TimeUnit.SECONDS).atMost(20, TimeUnit.SECONDS).untilAsserted( () -> { refreshIndices(restHighLevelClient); final SearchRequest searchRequest = new SearchRequest(INDEX_NAME); searchRequest.source( - SearchSourceBuilder.searchSource().size(100) + SearchSourceBuilder.searchSource() + .size(100) + .fetchField(TraceGroup.TRACE_GROUP_STATUS_CODE_FIELD) + .fetchField(TraceGroup.TRACE_GROUP_END_TIME_FIELD, "strict_date_time") + .fetchField(TraceGroup.TRACE_GROUP_DURATION_IN_NANOS_FIELD) ); final SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); final List> foundSources = getSourcesFromSearchHits(searchResponse.getHits()); @@ -158,10 +162,16 @@ private List> getSourcesFromSearchHits(final SearchHits sear final List> sources = new ArrayList<>(); searchHits.forEach(hit -> { Map source = hit.getSourceAsMap(); - // OpenSearch API identifies Number type by range, need to convert to Long - if (source.containsKey(TraceGroup.TRACE_GROUP_DURATION_IN_NANOS_FIELD)) { - final Long durationInNanos = ((Number) source.get(TraceGroup.TRACE_GROUP_DURATION_IN_NANOS_FIELD)).longValue(); - source.put(TraceGroup.TRACE_GROUP_DURATION_IN_NANOS_FIELD, durationInNanos); + if (source.containsKey(TraceGroup.TRACE_GROUP_NAME_FIELD)) { + // Extract and replace traceGroupFields with searchHit fields to unify the representation + source.entrySet().removeIf(entry -> entry.getKey().startsWith("traceGroupFields")); + final Number statusCode = hit.field(TraceGroup.TRACE_GROUP_STATUS_CODE_FIELD).getValue(); + // Restore trailing zeros for thousand, e.g. 2020-08-20T05:40:46.0895568Z -> 2020-08-20T05:40:46.089556800Z + final String endTime = Instant.parse(hit.field(TraceGroup.TRACE_GROUP_END_TIME_FIELD).getValue()).toString(); + final Number durationInNanos = hit.field(TraceGroup.TRACE_GROUP_DURATION_IN_NANOS_FIELD).getValue(); + source.put(TraceGroup.TRACE_GROUP_STATUS_CODE_FIELD, statusCode.intValue()); + source.put(TraceGroup.TRACE_GROUP_END_TIME_FIELD, endTime); + source.put(TraceGroup.TRACE_GROUP_DURATION_IN_NANOS_FIELD, durationInNanos.longValue()); } sources.add(source); }); @@ -267,7 +277,10 @@ private Map getExpectedEsDocumentSource(final Span span, final S esDocSource.put("status.code", span.getStatus().getCodeValue()); esDocSource.put("serviceName", serviceName); final TraceGroup traceGroup = TEST_TRACEID_TO_TRACE_GROUP.get(traceId); - esDocSource.putAll(OBJECT_MAPPER.convertValue(traceGroup, MAP_TYPE_REFERENCE)); + esDocSource.put(TraceGroup.TRACE_GROUP_NAME_FIELD, traceGroup.getTraceGroup()); + esDocSource.put(TraceGroup.TRACE_GROUP_END_TIME_FIELD, traceGroup.getTraceGroupFields().getEndTime()); + esDocSource.put(TraceGroup.TRACE_GROUP_DURATION_IN_NANOS_FIELD, traceGroup.getTraceGroupFields().getDurationInNanos()); + esDocSource.put(TraceGroup.TRACE_GROUP_STATUS_CODE_FIELD, traceGroup.getTraceGroupFields().getStatusCode()); return esDocSource; } diff --git a/e2e-test/trace/src/integrationTest/java/com/amazon/dataprepper/integration/trace/EndToEndServiceMapTest.java b/e2e-test/trace/src/integrationTest/java/com/amazon/dataprepper/integration/trace/EndToEndServiceMapTest.java index 50c0aa3e38..65aca37f7e 100644 --- a/e2e-test/trace/src/integrationTest/java/com/amazon/dataprepper/integration/trace/EndToEndServiceMapTest.java +++ b/e2e-test/trace/src/integrationTest/java/com/amazon/dataprepper/integration/trace/EndToEndServiceMapTest.java @@ -83,8 +83,7 @@ public void testPipelineEndToEnd() throws IOException, InterruptedException { final RestHighLevelClient restHighLevelClient = builder.build().createClient(); // Wait for service map prepper by 2 * window_duration - Thread.sleep(6000); - await().atMost(20, TimeUnit.SECONDS).untilAsserted( + await().atLeast(8, TimeUnit.SECONDS).atMost(20, TimeUnit.SECONDS).untilAsserted( () -> { final List> foundSources = getSourcesFromIndex(restHighLevelClient, SERVICE_MAP_INDEX_NAME); foundSources.forEach(source -> source.remove("hashId")); @@ -111,8 +110,7 @@ public void testPipelineEndToEnd() throws IOException, InterruptedException { .flatMap(Collection::stream).collect(Collectors.toList()); possibleEdges.addAll(getPossibleEdges(TEST_TRACEID_2, testDataSet2)); // Wait for service map prepper by 2 * window_duration - Thread.sleep(6000); - await().atMost(20, TimeUnit.SECONDS).untilAsserted( + await().atLeast(8, TimeUnit.SECONDS).atMost(20, TimeUnit.SECONDS).untilAsserted( () -> { final List> foundSources = getSourcesFromIndex(restHighLevelClient, SERVICE_MAP_INDEX_NAME); foundSources.forEach(source -> source.remove("hashId")); diff --git a/e2e-test/trace/src/integrationTest/resources/raw-span-e2e-pipeline-event-type.yml b/e2e-test/trace/src/integrationTest/resources/raw-span-e2e-pipeline-event-type.yml new file mode 100644 index 0000000000..fb1f3fff2f --- /dev/null +++ b/e2e-test/trace/src/integrationTest/resources/raw-span-e2e-pipeline-event-type.yml @@ -0,0 +1,25 @@ +entry-pipeline: + source: + otel_trace_source: + ssl: false + record_type: "event" + sink: + - pipeline: + name: "raw-pipeline" +raw-pipeline: + source: + pipeline: + name: "entry-pipeline" + processor: + - otel_trace_raw: + trace_flush_interval: 5 + - otel_trace_group: + hosts: [ "https://node-0.example.com:9200" ] + username: "admin" + password: "admin" + sink: + - opensearch: + hosts: [ "https://node-0.example.com:9200" ] + username: "admin" + password: "admin" + trace_analytics_raw: true \ No newline at end of file diff --git a/e2e-test/trace/src/integrationTest/resources/raw-span-e2e-pipeline-latest-release.yml b/e2e-test/trace/src/integrationTest/resources/raw-span-e2e-pipeline-latest-release.yml index 1b611657e8..85052a4bec 100644 --- a/e2e-test/trace/src/integrationTest/resources/raw-span-e2e-pipeline-latest-release.yml +++ b/e2e-test/trace/src/integrationTest/resources/raw-span-e2e-pipeline-latest-release.yml @@ -16,7 +16,6 @@ raw-pipeline: name: "entry-pipeline" prepper: - otel_trace_raw_prepper: - root_span_flush_delay: 3 # TODO: remove after 1.1 release trace_flush_interval: 5 sink: - opensearch: diff --git a/e2e-test/trace/src/integrationTest/resources/raw-span-e2e-pipeline-peer-forwarder-event-type.yml b/e2e-test/trace/src/integrationTest/resources/raw-span-e2e-pipeline-peer-forwarder-event-type.yml new file mode 100644 index 0000000000..f7bda405f2 --- /dev/null +++ b/e2e-test/trace/src/integrationTest/resources/raw-span-e2e-pipeline-peer-forwarder-event-type.yml @@ -0,0 +1,26 @@ +entry-pipeline: + source: + otel_trace_source: + ssl: false + record_type: "event" + processor: + - peer_forwarder: + discovery_mode: "static" + static_endpoints: ["dataprepper1", "dataprepper2"] + ssl: false + sink: + - pipeline: + name: "raw-pipeline" +raw-pipeline: + source: + pipeline: + name: "entry-pipeline" + processor: + - otel_trace_raw: + trace_flush_interval: 5 + sink: + - opensearch: + hosts: [ "https://node-0.example.com:9200" ] + username: "admin" + password: "admin" + trace_analytics_raw: true \ No newline at end of file diff --git a/e2e-test/trace/src/integrationTest/resources/raw-span-e2e-pipeline.yml b/e2e-test/trace/src/integrationTest/resources/raw-span-e2e-pipeline.yml index f49a8cf2e1..945aa0e14f 100644 --- a/e2e-test/trace/src/integrationTest/resources/raw-span-e2e-pipeline.yml +++ b/e2e-test/trace/src/integrationTest/resources/raw-span-e2e-pipeline.yml @@ -11,7 +11,6 @@ raw-pipeline: name: "entry-pipeline" prepper: - otel_trace_raw_prepper: - root_span_flush_delay: 1 # TODO: remove after 1.1 release trace_flush_interval: 5 - otel_trace_group_prepper: hosts: [ "https://node-0.example.com:9200" ] diff --git a/e2e-test/trace/src/integrationTest/resources/service-map-e2e-pipeline-event-type.yml b/e2e-test/trace/src/integrationTest/resources/service-map-e2e-pipeline-event-type.yml new file mode 100644 index 0000000000..bdb45ca868 --- /dev/null +++ b/e2e-test/trace/src/integrationTest/resources/service-map-e2e-pipeline-event-type.yml @@ -0,0 +1,26 @@ +entry-pipeline: + source: + otel_trace_source: + ssl: false + record_type: "event" + processor: + - peer_forwarder: + discovery_mode: "static" + static_endpoints: ["dataprepper1", "dataprepper2"] + ssl: false + sink: + - pipeline: + name: "service-map-pipeline" +service-map-pipeline: + source: + pipeline: + name: "entry-pipeline" + processor: + - service_map_stateful: + window_duration: 4 + sink: + - opensearch: + hosts: ["https://node-0.example.com:9200"] + username: "admin" + password: "admin" + trace_analytics_service_map: true \ No newline at end of file diff --git a/e2e-test/trace/src/integrationTest/resources/service-map-e2e-pipeline.yml b/e2e-test/trace/src/integrationTest/resources/service-map-e2e-pipeline.yml index 90a6636479..181cb53991 100644 --- a/e2e-test/trace/src/integrationTest/resources/service-map-e2e-pipeline.yml +++ b/e2e-test/trace/src/integrationTest/resources/service-map-e2e-pipeline.yml @@ -16,7 +16,7 @@ service-map-pipeline: name: "entry-pipeline" prepper: - service_map_stateful: - window_duration: 3 + window_duration: 4 sink: - opensearch: hosts: ["https://node-0.example.com:9200"]