Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Abstract metrics to reduce dependency to legacy #2747

Merged
merged 3 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
import org.opensearch.sql.spark.asyncquery.model.NullAsyncQueryRequestContext;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfig;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplier;
import org.opensearch.sql.spark.metrics.MetricsService;

/** Implementation of {@link EMRServerlessClientFactory}. */
@RequiredArgsConstructor
public class EMRServerlessClientFactoryImpl implements EMRServerlessClientFactory {

private final SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier;
private final MetricsService metricsService;
private EMRServerlessClient emrServerlessClient;
private String region;

Expand Down Expand Up @@ -68,7 +70,7 @@ private EMRServerlessClient createEMRServerlessClient(String awsRegion) {
.withRegion(awsRegion)
.withCredentials(new DefaultAWSCredentialsProviderChain())
.build();
return new EmrServerlessClientImpl(awsemrServerless);
return new EmrServerlessClientImpl(awsemrServerless, metricsService);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_APPLICATION_JAR;
import static org.opensearch.sql.spark.metrics.EmrMetrics.EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT;
import static org.opensearch.sql.spark.metrics.EmrMetrics.EMR_GET_JOB_RESULT_FAILURE_COUNT;
import static org.opensearch.sql.spark.metrics.EmrMetrics.EMR_START_JOB_REQUEST_FAILURE_COUNT;

import com.amazonaws.services.emrserverless.AWSEMRServerless;
import com.amazonaws.services.emrserverless.model.CancelJobRunRequest;
Expand All @@ -20,25 +23,23 @@
import com.amazonaws.services.emrserverless.model.ValidationException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.utils.MetricUtils;
import org.opensearch.sql.spark.metrics.MetricsService;

@RequiredArgsConstructor
public class EmrServerlessClientImpl implements EMRServerlessClient {

private final AWSEMRServerless emrServerless;
private final MetricsService metricsService;
private static final Logger logger = LogManager.getLogger(EmrServerlessClientImpl.class);

private static final int MAX_JOB_NAME_LENGTH = 255;

public static final String GENERIC_INTERNAL_SERVER_ERROR_MESSAGE = "Internal Server Error.";

public EmrServerlessClientImpl(AWSEMRServerless emrServerless) {
this.emrServerless = emrServerless;
}

@Override
public String startJobRun(StartJobRequest startJobRequest) {
String resultIndex =
Expand Down Expand Up @@ -68,8 +69,7 @@ public String startJobRun(StartJobRequest startJobRequest) {
return emrServerless.startJobRun(request);
} catch (Throwable t) {
logger.error("Error while making start job request to emr:", t);
MetricUtils.incrementNumericalMetric(
MetricName.EMR_START_JOB_REQUEST_FAILURE_COUNT);
metricsService.incrementNumericalMetric(EMR_START_JOB_REQUEST_FAILURE_COUNT);
if (t instanceof ValidationException) {
throw new IllegalArgumentException(
"The input fails to satisfy the constraints specified by AWS EMR"
Expand All @@ -94,8 +94,7 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) {
return emrServerless.getJobRun(request);
} catch (Throwable t) {
logger.error("Error while making get job run request to emr:", t);
MetricUtils.incrementNumericalMetric(
MetricName.EMR_GET_JOB_RESULT_FAILURE_COUNT);
metricsService.incrementNumericalMetric(EMR_GET_JOB_RESULT_FAILURE_COUNT);
throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
}
});
Expand All @@ -119,8 +118,7 @@ public CancelJobRunResult cancelJobRun(
throw t;
} else {
logger.error("Error while making cancel job request to emr:", t);
MetricUtils.incrementNumericalMetric(
MetricName.EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT);
metricsService.incrementNumericalMetric(EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT);
throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@
import static org.opensearch.sql.spark.data.constants.SparkConstants.ERROR_FIELD;
import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD;
import static org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher.JOB_TYPE_TAG_KEY;
import static org.opensearch.sql.spark.metrics.EmrMetrics.EMR_BATCH_QUERY_JOBS_CREATION_COUNT;

import com.amazonaws.services.emrserverless.model.GetJobRunResult;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.json.JSONObject;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.utils.MetricUtils;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters;
import org.opensearch.sql.spark.client.EMRServerlessClient;
Expand All @@ -25,6 +24,7 @@
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.leasemanager.LeaseManager;
import org.opensearch.sql.spark.metrics.MetricsService;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

/**
Expand All @@ -36,6 +36,7 @@ public class BatchQueryHandler extends AsyncQueryHandler {
protected final EMRServerlessClient emrServerlessClient;
protected final JobExecutionResponseReader jobExecutionResponseReader;
protected final LeaseManager leaseManager;
protected final MetricsService metricsService;

@Override
protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) {
Expand Down Expand Up @@ -90,7 +91,7 @@ public DispatchQueryResponse submit(
false,
dataSourceMetadata.getResultIndex());
String jobId = emrServerlessClient.startJobRun(startJobRequest);
MetricUtils.incrementNumericalMetric(MetricName.EMR_BATCH_QUERY_JOBS_CREATION_COUNT);
metricsService.incrementNumericalMetric(EMR_BATCH_QUERY_JOBS_CREATION_COUNT);
return DispatchQueryResponse.builder()
.queryId(context.getQueryId())
.jobId(jobId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
import lombok.RequiredArgsConstructor;
import org.json.JSONObject;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.utils.MetricUtils;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
Expand All @@ -32,6 +30,8 @@
import org.opensearch.sql.spark.execution.statement.StatementState;
import org.opensearch.sql.spark.leasemanager.LeaseManager;
import org.opensearch.sql.spark.leasemanager.model.LeaseRequest;
import org.opensearch.sql.spark.metrics.EmrMetrics;
import org.opensearch.sql.spark.metrics.MetricsService;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

/**
Expand All @@ -45,6 +45,7 @@ public class InteractiveQueryHandler extends AsyncQueryHandler {
private final SessionManager sessionManager;
private final JobExecutionResponseReader jobExecutionResponseReader;
private final LeaseManager leaseManager;
private final MetricsService metricsService;

@Override
protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) {
Expand Down Expand Up @@ -121,7 +122,7 @@ public DispatchQueryResponse submit(
dataSourceMetadata.getResultIndex(),
dataSourceMetadata.getName()),
context.getAsyncQueryRequestContext());
MetricUtils.incrementNumericalMetric(MetricName.EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT);
metricsService.incrementNumericalMetric(EmrMetrics.EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT);
}
session.submit(
new QueryRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.sql.spark.flint.IndexDMLResultStorageService;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory;
import org.opensearch.sql.spark.leasemanager.LeaseManager;
import org.opensearch.sql.spark.metrics.MetricsService;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

@RequiredArgsConstructor
Expand All @@ -24,28 +25,37 @@ public class QueryHandlerFactory {
private final IndexDMLResultStorageService indexDMLResultStorageService;
private final FlintIndexOpFactory flintIndexOpFactory;
private final EMRServerlessClientFactory emrServerlessClientFactory;
private final MetricsService metricsService;

public RefreshQueryHandler getRefreshQueryHandler() {
return new RefreshQueryHandler(
emrServerlessClientFactory.getClient(),
jobExecutionResponseReader,
flintIndexMetadataService,
leaseManager,
flintIndexOpFactory);
flintIndexOpFactory,
metricsService);
}

public StreamingQueryHandler getStreamingQueryHandler() {
return new StreamingQueryHandler(
emrServerlessClientFactory.getClient(), jobExecutionResponseReader, leaseManager);
emrServerlessClientFactory.getClient(),
jobExecutionResponseReader,
leaseManager,
metricsService);
}

public BatchQueryHandler getBatchQueryHandler() {
return new BatchQueryHandler(
emrServerlessClientFactory.getClient(), jobExecutionResponseReader, leaseManager);
emrServerlessClientFactory.getClient(),
jobExecutionResponseReader,
leaseManager,
metricsService);
}

public InteractiveQueryHandler getInteractiveQueryHandler() {
return new InteractiveQueryHandler(sessionManager, jobExecutionResponseReader, leaseManager);
return new InteractiveQueryHandler(
sessionManager, jobExecutionResponseReader, leaseManager, metricsService);
}

public IndexDMLHandler getIndexDMLHandler() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory;
import org.opensearch.sql.spark.leasemanager.LeaseManager;
import org.opensearch.sql.spark.leasemanager.model.LeaseRequest;
import org.opensearch.sql.spark.metrics.MetricsService;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

/**
Expand All @@ -35,8 +36,9 @@ public RefreshQueryHandler(
JobExecutionResponseReader jobExecutionResponseReader,
FlintIndexMetadataService flintIndexMetadataService,
LeaseManager leaseManager,
FlintIndexOpFactory flintIndexOpFactory) {
super(emrServerlessClient, jobExecutionResponseReader, leaseManager);
FlintIndexOpFactory flintIndexOpFactory,
MetricsService metricsService) {
super(emrServerlessClient, jobExecutionResponseReader, leaseManager, metricsService);
this.flintIndexMetadataService = flintIndexMetadataService;
this.flintIndexOpFactory = flintIndexOpFactory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@

import static org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher.INDEX_TAG_KEY;
import static org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher.JOB_TYPE_TAG_KEY;
import static org.opensearch.sql.spark.metrics.EmrMetrics.EMR_STREAMING_QUERY_JOBS_CREATION_COUNT;

import java.util.Map;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.utils.MetricUtils;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters;
import org.opensearch.sql.spark.client.EMRServerlessClient;
Expand All @@ -23,6 +22,7 @@
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.leasemanager.LeaseManager;
import org.opensearch.sql.spark.leasemanager.model.LeaseRequest;
import org.opensearch.sql.spark.metrics.MetricsService;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

/**
Expand All @@ -34,8 +34,9 @@ public class StreamingQueryHandler extends BatchQueryHandler {
public StreamingQueryHandler(
EMRServerlessClient emrServerlessClient,
JobExecutionResponseReader jobExecutionResponseReader,
LeaseManager leaseManager) {
super(emrServerlessClient, jobExecutionResponseReader, leaseManager);
LeaseManager leaseManager,
MetricsService metricsService) {
super(emrServerlessClient, jobExecutionResponseReader, leaseManager, metricsService);
}

@Override
Expand Down Expand Up @@ -81,7 +82,7 @@ public DispatchQueryResponse submit(
indexQueryDetails.getFlintIndexOptions().autoRefresh(),
dataSourceMetadata.getResultIndex());
String jobId = emrServerlessClient.startJobRun(startJobRequest);
MetricUtils.incrementNumericalMetric(MetricName.EMR_STREAMING_QUERY_JOBS_CREATION_COUNT);
metricsService.incrementNumericalMetric(EMR_STREAMING_QUERY_JOBS_CREATION_COUNT);
return DispatchQueryResponse.builder()
.queryId(context.getQueryId())
.jobId(jobId)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.metrics;

/**
 * Metrics emitted by the Spark/EMR async-query code through {@link MetricsService}.
 *
 * <p>The three {@code *_FAILURE_COUNT} constants count failed calls to EMR Serverless APIs
 * (start/get/cancel job run); the three {@code *_JOBS_CREATION_COUNT} constants count job
 * submissions per query type. NOTE(review): these names mirror pre-existing legacy metric names
 * and are presumably externally visible identifiers — coordinate before renaming (see PR
 * discussion about splitting dispatcher metrics from EMR API metrics in a follow-up).
 */
public enum EmrMetrics {
  EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT,
  EMR_GET_JOB_RESULT_FAILURE_COUNT,
  EMR_START_JOB_REQUEST_FAILURE_COUNT,
  EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT,
  EMR_STREAMING_QUERY_JOBS_CREATION_COUNT,
  EMR_BATCH_QUERY_JOBS_CREATION_COUNT // trailing semicolon removed: no members follow the constants
}
Comment on lines +8 to +15
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The last three metrics relate specifically to the dispatcher, not directly to EMR APIs. It may be beneficial to group these separately from the EMR API failure metrics and consider removing the 'EMR_' prefix to avoid confusion and maintain clarity in naming conventions.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think those metrics are already in use. Do you know whether it is OK to change the metric names?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's only do refactoring in this PR — renaming the metrics can be handled in a separate PR.

Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.metrics;

/**
 * Abstraction for emitting metrics, so callers depend on this interface rather than on the
 * legacy metrics utilities directly.
 */
public interface MetricsService {

  /**
   * Increments the numerical counter associated with the given metric.
   *
   * @param emrMetrics the metric whose counter should be incremented; must not be null
   */
  void incrementNumericalMetric(EmrMetrics emrMetrics);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,20 @@
import org.opensearch.sql.spark.config.SparkExecutionEngineConfig;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplier;
import org.opensearch.sql.spark.constants.TestConstants;
import org.opensearch.sql.spark.metrics.MetricsService;

@ExtendWith(MockitoExtension.class)
public class EMRServerlessClientFactoryImplTest {

@Mock private SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier;
@Mock private MetricsService metricsService;

@Test
public void testGetClient() {
when(sparkExecutionEngineConfigSupplier.getSparkExecutionEngineConfig(any()))
.thenReturn(createSparkExecutionEngineConfig());
EMRServerlessClientFactory emrServerlessClientFactory =
new EMRServerlessClientFactoryImpl(sparkExecutionEngineConfigSupplier);
new EMRServerlessClientFactoryImpl(sparkExecutionEngineConfigSupplier, metricsService);
EMRServerlessClient emrserverlessClient = emrServerlessClientFactory.getClient();
Assertions.assertNotNull(emrserverlessClient);
}
Expand All @@ -38,7 +40,7 @@ public void testGetClientWithChangeInSetting() {
when(sparkExecutionEngineConfigSupplier.getSparkExecutionEngineConfig(any()))
.thenReturn(sparkExecutionEngineConfig);
EMRServerlessClientFactory emrServerlessClientFactory =
new EMRServerlessClientFactoryImpl(sparkExecutionEngineConfigSupplier);
new EMRServerlessClientFactoryImpl(sparkExecutionEngineConfigSupplier, metricsService);
EMRServerlessClient emrserverlessClient = emrServerlessClientFactory.getClient();
Assertions.assertNotNull(emrserverlessClient);

Expand All @@ -57,7 +59,7 @@ public void testGetClientWithChangeInSetting() {
public void testGetClientWithException() {
when(sparkExecutionEngineConfigSupplier.getSparkExecutionEngineConfig(any())).thenReturn(null);
EMRServerlessClientFactory emrServerlessClientFactory =
new EMRServerlessClientFactoryImpl(sparkExecutionEngineConfigSupplier);
new EMRServerlessClientFactoryImpl(sparkExecutionEngineConfigSupplier, metricsService);
IllegalArgumentException illegalArgumentException =
Assertions.assertThrows(
IllegalArgumentException.class, emrServerlessClientFactory::getClient);
Expand All @@ -74,7 +76,7 @@ public void testGetClientWithExceptionWithNullRegion() {
when(sparkExecutionEngineConfigSupplier.getSparkExecutionEngineConfig(any()))
.thenReturn(sparkExecutionEngineConfig);
EMRServerlessClientFactory emrServerlessClientFactory =
new EMRServerlessClientFactoryImpl(sparkExecutionEngineConfigSupplier);
new EMRServerlessClientFactoryImpl(sparkExecutionEngineConfigSupplier, metricsService);
IllegalArgumentException illegalArgumentException =
Assertions.assertThrows(
IllegalArgumentException.class, emrServerlessClientFactory::getClient);
Expand Down
Loading
Loading