Skip to content

Commit

Permalink
[POC]Flint metrics AOP
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Jan 16, 2024
1 parent 3528b66 commit 08504ad
Show file tree
Hide file tree
Showing 12 changed files with 280 additions and 7 deletions.
5 changes: 4 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
import Dependencies._
import com.lightbend.sbt.SbtAspectj

lazy val scala212 = "2.12.14"
lazy val sparkVersion = "3.3.2"
Expand Down Expand Up @@ -48,6 +49,7 @@ lazy val root = (project in file("."))
.settings(name := "flint", publish / skip := true)

lazy val flintCore = (project in file("flint-core"))
.enablePlugins(SbtAspectj)
.disablePlugins(AssemblyPlugin)
.settings(
commonSettings,
Expand Down Expand Up @@ -75,7 +77,8 @@ lazy val flintCore = (project in file("flint-core"))
"net.aichler" % "jupiter-interface" % "0.11.1" % Test
),
libraryDependencies ++= deps(sparkVersion),
publish / skip := true)
publish / skip := true
)

lazy val pplSparkIntegration = (project in file("ppl-spark-integration"))
.enablePlugins(AssemblyPlugin, Antlr4Plugin)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metrics.aop;

/**
* This class defines custom metric constants used for monitoring flint operations.
*/
public class MetricConstants {

/**
* The prefix for all read-related metrics in OpenSearch.
* This constant is used as a part of metric names to categorize and identify metrics related to read operations.
*/
public static final String OS_READ_METRIC_PREFIX = "opensearch.read";

/**
* The prefix for all write-related metrics in OpenSearch.
* Similar to OS_READ_METRIC_PREFIX, this constant is used for categorizing and identifying metrics that pertain to write operations.
*/
public static final String OS_WRITE_METRIC_PREFIX = "opensearch.write";

public static final String FLINT_METRIC_SOURCE_NAME = "FlintMetricSource";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metrics.aop;

import com.codahale.metrics.Counter;
import org.apache.spark.SparkEnv;
import org.apache.spark.metrics.source.FlintMetricSource;
import org.apache.spark.metrics.source.Source;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.AfterThrowing;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.opensearch.OpenSearchException;
import scala.collection.Seq;

import static org.opensearch.flint.core.metrics.aop.MetricConstants.FLINT_METRIC_SOURCE_NAME;

/**
* Aspect for logging metrics based on the annotated methods in Flint components.
* It utilizes AspectJ for intercepting method calls that are annotated with {@link PublishMetrics}
* to log successful executions and exceptions as metrics.
*/
@Aspect
public class MetricsAspect {

/**
* Pointcut to match methods annotated with {@link PublishMetrics}.
*
* @param publishMetricsAnnotation the PublishMetrics annotation
*/
@Pointcut("@annotation(publishMetricsAnnotation)")
public void annotatedWithPublishMetrics(PublishMetrics publishMetricsAnnotation) {}

/**
* After returning advice that logs successful method executions.
* This method is invoked after a method annotated with {@link PublishMetrics} successfully returns.
* It publishes a success metric with a standard status code of 200.
*
* @param publishMetricsAnnotation the PublishMetrics annotation
* @return the full metric name for the successful execution
*/
@AfterReturning(pointcut = "annotatedWithPublishMetrics(publishMetricsAnnotation)", argNames = "publishMetricsAnnotation")
public String logSuccess(PublishMetrics publishMetricsAnnotation) {
int statusCode = 200; // Assume success with a status code of 200
String metricNamePrefix = publishMetricsAnnotation.metricNamePrefix();
return publishStatusMetrics(metricNamePrefix, statusCode);
}

/**
* After throwing advice that logs exceptions thrown by methods.
* This method is invoked when a method annotated with {@link PublishMetrics} throws an exception.
* It extracts the status code from the OpenSearchException and publishes a corresponding metric.
*
* @param ex the exception thrown by the method
* @param publishMetricsAnnotation the PublishMetrics annotation
* @return the full metric name for the exception, or null if the exception is not an OpenSearchException
*/
@AfterThrowing(pointcut = "annotatedWithPublishMetrics(publishMetricsAnnotation)", throwing = "ex", argNames = "ex,publishMetricsAnnotation")
public String logException(Throwable ex, PublishMetrics publishMetricsAnnotation) {
OpenSearchException openSearchException = extractOpenSearchException(ex);
if (openSearchException != null) {
int statusCode = openSearchException.status().getStatus();
String metricNamePrefix = publishMetricsAnnotation.metricNamePrefix();
return publishStatusMetrics(metricNamePrefix, statusCode);
}
return null;
}

/**
* Extracts an OpenSearchException from the given Throwable.
* This method checks if the Throwable is an instance of OpenSearchException or caused by one.
*
* @param ex the exception to be checked
* @return the extracted OpenSearchException, or null if not found
*/
private OpenSearchException extractOpenSearchException(Throwable ex) {
if (ex instanceof OpenSearchException) {
return (OpenSearchException) ex;
} else if (ex.getCause() instanceof OpenSearchException) {
return (OpenSearchException) ex.getCause();
}
return null;
}


private String publishStatusMetrics(String metricNamePrefix, int statusCode) {
// TODO: Refactor this impl
String metricName = null;
if (SparkEnv.get() != null) {
FlintMetricSource flintMetricSource = getOrInitFlintMetricSource();
metricName = constructMetricName(metricNamePrefix, statusCode);
Counter counter = flintMetricSource.metricRegistry().getCounters().get(metricName);
if (counter == null) {
counter = flintMetricSource.metricRegistry().counter(metricName);
}
counter.inc();
}
return metricName;
}

private String constructMetricName(String metricNamePrefix, int statusCode) {
String metricSuffix = null;

if (statusCode == 200) {
metricSuffix = "2xx.count";
} else if (statusCode == 403) {
metricSuffix = "403.count";
} else if (statusCode >= 500) {
metricSuffix = "5xx.count";
} else if (statusCode >= 400) {
metricSuffix = "4xx.count";
}

return metricNamePrefix + "." + metricSuffix;
}


private static FlintMetricSource getOrInitFlintMetricSource() {
Seq<Source> metricSourceSeq = SparkEnv.get().metricsSystem().getSourcesByName(FLINT_METRIC_SOURCE_NAME);

if (metricSourceSeq == null || metricSourceSeq.isEmpty()) {
FlintMetricSource metricSource = new FlintMetricSource(FLINT_METRIC_SOURCE_NAME);
SparkEnv.get().metricsSystem().registerSource(metricSource);
return metricSource;
}
return (FlintMetricSource) metricSourceSeq.head();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.opensearch.flint.core.metrics.aop;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;


/**
* This annotation is used to mark methods or types that publish metrics.
* It is retained at runtime, allowing the application to utilize reflection to
* discover and handle these metrics accordingly. The annotation can be applied
* to both methods and types (classes or interfaces).
*
* @Retention(RetentionPolicy.RUNTIME) - Indicates that the annotation is available at runtime for reflection.
* @Target({ElementType.METHOD, ElementType.TYPE}) - Specifies that this annotation can be applied to methods or types.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface PublishMetrics {

/**
* Defines the prefix of the metric name.
* This prefix is used to categorize the metrics and typically represents a specific aspect or feature of Flint.
*
* @return the metric name prefix.
*/
String metricNamePrefix();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@

/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.apache.spark.metrics.source

import com.codahale.metrics.MetricRegistry

class FlintMetricSource(override val sourceName: String) extends Source {
override val metricRegistry: MetricRegistry = new MetricRegistry
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry;
import org.opensearch.flint.core.metadata.log.OptimisticTransaction;
import org.opensearch.flint.core.metrics.aop.MetricConstants;
import org.opensearch.flint.core.metrics.aop.PublishMetrics;
import org.opensearch.index.query.AbstractQueryBuilder;
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
Expand Down Expand Up @@ -127,6 +129,7 @@ public void createIndex(String indexName, FlintMetadata metadata) {
createIndex(indexName, metadata.getContent(), metadata.indexSettings());
}

@PublishMetrics(metricNamePrefix=MetricConstants.OS_WRITE_METRIC_PREFIX)
protected void createIndex(String indexName, String mapping, Option<String> settings) {
LOG.info("Creating Flint index " + indexName);
String osIndexName = sanitizeIndexName(indexName);
Expand All @@ -143,6 +146,7 @@ protected void createIndex(String indexName, String mapping, Option<String> sett
}

@Override
@PublishMetrics(metricNamePrefix=MetricConstants.OS_READ_METRIC_PREFIX)
public boolean exists(String indexName) {
LOG.info("Checking if Flint index exists " + indexName);
String osIndexName = sanitizeIndexName(indexName);
Expand All @@ -154,6 +158,7 @@ public boolean exists(String indexName) {
}

@Override
@PublishMetrics(metricNamePrefix=MetricConstants.OS_READ_METRIC_PREFIX)
public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
LOG.info("Fetching all Flint index metadata for pattern " + indexNamePattern);
String osIndexNamePattern = sanitizeIndexName(indexNamePattern);
Expand All @@ -172,6 +177,7 @@ public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
}

@Override
@PublishMetrics(metricNamePrefix=MetricConstants.OS_READ_METRIC_PREFIX)
public FlintMetadata getIndexMetadata(String indexName) {
LOG.info("Fetching Flint index metadata for " + indexName);
String osIndexName = sanitizeIndexName(indexName);
Expand All @@ -188,6 +194,7 @@ public FlintMetadata getIndexMetadata(String indexName) {
}

@Override
@PublishMetrics(metricNamePrefix=MetricConstants.OS_WRITE_METRIC_PREFIX)
public void deleteIndex(String indexName) {
LOG.info("Deleting Flint index " + indexName);
String osIndexName = sanitizeIndexName(indexName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.opensearch.flint.core.FlintClient;
import org.opensearch.flint.core.metadata.log.FlintMetadataLog;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry;
import org.opensearch.flint.core.metrics.aop.MetricConstants;
import org.opensearch.flint.core.metrics.aop.PublishMetrics;

/**
* Flint metadata log in OpenSearch store. For now use single doc instead of maintaining history
Expand Down Expand Up @@ -75,6 +77,7 @@ public FlintMetadataLogEntry add(FlintMetadataLogEntry logEntry) {
}

@Override
@PublishMetrics(metricNamePrefix=MetricConstants.OS_READ_METRIC_PREFIX)
public Optional<FlintMetadataLogEntry> getLatest() {
LOG.info("Fetching latest log entry with id " + latestId);
try (RestHighLevelClient client = flintClient.createClient()) {
Expand All @@ -100,6 +103,7 @@ public Optional<FlintMetadataLogEntry> getLatest() {
}

@Override
@PublishMetrics(metricNamePrefix=MetricConstants.OS_WRITE_METRIC_PREFIX)
public void purge() {
LOG.info("Purging log entry with id " + latestId);
try (RestHighLevelClient client = flintClient.createClient()) {
Expand All @@ -113,6 +117,7 @@ public void purge() {
}
}

@PublishMetrics(metricNamePrefix=MetricConstants.OS_WRITE_METRIC_PREFIX)
private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) {
LOG.info("Creating log entry " + logEntry);
// Assign doc ID here
Expand All @@ -136,6 +141,7 @@ private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) {
RequestOptions.DEFAULT));
}

@PublishMetrics(metricNamePrefix=MetricConstants.OS_WRITE_METRIC_PREFIX)
private FlintMetadataLogEntry updateLogEntry(FlintMetadataLogEntry logEntry) {
LOG.info("Updating log entry " + logEntry);
return writeLogEntry(logEntry,
Expand Down Expand Up @@ -172,6 +178,7 @@ private FlintMetadataLogEntry writeLogEntry(
}
}

@PublishMetrics(metricNamePrefix=MetricConstants.OS_READ_METRIC_PREFIX)
private boolean exists() {
LOG.info("Checking if Flint index exists " + metaLogIndexName);
try (RestHighLevelClient client = flintClient.createClient()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.opensearch.common.Strings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.metrics.aop.MetricConstants;
import org.opensearch.flint.core.metrics.aop.PublishMetrics;
import org.opensearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
Expand Down Expand Up @@ -44,6 +46,7 @@ public OpenSearchScrollReader(RestHighLevelClient client, String indexName, Sear
/**
* search.
*/
@PublishMetrics(metricNamePrefix=MetricConstants.OS_READ_METRIC_PREFIX)
Optional<SearchResponse> search(SearchRequest request) throws IOException {
if (Strings.isNullOrEmpty(scrollId)) {
request.scroll(scrollDuration);
Expand All @@ -66,6 +69,7 @@ Optional<SearchResponse> search(SearchRequest request) throws IOException {
/**
* clean the scroll context.
*/
@PublishMetrics(metricNamePrefix=MetricConstants.OS_READ_METRIC_PREFIX)
void clean() throws IOException {
try {
if (!Strings.isNullOrEmpty(scrollId)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.flint.core.FlintClient;
import org.opensearch.flint.core.metrics.aop.MetricConstants;
import org.opensearch.flint.core.metrics.aop.PublishMetrics;

import java.io.IOException;
import java.util.logging.Level;
Expand All @@ -25,6 +27,7 @@ public OpenSearchUpdater(String indexName, FlintClient flintClient) {
this.flintClient = flintClient;
}

@PublishMetrics(metricNamePrefix=MetricConstants.OS_WRITE_METRIC_PREFIX)
public void upsert(String id, String doc) {
// we might need to keep the updater for a long time. Reusing the client may not work as the temporary
// credentials may expire.
Expand All @@ -46,6 +49,7 @@ public void upsert(String id, String doc) {
}
}

@PublishMetrics(metricNamePrefix=MetricConstants.OS_WRITE_METRIC_PREFIX)
public void update(String id, String doc) {
try (RestHighLevelClient client = flintClient.createClient()) {
assertIndexExist(client, indexName);
Expand All @@ -62,6 +66,7 @@ public void update(String id, String doc) {
}
}

@PublishMetrics(metricNamePrefix=MetricConstants.OS_WRITE_METRIC_PREFIX)
public void updateIf(String id, String doc, long seqNo, long primaryTerm) {
try (RestHighLevelClient client = flintClient.createClient()) {
assertIndexExist(client, indexName);
Expand All @@ -80,6 +85,7 @@ public void updateIf(String id, String doc, long seqNo, long primaryTerm) {
}
}

@PublishMetrics(metricNamePrefix=MetricConstants.OS_READ_METRIC_PREFIX)
private void assertIndexExist(RestHighLevelClient client, String indexName) throws IOException {
LOG.info("Checking if index exists " + indexName);
if (!client.indices().exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT)) {
Expand Down
Loading

0 comments on commit 08504ad

Please sign in to comment.