diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/diagnostics/DiagnosticSpan.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/diagnostics/DiagnosticSpan.java index 1c7ae4302c73d..6d346c3bbac65 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/diagnostics/DiagnosticSpan.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/diagnostics/DiagnosticSpan.java @@ -24,6 +24,7 @@ public class DiagnosticSpan implements Span { private final Map baggage; private final Map metricMap; + private final static String SPAN_NAME = "span_name"; /** * Constructs a DiagnosticSpan object with the specified underlying span. * @@ -33,6 +34,7 @@ public DiagnosticSpan(Span span) { this.delegateSpan = span; this.baggage = new HashMap<>(); this.metricMap = new HashMap<>(); + baggage.put(SPAN_NAME, span.getSpanName()); } /** diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/diagnostics/DiagnosticsEventListener.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/diagnostics/DiagnosticsEventListener.java index b0a09974bff9d..629567f344614 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/diagnostics/DiagnosticsEventListener.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/diagnostics/DiagnosticsEventListener.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.telemetry.metrics.Measurement; import org.opensearch.telemetry.metrics.MetricPoint; import org.opensearch.telemetry.metrics.MetricEmitter; import org.opensearch.telemetry.tracing.listeners.TraceEventListener; @@ -17,6 +18,10 @@ import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.telemetry.tracing.listeners.TraceEventsService; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + /** * One of the pre-defined TraceEventListener for recording the thread usage using {@link ThreadResourceRecorder} and * emitting metrics using {@link MetricEmitter}. @@ -25,13 +30,16 @@ * this class records the resource consumption of the complete trace using provided {@link ThreadResourceRecorder} and emits corresponding metrics using * {@link MetricEmitter}. * The span created by {@link org.opensearch.telemetry.tracing.TracingTelemetry#createSpan(String, Span)} must be wrapped with {@link DiagnosticSpan} - * using {@link TraceEventsService#wrapSpan(Span)} + * using {@link TraceEventsService#wrapWithDiagnosticSpan(Span)} */ public class DiagnosticsEventListener implements TraceEventListener { private static final Logger logger = LogManager.getLogger(DiagnosticsEventListener.class); private final ThreadResourceRecorder threadResourceRecorder; private final MetricEmitter metricEmitter; + public final static String START_SPAN_TIME = "start_span_time"; + public final static String ELAPSED_TIME = "elapsed_time"; + /** * Constructs a new DiagnosticsTraceEventListener with the specified tracer, thread resource recorder, * and metric emitter. @@ -55,7 +63,9 @@ public void onSpanStart(Span span, Thread t) { if (!ensureDiagnosticSpan(span)) { return; } - threadResourceRecorder.startRecording((DiagnosticSpan) span, t); + DiagnosticSpan diagnosticSpan = (DiagnosticSpan) span; + threadResourceRecorder.startRecording(diagnosticSpan, t, true); + diagnosticSpan.putMetric(START_SPAN_TIME, new MetricPoint(Collections.emptyMap(), null, System.currentTimeMillis())); } /** @@ -70,13 +80,9 @@ public void onSpanComplete(Span span, Thread t) { if (!ensureDiagnosticSpan(span)) { return; } - MetricPoint diffMetric = threadResourceRecorder.endRecording((DiagnosticSpan) span, t, true); - MetricPoint endMetric = new MetricPoint( - diffMetric.getMeasurements(), - ((DiagnosticSpan) span).getAttributes(), - diffMetric.getObservationTime() - ); - metricEmitter.emitMetric(endMetric); + DiagnosticSpan diagnosticSpan = (DiagnosticSpan) span; + MetricPoint diffMetric = threadResourceRecorder.endRecording(diagnosticSpan, t, true); + metricEmitter.emitMetric(addElapsedTimeMeasurement(diagnosticSpan, diffMetric)); } /** @@ -91,7 +97,7 @@ public void onRunnableStart(Span span, Thread t) { if (!ensureDiagnosticSpan(span)) { return; } - threadResourceRecorder.startRecording((DiagnosticSpan) span, t); + threadResourceRecorder.startRecording((DiagnosticSpan) span, t, false); } /** @@ -128,4 +134,16 @@ private boolean ensureDiagnosticSpan(Span span) { return false; } } + + private MetricPoint addElapsedTimeMeasurement(DiagnosticSpan span, MetricPoint diffMetric) { + long elapsedTime = System.currentTimeMillis() - span.removeMetric(START_SPAN_TIME).getObservationTime(); + Measurement elapsedTimeMeasurement = new Measurement<>(ELAPSED_TIME, elapsedTime); + Map> diffMeasurements = new HashMap<>(diffMetric.getMeasurements()); + diffMeasurements.put(ELAPSED_TIME, elapsedTimeMeasurement); + return new MetricPoint( + diffMeasurements, + span.getAttributes(), + diffMetric.getObservationTime() + ); + } } diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/diagnostics/ThreadResourceRecorder.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/diagnostics/ThreadResourceRecorder.java index 2bb157fcd8166..6da1f23693e34 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/diagnostics/ThreadResourceRecorder.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/diagnostics/ThreadResourceRecorder.java @@ -19,7 +19,7 @@ * It abstracts out the diff logic between gauge {@link MetricPoint} reported by {@link ThreadResourceRecorder#startRecording} and * {@link ThreadResourceRecorder#endRecording} when endRecording is invoked. * Implementation of this class should be thread-safe. - * It maintains the state between {@link #startRecording(DiagnosticSpan, Thread)} and {@link #endRecording(DiagnosticSpan, Thread, boolean)} + * It maintains the state between {@link #startRecording(DiagnosticSpan, Thread, boolean)} and {@link #endRecording(DiagnosticSpan, Thread, boolean)} * using {@link DiagnosticSpan#putMetric(String, MetricPoint)} assuming {@link Span} context propagation is taken care by tracing framework. * @param the type of ThreadResourceObserver */ @@ -42,10 +42,11 @@ public ThreadResourceRecorder(T observer) { * Starts recording the metric for the given Span and thread. * The observation is obtained from the associated ThreadResourceObserver. * - * @param span the DiagnosticSpan to record the metric for - * @param t the thread for which to record the metric + * @param span the DiagnosticSpan to record the metric for + * @param t the thread for which to record the metric + * @param startSpanEvent true if it is invoked as a result of start span trace event */ - public void startRecording(DiagnosticSpan span, Thread t) { + public void startRecording(DiagnosticSpan span, Thread t, boolean startSpanEvent) { MetricPoint observation = observer.observe(t); span.putMetric(String.valueOf(t.getId()), observation); } diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/MetricPoint.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/MetricPoint.java index 2e79c8bc6803e..62af88b0c6a3a 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/MetricPoint.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/MetricPoint.java @@ -27,7 +27,7 @@ public class MetricPoint { * @throws IllegalArgumentException if any of the input parameters are null */ public MetricPoint(Map> measurements, Map attributes, long observationTime) { - if (measurements == null || measurements.isEmpty()) { + if (measurements == null) { throw new IllegalArgumentException("Measurements cannot be null"); } diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/wrappers/TraceEventsRunnable.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/TraceEventsRunnable.java similarity index 80% rename from libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/wrappers/TraceEventsRunnable.java rename to libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/TraceEventsRunnable.java index 46cd49cf4f082..4a3f5501b8965 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/wrappers/TraceEventsRunnable.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/TraceEventsRunnable.java @@ -6,14 +6,11 @@ * compatible open source license. */ -package org.opensearch.telemetry.tracing.listeners.wrappers; +package org.opensearch.telemetry.tracing.listeners; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.telemetry.tracing.listeners.RunnableEventListener; -import org.opensearch.telemetry.tracing.listeners.TraceEventListener; import org.opensearch.telemetry.tracing.Span; -import org.opensearch.telemetry.tracing.listeners.TraceEventsService; /** * Runnable implementation that wraps another Runnable and adds trace event listener functionality. @@ -31,7 +28,7 @@ public class TraceEventsRunnable implements Runnable { * @param delegate the underlying Runnable to be executed * @param traceEventsService traceEventListenerService */ - public TraceEventsRunnable(Runnable delegate, TraceEventsService traceEventsService) { + TraceEventsRunnable(Runnable delegate, TraceEventsService traceEventsService) { this.delegate = delegate; this.traceEventsService = traceEventsService; } @@ -47,9 +44,7 @@ public void run() { Span span = traceEventsService.getTracer().getCurrentSpan(); // repeat it for all the spans in the hierarchy while (span != null) { - if (span.hasEnded()) { - logger.debug("TraceEventsRunnable is invoked post span completion", new Throwable()); - } else { + if (!span.hasEnded()) { Span finalSpan = span; traceEventsService.executeListeners( span, @@ -68,9 +63,7 @@ public void run() { if (traceEventsService.isTracingEnabled()) { Span span = traceEventsService.getTracer().getCurrentSpan(); while (span != null) { - if (span.hasEnded()) { - logger.debug("TraceEventsRunnable is invoked post span completion", new Throwable()); - } else { + if (!span.hasEnded()) { Span finalSpan = span; traceEventsService.executeListeners( span, diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/TraceEventsService.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/TraceEventsService.java index 58f443dffb11e..b7f02a6b57e3f 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/TraceEventsService.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/TraceEventsService.java @@ -14,8 +14,6 @@ import org.opensearch.telemetry.diagnostics.DiagnosticsEventListener; import org.opensearch.telemetry.tracing.Span; import org.opensearch.telemetry.tracing.Tracer; -import org.opensearch.telemetry.tracing.listeners.wrappers.TraceEventsRunnable; -import org.opensearch.telemetry.tracing.listeners.wrappers.TracerWrapper; import java.util.HashMap; import java.util.Map; @@ -26,11 +24,9 @@ /** * The TraceEventService manages trace event listeners and provides their registration and de-registration functionality. - * It allows components to register themselves as consumers of trace event listeners and notifies them when a new - * trace event listener is registered or deregistered or a trace related settings has changed. * * It also provides wrapper utility to wrap {@link Tracer} using {@link #wrapAndSetTracer(Tracer)}, Runnable using {@link #wrapRunnable(Runnable)} - * and {@link Span} using {@link #wrapSpan(Span)}. + * and {@link Span} using {@link #wrapWithDiagnosticSpan(Span)}. * * This is the core service for trace event listeners and must be instantiated at application start. * Once the telemetry and tracing is instantiated, this service should be used to wrap Tracer, Span and Runnables. Wrap will not have any effect until @@ -39,7 +35,7 @@ * The application must ensure that this service is updated with the latest set of TraceEventListener and latest value of {@link #tracingEnabled} and {@link #diagnosisEnabled} * are set, or it may produce undesirable results. */ -public class TraceEventsService { +public final class TraceEventsService { private volatile Map traceEventListeners; private volatile Tracer tracer; @@ -194,7 +190,7 @@ public Tracer unwrapTracer(TracerWrapper tracerWrapper) { * @param span the Span to wrap * @return the wrapped DiagnosticSpan */ - public static DiagnosticSpan wrapSpan(Span span) { + public static DiagnosticSpan wrapWithDiagnosticSpan(Span span) { return new DiagnosticSpan(span); } @@ -203,7 +199,7 @@ public static DiagnosticSpan wrapSpan(Span span) { * @param span associated span * @param listenerMethod the listener method to be invoked */ - public void executeListeners(Span span, Consumer listenerMethod) { + void executeListeners(Span span, Consumer listenerMethod) { if (span == null || traceEventListeners == null) { return; } diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/wrappers/TracerWrapper.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/TracerWrapper.java similarity index 94% rename from libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/wrappers/TracerWrapper.java rename to libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/TracerWrapper.java index d1e509a2f1f57..64ff17ba32eff 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/wrappers/TracerWrapper.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/TracerWrapper.java @@ -6,15 +6,13 @@ * compatible open source license. */ -package org.opensearch.telemetry.tracing.listeners.wrappers; +package org.opensearch.telemetry.tracing.listeners; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.telemetry.tracing.listeners.SpanEventListener; import org.opensearch.telemetry.tracing.Span; import org.opensearch.telemetry.tracing.SpanScope; import org.opensearch.telemetry.tracing.Tracer; -import org.opensearch.telemetry.tracing.listeners.TraceEventsService; import java.io.IOException; import java.util.Collections; @@ -41,7 +39,7 @@ public class TracerWrapper implements Tracer { * @param delegate the underlying Tracer implementation * @param traceEventsService traceEventListenerService */ - public TracerWrapper(Tracer delegate, TraceEventsService traceEventsService) { + TracerWrapper(Tracer delegate, TraceEventsService traceEventsService) { assert delegate != null; this.tracer = delegate; this.traceEventsService = traceEventsService; diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/wrappers/package-info.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/wrappers/package-info.java deleted file mode 100644 index 51ca5512e7a4f..0000000000000 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/listeners/wrappers/package-info.java +++ /dev/null @@ -1,11 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ -/** - * contains all wrapper classes for TraceEvents - */ -package org.opensearch.telemetry.tracing.listeners.wrappers;