Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding new inferred sampler #12927

Closed
wants to merge 14 commits into from
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add a counter to node stat api to track shard going from idle to non-idle ([#12768](https://github.com/opensearch-project/OpenSearch/pull/12768))
- Allow setting KEYSTORE_PASSWORD through env variable ([#12865](https://github.com/opensearch-project/OpenSearch/pull/12865))
- [Concurrent Segment Search] Perform buildAggregation concurrently and support Composite Aggregations ([#12697](https://github.com/opensearch-project/OpenSearch/pull/12697))
- [Tracing Framework] Adds support for inferred sampling ([#12315](https://github.com/opensearch-project/OpenSearch/issues/12315))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,24 @@

private TransportAddress remoteAddress;

private boolean sampled;

public void remoteAddress(TransportAddress remoteAddress) {
this.remoteAddress = remoteAddress;
}

public void markResponseAsSampled() {
nishchay21 marked this conversation as resolved.
Show resolved Hide resolved
nishchay21 marked this conversation as resolved.
Show resolved Hide resolved
this.sampled = true;
}

Check warning on line 56 in libs/core/src/main/java/org/opensearch/core/transport/TransportMessage.java

View check run for this annotation

Codecov / codecov/patch

libs/core/src/main/java/org/opensearch/core/transport/TransportMessage.java#L55-L56

Added lines #L55 - L56 were not covered by tests

public TransportAddress remoteAddress() {
return remoteAddress;
}

public boolean sampled() {
return sampled;
}

/**
* Constructs a new empty transport message
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@
} else {
parentSpan = getCurrentSpanInternal();
}

Span span = createSpan(context, parentSpan);
addDefaultAttributes(span);
addDefaultAttributes(parentSpan, span);
return span;
}

Expand Down Expand Up @@ -97,14 +98,33 @@
* Adds default attributes in the span
* @param span the current active span
*/
protected void addDefaultAttributes(Span span) {
protected void addDefaultAttributes(Span parentSpan, Span span) {
addCommonParentAttributes(parentSpan, span);
nishchay21 marked this conversation as resolved.
Show resolved Hide resolved
span.addAttribute(THREAD_NAME, Thread.currentThread().getName());
}

@Override
public Span startSpan(SpanCreationContext spanCreationContext, Map<String, Collection<String>> headers) {
Optional<Span> propagatedSpan = tracingTelemetry.getContextPropagator().extractFromHeaders(headers);
addRequestAttributeToContext(spanCreationContext, headers);
return startSpan(spanCreationContext.parent(propagatedSpan.map(SpanContext::new).orElse(null)));
}

private void addRequestAttributeToContext(SpanCreationContext spanCreationContext, Map<String, Collection<String>> headers) {
if (headers != null && headers.containsKey(TracerContextStorage.INFERRED_SAMPLER)) {
spanCreationContext.getAttributes().addAttribute(TracerContextStorage.INFERRED_SAMPLER, true);

Check warning on line 115 in libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java

View check run for this annotation

Codecov / codecov/patch

libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java#L115

Added line #L115 was not covered by tests
}
}

private void addCommonParentAttributes(Span parentSpan, Span currentSpan) {
// This work as common attribute propagator from parent to child
if (parentSpan != null) {
Optional<Boolean> inferredAttribute = Optional.ofNullable(
parentSpan.getAttributeBoolean(TracerContextStorage.INFERRED_SAMPLER)
);
if (inferredAttribute.isPresent()) {
currentSpan.addAttribute(TracerContextStorage.INFERRED_SAMPLER, true);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,31 @@ public interface Span {
*/
String getSpanId();

/**
* *
* @param key for which we need to look for value
* @return string attribute value
*/
String getAttributeString(String key);
nishchay21 marked this conversation as resolved.
Show resolved Hide resolved

/**
* *
* @param key for which we need to look for value
* @return Boolean attribute value
*/
Boolean getAttributeBoolean(String key);

/**
* *
* @param key for which we need to look for value
* @return Long attribute value
*/
Long getAttributeLong(String key);

/**
* *
* @param key for which we need to look for value
* @return Double attribute value
*/
Double getAttributeDouble(String key);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ public interface TracerContextStorage<K, V> {
*/
String CURRENT_SPAN = "current_span";

/**
* Key for storing sample information
*/
String SAMPLED = "sampled";
nishchay21 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Key for storing inferred Sampling information
*/
String INFERRED_SAMPLER = "Inferred_sampler";

/**
* Fetches value corresponding to key
* @param key of the tracing context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,24 @@
public String getSpanId() {
return "noop-span-id";
}

@Override
public String getAttributeString(String key) {
return "";

Check warning on line 88 in libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopSpan.java

View check run for this annotation

Codecov / codecov/patch

libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopSpan.java#L88

Added line #L88 was not covered by tests
}

@Override
public Boolean getAttributeBoolean(String key) {
return false;

Check warning on line 93 in libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopSpan.java

View check run for this annotation

Codecov / codecov/patch

libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopSpan.java#L93

Added line #L93 was not covered by tests
}

@Override
public Long getAttributeLong(String key) {
return 0L;

Check warning on line 98 in libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopSpan.java

View check run for this annotation

Codecov / codecov/patch

libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopSpan.java#L98

Added line #L98 was not covered by tests
}

@Override
public Double getAttributeDouble(String key) {
return 0.0;

Check warning on line 103 in libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopSpan.java

View check run for this annotation

Codecov / codecov/patch

libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopSpan.java#L103

Added line #L103 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.telemetry.TelemetrySettings;
import org.opensearch.telemetry.metrics.exporter.OTelMetricsExporterFactory;
import org.opensearch.telemetry.tracing.exporter.OTelSpanExporterFactory;
import org.opensearch.telemetry.tracing.processor.OtelSpanProcessor;
import org.opensearch.telemetry.tracing.sampler.OTelSamplerFactory;
import org.opensearch.telemetry.tracing.sampler.RequestSampler;

Expand Down Expand Up @@ -117,7 +118,11 @@ private static SdkTracerProvider createSdkTracerProvider(
.build();
}

private static BatchSpanProcessor spanProcessor(Settings settings, SpanExporter spanExporter) {
private static OtelSpanProcessor spanProcessor(Settings settings, SpanExporter spanExporter) {
return new OtelSpanProcessor(batchSpanProcessor(settings, spanExporter));
}

private static BatchSpanProcessor batchSpanProcessor(Settings settings, SpanExporter spanExporter) {
return BatchSpanProcessor.builder(spanExporter)
.setScheduleDelay(TRACER_EXPORTER_DELAY_SETTING.get(settings).getSeconds(), TimeUnit.SECONDS)
.setMaxExportBatchSize(TRACER_EXPORTER_BATCH_SIZE_SETTING.get(settings))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@

package org.opensearch.telemetry.tracing;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.trace.ReadableSpan;

/**
* Default implementation of {@link Span} using Otel span. It keeps a reference of OpenTelemetry Span and handles span
Expand All @@ -32,9 +34,20 @@

@Override
public void endSpan() {
if (getAttributeBoolean(TracerContextStorage.SAMPLED) != null && getAttributeBoolean(TracerContextStorage.SAMPLED)) {
markParentForSampling();

Check warning on line 38 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java#L38

Added line #L38 was not covered by tests
nishchay21 marked this conversation as resolved.
Show resolved Hide resolved
}
delegateSpan.end();
}

private void markParentForSampling() {
org.opensearch.telemetry.tracing.Span current_parent = getParentSpan();

Check warning on line 44 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java#L44

Added line #L44 was not covered by tests
nishchay21 marked this conversation as resolved.
Show resolved Hide resolved
while (current_parent != null && current_parent.getAttributeBoolean(TracerContextStorage.SAMPLED) == null) {
current_parent.addAttribute(TracerContextStorage.SAMPLED, true);
current_parent = current_parent.getParentSpan();

Check warning on line 47 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java#L46-L47

Added lines #L46 - L47 were not covered by tests
}
}

Check warning on line 49 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java#L49

Added line #L49 was not covered by tests

@Override
public void addAttribute(String key, String value) {
delegateSpan.setAttribute(key, value);
Expand Down Expand Up @@ -77,8 +90,43 @@
return delegateSpan.getSpanContext().getSpanId();
}

@Override
public String getAttributeString(String key) {
if (delegateSpan != null && delegateSpan instanceof ReadableSpan) return ((ReadableSpan) delegateSpan).getAttribute(
AttributeKey.stringKey(key)

Check warning on line 96 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java#L96

Added line #L96 was not covered by tests
);

return null;

Check warning on line 99 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java#L99

Added line #L99 was not covered by tests
}

@Override
public Boolean getAttributeBoolean(String key) {
if (delegateSpan != null && delegateSpan instanceof ReadableSpan) {
return ((ReadableSpan) delegateSpan).getAttribute(AttributeKey.booleanKey(key));

Check warning on line 105 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java#L105

Added line #L105 was not covered by tests
}

return null;
}

@Override
public Long getAttributeLong(String key) {
if (delegateSpan != null && delegateSpan instanceof ReadableSpan) return ((ReadableSpan) delegateSpan).getAttribute(
AttributeKey.longKey(key)

Check warning on line 114 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java#L114

Added line #L114 was not covered by tests
);

return null;

Check warning on line 117 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java#L117

Added line #L117 was not covered by tests
}

@Override
public Double getAttributeDouble(String key) {
if (delegateSpan != null && delegateSpan instanceof ReadableSpan) return ((ReadableSpan) delegateSpan).getAttribute(
AttributeKey.doubleKey(key)

Check warning on line 123 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java#L123

Added line #L123 was not covered by tests
);

return null;

Check warning on line 126 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelSpan.java#L126

Added line #L126 was not covered by tests
}

io.opentelemetry.api.trace.Span getDelegateSpan() {
return delegateSpan;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ private Span createOtelSpan(SpanCreationContext spanCreationContext, Span parent
OTelAttributesConverter.convert(spanCreationContext.getAttributes()),
OTelSpanKindConverter.convert(spanCreationContext.getSpanKind())
);

Span newSpan = new OTelSpan(spanCreationContext.getSpanName(), otelSpan, parentSpan);

return newSpan;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.processor;

import org.opensearch.telemetry.tracing.TracerContextStorage;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.ReadWriteSpan;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SpanProcessor;

/**
* Implementation of the SpanProcessor and delegates to the configured processor.
*/
public class OtelSpanProcessor implements SpanProcessor {

private final SpanProcessor delegateProcessor;

/**
* *
* @param delegateProcessor the span processor to which this processor will delegate
*/
public OtelSpanProcessor(SpanProcessor delegateProcessor) {
this.delegateProcessor = delegateProcessor;
}

/**
* Called when a {@link Span} is started, if the {@link
* Span#isRecording()} returns true.
*
* <p>This method is called synchronously on the execution thread, should not throw or block the
* execution thread.
*
* @param parentContext the parent {@code Context} of the span that just started.
* @param span the {@code Span} that just started.
*/
@Override
public void onStart(Context parentContext, ReadWriteSpan span) {
this.delegateProcessor.onStart(parentContext, span);
}

Check warning on line 49 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/processor/OtelSpanProcessor.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/processor/OtelSpanProcessor.java#L48-L49

Added lines #L48 - L49 were not covered by tests

/**
* Returns {@code true} if this {@link SpanProcessor} requires start events.
*
* @return {@code true} if this {@link SpanProcessor} requires start events.
*/
@Override
public boolean isStartRequired() {
return this.delegateProcessor.isStartRequired();

Check warning on line 58 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/processor/OtelSpanProcessor.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/processor/OtelSpanProcessor.java#L58

Added line #L58 was not covered by tests
}

/**
* Called when a {@link Span} is ended, if the {@link
* Span#isRecording()} returns true.
*
* <p>This method is called synchronously on the execution thread, should not throw or block the
* execution thread.
*
* @param span the {@code Span} that just ended.
*/
@Override
public void onEnd(ReadableSpan span) {
if (span != null
&& span.getSpanContext().isSampled()
&& Boolean.TRUE.equals(span.getAttribute(AttributeKey.booleanKey(TracerContextStorage.INFERRED_SAMPLER)))) {
if (span.getAttribute(AttributeKey.booleanKey(TracerContextStorage.SAMPLED)) != null) {
this.delegateProcessor.onEnd(span);

Check warning on line 76 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/processor/OtelSpanProcessor.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/processor/OtelSpanProcessor.java#L76

Added line #L76 was not covered by tests
}
} else {
this.delegateProcessor.onEnd(span);

Check warning on line 79 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/processor/OtelSpanProcessor.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/processor/OtelSpanProcessor.java#L79

Added line #L79 was not covered by tests
}
}

Check warning on line 81 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/processor/OtelSpanProcessor.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/processor/OtelSpanProcessor.java#L81

Added line #L81 was not covered by tests

/**
* Returns {@code true} if this {@link SpanProcessor} requires end events.
*
* @return {@code true} if this {@link SpanProcessor} requires end events.
*/
@Override
public boolean isEndRequired() {
return this.delegateProcessor.isEndRequired();

Check warning on line 90 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/processor/OtelSpanProcessor.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/processor/OtelSpanProcessor.java#L90

Added line #L90 was not covered by tests
}

/**
* Processes all span events that have not yet been processed and closes used resources.
*
* @return a {@link CompletableResultCode} which completes when shutdown is finished.
*/
@Override
public CompletableResultCode shutdown() {
return this.delegateProcessor.shutdown();
}

/**
* Processes all span events that have not yet been processed.
*
* @return a {@link CompletableResultCode} which completes when currently queued spans are
* finished processing.
*/
@Override
public CompletableResultCode forceFlush() {
return this.delegateProcessor.forceFlush();

Check warning on line 111 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/processor/OtelSpanProcessor.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/processor/OtelSpanProcessor.java#L111

Added line #L111 was not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* This package contains classes needed for processor.
*/
package org.opensearch.telemetry.tracing.processor;
Loading
Loading