Skip to content

Commit

Permalink
Datadog forwarder - Draft (#340)
Browse files Browse the repository at this point in the history
* Support to forward spans to datadog and to accept datadog payloads

Co-authored-by: Nikos Katirtzis <[email protected]>
  • Loading branch information
worldtiki and nikos912000 authored May 20, 2021
1 parent 7f877ab commit 4fad766
Show file tree
Hide file tree
Showing 34 changed files with 1,222 additions and 224 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
- uses: actions/checkout@v1
- uses: AdoptOpenJDK/install-jdk@v1
with:
version: '15'
version: '16'
architecture: x64
- name: Build with Maven
run: ./mvnw verify
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Builder image
FROM adoptopenjdk:15_36-jdk-hotspot as build
FROM adoptopenjdk/openjdk16:jdk-16.0.1_9-debian as build

RUN jlink \
--module-path /opt/java/jmods \
Expand All @@ -10,7 +10,7 @@ RUN jlink \
--output /opt/jdk-mini

# Start a new image and copy just the minimal java distribution from the previous one
FROM debian:10.6-slim
FROM debian:stable-slim
COPY --from=build /opt/jdk-mini /opt/jdk-mini

# Create some dirs and copy pitchfork jar
Expand Down
38 changes: 36 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
<version>1.0-SNAPSHOT</version>

<properties>
<java.version>15</java.version>
<java.version>16</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

<aws-java-sdk.version>1.11.1009</aws-java-sdk.version>
<zipkin-reporter.version>2.16.3</zipkin-reporter.version>
<spring-boot.version>2.4.5</spring-boot.version>
<testcontainers.version>1.15.3</testcontainers.version>
<jackson.version>2.10.2</jackson.version>
<jackson.version>2.12.3</jackson.version>
<amazon-kinesis-producer.version>0.14.6</amazon-kinesis-producer.version>
<amazon-kinesis-client.version>1.14.2</amazon-kinesis-client.version>
<rabbitmq-client.version>5.12.0</rabbitmq-client.version>
Expand All @@ -27,6 +27,7 @@
<javax-annotation.version>1.3.2</javax-annotation.version>
<awaitility.version>4.0.3</awaitility.version>
<assertj-core.version>3.19.0</assertj-core.version>
<messagepack.version>0.8.24</messagepack.version>
</properties>

<organization>
Expand All @@ -50,6 +51,15 @@

<dependencyManagement>
<dependencies>
<!-- FIXME: this is just here until spring boot catches up. we need it to serialize java 16 records -->
<dependency>
<groupId>com.fasterxml.jackson</groupId>
<artifactId>jackson-bom</artifactId>
<version>${jackson.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>

<dependency>
<!-- Import dependency management from Spring Boot -->
<groupId>org.springframework.boot</groupId>
Expand All @@ -59,6 +69,13 @@
<scope>import</scope>
</dependency>

<!-- FIXME: hack for java 16 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>

<dependency>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-reporter-bom</artifactId>
Expand Down Expand Up @@ -115,6 +132,11 @@
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
</dependency>
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>jackson-dataformat-msgpack</artifactId>
<version>${messagepack.version}</version>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
Expand Down Expand Up @@ -209,6 +231,18 @@
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mockserver</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mock-server</groupId>
<artifactId>mockserver-client-java</artifactId>
<version>5.11.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,10 @@
*/
package com.hotels.service.tracing.zipkintohaystack;

import static org.springframework.http.MediaType.APPLICATION_JSON;
import static org.springframework.web.reactive.function.server.RequestPredicates.contentType;
import static org.springframework.web.reactive.function.server.RequestPredicates.method;
import static org.springframework.web.reactive.function.server.RequestPredicates.path;
import static org.springframework.web.reactive.function.server.RouterFunctions.nest;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

import com.hotels.service.tracing.zipkintohaystack.ingresses.datadog.DatadogSpansMessagePackDecoder;
import com.hotels.service.tracing.zipkintohaystack.ingresses.datadog.DatadogSpansJsonDecoder;
import com.hotels.service.tracing.zipkintohaystack.metrics.MetersProvider;
import io.netty.handler.codec.http.HttpContentDecompressor;
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory;
import org.springframework.boot.web.reactive.server.ReactiveWebServerFactory;
import org.springframework.context.annotation.Bean;
Expand All @@ -33,9 +30,11 @@
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;

import com.hotels.service.tracing.zipkintohaystack.metrics.MetersProvider;
import io.netty.handler.codec.http.HttpContentDecompressor;
import zipkin2.codec.SpanBytesDecoder;
import static org.springframework.http.MediaType.APPLICATION_JSON;
import static org.springframework.web.reactive.function.server.RequestPredicates.*;
import static org.springframework.web.reactive.function.server.RouterFunctions.nest;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static zipkin2.codec.SpanBytesDecoder.*;

@Configuration
public class RoutingConfig {
Expand All @@ -48,18 +47,28 @@ public class RoutingConfig {
* At this moment we support {@code POST}s for the v1 api encoded in Json or Thrift, or for the v2 api in Json.
*/
@Bean
public RouterFunction<ServerResponse> myRoutes(ZipkinController zipkinController, MetersProvider metersProvider) {
public RouterFunction<ServerResponse> myRoutes(ZipkinController zipkinController,
MetersProvider metersProvider,
DatadogSpansMessagePackDecoder datadogSpansMessagePackDecoder,
DatadogSpansJsonDecoder datadogSpansJsonDecoder) {
var counterJsonV1 = metersProvider.getSpansCounter("http", "jsonv1");
var counterJsonV2 = metersProvider.getSpansCounter("http", "jsonv2");
var counterThrift = metersProvider.getSpansCounter("http", "thrift");
var counterProtobuf = metersProvider.getSpansCounter("http", "protobuf");
var counterDatadog = metersProvider.getSpansCounter("http", "datadog");

return nest(method(HttpMethod.POST),
nest(contentType(APPLICATION_JSON),
route(path("/api/v1/spans"), request -> zipkinController.addSpans(request, SpanBytesDecoder.JSON_V1, counterJsonV1))
.andRoute(path("/api/v2/spans"), request -> zipkinController.addSpans(request, SpanBytesDecoder.JSON_V2, counterJsonV2)))
.andRoute(contentType(APPLICATION_THRIFT), request -> zipkinController.addSpans(request, SpanBytesDecoder.THRIFT, counterThrift))
.andRoute(contentType(APPLICATION_PROTOBUF), request -> zipkinController.addSpans(request, SpanBytesDecoder.PROTO3, counterProtobuf)))
route(path("/api/v1/spans"), request -> zipkinController.addSpans(request, JSON_V1::decodeList, counterJsonV1))
.andRoute(path("/api/v2/spans"), request -> zipkinController.addSpans(request, JSON_V2::decodeList, counterJsonV2)))
.andRoute(contentType(APPLICATION_THRIFT), request -> zipkinController.addSpans(request, THRIFT::decodeList, counterThrift))
.andRoute(contentType(APPLICATION_PROTOBUF), request -> zipkinController.addSpans(request, PROTO3::decodeList, counterProtobuf)))
.andNest(method(HttpMethod.PUT),
nest(contentType(new MediaType("application", "msgpack")),
route(path("/v0.3/traces"), request -> zipkinController.addSpans(request, datadogSpansMessagePackDecoder, counterDatadog)))
.andNest(contentType(APPLICATION_JSON),
route(path("/v0.3/traces"), request -> zipkinController.addSpans(request, datadogSpansJsonDecoder, counterDatadog)))
)
.andRoute(RequestPredicates.all(), zipkinController::unmatched);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.hotels.service.tracing.zipkintohaystack.forwarders.Fork;
import com.hotels.service.tracing.zipkintohaystack.forwarders.SpanForwarder;
import com.hotels.service.tracing.zipkintohaystack.forwarders.haystack.SpanValidator;
import com.hotels.service.tracing.zipkintohaystack.ingresses.Decoder;
import io.micrometer.core.instrument.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -30,7 +31,6 @@
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
import zipkin2.Span;
import zipkin2.codec.SpanBytesDecoder;

import java.util.Collection;
import java.util.function.Function;
Expand Down Expand Up @@ -67,7 +67,7 @@ public Mono<ServerResponse> unmatched(ServerRequest serverRequest) {
* Valid requests made to this service will be handled by this function.
* It submits the reported spans to the registered {@link SpanForwarder} asynchronously and waits until they all complete.
*/
public Mono<ServerResponse> addSpans(ServerRequest serverRequest, SpanBytesDecoder decoder, Counter counter) {
public Mono<ServerResponse> addSpans(ServerRequest serverRequest, Decoder decoder, Counter counter) {
return serverRequest
.bodyToMono(byte[].class)
.flatMapIterable(decodeList(decoder))
Expand All @@ -78,7 +78,7 @@ public Mono<ServerResponse> addSpans(ServerRequest serverRequest, SpanBytesDecod
.then(ok().body(BodyInserters.empty()));
}

private Function<byte[], Iterable<Span>> decodeList(SpanBytesDecoder decoder) {
private Function<byte[], Iterable<Span>> decodeList(Decoder decoder) {
return bytes -> (Collection<Span>) decoder.decodeList(bytes);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright 2018 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.hotels.service.tracing.zipkintohaystack.forwarders.datadog;

import com.hotels.service.tracing.zipkintohaystack.forwarders.datadog.model.DatadogSpan;
import zipkin2.Annotation;
import zipkin2.Endpoint;
import zipkin2.Span;

import java.math.BigInteger;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static java.util.Collections.emptyMap;
import static org.springframework.util.StringUtils.hasText;

/**
* Converter between {@code Zipkin} and {@code Datadog} domains.
*/
public class DatadogDomainConverter {

public static final Long MILLIS_TO_NANOS = 1_000_000L;
private static final Integer ERROR = 1;

/**
* Accepts a span in {@code Zipkin V2} format and returns a span in {@code Datadog} format.
*/
public static DatadogSpan fromZipkinV2(zipkin2.Span zipkin) {
Integer error = isError(zipkin);
BigInteger parentId = hexadecimalToDecimal(zipkin.parentId());
var spanId = hexadecimalToDecimal(zipkin.id());
var traceId = hexadecimalToDecimal(zipkin.traceId());

return new DatadogSpan(
traceId,
spanId,
parentId,
toNanos(zipkin.timestamp()),
toNanos(zipkin.duration()),
error,
tags(zipkin.tags(), zipkin.annotations(), zipkin.kind()),
emptyMap(),
valueOrDefault(zipkin.name(), "span"),
valueOrDefault(zipkin.name(), "resource"), // TODO: maybe derive resource from tags? http.method + http.path?
zipkin.localServiceName(),
"web"
);
}

private static Map<String, String> tags(Map<String, String> tags, List<Annotation> annotations, Span.Kind kind) {
Map<String, String> collected = new HashMap<>();

if (kind != null) {
collected.put("span.kind", kind.name());
}

tags.forEach(collected::put);

return collected;
}

private static String valueOrDefault(String input, String defaultValue) {
return input != null ? input : defaultValue;
}

private static Long toNanos(Long timestamp) {
return timestamp != null ? timestamp * MILLIS_TO_NANOS : null;
}

private static Long toMillis(Long timestamp) {
return timestamp != null ? timestamp / MILLIS_TO_NANOS : null;
}

private static BigInteger hexadecimalToDecimal(String input) {
// B3 accepts ids with different sizes (64 or 128 bits).
// To make this work we truncate larger than 64 bit ones (16 chars).
if (input == null) {
return null;
} else if (input.length() > 16) {
return new BigInteger(input.substring(16), 16);
} else {
return new BigInteger(input, 16);
}
}

private static Integer isError(Span zipkin) {
return zipkin.tags().containsKey("error") ? ERROR : null;
}

private static boolean isError(DatadogSpan datadog) {
return ERROR.equals(datadog.error());
}

public static Span toZipkin(DatadogSpan datadogSpan) {
var builder = Span.newBuilder();

if (hasText(datadogSpan.name())) {
builder.localEndpoint(Endpoint.newBuilder()
.serviceName(datadogSpan.service())
.build());
}

builder.name(datadogSpan.name());

if (isError(datadogSpan)) {
builder.putTag("error", "error");
}
if (datadogSpan.meta() != null) {
datadogSpan.meta().forEach(builder::putTag);
}

if (datadogSpan.type() != null) {
builder.putTag("type", datadogSpan.type());
}
if (datadogSpan.traceId() != null) {
builder.traceId(decimalToHexadecimal(datadogSpan.traceId()));
}
if (datadogSpan.spanId() != null) {
builder.id(decimalToHexadecimal(datadogSpan.spanId()));
}
if (datadogSpan.parentId() != null) {
builder.parentId(decimalToHexadecimal(datadogSpan.parentId()));
}
if (datadogSpan.start() != null) {
builder.timestamp(toMillis(datadogSpan.start()));
}
if (datadogSpan.duration() != null) {
builder.duration(toMillis(datadogSpan.duration()));
}
// TODO: annotations, resource_name, ...

return builder.build();
}

/**
* Zipkin trace ids are 64 or 128 bits represented as 16 or 32 hex characters with '0' left padding
* We always use 64 bits (16 chars) to maintain compatibility with Datadog.
*/
private static String decimalToHexadecimal(BigInteger id) {
return String.format("%016x", id);
}
}
Loading

0 comments on commit 4fad766

Please sign in to comment.