Skip to content

Commit

Permalink
[POC]Flint metrics AOP
Browse files Browse the repository at this point in the history
  • Loading branch information
noCharger committed Jan 11, 2024
1 parent 3528b66 commit 9416193
Show file tree
Hide file tree
Showing 10 changed files with 152 additions and 1 deletion.
5 changes: 4 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
import Dependencies._
import com.lightbend.sbt.SbtAspectj

lazy val scala212 = "2.12.14"
lazy val sparkVersion = "3.3.2"
Expand Down Expand Up @@ -48,6 +49,7 @@ lazy val root = (project in file("."))
.settings(name := "flint", publish / skip := true)

lazy val flintCore = (project in file("flint-core"))
.enablePlugins(SbtAspectj)
.disablePlugins(AssemblyPlugin)
.settings(
commonSettings,
Expand Down Expand Up @@ -75,7 +77,8 @@ lazy val flintCore = (project in file("flint-core"))
"net.aichler" % "jupiter-interface" % "0.11.1" % Test
),
libraryDependencies ++= deps(sparkVersion),
publish / skip := true)
publish / skip := true
)

lazy val pplSparkIntegration = (project in file("ppl-spark-integration"))
.enablePlugins(AssemblyPlugin, Antlr4Plugin)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metrics.aop;

