Skip to content

Commit

Permalink
Test post compile weaving
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Jan 19, 2024
1 parent d6c1285 commit 2bbeb1f
Show file tree
Hide file tree
Showing 9 changed files with 172 additions and 262 deletions.
Original file line number Diff line number Diff line change
@@ -1,129 +1,129 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metrics.aop;

import com.codahale.metrics.Counter;
import org.apache.spark.SparkEnv;
import org.apache.spark.metrics.source.FlintMetricSource;
import org.apache.spark.metrics.source.Source;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.AfterThrowing;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.opensearch.OpenSearchException;
import scala.collection.Seq;

/**
* Aspect for logging metrics based on the annotated methods in Flint components.
* It utilizes AspectJ for intercepting method calls that are annotated with {@link PublishMetrics}
* to log successful executions and exceptions as metrics.
*/
@Aspect
public class MetricsAspect {

/**
* Pointcut to match methods annotated with {@link PublishMetrics}.
*
* @param publishMetricsAnnotation the PublishMetrics annotation
*/
@Pointcut("@annotation(publishMetricsAnnotation)")
public void annotatedWithPublishMetrics(PublishMetrics publishMetricsAnnotation) {}

/**
* After returning advice that logs successful method executions.
* This method is invoked after a method annotated with {@link PublishMetrics} successfully returns.
* It publishes a success metric with a standard status code of 200.
*
* @param publishMetricsAnnotation the PublishMetrics annotation
* @return the full metric name for the successful execution
*/
@AfterReturning(pointcut = "annotatedWithPublishMetrics(publishMetricsAnnotation)", argNames = "publishMetricsAnnotation")
public String logSuccess(PublishMetrics publishMetricsAnnotation) {
int statusCode = 200; // Assume success with a status code of 200
String metricNamePrefix = publishMetricsAnnotation.metricNamePrefix();
return publishStatusMetrics(metricNamePrefix, statusCode);
}

/**
* After throwing advice that logs exceptions thrown by methods.
* This method is invoked when a method annotated with {@link PublishMetrics} throws an exception.
* It extracts the status code from the OpenSearchException and publishes a corresponding metric.
*
* @param ex the exception thrown by the method
* @param publishMetricsAnnotation the PublishMetrics annotation
* @return the full metric name for the exception, or null if the exception is not an OpenSearchException
*/
@AfterThrowing(pointcut = "annotatedWithPublishMetrics(publishMetricsAnnotation)", throwing = "ex", argNames = "ex,publishMetricsAnnotation")
public String logException(Throwable ex, PublishMetrics publishMetricsAnnotation) {
OpenSearchException openSearchException = extractOpenSearchException(ex);
if (openSearchException != null) {
int statusCode = openSearchException.status().getStatus();
String metricNamePrefix = publishMetricsAnnotation.metricNamePrefix();
return publishStatusMetrics(metricNamePrefix, statusCode);
}
return null;
}

/**
* Extracts an OpenSearchException from the given Throwable.
* This method checks if the Throwable is an instance of OpenSearchException or caused by one.
*
* @param ex the exception to be checked
* @return the extracted OpenSearchException, or null if not found
*/
private OpenSearchException extractOpenSearchException(Throwable ex) {
if (ex instanceof OpenSearchException) {
return (OpenSearchException) ex;
} else if (ex.getCause() instanceof OpenSearchException) {
return (OpenSearchException) ex.getCause();
}
return null;
}


private String publishStatusMetrics(String metricNamePrefix, int statusCode) {
// TODO: Refactor this impl
String metricName = null;
if (SparkEnv.get() != null) {
FlintMetricSource flintMetricSource = getOrInitFlintMetricSource();
metricName = constructMetricName(metricNamePrefix, statusCode);
Counter counter = flintMetricSource.metricRegistry().getCounters().get(metricName);
if (counter == null) {
counter = flintMetricSource.metricRegistry().counter(metricName);
}
counter.inc();
}
return metricName;
}

private String constructMetricName(String metricNamePrefix, int statusCode) {
String metricSuffix = null;

if (statusCode == 200) {
metricSuffix = "2xx.count";
} else if (statusCode == 403) {
metricSuffix = "403.count";
} else if (statusCode >= 500) {
metricSuffix = "5xx.count";
} else if (statusCode >= 400) {
metricSuffix = "4xx.count";
}

return metricNamePrefix + "." + metricSuffix;
}


private static FlintMetricSource getOrInitFlintMetricSource() {
Seq<Source> metricSourceSeq = SparkEnv.get().metricsSystem().getSourcesByName(FlintMetricSource.FLINT_METRIC_SOURCE_NAME());

if (metricSourceSeq == null || metricSourceSeq.isEmpty()) {
FlintMetricSource metricSource = new FlintMetricSource();
SparkEnv.get().metricsSystem().registerSource(metricSource);
return metricSource;
}
return (FlintMetricSource) metricSourceSeq.head();
}
}
///*
// * Copyright OpenSearch Contributors
// * SPDX-License-Identifier: Apache-2.0
// */
//
//package org.opensearch.flint.core.metrics.aop;
//
//import com.codahale.metrics.Counter;
//import org.apache.spark.SparkEnv;
//import org.apache.spark.metrics.source.FlintMetricSource;
//import org.apache.spark.metrics.source.Source;
//import org.aspectj.lang.annotation.AfterReturning;
//import org.aspectj.lang.annotation.AfterThrowing;
//import org.aspectj.lang.annotation.Aspect;
//import org.aspectj.lang.annotation.Pointcut;
//import org.opensearch.OpenSearchException;
//import scala.collection.Seq;
//
///**
// * Aspect for logging metrics based on the annotated methods in Flint components.
// * It utilizes AspectJ for intercepting method calls that are annotated with {@link PublishMetrics}
// * to log successful executions and exceptions as metrics.
// */
//@Aspect
//public class MetricsAspect {
//
// /**
// * Pointcut to match methods annotated with {@link PublishMetrics}.
// *
// * @param publishMetricsAnnotation the PublishMetrics annotation
// */
// @Pointcut("@annotation(publishMetricsAnnotation)")
// public void annotatedWithPublishMetrics(PublishMetrics publishMetricsAnnotation) {}
//
// /**
// * After returning advice that logs successful method executions.
// * This method is invoked after a method annotated with {@link PublishMetrics} successfully returns.
// * It publishes a success metric with a standard status code of 200.
// *
// * @param publishMetricsAnnotation the PublishMetrics annotation
// * @return the full metric name for the successful execution
// */
// @AfterReturning(pointcut = "annotatedWithPublishMetrics(publishMetricsAnnotation)", argNames = "publishMetricsAnnotation")
// public String logSuccess(PublishMetrics publishMetricsAnnotation) {
// int statusCode = 200; // Assume success with a status code of 200
// String metricNamePrefix = publishMetricsAnnotation.metricNamePrefix();
// return publishStatusMetrics(metricNamePrefix, statusCode);
// }
//
// /**
// * After throwing advice that logs exceptions thrown by methods.
// * This method is invoked when a method annotated with {@link PublishMetrics} throws an exception.
// * It extracts the status code from the OpenSearchException and publishes a corresponding metric.
// *
// * @param ex the exception thrown by the method
// * @param publishMetricsAnnotation the PublishMetrics annotation
// * @return the full metric name for the exception, or null if the exception is not an OpenSearchException
// */
// @AfterThrowing(pointcut = "annotatedWithPublishMetrics(publishMetricsAnnotation)", throwing = "ex", argNames = "ex,publishMetricsAnnotation")
// public String logException(Throwable ex, PublishMetrics publishMetricsAnnotation) {
// OpenSearchException openSearchException = extractOpenSearchException(ex);
// if (openSearchException != null) {
// int statusCode = openSearchException.status().getStatus();
// String metricNamePrefix = publishMetricsAnnotation.metricNamePrefix();
// return publishStatusMetrics(metricNamePrefix, statusCode);
// }
// return null;
// }
//
// /**
// * Extracts an OpenSearchException from the given Throwable.
// * This method checks if the Throwable is an instance of OpenSearchException or caused by one.
// *
// * @param ex the exception to be checked
// * @return the extracted OpenSearchException, or null if not found
// */
// private OpenSearchException extractOpenSearchException(Throwable ex) {
// if (ex instanceof OpenSearchException) {
// return (OpenSearchException) ex;
// } else if (ex.getCause() instanceof OpenSearchException) {
// return (OpenSearchException) ex.getCause();
// }
// return null;
// }
//
//
// private String publishStatusMetrics(String metricNamePrefix, int statusCode) {
// // TODO: Refactor this impl
// String metricName = null;
// if (SparkEnv.get() != null) {
// FlintMetricSource flintMetricSource = getOrInitFlintMetricSource();
// metricName = constructMetricName(metricNamePrefix, statusCode);
// Counter counter = flintMetricSource.metricRegistry().getCounters().get(metricName);
// if (counter == null) {
// counter = flintMetricSource.metricRegistry().counter(metricName);
// }
// counter.inc();
// }
// return metricName;
// }
//
// private String constructMetricName(String metricNamePrefix, int statusCode) {
// String metricSuffix = null;
//
// if (statusCode == 200) {
// metricSuffix = "2xx.count";
// } else if (statusCode == 403) {
// metricSuffix = "403.count";
// } else if (statusCode >= 500) {
// metricSuffix = "5xx.count";
// } else if (statusCode >= 400) {
// metricSuffix = "4xx.count";
// }
//
// return metricNamePrefix + "." + metricSuffix;
// }
//
//
// private static FlintMetricSource getOrInitFlintMetricSource() {
// Seq<Source> metricSourceSeq = SparkEnv.get().metricsSystem().getSourcesByName(FlintMetricSource.FLINT_METRIC_SOURCE_NAME());
//
// if (metricSourceSeq == null || metricSourceSeq.isEmpty()) {
// FlintMetricSource metricSource = new FlintMetricSource();
// SparkEnv.get().metricsSystem().registerSource(metricSource);
// return metricSource;
// }
// return (FlintMetricSource) metricSourceSeq.head();
// }
//}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.opensearch.flint.core.metrics.aop;

