Skip to content

Commit

Permalink
Code refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
rishabhmaurya committed Jul 25, 2023
1 parent 4ea9921 commit aaf19a0
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public class DiagnosticSpan implements Span {
private final Map<String, Object> baggage;
private final Map<String, MetricPoint> metricMap;

private final static String SPAN_NAME = "span_name";
/**
* Constructs a DiagnosticSpan object with the specified underlying span.
*
Expand All @@ -33,6 +34,7 @@ public DiagnosticSpan(Span span) {
this.delegateSpan = span;
this.baggage = new HashMap<>();
this.metricMap = new HashMap<>();
baggage.put(SPAN_NAME, span.getSpanName());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,18 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.telemetry.metrics.Measurement;
import org.opensearch.telemetry.metrics.MetricPoint;
import org.opensearch.telemetry.metrics.MetricEmitter;
import org.opensearch.telemetry.tracing.listeners.TraceEventListener;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.listeners.TraceEventsService;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
* One of the pre-defined TraceEventListener for recording the thread usage using {@link ThreadResourceRecorder} and
* emitting metrics using {@link MetricEmitter}.
Expand All @@ -25,13 +30,16 @@
* this class records the resource consumption of the complete trace using provided {@link ThreadResourceRecorder} and emits corresponding metrics using
* {@link MetricEmitter}.
* The span created by {@link org.opensearch.telemetry.tracing.TracingTelemetry#createSpan(String, Span)} must be wrapped with {@link DiagnosticSpan}
* using {@link TraceEventsService#wrapSpan(Span)}
* using {@link TraceEventsService#wrapWithDiagnosticSpan(Span)}
*/
public class DiagnosticsEventListener implements TraceEventListener {
private static final Logger logger = LogManager.getLogger(DiagnosticsEventListener.class);
private final ThreadResourceRecorder<?> threadResourceRecorder;
private final MetricEmitter metricEmitter;

public final static String START_SPAN_TIME = "start_span_time";
public final static String ELAPSED_TIME = "elapsed_time";

/**
* Constructs a new DiagnosticsTraceEventListener with the specified tracer, thread resource recorder,
* and metric emitter.
Expand All @@ -55,7 +63,9 @@ public void onSpanStart(Span span, Thread t) {
if (!ensureDiagnosticSpan(span)) {
return;
}
threadResourceRecorder.startRecording((DiagnosticSpan) span, t);
DiagnosticSpan diagnosticSpan = (DiagnosticSpan) span;
threadResourceRecorder.startRecording(diagnosticSpan, t, true);
diagnosticSpan.putMetric(START_SPAN_TIME, new MetricPoint(Collections.emptyMap(), null, System.currentTimeMillis()));
}

/**
Expand All @@ -70,13 +80,9 @@ public void onSpanComplete(Span span, Thread t) {
if (!ensureDiagnosticSpan(span)) {
return;
}
MetricPoint diffMetric = threadResourceRecorder.endRecording((DiagnosticSpan) span, t, true);
MetricPoint endMetric = new MetricPoint(
diffMetric.getMeasurements(),
((DiagnosticSpan) span).getAttributes(),
diffMetric.getObservationTime()
);
metricEmitter.emitMetric(endMetric);
DiagnosticSpan diagnosticSpan = (DiagnosticSpan) span;
MetricPoint diffMetric = threadResourceRecorder.endRecording(diagnosticSpan, t, true);
metricEmitter.emitMetric(addElapsedTimeMeasurement(diagnosticSpan, diffMetric));
}

/**
Expand All @@ -91,7 +97,7 @@ public void onRunnableStart(Span span, Thread t) {
if (!ensureDiagnosticSpan(span)) {
return;
}
threadResourceRecorder.startRecording((DiagnosticSpan) span, t);
threadResourceRecorder.startRecording((DiagnosticSpan) span, t, false);
}

/**
Expand Down Expand Up @@ -128,4 +134,16 @@ private boolean ensureDiagnosticSpan(Span span) {
return false;
}
}

private MetricPoint addElapsedTimeMeasurement(DiagnosticSpan span, MetricPoint diffMetric) {
long elapsedTime = System.currentTimeMillis() - span.removeMetric(START_SPAN_TIME).getObservationTime();
Measurement<Number> elapsedTimeMeasurement = new Measurement<>(ELAPSED_TIME, elapsedTime);
Map<String, Measurement<Number>> diffMeasurements = new HashMap<>(diffMetric.getMeasurements());
diffMeasurements.put(ELAPSED_TIME, elapsedTimeMeasurement);
return new MetricPoint(
diffMeasurements,
span.getAttributes(),
diffMetric.getObservationTime()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
* It abstracts out the diff logic between gauge {@link MetricPoint} reported by {@link ThreadResourceRecorder#startRecording} and
* {@link ThreadResourceRecorder#endRecording} when endRecording is invoked.
* Implementation of this class should be thread-safe.
* It maintains the state between {@link #startRecording(DiagnosticSpan, Thread)} and {@link #endRecording(DiagnosticSpan, Thread, boolean)}
* It maintains the state between {@link #startRecording(DiagnosticSpan, Thread, boolean)} and {@link #endRecording(DiagnosticSpan, Thread, boolean)}
* using {@link DiagnosticSpan#putMetric(String, MetricPoint)} assuming {@link Span} context propagation is taken care by tracing framework.
* @param <T> the type of ThreadResourceObserver
*/
Expand All @@ -42,10 +42,11 @@ public ThreadResourceRecorder(T observer) {
* Starts recording the metric for the given Span and thread.
* The observation is obtained from the associated ThreadResourceObserver.
*
* @param span the DiagnosticSpan to record the metric for
* @param t the thread for which to record the metric
* @param span the DiagnosticSpan to record the metric for
* @param t the thread for which to record the metric
* @param startSpanEvent true if it is invoked as a result of start span trace event
*/
public void startRecording(DiagnosticSpan span, Thread t) {
public void startRecording(DiagnosticSpan span, Thread t, boolean startSpanEvent) {
MetricPoint observation = observer.observe(t);
span.putMetric(String.valueOf(t.getId()), observation);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class MetricPoint {
* @throws IllegalArgumentException if any of the input parameters are null
*/
public MetricPoint(Map<String, Measurement<Number>> measurements, Map<String, Object> attributes, long observationTime) {
if (measurements == null || measurements.isEmpty()) {
if (measurements == null) {
throw new IllegalArgumentException("Measurements cannot be null");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,11 @@
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.listeners.wrappers;
package org.opensearch.telemetry.tracing.listeners;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.telemetry.tracing.listeners.RunnableEventListener;
import org.opensearch.telemetry.tracing.listeners.TraceEventListener;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.listeners.TraceEventsService;

/**
* Runnable implementation that wraps another Runnable and adds trace event listener functionality.
Expand All @@ -31,7 +28,7 @@ public class TraceEventsRunnable implements Runnable {
* @param delegate the underlying Runnable to be executed
* @param traceEventsService traceEventListenerService
*/
public TraceEventsRunnable(Runnable delegate, TraceEventsService traceEventsService) {
TraceEventsRunnable(Runnable delegate, TraceEventsService traceEventsService) {
this.delegate = delegate;
this.traceEventsService = traceEventsService;
}
Expand All @@ -47,9 +44,7 @@ public void run() {
Span span = traceEventsService.getTracer().getCurrentSpan();
// repeat it for all the spans in the hierarchy
while (span != null) {
if (span.hasEnded()) {
logger.debug("TraceEventsRunnable is invoked post span completion", new Throwable());
} else {
if (!span.hasEnded()) {
Span finalSpan = span;
traceEventsService.executeListeners(
span,
Expand All @@ -68,9 +63,7 @@ public void run() {
if (traceEventsService.isTracingEnabled()) {
Span span = traceEventsService.getTracer().getCurrentSpan();
while (span != null) {
if (span.hasEnded()) {
logger.debug("TraceEventsRunnable is invoked post span completion", new Throwable());
} else {
if (!span.hasEnded()) {
Span finalSpan = span;
traceEventsService.executeListeners(
span,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
import org.opensearch.telemetry.diagnostics.DiagnosticsEventListener;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.listeners.wrappers.TraceEventsRunnable;
import org.opensearch.telemetry.tracing.listeners.wrappers.TracerWrapper;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -26,11 +24,9 @@

/**
* The TraceEventService manages trace event listeners and provides their registration and de-registration functionality.
* It allows components to register themselves as consumers of trace event listeners and notifies them when a new
* trace event listener is registered or deregistered or a trace related settings has changed.
*
* It also provides wrapper utility to wrap {@link Tracer} using {@link #wrapAndSetTracer(Tracer)}, Runnable using {@link #wrapRunnable(Runnable)}
* and {@link Span} using {@link #wrapSpan(Span)}.
* and {@link Span} using {@link #wrapWithDiagnosticSpan(Span)}.
*
* This is the core service for trace event listeners and must be instantiated at application start.
* Once the telemetry and tracing is instantiated, this service should be used to wrap Tracer, Span and Runnables. Wrap will not have any effect until
Expand All @@ -39,7 +35,7 @@
* The application must ensure that this service is updated with the latest set of TraceEventListener and latest value of {@link #tracingEnabled} and {@link #diagnosisEnabled}
* are set, or it may produce undesirable results.
*/
public class TraceEventsService {
public final class TraceEventsService {

private volatile Map<String, TraceEventListener> traceEventListeners;
private volatile Tracer tracer;
Expand Down Expand Up @@ -194,7 +190,7 @@ public Tracer unwrapTracer(TracerWrapper tracerWrapper) {
* @param span the Span to wrap
* @return the wrapped DiagnosticSpan
*/
public static DiagnosticSpan wrapSpan(Span span) {
public static DiagnosticSpan wrapWithDiagnosticSpan(Span span) {
return new DiagnosticSpan(span);
}

Expand All @@ -203,7 +199,7 @@ public static DiagnosticSpan wrapSpan(Span span) {
* @param span associated span
* @param listenerMethod the listener method to be invoked
*/
public void executeListeners(Span span, Consumer<TraceEventListener> listenerMethod) {
void executeListeners(Span span, Consumer<TraceEventListener> listenerMethod) {
if (span == null || traceEventListeners == null) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.listeners.wrappers;
package org.opensearch.telemetry.tracing.listeners;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.telemetry.tracing.listeners.SpanEventListener;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.listeners.TraceEventsService;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -41,7 +39,7 @@ public class TracerWrapper implements Tracer {
* @param delegate the underlying Tracer implementation
* @param traceEventsService traceEventListenerService
*/
public TracerWrapper(Tracer delegate, TraceEventsService traceEventsService) {
TracerWrapper(Tracer delegate, TraceEventsService traceEventsService) {
assert delegate != null;
this.tracer = delegate;
this.traceEventsService = traceEventsService;
Expand Down

This file was deleted.

0 comments on commit aaf19a0

Please sign in to comment.