public class MetricConstants {
public static final String OS_READ_METRIC = "opensearch.read";
public static final String OS_WRITE_METRIC = "opensearch.write";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metrics.aop;

import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.AfterThrowing;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.opensearch.OpenSearchException;

@Aspect
public class MetricsAspect {
@Pointcut("@annotation(publishMetricsAnnotation)")
public void annotatedWithPublishMetrics(PublishMetrics publishMetricsAnnotation) {}

@AfterReturning(pointcut = "annotatedWithPublishMetrics(publishMetricsAnnotation)", argNames = "publishMetricsAnnotation")
public String logSuccess(PublishMetrics publishMetricsAnnotation) {
int statusCode = 200; // Assume success with a status code of 200
String metricName = publishMetricsAnnotation.metricName();
return publishStatusMetrics(metricName, statusCode);
}

@AfterThrowing(pointcut = "annotatedWithPublishMetrics(publishMetricsAnnotation)", throwing = "ex", argNames = "ex,publishMetricsAnnotation")
public String logException(Throwable ex, PublishMetrics publishMetricsAnnotation) {
if (ex instanceof OpenSearchException) {
OpenSearchException openSearchException = (OpenSearchException) ex;
int statusCode = openSearchException.status().getStatus();
String metricName = publishMetricsAnnotation.metricName();
return publishStatusMetrics(metricName, statusCode);
}
return null;
}

private String publishStatusMetrics(String metricName, int statusCode) {
String metricSuffix = null;

if (statusCode == 200) {
metricSuffix = "2xx.count";
} else if (statusCode == 403) {
metricSuffix = "403.count";
} else if (statusCode >= 500) {
metricSuffix = "5xx.count";
} else if (statusCode >= 400) {
metricSuffix = "4xx.count";
}

String fullMetricName = metricName + "." + metricSuffix;

// TODO: Add metrics to the source
return fullMetricName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.opensearch.flint.core.metrics.aop;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface PublishMetrics {
String metricName();
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry;
import org.opensearch.flint.core.metadata.log.OptimisticTransaction;
import org.opensearch.flint.core.metrics.aop.MetricConstants;
import org.opensearch.flint.core.metrics.aop.PublishMetrics;
import org.opensearch.index.query.AbstractQueryBuilder;
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
Expand Down Expand Up @@ -127,6 +129,7 @@ public void createIndex(String indexName, FlintMetadata metadata) {
createIndex(indexName, metadata.getContent(), metadata.indexSettings());
}

@PublishMetrics(metricName=MetricConstants.OS_WRITE_METRIC)
protected void createIndex(String indexName, String mapping, Option<String> settings) {
LOG.info("Creating Flint index " + indexName);
String osIndexName = sanitizeIndexName(indexName);
Expand All @@ -143,6 +146,7 @@ protected void createIndex(String indexName, String mapping, Option<String> sett
}

@Override
@PublishMetrics(metricName=MetricConstants.OS_READ_METRIC)
public boolean exists(String indexName) {
LOG.info("Checking if Flint index exists " + indexName);
String osIndexName = sanitizeIndexName(indexName);
Expand All @@ -154,6 +158,7 @@ public boolean exists(String indexName) {
}

@Override
@PublishMetrics(metricName=MetricConstants.OS_READ_METRIC)
public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
LOG.info("Fetching all Flint index metadata for pattern " + indexNamePattern);
String osIndexNamePattern = sanitizeIndexName(indexNamePattern);
Expand All @@ -172,6 +177,7 @@ public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
}

@Override
@PublishMetrics(metricName=MetricConstants.OS_READ_METRIC)
public FlintMetadata getIndexMetadata(String indexName) {
LOG.info("Fetching Flint index metadata for " + indexName);
String osIndexName = sanitizeIndexName(indexName);
Expand All @@ -188,6 +194,7 @@ public FlintMetadata getIndexMetadata(String indexName) {
}

@Override
@PublishMetrics(metricName=MetricConstants.OS_WRITE_METRIC)
public void deleteIndex(String indexName) {
LOG.info("Deleting Flint index " + indexName);
String osIndexName = sanitizeIndexName(indexName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.opensearch.common.Strings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.metrics.aop.MetricConstants;
import org.opensearch.flint.core.metrics.aop.PublishMetrics;
import org.opensearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
Expand Down Expand Up @@ -44,6 +46,7 @@ public OpenSearchScrollReader(RestHighLevelClient client, String indexName, Sear
/**
* search.
*/
@PublishMetrics(metricName=MetricConstants.OS_READ_METRIC)
Optional<SearchResponse> search(SearchRequest request) throws IOException {
if (Strings.isNullOrEmpty(scrollId)) {
request.scroll(scrollDuration);
Expand All @@ -66,6 +69,7 @@ Optional<SearchResponse> search(SearchRequest request) throws IOException {
/**
* clean the scroll context.
*/
@PublishMetrics(metricName=MetricConstants.OS_READ_METRIC)
void clean() throws IOException {
try {
if (!Strings.isNullOrEmpty(scrollId)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.flint.core.FlintClient;
import org.opensearch.flint.core.metrics.aop.MetricConstants;
import org.opensearch.flint.core.metrics.aop.PublishMetrics;

import java.io.IOException;
import java.util.logging.Level;
Expand All @@ -25,6 +27,7 @@ public OpenSearchUpdater(String indexName, FlintClient flintClient) {
this.flintClient = flintClient;
}

@PublishMetrics(metricName=MetricConstants.OS_WRITE_METRIC)
public void upsert(String id, String doc) {
// we might need to keep the updater for a long time. Reusing the client may not work as the temporary
// credentials may expire.
Expand All @@ -46,6 +49,7 @@ public void upsert(String id, String doc) {
}
}

@PublishMetrics(metricName=MetricConstants.OS_WRITE_METRIC)
public void update(String id, String doc) {
try (RestHighLevelClient client = flintClient.createClient()) {
assertIndexExist(client, indexName);
Expand All @@ -62,6 +66,7 @@ public void update(String id, String doc) {
}
}

@PublishMetrics(metricName= MetricConstants.OS_WRITE_METRIC)
public void updateIf(String id, String doc, long seqNo, long primaryTerm) {
try (RestHighLevelClient client = flintClient.createClient()) {
assertIndexExist(client, indexName);
Expand All @@ -80,6 +85,7 @@ public void updateIf(String id, String doc, long seqNo, long primaryTerm) {
}
}

@PublishMetrics(metricName=MetricConstants.OS_READ_METRIC)
private void assertIndexExist(RestHighLevelClient client, String indexName) throws IOException {
LOG.info("Checking if index exists " + indexName);
if (!client.indices().exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.flint.core.metrics.aop.MetricConstants;
import org.opensearch.flint.core.metrics.aop.PublishMetrics;
import org.opensearch.rest.RestStatus;

import java.io.BufferedWriter;
Expand Down Expand Up @@ -52,6 +54,7 @@ public OpenSearchWriter(RestHighLevelClient client, String indexName, String ref
* Flush the data in buffer.
* Todo. StringWriter is not efficient. it will copy the cbuf when create bytes.
*/
@PublishMetrics(metricName=MetricConstants.OS_WRITE_METRIC)
@Override public void flush() {
try {
if (sb.length() > 0) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package org.opensearch.flint.core.metrics.aop;

import org.aspectj.lang.JoinPoint;
import org.junit.Before;
import org.junit.Test;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.opensearch.OpenSearchException;
import org.opensearch.rest.RestStatus;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.*;

public class MetricsAspectTest {

@InjectMocks
private MetricsAspect metricsAspect;

@Mock
private JoinPoint joinPoint;

@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
}

@Test
public void testLogSuccess() {
PublishMetrics publishMetricsAnnotation = mock(PublishMetrics.class);
when(publishMetricsAnnotation.metricName()).thenReturn("testMetric");

assert ("testMetric.2xx.count".equals(metricsAspect.logSuccess(publishMetricsAnnotation)));
}

@Test
public void testLogExceptionWithOpenSearchException() {
PublishMetrics publishMetricsAnnotation = mock(PublishMetrics.class);
when(publishMetricsAnnotation.metricName()).thenReturn("testMetric");

OpenSearchException exception = Mockito.mock(OpenSearchException.class);
Mockito.when(exception.getMessage()).thenReturn("Error");
Mockito.when(exception.getCause()).thenReturn(new RuntimeException());
Mockito.when(exception.status()).thenReturn(RestStatus.INTERNAL_SERVER_ERROR);

assertEquals("testMetric.5xx.count", metricsAspect.logException(exception, publishMetricsAnnotation));
}
}
1 change: 1 addition & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6")
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")
addSbtPlugin("com.lightbend.sbt" % "sbt-aspectj" % "0.11.0")
addSbtPlugin("com.lightbend.sbt" % "sbt-java-formatter" % "0.8.0")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.0")
addSbtPlugin("com.simplytyped" % "sbt-antlr4" % "0.8.3")
Expand Down

0 comments on commit 9416193

Please sign in to comment.