Skip to content

Commit

Permalink
Adds support for asynchronous gauge metric type (#12642) (#12692)
Browse files Browse the repository at this point in the history
* Adds support for asynchronous gauge metric type



* Adds change log



* incorporate pr comments



* fixes build errors



---------

Signed-off-by: Gagan Juneja <[email protected]>
Signed-off-by: Gagan Juneja <[email protected]>
Co-authored-by: Gagan Juneja <[email protected]>
  • Loading branch information
Gaganjuneja and Gagan Juneja authored Mar 15, 2024
1 parent 88f2891 commit 087ce3a
Show file tree
Hide file tree
Showing 9 changed files with 150 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- The org.opensearch.bootstrap.Security should support codebase for JAR files with classifiers ([#12586](https://github.com/opensearch-project/OpenSearch/issues/12586))
- Remote reindex: Add support for configurable retry mechanism ([#12561](https://github.com/opensearch-project/OpenSearch/pull/12561))
- Tracing for deep search path ([#12103](https://github.com/opensearch-project/OpenSearch/pull/12103))
- [Metrics Framework] Adds support for asynchronous gauge metric type. ([#12642](https://github.com/opensearch-project/OpenSearch/issues/12642))

### Dependencies
- Bump `com.squareup.okio:okio` from 3.7.0 to 3.8.0 ([#12290](https://github.com/opensearch-project/OpenSearch/pull/12290))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@

package org.opensearch.telemetry.metrics;

import org.opensearch.telemetry.metrics.tags.Tags;

import java.io.Closeable;
import java.io.IOException;
import java.util.function.Supplier;

/**
* Default implementation for {@link MetricsRegistry}
Expand Down Expand Up @@ -39,6 +43,11 @@ public Histogram createHistogram(String name, String description, String unit) {
return metricsTelemetry.createHistogram(name, description, unit);
}

@Override
public Closeable createGauge(String name, String description, String unit, Supplier<Double> valueProvider, Tags tags) {
return metricsTelemetry.createGauge(name, description, unit, valueProvider, tags);
}

@Override
public void close() throws IOException {
metricsTelemetry.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
package org.opensearch.telemetry.metrics;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.telemetry.metrics.tags.Tags;

import java.io.Closeable;
import java.util.function.Supplier;

/**
* MetricsRegistry helps in creating the metric instruments.
Expand Down Expand Up @@ -47,4 +49,18 @@ public interface MetricsRegistry extends Closeable {
* @return histogram.
*/
Histogram createHistogram(String name, String description, String unit);

/**
* Creates the Observable Gauge type of Metric. Where the value provider will be called at a certain frequency
* to capture the value.
*
* @param name name of the observable gauge.
* @param description any description about the metric.
* @param unit unit of the metric.
* @param valueProvider value provider.
* @param tags attributes/dimensions of the metric.
* @return closeable to dispose/close the Gauge metric.
*/
Closeable createGauge(String name, String description, String unit, Supplier<Double> valueProvider, Tags tags);

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
import org.opensearch.telemetry.metrics.Counter;
import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;

import java.io.Closeable;
import java.io.IOException;
import java.util.function.Supplier;

/**
*No-op {@link MetricsRegistry}
Expand Down Expand Up @@ -44,6 +47,11 @@ public Histogram createHistogram(String name, String description, String unit) {
return NoopHistogram.INSTANCE;
}

@Override
public Closeable createGauge(String name, String description, String unit, Supplier<Double> valueProvider, Tags tags) {
return () -> {};
}

@Override
public void close() throws IOException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@

package org.opensearch.telemetry.metrics;

import org.opensearch.telemetry.metrics.tags.Tags;
import org.opensearch.test.OpenSearchTestCase;

import java.io.Closeable;
import java.util.function.Supplier;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -59,4 +63,20 @@ public void testHistogram() {
assertSame(mockHistogram, histogram);
}

@SuppressWarnings("unchecked")
public void testGauge() {
Closeable mockCloseable = mock(Closeable.class);
when(
defaultMeterRegistry.createGauge(any(String.class), any(String.class), any(String.class), any(Supplier.class), any(Tags.class))
).thenReturn(mockCloseable);
Closeable closeable = defaultMeterRegistry.createGauge(
"org.opensearch.telemetry.metrics.DefaultMeterRegistryTests.testObservableGauge",
"test observable gauge",
"ms",
() -> 1.0,
Tags.EMPTY
);
assertSame(mockCloseable, closeable);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,21 @@
import org.opensearch.telemetry.IntegrationTestOTelTelemetryPlugin;
import org.opensearch.telemetry.OTelTelemetrySettings;
import org.opensearch.telemetry.TelemetrySettings;
import org.opensearch.telemetry.metrics.tags.Tags;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import io.opentelemetry.sdk.metrics.data.DoublePointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramPointData;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, minNumDataNodes = 1)
Expand Down Expand Up @@ -118,6 +124,41 @@ public void testHistogram() throws Exception {
assertEquals(1.0, histogramPointData.getMin(), 1.0);
}

public void testObservableGauge() throws Exception {
String metricName = "test-observable-gauge";
MetricsRegistry metricsRegistry = internalCluster().getInstance(MetricsRegistry.class);
InMemorySingletonMetricsExporter.INSTANCE.reset();
Tags tags = Tags.create().addTag("test", "integ-test");
final AtomicInteger testValue = new AtomicInteger(0);
Supplier<Double> valueProvider = () -> { return Double.valueOf(testValue.incrementAndGet()); };
Closeable gaugeCloseable = metricsRegistry.createGauge(metricName, "test", "ms", valueProvider, tags);
// Sleep for about 2.2s to wait for metrics to be published.
Thread.sleep(2200);

InMemorySingletonMetricsExporter exporter = InMemorySingletonMetricsExporter.INSTANCE;

assertEquals(2.0, getMaxObservableGaugeValue(exporter, metricName), 0.0);
gaugeCloseable.close();
double observableGaugeValueAfterStop = getMaxObservableGaugeValue(exporter, metricName);

// Sleep for about 1.2s to wait for metrics to see that closed observableGauge shouldn't execute the callable.
Thread.sleep(1200);
assertEquals(observableGaugeValueAfterStop, getMaxObservableGaugeValue(exporter, metricName), 0.0);

}

private static double getMaxObservableGaugeValue(InMemorySingletonMetricsExporter exporter, String metricName) {
List<MetricData> dataPoints = exporter.getFinishedMetricItems()
.stream()
.filter(a -> a.getName().contains(metricName))
.collect(Collectors.toList());
double totalValue = 0;
for (MetricData metricData : dataPoints) {
totalValue = Math.max(totalValue, ((DoublePointData) metricData.getDoubleGaugeData().getPoints().toArray()[0]).getValue());
}
return totalValue;
}

@After
public void reset() {
InMemorySingletonMetricsExporter.INSTANCE.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,22 @@
package org.opensearch.telemetry.metrics;

import org.opensearch.common.concurrent.RefCountedReleasable;
import org.opensearch.telemetry.OTelAttributesConverter;
import org.opensearch.telemetry.OTelTelemetryPlugin;
import org.opensearch.telemetry.metrics.tags.Tags;

import java.io.Closeable;
import java.io.IOException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.function.Supplier;

import io.opentelemetry.api.metrics.DoubleCounter;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.DoubleUpDownCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.api.metrics.ObservableDoubleGauge;
import io.opentelemetry.sdk.OpenTelemetrySdk;

/**
Expand Down Expand Up @@ -86,6 +90,17 @@ public Histogram createHistogram(String name, String description, String unit) {
return new OTelHistogram(doubleHistogram);
}

@Override
public Closeable createGauge(String name, String description, String unit, Supplier<Double> valueProvider, Tags tags) {
ObservableDoubleGauge doubleObservableGauge = AccessController.doPrivileged(
(PrivilegedAction<ObservableDoubleGauge>) () -> otelMeter.gaugeBuilder(name)
.setUnit(unit)
.setDescription(description)
.buildWithCallback(record -> record.record(valueProvider.get(), OTelAttributesConverter.convert(tags)))
);
return () -> doubleObservableGauge.close();
}

@Override
public void close() throws IOException {
meterProvider.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@
import org.opensearch.telemetry.metrics.tags.Tags;
import org.opensearch.test.OpenSearchTestCase;

import java.io.Closeable;
import java.util.function.Consumer;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.metrics.DoubleCounter;
import io.opentelemetry.api.metrics.DoubleCounterBuilder;
import io.opentelemetry.api.metrics.DoubleGaugeBuilder;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.DoubleHistogramBuilder;
import io.opentelemetry.api.metrics.DoubleUpDownCounter;
Expand All @@ -25,8 +29,10 @@
import io.opentelemetry.api.metrics.LongUpDownCounterBuilder;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.api.metrics.ObservableDoubleGauge;
import org.mockito.Mockito;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -149,4 +155,29 @@ public void testHistogram() {
histogram.record(2.0, tags);
verify(mockOTelDoubleHistogram).record(2.0, OTelAttributesConverter.convert(tags));
}

@SuppressWarnings({ "rawtypes", "unchecked" })
public void testGauge() throws Exception {
String observableGaugeName = "test-gauge";
String description = "test";
String unit = "1";
Meter mockMeter = mock(Meter.class);
OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class);
ObservableDoubleGauge observableDoubleGauge = mock(ObservableDoubleGauge.class);
DoubleGaugeBuilder mockOTelDoubleGaugeBuilder = mock(DoubleGaugeBuilder.class);
MeterProvider meterProvider = mock(MeterProvider.class);
when(meterProvider.get(OTelTelemetryPlugin.INSTRUMENTATION_SCOPE_NAME)).thenReturn(mockMeter);
MetricsTelemetry metricsTelemetry = new OTelMetricsTelemetry(
new RefCountedReleasable("telemetry", mockOpenTelemetry, () -> {}),
meterProvider
);
when(mockMeter.gaugeBuilder(Mockito.contains(observableGaugeName))).thenReturn(mockOTelDoubleGaugeBuilder);
when(mockOTelDoubleGaugeBuilder.setDescription(description)).thenReturn(mockOTelDoubleGaugeBuilder);
when(mockOTelDoubleGaugeBuilder.setUnit(unit)).thenReturn(mockOTelDoubleGaugeBuilder);
when(mockOTelDoubleGaugeBuilder.buildWithCallback(any(Consumer.class))).thenReturn(observableDoubleGauge);

Closeable closeable = metricsTelemetry.createGauge(observableGaugeName, description, unit, () -> 1.0, Tags.EMPTY);
closeable.close();
verify(observableDoubleGauge).close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@
import org.opensearch.telemetry.metrics.MetricsTelemetry;
import org.opensearch.telemetry.metrics.noop.NoopCounter;
import org.opensearch.telemetry.metrics.noop.NoopHistogram;
import org.opensearch.telemetry.metrics.tags.Tags;
import org.opensearch.telemetry.tracing.TracingTelemetry;
import org.opensearch.test.telemetry.tracing.MockTracingTelemetry;

import java.io.Closeable;
import java.util.function.Supplier;

/**
* Mock {@link Telemetry} implementation for testing.
*/
Expand Down Expand Up @@ -53,6 +57,11 @@ public Histogram createHistogram(String name, String description, String unit) {
return NoopHistogram.INSTANCE;
}

@Override
public Closeable createGauge(String name, String description, String unit, Supplier<Double> valueProvider, Tags tags) {
return () -> {};
}

@Override
public void close() {

Expand Down

0 comments on commit 087ce3a

Please sign in to comment.