-
Notifications
You must be signed in to change notification settings - Fork 33
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Louis Chu <[email protected]>
- Loading branch information
Showing
12 changed files
with
325 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
26 changes: 26 additions & 0 deletions
26
flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/MetricConstants.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.flint.core.metrics.aop; | ||
|
||
/** | ||
* This class defines custom metric constants used for monitoring flint operations. | ||
*/ | ||
public class MetricConstants { | ||
|
||
/** | ||
* The prefix for all read-related metrics in OpenSearch. | ||
* This constant is used as a part of metric names to categorize and identify metrics related to read operations. | ||
*/ | ||
public static final String OS_READ_METRIC_PREFIX = "opensearch.read"; | ||
|
||
/** | ||
* The prefix for all write-related metrics in OpenSearch. | ||
* Similar to OS_READ_METRIC_PREFIX, this constant is used for categorizing and identifying metrics that pertain to write operations. | ||
*/ | ||
public static final String OS_WRITE_METRIC_PREFIX = "opensearch.write"; | ||
|
||
public static final String FLINT_METRIC_SOURCE_NAME = "FlintMetricSource"; | ||
} |
131 changes: 131 additions & 0 deletions
131
flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/MetricsAspect.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.flint.core.metrics.aop; | ||
|
||
import com.codahale.metrics.Counter; | ||
import org.apache.spark.SparkEnv; | ||
import org.apache.spark.metrics.source.FlintMetricSource; | ||
import org.apache.spark.metrics.source.Source; | ||
import org.aspectj.lang.annotation.AfterReturning; | ||
import org.aspectj.lang.annotation.AfterThrowing; | ||
import org.aspectj.lang.annotation.Aspect; | ||
import org.aspectj.lang.annotation.Pointcut; | ||
import org.opensearch.OpenSearchException; | ||
import scala.collection.Seq; | ||
|
||
import static org.opensearch.flint.core.metrics.aop.MetricConstants.FLINT_METRIC_SOURCE_NAME; | ||
|
||
/** | ||
* Aspect for logging metrics based on the annotated methods in Flint components. | ||
* It utilizes AspectJ for intercepting method calls that are annotated with {@link PublishMetrics} | ||
* to log successful executions and exceptions as metrics. | ||
*/ | ||
@Aspect | ||
public class MetricsAspect { | ||
|
||
/** | ||
* Pointcut to match methods annotated with {@link PublishMetrics}. | ||
* | ||
* @param publishMetricsAnnotation the PublishMetrics annotation | ||
*/ | ||
@Pointcut("@annotation(publishMetricsAnnotation)") | ||
public void annotatedWithPublishMetrics(PublishMetrics publishMetricsAnnotation) {} | ||
|
||
/** | ||
* After returning advice that logs successful method executions. | ||
* This method is invoked after a method annotated with {@link PublishMetrics} successfully returns. | ||
* It publishes a success metric with a standard status code of 200. | ||
* | ||
* @param publishMetricsAnnotation the PublishMetrics annotation | ||
* @return the full metric name for the successful execution | ||
*/ | ||
@AfterReturning(pointcut = "annotatedWithPublishMetrics(publishMetricsAnnotation)", argNames = "publishMetricsAnnotation") | ||
public String logSuccess(PublishMetrics publishMetricsAnnotation) { | ||
int statusCode = 200; // Assume success with a status code of 200 | ||
String metricNamePrefix = publishMetricsAnnotation.metricNamePrefix(); | ||
return publishStatusMetrics(metricNamePrefix, statusCode); | ||
} | ||
|
||
/** | ||
* After throwing advice that logs exceptions thrown by methods. | ||
* This method is invoked when a method annotated with {@link PublishMetrics} throws an exception. | ||
* It extracts the status code from the OpenSearchException and publishes a corresponding metric. | ||
* | ||
* @param ex the exception thrown by the method | ||
* @param publishMetricsAnnotation the PublishMetrics annotation | ||
* @return the full metric name for the exception, or null if the exception is not an OpenSearchException | ||
*/ | ||
@AfterThrowing(pointcut = "annotatedWithPublishMetrics(publishMetricsAnnotation)", throwing = "ex", argNames = "ex,publishMetricsAnnotation") | ||
public String logException(Throwable ex, PublishMetrics publishMetricsAnnotation) { | ||
OpenSearchException openSearchException = extractOpenSearchException(ex); | ||
if (openSearchException != null) { | ||
int statusCode = openSearchException.status().getStatus(); | ||
String metricNamePrefix = publishMetricsAnnotation.metricNamePrefix(); | ||
return publishStatusMetrics(metricNamePrefix, statusCode); | ||
} | ||
return null; | ||
} | ||
|
||
/** | ||
* Extracts an OpenSearchException from the given Throwable. | ||
* This method checks if the Throwable is an instance of OpenSearchException or caused by one. | ||
* | ||
* @param ex the exception to be checked | ||
* @return the extracted OpenSearchException, or null if not found | ||
*/ | ||
private OpenSearchException extractOpenSearchException(Throwable ex) { | ||
if (ex instanceof OpenSearchException) { | ||
return (OpenSearchException) ex; | ||
} else if (ex.getCause() instanceof OpenSearchException) { | ||
return (OpenSearchException) ex.getCause(); | ||
} | ||
return null; | ||
} | ||
|
||
|
||
private String publishStatusMetrics(String metricNamePrefix, int statusCode) { | ||
// TODO: Refactor this impl | ||
String metricName = null; | ||
if (SparkEnv.get() != null) { | ||
FlintMetricSource flintMetricSource = getOrInitFlintMetricSource(); | ||
metricName = constructMetricName(metricNamePrefix, statusCode); | ||
Counter counter = flintMetricSource.metricRegistry().getCounters().get(metricName); | ||
if (counter == null) { | ||
counter = flintMetricSource.metricRegistry().counter(metricName); | ||
} | ||
counter.inc(); | ||
} | ||
return metricName; | ||
} | ||
|
||
private String constructMetricName(String metricNamePrefix, int statusCode) { | ||
String metricSuffix = null; | ||
|
||
if (statusCode == 200) { | ||
metricSuffix = "2xx.count"; | ||
} else if (statusCode == 403) { | ||
metricSuffix = "403.count"; | ||
} else if (statusCode >= 500) { | ||
metricSuffix = "5xx.count"; | ||
} else if (statusCode >= 400) { | ||
metricSuffix = "4xx.count"; | ||
} | ||
|
||
return metricNamePrefix + "." + metricSuffix; | ||
} | ||
|
||
|
||
private static FlintMetricSource getOrInitFlintMetricSource() { | ||
Seq<Source> metricSourceSeq = SparkEnv.get().metricsSystem().getSourcesByName(FLINT_METRIC_SOURCE_NAME); | ||
|
||
if (metricSourceSeq == null || metricSourceSeq.isEmpty()) { | ||
FlintMetricSource metricSource = new FlintMetricSource(FLINT_METRIC_SOURCE_NAME); | ||
SparkEnv.get().metricsSystem().registerSource(metricSource); | ||
return metricSource; | ||
} | ||
return (FlintMetricSource) metricSourceSeq.head(); | ||
} | ||
} |
29 changes: 29 additions & 0 deletions
29
flint-core/src/main/java/org/opensearch/flint/core/metrics/aop/PublishMetrics.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
package org.opensearch.flint.core.metrics.aop; | ||
|
||
import java.lang.annotation.ElementType; | ||
import java.lang.annotation.Retention; | ||
import java.lang.annotation.RetentionPolicy; | ||
import java.lang.annotation.Target; | ||
|
||
|
||
/** | ||
* This annotation is used to mark methods or types that publish metrics. | ||
* It is retained at runtime, allowing the application to utilize reflection to | ||
* discover and handle these metrics accordingly. The annotation can be applied | ||
* to both methods and types (classes or interfaces). | ||
* | ||
* @Retention(RetentionPolicy.RUNTIME) - Indicates that the annotation is available at runtime for reflection. | ||
* @Target({ElementType.METHOD, ElementType.TYPE}) - Specifies that this annotation can be applied to methods or types. | ||
*/ | ||
@Retention(RetentionPolicy.RUNTIME) | ||
@Target({ElementType.METHOD, ElementType.TYPE}) | ||
public @interface PublishMetrics { | ||
|
||
/** | ||
* Defines the prefix of the metric name. | ||
* This prefix is used to categorize the metrics and typically represents a specific aspect or feature of Flint. | ||
* | ||
* @return the metric name prefix. | ||
*/ | ||
String metricNamePrefix(); | ||
} |
26 changes: 26 additions & 0 deletions
26
flint-core/src/main/scala/org/apache/spark/metrics/source/FlintMetricSource.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.apache.spark.metrics.source | ||
|
||
import com.codahale.metrics.MetricRegistry | ||
import org.opensearch.flint.core.metrics.aop.MetricConstants.FLINT_METRIC_SOURCE_NAME; | ||
|
||
/** | ||
* A metric source class for Flint, extending the base `Source` class in Spark. This class is | ||
* responsible for creating and managing a MetricRegistry instance. | ||
* | ||
* @param sourceName | ||
* the name of the source, passed to the base class. | ||
*/ | ||
class FlintMetricSource(override val sourceName: String = FLINT_METRIC_SOURCE_NAME) | ||
extends Source { | ||
|
||
/** | ||
* The MetricRegistry instance associated with this source. This registry is used to create and | ||
* manage various metrics. | ||
*/ | ||
override val metricRegistry: MetricRegistry = new MetricRegistry | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.