From 9416193bf6738e1f8f73b7ec46c992714d9e6d19 Mon Sep 17 00:00:00 2001 From: Louis Chu Date: Wed, 10 Jan 2024 20:20:31 -0800 Subject: [PATCH] [POC]Flint metrics AOP --- build.sbt | 5 +- .../core/metrics/aop/MetricConstants.java | 11 ++++ .../flint/core/metrics/aop/MetricsAspect.java | 55 +++++++++++++++++++ .../core/metrics/aop/PublishMetrics.java | 12 ++++ .../core/storage/FlintOpenSearchClient.java | 7 +++ .../core/storage/OpenSearchScrollReader.java | 4 ++ .../flint/core/storage/OpenSearchUpdater.java | 6 ++ .../flint/core/storage/OpenSearchWriter.java | 3 + .../core/metrics/aop/MetricsAspectTest.java | 49 +++++++++++++++++ project/plugins.sbt | 1 + 10 files changed, 152 insertions(+), 1 deletion(-) create mode 100644 flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/MetricConstants.java create mode 100644 flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/MetricsAspect.java create mode 100644 flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/PublishMetrics.java create mode 100644 flint-core/src/test/java/org/opensearch/flint/core/metrics/aop/MetricsAspectTest.java diff --git a/build.sbt b/build.sbt index 469d57223..04d03bf11 100644 --- a/build.sbt +++ b/build.sbt @@ -3,6 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ import Dependencies._ +import com.lightbend.sbt.SbtAspectj lazy val scala212 = "2.12.14" lazy val sparkVersion = "3.3.2" @@ -48,6 +49,7 @@ lazy val root = (project in file(".")) .settings(name := "flint", publish / skip := true) lazy val flintCore = (project in file("flint-core")) + .enablePlugins(SbtAspectj) .disablePlugins(AssemblyPlugin) .settings( commonSettings, @@ -75,7 +77,8 @@ lazy val flintCore = (project in file("flint-core")) "net.aichler" % "jupiter-interface" % "0.11.1" % Test ), libraryDependencies ++= deps(sparkVersion), - publish / skip := true) + publish / skip := true + ) lazy val pplSparkIntegration = (project in file("ppl-spark-integration")) .enablePlugins(AssemblyPlugin, Antlr4Plugin) diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/MetricConstants.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/MetricConstants.java new file mode 100644 index 000000000..13c0f475d --- /dev/null +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/MetricConstants.java @@ -0,0 +1,11 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.metrics.aop; + +public class MetricConstants { + public static final String OS_READ_METRIC = "opensearch.read"; + public static final String OS_WRITE_METRIC = "opensearch.write"; +} diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/MetricsAspect.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/MetricsAspect.java new file mode 100644 index 000000000..c2e301cf5 --- /dev/null +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/MetricsAspect.java @@ -0,0 +1,55 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.metrics.aop; + +import org.aspectj.lang.annotation.AfterReturning; +import org.aspectj.lang.annotation.AfterThrowing; +import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.annotation.Pointcut; +import org.opensearch.OpenSearchException; + +@Aspect +public class MetricsAspect { + @Pointcut("@annotation(publishMetricsAnnotation)") + public void annotatedWithPublishMetrics(PublishMetrics publishMetricsAnnotation) {} + + @AfterReturning(pointcut = "annotatedWithPublishMetrics(publishMetricsAnnotation)", argNames = "publishMetricsAnnotation") + public String logSuccess(PublishMetrics publishMetricsAnnotation) { + int statusCode = 200; // Assume success with a status code of 200 + String metricName = publishMetricsAnnotation.metricName(); + return publishStatusMetrics(metricName, statusCode); + } + + @AfterThrowing(pointcut = "annotatedWithPublishMetrics(publishMetricsAnnotation)", throwing = "ex", argNames = "ex,publishMetricsAnnotation") + public String logException(Throwable ex, PublishMetrics publishMetricsAnnotation) { + if (ex instanceof OpenSearchException) { + OpenSearchException openSearchException = (OpenSearchException) ex; + int statusCode = openSearchException.status().getStatus(); + String metricName = publishMetricsAnnotation.metricName(); + return publishStatusMetrics(metricName, statusCode); + } + return null; + } + + private String publishStatusMetrics(String metricName, int statusCode) { + String metricSuffix = null; + + if (statusCode == 200) { + metricSuffix = "2xx.count"; + } else if (statusCode == 403) { + metricSuffix = "403.count"; + } else if (statusCode >= 500) { + metricSuffix = "5xx.count"; + } else if (statusCode >= 400) { + metricSuffix = "4xx.count"; + } + + String fullMetricName = metricName + "." + metricSuffix; + + // TODO: Add metrics to the source + return fullMetricName; + } +} diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/PublishMetrics.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/PublishMetrics.java new file mode 100644 index 000000000..d6590be1c --- /dev/null +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/PublishMetrics.java @@ -0,0 +1,12 @@ +package org.opensearch.flint.core.metrics.aop; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.METHOD, ElementType.TYPE}) +public @interface PublishMetrics { + String metricName(); +} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index 4e549df2b..07764cb08 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -49,6 +49,8 @@ import org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction; import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry; import org.opensearch.flint.core.metadata.log.OptimisticTransaction; +import org.opensearch.flint.core.metrics.aop.MetricConstants; +import org.opensearch.flint.core.metrics.aop.PublishMetrics; import org.opensearch.index.query.AbstractQueryBuilder; import org.opensearch.index.query.MatchAllQueryBuilder; import org.opensearch.index.query.QueryBuilder; @@ -127,6 +129,7 @@ public void createIndex(String indexName, FlintMetadata metadata) { createIndex(indexName, metadata.getContent(), metadata.indexSettings()); } + @PublishMetrics(metricName=MetricConstants.OS_WRITE_METRIC) protected void createIndex(String indexName, String mapping, Option settings) { LOG.info("Creating Flint index " + indexName); String osIndexName = sanitizeIndexName(indexName); @@ -143,6 +146,7 @@ protected void createIndex(String indexName, String mapping, Option sett } @Override + @PublishMetrics(metricName=MetricConstants.OS_READ_METRIC) public boolean exists(String indexName) { LOG.info("Checking if Flint index exists " + indexName); String osIndexName = sanitizeIndexName(indexName); @@ -154,6 +158,7 @@ public boolean exists(String indexName) { } @Override + @PublishMetrics(metricName=MetricConstants.OS_READ_METRIC) public List getAllIndexMetadata(String indexNamePattern) { LOG.info("Fetching all Flint index metadata for pattern " + indexNamePattern); String osIndexNamePattern = sanitizeIndexName(indexNamePattern); @@ -172,6 +177,7 @@ public List getAllIndexMetadata(String indexNamePattern) { } @Override + @PublishMetrics(metricName=MetricConstants.OS_READ_METRIC) public FlintMetadata getIndexMetadata(String indexName) { LOG.info("Fetching Flint index metadata for " + indexName); String osIndexName = sanitizeIndexName(indexName); @@ -188,6 +194,7 @@ public FlintMetadata getIndexMetadata(String indexName) { } @Override + @PublishMetrics(metricName=MetricConstants.OS_WRITE_METRIC) public void deleteIndex(String indexName) { LOG.info("Deleting Flint index " + indexName); String osIndexName = sanitizeIndexName(indexName); diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java index d71014c20..8c689bc15 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java @@ -15,6 +15,8 @@ import org.opensearch.common.Strings; import org.opensearch.common.unit.TimeValue; import org.opensearch.flint.core.FlintOptions; +import org.opensearch.flint.core.metrics.aop.MetricConstants; +import org.opensearch.flint.core.metrics.aop.PublishMetrics; import org.opensearch.search.builder.SearchSourceBuilder; import java.io.IOException; @@ -44,6 +46,7 @@ public OpenSearchScrollReader(RestHighLevelClient client, String indexName, Sear /** * search. */ + @PublishMetrics(metricName=MetricConstants.OS_READ_METRIC) Optional search(SearchRequest request) throws IOException { if (Strings.isNullOrEmpty(scrollId)) { request.scroll(scrollDuration); @@ -66,6 +69,7 @@ Optional search(SearchRequest request) throws IOException { /** * clean the scroll context. */ + @PublishMetrics(metricName=MetricConstants.OS_READ_METRIC) void clean() throws IOException { try { if (!Strings.isNullOrEmpty(scrollId)) { diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java index 4a6424512..3381dc505 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java @@ -7,6 +7,8 @@ import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.common.xcontent.XContentType; import org.opensearch.flint.core.FlintClient; +import org.opensearch.flint.core.metrics.aop.MetricConstants; +import org.opensearch.flint.core.metrics.aop.PublishMetrics; import java.io.IOException; import java.util.logging.Level; @@ -25,6 +27,7 @@ public OpenSearchUpdater(String indexName, FlintClient flintClient) { this.flintClient = flintClient; } + @PublishMetrics(metricName=MetricConstants.OS_WRITE_METRIC) public void upsert(String id, String doc) { // we might need to keep the updater for a long time. Reusing the client may not work as the temporary // credentials may expire. @@ -46,6 +49,7 @@ public void upsert(String id, String doc) { } } + @PublishMetrics(metricName=MetricConstants.OS_WRITE_METRIC) public void update(String id, String doc) { try (RestHighLevelClient client = flintClient.createClient()) { assertIndexExist(client, indexName); @@ -62,6 +66,7 @@ public void update(String id, String doc) { } } + @PublishMetrics(metricName= MetricConstants.OS_WRITE_METRIC) public void updateIf(String id, String doc, long seqNo, long primaryTerm) { try (RestHighLevelClient client = flintClient.createClient()) { assertIndexExist(client, indexName); @@ -80,6 +85,7 @@ public void updateIf(String id, String doc, long seqNo, long primaryTerm) { } } + @PublishMetrics(metricName=MetricConstants.OS_READ_METRIC) private void assertIndexExist(RestHighLevelClient client, String indexName) throws IOException { LOG.info("Checking if index exists " + indexName); if (!client.indices().exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT)) { diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchWriter.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchWriter.java index 1e55084b2..f00b5d377 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchWriter.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchWriter.java @@ -13,6 +13,8 @@ import org.opensearch.client.RequestOptions; import org.opensearch.client.RestHighLevelClient; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.flint.core.metrics.aop.MetricConstants; +import org.opensearch.flint.core.metrics.aop.PublishMetrics; import org.opensearch.rest.RestStatus; import java.io.BufferedWriter; @@ -52,6 +54,7 @@ public OpenSearchWriter(RestHighLevelClient client, String indexName, String ref * Flush the data in buffer. * Todo. StringWriter is not efficient. it will copy the cbuf when create bytes. */ + @PublishMetrics(metricName=MetricConstants.OS_WRITE_METRIC) @Override public void flush() { try { if (sb.length() > 0) { diff --git a/flint-core/src/test/java/org/opensearch/flint/core/metrics/aop/MetricsAspectTest.java b/flint-core/src/test/java/org/opensearch/flint/core/metrics/aop/MetricsAspectTest.java new file mode 100644 index 000000000..20aae7641 --- /dev/null +++ b/flint-core/src/test/java/org/opensearch/flint/core/metrics/aop/MetricsAspectTest.java @@ -0,0 +1,49 @@ +package org.opensearch.flint.core.metrics.aop; + +import org.aspectj.lang.JoinPoint; +import org.junit.Before; +import org.junit.Test; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.opensearch.OpenSearchException; +import org.opensearch.rest.RestStatus; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.*; + +public class MetricsAspectTest { + + @InjectMocks + private MetricsAspect metricsAspect; + + @Mock + private JoinPoint joinPoint; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testLogSuccess() { + PublishMetrics publishMetricsAnnotation = mock(PublishMetrics.class); + when(publishMetricsAnnotation.metricName()).thenReturn("testMetric"); + + assert ("testMetric.2xx.count".equals(metricsAspect.logSuccess(publishMetricsAnnotation))); + } + + @Test + public void testLogExceptionWithOpenSearchException() { + PublishMetrics publishMetricsAnnotation = mock(PublishMetrics.class); + when(publishMetricsAnnotation.metricName()).thenReturn("testMetric"); + + OpenSearchException exception = Mockito.mock(OpenSearchException.class); + Mockito.when(exception.getMessage()).thenReturn("Error"); + Mockito.when(exception.getCause()).thenReturn(new RuntimeException()); + Mockito.when(exception.status()).thenReturn(RestStatus.INTERNAL_SERVER_ERROR); + + assertEquals("testMetric.5xx.count", metricsAspect.logException(exception, publishMetricsAnnotation)); + } +} diff --git a/project/plugins.sbt b/project/plugins.sbt index 38550667b..f408d6813 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -5,6 +5,7 @@ addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6") addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0") +addSbtPlugin("com.lightbend.sbt" % "sbt-aspectj" % "0.11.0") addSbtPlugin("com.lightbend.sbt" % "sbt-java-formatter" % "0.8.0") addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.0") addSbtPlugin("com.simplytyped" % "sbt-antlr4" % "0.8.3")