From c51818af03ab593b76d9e177753a123f0a15861e Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 5 Feb 2024 16:17:22 -0500 Subject: [PATCH 1/3] Bump com.google.api:gax-httpjson from 2.39.0 to 2.42.0 in /plugins/repository-gcs (#12165) * Bump com.google.api:gax-httpjson in /plugins/repository-gcs Bumps com.google.api:gax-httpjson from 2.39.0 to 2.42.0. --- updated-dependencies: - dependency-name: com.google.api:gax-httpjson dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] * Updating SHAs Signed-off-by: dependabot[bot] * Update changelog Signed-off-by: dependabot[bot] * Update missing classes Signed-off-by: Kunal Kotwani --------- Signed-off-by: dependabot[bot] Signed-off-by: Kunal Kotwani Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: dependabot[bot] Co-authored-by: Kunal Kotwani --- CHANGELOG.md | 2 +- plugins/repository-gcs/build.gradle | 7 +++++-- .../repository-gcs/licenses/gax-httpjson-2.39.0.jar.sha1 | 1 - .../repository-gcs/licenses/gax-httpjson-2.42.0.jar.sha1 | 1 + 4 files changed, 7 insertions(+), 4 deletions(-) delete mode 100644 plugins/repository-gcs/licenses/gax-httpjson-2.39.0.jar.sha1 create mode 100644 plugins/repository-gcs/licenses/gax-httpjson-2.42.0.jar.sha1 diff --git a/CHANGELOG.md b/CHANGELOG.md index 0e5741f91928d..5f5ac4bfb1ffa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -180,7 +180,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `lycheeverse/lychee-action` from 1.8.0 to 1.9.1 ([#11795](https://github.com/opensearch-project/OpenSearch/pull/11795), [#11887](https://github.com/opensearch-project/OpenSearch/pull/11887)) - Bump `Lucene` from 9.8.0 to 9.9.2 ([#11421](https://github.com/opensearch-project/OpenSearch/pull/11421)), ([#12063](https://github.com/opensearch-project/OpenSearch/pull/12063)) - Bump `com.networknt:json-schema-validator` from 1.0.86 to 1.2.0 ([#11886](https://github.com/opensearch-project/OpenSearch/pull/11886), [#11963](https://github.com/opensearch-project/OpenSearch/pull/11963)) -- Bump `com.google.api:gax-httpjson` from 0.103.1 to 2.39.0 ([#11794](https://github.com/opensearch-project/OpenSearch/pull/11794)) +- Bump `com.google.api:gax-httpjson` from 0.103.1 to 2.42.0 ([#11794](https://github.com/opensearch-project/OpenSearch/pull/11794), [#12165](https://github.com/opensearch-project/OpenSearch/pull/12165)) - Bump `com.google.oauth-client:google-oauth-client` from 1.34.1 to 1.35.0 ([#11960](https://github.com/opensearch-project/OpenSearch/pull/11960)) - Bump `com.diffplug.spotless` from 6.23.2 to 6.25.0 ([#11962](https://github.com/opensearch-project/OpenSearch/pull/11962), [#12055](https://github.com/opensearch-project/OpenSearch/pull/12055)) - Bump `com.google.cloud:google-cloud-core` from 2.5.10 to 2.30.0 ([#11961](https://github.com/opensearch-project/OpenSearch/pull/11961)) diff --git a/plugins/repository-gcs/build.gradle b/plugins/repository-gcs/build.gradle index 2ce67b1f25a80..b28f97677b0df 100644 --- a/plugins/repository-gcs/build.gradle +++ b/plugins/repository-gcs/build.gradle @@ -54,7 +54,7 @@ versions << [ dependencies { api 'com.google.api:api-common:1.8.1' api 'com.google.api:gax:2.35.0' - api 'com.google.api:gax-httpjson:2.39.0' + api 'com.google.api:gax-httpjson:2.42.0' api 'com.google.apis:google-api-services-storage:v1-rev20230617-2.0.0' @@ -206,7 +206,10 @@ thirdPartyAudit { // commons-logging provided dependencies 'javax.jms.Message', 'javax.servlet.ServletContextEvent', - 'javax.servlet.ServletContextListener' + 'javax.servlet.ServletContextListener', + // Bump for gax 2.42.0 + 'com.google.api.gax.rpc.EndpointContext', + 'com.google.api.gax.rpc.RequestMutator' ) } diff --git a/plugins/repository-gcs/licenses/gax-httpjson-2.39.0.jar.sha1 b/plugins/repository-gcs/licenses/gax-httpjson-2.39.0.jar.sha1 deleted file mode 100644 index d0f3b8dfa6d25..0000000000000 --- a/plugins/repository-gcs/licenses/gax-httpjson-2.39.0.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -ebff086f21c54c1eb02056f94255b4f1836a8efe \ No newline at end of file diff --git a/plugins/repository-gcs/licenses/gax-httpjson-2.42.0.jar.sha1 b/plugins/repository-gcs/licenses/gax-httpjson-2.42.0.jar.sha1 new file mode 100644 index 0000000000000..672506572ed4d --- /dev/null +++ b/plugins/repository-gcs/licenses/gax-httpjson-2.42.0.jar.sha1 @@ -0,0 +1 @@ +4db06bc31c2fb34b0490362e8666c20fdc1fb3f2 \ No newline at end of file From a4bc4af5773d0ba8bd1a9d09a03a034e7b833992 Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Tue, 6 Feb 2024 04:16:37 +0530 Subject: [PATCH 2/3] [Metric Framework] Adds support for Histogram metric (#12062) * [Metric Framework] Adds support for Histogram metric Signed-off-by: Gagan Juneja * Adds test Signed-off-by: Gagan Juneja * Addresses review comments Signed-off-by: Gagan Juneja * Adds change log Signed-off-by: Gagan Juneja * Fixed spotless Signed-off-by: Gagan Juneja * Fixes javadoc Signed-off-by: Gagan Juneja * Fixes javadoc Signed-off-by: Gagan Juneja * Fixes test Signed-off-by: Gagan Juneja * Removes explicit approach Signed-off-by: Gagan Juneja * Removes explicit approach Signed-off-by: Gagan Juneja * Addresses review comments Signed-off-by: Gagan Juneja --------- Signed-off-by: Gagan Juneja Co-authored-by: Gagan Juneja --- CHANGELOG.md | 1 + .../metrics/DefaultMetricsRegistry.java | 5 +++ .../telemetry/metrics/Histogram.java | 35 ++++++++++++++++ .../telemetry/metrics/MetricsRegistry.java | 11 +++++ .../telemetry/metrics/noop/NoopHistogram.java | 38 ++++++++++++++++++ .../metrics/noop/NoopMetricsRegistry.java | 6 +++ .../metrics/DefaultMetricsRegistryTests.java | 11 +++++ .../TelemetryMetricsDisabledSanityIT.java | 4 ++ .../TelemetryMetricsEnabledSanityIT.java | 26 ++++++++++++ .../telemetry/metrics/OTelHistogram.java | 40 +++++++++++++++++++ .../metrics/OTelMetricsTelemetry.java | 18 +++++++++ .../tracing/OTelResourceProvider.java | 9 +++++ .../metrics/OTelMetricsTelemetryTests.java | 31 ++++++++++++++ .../test/telemetry/MockTelemetry.java | 7 ++++ 14 files changed, 242 insertions(+) create mode 100644 libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/Histogram.java create mode 100644 libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/noop/NoopHistogram.java create mode 100644 plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/metrics/OTelHistogram.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f5ac4bfb1ffa..50a1a6ea99243 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -138,6 +138,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add support for Google Application Default Credentials in repository-gcs ([#8394](https://github.com/opensearch-project/OpenSearch/pull/8394)) - Remove concurrent segment search feature flag for GA launch ([#12074](https://github.com/opensearch-project/OpenSearch/pull/12074)) - Enable Fuzzy codec for doc id fields using a bloom filter ([#11022](https://github.com/opensearch-project/OpenSearch/pull/11022)) +- [Metrics Framework] Adds support for Histogram metric ([#12062](https://github.com/opensearch-project/OpenSearch/pull/12062)) ### Dependencies - Bumps jetty version to 9.4.52.v20230823 to fix GMS-2023-1857 ([#9822](https://github.com/opensearch-project/OpenSearch/pull/9822)) diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/DefaultMetricsRegistry.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/DefaultMetricsRegistry.java index d57def9406b17..f38fdd6412d79 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/DefaultMetricsRegistry.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/DefaultMetricsRegistry.java @@ -34,6 +34,11 @@ public Counter createUpDownCounter(String name, String description, String unit) return metricsTelemetry.createUpDownCounter(name, description, unit); } + @Override + public Histogram createHistogram(String name, String description, String unit) { + return metricsTelemetry.createHistogram(name, description, unit); + } + @Override public void close() throws IOException { metricsTelemetry.close(); diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/Histogram.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/Histogram.java new file mode 100644 index 0000000000000..95ada626e21ee --- /dev/null +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/Histogram.java @@ -0,0 +1,35 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.metrics; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.telemetry.metrics.tags.Tags; + +/** + * Histogram records the value for an existing metric. + * {@opensearch.experimental} + */ +@ExperimentalApi +public interface Histogram { + + /** + * record value. + * @param value value to be added. + */ + void record(double value); + + /** + * record value along with the attributes. + * + * @param value value to be added. + * @param tags attributes/dimensions of the metric. + */ + void record(double value, Tags tags); + +} diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/MetricsRegistry.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/MetricsRegistry.java index 61b3df089928b..94d19bda31f34 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/MetricsRegistry.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/MetricsRegistry.java @@ -36,4 +36,15 @@ public interface MetricsRegistry extends Closeable { * @return counter. */ Counter createUpDownCounter(String name, String description, String unit); + + /** + * Creates the histogram type of Metric. Implementation framework will take care + * of the bucketing strategy. + * + * @param name name of the histogram. + * @param description any description about the metric. + * @param unit unit of the metric. + * @return histogram. + */ + Histogram createHistogram(String name, String description, String unit); } diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/noop/NoopHistogram.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/noop/NoopHistogram.java new file mode 100644 index 0000000000000..20e72bccad899 --- /dev/null +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/noop/NoopHistogram.java @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.metrics.noop; + +import org.opensearch.common.annotation.InternalApi; +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.tags.Tags; + +/** + * No-op {@link Histogram} + * {@opensearch.internal} + */ +@InternalApi +public class NoopHistogram implements Histogram { + + /** + * No-op Histogram instance + */ + public final static NoopHistogram INSTANCE = new NoopHistogram(); + + private NoopHistogram() {} + + @Override + public void record(double value) { + + } + + @Override + public void record(double value, Tags tags) { + + } +} diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/noop/NoopMetricsRegistry.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/noop/NoopMetricsRegistry.java index 640c6842a8960..d3dda68cfae71 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/noop/NoopMetricsRegistry.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/noop/NoopMetricsRegistry.java @@ -10,6 +10,7 @@ import org.opensearch.common.annotation.InternalApi; import org.opensearch.telemetry.metrics.Counter; +import org.opensearch.telemetry.metrics.Histogram; import org.opensearch.telemetry.metrics.MetricsRegistry; import java.io.IOException; @@ -38,6 +39,11 @@ public Counter createUpDownCounter(String name, String description, String unit) return NoopCounter.INSTANCE; } + @Override + public Histogram createHistogram(String name, String description, String unit) { + return NoopHistogram.INSTANCE; + } + @Override public void close() throws IOException { diff --git a/libs/telemetry/src/test/java/org/opensearch/telemetry/metrics/DefaultMetricsRegistryTests.java b/libs/telemetry/src/test/java/org/opensearch/telemetry/metrics/DefaultMetricsRegistryTests.java index 6171641db5f07..02f126075845b 100644 --- a/libs/telemetry/src/test/java/org/opensearch/telemetry/metrics/DefaultMetricsRegistryTests.java +++ b/libs/telemetry/src/test/java/org/opensearch/telemetry/metrics/DefaultMetricsRegistryTests.java @@ -48,4 +48,15 @@ public void testUpDownCounter() { assertSame(mockCounter, counter); } + public void testHistogram() { + Histogram mockHistogram = mock(Histogram.class); + when(defaultMeterRegistry.createHistogram(any(String.class), any(String.class), any(String.class))).thenReturn(mockHistogram); + Histogram histogram = defaultMeterRegistry.createHistogram( + "org.opensearch.telemetry.metrics.DefaultMeterRegistryTests.testHistogram", + "test up-down counter", + "ms" + ); + assertSame(mockHistogram, histogram); + } + } diff --git a/plugins/telemetry-otel/src/internalClusterTest/java/org/opensearch/telemetry/metrics/TelemetryMetricsDisabledSanityIT.java b/plugins/telemetry-otel/src/internalClusterTest/java/org/opensearch/telemetry/metrics/TelemetryMetricsDisabledSanityIT.java index bcdcb657c4f42..e77e69d121036 100644 --- a/plugins/telemetry-otel/src/internalClusterTest/java/org/opensearch/telemetry/metrics/TelemetryMetricsDisabledSanityIT.java +++ b/plugins/telemetry-otel/src/internalClusterTest/java/org/opensearch/telemetry/metrics/TelemetryMetricsDisabledSanityIT.java @@ -15,6 +15,7 @@ import org.opensearch.telemetry.OTelTelemetrySettings; import org.opensearch.telemetry.TelemetrySettings; import org.opensearch.telemetry.metrics.noop.NoopCounter; +import org.opensearch.telemetry.metrics.noop.NoopHistogram; import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.test.OpenSearchIntegTestCase; @@ -53,10 +54,13 @@ public void testSanityChecksWhenMetricsDisabled() throws Exception { Counter counter = metricsRegistry.createCounter("test-counter", "test", "1"); counter.add(1.0); + Histogram histogram = metricsRegistry.createHistogram("test-histogram", "test", "1"); + Thread.sleep(2000); assertTrue(metricsRegistry instanceof NoopMetricsRegistry); assertTrue(counter instanceof NoopCounter); + assertTrue(histogram instanceof NoopHistogram); } } diff --git a/plugins/telemetry-otel/src/internalClusterTest/java/org/opensearch/telemetry/metrics/TelemetryMetricsEnabledSanityIT.java b/plugins/telemetry-otel/src/internalClusterTest/java/org/opensearch/telemetry/metrics/TelemetryMetricsEnabledSanityIT.java index ed341595d327d..1b8f694709a9c 100644 --- a/plugins/telemetry-otel/src/internalClusterTest/java/org/opensearch/telemetry/metrics/TelemetryMetricsEnabledSanityIT.java +++ b/plugins/telemetry-otel/src/internalClusterTest/java/org/opensearch/telemetry/metrics/TelemetryMetricsEnabledSanityIT.java @@ -23,6 +23,7 @@ import java.util.stream.Collectors; import io.opentelemetry.sdk.metrics.data.DoublePointData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramPointData; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, minNumDataNodes = 1) public class TelemetryMetricsEnabledSanityIT extends OpenSearchIntegTestCase { @@ -92,6 +93,31 @@ public void testUpDownCounter() throws Exception { assertEquals(-1.0, value, 0.0); } + public void testHistogram() throws Exception { + MetricsRegistry metricsRegistry = internalCluster().getInstance(MetricsRegistry.class); + InMemorySingletonMetricsExporter.INSTANCE.reset(); + + Histogram histogram = metricsRegistry.createHistogram("test-histogram", "test", "ms"); + histogram.record(2.0); + histogram.record(1.0); + histogram.record(3.0); + // Sleep for about 2s to wait for metrics to be published. + Thread.sleep(2000); + + InMemorySingletonMetricsExporter exporter = InMemorySingletonMetricsExporter.INSTANCE; + ImmutableExponentialHistogramPointData histogramPointData = ((ImmutableExponentialHistogramPointData) ((ArrayList) exporter + .getFinishedMetricItems() + .stream() + .filter(a -> a.getName().contains("test-histogram")) + .collect(Collectors.toList()) + .get(0) + .getExponentialHistogramData() + .getPoints()).get(0)); + assertEquals(1.0, histogramPointData.getSum(), 6.0); + assertEquals(1.0, histogramPointData.getMax(), 3.0); + assertEquals(1.0, histogramPointData.getMin(), 1.0); + } + @After public void reset() { InMemorySingletonMetricsExporter.INSTANCE.reset(); diff --git a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/metrics/OTelHistogram.java b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/metrics/OTelHistogram.java new file mode 100644 index 0000000000000..73bb0d8adff62 --- /dev/null +++ b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/metrics/OTelHistogram.java @@ -0,0 +1,40 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.metrics; + +import org.opensearch.telemetry.OTelAttributesConverter; +import org.opensearch.telemetry.metrics.tags.Tags; + +import io.opentelemetry.api.metrics.DoubleHistogram; + +/** + * OTel aware implementation {@link Histogram} + */ +class OTelHistogram implements Histogram { + + private final DoubleHistogram otelDoubleHistogram; + + /** + * Constructor + * @param otelDoubleCounter delegate counter. + */ + public OTelHistogram(DoubleHistogram otelDoubleCounter) { + this.otelDoubleHistogram = otelDoubleCounter; + } + + @Override + public void record(double value) { + otelDoubleHistogram.record(value); + } + + @Override + public void record(double value, Tags tags) { + otelDoubleHistogram.record(value, OTelAttributesConverter.convert(tags)); + } +} diff --git a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/metrics/OTelMetricsTelemetry.java b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/metrics/OTelMetricsTelemetry.java index af235b9604669..82ae2cdd198b2 100644 --- a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/metrics/OTelMetricsTelemetry.java +++ b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/metrics/OTelMetricsTelemetry.java @@ -17,6 +17,7 @@ import java.security.PrivilegedAction; import io.opentelemetry.api.metrics.DoubleCounter; +import io.opentelemetry.api.metrics.DoubleHistogram; import io.opentelemetry.api.metrics.DoubleUpDownCounter; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.MeterProvider; @@ -68,6 +69,23 @@ public Counter createUpDownCounter(String name, String description, String unit) return new OTelUpDownCounter(doubleUpDownCounter); } + /** + * Creates the Otel Histogram. In {@link org.opensearch.telemetry.tracing.OTelResourceProvider} + * we can configure the bucketing/aggregation strategy through view. Default startegy configured + * is the {@link io.opentelemetry.sdk.metrics.internal.view.Base2ExponentialHistogramAggregation}. + * @param name name of the histogram. + * @param description any description about the metric. + * @param unit unit of the metric. + * @return histogram + */ + @Override + public Histogram createHistogram(String name, String description, String unit) { + DoubleHistogram doubleHistogram = AccessController.doPrivileged( + (PrivilegedAction) () -> otelMeter.histogramBuilder(name).setUnit(unit).setDescription(description).build() + ); + return new OTelHistogram(doubleHistogram); + } + @Override public void close() throws IOException { meterProvider.close(); diff --git a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelResourceProvider.java b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelResourceProvider.java index b1a45f2e7c2d2..39b97b7d6441a 100644 --- a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelResourceProvider.java +++ b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelResourceProvider.java @@ -23,8 +23,12 @@ import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; import io.opentelemetry.context.propagation.ContextPropagators; import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.InstrumentSelector; +import io.opentelemetry.sdk.metrics.InstrumentType; import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.View; import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; +import io.opentelemetry.sdk.metrics.internal.view.Base2ExponentialHistogramAggregation; import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.trace.SdkTracerProvider; import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; @@ -40,6 +44,7 @@ * This class encapsulates all OpenTelemetry related resources */ public final class OTelResourceProvider { + private OTelResourceProvider() {} /** @@ -92,6 +97,10 @@ private static SdkMeterProvider createSdkMetricProvider(Settings settings, Resou .setInterval(TelemetrySettings.METRICS_PUBLISH_INTERVAL_SETTING.get(settings).getSeconds(), TimeUnit.SECONDS) .build() ) + .registerView( + InstrumentSelector.builder().setType(InstrumentType.HISTOGRAM).build(), + View.builder().setAggregation(Base2ExponentialHistogramAggregation.getDefault()).build() + ) .build(); } diff --git a/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/metrics/OTelMetricsTelemetryTests.java b/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/metrics/OTelMetricsTelemetryTests.java index 9de575b69774a..4b39e3d0d607d 100644 --- a/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/metrics/OTelMetricsTelemetryTests.java +++ b/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/metrics/OTelMetricsTelemetryTests.java @@ -17,12 +17,15 @@ import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.metrics.DoubleCounter; import io.opentelemetry.api.metrics.DoubleCounterBuilder; +import io.opentelemetry.api.metrics.DoubleHistogram; +import io.opentelemetry.api.metrics.DoubleHistogramBuilder; import io.opentelemetry.api.metrics.DoubleUpDownCounter; import io.opentelemetry.api.metrics.DoubleUpDownCounterBuilder; import io.opentelemetry.api.metrics.LongCounterBuilder; import io.opentelemetry.api.metrics.LongUpDownCounterBuilder; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.MeterProvider; +import org.mockito.Mockito; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -118,4 +121,32 @@ public void testUpDownCounter() { counter.add(-2.0, tags); verify(mockOTelUpDownDoubleCounter).add((-2.0), OTelAttributesConverter.convert(tags)); } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public void testHistogram() { + String histogramName = "test-histogram"; + String description = "test"; + String unit = "1"; + Meter mockMeter = mock(Meter.class); + OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class); + DoubleHistogram mockOTelDoubleHistogram = mock(DoubleHistogram.class); + DoubleHistogramBuilder mockOTelDoubleHistogramBuilder = mock(DoubleHistogramBuilder.class); + MeterProvider meterProvider = mock(MeterProvider.class); + when(meterProvider.get(OTelTelemetryPlugin.INSTRUMENTATION_SCOPE_NAME)).thenReturn(mockMeter); + MetricsTelemetry metricsTelemetry = new OTelMetricsTelemetry( + new RefCountedReleasable("telemetry", mockOpenTelemetry, () -> {}), + meterProvider + ); + when(mockMeter.histogramBuilder(Mockito.contains(histogramName))).thenReturn(mockOTelDoubleHistogramBuilder); + when(mockOTelDoubleHistogramBuilder.setDescription(description)).thenReturn(mockOTelDoubleHistogramBuilder); + when(mockOTelDoubleHistogramBuilder.setUnit(unit)).thenReturn(mockOTelDoubleHistogramBuilder); + when(mockOTelDoubleHistogramBuilder.build()).thenReturn(mockOTelDoubleHistogram); + + Histogram histogram = metricsTelemetry.createHistogram(histogramName, description, unit); + histogram.record(1.0); + verify(mockOTelDoubleHistogram).record(1.0); + Tags tags = Tags.create().addTag("test", "test"); + histogram.record(2.0, tags); + verify(mockOTelDoubleHistogram).record(2.0, OTelAttributesConverter.convert(tags)); + } } diff --git a/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetry.java b/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetry.java index dda413ce2818e..44daf1b1554e0 100644 --- a/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetry.java +++ b/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetry.java @@ -11,8 +11,10 @@ import org.opensearch.telemetry.Telemetry; import org.opensearch.telemetry.TelemetrySettings; import org.opensearch.telemetry.metrics.Counter; +import org.opensearch.telemetry.metrics.Histogram; import org.opensearch.telemetry.metrics.MetricsTelemetry; import org.opensearch.telemetry.metrics.noop.NoopCounter; +import org.opensearch.telemetry.metrics.noop.NoopHistogram; import org.opensearch.telemetry.tracing.TracingTelemetry; import org.opensearch.test.telemetry.tracing.MockTracingTelemetry; @@ -46,6 +48,11 @@ public Counter createUpDownCounter(String name, String description, String unit) return NoopCounter.INSTANCE; } + @Override + public Histogram createHistogram(String name, String description, String unit) { + return NoopHistogram.INSTANCE; + } + @Override public void close() { From 3cbf54e7358fd707a0d73e6c016b04ce78884d30 Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Mon, 5 Feb 2024 18:10:06 -0800 Subject: [PATCH 3/3] Query insights plugin implementation (#11903) * Query insights plugin implementation Signed-off-by: Chenyang Ji * Increase JavaDoc coverage and update PR based comments Signed-off-by: Chenyang Ji * Refactor record and service to make them generic Signed-off-by: Chenyang Ji * refactor service for improving multithreading efficiency Signed-off-by: Chenyang Ji --------- Signed-off-by: Chenyang Ji --- CHANGELOG.md | 1 + plugins/query-insights/build.gradle | 18 ++ .../plugin/insights/QueryInsightsPlugin.java | 112 +++++++ .../core/service/QueryInsightsService.java | 180 +++++++++++ .../core/service/TopQueriesService.java | 282 ++++++++++++++++++ .../insights/core/service/package-info.java | 12 + .../plugin/insights/package-info.java | 12 + .../insights/rules/model/Attribute.java | 74 +++++ .../insights/rules/model/MetricType.java | 121 ++++++++ .../rules/model/SearchQueryRecord.java | 176 +++++++++++ .../insights/rules/model/package-info.java | 12 + .../settings/QueryInsightsSettings.java | 116 +++++++ .../insights/settings/package-info.java | 12 + .../insights/QueryInsightsPluginTests.java | 91 ++++++ .../insights/QueryInsightsTestUtils.java | 155 ++++++++++ .../service/QueryInsightsServiceTests.java | 49 +++ .../core/service/TopQueriesServiceTests.java | 102 +++++++ .../rules/model/SearchQueryRecordTests.java | 71 +++++ .../action/search/SearchRequest.java | 2 +- .../action/search/SearchRequestContext.java | 6 +- .../SearchRequestOperationsListener.java | 22 +- .../action/search/SearchRequestSlowLog.java | 10 +- .../action/search/SearchRequestStats.java | 6 +- ...erationsCompositeListenerFactoryTests.java | 6 +- 24 files changed, 1622 insertions(+), 26 deletions(-) create mode 100644 plugins/query-insights/build.gradle create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/package-info.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/package-info.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/MetricType.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/package-info.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/package-info.java create mode 100644 plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java create mode 100644 plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java create mode 100644 plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java create mode 100644 plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java create mode 100644 plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecordTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 50a1a6ea99243..5afdf6b707d1e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -223,6 +223,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Extract cluster management for integration tests into JUnit test rule out of OpenSearchIntegTestCase ([#11877](https://github.com/opensearch-project/OpenSearch/pull/11877)), ([#12000](https://github.com/opensearch-project/OpenSearch/pull/12000)) - Workaround for https://bugs.openjdk.org/browse/JDK-8323659 regression, introduced in JDK-21.0.2 ([#11968](https://github.com/opensearch-project/OpenSearch/pull/11968)) - Updates IpField to be searchable when only `doc_values` are enabled ([#11508](https://github.com/opensearch-project/OpenSearch/pull/11508)) +- [Query Insights] Query Insights Framework which currently supports retrieving the most time-consuming queries within the last configured time window ([#11903](https://github.com/opensearch-project/OpenSearch/pull/11903)) ### Deprecated diff --git a/plugins/query-insights/build.gradle b/plugins/query-insights/build.gradle new file mode 100644 index 0000000000000..eabbd395bd3bd --- /dev/null +++ b/plugins/query-insights/build.gradle @@ -0,0 +1,18 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +opensearchplugin { + description 'OpenSearch Query Insights Plugin.' + classname 'org.opensearch.plugin.insights.QueryInsightsPlugin' +} + +dependencies { +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java new file mode 100644 index 0000000000000..0acd7e2dcac0d --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java @@ -0,0 +1,112 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights; + +import org.opensearch.action.ActionRequest; +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.IndexScopedSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.SettingsFilter; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.plugin.insights.core.service.QueryInsightsService; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.plugins.ActionPlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.rest.RestController; +import org.opensearch.rest.RestHandler; +import org.opensearch.script.ScriptService; +import org.opensearch.threadpool.ExecutorBuilder; +import org.opensearch.threadpool.ScalingExecutorBuilder; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.watcher.ResourceWatcherService; + +import java.util.Collection; +import java.util.List; +import java.util.function.Supplier; + +/** + * Plugin class for Query Insights. + */ +public class QueryInsightsPlugin extends Plugin implements ActionPlugin { + /** + * Default constructor + */ + public QueryInsightsPlugin() {} + + @Override + public Collection createComponents( + final Client client, + final ClusterService clusterService, + final ThreadPool threadPool, + final ResourceWatcherService resourceWatcherService, + final ScriptService scriptService, + final NamedXContentRegistry xContentRegistry, + final Environment environment, + final NodeEnvironment nodeEnvironment, + final NamedWriteableRegistry namedWriteableRegistry, + final IndexNameExpressionResolver indexNameExpressionResolver, + final Supplier repositoriesServiceSupplier + ) { + // create top n queries service + final QueryInsightsService queryInsightsService = new QueryInsightsService(threadPool); + return List.of(queryInsightsService); + } + + @Override + public List> getExecutorBuilders(final Settings settings) { + return List.of( + new ScalingExecutorBuilder( + QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR, + 1, + Math.min((OpenSearchExecutors.allocatedProcessors(settings) + 1) / 2, QueryInsightsSettings.MAX_THREAD_COUNT), + TimeValue.timeValueMinutes(5) + ) + ); + } + + @Override + public List getRestHandlers( + final Settings settings, + final RestController restController, + final ClusterSettings clusterSettings, + final IndexScopedSettings indexScopedSettings, + final SettingsFilter settingsFilter, + final IndexNameExpressionResolver indexNameExpressionResolver, + final Supplier nodesInCluster + ) { + return List.of(); + } + + @Override + public List> getActions() { + return List.of(); + } + + @Override + public List> getSettings() { + return List.of( + // Settings for top N queries + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED, + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE, + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE + ); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java new file mode 100644 index 0000000000000..525ca0d4a3d33 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java @@ -0,0 +1,180 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.service; + +import org.opensearch.common.inject.Inject; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.threadpool.Scheduler; +import org.opensearch.threadpool.ThreadPool; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * Service responsible for gathering, analyzing, storing and exporting + * information related to search queries + * + * @opensearch.internal + */ +public class QueryInsightsService extends AbstractLifecycleComponent { + /** + * The internal OpenSearch thread pool that execute async processing and exporting tasks + */ + private final ThreadPool threadPool; + + /** + * Services to capture top n queries for different metric types + */ + private final Map topQueriesServices; + + /** + * Flags for enabling insight data collection for different metric types + */ + private final Map enableCollect; + + /** + * The internal thread-safe queue to ingest the search query data and subsequently forward to processors + */ + private final LinkedBlockingQueue queryRecordsQueue; + + /** + * Holds a reference to delayed operation {@link Scheduler.Cancellable} so it can be cancelled when + * the service closed concurrently. + */ + protected volatile Scheduler.Cancellable scheduledFuture; + + /** + * Constructor of the QueryInsightsService + * + * @param threadPool The OpenSearch thread pool to run async tasks + */ + @Inject + public QueryInsightsService(final ThreadPool threadPool) { + enableCollect = new HashMap<>(); + queryRecordsQueue = new LinkedBlockingQueue<>(QueryInsightsSettings.QUERY_RECORD_QUEUE_CAPACITY); + topQueriesServices = new HashMap<>(); + for (MetricType metricType : MetricType.allMetricTypes()) { + enableCollect.put(metricType, false); + topQueriesServices.put(metricType, new TopQueriesService(metricType)); + } + this.threadPool = threadPool; + } + + /** + * Ingest the query data into in-memory stores + * + * @param record the record to ingest + */ + public boolean addRecord(final SearchQueryRecord record) { + boolean shouldAdd = false; + for (Map.Entry entry : topQueriesServices.entrySet()) { + if (!enableCollect.get(entry.getKey())) { + continue; + } + List currentSnapshot = entry.getValue().getTopQueriesCurrentSnapshot(); + // skip add to top N queries store if the incoming record is smaller than the Nth record + if (currentSnapshot.size() < entry.getValue().getTopNSize() + || SearchQueryRecord.compare(record, currentSnapshot.get(0), entry.getKey()) > 0) { + shouldAdd = true; + break; + } + } + if (shouldAdd) { + return queryRecordsQueue.offer(record); + } + return false; + } + + /** + * Drain the queryRecordsQueue into internal stores and services + */ + public void drainRecords() { + final List records = new ArrayList<>(); + queryRecordsQueue.drainTo(records); + records.sort(Comparator.comparingLong(SearchQueryRecord::getTimestamp)); + for (MetricType metricType : MetricType.allMetricTypes()) { + if (enableCollect.get(metricType)) { + // ingest the records into topQueriesService + topQueriesServices.get(metricType).consumeRecords(records); + } + } + } + + /** + * Get the top queries service based on metricType + * @param metricType {@link MetricType} + * @return {@link TopQueriesService} + */ + public TopQueriesService getTopQueriesService(final MetricType metricType) { + return topQueriesServices.get(metricType); + } + + /** + * Set flag to enable or disable Query Insights data collection + * + * @param metricType {@link MetricType} + * @param enable Flag to enable or disable Query Insights data collection + */ + public void enableCollection(final MetricType metricType, final boolean enable) { + this.enableCollect.put(metricType, enable); + this.topQueriesServices.get(metricType).setEnabled(enable); + } + + /** + * Get if the Query Insights data collection is enabled for a MetricType + * + * @param metricType {@link MetricType} + * @return if the Query Insights data collection is enabled + */ + public boolean isCollectionEnabled(final MetricType metricType) { + return this.enableCollect.get(metricType); + } + + /** + * Check if query insights service is enabled + * + * @return if query insights service is enabled + */ + public boolean isEnabled() { + for (MetricType t : MetricType.allMetricTypes()) { + if (isCollectionEnabled(t)) { + return true; + } + } + return false; + } + + @Override + protected void doStart() { + if (isEnabled()) { + scheduledFuture = threadPool.scheduleWithFixedDelay( + this::drainRecords, + QueryInsightsSettings.QUERY_RECORD_QUEUE_DRAIN_INTERVAL, + QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR + ); + } + } + + @Override + protected void doStop() { + if (scheduledFuture != null) { + scheduledFuture.cancel(); + } + } + + @Override + protected void doClose() {} +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java new file mode 100644 index 0000000000000..d2c30cbdf98e7 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java @@ -0,0 +1,282 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.service; + +import org.opensearch.common.unit.TimeValue; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Locale; +import java.util.PriorityQueue; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Service responsible for gathering and storing top N queries + * with high latency or resource usage + * + * @opensearch.internal + */ +public class TopQueriesService { + private boolean enabled; + /** + * The metric type to measure top n queries + */ + private final MetricType metricType; + private int topNSize; + /** + * The window size to keep the top n queries + */ + private TimeValue windowSize; + /** + * The current window start timestamp + */ + private long windowStart; + /** + * The internal thread-safe store that holds the top n queries insight data + */ + private final PriorityQueue topQueriesStore; + + /** + * The AtomicReference of a snapshot of the current window top queries for getters to consume + */ + private final AtomicReference> topQueriesCurrentSnapshot; + + /** + * The AtomicReference of a snapshot of the last window top queries for getters to consume + */ + private final AtomicReference> topQueriesHistorySnapshot; + + TopQueriesService(final MetricType metricType) { + this.enabled = false; + this.metricType = metricType; + this.topNSize = QueryInsightsSettings.DEFAULT_TOP_N_SIZE; + this.windowSize = QueryInsightsSettings.DEFAULT_WINDOW_SIZE; + this.windowStart = -1L; + topQueriesStore = new PriorityQueue<>(topNSize, (a, b) -> SearchQueryRecord.compare(a, b, metricType)); + topQueriesCurrentSnapshot = new AtomicReference<>(new ArrayList<>()); + topQueriesHistorySnapshot = new AtomicReference<>(new ArrayList<>()); + } + + /** + * Set the top N size for TopQueriesService service. + * + * @param topNSize the top N size to set + */ + public void setTopNSize(final int topNSize) { + this.topNSize = topNSize; + } + + /** + * Get the current configured top n size + * + * @return top n size + */ + public int getTopNSize() { + return topNSize; + } + + /** + * Validate the top N size based on the internal constrains + * + * @param size the wanted top N size + */ + public void validateTopNSize(final int size) { + if (size > QueryInsightsSettings.MAX_N_SIZE) { + throw new IllegalArgumentException( + "Top N size setting for [" + + metricType + + "]" + + " should be smaller than max top N size [" + + QueryInsightsSettings.MAX_N_SIZE + + "was (" + + size + + " > " + + QueryInsightsSettings.MAX_N_SIZE + + ")" + ); + } + } + + /** + * Set enable flag for the service + * @param enabled boolean + */ + public void setEnabled(final boolean enabled) { + this.enabled = enabled; + } + + /** + * Set the window size for top N queries service + * + * @param windowSize window size to set + */ + public void setWindowSize(final TimeValue windowSize) { + this.windowSize = windowSize; + // reset the window start time since the window size has changed + this.windowStart = -1L; + } + + /** + * Validate if the window size is valid, based on internal constrains. + * + * @param windowSize the window size to validate + */ + public void validateWindowSize(final TimeValue windowSize) { + if (windowSize.compareTo(QueryInsightsSettings.MAX_WINDOW_SIZE) > 0 + || windowSize.compareTo(QueryInsightsSettings.MIN_WINDOW_SIZE) < 0) { + throw new IllegalArgumentException( + "Window size setting for [" + + metricType + + "]" + + " should be between [" + + QueryInsightsSettings.MIN_WINDOW_SIZE + + "," + + QueryInsightsSettings.MAX_WINDOW_SIZE + + "]" + + "was (" + + windowSize + + ")" + ); + } + if (!(QueryInsightsSettings.VALID_WINDOW_SIZES_IN_MINUTES.contains(windowSize) || windowSize.getMinutes() % 60 == 0)) { + throw new IllegalArgumentException( + "Window size setting for [" + + metricType + + "]" + + " should be multiple of 1 hour, or one of " + + QueryInsightsSettings.VALID_WINDOW_SIZES_IN_MINUTES + + ", was (" + + windowSize + + ")" + ); + } + } + + /** + * Get all top queries records that are in the current top n queries store + * Optionally include top N records from the last window. + * + * By default, return the records in sorted order. + * + * @param includeLastWindow if the top N queries from the last window should be included + * @return List of the records that are in the query insight store + * @throws IllegalArgumentException if query insight is disabled in the cluster + */ + public List getTopQueriesRecords(final boolean includeLastWindow) throws IllegalArgumentException { + if (!enabled) { + throw new IllegalArgumentException( + String.format(Locale.ROOT, "Cannot get top n queries for [%s] when it is not enabled.", metricType.toString()) + ); + } + // read from window snapshots + final List queries = new ArrayList<>(topQueriesCurrentSnapshot.get()); + if (includeLastWindow) { + queries.addAll(topQueriesHistorySnapshot.get()); + } + return Stream.of(queries) + .flatMap(Collection::stream) + .sorted((a, b) -> SearchQueryRecord.compare(a, b, metricType) * -1) + .collect(Collectors.toList()); + } + + /** + * Consume records to top queries stores + * + * @param records a list of {@link SearchQueryRecord} + */ + void consumeRecords(final List records) { + final long currentWindowStart = calculateWindowStart(System.currentTimeMillis()); + List recordsInLastWindow = new ArrayList<>(); + List recordsInThisWindow = new ArrayList<>(); + for (SearchQueryRecord record : records) { + // skip the records that does not have the corresponding measurement + if (!record.getMeasurements().containsKey(metricType)) { + continue; + } + if (record.getTimestamp() < currentWindowStart) { + recordsInLastWindow.add(record); + } else { + recordsInThisWindow.add(record); + } + } + // add records in last window, if there are any, to the top n store + addToTopNStore(recordsInLastWindow); + // rotate window and reset window start if necessary + rotateWindowIfNecessary(currentWindowStart); + // add records in current window, if there are any, to the top n store + addToTopNStore(recordsInThisWindow); + // update the current window snapshot for getters to consume + final List newSnapShot = new ArrayList<>(topQueriesStore); + newSnapShot.sort((a, b) -> SearchQueryRecord.compare(a, b, metricType)); + topQueriesCurrentSnapshot.set(newSnapShot); + } + + private void addToTopNStore(final List records) { + topQueriesStore.addAll(records); + // remove top elements for fix sizing priority queue + while (topQueriesStore.size() > topNSize) { + topQueriesStore.poll(); + } + } + + /** + * Reset the current window and rotate the data to history snapshot for top n queries, + * This function would be invoked zero time or only once in each consumeRecords call + * + * @param newWindowStart the new windowStart to set to + */ + private void rotateWindowIfNecessary(final long newWindowStart) { + // reset window if the current window is outdated + if (windowStart < newWindowStart) { + final List history = new ArrayList<>(); + // rotate the current window to history store only if the data belongs to the last window + if (windowStart == newWindowStart - windowSize.getMillis()) { + history.addAll(topQueriesStore); + } + topQueriesHistorySnapshot.set(history); + topQueriesStore.clear(); + topQueriesCurrentSnapshot.set(new ArrayList<>()); + windowStart = newWindowStart; + } + } + + /** + * Calculate the window start for the given timestamp + * + * @param timestamp the given timestamp to calculate window start + */ + private long calculateWindowStart(final long timestamp) { + final LocalDateTime currentTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.of("UTC")); + LocalDateTime windowStartTime = currentTime.truncatedTo(ChronoUnit.HOURS); + while (!windowStartTime.plusMinutes(windowSize.getMinutes()).isAfter(currentTime)) { + windowStartTime = windowStartTime.plusMinutes(windowSize.getMinutes()); + } + return windowStartTime.toInstant(ZoneOffset.UTC).getEpochSecond() * 1000; + } + + /** + * Get the current top queries snapshot from the AtomicReference. + * + * @return a list of {@link SearchQueryRecord} + */ + public List getTopQueriesCurrentSnapshot() { + return topQueriesCurrentSnapshot.get(); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/package-info.java new file mode 100644 index 0000000000000..5068f28234f6d --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Service Classes for Query Insights + */ +package org.opensearch.plugin.insights.core.service; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/package-info.java new file mode 100644 index 0000000000000..04d1f9bfff7e1 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Base Package of Query Insights + */ +package org.opensearch.plugin.insights; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java new file mode 100644 index 0000000000000..c1d17edf9ff14 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java @@ -0,0 +1,74 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.model; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Locale; + +/** + * Valid attributes for a search query record + * + * @opensearch.internal + */ +public enum Attribute { + /** + * The search query type + */ + SEARCH_TYPE, + /** + * The search query source + */ + SOURCE, + /** + * Total shards queried + */ + TOTAL_SHARDS, + /** + * The indices involved + */ + INDICES, + /** + * The per phase level latency map for a search query + */ + PHASE_LATENCY_MAP, + /** + * The node id for this request + */ + NODE_ID; + + /** + * Read an Attribute from a StreamInput + * + * @param in the StreamInput to read from + * @return Attribute + * @throws IOException IOException + */ + static Attribute readFromStream(final StreamInput in) throws IOException { + return Attribute.valueOf(in.readString().toUpperCase(Locale.ROOT)); + } + + /** + * Write Attribute to a StreamOutput + * + * @param out the StreamOutput to write + * @param attribute the Attribute to write + * @throws IOException IOException + */ + static void writeTo(final StreamOutput out, final Attribute attribute) throws IOException { + out.writeString(attribute.toString()); + } + + @Override + public String toString() { + return this.name().toLowerCase(Locale.ROOT); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/MetricType.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/MetricType.java new file mode 100644 index 0000000000000..cdd090fbf4804 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/MetricType.java @@ -0,0 +1,121 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.model; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Locale; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Valid metric types for a search query record + * + * @opensearch.internal + */ +public enum MetricType implements Comparator { + /** + * Latency metric type + */ + LATENCY, + /** + * CPU usage metric type + */ + CPU, + /** + * JVM heap usage metric type + */ + JVM; + + /** + * Read a MetricType from a StreamInput + * + * @param in the StreamInput to read from + * @return MetricType + * @throws IOException IOException + */ + public static MetricType readFromStream(final StreamInput in) throws IOException { + return fromString(in.readString()); + } + + /** + * Create MetricType from String + * + * @param metricType the String representation of MetricType + * @return MetricType + */ + public static MetricType fromString(final String metricType) { + return MetricType.valueOf(metricType.toUpperCase(Locale.ROOT)); + } + + /** + * Write MetricType to a StreamOutput + * + * @param out the StreamOutput to write + * @param metricType the MetricType to write + * @throws IOException IOException + */ + static void writeTo(final StreamOutput out, final MetricType metricType) throws IOException { + out.writeString(metricType.toString()); + } + + @Override + public String toString() { + return this.name().toLowerCase(Locale.ROOT); + } + + /** + * Get all valid metrics + * + * @return A set of String that contains all valid metrics + */ + public static Set allMetricTypes() { + return Arrays.stream(values()).collect(Collectors.toSet()); + } + + /** + * Compare two numbers based on the metric type + * + * @param a the first Number to be compared. + * @param b the second Number to be compared. + * @return a negative integer, zero, or a positive integer as the first argument is less than, equal to, or greater than the second + */ + public int compare(final Number a, final Number b) { + switch (this) { + case LATENCY: + return Long.compare(a.longValue(), b.longValue()); + case JVM: + case CPU: + return Double.compare(a.doubleValue(), b.doubleValue()); + } + return -1; + } + + /** + * Parse a value with the correct type based on MetricType + * + * @param o the generic object to parse + * @return {@link Number} + */ + Number parseValue(final Object o) { + switch (this) { + case LATENCY: + return (Long) o; + case JVM: + case CPU: + return (Double) o; + default: + return (Number) o; + } + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java new file mode 100644 index 0000000000000..060711edb5580 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java @@ -0,0 +1,176 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.model; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * SearchQueryRecord represents a minimal atomic record stored in the Query Insight Framework, + * which contains extensive information related to a search query. + * + * @opensearch.internal + */ +public class SearchQueryRecord implements ToXContentObject, Writeable { + private final long timestamp; + private final Map measurements; + private final Map attributes; + + /** + * Constructor of SearchQueryRecord + * + * @param in the StreamInput to read the SearchQueryRecord from + * @throws IOException IOException + * @throws ClassCastException ClassCastException + */ + public SearchQueryRecord(final StreamInput in) throws IOException, ClassCastException { + this.timestamp = in.readLong(); + measurements = new HashMap<>(); + in.readMap(MetricType::readFromStream, StreamInput::readGenericValue) + .forEach(((metricType, o) -> measurements.put(metricType, metricType.parseValue(o)))); + this.attributes = in.readMap(Attribute::readFromStream, StreamInput::readGenericValue); + } + + /** + * Constructor of SearchQueryRecord + * + * @param timestamp The timestamp of the query. + * @param measurements A list of Measurement associated with this query + * @param attributes A list of Attributes associated with this query + */ + public SearchQueryRecord(final long timestamp, Map measurements, final Map attributes) { + if (measurements == null) { + throw new IllegalArgumentException("Measurements cannot be null"); + } + this.measurements = measurements; + this.attributes = attributes; + this.timestamp = timestamp; + } + + /** + * Returns the observation time of the metric. + * + * @return the observation time in milliseconds + */ + public long getTimestamp() { + return timestamp; + } + + /** + * Returns the measurement associated with the specified name. + * + * @param name the name of the measurement + * @return the measurement object, or null if not found + */ + public Number getMeasurement(final MetricType name) { + return measurements.get(name); + } + + /** + * Returns a map of all the measurements associated with the metric. + * + * @return a map of measurement names to measurement objects + */ + public Map getMeasurements() { + return measurements; + } + + /** + * Returns a map of the attributes associated with the metric. + * + * @return a map of attribute keys to attribute values + */ + public Map getAttributes() { + return attributes; + } + + /** + * Add an attribute to this record + * + * @param attribute attribute to add + * @param value the value associated with the attribute + */ + public void addAttribute(final Attribute attribute, final Object value) { + attributes.put(attribute, value); + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final ToXContent.Params params) throws IOException { + builder.startObject(); + builder.field("timestamp", timestamp); + for (Map.Entry entry : attributes.entrySet()) { + builder.field(entry.getKey().toString(), entry.getValue()); + } + for (Map.Entry entry : measurements.entrySet()) { + builder.field(entry.getKey().toString(), entry.getValue()); + } + return builder.endObject(); + } + + /** + * Write a SearchQueryRecord to a StreamOutput + * + * @param out the StreamOutput to write + * @throws IOException IOException + */ + @Override + public void writeTo(final StreamOutput out) throws IOException { + out.writeLong(timestamp); + out.writeMap(measurements, (stream, metricType) -> MetricType.writeTo(out, metricType), StreamOutput::writeGenericValue); + out.writeMap(attributes, (stream, attribute) -> Attribute.writeTo(out, attribute), StreamOutput::writeGenericValue); + } + + /** + * Compare two SearchQueryRecord, based on the given MetricType + * + * @param a the first SearchQueryRecord to compare + * @param b the second SearchQueryRecord to compare + * @param metricType the MetricType to compare on + * @return 0 if the first SearchQueryRecord is numerically equal to the second SearchQueryRecord; + * -1 if the first SearchQueryRecord is numerically less than the second SearchQueryRecord; + * 1 if the first SearchQueryRecord is numerically greater than the second SearchQueryRecord. + */ + public static int compare(final SearchQueryRecord a, final SearchQueryRecord b, final MetricType metricType) { + return metricType.compare(a.getMeasurement(metricType), b.getMeasurement(metricType)); + } + + /** + * Check if a SearchQueryRecord is deep equal to another record + * + * @param o the other SearchQueryRecord record + * @return true if two records are deep equal, false otherwise. + */ + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (!(o instanceof SearchQueryRecord)) { + return false; + } + final SearchQueryRecord other = (SearchQueryRecord) o; + return timestamp == other.getTimestamp() + && measurements.equals(other.getMeasurements()) + && attributes.size() == other.getAttributes().size(); + } + + @Override + public int hashCode() { + return Objects.hash(timestamp, measurements, attributes); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/package-info.java new file mode 100644 index 0000000000000..c59ec1550f54b --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Data Models for Query Insight Records + */ +package org.opensearch.plugin.insights.rules.model; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java new file mode 100644 index 0000000000000..52cc1fbde790f --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java @@ -0,0 +1,116 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.settings; + +import org.opensearch.common.settings.Setting; +import org.opensearch.common.unit.TimeValue; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * Settings for Query Insights Plugin + * + * @opensearch.api + * @opensearch.experimental + */ +public class QueryInsightsSettings { + /** + * Executors settings + */ + public static final String QUERY_INSIGHTS_EXECUTOR = "query_insights_executor"; + /** + * Max number of thread + */ + public static final int MAX_THREAD_COUNT = 5; + /** + * Max number of requests for the consumer to collect at one time + */ + public static final int QUERY_RECORD_QUEUE_CAPACITY = 1000; + /** + * Time interval for record queue consumer to run + */ + public static final TimeValue QUERY_RECORD_QUEUE_DRAIN_INTERVAL = new TimeValue(5, TimeUnit.SECONDS); + /** + * Default Values and Settings + */ + public static final TimeValue MAX_WINDOW_SIZE = new TimeValue(1, TimeUnit.DAYS); + /** + * Minimal window size + */ + public static final TimeValue MIN_WINDOW_SIZE = new TimeValue(1, TimeUnit.MINUTES); + /** + * Valid window sizes + */ + public static final Set VALID_WINDOW_SIZES_IN_MINUTES = new HashSet<>( + Arrays.asList( + new TimeValue(1, TimeUnit.MINUTES), + new TimeValue(5, TimeUnit.MINUTES), + new TimeValue(10, TimeUnit.MINUTES), + new TimeValue(30, TimeUnit.MINUTES) + ) + ); + + /** Default N size for top N queries */ + public static final int MAX_N_SIZE = 100; + /** Default window size in seconds to keep the top N queries with latency data in query insight store */ + public static final TimeValue DEFAULT_WINDOW_SIZE = new TimeValue(60, TimeUnit.SECONDS); + /** Default top N size to keep the data in query insight store */ + public static final int DEFAULT_TOP_N_SIZE = 3; + /** + * Query Insights base uri + */ + public static final String PLUGINS_BASE_URI = "/_insights"; + + /** + * Settings for Top Queries + * + */ + public static final String TOP_QUERIES_BASE_URI = PLUGINS_BASE_URI + "/top_queries"; + /** Default prefix for top N queries feature */ + public static final String TOP_N_QUERIES_SETTING_PREFIX = "search.insights.top_queries"; + /** Default prefix for top N queries by latency feature */ + public static final String TOP_N_LATENCY_QUERIES_PREFIX = TOP_N_QUERIES_SETTING_PREFIX + ".latency"; + /** + * Boolean setting for enabling top queries by latency. + */ + public static final Setting TOP_N_LATENCY_QUERIES_ENABLED = Setting.boolSetting( + TOP_N_LATENCY_QUERIES_PREFIX + ".enabled", + false, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Int setting to define the top n size for top queries by latency. + */ + public static final Setting TOP_N_LATENCY_QUERIES_SIZE = Setting.intSetting( + TOP_N_LATENCY_QUERIES_PREFIX + ".top_n_size", + DEFAULT_TOP_N_SIZE, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Time setting to define the window size in seconds for top queries by latency. + */ + public static final Setting TOP_N_LATENCY_QUERIES_WINDOW_SIZE = Setting.positiveTimeSetting( + TOP_N_LATENCY_QUERIES_PREFIX + ".window_size", + DEFAULT_WINDOW_SIZE, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + /** + * Default constructor + */ + public QueryInsightsSettings() {} +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/package-info.java new file mode 100644 index 0000000000000..f3152bbf966cb --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Settings for Query Insights Plugin + */ +package org.opensearch.plugin.insights.settings; diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java new file mode 100644 index 0000000000000..f6d95450855de --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java @@ -0,0 +1,91 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights; + +import org.opensearch.action.ActionRequest; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.plugin.insights.core.service.QueryInsightsService; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.plugins.ActionPlugin; +import org.opensearch.rest.RestHandler; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; +import org.junit.Before; + +import java.util.Arrays; +import java.util.List; + +import static org.mockito.Mockito.mock; + +public class QueryInsightsPluginTests extends OpenSearchTestCase { + + private QueryInsightsPlugin queryInsightsPlugin; + + private final Client client = mock(Client.class); + private ClusterService clusterService; + private final ThreadPool threadPool = mock(ThreadPool.class); + + @Before + public void setup() { + queryInsightsPlugin = new QueryInsightsPlugin(); + Settings.Builder settingsBuilder = Settings.builder(); + Settings settings = settingsBuilder.build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); + + clusterService = new ClusterService(settings, clusterSettings, threadPool); + + } + + public void testGetSettings() { + assertEquals( + Arrays.asList( + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED, + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE, + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE + ), + queryInsightsPlugin.getSettings() + ); + } + + public void testCreateComponent() { + List components = (List) queryInsightsPlugin.createComponents( + client, + clusterService, + threadPool, + null, + null, + null, + null, + null, + null, + null, + null + ); + assertEquals(1, components.size()); + assertTrue(components.get(0) instanceof QueryInsightsService); + } + + public void testGetRestHandlers() { + List components = queryInsightsPlugin.getRestHandlers(Settings.EMPTY, null, null, null, null, null, null); + assertEquals(0, components.size()); + } + + public void testGetActions() { + List> components = queryInsightsPlugin.getActions(); + assertEquals(0, components.size()); + } + +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java new file mode 100644 index 0000000000000..04f49d5fbc7dd --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java @@ -0,0 +1,155 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights; + +import org.opensearch.action.search.SearchType; +import org.opensearch.common.util.Maps; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.plugin.insights.rules.model.Attribute; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.opensearch.test.OpenSearchTestCase.randomAlphaOfLengthBetween; +import static org.opensearch.test.OpenSearchTestCase.randomArray; +import static org.opensearch.test.OpenSearchTestCase.randomDouble; +import static org.opensearch.test.OpenSearchTestCase.randomIntBetween; +import static org.opensearch.test.OpenSearchTestCase.randomLong; +import static org.opensearch.test.OpenSearchTestCase.randomLongBetween; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +final public class QueryInsightsTestUtils { + + public QueryInsightsTestUtils() {} + + public static List generateQueryInsightRecords(int count) { + return generateQueryInsightRecords(count, count, System.currentTimeMillis(), 0); + } + + /** + * Creates a List of random Query Insight Records for testing purpose + */ + public static List generateQueryInsightRecords(int lower, int upper, long startTimeStamp, long interval) { + List records = new ArrayList<>(); + int countOfRecords = randomIntBetween(lower, upper); + long timestamp = startTimeStamp; + for (int i = 0; i < countOfRecords; ++i) { + Map measurements = Map.of( + MetricType.LATENCY, + randomLongBetween(1000, 10000), + MetricType.CPU, + randomDouble(), + MetricType.JVM, + randomDouble() + ); + + Map phaseLatencyMap = new HashMap<>(); + int countOfPhases = randomIntBetween(2, 5); + for (int j = 0; j < countOfPhases; ++j) { + phaseLatencyMap.put(randomAlphaOfLengthBetween(5, 10), randomLong()); + } + Map attributes = new HashMap<>(); + attributes.put(Attribute.SEARCH_TYPE, SearchType.QUERY_THEN_FETCH.toString().toLowerCase(Locale.ROOT)); + attributes.put(Attribute.SOURCE, "{\"size\":20}"); + attributes.put(Attribute.TOTAL_SHARDS, randomIntBetween(1, 100)); + attributes.put(Attribute.INDICES, randomArray(1, 3, Object[]::new, () -> randomAlphaOfLengthBetween(5, 10))); + attributes.put(Attribute.PHASE_LATENCY_MAP, phaseLatencyMap); + + records.add(new SearchQueryRecord(timestamp, measurements, attributes)); + timestamp += interval; + } + return records; + } + + public static SearchQueryRecord createFixedSearchQueryRecord() { + long timestamp = 1706574180000L; + Map measurements = Map.of(MetricType.LATENCY, 1L); + + Map phaseLatencyMap = new HashMap<>(); + Map attributes = new HashMap<>(); + attributes.put(Attribute.SEARCH_TYPE, SearchType.QUERY_THEN_FETCH.toString().toLowerCase(Locale.ROOT)); + + return new SearchQueryRecord(timestamp, measurements, attributes); + } + + public static void compareJson(ToXContent param1, ToXContent param2) throws IOException { + if (param1 == null || param2 == null) { + assertNull(param1); + assertNull(param2); + return; + } + + ToXContent.Params params = ToXContent.EMPTY_PARAMS; + XContentBuilder param1Builder = jsonBuilder(); + param1.toXContent(param1Builder, params); + + XContentBuilder param2Builder = jsonBuilder(); + param2.toXContent(param2Builder, params); + + assertEquals(param1Builder.toString(), param2Builder.toString()); + } + + @SuppressWarnings("unchecked") + public static boolean checkRecordsEquals(List records1, List records2) { + if (records1.size() != records2.size()) { + return false; + } + for (int i = 0; i < records1.size(); i++) { + if (!records1.get(i).equals(records2.get(i))) { + return false; + } + Map attributes1 = records1.get(i).getAttributes(); + Map attributes2 = records2.get(i).getAttributes(); + for (Map.Entry entry : attributes1.entrySet()) { + Attribute attribute = entry.getKey(); + Object value = entry.getValue(); + if (!attributes2.containsKey(attribute)) { + return false; + } + if (value instanceof Object[] && !Arrays.deepEquals((Object[]) value, (Object[]) attributes2.get(attribute))) { + return false; + } else if (value instanceof Map + && !Maps.deepEquals((Map) value, (Map) attributes2.get(attribute))) { + return false; + } + } + } + return true; + } + + public static boolean checkRecordsEqualsWithoutOrder( + List records1, + List records2, + MetricType metricType + ) { + Set set2 = new TreeSet<>((a, b) -> SearchQueryRecord.compare(a, b, metricType)); + set2.addAll(records2); + if (records1.size() != records2.size()) { + return false; + } + for (int i = 0; i < records1.size(); i++) { + if (!set2.contains(records1.get(i))) { + return false; + } + } + return true; + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java new file mode 100644 index 0000000000000..c29b48b9690d1 --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.service; + +import org.opensearch.plugin.insights.QueryInsightsTestUtils; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; +import org.junit.Before; + +import static org.mockito.Mockito.mock; + +/** + * Unit Tests for {@link QueryInsightsService}. + */ +public class QueryInsightsServiceTests extends OpenSearchTestCase { + private final ThreadPool threadPool = mock(ThreadPool.class); + private QueryInsightsService queryInsightsService; + + @Before + public void setup() { + queryInsightsService = new QueryInsightsService(threadPool); + queryInsightsService.enableCollection(MetricType.LATENCY, true); + queryInsightsService.enableCollection(MetricType.CPU, true); + queryInsightsService.enableCollection(MetricType.JVM, true); + } + + public void testAddRecordToLimitAndDrain() { + SearchQueryRecord record = QueryInsightsTestUtils.generateQueryInsightRecords(1, 1, System.currentTimeMillis(), 0).get(0); + for (int i = 0; i < QueryInsightsSettings.QUERY_RECORD_QUEUE_CAPACITY; i++) { + assertTrue(queryInsightsService.addRecord(record)); + } + // exceed capacity + assertFalse(queryInsightsService.addRecord(record)); + queryInsightsService.drainRecords(); + assertEquals( + QueryInsightsSettings.DEFAULT_TOP_N_SIZE, + queryInsightsService.getTopQueriesService(MetricType.LATENCY).getTopQueriesRecords(false).size() + ); + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java new file mode 100644 index 0000000000000..060df84a89485 --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java @@ -0,0 +1,102 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.service; + +import org.opensearch.cluster.coordination.DeterministicTaskQueue; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.plugin.insights.QueryInsightsTestUtils; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.Before; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Unit Tests for {@link QueryInsightsService}. + */ +public class TopQueriesServiceTests extends OpenSearchTestCase { + private TopQueriesService topQueriesService; + + @Before + public void setup() { + topQueriesService = new TopQueriesService(MetricType.LATENCY); + topQueriesService.setTopNSize(Integer.MAX_VALUE); + topQueriesService.setWindowSize(new TimeValue(Long.MAX_VALUE)); + topQueriesService.setEnabled(true); + } + + public void testIngestQueryDataWithLargeWindow() { + final List records = QueryInsightsTestUtils.generateQueryInsightRecords(10); + topQueriesService.consumeRecords(records); + assertTrue( + QueryInsightsTestUtils.checkRecordsEqualsWithoutOrder( + topQueriesService.getTopQueriesRecords(false), + records, + MetricType.LATENCY + ) + ); + } + + public void testRollingWindows() { + List records; + // Create 5 records at Now - 10 minutes to make sure they belong to the last window + records = QueryInsightsTestUtils.generateQueryInsightRecords(5, 5, System.currentTimeMillis() - 1000 * 60 * 10, 0); + topQueriesService.setWindowSize(TimeValue.timeValueMinutes(10)); + topQueriesService.consumeRecords(records); + assertEquals(0, topQueriesService.getTopQueriesRecords(true).size()); + + // Create 10 records at now + 1 minute, to make sure they belong to the current window + records = QueryInsightsTestUtils.generateQueryInsightRecords(10, 10, System.currentTimeMillis() + 1000 * 60, 0); + topQueriesService.setWindowSize(TimeValue.timeValueMinutes(10)); + topQueriesService.consumeRecords(records); + assertEquals(10, topQueriesService.getTopQueriesRecords(true).size()); + } + + public void testSmallNSize() { + final List records = QueryInsightsTestUtils.generateQueryInsightRecords(10); + topQueriesService.setTopNSize(1); + topQueriesService.consumeRecords(records); + assertEquals(1, topQueriesService.getTopQueriesRecords(false).size()); + } + + public void testValidateTopNSize() { + assertThrows(IllegalArgumentException.class, () -> { topQueriesService.validateTopNSize(QueryInsightsSettings.MAX_N_SIZE + 1); }); + } + + public void testGetTopQueriesWhenNotEnabled() { + topQueriesService.setEnabled(false); + assertThrows(IllegalArgumentException.class, () -> { topQueriesService.getTopQueriesRecords(false); }); + } + + public void testValidateWindowSize() { + assertThrows(IllegalArgumentException.class, () -> { + topQueriesService.validateWindowSize(new TimeValue(QueryInsightsSettings.MAX_WINDOW_SIZE.getSeconds() + 1, TimeUnit.SECONDS)); + }); + assertThrows(IllegalArgumentException.class, () -> { + topQueriesService.validateWindowSize(new TimeValue(QueryInsightsSettings.MIN_WINDOW_SIZE.getSeconds() - 1, TimeUnit.SECONDS)); + }); + assertThrows(IllegalArgumentException.class, () -> { topQueriesService.validateWindowSize(new TimeValue(2, TimeUnit.DAYS)); }); + assertThrows(IllegalArgumentException.class, () -> { topQueriesService.validateWindowSize(new TimeValue(7, TimeUnit.MINUTES)); }); + } + + private static void runUntilTimeoutOrFinish(DeterministicTaskQueue deterministicTaskQueue, long duration) { + final long endTime = deterministicTaskQueue.getCurrentTimeMillis() + duration; + while (deterministicTaskQueue.getCurrentTimeMillis() < endTime + && (deterministicTaskQueue.hasRunnableTasks() || deterministicTaskQueue.hasDeferredTasks())) { + if (deterministicTaskQueue.hasDeferredTasks() && randomBoolean()) { + deterministicTaskQueue.advanceTime(); + } else if (deterministicTaskQueue.hasRunnableTasks()) { + deterministicTaskQueue.runRandomTask(); + } + } + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecordTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecordTests.java new file mode 100644 index 0000000000000..793d5878e2300 --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecordTests.java @@ -0,0 +1,71 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.model; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.plugin.insights.QueryInsightsTestUtils; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Granular tests for the {@link SearchQueryRecord} class. + */ +public class SearchQueryRecordTests extends OpenSearchTestCase { + + /** + * Check that if the serialization, deserialization and equals functions are working as expected + */ + public void testSerializationAndEquals() throws Exception { + List records = QueryInsightsTestUtils.generateQueryInsightRecords(10); + List copiedRecords = new ArrayList<>(); + for (SearchQueryRecord record : records) { + copiedRecords.add(roundTripRecord(record)); + } + assertTrue(QueryInsightsTestUtils.checkRecordsEquals(records, copiedRecords)); + + } + + public void testAllMetricTypes() { + Set allMetrics = MetricType.allMetricTypes(); + Set expected = new HashSet<>(Arrays.asList(MetricType.LATENCY, MetricType.CPU, MetricType.JVM)); + assertEquals(expected, allMetrics); + } + + public void testCompare() { + SearchQueryRecord record1 = QueryInsightsTestUtils.createFixedSearchQueryRecord(); + SearchQueryRecord record2 = QueryInsightsTestUtils.createFixedSearchQueryRecord(); + assertEquals(0, SearchQueryRecord.compare(record1, record2, MetricType.LATENCY)); + } + + public void testEqual() { + SearchQueryRecord record1 = QueryInsightsTestUtils.createFixedSearchQueryRecord(); + SearchQueryRecord record2 = QueryInsightsTestUtils.createFixedSearchQueryRecord(); + assertEquals(record1, record2); + } + + /** + * Serialize and deserialize a SearchQueryRecord. + * @param record A SearchQueryRecord to serialize. + * @return The deserialized, "round-tripped" record. + */ + private static SearchQueryRecord roundTripRecord(SearchQueryRecord record) throws Exception { + try (BytesStreamOutput out = new BytesStreamOutput()) { + record.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + return new SearchQueryRecord(in); + } + } + } +} diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequest.java b/server/src/main/java/org/opensearch/action/search/SearchRequest.java index 96cea17ff4972..f738c182c06da 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequest.java @@ -359,7 +359,7 @@ boolean isFinalReduce() { * request. When created through {@link #subSearchRequest(SearchRequest, String[], String, long, boolean)}, this method returns * the provided current time, otherwise it will return {@link System#currentTimeMillis()}. */ - long getOrCreateAbsoluteStartMillis() { + public long getOrCreateAbsoluteStartMillis() { return absoluteStartMillis == DEFAULT_ABSOLUTE_START_MILLIS ? System.currentTimeMillis() : absoluteStartMillis; } diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java index eceac7204b196..383d9b5e82fe2 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java @@ -22,7 +22,7 @@ * @opensearch.internal */ @InternalApi -class SearchRequestContext { +public class SearchRequestContext { private final SearchRequestOperationsListener searchRequestOperationsListener; private long absoluteStartNanos; private final Map phaseTookMap; @@ -47,7 +47,7 @@ void updatePhaseTookMap(String phaseName, Long tookTime) { this.phaseTookMap.put(phaseName, tookTime); } - Map phaseTookMap() { + public Map phaseTookMap() { return phaseTookMap; } @@ -70,7 +70,7 @@ void setAbsoluteStartNanos(long absoluteStartNanos) { /** * Request start time in nanos */ - long getAbsoluteStartNanos() { + public long getAbsoluteStartNanos() { return absoluteStartNanos; } diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java index 2a09cc084f79f..2acb35af667f0 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java @@ -31,21 +31,21 @@ protected SearchRequestOperationsListener(final boolean enabled) { this.enabled = enabled; } - abstract void onPhaseStart(SearchPhaseContext context); + protected abstract void onPhaseStart(SearchPhaseContext context); - abstract void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext); + protected abstract void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext); - abstract void onPhaseFailure(SearchPhaseContext context); + protected abstract void onPhaseFailure(SearchPhaseContext context); - void onRequestStart(SearchRequestContext searchRequestContext) {} + protected void onRequestStart(SearchRequestContext searchRequestContext) {} - void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} + protected void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} - boolean isEnabled(SearchRequest searchRequest) { + protected boolean isEnabled(SearchRequest searchRequest) { return isEnabled(); } - boolean isEnabled() { + protected boolean isEnabled() { return enabled; } @@ -69,7 +69,7 @@ static final class CompositeListener extends SearchRequestOperationsListener { } @Override - void onPhaseStart(SearchPhaseContext context) { + protected void onPhaseStart(SearchPhaseContext context) { for (SearchRequestOperationsListener listener : listeners) { try { listener.onPhaseStart(context); @@ -80,7 +80,7 @@ void onPhaseStart(SearchPhaseContext context) { } @Override - void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { for (SearchRequestOperationsListener listener : listeners) { try { listener.onPhaseEnd(context, searchRequestContext); @@ -91,7 +91,7 @@ void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestCo } @Override - void onPhaseFailure(SearchPhaseContext context) { + protected void onPhaseFailure(SearchPhaseContext context) { for (SearchRequestOperationsListener listener : listeners) { try { listener.onPhaseFailure(context); @@ -102,7 +102,7 @@ void onPhaseFailure(SearchPhaseContext context) { } @Override - void onRequestStart(SearchRequestContext searchRequestContext) { + protected void onRequestStart(SearchRequestContext searchRequestContext) { for (SearchRequestOperationsListener listener : listeners) { try { listener.onRequestStart(searchRequestContext); diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java b/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java index 7f25f9026f215..74e04d976cb1c 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java @@ -134,19 +134,19 @@ public SearchRequestSlowLog(ClusterService clusterService) { } @Override - void onPhaseStart(SearchPhaseContext context) {} + protected void onPhaseStart(SearchPhaseContext context) {} @Override - void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} + protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} @Override - void onPhaseFailure(SearchPhaseContext context) {} + protected void onPhaseFailure(SearchPhaseContext context) {} @Override - void onRequestStart(SearchRequestContext searchRequestContext) {} + protected void onRequestStart(SearchRequestContext searchRequestContext) {} @Override - void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + protected void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { long tookInNanos = System.nanoTime() - searchRequestContext.getAbsoluteStartNanos(); if (warnThreshold >= 0 && tookInNanos > warnThreshold && level.isLevelEnabledFor(SlowLogLevel.WARN)) { diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java b/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java index 88d599a0dcdaa..ac32b08afb7f6 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java @@ -58,12 +58,12 @@ public long getPhaseMetric(SearchPhaseName searchPhaseName) { } @Override - void onPhaseStart(SearchPhaseContext context) { + protected void onPhaseStart(SearchPhaseContext context) { phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.inc(); } @Override - void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { StatsHolder phaseStats = phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()); phaseStats.current.dec(); phaseStats.total.inc(); @@ -71,7 +71,7 @@ void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestCo } @Override - void onPhaseFailure(SearchPhaseContext context) { + protected void onPhaseFailure(SearchPhaseContext context) { phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec(); } diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java index 78c5ba4412c68..1cb336e18b12c 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java @@ -119,13 +119,13 @@ public void testStandardListenerAndPerRequestListenerDisabled() { public SearchRequestOperationsListener createTestSearchRequestOperationsListener() { return new SearchRequestOperationsListener() { @Override - void onPhaseStart(SearchPhaseContext context) {} + protected void onPhaseStart(SearchPhaseContext context) {} @Override - void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} + protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} @Override - void onPhaseFailure(SearchPhaseContext context) {} + protected void onPhaseFailure(SearchPhaseContext context) {} }; } }