Skip to content

Commit

Permalink
Add splitExportTraceServiceRequest API to OTelProtoDecoder (opensearc…
Browse files Browse the repository at this point in the history
…h-project#3600)

* Add splitExportTraceServiceRequest API to OTelProtoDecoder

Signed-off-by: Krishna Kondaka <[email protected]>

* Renamed the API

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed code and modified test case

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed check style test

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka and Krishna Kondaka authored Nov 10, 2023
1 parent bc504fd commit ca6f86a
Show file tree
Hide file tree
Showing 3 changed files with 402 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,26 @@ public List<Span> parseExportTraceServiceRequest(final ExportTraceServiceRequest
.flatMap(rs -> parseResourceSpans(rs).stream()).collect(Collectors.toList());
}

public Map<String, ExportTraceServiceRequest> splitExportTraceServiceRequestByTraceId(final ExportTraceServiceRequest exportTraceServiceRequest) {
Map<String, ExportTraceServiceRequest> result = new HashMap<>();
Map<String, ExportTraceServiceRequest.Builder> resultBuilderMap = new HashMap<>();
for (final ResourceSpans resourceSpans: exportTraceServiceRequest.getResourceSpansList()) {
for (Map.Entry<String, ResourceSpans> entry: splitResourceSpansByTraceId(resourceSpans).entrySet()) {
String traceId = entry.getKey();

if (resultBuilderMap.containsKey(traceId)) {
resultBuilderMap.get(traceId).addResourceSpans(entry.getValue());
} else {
resultBuilderMap.put(traceId, ExportTraceServiceRequest.newBuilder().addResourceSpans(entry.getValue()));
}
}
}
for (Map.Entry<String, ExportTraceServiceRequest.Builder> entry: resultBuilderMap.entrySet()) {
result.put(entry.getKey(), entry.getValue().build());
}
return result;
}

public List<OpenTelemetryLog> parseExportLogsServiceRequest(final ExportLogsServiceRequest exportLogsServiceRequest) {
return exportLogsServiceRequest.getResourceLogsList().stream()
.flatMap(rs -> parseResourceLogs(rs).stream()).collect(Collectors.toList());
Expand Down Expand Up @@ -199,6 +219,38 @@ protected Collection<OpenTelemetryLog> parseResourceLogs(ResourceLogs rs) {
return Stream.concat(mappedInstrumentationLibraryLogs, mappedScopeListLogs).collect(Collectors.toList());
}

protected Map<String, ResourceSpans> splitResourceSpansByTraceId(final ResourceSpans resourceSpans) {
final Resource resource = resourceSpans.getResource();
Map<String, ResourceSpans> result = new HashMap<>();
Map<String, ResourceSpans.Builder> resultBuilderMap = new HashMap<>();

if (resourceSpans.getScopeSpansList().size() > 0) {
for (Map.Entry<String, List<ScopeSpans>> entry: splitScopeSpansByTraceId(resourceSpans.getScopeSpansList()).entrySet()) {
ResourceSpans.Builder b = ResourceSpans.newBuilder().setResource(resource).addAllScopeSpans(entry.getValue());
resultBuilderMap.put(entry.getKey(), b);
}
}

if (resourceSpans.getInstrumentationLibrarySpansList().size() > 0) {
for (Map.Entry<String, List<InstrumentationLibrarySpans>> entry: splitInstrumentationLibrarySpansByTraceId(resourceSpans.getInstrumentationLibrarySpansList()).entrySet()) {
ResourceSpans.Builder resourceSpansBuilder;
String traceId = entry.getKey();
if (resultBuilderMap.containsKey(traceId)) {
resourceSpansBuilder = resultBuilderMap.get(traceId);
} else {
resourceSpansBuilder = ResourceSpans.newBuilder().setResource(resource);
resultBuilderMap.put(traceId, resourceSpansBuilder);
}
resourceSpansBuilder.addAllInstrumentationLibrarySpans(entry.getValue());
}
}
for (Map.Entry<String, ResourceSpans.Builder> entry: resultBuilderMap.entrySet()) {
result.put(entry.getKey(), entry.getValue().build());
}

return result;
}

protected List<Span> parseResourceSpans(final ResourceSpans resourceSpans) {
final String serviceName = getServiceName(resourceSpans.getResource()).orElse(null);
final Map<String, Object> resourceAttributes = getResourceAttributes(resourceSpans.getResource());
Expand All @@ -223,6 +275,21 @@ private List<Span> parseScopeSpans(final List<ScopeSpans> scopeSpansList, final
.collect(Collectors.toList());
}

private Map<String, List<ScopeSpans>> splitScopeSpansByTraceId(final List<ScopeSpans> scopeSpansList) {
Map<String, List<ScopeSpans>> result = new HashMap<>();
for (ScopeSpans ss: scopeSpansList) {
for (Map.Entry<String, List<io.opentelemetry.proto.trace.v1.Span>> entry: splitSpansByTraceId(ss.getSpansList()).entrySet()) {
ScopeSpans.Builder scopeSpansBuilder = ScopeSpans.newBuilder().setScope(ss.getScope()).addAllSpans(entry.getValue());
String traceId = entry.getKey();
if (!result.containsKey(traceId)) {
result.put(traceId, new ArrayList<>());
}
result.get(traceId).add(scopeSpansBuilder.build());
}
}
return result;
}

private List<Span> parseInstrumentationLibrarySpans(final List<InstrumentationLibrarySpans> instrumentationLibrarySpansList,
final String serviceName, final Map<String, Object> resourceAttributes) {
return instrumentationLibrarySpansList.stream()
Expand All @@ -233,6 +300,38 @@ private List<Span> parseInstrumentationLibrarySpans(final List<InstrumentationLi
.collect(Collectors.toList());
}

private Map<String, List<InstrumentationLibrarySpans>> splitInstrumentationLibrarySpansByTraceId(final List<InstrumentationLibrarySpans> instrumentationLibrarySpansList) {
Map<String, List<InstrumentationLibrarySpans>> result = new HashMap<>();
for (InstrumentationLibrarySpans is: instrumentationLibrarySpansList) {
for (Map.Entry<String, List<io.opentelemetry.proto.trace.v1.Span>> entry: splitSpansByTraceId(is.getSpansList()).entrySet()) {
String traceId = entry.getKey();
InstrumentationLibrarySpans.Builder ilSpansBuilder = InstrumentationLibrarySpans.newBuilder().setInstrumentationLibrary(is.getInstrumentationLibrary()).addAllSpans(entry.getValue());
if (!result.containsKey(traceId)) {
result.put(traceId, new ArrayList<>());
}
result.get(traceId).add(ilSpansBuilder.build());
}
}
return result;
}


private Map<String, List<io.opentelemetry.proto.trace.v1.Span>> splitSpansByTraceId(final List<io.opentelemetry.proto.trace.v1.Span> spans) {
Map<String, List<io.opentelemetry.proto.trace.v1.Span>> result = new HashMap<>();
for (io.opentelemetry.proto.trace.v1.Span span: spans) {
String traceId = convertByteStringToString(span.getTraceId());
List<io.opentelemetry.proto.trace.v1.Span> spanList;
if (result.containsKey(traceId)) {
spanList = result.get(traceId);
} else {
spanList = new ArrayList<>();
result.put(traceId, spanList);
}
spanList.add(span);
}
return result;
}

private <T> List<Span> parseSpans(final List<io.opentelemetry.proto.trace.v1.Span> spans, final T scope,
final Function<T, Map<String, Object>> scopeAttributesGetter,
final String serviceName, final Map<String, Object> resourceAttributes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand All @@ -98,6 +99,7 @@ public class OTelProtoCodecTest {
private static final String TEST_REQUEST_HISTOGRAM_METRICS_JSON_FILE = "test-histogram-metrics.json";
private static final String TEST_REQUEST_LOGS_JSON_FILE = "test-request-log.json";
private static final String TEST_REQUEST_LOGS_IS_JSON_FILE = "test-request-log-is.json";
private static final String TEST_REQUEST_MULTIPLE_TRACES_FILE = "test-request-multiple-traces.json";


private static final Long TIME = TimeUnit.MILLISECONDS.toNanos(ZonedDateTime.of(
Expand Down Expand Up @@ -153,6 +155,58 @@ private String getFileAsJsonString(String requestJsonFileName) throws IOExceptio

@Nested
class OTelProtoDecoderTest {
@Test
public void testSplitExportTraceServiceRequestWithMultipleTraces() throws Exception {
final ExportTraceServiceRequest exportTraceServiceRequest = buildExportTraceServiceRequestFromJsonFile(TEST_REQUEST_MULTIPLE_TRACES_FILE);
final Map<String, ExportTraceServiceRequest> map = decoderUnderTest.splitExportTraceServiceRequestByTraceId(exportTraceServiceRequest);
assertThat(map.size(), is(equalTo(3)));
for (Map.Entry<String, ExportTraceServiceRequest> entry: map.entrySet()) {
String expectedTraceId = new String(Hex.decodeHex(entry.getKey()), StandardCharsets.UTF_8);
ExportTraceServiceRequest request = entry.getValue();
if (expectedTraceId.equals("TRACEID1")) {
assertThat(request.getResourceSpansList().size(), equalTo(1));
ResourceSpans rs = request.getResourceSpansList().get(0);
assertThat(rs.getScopeSpansList().size(), equalTo(1));
assertThat(rs.getInstrumentationLibrarySpansList().size(), equalTo(0));
ScopeSpans ss = rs.getScopeSpansList().get(0);
assertThat(ss.getSpansList().size(), equalTo(1));
io.opentelemetry.proto.trace.v1.Span span = ss.getSpansList().get(0);
String spanId = span.getSpanId().toStringUtf8();
assertTrue(spanId.equals("TRACEID1-SPAN1"));
} else if (expectedTraceId.equals("TRACEID2")) {
assertThat(request.getResourceSpansList().size(), equalTo(1));
ResourceSpans rs = request.getResourceSpansList().get(0);
assertThat(rs.getScopeSpansList().size(), equalTo(2));
assertThat(rs.getInstrumentationLibrarySpansList().size(), equalTo(2));

ScopeSpans ss = rs.getScopeSpansList().get(0);
assertThat(ss.getSpansList().size(), equalTo(1));
io.opentelemetry.proto.trace.v1.Span span = ss.getSpansList().get(0);
String spanId = span.getSpanId().toStringUtf8();
assertTrue(spanId.equals("TRACEID2-SPAN1"));

ss = rs.getScopeSpansList().get(1);
assertThat(ss.getSpansList().size(), equalTo(1));
span = ss.getSpansList().get(0);
spanId = span.getSpanId().toStringUtf8();
assertTrue(spanId.equals("TRACEID2-SPAN2"));

} else if (expectedTraceId.equals("TRACEID3")) {
assertThat(request.getResourceSpansList().size(), equalTo(1));
ResourceSpans rs = request.getResourceSpansList().get(0);
assertThat(rs.getScopeSpansList().size(), equalTo(1));
assertThat(rs.getInstrumentationLibrarySpansList().size(), equalTo(0));
ScopeSpans ss = rs.getScopeSpansList().get(0);
assertThat(ss.getSpansList().size(), equalTo(1));
io.opentelemetry.proto.trace.v1.Span span = ss.getSpansList().get(0);
String spanId = span.getSpanId().toStringUtf8();
assertTrue(spanId.equals("TRACEID3-SPAN1"));
} else {
assertTrue("Failed".equals("Unknown TraceId"));
}
}
}

@Test
public void testParseExportTraceServiceRequest() throws IOException {
final ExportTraceServiceRequest exportTraceServiceRequest = buildExportTraceServiceRequestFromJsonFile(TEST_REQUEST_TRACE_JSON_FILE);
Expand Down
Loading

0 comments on commit ca6f86a

Please sign in to comment.