diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
index e31f5f08..47460e12 100644
--- a/.github/workflows/maven.yml
+++ b/.github/workflows/maven.yml
@@ -16,7 +16,7 @@ jobs:
- uses: actions/checkout@v1
- uses: AdoptOpenJDK/install-jdk@v1
with:
- version: '15'
+ version: '16'
architecture: x64
- name: Build with Maven
run: ./mvnw verify
diff --git a/Dockerfile b/Dockerfile
index 8df6d07e..fadcf395 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,5 +1,5 @@
# Builder image
-FROM adoptopenjdk:15_36-jdk-hotspot as build
+FROM adoptopenjdk/openjdk16:jdk-16.0.1_9-debian as build
RUN jlink \
--module-path /opt/java/jmods \
@@ -10,7 +10,7 @@ RUN jlink \
--output /opt/jdk-mini
# Start a new image and copy just the minimal java distribution from the previous one
-FROM debian:10.6-slim
+FROM debian:stable-slim
COPY --from=build /opt/jdk-mini /opt/jdk-mini
# Create some dirs and copy pitchfork jar
diff --git a/pom.xml b/pom.xml
index 810ec0ae..3d586791 100644
--- a/pom.xml
+++ b/pom.xml
@@ -9,14 +9,14 @@
1.0-SNAPSHOT
-        <java.version>15</java.version>
+        <java.version>16</java.version>
UTF-8
1.11.1009
2.16.3
2.4.5
1.15.3
-        <jackson.version>2.10.2</jackson.version>
+        <jackson.version>2.12.3</jackson.version>
0.14.6
1.14.2
5.12.0
@@ -27,6 +27,7 @@
1.3.2
4.0.3
3.19.0
+        <messagepack.version>0.8.24</messagepack.version>
@@ -50,6 +51,15 @@
+
+            <dependency>
+                <groupId>com.fasterxml.jackson</groupId>
+                <artifactId>jackson-bom</artifactId>
+                <version>${jackson.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+
                <groupId>org.springframework.boot</groupId>
@@ -59,6 +69,13 @@
                <scope>import</scope>
+
+            <dependency>
+                <groupId>org.projectlombok</groupId>
+                <artifactId>lombok</artifactId>
+                <version>1.18.20</version>
+            </dependency>
+
                <groupId>io.zipkin.reporter2</groupId>
                <artifactId>zipkin-reporter-bom</artifactId>
@@ -115,6 +132,11 @@
            <groupId>javax.annotation</groupId>
            <artifactId>javax.annotation-api</artifactId>
+        <dependency>
+            <groupId>org.msgpack</groupId>
+            <artifactId>jackson-dataformat-msgpack</artifactId>
+            <version>${messagepack.version}</version>
+        </dependency>
            <groupId>com.amazonaws</groupId>
@@ -209,6 +231,18 @@
            <version>${testcontainers.version}</version>
            <scope>test</scope>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>mockserver</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mock-server</groupId>
+            <artifactId>mockserver-client-java</artifactId>
+            <version>5.11.2</version>
+            <scope>test</scope>
+        </dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
diff --git a/src/main/java/com/hotels/service/tracing/zipkintohaystack/RoutingConfig.java b/src/main/java/com/hotels/service/tracing/zipkintohaystack/RoutingConfig.java
index cbf67aef..27ed90c6 100644
--- a/src/main/java/com/hotels/service/tracing/zipkintohaystack/RoutingConfig.java
+++ b/src/main/java/com/hotels/service/tracing/zipkintohaystack/RoutingConfig.java
@@ -16,13 +16,10 @@
*/
package com.hotels.service.tracing.zipkintohaystack;
-import static org.springframework.http.MediaType.APPLICATION_JSON;
-import static org.springframework.web.reactive.function.server.RequestPredicates.contentType;
-import static org.springframework.web.reactive.function.server.RequestPredicates.method;
-import static org.springframework.web.reactive.function.server.RequestPredicates.path;
-import static org.springframework.web.reactive.function.server.RouterFunctions.nest;
-import static org.springframework.web.reactive.function.server.RouterFunctions.route;
-
+import com.hotels.service.tracing.zipkintohaystack.ingresses.datadog.DatadogSpansMessagePackDecoder;
+import com.hotels.service.tracing.zipkintohaystack.ingresses.datadog.DatadogSpansJsonDecoder;
+import com.hotels.service.tracing.zipkintohaystack.metrics.MetersProvider;
+import io.netty.handler.codec.http.HttpContentDecompressor;
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory;
import org.springframework.boot.web.reactive.server.ReactiveWebServerFactory;
import org.springframework.context.annotation.Bean;
@@ -33,9 +30,11 @@
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
-import com.hotels.service.tracing.zipkintohaystack.metrics.MetersProvider;
-import io.netty.handler.codec.http.HttpContentDecompressor;
-import zipkin2.codec.SpanBytesDecoder;
+import static org.springframework.http.MediaType.APPLICATION_JSON;
+import static org.springframework.web.reactive.function.server.RequestPredicates.*;
+import static org.springframework.web.reactive.function.server.RouterFunctions.nest;
+import static org.springframework.web.reactive.function.server.RouterFunctions.route;
+import static zipkin2.codec.SpanBytesDecoder.*;
@Configuration
public class RoutingConfig {
@@ -48,18 +47,28 @@ public class RoutingConfig {
* At this moment we support {@code POST}s for the v1 api encoded in Json or Thrift, or for the v2 api in Json.
*/
@Bean
-    public RouterFunction<ServerResponse> myRoutes(ZipkinController zipkinController, MetersProvider metersProvider) {
+    public RouterFunction<ServerResponse> myRoutes(ZipkinController zipkinController,
+                                                   MetersProvider metersProvider,
+                                                   DatadogSpansMessagePackDecoder datadogSpansMessagePackDecoder,
+                                                   DatadogSpansJsonDecoder datadogSpansJsonDecoder) {
var counterJsonV1 = metersProvider.getSpansCounter("http", "jsonv1");
var counterJsonV2 = metersProvider.getSpansCounter("http", "jsonv2");
var counterThrift = metersProvider.getSpansCounter("http", "thrift");
var counterProtobuf = metersProvider.getSpansCounter("http", "protobuf");
+ var counterDatadog = metersProvider.getSpansCounter("http", "datadog");
return nest(method(HttpMethod.POST),
nest(contentType(APPLICATION_JSON),
- route(path("/api/v1/spans"), request -> zipkinController.addSpans(request, SpanBytesDecoder.JSON_V1, counterJsonV1))
- .andRoute(path("/api/v2/spans"), request -> zipkinController.addSpans(request, SpanBytesDecoder.JSON_V2, counterJsonV2)))
- .andRoute(contentType(APPLICATION_THRIFT), request -> zipkinController.addSpans(request, SpanBytesDecoder.THRIFT, counterThrift))
- .andRoute(contentType(APPLICATION_PROTOBUF), request -> zipkinController.addSpans(request, SpanBytesDecoder.PROTO3, counterProtobuf)))
+ route(path("/api/v1/spans"), request -> zipkinController.addSpans(request, JSON_V1::decodeList, counterJsonV1))
+ .andRoute(path("/api/v2/spans"), request -> zipkinController.addSpans(request, JSON_V2::decodeList, counterJsonV2)))
+ .andRoute(contentType(APPLICATION_THRIFT), request -> zipkinController.addSpans(request, THRIFT::decodeList, counterThrift))
+ .andRoute(contentType(APPLICATION_PROTOBUF), request -> zipkinController.addSpans(request, PROTO3::decodeList, counterProtobuf)))
+ .andNest(method(HttpMethod.PUT),
+ nest(contentType(new MediaType("application", "msgpack")),
+ route(path("/v0.3/traces"), request -> zipkinController.addSpans(request, datadogSpansMessagePackDecoder, counterDatadog)))
+ .andNest(contentType(APPLICATION_JSON),
+ route(path("/v0.3/traces"), request -> zipkinController.addSpans(request, datadogSpansJsonDecoder, counterDatadog)))
+ )
.andRoute(RequestPredicates.all(), zipkinController::unmatched);
}
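
Reviewer note (illustrative, not part of the patch): the new Datadog ingress can be exercised with any HTTP client that can issue a PUT. The sketch below assumes pitchfork is listening on its default port (9411); the payload values are made up and mirror the integration test added later in this diff.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class DatadogIngressExample {
    public static void main(String[] args) throws Exception {
        // The Datadog trace API takes a list of traces, each of which is a list of spans,
        // with 64-bit ids expressed as decimal numbers.
        String body = """
                [[{"trace_id": 123456789, "span_id": 987654321, "start": 0, "duration": 12345,
                   "name": "span_name", "resource": "/home", "service": "service_name"}]]
                """;

        HttpRequest request = HttpRequest.newBuilder(URI.create("http://localhost:9411/v0.3/traces"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode()); // 200 when the spans are accepted
    }
}
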
diff --git a/src/main/java/com/hotels/service/tracing/zipkintohaystack/ZipkinController.java b/src/main/java/com/hotels/service/tracing/zipkintohaystack/ZipkinController.java
index 680b36ad..5a57a566 100644
--- a/src/main/java/com/hotels/service/tracing/zipkintohaystack/ZipkinController.java
+++ b/src/main/java/com/hotels/service/tracing/zipkintohaystack/ZipkinController.java
@@ -19,6 +19,7 @@
import com.hotels.service.tracing.zipkintohaystack.forwarders.Fork;
import com.hotels.service.tracing.zipkintohaystack.forwarders.SpanForwarder;
import com.hotels.service.tracing.zipkintohaystack.forwarders.haystack.SpanValidator;
+import com.hotels.service.tracing.zipkintohaystack.ingresses.Decoder;
import io.micrometer.core.instrument.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,7 +31,6 @@
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
import zipkin2.Span;
-import zipkin2.codec.SpanBytesDecoder;
import java.util.Collection;
import java.util.function.Function;
@@ -67,7 +67,7 @@ public Mono unmatched(ServerRequest serverRequest) {
* Valid requests made to this service will be handled by this function.
* It submits the reported spans to the registered {@link SpanForwarder} asynchronously and waits until they all complete.
*/
-    public Mono<ServerResponse> addSpans(ServerRequest serverRequest, SpanBytesDecoder decoder, Counter counter) {
+    public Mono<ServerResponse> addSpans(ServerRequest serverRequest, Decoder decoder, Counter counter) {
return serverRequest
.bodyToMono(byte[].class)
.flatMapIterable(decodeList(decoder))
@@ -78,7 +78,7 @@ public Mono addSpans(ServerRequest serverRequest, SpanBytesDecod
.then(ok().body(BodyInserters.empty()));
}
-    private Function<byte[], Collection<Span>> decodeList(SpanBytesDecoder decoder) {
+    private Function<byte[], Collection<Span>> decodeList(Decoder decoder) {
         return bytes -> (Collection<Span>) decoder.decodeList(bytes);
}
}
diff --git a/src/main/java/com/hotels/service/tracing/zipkintohaystack/forwarders/datadog/DatadogDomainConverter.java b/src/main/java/com/hotels/service/tracing/zipkintohaystack/forwarders/datadog/DatadogDomainConverter.java
new file mode 100644
index 00000000..51fc5a56
--- /dev/null
+++ b/src/main/java/com/hotels/service/tracing/zipkintohaystack/forwarders/datadog/DatadogDomainConverter.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2018 Expedia, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package com.hotels.service.tracing.zipkintohaystack.forwarders.datadog;
+
+import com.hotels.service.tracing.zipkintohaystack.forwarders.datadog.model.DatadogSpan;
+import zipkin2.Annotation;
+import zipkin2.Endpoint;
+import zipkin2.Span;
+
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Collections.emptyMap;
+import static org.springframework.util.StringUtils.hasText;
+
+/**
+ * Converter between {@code Zipkin} and {@code Datadog} domains.
+ */
+public class DatadogDomainConverter {
+
+ public static final Long MILLIS_TO_NANOS = 1_000_000L;
+ private static final Integer ERROR = 1;
+
+ /**
+ * Accepts a span in {@code Zipkin V2} format and returns a span in {@code Datadog} format.
+ */
+ public static DatadogSpan fromZipkinV2(zipkin2.Span zipkin) {
+ Integer error = isError(zipkin);
+ BigInteger parentId = hexadecimalToDecimal(zipkin.parentId());
+ var spanId = hexadecimalToDecimal(zipkin.id());
+ var traceId = hexadecimalToDecimal(zipkin.traceId());
+
+ return new DatadogSpan(
+ traceId,
+ spanId,
+ parentId,
+ toNanos(zipkin.timestamp()),
+ toNanos(zipkin.duration()),
+ error,
+ tags(zipkin.tags(), zipkin.annotations(), zipkin.kind()),
+ emptyMap(),
+ valueOrDefault(zipkin.name(), "span"),
+ valueOrDefault(zipkin.name(), "resource"), // TODO: maybe derive resource from tags? http.method + http.path?
+ zipkin.localServiceName(),
+ "web"
+ );
+ }
+
+    private static Map<String, String> tags(Map<String, String> tags, List<Annotation> annotations, Span.Kind kind) {
+        Map<String, String> collected = new HashMap<>();
+
+ if (kind != null) {
+ collected.put("span.kind", kind.name());
+ }
+
+ tags.forEach(collected::put);
+
+ return collected;
+ }
+
+ private static String valueOrDefault(String input, String defaultValue) {
+ return input != null ? input : defaultValue;
+ }
+
+ private static Long toNanos(Long timestamp) {
+ return timestamp != null ? timestamp * MILLIS_TO_NANOS : null;
+ }
+
+ private static Long toMillis(Long timestamp) {
+ return timestamp != null ? timestamp / MILLIS_TO_NANOS : null;
+ }
+
+ private static BigInteger hexadecimalToDecimal(String input) {
+ // B3 accepts ids with different sizes (64 or 128 bits).
+ // To make this work we truncate larger than 64 bit ones (16 chars).
+ if (input == null) {
+ return null;
+ } else if (input.length() > 16) {
+ return new BigInteger(input.substring(16), 16);
+ } else {
+ return new BigInteger(input, 16);
+ }
+ }
+
+ private static Integer isError(Span zipkin) {
+ return zipkin.tags().containsKey("error") ? ERROR : null;
+ }
+
+ private static boolean isError(DatadogSpan datadog) {
+ return ERROR.equals(datadog.error());
+ }
+
+ public static Span toZipkin(DatadogSpan datadogSpan) {
+ var builder = Span.newBuilder();
+
+        if (hasText(datadogSpan.service())) {
+ builder.localEndpoint(Endpoint.newBuilder()
+ .serviceName(datadogSpan.service())
+ .build());
+ }
+
+ builder.name(datadogSpan.name());
+
+ if (isError(datadogSpan)) {
+ builder.putTag("error", "error");
+ }
+ if (datadogSpan.meta() != null) {
+ datadogSpan.meta().forEach(builder::putTag);
+ }
+
+ if (datadogSpan.type() != null) {
+ builder.putTag("type", datadogSpan.type());
+ }
+ if (datadogSpan.traceId() != null) {
+ builder.traceId(decimalToHexadecimal(datadogSpan.traceId()));
+ }
+ if (datadogSpan.spanId() != null) {
+ builder.id(decimalToHexadecimal(datadogSpan.spanId()));
+ }
+ if (datadogSpan.parentId() != null) {
+ builder.parentId(decimalToHexadecimal(datadogSpan.parentId()));
+ }
+ if (datadogSpan.start() != null) {
+ builder.timestamp(toMillis(datadogSpan.start()));
+ }
+ if (datadogSpan.duration() != null) {
+ builder.duration(toMillis(datadogSpan.duration()));
+ }
+ // TODO: annotations, resource_name, ...
+
+ return builder.build();
+ }
+
+ /**
+ * Zipkin trace ids are 64 or 128 bits represented as 16 or 32 hex characters with '0' left padding
+ * We always use 64 bits (16 chars) to maintain compatibility with Datadog.
+ */
+ private static String decimalToHexadecimal(BigInteger id) {
+ return String.format("%016x", id);
+ }
+}
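
Reviewer note (illustrative, not part of the patch): a worked example of the id mapping implemented by hexadecimalToDecimal and decimalToHexadecimal above. The trace id is made up; the point is that only the low 64 bits of a 128-bit B3 id survive the round trip.

import java.math.BigInteger;

public class IdConversionExample {
    public static void main(String[] args) {
        String zipkinTraceId = "463ac35c9f6413ad00000000000000ff"; // 128-bit hex id (32 chars)

        // hexadecimalToDecimal keeps only the last 16 hex chars, i.e. the low 64 bits.
        BigInteger datadogId = new BigInteger(zipkinTraceId.substring(16), 16);
        System.out.println(datadogId); // 255

        // decimalToHexadecimal pads back to 16 hex chars.
        System.out.println(String.format("%016x", datadogId)); // 00000000000000ff
    }
}
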
diff --git a/src/main/java/com/hotels/service/tracing/zipkintohaystack/forwarders/datadog/DatadogForwarder.java b/src/main/java/com/hotels/service/tracing/zipkintohaystack/forwarders/datadog/DatadogForwarder.java
new file mode 100644
index 00000000..ff1b15bf
--- /dev/null
+++ b/src/main/java/com/hotels/service/tracing/zipkintohaystack/forwarders/datadog/DatadogForwarder.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2018 Expedia, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package com.hotels.service.tracing.zipkintohaystack.forwarders.datadog;
+
+import com.hotels.service.tracing.zipkintohaystack.forwarders.SpanForwarder;
+import com.hotels.service.tracing.zipkintohaystack.forwarders.datadog.properties.DatadogForwarderConfigProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.stereotype.Component;
+import zipkin2.Span;
+
+import javax.annotation.PostConstruct;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Forwarder of spans to a {@code Datadog} collector.
+ * Spans are handed off to a dispatcher that batches and sends them asynchronously in the background.
+ */
+@EnableConfigurationProperties(DatadogForwarderConfigProperties.class)
+@ConditionalOnProperty(name = "pitchfork.forwarders.datadog.enabled", havingValue = "true")
+@Component
+public class DatadogForwarder implements SpanForwarder {
+
+ private final Logger logger = LoggerFactory.getLogger(DatadogForwarder.class);
+ private final DatadogSpansDispatcher spansDispatcher;
+
+ public DatadogForwarder(DatadogSpansDispatcher spansDispatcher) {
+ this.spansDispatcher = spansDispatcher;
+ }
+
+ @PostConstruct
+ public void initialize() {
+ CompletableFuture.runAsync(spansDispatcher);
+ }
+
+ @Override
+ public void process(Span span) {
+ logger.info("operation=process, span={}", span);
+
+ spansDispatcher.addSpan(span);
+ }
+}
diff --git a/src/main/java/com/hotels/service/tracing/zipkintohaystack/forwarders/datadog/DatadogSpansDispatcher.java b/src/main/java/com/hotels/service/tracing/zipkintohaystack/forwarders/datadog/DatadogSpansDispatcher.java
new file mode 100644
index 00000000..c9290efa
--- /dev/null
+++ b/src/main/java/com/hotels/service/tracing/zipkintohaystack/forwarders/datadog/DatadogSpansDispatcher.java
@@ -0,0 +1,122 @@
+package com.hotels.service.tracing.zipkintohaystack.forwarders.datadog;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.hotels.service.tracing.zipkintohaystack.forwarders.datadog.model.DatadogSpan;
+import com.hotels.service.tracing.zipkintohaystack.forwarders.datadog.properties.DatadogForwarderConfigProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.stereotype.Component;
+import org.springframework.web.reactive.function.client.WebClient;
+import zipkin2.Span;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.System.currentTimeMillis;
+import static java.util.stream.Collectors.toList;
+import static org.springframework.web.reactive.function.BodyInserters.fromValue;
+
+@EnableConfigurationProperties(DatadogForwarderConfigProperties.class)
+@ConditionalOnProperty(name = "pitchfork.forwarders.datadog.enabled", havingValue = "true")
+@Component
+public class DatadogSpansDispatcher implements Runnable {
+
+ private static final long FIVE_SECONDS = 5 * 1000; // in millis
+ private static final int MINIMUM_PENDING_SPANS = 100;
+    private final Logger logger = LoggerFactory.getLogger(DatadogSpansDispatcher.class);
+
+ private final WebClient datadogClient;
+ private final ObjectWriter mapper;
+    private final ArrayBlockingQueue<Span> pending;
+
+ public DatadogSpansDispatcher(WebClient datadogClient,
+ ObjectMapper mapper,
+ DatadogForwarderConfigProperties properties) {
+ this.datadogClient = datadogClient;
+ this.mapper = mapper.writer();
+ this.pending = new ArrayBlockingQueue<>(properties.getQueuedMaxSpans());
+ }
+
+ @Override
+ public void run() {
+        List<Span> spans = new ArrayList<>();
+ long lastFlush = currentTimeMillis();
+
+ while (true) {
+ try {
+ Span take = pending.poll(1, TimeUnit.SECONDS);
+
+ if (take != null) {
+ spans.add(take);
+ }
+
+ if (shouldFlushSpans(spans.size(), lastFlush)) {
+ flush(spans);
+
+ lastFlush = currentTimeMillis();
+ }
+ } catch (InterruptedException e) {
+ logger.error("operation=run", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /**
+     * Flush the pending spans when enough have accumulated, or when at least one has been waiting for more than five seconds.
+ */
+ private boolean shouldFlushSpans(int size, long lastFlush) {
+ long currentTime = currentTimeMillis();
+ boolean enoughPendingSpans = size > MINIMUM_PENDING_SPANS;
+ boolean spansPendingForOverXSeconds = (lastFlush + FIVE_SECONDS < currentTime) && size > 0;
+
+ return enoughPendingSpans || spansPendingForOverXSeconds;
+ }
+
+    private void flush(List<Span> spans) {
+        List<DatadogSpan> datadogSpans = spans.stream()
+                .map(DatadogDomainConverter::fromZipkinV2)
+                .collect(toList());
+
+        Optional<String> body = serialize(datadogSpans);
+
+ body.ifPresent(it -> {
+ datadogClient.put()
+ .uri("/v0.3/traces")
+ .body(fromValue(it))
+ .retrieve()
+ .toBodilessEntity()
+ .subscribe(); // FIXME: reactive
+ });
+
+ spans.clear();
+ }
+
+    private Optional<String> serialize(List<DatadogSpan> spans) {
+        try {
+            // the trace api expects a list of traces, so the batch of spans is wrapped in an outer list
+            var body = mapper.writeValueAsString(List.of(spans));
+            return Optional.ofNullable(body);
+ } catch (JsonProcessingException e) {
+ logger.error("operation=serialize", e);
+ return Optional.empty();
+ }
+ }
+
+ /**
+ * Add spans to the backlog queue.
+ */
+ public void addSpan(Span span) {
+ try {
+ this.pending.add(span);
+ } catch (Exception e) {
+ logger.error("operation=addSpan", e);
+ }
+ }
+}
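
Reviewer note (illustrative, not part of the patch): a standalone restatement of the flush policy implemented by shouldFlushSpans above, using the same thresholds (more than 100 queued spans, or a non-empty batch and more than five seconds since the last flush).

public class FlushPolicyExample {

    static boolean shouldFlush(int pendingSpans, long lastFlushMillis, long nowMillis) {
        boolean enoughPendingSpans = pendingSpans > 100;
        boolean waitingTooLong = (lastFlushMillis + 5_000 < nowMillis) && pendingSpans > 0;
        return enoughPendingSpans || waitingTooLong;
    }

    public static void main(String[] args) {
        System.out.println(shouldFlush(101, 0, 1_000));  // true: batch is large enough
        System.out.println(shouldFlush(1, 0, 6_000));    // true: waited over five seconds
        System.out.println(shouldFlush(1, 0, 1_000));    // false: small batch, recent flush
        System.out.println(shouldFlush(0, 0, 60_000));   // false: nothing to send
    }
}
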
diff --git a/src/main/java/com/hotels/service/tracing/zipkintohaystack/forwarders/datadog/client/HttpClientSpringConfig.java b/src/main/java/com/hotels/service/tracing/zipkintohaystack/forwarders/datadog/client/HttpClientSpringConfig.java
new file mode 100644
index 00000000..9646a6d7
--- /dev/null
+++ b/src/main/java/com/hotels/service/tracing/zipkintohaystack/forwarders/datadog/client/HttpClientSpringConfig.java
@@ -0,0 +1,43 @@
+package com.hotels.service.tracing.zipkintohaystack.forwarders.datadog.client;
+
+import com.hotels.service.tracing.zipkintohaystack.forwarders.datadog.properties.DatadogForwarderConfigProperties;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.http.client.reactive.ClientHttpConnector;
+import org.springframework.http.client.reactive.ReactorClientHttpConnector;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.netty.http.client.HttpClient;
+import reactor.netty.resources.ConnectionProvider;
+
+import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
+import static org.springframework.http.HttpHeaders.CONTENT_TYPE;
+import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE;
+import static reactor.netty.resources.ConnectionProvider.DEFAULT_POOL_LEASING_STRATEGY;
+
+@EnableConfigurationProperties(DatadogForwarderConfigProperties.class)
+@ConditionalOnProperty(name = "pitchfork.forwarders.datadog.enabled", havingValue = "true")
+@Configuration
+public class HttpClientSpringConfig {
+
+ @Bean("datadogClient")
+ public WebClient createWebClient(WebClient.Builder webClientBuilder, DatadogForwarderConfigProperties properties) {
+ HttpClient httpClient =
+ HttpClient.create(ConnectionProvider.builder(DEFAULT_POOL_LEASING_STRATEGY)
+ .maxConnections(properties.getMaxConnections()).build())
+ .tcpConfiguration(client ->
+ client.option(CONNECT_TIMEOUT_MILLIS, properties.getConnectTimeoutMs())
+ .doOnConnected(conn -> conn
+ .addHandlerLast(new ReadTimeoutHandler(properties.getReadTimeoutMs()))));
+
+ ClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
+
+ return webClientBuilder
+ .clientConnector(connector)
+ .baseUrl("http://" + properties.getHost() + ":" + properties.getPort())
+ .defaultHeader(CONTENT_TYPE, APPLICATION_JSON_VALUE)
+ .build();
+ }
+}
diff --git a/src/main/java/com/hotels/service/tracing/zipkintohaystack/forwarders/datadog/model/DatadogSpan.java b/src/main/java/com/hotels/service/tracing/zipkintohaystack/forwarders/datadog/model/DatadogSpan.java
new file mode 100644
index 00000000..08c4720f
--- /dev/null
+++ b/src/main/java/com/hotels/service/tracing/zipkintohaystack/forwarders/datadog/model/DatadogSpan.java
@@ -0,0 +1,28 @@
+package com.hotels.service.tracing.zipkintohaystack.forwarders.datadog.model;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.math.BigInteger;
+import java.util.Map;
+
+/**
+ * Model for a {@code Datadog} span
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public record DatadogSpan(@JsonProperty("trace_id")
+ BigInteger traceId,
+ @JsonProperty("span_id")
+ BigInteger spanId,
+ @JsonProperty("parent_id")
+ BigInteger parentId,
+ Long start,
+ Long duration,
+ Integer error,
+ Map meta,
+ Map metrics,
+ String name,
+ String resource,
+ String service,
+ String type) {
+}
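
Reviewer note (illustrative, not part of the patch): Jackson serializes this record directly because record support landed in Jackson 2.12, which is presumably why this change bumps jackson.version to 2.12.3 and imports the jackson-bom. A quick sketch of the resulting JSON:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.hotels.service.tracing.zipkintohaystack.forwarders.datadog.model.DatadogSpan;

import java.math.BigInteger;
import java.util.Map;

public class DatadogSpanJsonExample {
    public static void main(String[] args) throws Exception {
        var span = new DatadogSpan(
                BigInteger.valueOf(123456789), BigInteger.valueOf(987654321), null,
                0L, 12345L, null, Map.of("span.kind", "SERVER"), Map.of(),
                "span_name", "/home", "service_name", "web");

        // Prints something like (ids renamed by @JsonProperty, nulls dropped by NON_NULL):
        // {"trace_id":123456789,"span_id":987654321,"start":0,"duration":12345,
        //  "meta":{"span.kind":"SERVER"},"metrics":{},"name":"span_name",
        //  "resource":"/home","service":"service_name","type":"web"}
        System.out.println(new ObjectMapper().writeValueAsString(span));
    }
}
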
diff --git a/src/main/java/com/hotels/service/tracing/zipkintohaystack/forwarders/datadog/properties/DatadogForwarderConfigProperties.java b/src/main/java/com/hotels/service/tracing/zipkintohaystack/forwarders/datadog/properties/DatadogForwarderConfigProperties.java
new file mode 100644
index 00000000..55b01274
--- /dev/null
+++ b/src/main/java/com/hotels/service/tracing/zipkintohaystack/forwarders/datadog/properties/DatadogForwarderConfigProperties.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2018 Expedia, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package com.hotels.service.tracing.zipkintohaystack.forwarders.datadog.properties;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@ConfigurationProperties("pitchfork.forwarders.datadog")
+public class DatadogForwarderConfigProperties {
+ private String host;
+ private Integer port;
+ private Integer connectTimeoutMs;
+ private Integer readTimeoutMs;
+ private Integer maxConnections;
+ private Integer queuedMaxSpans;
+
+ public Integer getQueuedMaxSpans() {
+ return queuedMaxSpans;
+ }
+
+ public void setQueuedMaxSpans(Integer queuedMaxSpans) {
+ this.queuedMaxSpans = queuedMaxSpans;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public Integer getPort() {
+ return port;
+ }
+
+ public void setPort(Integer port) {
+ this.port = port;
+ }
+
+ public Integer getConnectTimeoutMs() {
+ return connectTimeoutMs;
+ }
+
+ public void setConnectTimeoutMs(Integer connectTimeoutMs) {
+ this.connectTimeoutMs = connectTimeoutMs;
+ }
+
+ public Integer getReadTimeoutMs() {
+ return readTimeoutMs;
+ }
+
+ public void setReadTimeoutMs(Integer readTimeoutMs) {
+ this.readTimeoutMs = readTimeoutMs;
+ }
+
+ public Integer getMaxConnections() {
+ return maxConnections;
+ }
+
+ public void setMaxConnections(Integer maxConnections) {
+ this.maxConnections = maxConnections;
+ }
+}
diff --git a/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/Decoder.java b/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/Decoder.java
new file mode 100644
index 00000000..83ce4dcd
--- /dev/null
+++ b/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/Decoder.java
@@ -0,0 +1,16 @@
+package com.hotels.service.tracing.zipkintohaystack.ingresses;
+
+import zipkin2.Span;
+
+import java.util.Collection;
+
+/**
+ * Contract for a decoder of bytes into {@link Span}s.
+ */
+public interface Decoder {
+
+ /**
+ * Returns a collection of spans from their serialized form.
+ */
+    Collection<Span> decodeList(byte[] serialized);
+}
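
Reviewer note (illustrative, not part of the patch): Decoder has a single abstract method, so the existing Zipkin codecs plug straight in as method references; that is what the RoutingConfig change does with JSON_V1::decodeList, JSON_V2::decodeList, THRIFT::decodeList and PROTO3::decodeList.

import com.hotels.service.tracing.zipkintohaystack.ingresses.Decoder;
import zipkin2.codec.SpanBytesDecoder;

public class DecoderAdapterExample {
    public static void main(String[] args) {
        // A zipkin SpanBytesDecoder satisfies the Decoder contract via a method reference.
        Decoder jsonV2 = SpanBytesDecoder.JSON_V2::decodeList;
        System.out.println(jsonV2.decodeList("[]".getBytes())); // []
    }
}
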
diff --git a/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/datadog/DatadogSpansJsonDecoder.java b/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/datadog/DatadogSpansJsonDecoder.java
new file mode 100644
index 00000000..38f75118
--- /dev/null
+++ b/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/datadog/DatadogSpansJsonDecoder.java
@@ -0,0 +1,46 @@
+package com.hotels.service.tracing.zipkintohaystack.ingresses.datadog;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.hotels.service.tracing.zipkintohaystack.forwarders.datadog.DatadogDomainConverter;
+import com.hotels.service.tracing.zipkintohaystack.forwarders.datadog.model.DatadogSpan;
+import com.hotels.service.tracing.zipkintohaystack.ingresses.Decoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+import zipkin2.Span;
+
+import java.util.Collection;
+import java.util.List;
+
+import static java.util.Collections.emptyList;
+import static java.util.stream.Collectors.toList;
+
+@Component
+public class DatadogSpansJsonDecoder implements Decoder {
+
+    private static final Logger logger = LoggerFactory.getLogger(DatadogSpansJsonDecoder.class);
+ private final ObjectReader reader;
+
+ public DatadogSpansJsonDecoder(ObjectMapper mapper) {
+        this.reader = mapper.readerFor(new TypeReference<List<List<DatadogSpan>>>() {
+ });
+ }
+
+ @Override
+    public List<Span> decodeList(byte[] bytes) {
+ try {
+            List<List<DatadogSpan>> traces = reader.readValue(bytes);
+
+ return traces.stream()
+ .flatMap(Collection::stream)
+ .map(DatadogDomainConverter::toZipkin)
+ .collect(toList());
+ } catch (Exception e) {
+ logger.error("operation=readList", e);
+ return emptyList();
+ }
+ }
+}
diff --git a/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/datadog/DatadogSpansMessagePackDecoder.java b/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/datadog/DatadogSpansMessagePackDecoder.java
new file mode 100644
index 00000000..27134d82
--- /dev/null
+++ b/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/datadog/DatadogSpansMessagePackDecoder.java
@@ -0,0 +1,51 @@
+package com.hotels.service.tracing.zipkintohaystack.ingresses.datadog;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.hotels.service.tracing.zipkintohaystack.forwarders.datadog.DatadogDomainConverter;
+import com.hotels.service.tracing.zipkintohaystack.forwarders.datadog.model.DatadogSpan;
+import com.hotels.service.tracing.zipkintohaystack.ingresses.Decoder;
+import org.msgpack.jackson.dataformat.MessagePackFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+import zipkin2.Span;
+
+import javax.annotation.PostConstruct;
+import java.util.Collection;
+import java.util.List;
+
+import static java.util.Collections.emptyList;
+import static java.util.stream.Collectors.toList;
+
+@Component
+public class DatadogSpansMessagePackDecoder implements Decoder {
+
+    private static final Logger logger = LoggerFactory.getLogger(DatadogSpansMessagePackDecoder.class);
+ private ObjectReader reader;
+
+ @PostConstruct
+ public void initialize() {
+ // Instantiate ObjectMapper for MessagePack
+ ObjectMapper mapper = new ObjectMapper(new MessagePackFactory());
+        this.reader = mapper.readerFor(new TypeReference<List<List<DatadogSpan>>>() {
+ });
+ }
+
+ @Override
+    public List<Span> decodeList(byte[] bytes) {
+ try {
+            List<List<DatadogSpan>> traces = reader.readValue(bytes);
+
+ return traces.stream()
+ .flatMap(Collection::stream)
+ .map(DatadogDomainConverter::toZipkin)
+ .collect(toList());
+ } catch (Exception e) {
+ logger.error("operation=readList", e);
+ return emptyList();
+ }
+ }
+}
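
Reviewer note (illustrative, not part of the patch): a MessagePack round trip through the decoder above. The encoding side uses the same ObjectMapper(MessagePackFactory) setup; initialize() is invoked by hand because @PostConstruct only runs inside a Spring context.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.hotels.service.tracing.zipkintohaystack.forwarders.datadog.model.DatadogSpan;
import com.hotels.service.tracing.zipkintohaystack.ingresses.datadog.DatadogSpansMessagePackDecoder;
import org.msgpack.jackson.dataformat.MessagePackFactory;

import java.math.BigInteger;
import java.util.List;
import java.util.Map;

public class MessagePackRoundTripExample {
    public static void main(String[] args) throws Exception {
        var datadogSpan = new DatadogSpan(
                BigInteger.valueOf(123456789), BigInteger.valueOf(987654321), null,
                0L, 12345L, null, Map.of(), Map.of(),
                "span_name", "/home", "service_name", "web");

        // The trace API payload is a list of traces; each trace is a list of spans.
        byte[] bytes = new ObjectMapper(new MessagePackFactory())
                .writeValueAsBytes(List.of(List.of(datadogSpan)));

        var decoder = new DatadogSpansMessagePackDecoder();
        decoder.initialize();
        System.out.println(decoder.decodeList(bytes)); // one zipkin2.Span named "span_name"
    }
}
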
diff --git a/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/kafka/KafkaConsumerLoop.java b/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/zipkin/kafka/KafkaConsumerLoop.java
similarity index 95%
rename from src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/kafka/KafkaConsumerLoop.java
rename to src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/zipkin/kafka/KafkaConsumerLoop.java
index 96ec5fb6..413e1d59 100644
--- a/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/kafka/KafkaConsumerLoop.java
+++ b/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/zipkin/kafka/KafkaConsumerLoop.java
@@ -14,11 +14,11 @@
* limitations under the License.
*
*/
-package com.hotels.service.tracing.zipkintohaystack.ingresses.kafka;
+package com.hotels.service.tracing.zipkintohaystack.ingresses.zipkin.kafka;
import com.hotels.service.tracing.zipkintohaystack.forwarders.Fork;
import com.hotels.service.tracing.zipkintohaystack.forwarders.haystack.SpanValidator;
-import com.hotels.service.tracing.zipkintohaystack.ingresses.kafka.properties.KafkaIngressConfigProperties;
+import com.hotels.service.tracing.zipkintohaystack.ingresses.zipkin.kafka.properties.KafkaIngressConfigProperties;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
diff --git a/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/kafka/KafkaConsumerSpringConfig.java b/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/zipkin/kafka/KafkaConsumerSpringConfig.java
similarity index 90%
rename from src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/kafka/KafkaConsumerSpringConfig.java
rename to src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/zipkin/kafka/KafkaConsumerSpringConfig.java
index 46cfa27e..15db1854 100644
--- a/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/kafka/KafkaConsumerSpringConfig.java
+++ b/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/zipkin/kafka/KafkaConsumerSpringConfig.java
@@ -14,11 +14,11 @@
* limitations under the License.
*
*/
-package com.hotels.service.tracing.zipkintohaystack.ingresses.kafka;
+package com.hotels.service.tracing.zipkintohaystack.ingresses.zipkin.kafka;
import com.hotels.service.tracing.zipkintohaystack.forwarders.Fork;
import com.hotels.service.tracing.zipkintohaystack.forwarders.haystack.SpanValidator;
-import com.hotels.service.tracing.zipkintohaystack.ingresses.kafka.properties.KafkaIngressConfigProperties;
+import com.hotels.service.tracing.zipkintohaystack.ingresses.zipkin.kafka.properties.KafkaIngressConfigProperties;
import com.hotels.service.tracing.zipkintohaystack.metrics.MetersProvider;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
diff --git a/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/kafka/KafkaRecordsConsumer.java b/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/zipkin/kafka/KafkaRecordsConsumer.java
similarity index 94%
rename from src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/kafka/KafkaRecordsConsumer.java
rename to src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/zipkin/kafka/KafkaRecordsConsumer.java
index fd256b0b..adc4170a 100644
--- a/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/kafka/KafkaRecordsConsumer.java
+++ b/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/zipkin/kafka/KafkaRecordsConsumer.java
@@ -14,11 +14,11 @@
* limitations under the License.
*
*/
-package com.hotels.service.tracing.zipkintohaystack.ingresses.kafka;
+package com.hotels.service.tracing.zipkintohaystack.ingresses.zipkin.kafka;
import com.hotels.service.tracing.zipkintohaystack.forwarders.Fork;
import com.hotels.service.tracing.zipkintohaystack.forwarders.haystack.SpanValidator;
-import com.hotels.service.tracing.zipkintohaystack.ingresses.kafka.properties.KafkaIngressConfigProperties;
+import com.hotels.service.tracing.zipkintohaystack.ingresses.zipkin.kafka.properties.KafkaIngressConfigProperties;
import com.hotels.service.tracing.zipkintohaystack.metrics.MetersProvider;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
diff --git a/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/kafka/properties/KafkaIngressConfigProperties.java b/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/zipkin/kafka/properties/KafkaIngressConfigProperties.java
similarity index 96%
rename from src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/kafka/properties/KafkaIngressConfigProperties.java
rename to src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/zipkin/kafka/properties/KafkaIngressConfigProperties.java
index fdfbbd68..c14d821b 100644
--- a/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/kafka/properties/KafkaIngressConfigProperties.java
+++ b/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/zipkin/kafka/properties/KafkaIngressConfigProperties.java
@@ -14,7 +14,7 @@
* limitations under the License.
*
*/
-package com.hotels.service.tracing.zipkintohaystack.ingresses.kafka.properties;
+package com.hotels.service.tracing.zipkintohaystack.ingresses.zipkin.kafka.properties;
import org.springframework.boot.context.properties.ConfigurationProperties;
diff --git a/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/rabbitmq/RabbitMqConsumer.java b/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/zipkin/rabbitmq/RabbitMqConsumer.java
similarity index 96%
rename from src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/rabbitmq/RabbitMqConsumer.java
rename to src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/zipkin/rabbitmq/RabbitMqConsumer.java
index 22116528..741e755a 100644
--- a/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/rabbitmq/RabbitMqConsumer.java
+++ b/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/zipkin/rabbitmq/RabbitMqConsumer.java
@@ -14,7 +14,7 @@
* limitations under the License.
*
*/
-package com.hotels.service.tracing.zipkintohaystack.ingresses.rabbitmq;
+package com.hotels.service.tracing.zipkintohaystack.ingresses.zipkin.rabbitmq;
import com.hotels.service.tracing.zipkintohaystack.forwarders.Fork;
import com.hotels.service.tracing.zipkintohaystack.forwarders.haystack.SpanValidator;
diff --git a/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/rabbitmq/RabbitMqIngressSpringConfig.java b/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/zipkin/rabbitmq/RabbitMqIngressSpringConfig.java
similarity index 92%
rename from src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/rabbitmq/RabbitMqIngressSpringConfig.java
rename to src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/zipkin/rabbitmq/RabbitMqIngressSpringConfig.java
index 0a7b0156..a612a51e 100644
--- a/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/rabbitmq/RabbitMqIngressSpringConfig.java
+++ b/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/zipkin/rabbitmq/RabbitMqIngressSpringConfig.java
@@ -14,7 +14,7 @@
* limitations under the License.
*
*/
-package com.hotels.service.tracing.zipkintohaystack.ingresses.rabbitmq;
+package com.hotels.service.tracing.zipkintohaystack.ingresses.zipkin.rabbitmq;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -23,7 +23,7 @@
import com.hotels.service.tracing.zipkintohaystack.forwarders.Fork;
import com.hotels.service.tracing.zipkintohaystack.forwarders.haystack.SpanValidator;
-import com.hotels.service.tracing.zipkintohaystack.ingresses.rabbitmq.properties.RabbitMqIngressConfigProperties;
+import com.hotels.service.tracing.zipkintohaystack.ingresses.zipkin.rabbitmq.properties.RabbitMqIngressConfigProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
diff --git a/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/rabbitmq/properties/RabbitMqIngressConfigProperties.java b/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/zipkin/rabbitmq/properties/RabbitMqIngressConfigProperties.java
similarity index 96%
rename from src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/rabbitmq/properties/RabbitMqIngressConfigProperties.java
rename to src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/zipkin/rabbitmq/properties/RabbitMqIngressConfigProperties.java
index 6b309197..da4eafad 100644
--- a/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/rabbitmq/properties/RabbitMqIngressConfigProperties.java
+++ b/src/main/java/com/hotels/service/tracing/zipkintohaystack/ingresses/zipkin/rabbitmq/properties/RabbitMqIngressConfigProperties.java
@@ -14,7 +14,7 @@
* limitations under the License.
*
*/
-package com.hotels.service.tracing.zipkintohaystack.ingresses.rabbitmq.properties;
+package com.hotels.service.tracing.zipkintohaystack.ingresses.zipkin.rabbitmq.properties;
import org.springframework.boot.context.properties.ConfigurationProperties;
diff --git a/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/src/main/resources/META-INF/additional-spring-configuration-metadata.json
index 7df1ddc8..b243b81e 100644
--- a/src/main/resources/META-INF/additional-spring-configuration-metadata.json
+++ b/src/main/resources/META-INF/additional-spring-configuration-metadata.json
@@ -150,6 +150,41 @@
"type": "java.util.Map",
"description": "Overrides for the kafka forwarder."
},
+ {
+ "name": "pitchfork.forwarders.datadog.enabled",
+ "type": "java.lang.Boolean",
+ "description": "When true, creates a forwarder to Datadog."
+ },
+ {
+ "name": "pitchfork.forwarders.datadog.queued-max-spans",
+ "type": "java.lang.Integer",
+ "description": "Maximum number of spans on the backlog queue waiting to be reported."
+ },
+ {
+ "name": "pitchfork.forwarders.datadog.host",
+ "type": "java.lang.String",
+ "description": "The host for the datadog collector. Defaults to localhost."
+ },
+ {
+ "name": "pitchfork.forwarders.datadog.port",
+ "type": "java.lang.Integer",
+ "description": "The port for the datadog collector. Defaults to 8216."
+ },
+ {
+ "name": "pitchfork.forwarders.datadog.connect-timeout-ms",
+ "type": "java.lang.Integer",
+ "description": "The connection timeout in milliseconds for the Datadog reporter. Defaults to 10s."
+ },
+ {
+ "name": "pitchfork.forwarders.datadog.read-timeout-ms",
+ "type": "java.lang.Integer",
+ "description": "The read timeout in milliseconds for the Datadog reporter. Defaults to 10s."
+ },
+ {
+ "name": "pitchfork.forwarders.datadog.max-connections",
+ "type": "java.lang.Integer",
+ "description": "The number of maximum connections for the Datadog reporter. Defaults to 10."
+ },
{
"name": "pitchfork.forwarders.logging.enabled",
"type": "java.lang.Boolean",
@@ -207,4 +242,3 @@
}
]
}
-
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 872f1771..84c10f97 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -30,6 +30,14 @@ pitchfork:
queue-name: zipkin
source-format: JSON_V2
forwarders:
+ datadog:
+ enabled: false
+ host: localhost
+ port: 8126
+ connect-timeout-ms: 10000
+ read-timeout-ms: 1000
+ max-connections: 50
+ queued-max-spans: 1000
haystack:
kinesis:
enabled: false
diff --git a/src/test/java/com/hotels/service/tracing/zipkintohaystack/DatadogToZipkinForwarderTest.java b/src/test/java/com/hotels/service/tracing/zipkintohaystack/DatadogToZipkinForwarderTest.java
new file mode 100644
index 00000000..356f4382
--- /dev/null
+++ b/src/test/java/com/hotels/service/tracing/zipkintohaystack/DatadogToZipkinForwarderTest.java
@@ -0,0 +1,95 @@
+package com.hotels.service.tracing.zipkintohaystack;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.util.TestPropertyValues;
+import org.springframework.boot.test.web.client.TestRestTemplate;
+import org.springframework.boot.web.server.LocalServerPort;
+import org.springframework.context.ApplicationContextInitializer;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.http.*;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.ContextConfiguration;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import static com.hotels.service.tracing.zipkintohaystack.TestUtils.AWAIT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@Testcontainers
+@DirtiesContext
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
+@ContextConfiguration(initializers = {DatadogToZipkinForwarderTest.Initializer.class})
+class DatadogToZipkinForwarderTest {
+
+ @Container
+ private static final GenericContainer zipkinContainer = new GenericContainer("openzipkin/zipkin:2.23")
+ .withExposedPorts(9411)
+ .waitingFor(new HttpWaitStrategy().forPath("/health"));
+ @LocalServerPort
+ private int localServerPort;
+ @Autowired
+ private TestRestTemplate restTemplate;
+
+ @Test
+ void shouldAcceptDatadogFormatAndForwardToZipkin() {
+ String spanId = "2696599e12b2a265";
+ String traceId = "3116bae014149aad";
+ String parentId = "d6318b5dfa0088fa";
+ long timestamp = 1528386023537760L;
+ int duration = 17636;
+ String localEndpoint = "jsonv2";
+
+ HttpHeaders headers = new HttpHeaders();
+ headers.setContentType(MediaType.APPLICATION_JSON);
+
+ var url = "http://localhost:" + localServerPort + "/v0.3/traces";
+ String datadogSpan = """
+ [
+ [
+ {
+ "duration": 12345,
+ "name": "span_name",
+ "resource": "/home",
+ "service": "service_name",
+ "span_id": 987654321,
+ "start": 0,
+ "trace_id": 123456789
+ }
+ ]
+ ]
+ """;
+
+ // build the request
+        HttpEntity<String> entity = new HttpEntity<>(datadogSpan, headers);
+
+ restTemplate.put(url, entity);
+
+ // proxy is async, and zipkin is async too, so we retry our assertions until they are true
+ AWAIT.untilAsserted(() -> {
+
+ // assert that traces were forwarded to zipkin by asking which services it knows about
+            ResponseEntity<String> responseFromZipkin = restTemplate
+ .getForEntity(
+ "http://" + zipkinContainer.getContainerIpAddress() + ":" + zipkinContainer.getFirstMappedPort() + "/api/v2/services",
+ String.class);
+
+            assertThat(responseFromZipkin.getStatusCode()).isEqualTo(HttpStatus.OK);
+ assertThat(responseFromZipkin.getBody()).contains("\"service_name\"");
+ });
+ }
+
+    static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
+ public void initialize(ConfigurableApplicationContext context) {
+ var values = TestPropertyValues.of(
+ "pitchfork.forwarders.zipkin.http.enabled=true",
+ "pitchfork.forwarders.zipkin.http.endpoint=http://" + zipkinContainer.getContainerIpAddress() + ":" + zipkinContainer
+ .getFirstMappedPort() + "/api/v2/spans"
+ );
+ values.applyTo(context);
+ }
+ }
+}
diff --git a/src/test/java/com/hotels/service/tracing/zipkintohaystack/HaystackKafkaForwarderTest.java b/src/test/java/com/hotels/service/tracing/zipkintohaystack/HaystackKafkaForwarderTest.java
index eed05537..d783085d 100644
--- a/src/test/java/com/hotels/service/tracing/zipkintohaystack/HaystackKafkaForwarderTest.java
+++ b/src/test/java/com/hotels/service/tracing/zipkintohaystack/HaystackKafkaForwarderTest.java
@@ -6,7 +6,6 @@
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
-import org.awaitility.core.ConditionFactory;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.util.TestPropertyValues;
@@ -24,17 +23,15 @@
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.okhttp3.OkHttpSender;
-import java.time.Duration;
import java.util.Optional;
+import static com.hotels.service.tracing.zipkintohaystack.TestUtils.AWAIT;
import static java.time.Duration.ofSeconds;
import static java.util.Collections.singletonList;
import static java.util.Optional.empty;
import static java.util.Optional.ofNullable;
-import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
-import static org.awaitility.Awaitility.await;
@Testcontainers
@DirtiesContext
@@ -44,21 +41,16 @@ class HaystackKafkaForwarderTest {
@Container
private static final KafkaContainer kafkaContainer = new KafkaContainer();
- private static final ConditionFactory AWAIT = await()
- .atMost(Duration.ofSeconds(10))
- .pollInterval(Duration.ofSeconds(1))
- .pollDelay(Duration.ofSeconds(1));
@LocalServerPort
private int localServerPort;
-    static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
- public void initialize(ConfigurableApplicationContext context) {
- var values = TestPropertyValues.of(
- "pitchfork.forwarders.haystack.kafka.enabled=true",
- "pitchfork.forwarders.haystack.kafka.bootstrap-servers=" + kafkaContainer.getBootstrapServers()
- );
- values.applyTo(context);
+    private static Optional<Span> deserialize(byte[] data) {
+ try {
+ return ofNullable(Span.parseFrom(data));
+ } catch (Exception e) {
+ fail("Failed to deserialise span from data");
+ return empty();
}
}
@@ -131,12 +123,13 @@ private AsyncReporter setupReporter() {
return AsyncReporter.create(sender);
}
-    private static Optional<Span> deserialize(byte[] data) {
- try {
- return ofNullable(Span.parseFrom(data));
- } catch (Exception e) {
- fail("Failed to deserialise span from data");
- return empty();
+    static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
+ public void initialize(ConfigurableApplicationContext context) {
+ var values = TestPropertyValues.of(
+ "pitchfork.forwarders.haystack.kafka.enabled=true",
+ "pitchfork.forwarders.haystack.kafka.bootstrap-servers=" + kafkaContainer.getBootstrapServers()
+ );
+ values.applyTo(context);
}
}
}
diff --git a/src/test/java/com/hotels/service/tracing/zipkintohaystack/HaystackKinesisForwarderTest.java b/src/test/java/com/hotels/service/tracing/zipkintohaystack/HaystackKinesisForwarderTest.java
index 455398ce..206a7361 100644
--- a/src/test/java/com/hotels/service/tracing/zipkintohaystack/HaystackKinesisForwarderTest.java
+++ b/src/test/java/com/hotels/service/tracing/zipkintohaystack/HaystackKinesisForwarderTest.java
@@ -6,7 +6,6 @@
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.*;
import com.expedia.open.tracing.Span;
-import org.awaitility.core.ConditionFactory;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@@ -24,16 +23,14 @@
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.okhttp3.OkHttpSender;
-import java.time.Duration;
import java.util.Optional;
import static com.amazonaws.services.kinesis.model.ShardIteratorType.TRIM_HORIZON;
+import static com.hotels.service.tracing.zipkintohaystack.TestUtils.AWAIT;
import static java.util.Optional.empty;
import static java.util.Optional.ofNullable;
-import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
-import static org.awaitility.Awaitility.await;
import static org.testcontainers.containers.localstack.LocalStackContainer.Service.KINESIS;
@Testcontainers
@@ -44,10 +41,6 @@ class HaystackKinesisForwarderTest {
@Container
private static final LocalStackContainer kinesisContainer = new LocalStackContainer().withServices(KINESIS);
- private static final ConditionFactory AWAIT = await()
- .atMost(Duration.ofSeconds(10))
- .pollInterval(Duration.ofSeconds(1))
- .pollDelay(Duration.ofSeconds(1));
private static String KINESIS_SERVICE_ENDPOINT;
private static AmazonKinesis kinesisClient;
@@ -64,16 +57,31 @@ static void setup() {
kinesisClient.createStream("proto-spans", 1);
}
-    static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
- public void initialize(ConfigurableApplicationContext context) {
- var values = TestPropertyValues.of(
- "pitchfork.forwarders.haystack.kinesis.enabled=true",
- "pitchfork.forwarders.haystack.kinesis.auth.config-type=BASIC",
- "pitchfork.forwarders.haystack.kinesis.client.config-type=ENDPOINT",
- "pitchfork.forwarders.haystack.kinesis.client.endpoint.service-endpoint=" + KINESIS_SERVICE_ENDPOINT
- );
+ private static void setKinesisServiceEndpoint() {
+ // https://github.com/localstack/localstack/blob/e479afa41df908305c4177276237925accc77e10/localstack/ext/java/src/test/java/cloud/localstack/BasicFunctionalityTest.java#L54
+ System.setProperty("com.amazonaws.sdk.disableCbor", "true");
- values.applyTo(context);
+ var serviceEndpoint = kinesisContainer.getEndpointConfiguration(KINESIS).getServiceEndpoint();
+ var endpointConfiguration = new AwsClientBuilder.EndpointConfiguration(serviceEndpoint, "us-west-1");
+
+ KINESIS_SERVICE_ENDPOINT = endpointConfiguration.getServiceEndpoint();
+ }
+
+ private static AmazonKinesis setupKinesisClient() {
+ var endpointConfiguration = new AwsClientBuilder.EndpointConfiguration(KINESIS_SERVICE_ENDPOINT, "us-west-1");
+
+ return AmazonKinesisClientBuilder.standard()
+ .withCredentials(kinesisContainer.getDefaultCredentialsProvider())
+ .withEndpointConfiguration(endpointConfiguration)
+ .build();
+ }
+
+    private static Optional<Span> deserialize(byte[] data) {
+ try {
+ return ofNullable(Span.parseFrom(data));
+ } catch (Exception e) {
+ fail("Failed to deserialise span from data");
+ return empty();
}
}
@@ -133,34 +141,6 @@ private String streamStatus(String streamName) {
return streamResult.getStreamDescription().getStreamStatus();
}
- private static void setKinesisServiceEndpoint() {
- // https://github.com/localstack/localstack/blob/e479afa41df908305c4177276237925accc77e10/localstack/ext/java/src/test/java/cloud/localstack/BasicFunctionalityTest.java#L54
- System.setProperty("com.amazonaws.sdk.disableCbor", "true");
-
- var serviceEndpoint = kinesisContainer.getEndpointConfiguration(KINESIS).getServiceEndpoint();
- var endpointConfiguration = new AwsClientBuilder.EndpointConfiguration(serviceEndpoint, "us-west-1");
-
- KINESIS_SERVICE_ENDPOINT = endpointConfiguration.getServiceEndpoint();
- }
-
- private static AmazonKinesis setupKinesisClient() {
- var endpointConfiguration = new AwsClientBuilder.EndpointConfiguration(KINESIS_SERVICE_ENDPOINT, "us-west-1");
-
- return AmazonKinesisClientBuilder.standard()
- .withCredentials(kinesisContainer.getDefaultCredentialsProvider())
- .withEndpointConfiguration(endpointConfiguration)
- .build();
- }
-
-    private static Optional<Span> deserialize(byte[] data) {
- try {
- return ofNullable(Span.parseFrom(data));
- } catch (Exception e) {
- fail("Failed to deserialise span from data");
- return empty();
- }
- }
-
/**
* Create reporter.
*/
@@ -171,4 +151,17 @@ private AsyncReporter setupReporter() {
.build();
return AsyncReporter.create(sender);
}
+
+    static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
+ public void initialize(ConfigurableApplicationContext context) {
+ var values = TestPropertyValues.of(
+ "pitchfork.forwarders.haystack.kinesis.enabled=true",
+ "pitchfork.forwarders.haystack.kinesis.auth.config-type=BASIC",
+ "pitchfork.forwarders.haystack.kinesis.client.config-type=ENDPOINT",
+ "pitchfork.forwarders.haystack.kinesis.client.endpoint.service-endpoint=" + KINESIS_SERVICE_ENDPOINT
+ );
+
+ values.applyTo(context);
+ }
+ }
}
diff --git a/src/test/java/com/hotels/service/tracing/zipkintohaystack/KafkaIngressTest.java b/src/test/java/com/hotels/service/tracing/zipkintohaystack/KafkaIngressTest.java
index 8e4111ec..0afb9f19 100644
--- a/src/test/java/com/hotels/service/tracing/zipkintohaystack/KafkaIngressTest.java
+++ b/src/test/java/com/hotels/service/tracing/zipkintohaystack/KafkaIngressTest.java
@@ -6,7 +6,6 @@
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
-import org.awaitility.core.ConditionFactory;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.util.TestPropertyValues;
@@ -23,17 +22,15 @@
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.kafka.KafkaSender;
-import java.time.Duration;
import java.util.Optional;
+import static com.hotels.service.tracing.zipkintohaystack.TestUtils.AWAIT;
import static java.time.Duration.ofSeconds;
import static java.util.Collections.singletonList;
import static java.util.Optional.empty;
import static java.util.Optional.ofNullable;
-import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
-import static org.awaitility.Awaitility.await;
@Testcontainers
@DirtiesContext
@@ -43,21 +40,24 @@ class KafkaIngressTest {
@Container
private static final KafkaContainer kafkaContainer = new KafkaContainer();
- private static final ConditionFactory AWAIT = await()
- .atMost(Duration.ofSeconds(10))
- .pollInterval(Duration.ofSeconds(1))
- .pollDelay(Duration.ofSeconds(1));
- static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
- public void initialize(ConfigurableApplicationContext context) {
- var values = TestPropertyValues.of(
- "pitchfork.ingress.kafka.enabled=true",
- "pitchfork.ingress.kafka.bootstrap-servers=" + kafkaContainer.getBootstrapServers(),
- "pitchfork.ingress.kafka.source-format=PROTO3",
- "pitchfork.forwarders.haystack.kafka.enabled=true",
- "pitchfork.forwarders.haystack.kafka.bootstrap-servers=" + kafkaContainer.getBootstrapServers()
- );
- values.applyTo(context);
+ /**
+ * Create reporter.
+ */
+ private static AsyncReporter<zipkin2.Span> setupReporter() {
+ var sender = KafkaSender.newBuilder()
+ .encoding(Encoding.PROTO3)
+ .bootstrapServers(kafkaContainer.getBootstrapServers())
+ .build();
+ return AsyncReporter.create(sender);
+ }
+
+ private static Optional<Span> deserialize(byte[] data) {
+ try {
+ return ofNullable(Span.parseFrom(data));
+ } catch (Exception e) {
+ fail("Failed to deserialise span from data");
+ return empty();
}
}
@@ -101,17 +101,6 @@ void shouldForwardTracesToKafka() throws Exception {
}
}
- /**
- * Create reporter.
- */
- private static AsyncReporter<zipkin2.Span> setupReporter() {
- var sender = KafkaSender.newBuilder()
- .encoding(Encoding.PROTO3)
- .bootstrapServers(kafkaContainer.getBootstrapServers())
- .build();
- return AsyncReporter.create(sender);
- }
-
/**
* Create consumer and subscribe to spans topic.
*/
@@ -130,12 +119,16 @@ private KafkaConsumer setupConsumer() {
return consumer;
}
- private static Optional<Span> deserialize(byte[] data) {
- try {
- return ofNullable(Span.parseFrom(data));
- } catch (Exception e) {
- fail("Failed to deserialise span from data");
- return empty();
+ static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
+ public void initialize(ConfigurableApplicationContext context) {
+ var values = TestPropertyValues.of(
+ "pitchfork.ingress.kafka.enabled=true",
+ "pitchfork.ingress.kafka.bootstrap-servers=" + kafkaContainer.getBootstrapServers(),
+ "pitchfork.ingress.kafka.source-format=PROTO3",
+ "pitchfork.forwarders.haystack.kafka.enabled=true",
+ "pitchfork.forwarders.haystack.kafka.bootstrap-servers=" + kafkaContainer.getBootstrapServers()
+ );
+ values.applyTo(context);
}
}
}
diff --git a/src/test/java/com/hotels/service/tracing/zipkintohaystack/RabbitMqIngressTest.java b/src/test/java/com/hotels/service/tracing/zipkintohaystack/RabbitMqIngressTest.java
index 926957ee..8dc1b320 100644
--- a/src/test/java/com/hotels/service/tracing/zipkintohaystack/RabbitMqIngressTest.java
+++ b/src/test/java/com/hotels/service/tracing/zipkintohaystack/RabbitMqIngressTest.java
@@ -7,7 +7,6 @@
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
-import org.awaitility.core.ConditionFactory;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@@ -28,17 +27,15 @@
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.amqp.RabbitMQSender;
-import java.time.Duration;
import java.util.Optional;
+import static com.hotels.service.tracing.zipkintohaystack.TestUtils.AWAIT;
import static java.time.Duration.ofSeconds;
import static java.util.Collections.singletonList;
import static java.util.Optional.empty;
import static java.util.Optional.ofNullable;
-import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
-import static org.awaitility.Awaitility.await;
@Testcontainers
@DirtiesContext
@@ -53,28 +50,40 @@ class RabbitMqIngressTest {
.withExposedPorts(5672)
.withNetworkAliases("rabbitmq")
.waitingFor(new HostPortWaitStrategy());
- private static final ConditionFactory AWAIT = await()
- .atMost(Duration.ofSeconds(10))
- .pollInterval(Duration.ofSeconds(1))
- .pollDelay(Duration.ofSeconds(1));
@BeforeAll
static void setup() throws Exception {
setupRabbitMqQueue();
}
- static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
- public void initialize(ConfigurableApplicationContext context) {
- var values = TestPropertyValues.of(
- "pitchfork.ingress.rabbitmq.enabled=true",
- "pitchfork.ingress.rabbitmq.host=" + rabbitMqContainer.getContainerIpAddress(),
- "pitchfork.ingress.rabbitmq.port=" + rabbitMqContainer.getFirstMappedPort(),
- "pitchfork.ingress.rabbitmq.queue-name=zipkin",
- "pitchfork.ingress.rabbitmq.source-format=PROTO3",
- "pitchfork.forwarders.haystack.kafka.enabled=true",
- "pitchfork.forwarders.haystack.kafka.bootstrap-servers=" + kafkaContainer.getBootstrapServers()
- );
- values.applyTo(context);
+ private static void setupRabbitMqQueue() throws Exception {
+ var channel = getRabbitMqChannel();
+ var exchangeName = "pitchforkExchange";
+ var routingKey = "pitchforkExchange";
+ channel.exchangeDeclare(exchangeName, "direct", true);
+ channel.queueDeclare("zipkin", true, false, true, null);
+ channel.queueBind("zipkin", exchangeName, routingKey);
+ }
+
+ private static Channel getRabbitMqChannel() throws Exception {
+ ConnectionFactory factory = new ConnectionFactory();
+ factory.setUsername("guest");
+ factory.setPassword("guest");
+ factory.setVirtualHost("/");
+ factory.setHost(rabbitMqContainer.getContainerIpAddress());
+ factory.setPort(rabbitMqContainer.getFirstMappedPort());
+
+ var connection = factory.newConnection();
+
+ return connection.createChannel();
+ }
+
+ private static Optional<com.expedia.open.tracing.Span> deserialize(byte[] data) {
+ try {
+ return ofNullable(com.expedia.open.tracing.Span.parseFrom(data));
+ } catch (Exception e) {
+ fail("Failed to deserialise span from data");
+ return empty();
}
}
@@ -134,37 +143,6 @@ private AsyncReporter setupReporter(Encoding encoding) {
return AsyncReporter.create(sender);
}
- private static void setupRabbitMqQueue() throws Exception {
- var channel = getRabbitMqChannel();
- var exchangeName = "pitchforkExchange";
- var routingKey = "pitchforkExchange";
- channel.exchangeDeclare(exchangeName, "direct", true);
- channel.queueDeclare("zipkin", true, false, true, null);
- channel.queueBind("zipkin", exchangeName, routingKey);
- }
-
- private static Channel getRabbitMqChannel() throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setUsername("guest");
- factory.setPassword("guest");
- factory.setVirtualHost("/");
- factory.setHost(rabbitMqContainer.getContainerIpAddress());
- factory.setPort(rabbitMqContainer.getFirstMappedPort());
-
- var connection = factory.newConnection();
-
- return connection.createChannel();
- }
-
- private static Optional<com.expedia.open.tracing.Span> deserialize(byte[] data) {
- try {
- return ofNullable(com.expedia.open.tracing.Span.parseFrom(data));
- } catch (Exception e) {
- fail("Failed to deserialise span from data");
- return empty();
- }
- }
-
/**
* Create consumer and subscribe to spans topic.
*/
@@ -182,4 +160,19 @@ private KafkaConsumer setupConsumer() {
return consumer;
}
+
+ static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
+ public void initialize(ConfigurableApplicationContext context) {
+ var values = TestPropertyValues.of(
+ "pitchfork.ingress.rabbitmq.enabled=true",
+ "pitchfork.ingress.rabbitmq.host=" + rabbitMqContainer.getContainerIpAddress(),
+ "pitchfork.ingress.rabbitmq.port=" + rabbitMqContainer.getFirstMappedPort(),
+ "pitchfork.ingress.rabbitmq.queue-name=zipkin",
+ "pitchfork.ingress.rabbitmq.source-format=PROTO3",
+ "pitchfork.forwarders.haystack.kafka.enabled=true",
+ "pitchfork.forwarders.haystack.kafka.bootstrap-servers=" + kafkaContainer.getBootstrapServers()
+ );
+ values.applyTo(context);
+ }
+ }
}
diff --git a/src/test/java/com/hotels/service/tracing/zipkintohaystack/TestUtils.java b/src/test/java/com/hotels/service/tracing/zipkintohaystack/TestUtils.java
new file mode 100644
index 00000000..198e0aad
--- /dev/null
+++ b/src/test/java/com/hotels/service/tracing/zipkintohaystack/TestUtils.java
@@ -0,0 +1,18 @@
+package com.hotels.service.tracing.zipkintohaystack;
+
+import org.awaitility.core.ConditionFactory;
+
+import java.time.Duration;
+
+import static org.awaitility.Awaitility.await;
+
+public class TestUtils {
+
+ /**
+ * Convenience await statement to validate async code.
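+ * Polls once per second for at most 10 seconds, after an initial one-second delay.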
+ */
+ public static final ConditionFactory AWAIT = await()
+ .atMost(Duration.ofSeconds(10))
+ .pollInterval(Duration.ofSeconds(1))
+ .pollDelay(Duration.ofSeconds(1));
+}
diff --git a/src/test/java/com/hotels/service/tracing/zipkintohaystack/ZipkinForwarderTest.java b/src/test/java/com/hotels/service/tracing/zipkintohaystack/ZipkinForwarderTest.java
index ee965f53..b91b26ac 100644
--- a/src/test/java/com/hotels/service/tracing/zipkintohaystack/ZipkinForwarderTest.java
+++ b/src/test/java/com/hotels/service/tracing/zipkintohaystack/ZipkinForwarderTest.java
@@ -1,6 +1,5 @@
package com.hotels.service.tracing.zipkintohaystack;
-import org.awaitility.core.ConditionFactory;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@@ -24,12 +23,10 @@
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.okhttp3.OkHttpSender;
-import java.time.Duration;
import java.util.List;
-import static java.util.concurrent.TimeUnit.SECONDS;
+import static com.hotels.service.tracing.zipkintohaystack.TestUtils.AWAIT;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.awaitility.Awaitility.await;
import static zipkin2.codec.SpanBytesEncoder.JSON_V1;
@Testcontainers
@@ -38,32 +35,15 @@
@ContextConfiguration(initializers = {ZipkinForwarderTest.Initializer.class})
class ZipkinForwarderTest {
- @LocalServerPort
- private int localServerPort;
-
@Container
- private static final GenericContainer zipkinContainer = new GenericContainer("openzipkin/zipkin:2.12")
+ private static final GenericContainer zipkinContainer = new GenericContainer("openzipkin/zipkin:2.23")
.withExposedPorts(9411)
.waitingFor(new HttpWaitStrategy().forPath("/health"));
- private static final ConditionFactory AWAIT = await()
- .atMost(Duration.ofSeconds(10))
- .pollInterval(Duration.ofSeconds(1))
- .pollDelay(Duration.ofSeconds(1));
-
+ @LocalServerPort
+ private int localServerPort;
@Autowired
private TestRestTemplate restTemplate;
- static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
- public void initialize(ConfigurableApplicationContext context) {
- var values = TestPropertyValues.of(
- "pitchfork.forwarders.zipkin.http.enabled=true",
- "pitchfork.forwarders.zipkin.http.endpoint=http://" + zipkinContainer.getContainerIpAddress() + ":" + zipkinContainer
- .getFirstMappedPort() + "/api/v2/spans"
- );
- values.applyTo(context);
- }
- }
-
@Test
void shouldAcceptJsonV2AndForwardToZipkin() {
String spanId = "2696599e12b2a265";
@@ -254,4 +234,15 @@ private AsyncReporter setupReporter(Encoding encoding, boolean com
.build();
return AsyncReporter.create(sender);
}
+
+ static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
+ public void initialize(ConfigurableApplicationContext context) {
+ var values = TestPropertyValues.of(
+ "pitchfork.forwarders.zipkin.http.enabled=true",
+ "pitchfork.forwarders.zipkin.http.endpoint=http://" + zipkinContainer.getContainerIpAddress() + ":" + zipkinContainer
+ .getFirstMappedPort() + "/api/v2/spans"
+ );
+ values.applyTo(context);
+ }
+ }
}
diff --git a/src/test/java/com/hotels/service/tracing/zipkintohaystack/ZipkinToDatadogTest.java b/src/test/java/com/hotels/service/tracing/zipkintohaystack/ZipkinToDatadogTest.java
new file mode 100644
index 00000000..7efab5bb
--- /dev/null
+++ b/src/test/java/com/hotels/service/tracing/zipkintohaystack/ZipkinToDatadogTest.java
@@ -0,0 +1,104 @@
+package com.hotels.service.tracing.zipkintohaystack;
+
+import org.junit.jupiter.api.Test;
+import org.mockserver.client.MockServerClient;
+import org.mockserver.model.HttpRequest;
+import org.mockserver.model.RequestDefinition;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.util.TestPropertyValues;
+import org.springframework.boot.web.server.LocalServerPort;
+import org.springframework.context.ApplicationContextInitializer;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.ContextConfiguration;
+import org.testcontainers.containers.MockServerContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import zipkin2.Endpoint;
+import zipkin2.codec.Encoding;
+import zipkin2.reporter.AsyncReporter;
+import zipkin2.reporter.okhttp3.OkHttpSender;
+
+import static com.hotels.service.tracing.zipkintohaystack.TestUtils.AWAIT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
+
+@Testcontainers
+@DirtiesContext
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
+@ContextConfiguration(initializers = {ZipkinToDatadogTest.Initializer.class})
+class ZipkinToDatadogTest {
+
+ @Container
+ private static final MockServerContainer datadogContainer = new MockServerContainer("5.11.2");
+ @LocalServerPort
+ private int localServerPort;
+
+ @Test
+ void shouldAcceptZipkinTracesAndForwardToDatadog() throws Exception {
+ MockServerClient mockServerClient = new MockServerClient(datadogContainer.getHost(), datadogContainer.getServerPort());
+
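+ // Stub the Datadog trace intake endpoint so the request forwarded by pitchfork can be captured and asserted on below.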
+ mockServerClient
+ .when(request()
+ .withMethod("PUT")
+ .withPath("/v0.3/traces"))
+ .respond(response().withStatusCode(200));
+
+ String spanId = "352bff9a74ca9ad2";
+ String traceId = "5af7183fb1d4cf5f";
+ String parentId = "6b221d5bc9e6496c";
+ long timestamp = System.currentTimeMillis();
+ int duration = 17636;
+ String localEndpoint = "service_name";
+
+ var zipkinSpan = zipkin2.Span.newBuilder()
+ .id(spanId)
+ .traceId(traceId)
+ .putTag("foo", "bar")
+ .addAnnotation(timestamp, "zoo")
+ .parentId(parentId)
+ .timestamp(timestamp)
+ .duration(duration)
+ .localEndpoint(Endpoint.newBuilder().serviceName(localEndpoint).build())
+ .build();
+
+ var reporter = setupReporter(Encoding.JSON, false);
+ reporter.report(zipkinSpan);
+
+ // we retry our assertions until they are true
+ AWAIT.untilAsserted(() -> {
+ RequestDefinition[] recordedRequests = mockServerClient.retrieveRecordedRequests(request()
+ .withPath("/v0.3/traces")
+ .withMethod("PUT"));
+
+ assertThat(recordedRequests).hasSize(1);
+
+ HttpRequest recordedRequest = (HttpRequest) recordedRequests[0];
+ assertThat(recordedRequest.getBody().getValue().toString()).contains("6554734444506566495"); // this is the decimal representation of the trace id
+ });
+ }
+
+ /**
+ * Create reporter.
+ */
+ private AsyncReporter<zipkin2.Span> setupReporter(Encoding encoding, boolean compressionEnabled) {
+ var sender = OkHttpSender.newBuilder()
+ .encoding(encoding)
+ .compressionEnabled(compressionEnabled)
+ .endpoint("http://localhost:" + localServerPort + "/api/v2/spans")
+ .build();
+ return AsyncReporter.create(sender);
+ }
+
+ static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
+ public void initialize(ConfigurableApplicationContext context) {
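+ // Route the Datadog forwarder to the MockServer container standing in for a Datadog agent.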
+ var values = TestPropertyValues.of(
+ "pitchfork.forwarders.datadog.enabled=true",
+ "pitchfork.forwarders.datadog.host=" + datadogContainer.getContainerIpAddress(),
+ "pitchfork.forwarders.datadog.port=" + datadogContainer.getFirstMappedPort()
+ );
+ values.applyTo(context);
+ }
+ }
+}
diff --git a/src/test/java/com/hotels/service/tracing/zipkintohaystack/forwarders/datadog/DatadogDomainConverterTest.java b/src/test/java/com/hotels/service/tracing/zipkintohaystack/forwarders/datadog/DatadogDomainConverterTest.java
new file mode 100644
index 00000000..b38db4d1
--- /dev/null
+++ b/src/test/java/com/hotels/service/tracing/zipkintohaystack/forwarders/datadog/DatadogDomainConverterTest.java
@@ -0,0 +1,136 @@
+package com.hotels.service.tracing.zipkintohaystack.forwarders.datadog;
+
+import com.hotels.service.tracing.zipkintohaystack.forwarders.datadog.model.DatadogSpan;
+import org.junit.jupiter.api.Test;
+import zipkin2.Endpoint;
+import zipkin2.Span;
+
+import java.math.BigInteger;
+import java.util.Collections;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class DatadogDomainConverterTest {
+
+ @Test
+ public void shouldCreateZipkinSpanFromDatadog() {
+ String name = "name";
+ String serviceName = "service_name";
+ long traceId = 123L; // 7b hexadecimal
+ long parentId = 456L; // 1c8 hexadecimal
+ long spanId = 789L; // 315 hexadecimal
+ long timestamp = 1621233762447000000L;
+ long duration = 100000000L;
+
+ DatadogSpan datadogSpan = new DatadogSpan(
+ BigInteger.valueOf(traceId),
+ BigInteger.valueOf(spanId),
+ BigInteger.valueOf(parentId),
+ timestamp,
+ duration,
+ 1,
+ Map.of("tag1", "value1",
+ "tag2", "value2"),
+ Collections.emptyMap(),
+ name,
+ null,
+ serviceName,
+ "web"
+ );
+
+ Span zipkinSpan = DatadogDomainConverter.toZipkin(datadogSpan);
+
+ assertThat(zipkinSpan.traceId()).isEqualTo("000000000000007b");
+ assertThat(zipkinSpan.id()).isEqualTo("0000000000000315");
+ assertThat(zipkinSpan.parentId()).isEqualTo("00000000000001c8");
+ assertThat(zipkinSpan.name()).isEqualTo(name);
+ assertThat(zipkinSpan.tags().get("type")).isEqualTo("web");
+ assertThat(zipkinSpan.localServiceName()).isEqualTo(serviceName);
+ assertThat(zipkinSpan.duration()).isEqualTo(100);
+ assertThat(zipkinSpan.timestamp()).isEqualTo(1621233762447L);
+
+ // The error flag is set to 1, so an error tag should be present
+ assertThat(zipkinSpan.tags().get("error")).isNotBlank();
+
+ // 2 user defined tags
+ assertThat(zipkinSpan.tags()).hasSize(4); // 2 tags + the error tag + the type tag
+ assertThat(zipkinSpan.tags().get("tag1")).isEqualTo("value1");
+ assertThat(zipkinSpan.tags().get("tag2")).isEqualTo("value2");
+ }
+
+ @Test
+ public void shouldConvertZipkinToDatadog() {
+ String name = "name";
+ String serviceName = "service_name";
+ String traceId = "7b"; // 123 decimal
+ String parentId = "1c8"; // 456 decimal
+ String spanId = "315"; // 789 decimal
+ long timestamp = 1621233762447L;
+ var kind = Span.Kind.CLIENT;
+ long duration = 100L;
+
+ zipkin2.Span zipkinSpan = Span.newBuilder()
+ .traceId(traceId)
+ .id(spanId)
+ .parentId(parentId)
+ .name(name)
+ .localEndpoint(Endpoint.newBuilder().serviceName(serviceName).build())
+ .timestamp(timestamp)
+ .duration(duration)
+ .kind(kind)
+ .putTag("tag1", "value1")
+ .putTag("tag2", "value2")
+ .build();
+
+ DatadogSpan datadogSpan = DatadogDomainConverter.fromZipkinV2(zipkinSpan);
+
+ assertThat(datadogSpan.meta().get("span.kind")).isEqualTo(kind.name());
+ assertThat(datadogSpan.traceId()).isEqualTo(123L);
+ assertThat(datadogSpan.spanId()).isEqualTo(789);
+ assertThat(datadogSpan.parentId()).isEqualTo(456L);
+ assertThat(datadogSpan.name()).isEqualTo(name);
+ assertThat(datadogSpan.service()).isEqualTo(serviceName);
+ assertThat(datadogSpan.duration()).isEqualTo(100_000_000);
+ assertThat(datadogSpan.start()).isEqualTo(1621233762447000000L);
+
+ // No error
+ assertThat(datadogSpan.error()).isNull();
+
+ // 2 user defined tags
+ assertThat(datadogSpan.meta()).hasSize(3); // 2 user tags + span.kind tag
+ assertThat(datadogSpan.meta().get("tag1")).isEqualTo("value1");
+ assertThat(datadogSpan.meta().get("tag2")).isEqualTo("value2");
+ }
+
+ @Test
+ public void shouldTruncateLongIds() {
+ String name = "pitchfork";
+ String traceId = "352bff9a74ca9ad25af7183fb1d4cf5f"; // 5af7183fb1d4cf5f (rightmost part) which is 6554734444506566495 decimal
+ String spanId = "20471a"; // 2115354 decimal
+
+ zipkin2.Span zipkinSpan = zipkin2.Span.newBuilder()
+ .traceId(traceId)
+ .id(spanId)
+ .name(name)
+ .build();
+
+ DatadogSpan datadogSpan = DatadogDomainConverter.fromZipkinV2(zipkinSpan);
+
+ assertThat(datadogSpan.traceId()).isEqualTo(6554734444506566495L);
+ assertThat(datadogSpan.spanId()).isEqualTo(2115354);
+ }
+
+ @Test
+ public void shouldConvertZipkinErrorTag() {
+ zipkin2.Span zipkinSpan = zipkin2.Span.newBuilder()
+ .traceId("7b")
+ .id("1c8")
+ .putTag("error", "failure_msg")
+ .build();
+
+ DatadogSpan datadogSpan = DatadogDomainConverter.fromZipkinV2(zipkinSpan);
+
+ assertThat(datadogSpan.error()).isEqualTo(1); // 1 = error
+ }
+}
diff --git a/src/test/java/com/hotels/service/tracing/zipkintohaystack/forwarders/haystack/HaystackDomainConverterTest.java b/src/test/java/com/hotels/service/tracing/zipkintohaystack/forwarders/haystack/HaystackDomainConverterTest.java
index 78a0b179..155d47aa 100644
--- a/src/test/java/com/hotels/service/tracing/zipkintohaystack/forwarders/haystack/HaystackDomainConverterTest.java
+++ b/src/test/java/com/hotels/service/tracing/zipkintohaystack/forwarders/haystack/HaystackDomainConverterTest.java
@@ -1,15 +1,11 @@
package com.hotels.service.tracing.zipkintohaystack.forwarders.haystack;
-import static java.lang.System.currentTimeMillis;
-import static org.assertj.core.api.Assertions.assertThat;
-
-
-import java.util.Map;
-
-import org.junit.jupiter.api.Test;
-
import com.expedia.open.tracing.Span;
import com.expedia.open.tracing.Tag;
+import org.junit.jupiter.api.Test;
+
+import static java.lang.System.currentTimeMillis;
+import static org.assertj.core.api.Assertions.assertThat;
public class HaystackDomainConverterTest {
@@ -21,11 +17,6 @@ public void shouldCreateHaystackSpanFromZipkinSpan() {
String spanId = zipkinSpanId(789L);
long timestamp = currentTimeMillis();
long duration = 100L;
- Map<String, String> tags = Map.of(
- "span.kind", "client",
- "tag1", "value1",
- "tag2", "value2"
- );
zipkin2.Span zipkinSpan = zipkin2.Span.newBuilder()
.traceId(traceId)
diff --git a/src/test/resources/application.yml b/src/test/resources/application.yml
index ca041b44..4577d507 100644
--- a/src/test/resources/application.yml
+++ b/src/test/resources/application.yml
@@ -28,6 +28,14 @@ pitchfork:
host: localhost
source-format: JSON_V2
forwarders:
+ datadog:
+ enabled: false
+ host: localhost
+ port: 8126
+ connect-timeout-ms: 10000
+ read-timeout-ms: 1000
+ max-connections: 10
+ queued-max-spans: 1000
haystack:
kinesis:
enabled: false