diff --git a/build.sbt b/build.sbt index 469d57223..00ac0b6bc 100644 --- a/build.sbt +++ b/build.sbt @@ -3,6 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ import Dependencies._ +import com.lightbend.sbt.SbtAspectj lazy val scala212 = "2.12.14" lazy val sparkVersion = "3.3.2" @@ -48,6 +49,7 @@ lazy val root = (project in file(".")) .settings(name := "flint", publish / skip := true) lazy val flintCore = (project in file("flint-core")) + .enablePlugins(SbtAspectj) .disablePlugins(AssemblyPlugin) .settings( commonSettings, @@ -67,15 +69,17 @@ lazy val flintCore = (project in file("flint-core")) "org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test", "org.scalatestplus" %% "mockito-4-6" % "3.2.15.0" % "test", "com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test", - "org.mockito" % "mockito-core" % "2.23.0" % "test", - "org.mockito" % "mockito-junit-jupiter" % "3.12.4" % "test", + "org.mockito" % "mockito-core" % "5.2.0" % "test", + "org.mockito" % "mockito-inline" % "5.2.0" % "test", + "org.mockito" % "mockito-junit-jupiter" % "5.2.0" % "test", "org.junit.jupiter" % "junit-jupiter-api" % "5.9.0" % "test", "org.junit.jupiter" % "junit-jupiter-engine" % "5.9.0" % "test", "com.google.truth" % "truth" % "1.1.5" % "test", "net.aichler" % "jupiter-interface" % "0.11.1" % Test ), libraryDependencies ++= deps(sparkVersion), - publish / skip := true) + publish / skip := true + ) lazy val pplSparkIntegration = (project in file("ppl-spark-integration")) .enablePlugins(AssemblyPlugin, Antlr4Plugin) diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/MetricConstants.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/MetricConstants.java new file mode 100644 index 000000000..e64d1193d --- /dev/null +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/MetricConstants.java @@ -0,0 +1,24 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.metrics.aop; + +/** + * This class defines custom metric constants used for monitoring flint operations. + */ +public class MetricConstants { + + /** + * The prefix for all read-related metrics in OpenSearch. + * This constant is used as a part of metric names to categorize and identify metrics related to read operations. + */ + public static final String OS_READ_METRIC_PREFIX = "opensearch.read"; + + /** + * The prefix for all write-related metrics in OpenSearch. + * Similar to OS_READ_METRIC_PREFIX, this constant is used for categorizing and identifying metrics that pertain to write operations. + */ + public static final String OS_WRITE_METRIC_PREFIX = "opensearch.write"; +} diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/MetricsAspect.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/MetricsAspect.java new file mode 100644 index 000000000..1ea378957 --- /dev/null +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/MetricsAspect.java @@ -0,0 +1,129 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.metrics.aop; + +import com.codahale.metrics.Counter; +import org.apache.spark.SparkEnv; +import org.apache.spark.metrics.source.FlintMetricSource; +import org.apache.spark.metrics.source.Source; +import org.aspectj.lang.annotation.AfterReturning; +import org.aspectj.lang.annotation.AfterThrowing; +import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.annotation.Pointcut; +import org.opensearch.OpenSearchException; +import scala.collection.Seq; + +/** + * Aspect for logging metrics based on the annotated methods in Flint components. + * It utilizes AspectJ for intercepting method calls that are annotated with {@link PublishMetrics} + * to log successful executions and exceptions as metrics. + */ +@Aspect +public class MetricsAspect { + + /** + * Pointcut to match methods annotated with {@link PublishMetrics}. + * + * @param publishMetricsAnnotation the PublishMetrics annotation + */ + @Pointcut("@annotation(publishMetricsAnnotation)") + public void annotatedWithPublishMetrics(PublishMetrics publishMetricsAnnotation) {} + + /** + * After returning advice that logs successful method executions. + * This method is invoked after a method annotated with {@link PublishMetrics} successfully returns. + * It publishes a success metric with a standard status code of 200. + * + * @param publishMetricsAnnotation the PublishMetrics annotation + * @return the full metric name for the successful execution + */ + @AfterReturning(pointcut = "annotatedWithPublishMetrics(publishMetricsAnnotation)", argNames = "publishMetricsAnnotation") + public String logSuccess(PublishMetrics publishMetricsAnnotation) { + int statusCode = 200; // Assume success with a status code of 200 + String metricNamePrefix = publishMetricsAnnotation.metricNamePrefix(); + return publishStatusMetrics(metricNamePrefix, statusCode); + } + + /** + * After throwing advice that logs exceptions thrown by methods. + * This method is invoked when a method annotated with {@link PublishMetrics} throws an exception. + * It extracts the status code from the OpenSearchException and publishes a corresponding metric. + * + * @param ex the exception thrown by the method + * @param publishMetricsAnnotation the PublishMetrics annotation + * @return the full metric name for the exception, or null if the exception is not an OpenSearchException + */ + @AfterThrowing(pointcut = "annotatedWithPublishMetrics(publishMetricsAnnotation)", throwing = "ex", argNames = "ex,publishMetricsAnnotation") + public String logException(Throwable ex, PublishMetrics publishMetricsAnnotation) { + OpenSearchException openSearchException = extractOpenSearchException(ex); + if (openSearchException != null) { + int statusCode = openSearchException.status().getStatus(); + String metricNamePrefix = publishMetricsAnnotation.metricNamePrefix(); + return publishStatusMetrics(metricNamePrefix, statusCode); + } + return null; + } + + /** + * Extracts an OpenSearchException from the given Throwable. + * This method checks if the Throwable is an instance of OpenSearchException or caused by one. + * + * @param ex the exception to be checked + * @return the extracted OpenSearchException, or null if not found + */ + private OpenSearchException extractOpenSearchException(Throwable ex) { + if (ex instanceof OpenSearchException) { + return (OpenSearchException) ex; + } else if (ex.getCause() instanceof OpenSearchException) { + return (OpenSearchException) ex.getCause(); + } + return null; + } + + + private String publishStatusMetrics(String metricNamePrefix, int statusCode) { + // TODO: Refactor this impl + String metricName = null; + if (SparkEnv.get() != null) { + FlintMetricSource flintMetricSource = getOrInitFlintMetricSource(); + metricName = constructMetricName(metricNamePrefix, statusCode); + Counter counter = flintMetricSource.metricRegistry().getCounters().get(metricName); + if (counter == null) { + counter = flintMetricSource.metricRegistry().counter(metricName); + } + counter.inc(); + } + return metricName; + } + + private String constructMetricName(String metricNamePrefix, int statusCode) { + String metricSuffix = null; + + if (statusCode == 200) { + metricSuffix = "2xx.count"; + } else if (statusCode == 403) { + metricSuffix = "403.count"; + } else if (statusCode >= 500) { + metricSuffix = "5xx.count"; + } else if (statusCode >= 400) { + metricSuffix = "4xx.count"; + } + + return metricNamePrefix + "." + metricSuffix; + } + + + private static FlintMetricSource getOrInitFlintMetricSource() { + Seq metricSourceSeq = SparkEnv.get().metricsSystem().getSourcesByName(FlintMetricSource.FLINT_METRIC_SOURCE_NAME()); + + if (metricSourceSeq == null || metricSourceSeq.isEmpty()) { + FlintMetricSource metricSource = new FlintMetricSource(); + SparkEnv.get().metricsSystem().registerSource(metricSource); + return metricSource; + } + return (FlintMetricSource) metricSourceSeq.head(); + } +} diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/PublishMetrics.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/PublishMetrics.java new file mode 100644 index 000000000..5ce416cf0 --- /dev/null +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/PublishMetrics.java @@ -0,0 +1,29 @@ +package org.opensearch.flint.core.metrics.aop; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + + +/** + * This annotation is used to mark methods or types that publish metrics. + * It is retained at runtime, allowing the application to utilize reflection to + * discover and handle these metrics accordingly. The annotation can be applied + * to both methods and types (classes or interfaces). + * + * @Retention(RetentionPolicy.RUNTIME) - Indicates that the annotation is available at runtime for reflection. + * @Target({ElementType.METHOD, ElementType.TYPE}) - Specifies that this annotation can be applied to methods or types. + */ +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.METHOD, ElementType.TYPE}) +public @interface PublishMetrics { + + /** + * Defines the prefix of the metric name. + * This prefix is used to categorize the metrics and typically represents a specific aspect or feature of Flint. + * + * @return the metric name prefix. + */ + String metricNamePrefix(); +} diff --git a/flint-core/src/main/scala/org/apache/spark/metrics/source/FlintMetricSource.scala b/flint-core/src/main/scala/org/apache/spark/metrics/source/FlintMetricSource.scala new file mode 100644 index 000000000..f9af30d59 --- /dev/null +++ b/flint-core/src/main/scala/org/apache/spark/metrics/source/FlintMetricSource.scala @@ -0,0 +1,19 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.spark.metrics.source + +import com.codahale.metrics.MetricRegistry + +class FlintMetricSource() extends Source { + + // Implementing the Source trait + override val sourceName: String = FlintMetricSource.FLINT_METRIC_SOURCE_NAME + override val metricRegistry: MetricRegistry = new MetricRegistry +} + +object FlintMetricSource { + val FLINT_METRIC_SOURCE_NAME = "FlintMetricSource" // Default source name +} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index 4e549df2b..f9cc61c8d 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -49,6 +49,8 @@ import org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction; import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry; import org.opensearch.flint.core.metadata.log.OptimisticTransaction; +import org.opensearch.flint.core.metrics.aop.MetricConstants; +import org.opensearch.flint.core.metrics.aop.PublishMetrics; import org.opensearch.index.query.AbstractQueryBuilder; import org.opensearch.index.query.MatchAllQueryBuilder; import org.opensearch.index.query.QueryBuilder; @@ -127,6 +129,7 @@ public void createIndex(String indexName, FlintMetadata metadata) { createIndex(indexName, metadata.getContent(), metadata.indexSettings()); } + @PublishMetrics(metricNamePrefix=MetricConstants.OS_WRITE_METRIC_PREFIX) protected void createIndex(String indexName, String mapping, Option settings) { LOG.info("Creating Flint index " + indexName); String osIndexName = sanitizeIndexName(indexName); @@ -143,6 +146,7 @@ protected void createIndex(String indexName, String mapping, Option sett } @Override + @PublishMetrics(metricNamePrefix=MetricConstants.OS_READ_METRIC_PREFIX) public boolean exists(String indexName) { LOG.info("Checking if Flint index exists " + indexName); String osIndexName = sanitizeIndexName(indexName); @@ -154,6 +158,7 @@ public boolean exists(String indexName) { } @Override + @PublishMetrics(metricNamePrefix=MetricConstants.OS_READ_METRIC_PREFIX) public List getAllIndexMetadata(String indexNamePattern) { LOG.info("Fetching all Flint index metadata for pattern " + indexNamePattern); String osIndexNamePattern = sanitizeIndexName(indexNamePattern); @@ -172,6 +177,7 @@ public List getAllIndexMetadata(String indexNamePattern) { } @Override + @PublishMetrics(metricNamePrefix=MetricConstants.OS_READ_METRIC_PREFIX) public FlintMetadata getIndexMetadata(String indexName) { LOG.info("Fetching Flint index metadata for " + indexName); String osIndexName = sanitizeIndexName(indexName); @@ -188,6 +194,7 @@ public FlintMetadata getIndexMetadata(String indexName) { } @Override + @PublishMetrics(metricNamePrefix=MetricConstants.OS_WRITE_METRIC_PREFIX) public void deleteIndex(String indexName) { LOG.info("Deleting Flint index " + indexName); String osIndexName = sanitizeIndexName(indexName); diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java index ab38a5f60..031adff97 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java @@ -27,6 +27,8 @@ import org.opensearch.flint.core.FlintClient; import org.opensearch.flint.core.metadata.log.FlintMetadataLog; import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry; +import org.opensearch.flint.core.metrics.aop.MetricConstants; +import org.opensearch.flint.core.metrics.aop.PublishMetrics; /** * Flint metadata log in OpenSearch store. For now use single doc instead of maintaining history @@ -75,6 +77,7 @@ public FlintMetadataLogEntry add(FlintMetadataLogEntry logEntry) { } @Override + @PublishMetrics(metricNamePrefix=MetricConstants.OS_READ_METRIC_PREFIX) public Optional getLatest() { LOG.info("Fetching latest log entry with id " + latestId); try (RestHighLevelClient client = flintClient.createClient()) { @@ -100,6 +103,7 @@ public Optional getLatest() { } @Override + @PublishMetrics(metricNamePrefix=MetricConstants.OS_WRITE_METRIC_PREFIX) public void purge() { LOG.info("Purging log entry with id " + latestId); try (RestHighLevelClient client = flintClient.createClient()) { @@ -113,6 +117,7 @@ public void purge() { } } + @PublishMetrics(metricNamePrefix=MetricConstants.OS_WRITE_METRIC_PREFIX) private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) { LOG.info("Creating log entry " + logEntry); // Assign doc ID here @@ -136,6 +141,7 @@ private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) { RequestOptions.DEFAULT)); } + @PublishMetrics(metricNamePrefix=MetricConstants.OS_WRITE_METRIC_PREFIX) private FlintMetadataLogEntry updateLogEntry(FlintMetadataLogEntry logEntry) { LOG.info("Updating log entry " + logEntry); return writeLogEntry(logEntry, @@ -172,6 +178,7 @@ private FlintMetadataLogEntry writeLogEntry( } } + @PublishMetrics(metricNamePrefix=MetricConstants.OS_READ_METRIC_PREFIX) private boolean exists() { LOG.info("Checking if Flint index exists " + metaLogIndexName); try (RestHighLevelClient client = flintClient.createClient()) { diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java index d71014c20..57ea8d5c1 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java @@ -15,6 +15,8 @@ import org.opensearch.common.Strings; import org.opensearch.common.unit.TimeValue; import org.opensearch.flint.core.FlintOptions; +import org.opensearch.flint.core.metrics.aop.MetricConstants; +import org.opensearch.flint.core.metrics.aop.PublishMetrics; import org.opensearch.search.builder.SearchSourceBuilder; import java.io.IOException; @@ -44,6 +46,7 @@ public OpenSearchScrollReader(RestHighLevelClient client, String indexName, Sear /** * search. */ + @PublishMetrics(metricNamePrefix=MetricConstants.OS_READ_METRIC_PREFIX) Optional search(SearchRequest request) throws IOException { if (Strings.isNullOrEmpty(scrollId)) { request.scroll(scrollDuration); @@ -66,6 +69,7 @@ Optional search(SearchRequest request) throws IOException { /** * clean the scroll context. */ + @PublishMetrics(metricNamePrefix=MetricConstants.OS_READ_METRIC_PREFIX) void clean() throws IOException { try { if (!Strings.isNullOrEmpty(scrollId)) { diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java index 4a6424512..67627adc7 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java @@ -7,6 +7,8 @@ import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.common.xcontent.XContentType; import org.opensearch.flint.core.FlintClient; +import org.opensearch.flint.core.metrics.aop.MetricConstants; +import org.opensearch.flint.core.metrics.aop.PublishMetrics; import java.io.IOException; import java.util.logging.Level; @@ -25,6 +27,7 @@ public OpenSearchUpdater(String indexName, FlintClient flintClient) { this.flintClient = flintClient; } + @PublishMetrics(metricNamePrefix=MetricConstants.OS_WRITE_METRIC_PREFIX) public void upsert(String id, String doc) { // we might need to keep the updater for a long time. Reusing the client may not work as the temporary // credentials may expire. @@ -46,6 +49,7 @@ public void upsert(String id, String doc) { } } + @PublishMetrics(metricNamePrefix=MetricConstants.OS_WRITE_METRIC_PREFIX) public void update(String id, String doc) { try (RestHighLevelClient client = flintClient.createClient()) { assertIndexExist(client, indexName); @@ -62,6 +66,7 @@ public void update(String id, String doc) { } } + @PublishMetrics(metricNamePrefix=MetricConstants.OS_WRITE_METRIC_PREFIX) public void updateIf(String id, String doc, long seqNo, long primaryTerm) { try (RestHighLevelClient client = flintClient.createClient()) { assertIndexExist(client, indexName); @@ -80,6 +85,7 @@ public void updateIf(String id, String doc, long seqNo, long primaryTerm) { } } + @PublishMetrics(metricNamePrefix=MetricConstants.OS_READ_METRIC_PREFIX) private void assertIndexExist(RestHighLevelClient client, String indexName) throws IOException { LOG.info("Checking if index exists " + indexName); if (!client.indices().exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT)) { diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchWriter.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchWriter.java index 1e55084b2..c68cc4879 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchWriter.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchWriter.java @@ -9,18 +9,14 @@ import org.opensearch.action.bulk.BulkItemResponse; import org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.bulk.BulkResponse; -import org.opensearch.action.support.WriteRequest; import org.opensearch.client.RequestOptions; import org.opensearch.client.RestHighLevelClient; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.flint.core.metrics.aop.MetricConstants; +import org.opensearch.flint.core.metrics.aop.PublishMetrics; import org.opensearch.rest.RestStatus; -import java.io.BufferedWriter; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.OutputStreamWriter; -import java.io.StringWriter; -import java.io.Writer; import java.util.Arrays; /** @@ -52,6 +48,7 @@ public OpenSearchWriter(RestHighLevelClient client, String indexName, String ref * Flush the data in buffer. * Todo. StringWriter is not efficient. it will copy the cbuf when create bytes. */ + @PublishMetrics(metricNamePrefix=MetricConstants.OS_WRITE_METRIC_PREFIX) @Override public void flush() { try { if (sb.length() > 0) { diff --git a/flint-core/src/test/java/org/opensearch/flint/core/metrics/aop/MetricsAspectTest.java b/flint-core/src/test/java/org/opensearch/flint/core/metrics/aop/MetricsAspectTest.java new file mode 100644 index 000000000..300ca7580 --- /dev/null +++ b/flint-core/src/test/java/org/opensearch/flint/core/metrics/aop/MetricsAspectTest.java @@ -0,0 +1,77 @@ +package org.opensearch.flint.core.metrics.aop; + +import org.apache.spark.SparkEnv; +import org.apache.spark.metrics.source.FlintMetricSource; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; + +import org.opensearch.OpenSearchException; +import org.opensearch.rest.RestStatus; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + + +public class MetricsAspectTest { + private MetricsAspect metricsAspect; + + @Test + public void testLogSuccess() { + try (MockedStatic sparkEnvMock = mockStatic(SparkEnv.class)) { + metricsAspect = new MetricsAspect(); + + SparkEnv sparkEnv = mock(SparkEnv.class, RETURNS_DEEP_STUBS); + sparkEnvMock.when(SparkEnv::get).thenReturn(sparkEnv); + FlintMetricSource flintMetricSource = spy(new FlintMetricSource()); + when(sparkEnv.metricsSystem().getSourcesByName(flintMetricSource.sourceName()).head()) + .thenReturn(flintMetricSource); + + PublishMetrics publishMetricsAnnotation = mock(PublishMetrics.class); + when(publishMetricsAnnotation.metricNamePrefix()).thenReturn("testMetric"); + + String expectedMetricName = "testMetric.2xx.count"; + String actualMetricName = metricsAspect.logSuccess(publishMetricsAnnotation); + + verify(sparkEnv.metricsSystem(), times(0)).registerSource(any()); + verify(flintMetricSource, times(2)).metricRegistry(); + assertEquals(expectedMetricName, actualMetricName); + assertEquals(1, flintMetricSource.metricRegistry().getCounters().get(expectedMetricName).getCount()); + } + } + + @Test + public void testLogExceptionWithOpenSearchException() { + try (MockedStatic sparkEnvMock = mockStatic(SparkEnv.class)) { + metricsAspect = new MetricsAspect(); + + SparkEnv sparkEnv = mock(SparkEnv.class, RETURNS_DEEP_STUBS); + sparkEnvMock.when(SparkEnv::get).thenReturn(sparkEnv); + FlintMetricSource flintMetricSource = spy(new FlintMetricSource()); + when(sparkEnv.metricsSystem().getSourcesByName(flintMetricSource.sourceName()).head()) + .thenReturn(flintMetricSource); + + PublishMetrics publishMetricsAnnotation = mock(PublishMetrics.class); + when(publishMetricsAnnotation.metricNamePrefix()).thenReturn("testMetric"); + + OpenSearchException exception = mock(OpenSearchException.class); + when(exception.getMessage()).thenReturn("Error"); + when(exception.getCause()).thenReturn(new RuntimeException()); + when(exception.status()).thenReturn(RestStatus.INTERNAL_SERVER_ERROR); + + String expectedMetricName = "testMetric.5xx.count"; + String actualMetricName = metricsAspect.logException(exception, publishMetricsAnnotation); + + verify(sparkEnv.metricsSystem(), times(0)).registerSource(any()); + verify(flintMetricSource, times(2)).metricRegistry(); + assertEquals(expectedMetricName, actualMetricName); + assertEquals(1, flintMetricSource.metricRegistry().getCounters().get(expectedMetricName).getCount()); + } + } +} diff --git a/project/plugins.sbt b/project/plugins.sbt index 38550667b..f408d6813 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -5,6 +5,7 @@ addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6") addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0") +addSbtPlugin("com.lightbend.sbt" % "sbt-aspectj" % "0.11.0") addSbtPlugin("com.lightbend.sbt" % "sbt-java-formatter" % "0.8.0") addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.0") addSbtPlugin("com.simplytyped" % "sbt-antlr4" % "0.8.3")