From 2bbeb1f0771cc96b93807e84758f0ef84c407ebf Mon Sep 17 00:00:00 2001 From: Louis Chu Date: Thu, 18 Jan 2024 21:37:45 -0800 Subject: [PATCH] Test post compile weaving Signed-off-by: Louis Chu --- .../flint/core/metrics/aop/MetricsAspect.java | 258 +++++++++--------- .../core/metrics/aop/PublishMetrics.java | 29 -- .../metrics/aop/RestClientMetricsAspect.java | 43 +++ .../core/storage/FlintOpenSearchClient.java | 7 - .../storage/FlintOpenSearchMetadataLog.java | 7 - .../core/storage/OpenSearchScrollReader.java | 4 - .../flint/core/storage/OpenSearchUpdater.java | 6 - .../flint/core/storage/OpenSearchWriter.java | 3 - .../core/metrics/aop/MetricsAspectTest.java | 77 ------ 9 files changed, 172 insertions(+), 262 deletions(-) delete mode 100644 flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/PublishMetrics.java create mode 100644 flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/RestClientMetricsAspect.java delete mode 100644 flint-core/src/test/java/org/opensearch/flint/core/metrics/aop/MetricsAspectTest.java diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/MetricsAspect.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/MetricsAspect.java index 1ea378957..9ead24048 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/MetricsAspect.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/MetricsAspect.java @@ -1,129 +1,129 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.flint.core.metrics.aop; - -import com.codahale.metrics.Counter; -import org.apache.spark.SparkEnv; -import org.apache.spark.metrics.source.FlintMetricSource; -import org.apache.spark.metrics.source.Source; -import org.aspectj.lang.annotation.AfterReturning; -import org.aspectj.lang.annotation.AfterThrowing; -import org.aspectj.lang.annotation.Aspect; -import org.aspectj.lang.annotation.Pointcut; -import org.opensearch.OpenSearchException; -import scala.collection.Seq; - -/** - * Aspect for logging metrics based on the annotated methods in Flint components. - * It utilizes AspectJ for intercepting method calls that are annotated with {@link PublishMetrics} - * to log successful executions and exceptions as metrics. - */ -@Aspect -public class MetricsAspect { - - /** - * Pointcut to match methods annotated with {@link PublishMetrics}. - * - * @param publishMetricsAnnotation the PublishMetrics annotation - */ - @Pointcut("@annotation(publishMetricsAnnotation)") - public void annotatedWithPublishMetrics(PublishMetrics publishMetricsAnnotation) {} - - /** - * After returning advice that logs successful method executions. - * This method is invoked after a method annotated with {@link PublishMetrics} successfully returns. - * It publishes a success metric with a standard status code of 200. - * - * @param publishMetricsAnnotation the PublishMetrics annotation - * @return the full metric name for the successful execution - */ - @AfterReturning(pointcut = "annotatedWithPublishMetrics(publishMetricsAnnotation)", argNames = "publishMetricsAnnotation") - public String logSuccess(PublishMetrics publishMetricsAnnotation) { - int statusCode = 200; // Assume success with a status code of 200 - String metricNamePrefix = publishMetricsAnnotation.metricNamePrefix(); - return publishStatusMetrics(metricNamePrefix, statusCode); - } - - /** - * After throwing advice that logs exceptions thrown by methods. - * This method is invoked when a method annotated with {@link PublishMetrics} throws an exception. - * It extracts the status code from the OpenSearchException and publishes a corresponding metric. - * - * @param ex the exception thrown by the method - * @param publishMetricsAnnotation the PublishMetrics annotation - * @return the full metric name for the exception, or null if the exception is not an OpenSearchException - */ - @AfterThrowing(pointcut = "annotatedWithPublishMetrics(publishMetricsAnnotation)", throwing = "ex", argNames = "ex,publishMetricsAnnotation") - public String logException(Throwable ex, PublishMetrics publishMetricsAnnotation) { - OpenSearchException openSearchException = extractOpenSearchException(ex); - if (openSearchException != null) { - int statusCode = openSearchException.status().getStatus(); - String metricNamePrefix = publishMetricsAnnotation.metricNamePrefix(); - return publishStatusMetrics(metricNamePrefix, statusCode); - } - return null; - } - - /** - * Extracts an OpenSearchException from the given Throwable. - * This method checks if the Throwable is an instance of OpenSearchException or caused by one. - * - * @param ex the exception to be checked - * @return the extracted OpenSearchException, or null if not found - */ - private OpenSearchException extractOpenSearchException(Throwable ex) { - if (ex instanceof OpenSearchException) { - return (OpenSearchException) ex; - } else if (ex.getCause() instanceof OpenSearchException) { - return (OpenSearchException) ex.getCause(); - } - return null; - } - - - private String publishStatusMetrics(String metricNamePrefix, int statusCode) { - // TODO: Refactor this impl - String metricName = null; - if (SparkEnv.get() != null) { - FlintMetricSource flintMetricSource = getOrInitFlintMetricSource(); - metricName = constructMetricName(metricNamePrefix, statusCode); - Counter counter = flintMetricSource.metricRegistry().getCounters().get(metricName); - if (counter == null) { - counter = flintMetricSource.metricRegistry().counter(metricName); - } - counter.inc(); - } - return metricName; - } - - private String constructMetricName(String metricNamePrefix, int statusCode) { - String metricSuffix = null; - - if (statusCode == 200) { - metricSuffix = "2xx.count"; - } else if (statusCode == 403) { - metricSuffix = "403.count"; - } else if (statusCode >= 500) { - metricSuffix = "5xx.count"; - } else if (statusCode >= 400) { - metricSuffix = "4xx.count"; - } - - return metricNamePrefix + "." + metricSuffix; - } - - - private static FlintMetricSource getOrInitFlintMetricSource() { - Seq metricSourceSeq = SparkEnv.get().metricsSystem().getSourcesByName(FlintMetricSource.FLINT_METRIC_SOURCE_NAME()); - - if (metricSourceSeq == null || metricSourceSeq.isEmpty()) { - FlintMetricSource metricSource = new FlintMetricSource(); - SparkEnv.get().metricsSystem().registerSource(metricSource); - return metricSource; - } - return (FlintMetricSource) metricSourceSeq.head(); - } -} +///* +// * Copyright OpenSearch Contributors +// * SPDX-License-Identifier: Apache-2.0 +// */ +// +//package org.opensearch.flint.core.metrics.aop; +// +//import com.codahale.metrics.Counter; +//import org.apache.spark.SparkEnv; +//import org.apache.spark.metrics.source.FlintMetricSource; +//import org.apache.spark.metrics.source.Source; +//import org.aspectj.lang.annotation.AfterReturning; +//import org.aspectj.lang.annotation.AfterThrowing; +//import org.aspectj.lang.annotation.Aspect; +//import org.aspectj.lang.annotation.Pointcut; +//import org.opensearch.OpenSearchException; +//import scala.collection.Seq; +// +///** +// * Aspect for logging metrics based on the annotated methods in Flint components. +// * It utilizes AspectJ for intercepting method calls that are annotated with {@link PublishMetrics} +// * to log successful executions and exceptions as metrics. +// */ +//@Aspect +//public class MetricsAspect { +// +// /** +// * Pointcut to match methods annotated with {@link PublishMetrics}. +// * +// * @param publishMetricsAnnotation the PublishMetrics annotation +// */ +// @Pointcut("@annotation(publishMetricsAnnotation)") +// public void annotatedWithPublishMetrics(PublishMetrics publishMetricsAnnotation) {} +// +// /** +// * After returning advice that logs successful method executions. +// * This method is invoked after a method annotated with {@link PublishMetrics} successfully returns. +// * It publishes a success metric with a standard status code of 200. +// * +// * @param publishMetricsAnnotation the PublishMetrics annotation +// * @return the full metric name for the successful execution +// */ +// @AfterReturning(pointcut = "annotatedWithPublishMetrics(publishMetricsAnnotation)", argNames = "publishMetricsAnnotation") +// public String logSuccess(PublishMetrics publishMetricsAnnotation) { +// int statusCode = 200; // Assume success with a status code of 200 +// String metricNamePrefix = publishMetricsAnnotation.metricNamePrefix(); +// return publishStatusMetrics(metricNamePrefix, statusCode); +// } +// +// /** +// * After throwing advice that logs exceptions thrown by methods. +// * This method is invoked when a method annotated with {@link PublishMetrics} throws an exception. +// * It extracts the status code from the OpenSearchException and publishes a corresponding metric. +// * +// * @param ex the exception thrown by the method +// * @param publishMetricsAnnotation the PublishMetrics annotation +// * @return the full metric name for the exception, or null if the exception is not an OpenSearchException +// */ +// @AfterThrowing(pointcut = "annotatedWithPublishMetrics(publishMetricsAnnotation)", throwing = "ex", argNames = "ex,publishMetricsAnnotation") +// public String logException(Throwable ex, PublishMetrics publishMetricsAnnotation) { +// OpenSearchException openSearchException = extractOpenSearchException(ex); +// if (openSearchException != null) { +// int statusCode = openSearchException.status().getStatus(); +// String metricNamePrefix = publishMetricsAnnotation.metricNamePrefix(); +// return publishStatusMetrics(metricNamePrefix, statusCode); +// } +// return null; +// } +// +// /** +// * Extracts an OpenSearchException from the given Throwable. +// * This method checks if the Throwable is an instance of OpenSearchException or caused by one. +// * +// * @param ex the exception to be checked +// * @return the extracted OpenSearchException, or null if not found +// */ +// private OpenSearchException extractOpenSearchException(Throwable ex) { +// if (ex instanceof OpenSearchException) { +// return (OpenSearchException) ex; +// } else if (ex.getCause() instanceof OpenSearchException) { +// return (OpenSearchException) ex.getCause(); +// } +// return null; +// } +// +// +// private String publishStatusMetrics(String metricNamePrefix, int statusCode) { +// // TODO: Refactor this impl +// String metricName = null; +// if (SparkEnv.get() != null) { +// FlintMetricSource flintMetricSource = getOrInitFlintMetricSource(); +// metricName = constructMetricName(metricNamePrefix, statusCode); +// Counter counter = flintMetricSource.metricRegistry().getCounters().get(metricName); +// if (counter == null) { +// counter = flintMetricSource.metricRegistry().counter(metricName); +// } +// counter.inc(); +// } +// return metricName; +// } +// +// private String constructMetricName(String metricNamePrefix, int statusCode) { +// String metricSuffix = null; +// +// if (statusCode == 200) { +// metricSuffix = "2xx.count"; +// } else if (statusCode == 403) { +// metricSuffix = "403.count"; +// } else if (statusCode >= 500) { +// metricSuffix = "5xx.count"; +// } else if (statusCode >= 400) { +// metricSuffix = "4xx.count"; +// } +// +// return metricNamePrefix + "." + metricSuffix; +// } +// +// +// private static FlintMetricSource getOrInitFlintMetricSource() { +// Seq metricSourceSeq = SparkEnv.get().metricsSystem().getSourcesByName(FlintMetricSource.FLINT_METRIC_SOURCE_NAME()); +// +// if (metricSourceSeq == null || metricSourceSeq.isEmpty()) { +// FlintMetricSource metricSource = new FlintMetricSource(); +// SparkEnv.get().metricsSystem().registerSource(metricSource); +// return metricSource; +// } +// return (FlintMetricSource) metricSourceSeq.head(); +// } +//} diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/PublishMetrics.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/PublishMetrics.java deleted file mode 100644 index 5ce416cf0..000000000 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/PublishMetrics.java +++ /dev/null @@ -1,29 +0,0 @@ -package org.opensearch.flint.core.metrics.aop; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - - -/** - * This annotation is used to mark methods or types that publish metrics. - * It is retained at runtime, allowing the application to utilize reflection to - * discover and handle these metrics accordingly. The annotation can be applied - * to both methods and types (classes or interfaces). - * - * @Retention(RetentionPolicy.RUNTIME) - Indicates that the annotation is available at runtime for reflection. - * @Target({ElementType.METHOD, ElementType.TYPE}) - Specifies that this annotation can be applied to methods or types. - */ -@Retention(RetentionPolicy.RUNTIME) -@Target({ElementType.METHOD, ElementType.TYPE}) -public @interface PublishMetrics { - - /** - * Defines the prefix of the metric name. - * This prefix is used to categorize the metrics and typically represents a specific aspect or feature of Flint. - * - * @return the metric name prefix. - */ - String metricNamePrefix(); -} diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/RestClientMetricsAspect.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/RestClientMetricsAspect.java new file mode 100644 index 000000000..8c0275332 --- /dev/null +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/RestClientMetricsAspect.java @@ -0,0 +1,43 @@ +package org.opensearch.flint.core.metrics.aop; + +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.annotation.Around; +import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.annotation.Pointcut; + +import java.util.logging.Logger; + + +@Aspect +public class RestClientMetricsAspect { + + private static final Logger LOG = Logger.getLogger(RestClientMetricsAspect.class.getName()); + + @Pointcut("execution(* org.opensearch.client.RestHighLevelClient.*(..))") + public void onRestHighLevelClientMethod() {} + + @Around("onRestHighLevelClientMethod()") + public Object aroundRestHighLevelClientMethod(ProceedingJoinPoint pjp) throws Throwable { + long startTime = System.currentTimeMillis(); + + try { + Object response = pjp.proceed(); + long duration = System.currentTimeMillis() - startTime; + logMetric("success", duration); + return response; + } catch (Exception e) { + long duration = System.currentTimeMillis() - startTime; + logMetric("failure", duration); + throw e; + } + } + + private void logMetric(String status, long duration) { + LOG.info("Request " + status + ": " + duration + "ms"); + } +} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index f9cc61c8d..4e549df2b 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -49,8 +49,6 @@ import org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction; import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry; import org.opensearch.flint.core.metadata.log.OptimisticTransaction; -import org.opensearch.flint.core.metrics.aop.MetricConstants; -import org.opensearch.flint.core.metrics.aop.PublishMetrics; import org.opensearch.index.query.AbstractQueryBuilder; import org.opensearch.index.query.MatchAllQueryBuilder; import org.opensearch.index.query.QueryBuilder; @@ -129,7 +127,6 @@ public void createIndex(String indexName, FlintMetadata metadata) { createIndex(indexName, metadata.getContent(), metadata.indexSettings()); } - @PublishMetrics(metricNamePrefix=MetricConstants.OS_WRITE_METRIC_PREFIX) protected void createIndex(String indexName, String mapping, Option settings) { LOG.info("Creating Flint index " + indexName); String osIndexName = sanitizeIndexName(indexName); @@ -146,7 +143,6 @@ protected void createIndex(String indexName, String mapping, Option sett } @Override - @PublishMetrics(metricNamePrefix=MetricConstants.OS_READ_METRIC_PREFIX) public boolean exists(String indexName) { LOG.info("Checking if Flint index exists " + indexName); String osIndexName = sanitizeIndexName(indexName); @@ -158,7 +154,6 @@ public boolean exists(String indexName) { } @Override - @PublishMetrics(metricNamePrefix=MetricConstants.OS_READ_METRIC_PREFIX) public List getAllIndexMetadata(String indexNamePattern) { LOG.info("Fetching all Flint index metadata for pattern " + indexNamePattern); String osIndexNamePattern = sanitizeIndexName(indexNamePattern); @@ -177,7 +172,6 @@ public List getAllIndexMetadata(String indexNamePattern) { } @Override - @PublishMetrics(metricNamePrefix=MetricConstants.OS_READ_METRIC_PREFIX) public FlintMetadata getIndexMetadata(String indexName) { LOG.info("Fetching Flint index metadata for " + indexName); String osIndexName = sanitizeIndexName(indexName); @@ -194,7 +188,6 @@ public FlintMetadata getIndexMetadata(String indexName) { } @Override - @PublishMetrics(metricNamePrefix=MetricConstants.OS_WRITE_METRIC_PREFIX) public void deleteIndex(String indexName) { LOG.info("Deleting Flint index " + indexName); String osIndexName = sanitizeIndexName(indexName); diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java index 031adff97..ab38a5f60 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java @@ -27,8 +27,6 @@ import org.opensearch.flint.core.FlintClient; import org.opensearch.flint.core.metadata.log.FlintMetadataLog; import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry; -import org.opensearch.flint.core.metrics.aop.MetricConstants; -import org.opensearch.flint.core.metrics.aop.PublishMetrics; /** * Flint metadata log in OpenSearch store. For now use single doc instead of maintaining history @@ -77,7 +75,6 @@ public FlintMetadataLogEntry add(FlintMetadataLogEntry logEntry) { } @Override - @PublishMetrics(metricNamePrefix=MetricConstants.OS_READ_METRIC_PREFIX) public Optional getLatest() { LOG.info("Fetching latest log entry with id " + latestId); try (RestHighLevelClient client = flintClient.createClient()) { @@ -103,7 +100,6 @@ public Optional getLatest() { } @Override - @PublishMetrics(metricNamePrefix=MetricConstants.OS_WRITE_METRIC_PREFIX) public void purge() { LOG.info("Purging log entry with id " + latestId); try (RestHighLevelClient client = flintClient.createClient()) { @@ -117,7 +113,6 @@ public void purge() { } } - @PublishMetrics(metricNamePrefix=MetricConstants.OS_WRITE_METRIC_PREFIX) private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) { LOG.info("Creating log entry " + logEntry); // Assign doc ID here @@ -141,7 +136,6 @@ private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) { RequestOptions.DEFAULT)); } - @PublishMetrics(metricNamePrefix=MetricConstants.OS_WRITE_METRIC_PREFIX) private FlintMetadataLogEntry updateLogEntry(FlintMetadataLogEntry logEntry) { LOG.info("Updating log entry " + logEntry); return writeLogEntry(logEntry, @@ -178,7 +172,6 @@ private FlintMetadataLogEntry writeLogEntry( } } - @PublishMetrics(metricNamePrefix=MetricConstants.OS_READ_METRIC_PREFIX) private boolean exists() { LOG.info("Checking if Flint index exists " + metaLogIndexName); try (RestHighLevelClient client = flintClient.createClient()) { diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java index 57ea8d5c1..d71014c20 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchScrollReader.java @@ -15,8 +15,6 @@ import org.opensearch.common.Strings; import org.opensearch.common.unit.TimeValue; import org.opensearch.flint.core.FlintOptions; -import org.opensearch.flint.core.metrics.aop.MetricConstants; -import org.opensearch.flint.core.metrics.aop.PublishMetrics; import org.opensearch.search.builder.SearchSourceBuilder; import java.io.IOException; @@ -46,7 +44,6 @@ public OpenSearchScrollReader(RestHighLevelClient client, String indexName, Sear /** * search. */ - @PublishMetrics(metricNamePrefix=MetricConstants.OS_READ_METRIC_PREFIX) Optional search(SearchRequest request) throws IOException { if (Strings.isNullOrEmpty(scrollId)) { request.scroll(scrollDuration); @@ -69,7 +66,6 @@ Optional search(SearchRequest request) throws IOException { /** * clean the scroll context. */ - @PublishMetrics(metricNamePrefix=MetricConstants.OS_READ_METRIC_PREFIX) void clean() throws IOException { try { if (!Strings.isNullOrEmpty(scrollId)) { diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java index 67627adc7..4a6424512 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java @@ -7,8 +7,6 @@ import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.common.xcontent.XContentType; import org.opensearch.flint.core.FlintClient; -import org.opensearch.flint.core.metrics.aop.MetricConstants; -import org.opensearch.flint.core.metrics.aop.PublishMetrics; import java.io.IOException; import java.util.logging.Level; @@ -27,7 +25,6 @@ public OpenSearchUpdater(String indexName, FlintClient flintClient) { this.flintClient = flintClient; } - @PublishMetrics(metricNamePrefix=MetricConstants.OS_WRITE_METRIC_PREFIX) public void upsert(String id, String doc) { // we might need to keep the updater for a long time. Reusing the client may not work as the temporary // credentials may expire. @@ -49,7 +46,6 @@ public void upsert(String id, String doc) { } } - @PublishMetrics(metricNamePrefix=MetricConstants.OS_WRITE_METRIC_PREFIX) public void update(String id, String doc) { try (RestHighLevelClient client = flintClient.createClient()) { assertIndexExist(client, indexName); @@ -66,7 +62,6 @@ public void update(String id, String doc) { } } - @PublishMetrics(metricNamePrefix=MetricConstants.OS_WRITE_METRIC_PREFIX) public void updateIf(String id, String doc, long seqNo, long primaryTerm) { try (RestHighLevelClient client = flintClient.createClient()) { assertIndexExist(client, indexName); @@ -85,7 +80,6 @@ public void updateIf(String id, String doc, long seqNo, long primaryTerm) { } } - @PublishMetrics(metricNamePrefix=MetricConstants.OS_READ_METRIC_PREFIX) private void assertIndexExist(RestHighLevelClient client, String indexName) throws IOException { LOG.info("Checking if index exists " + indexName); if (!client.indices().exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT)) { diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchWriter.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchWriter.java index c68cc4879..eac2744f7 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchWriter.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchWriter.java @@ -12,8 +12,6 @@ import org.opensearch.client.RequestOptions; import org.opensearch.client.RestHighLevelClient; import org.opensearch.common.xcontent.XContentType; -import org.opensearch.flint.core.metrics.aop.MetricConstants; -import org.opensearch.flint.core.metrics.aop.PublishMetrics; import org.opensearch.rest.RestStatus; import java.io.IOException; @@ -48,7 +46,6 @@ public OpenSearchWriter(RestHighLevelClient client, String indexName, String ref * Flush the data in buffer. * Todo. StringWriter is not efficient. it will copy the cbuf when create bytes. */ - @PublishMetrics(metricNamePrefix=MetricConstants.OS_WRITE_METRIC_PREFIX) @Override public void flush() { try { if (sb.length() > 0) { diff --git a/flint-core/src/test/java/org/opensearch/flint/core/metrics/aop/MetricsAspectTest.java b/flint-core/src/test/java/org/opensearch/flint/core/metrics/aop/MetricsAspectTest.java deleted file mode 100644 index 300ca7580..000000000 --- a/flint-core/src/test/java/org/opensearch/flint/core/metrics/aop/MetricsAspectTest.java +++ /dev/null @@ -1,77 +0,0 @@ -package org.opensearch.flint.core.metrics.aop; - -import org.apache.spark.SparkEnv; -import org.apache.spark.metrics.source.FlintMetricSource; -import org.junit.jupiter.api.Test; -import org.mockito.MockedStatic; - -import org.opensearch.OpenSearchException; -import org.opensearch.rest.RestStatus; - -import static org.junit.Assert.assertEquals; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.RETURNS_DEEP_STUBS; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.mockStatic; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - - -public class MetricsAspectTest { - private MetricsAspect metricsAspect; - - @Test - public void testLogSuccess() { - try (MockedStatic sparkEnvMock = mockStatic(SparkEnv.class)) { - metricsAspect = new MetricsAspect(); - - SparkEnv sparkEnv = mock(SparkEnv.class, RETURNS_DEEP_STUBS); - sparkEnvMock.when(SparkEnv::get).thenReturn(sparkEnv); - FlintMetricSource flintMetricSource = spy(new FlintMetricSource()); - when(sparkEnv.metricsSystem().getSourcesByName(flintMetricSource.sourceName()).head()) - .thenReturn(flintMetricSource); - - PublishMetrics publishMetricsAnnotation = mock(PublishMetrics.class); - when(publishMetricsAnnotation.metricNamePrefix()).thenReturn("testMetric"); - - String expectedMetricName = "testMetric.2xx.count"; - String actualMetricName = metricsAspect.logSuccess(publishMetricsAnnotation); - - verify(sparkEnv.metricsSystem(), times(0)).registerSource(any()); - verify(flintMetricSource, times(2)).metricRegistry(); - assertEquals(expectedMetricName, actualMetricName); - assertEquals(1, flintMetricSource.metricRegistry().getCounters().get(expectedMetricName).getCount()); - } - } - - @Test - public void testLogExceptionWithOpenSearchException() { - try (MockedStatic sparkEnvMock = mockStatic(SparkEnv.class)) { - metricsAspect = new MetricsAspect(); - - SparkEnv sparkEnv = mock(SparkEnv.class, RETURNS_DEEP_STUBS); - sparkEnvMock.when(SparkEnv::get).thenReturn(sparkEnv); - FlintMetricSource flintMetricSource = spy(new FlintMetricSource()); - when(sparkEnv.metricsSystem().getSourcesByName(flintMetricSource.sourceName()).head()) - .thenReturn(flintMetricSource); - - PublishMetrics publishMetricsAnnotation = mock(PublishMetrics.class); - when(publishMetricsAnnotation.metricNamePrefix()).thenReturn("testMetric"); - - OpenSearchException exception = mock(OpenSearchException.class); - when(exception.getMessage()).thenReturn("Error"); - when(exception.getCause()).thenReturn(new RuntimeException()); - when(exception.status()).thenReturn(RestStatus.INTERNAL_SERVER_ERROR); - - String expectedMetricName = "testMetric.5xx.count"; - String actualMetricName = metricsAspect.logException(exception, publishMetricsAnnotation); - - verify(sparkEnv.metricsSystem(), times(0)).registerSource(any()); - verify(flintMetricSource, times(2)).metricRegistry(); - assertEquals(expectedMetricName, actualMetricName); - assertEquals(1, flintMetricSource.metricRegistry().getCounters().get(expectedMetricName).getCount()); - } - } -}