Skip to content

Commit

Permalink
Validate MockTracingTelemetry span state on shutdown. (#9818)
Browse files Browse the repository at this point in the history
* Address review comment

Signed-off-by: Gagan Juneja <[email protected]>

* Address review comment

Signed-off-by: Gagan Juneja <[email protected]>

* Address review comment

Signed-off-by: Gagan Juneja <[email protected]>

* Empty-Commit

Signed-off-by: Gagan Juneja <[email protected]>

---------

Signed-off-by: Gagan Juneja <[email protected]>
Co-authored-by: Gagan Juneja <[email protected]>
  • Loading branch information
Gaganjuneja and Gagan Juneja authored Sep 6, 2023
1 parent 81d2ae1 commit 9b26bd9
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@
import org.opensearch.test.disruption.ServiceDisruptionScheme;
import org.opensearch.test.store.MockFSIndexStore;
import org.opensearch.test.telemetry.MockTelemetryPlugin;
import org.opensearch.test.telemetry.tracing.StrictCheckSpanProcessor;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportInterceptor;
import org.opensearch.transport.TransportRequest;
Expand Down Expand Up @@ -2295,7 +2296,9 @@ public static void afterClass() throws Exception {
INSTANCE.printTestMessage("cleaning up after");
INSTANCE.afterInternal(true);
checkStaticState(true);
StrictCheckSpanProcessor.validateTracingStateOnShutdown();
}

} finally {
SUITE_SEED = null;
currentCluster = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.opensearch.search.internal.SearchContext;
import org.opensearch.telemetry.TelemetrySettings;
import org.opensearch.test.telemetry.MockTelemetryPlugin;
import org.opensearch.test.telemetry.tracing.StrictCheckSpanProcessor;
import org.opensearch.transport.TransportSettings;
import org.junit.AfterClass;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -190,6 +191,7 @@ public static void setUpClass() throws Exception {
@AfterClass
public static void tearDownClass() throws Exception {
stopNode();
StrictCheckSpanProcessor.validateTracingStateOnShutdown();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,46 +13,22 @@
import org.opensearch.telemetry.metrics.MetricsTelemetry;
import org.opensearch.telemetry.tracing.TracingTelemetry;
import org.opensearch.test.telemetry.tracing.MockTracingTelemetry;
import org.opensearch.threadpool.ThreadPool;

import java.util.concurrent.TimeUnit;

/**
* Mock {@link Telemetry} implementation for testing.
*/
public class MockTelemetry implements Telemetry {
private final ThreadPool threadPool;

/**
* Constructor with settings.
* @param settings telemetry settings.
*/
public MockTelemetry(TelemetrySettings settings) {
this(settings, null);
}

/**
* Constructor with settings.
* @param settings telemetry settings.
* @param threadPool thread pool to watch for termination
*/
public MockTelemetry(TelemetrySettings settings, ThreadPool threadPool) {
this.threadPool = threadPool;
}

@Override
public TracingTelemetry getTracingTelemetry() {
return new MockTracingTelemetry(() -> {
// There could be some asynchronous tasks running that we should await for before the closing
// up the tracer instance.
if (threadPool != null) {
try {
threadPool.awaitTermination(10, TimeUnit.SECONDS);
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
});
return new MockTracingTelemetry();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,18 @@

package org.opensearch.test.telemetry;

import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.SetOnce;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.TelemetryPlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.telemetry.Telemetry;
import org.opensearch.telemetry.TelemetrySettings;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.function.Supplier;

/**
* Mock {@link TelemetryPlugin} implementation for testing.
*/
public class MockTelemetryPlugin extends Plugin implements TelemetryPlugin {
private static final String MOCK_TRACER_NAME = "mock";
private final SetOnce<ThreadPool> threadPool = new SetOnce<>();

/**
* Base constructor.
Expand All @@ -44,27 +28,9 @@ public MockTelemetryPlugin() {

}

@Override
public Collection<Object> createComponents(
Client client,
ClusterService clusterService,
ThreadPool threadPool,
ResourceWatcherService resourceWatcherService,
ScriptService scriptService,
NamedXContentRegistry xContentRegistry,
Environment environment,
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
this.threadPool.set(threadPool);
return Collections.emptyList();
}

@Override
public Optional<Telemetry> getTelemetry(TelemetrySettings settings) {
return Optional.of(new MockTelemetry(settings, threadPool.get()));
return Optional.of(new MockTelemetry(settings));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,40 +12,28 @@
import org.opensearch.telemetry.tracing.TracingContextPropagator;
import org.opensearch.telemetry.tracing.TracingTelemetry;
import org.opensearch.telemetry.tracing.attributes.Attributes;
import org.opensearch.test.telemetry.tracing.validators.AllSpansAreEndedProperly;
import org.opensearch.test.telemetry.tracing.validators.AllSpansHaveUniqueId;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Mock {@link TracingTelemetry} implementation for testing.
*/
public class MockTracingTelemetry implements TracingTelemetry {

private final SpanProcessor spanProcessor = new StrictCheckSpanProcessor();
private final Runnable onClose;
private final AtomicBoolean shutdown = new AtomicBoolean(false);

/**
* Base constructor.
*/
public MockTracingTelemetry() {
this(() -> {});
}

/**
* Base constructor.
*
* @param onClose on close hook
*/
public MockTracingTelemetry(final Runnable onClose) {
this.onClose = onClose;
}
public MockTracingTelemetry() {}

@Override
public Span createSpan(String spanName, Span parentSpan, Attributes attributes) {
Span span = new MockSpan(spanName, parentSpan, spanProcessor, attributes);
spanProcessor.onStart(span);
if (shutdown.get() == false) {
spanProcessor.onStart(span);
}
return span;
}

Expand All @@ -56,15 +44,7 @@ public TracingContextPropagator getContextPropagator() {

@Override
public void close() {
// Run onClose hook
onClose.run();

List<MockSpanData> spanData = ((StrictCheckSpanProcessor) spanProcessor).getFinishedSpanItems();
if (spanData.size() != 0) {
TelemetryValidators validators = new TelemetryValidators(
Arrays.asList(new AllSpansAreEndedProperly(), new AllSpansHaveUniqueId())
);
validators.validate(spanData, 1);
}
shutdown.set(true);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@
package org.opensearch.test.telemetry.tracing;

import org.opensearch.telemetry.tracing.Span;
import org.opensearch.test.telemetry.tracing.validators.AllSpansAreEndedProperly;
import org.opensearch.test.telemetry.tracing.validators.AllSpansHaveUniqueId;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -61,4 +64,23 @@ private MockSpanData toMockSpanData(Span span) {
);
return spanData;
}

/**
* Ensures the strict check succeeds for all the spans.
*/
public static void validateTracingStateOnShutdown() {
List<MockSpanData> spanData = new ArrayList<>(spanMap.values());
if (spanData.size() != 0) {
TelemetryValidators validators = new TelemetryValidators(
Arrays.asList(new AllSpansAreEndedProperly(), new AllSpansHaveUniqueId())
);
try {
validators.validate(spanData, 1);
} catch (Error e) {
spanMap.clear();
throw e;
}
}

}
}

0 comments on commit 9b26bd9

Please sign in to comment.