/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;

import java.util.logging.Logger;


@Aspect
public class RestClientMetricsAspect {

private static final Logger LOG = Logger.getLogger(RestClientMetricsAspect.class.getName());

@Pointcut("execution(* org.opensearch.client.RestHighLevelClient.*(..))")
public void onRestHighLevelClientMethod() {}

@Around("onRestHighLevelClientMethod()")
public Object aroundRestHighLevelClientMethod(ProceedingJoinPoint pjp) throws Throwable {
long startTime = System.currentTimeMillis();

try {
Object response = pjp.proceed();
long duration = System.currentTimeMillis() - startTime;
logMetric("success", duration);
return response;
} catch (Exception e) {
long duration = System.currentTimeMillis() - startTime;
logMetric("failure", duration);
throw e;
}
}

private void logMetric(String status, long duration) {
LOG.info("Request " + status + ": " + duration + "ms");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@
import org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry;
import org.opensearch.flint.core.metadata.log.OptimisticTransaction;
import org.opensearch.flint.core.metrics.aop.MetricConstants;
import org.opensearch.flint.core.metrics.aop.PublishMetrics;
import org.opensearch.index.query.AbstractQueryBuilder;
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
Expand Down Expand Up @@ -129,7 +127,6 @@ public void createIndex(String indexName, FlintMetadata metadata) {
createIndex(indexName, metadata.getContent(), metadata.indexSettings());
}

@PublishMetrics(metricNamePrefix=MetricConstants.OS_WRITE_METRIC_PREFIX)
protected void createIndex(String indexName, String mapping, Option<String> settings) {
LOG.info("Creating Flint index " + indexName);
String osIndexName = sanitizeIndexName(indexName);
Expand All @@ -146,7 +143,6 @@ protected void createIndex(String indexName, String mapping, Option<String> sett
}

@Override
@PublishMetrics(metricNamePrefix=MetricConstants.OS_READ_METRIC_PREFIX)
public boolean exists(String indexName) {
LOG.info("Checking if Flint index exists " + indexName);
String osIndexName = sanitizeIndexName(indexName);
Expand All @@ -158,7 +154,6 @@ public boolean exists(String indexName) {
}

@Override
@PublishMetrics(metricNamePrefix=MetricConstants.OS_READ_METRIC_PREFIX)
public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
LOG.info("Fetching all Flint index metadata for pattern " + indexNamePattern);
String osIndexNamePattern = sanitizeIndexName(indexNamePattern);
Expand All @@ -177,7 +172,6 @@ public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
}

@Override
@PublishMetrics(metricNamePrefix=MetricConstants.OS_READ_METRIC_PREFIX)
public FlintMetadata getIndexMetadata(String indexName) {
LOG.info("Fetching Flint index metadata for " + indexName);
String osIndexName = sanitizeIndexName(indexName);
Expand All @@ -194,7 +188,6 @@ public FlintMetadata getIndexMetadata(String indexName) {
}

@Override
@PublishMetrics(metricNamePrefix=MetricConstants.OS_WRITE_METRIC_PREFIX)
public void deleteIndex(String indexName) {
LOG.info("Deleting Flint index " + indexName);
String osIndexName = sanitizeIndexName(indexName);
Expand Down
Loading

0 comments on commit 2bbeb1f

Please sign in to comment.