Skip to content

Commit

Permalink
Abstract metrics to reduce dependency to legacy (#2747)
Browse files Browse the repository at this point in the history
* Abstract metrics to reduce dependency to legacy

Signed-off-by: Tomoyuki Morita <[email protected]>

* Add comment

Signed-off-by: Tomoyuki Morita <[email protected]>

* Fix style

Signed-off-by: Tomoyuki Morita <[email protected]>

---------

Signed-off-by: Tomoyuki Morita <[email protected]>
  • Loading branch information
ykmr1224 authored Jun 21, 2024
1 parent 07e52d9 commit ef2cef3
Show file tree
Hide file tree
Showing 15 changed files with 156 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
import org.opensearch.sql.spark.asyncquery.model.NullAsyncQueryRequestContext;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfig;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplier;
import org.opensearch.sql.spark.metrics.MetricsService;

/** Implementation of {@link EMRServerlessClientFactory}. */
@RequiredArgsConstructor
public class EMRServerlessClientFactoryImpl implements EMRServerlessClientFactory {

private final SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier;
private final MetricsService metricsService;
private EMRServerlessClient emrServerlessClient;
private String region;

Expand Down Expand Up @@ -68,7 +70,7 @@ private EMRServerlessClient createEMRServerlessClient(String awsRegion) {
.withRegion(awsRegion)
.withCredentials(new DefaultAWSCredentialsProviderChain())
.build();
return new EmrServerlessClientImpl(awsemrServerless);
return new EmrServerlessClientImpl(awsemrServerless, metricsService);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_APPLICATION_JAR;
import static org.opensearch.sql.spark.metrics.EmrMetrics.EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT;
import static org.opensearch.sql.spark.metrics.EmrMetrics.EMR_GET_JOB_RESULT_FAILURE_COUNT;
import static org.opensearch.sql.spark.metrics.EmrMetrics.EMR_START_JOB_REQUEST_FAILURE_COUNT;

import com.amazonaws.services.emrserverless.AWSEMRServerless;
import com.amazonaws.services.emrserverless.model.CancelJobRunRequest;
Expand All @@ -20,25 +23,23 @@
import com.amazonaws.services.emrserverless.model.ValidationException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.utils.MetricUtils;
import org.opensearch.sql.spark.metrics.MetricsService;

@RequiredArgsConstructor
public class EmrServerlessClientImpl implements EMRServerlessClient {

private final AWSEMRServerless emrServerless;
private final MetricsService metricsService;
private static final Logger logger = LogManager.getLogger(EmrServerlessClientImpl.class);

private static final int MAX_JOB_NAME_LENGTH = 255;

public static final String GENERIC_INTERNAL_SERVER_ERROR_MESSAGE = "Internal Server Error.";

public EmrServerlessClientImpl(AWSEMRServerless emrServerless) {
this.emrServerless = emrServerless;
}

@Override
public String startJobRun(StartJobRequest startJobRequest) {
String resultIndex =
Expand Down Expand Up @@ -68,8 +69,7 @@ public String startJobRun(StartJobRequest startJobRequest) {
return emrServerless.startJobRun(request);
} catch (Throwable t) {
logger.error("Error while making start job request to emr:", t);
MetricUtils.incrementNumericalMetric(
MetricName.EMR_START_JOB_REQUEST_FAILURE_COUNT);
metricsService.incrementNumericalMetric(EMR_START_JOB_REQUEST_FAILURE_COUNT);
if (t instanceof ValidationException) {
throw new IllegalArgumentException(
"The input fails to satisfy the constraints specified by AWS EMR"
Expand All @@ -94,8 +94,7 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) {
return emrServerless.getJobRun(request);
} catch (Throwable t) {
logger.error("Error while making get job run request to emr:", t);
MetricUtils.incrementNumericalMetric(
MetricName.EMR_GET_JOB_RESULT_FAILURE_COUNT);
metricsService.incrementNumericalMetric(EMR_GET_JOB_RESULT_FAILURE_COUNT);
throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
}
});
Expand All @@ -119,8 +118,7 @@ public CancelJobRunResult cancelJobRun(
throw t;
} else {
logger.error("Error while making cancel job request to emr:", t);
MetricUtils.incrementNumericalMetric(
MetricName.EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT);
metricsService.incrementNumericalMetric(EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT);
throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@
import static org.opensearch.sql.spark.data.constants.SparkConstants.ERROR_FIELD;
import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD;
import static org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher.JOB_TYPE_TAG_KEY;
import static org.opensearch.sql.spark.metrics.EmrMetrics.EMR_BATCH_QUERY_JOBS_CREATION_COUNT;

import com.amazonaws.services.emrserverless.model.GetJobRunResult;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.json.JSONObject;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.utils.MetricUtils;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters;
import org.opensearch.sql.spark.client.EMRServerlessClient;
Expand All @@ -25,6 +24,7 @@
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.leasemanager.LeaseManager;
import org.opensearch.sql.spark.metrics.MetricsService;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

/**
Expand All @@ -36,6 +36,7 @@ public class BatchQueryHandler extends AsyncQueryHandler {
protected final EMRServerlessClient emrServerlessClient;
protected final JobExecutionResponseReader jobExecutionResponseReader;
protected final LeaseManager leaseManager;
protected final MetricsService metricsService;

@Override
protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) {
Expand Down Expand Up @@ -90,7 +91,7 @@ public DispatchQueryResponse submit(
false,
dataSourceMetadata.getResultIndex());
String jobId = emrServerlessClient.startJobRun(startJobRequest);
MetricUtils.incrementNumericalMetric(MetricName.EMR_BATCH_QUERY_JOBS_CREATION_COUNT);
metricsService.incrementNumericalMetric(EMR_BATCH_QUERY_JOBS_CREATION_COUNT);
return DispatchQueryResponse.builder()
.queryId(context.getQueryId())
.jobId(jobId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
import lombok.RequiredArgsConstructor;
import org.json.JSONObject;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.utils.MetricUtils;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
Expand All @@ -32,6 +30,8 @@
import org.opensearch.sql.spark.execution.statement.StatementState;
import org.opensearch.sql.spark.leasemanager.LeaseManager;
import org.opensearch.sql.spark.leasemanager.model.LeaseRequest;
import org.opensearch.sql.spark.metrics.EmrMetrics;
import org.opensearch.sql.spark.metrics.MetricsService;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

/**
Expand All @@ -45,6 +45,7 @@ public class InteractiveQueryHandler extends AsyncQueryHandler {
private final SessionManager sessionManager;
private final JobExecutionResponseReader jobExecutionResponseReader;
private final LeaseManager leaseManager;
private final MetricsService metricsService;

@Override
protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) {
Expand Down Expand Up @@ -121,7 +122,7 @@ public DispatchQueryResponse submit(
dataSourceMetadata.getResultIndex(),
dataSourceMetadata.getName()),
context.getAsyncQueryRequestContext());
MetricUtils.incrementNumericalMetric(MetricName.EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT);
metricsService.incrementNumericalMetric(EmrMetrics.EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT);
}
session.submit(
new QueryRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.sql.spark.flint.IndexDMLResultStorageService;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory;
import org.opensearch.sql.spark.leasemanager.LeaseManager;
import org.opensearch.sql.spark.metrics.MetricsService;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

@RequiredArgsConstructor
Expand All @@ -24,28 +25,37 @@ public class QueryHandlerFactory {
private final IndexDMLResultStorageService indexDMLResultStorageService;
private final FlintIndexOpFactory flintIndexOpFactory;
private final EMRServerlessClientFactory emrServerlessClientFactory;
private final MetricsService metricsService;

public RefreshQueryHandler getRefreshQueryHandler() {
return new RefreshQueryHandler(
emrServerlessClientFactory.getClient(),
jobExecutionResponseReader,
flintIndexMetadataService,
leaseManager,
flintIndexOpFactory);
flintIndexOpFactory,
metricsService);
}

public StreamingQueryHandler getStreamingQueryHandler() {
return new StreamingQueryHandler(
emrServerlessClientFactory.getClient(), jobExecutionResponseReader, leaseManager);
emrServerlessClientFactory.getClient(),
jobExecutionResponseReader,
leaseManager,
metricsService);
}

public BatchQueryHandler getBatchQueryHandler() {
return new BatchQueryHandler(
emrServerlessClientFactory.getClient(), jobExecutionResponseReader, leaseManager);
emrServerlessClientFactory.getClient(),
jobExecutionResponseReader,
leaseManager,
metricsService);
}

public InteractiveQueryHandler getInteractiveQueryHandler() {
return new InteractiveQueryHandler(sessionManager, jobExecutionResponseReader, leaseManager);
return new InteractiveQueryHandler(
sessionManager, jobExecutionResponseReader, leaseManager, metricsService);
}

public IndexDMLHandler getIndexDMLHandler() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory;
import org.opensearch.sql.spark.leasemanager.LeaseManager;
import org.opensearch.sql.spark.leasemanager.model.LeaseRequest;
import org.opensearch.sql.spark.metrics.MetricsService;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

/**
Expand All @@ -35,8 +36,9 @@ public RefreshQueryHandler(
JobExecutionResponseReader jobExecutionResponseReader,
FlintIndexMetadataService flintIndexMetadataService,
LeaseManager leaseManager,
FlintIndexOpFactory flintIndexOpFactory) {
super(emrServerlessClient, jobExecutionResponseReader, leaseManager);
FlintIndexOpFactory flintIndexOpFactory,
MetricsService metricsService) {
super(emrServerlessClient, jobExecutionResponseReader, leaseManager, metricsService);
this.flintIndexMetadataService = flintIndexMetadataService;
this.flintIndexOpFactory = flintIndexOpFactory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@

import static org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher.INDEX_TAG_KEY;
import static org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher.JOB_TYPE_TAG_KEY;
import static org.opensearch.sql.spark.metrics.EmrMetrics.EMR_STREAMING_QUERY_JOBS_CREATION_COUNT;

import java.util.Map;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.utils.MetricUtils;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters;
import org.opensearch.sql.spark.client.EMRServerlessClient;
Expand All @@ -23,6 +22,7 @@
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.leasemanager.LeaseManager;
import org.opensearch.sql.spark.leasemanager.model.LeaseRequest;
import org.opensearch.sql.spark.metrics.MetricsService;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

/**
Expand All @@ -34,8 +34,9 @@ public class StreamingQueryHandler extends BatchQueryHandler {
public StreamingQueryHandler(
EMRServerlessClient emrServerlessClient,
JobExecutionResponseReader jobExecutionResponseReader,
LeaseManager leaseManager) {
super(emrServerlessClient, jobExecutionResponseReader, leaseManager);
LeaseManager leaseManager,
MetricsService metricsService) {
super(emrServerlessClient, jobExecutionResponseReader, leaseManager, metricsService);
}

@Override
Expand Down Expand Up @@ -81,7 +82,7 @@ public DispatchQueryResponse submit(
indexQueryDetails.getFlintIndexOptions().autoRefresh(),
dataSourceMetadata.getResultIndex());
String jobId = emrServerlessClient.startJobRun(startJobRequest);
MetricUtils.incrementNumericalMetric(MetricName.EMR_STREAMING_QUERY_JOBS_CREATION_COUNT);
metricsService.incrementNumericalMetric(EMR_STREAMING_QUERY_JOBS_CREATION_COUNT);
return DispatchQueryResponse.builder()
.queryId(context.getQueryId())
.jobId(jobId)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.metrics;

public enum EmrMetrics {
EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT,
EMR_GET_JOB_RESULT_FAILURE_COUNT,
EMR_START_JOB_REQUEST_FAILURE_COUNT,
EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT,
EMR_STREAMING_QUERY_JOBS_CREATION_COUNT,
EMR_BATCH_QUERY_JOBS_CREATION_COUNT;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.metrics;

/** Interface to abstract the emit of metrics */
public interface MetricsService {
void incrementNumericalMetric(EmrMetrics emrMetrics);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,20 @@
import org.opensearch.sql.spark.config.SparkExecutionEngineConfig;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplier;
import org.opensearch.sql.spark.constants.TestConstants;
import org.opensearch.sql.spark.metrics.MetricsService;

@ExtendWith(MockitoExtension.class)
public class EMRServerlessClientFactoryImplTest {

@Mock private SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier;
@Mock private MetricsService metricsService;

@Test
public void testGetClient() {
when(sparkExecutionEngineConfigSupplier.getSparkExecutionEngineConfig(any()))
.thenReturn(createSparkExecutionEngineConfig());
EMRServerlessClientFactory emrServerlessClientFactory =
new EMRServerlessClientFactoryImpl(sparkExecutionEngineConfigSupplier);
new EMRServerlessClientFactoryImpl(sparkExecutionEngineConfigSupplier, metricsService);
EMRServerlessClient emrserverlessClient = emrServerlessClientFactory.getClient();
Assertions.assertNotNull(emrserverlessClient);
}
Expand All @@ -38,7 +40,7 @@ public void testGetClientWithChangeInSetting() {
when(sparkExecutionEngineConfigSupplier.getSparkExecutionEngineConfig(any()))
.thenReturn(sparkExecutionEngineConfig);
EMRServerlessClientFactory emrServerlessClientFactory =
new EMRServerlessClientFactoryImpl(sparkExecutionEngineConfigSupplier);
new EMRServerlessClientFactoryImpl(sparkExecutionEngineConfigSupplier, metricsService);
EMRServerlessClient emrserverlessClient = emrServerlessClientFactory.getClient();
Assertions.assertNotNull(emrserverlessClient);

Expand All @@ -57,7 +59,7 @@ public void testGetClientWithChangeInSetting() {
public void testGetClientWithException() {
when(sparkExecutionEngineConfigSupplier.getSparkExecutionEngineConfig(any())).thenReturn(null);
EMRServerlessClientFactory emrServerlessClientFactory =
new EMRServerlessClientFactoryImpl(sparkExecutionEngineConfigSupplier);
new EMRServerlessClientFactoryImpl(sparkExecutionEngineConfigSupplier, metricsService);
IllegalArgumentException illegalArgumentException =
Assertions.assertThrows(
IllegalArgumentException.class, emrServerlessClientFactory::getClient);
Expand All @@ -74,7 +76,7 @@ public void testGetClientWithExceptionWithNullRegion() {
when(sparkExecutionEngineConfigSupplier.getSparkExecutionEngineConfig(any()))
.thenReturn(sparkExecutionEngineConfig);
EMRServerlessClientFactory emrServerlessClientFactory =
new EMRServerlessClientFactoryImpl(sparkExecutionEngineConfigSupplier);
new EMRServerlessClientFactoryImpl(sparkExecutionEngineConfigSupplier, metricsService);
IllegalArgumentException illegalArgumentException =
Assertions.assertThrows(
IllegalArgumentException.class, emrServerlessClientFactory::getClient);
Expand Down
Loading

0 comments on commit ef2cef3

Please sign in to comment.