Skip to content

Commit

Permalink
Create Job API
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vmmusings committed Sep 18, 2023
1 parent a7af359 commit cbb0c14
Show file tree
Hide file tree
Showing 37 changed files with 952 additions and 54 deletions.
4 changes: 2 additions & 2 deletions common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ dependencies {
api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0'
api group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.9.3'
implementation 'com.github.babbel:okhttp-aws-signer:1.0.2'
api group: 'com.amazonaws', name: 'aws-java-sdk-core', version: '1.12.1'
api group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: '1.12.1'
api group: 'com.amazonaws', name: 'aws-java-sdk-core', version: '1.12.545'
api group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: '1.12.545'
implementation "com.github.seancfoley:ipaddress:5.4.0"

testImplementation group: 'junit', name: 'junit', version: '4.13.2'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public enum Key {

METRICS_ROLLING_WINDOW("plugins.query.metrics.rolling_window"),
METRICS_ROLLING_INTERVAL("plugins.query.metrics.rolling_interval"),

SPARK_EXECUTION_ENGINE_CONFIG("plugins.query.executionengine.spark.config"),
CLUSTER_NAME("cluster.name");

@Getter private final String keyValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ public interface DataSourceService {
*/
DataSourceMetadata getDataSourceMetadata(String name);

/**
* Returns the {@link DataSourceMetadata} object registered under the given name. The returned
* object contains all the metadata information.
*
* <p>NOTE(review): presumably "raw" means the metadata is returned exactly as stored, in contrast
* to {@code getDataSourceMetadata} — confirm against the implementation of both methods.
*
* @param name name of the {@link DataSource}.
* @return the single {@link DataSourceMetadata} stored for that name (not a set).
* @throws IllegalArgumentException thrown by the implementation when no datasource with the given
*     name exists.
*/
DataSourceMetadata getRawDataSourceMetadata(String name);

/**
* Register {@link DataSource} defined by {@link DataSourceMetadata}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ void execute(
/** Data class that encapsulates ExprValue. */
@Data
class QueryResponse {
private String status = "COMPLETED";
private final Schema schema;
private final List<ExprValue> results;
private final Cursor cursor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,17 @@ public DataSourceMetadata getDataSourceMetadata(String datasourceName) {
return dataSourceMetadataOptional.get();
}

/**
 * Looks up the metadata stored for {@code datasourceName} and hands it back unchanged.
 *
 * @param datasourceName name of the datasource to look up.
 * @return the metadata found for that name.
 * @throws IllegalArgumentException if no datasource with that name exists.
 */
@Override
public DataSourceMetadata getRawDataSourceMetadata(String datasourceName) {
  return getDataSourceMetadataFromName(datasourceName)
      .orElseThrow(
          () ->
              new IllegalArgumentException(
                  "DataSource with name: " + datasourceName + " doesn't exist."));
}

@Override
public DataSource getDataSource(String dataSourceName) {
Optional<DataSourceMetadata> dataSourceMetadataOptional =
Expand Down
1 change: 1 addition & 0 deletions integ-test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ configurations.all {
resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib-jdk7:1.5.31"
resolutionStrategy.force "joda-time:joda-time:2.10.12"
resolutionStrategy.force "org.slf4j:slf4j-api:1.7.36"
resolutionStrategy.force "com.amazonaws:aws-java-sdk-core:1.12.545"
}

configurations {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ public class OpenSearchSettings extends Settings {
Setting.Property.NodeScope,
Setting.Property.Dynamic);

/**
* Plain string setting registered under {@code Key.SPARK_EXECUTION_ENGINE_CONFIG}; declared
* node-scoped and dynamic.
*
* <p>NOTE(review): the expected format of the value is not defined here; it is presumably the
* serialized Spark execution engine configuration (including the AWS region) — confirm against
* the code that parses it.
*/
public static final Setting<String> SPARK_EXECUTION_ENGINE_CONFIG =
Setting.simpleString(
Key.SPARK_EXECUTION_ENGINE_CONFIG.getKeyValue(),
Setting.Property.NodeScope,
Setting.Property.Dynamic);

/** Construct OpenSearchSetting. The OpenSearchSetting must be singleton. */
@SuppressWarnings("unchecked")
public OpenSearchSettings(ClusterSettings clusterSettings) {
Expand Down Expand Up @@ -193,6 +199,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) {
Key.DATASOURCES_URI_HOSTS_DENY_LIST,
DATASOURCE_URI_HOSTS_DENY_LIST,
new Updater(Key.DATASOURCES_URI_HOSTS_DENY_LIST));
register(
settingBuilder,
clusterSettings,
Key.SPARK_EXECUTION_ENGINE_CONFIG,
SPARK_EXECUTION_ENGINE_CONFIG,
new Updater(Key.SPARK_EXECUTION_ENGINE_CONFIG));
registerNonDynamicSettings(
settingBuilder, clusterSettings, Key.CLUSTER_NAME, ClusterName.CLUSTER_NAME_SETTING);
defaultSettings = settingBuilder.build();
Expand Down Expand Up @@ -257,6 +269,7 @@ public static List<Setting<?>> pluginSettings() {
.add(METRICS_ROLLING_WINDOW_SETTING)
.add(METRICS_ROLLING_INTERVAL_SETTING)
.add(DATASOURCE_URI_HOSTS_DENY_LIST)
.add(SPARK_EXECUTION_ENGINE_CONFIG)
.build();
}

Expand Down
49 changes: 48 additions & 1 deletion plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,15 @@
package org.opensearch.sql.plugin;

import static org.opensearch.sql.datasource.model.DataSourceMetadata.defaultOpenSearchDataSourceMetadata;
import static org.opensearch.sql.spark.data.constants.SparkConstants.STEP_ID_FIELD;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.emrserverless.AWSEMRServerless;
import com.amazonaws.services.emrserverless.AWSEMRServerlessClientBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -83,6 +89,15 @@
import org.opensearch.sql.plugin.transport.TransportPPLQueryAction;
import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory;
import org.opensearch.sql.spark.client.EmrServerlessClient;
import org.opensearch.sql.spark.client.EmrServerlessClientImpl;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfig;
import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher;
import org.opensearch.sql.spark.jobs.JobExecutorService;
import org.opensearch.sql.spark.jobs.JobExecutorServiceImpl;
import org.opensearch.sql.spark.jobs.JobMetadataStorageService;
import org.opensearch.sql.spark.jobs.OpensearchJobMetadataStorageService;
import org.opensearch.sql.spark.response.SparkResponseReader;
import org.opensearch.sql.spark.rest.RestJobManagementAction;
import org.opensearch.sql.spark.storage.SparkStorageFactory;
import org.opensearch.sql.spark.transport.TransportCreateJobRequestAction;
Expand Down Expand Up @@ -110,6 +125,7 @@ public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin {

private NodeClient client;
private DataSourceServiceImpl dataSourceService;
private JobExecutorService jobExecutorService;
private Injector injector;

public String name() {
Expand Down Expand Up @@ -202,6 +218,7 @@ public Collection<Object> createComponents(
dataSourceService.createDataSource(defaultOpenSearchDataSourceMetadata());
LocalClusterState.state().setClusterService(clusterService);
LocalClusterState.state().setPluginSettings((OpenSearchSettings) pluginSettings);
this.jobExecutorService = createJobManagementService();

ModulesBuilder modules = new ModulesBuilder();
modules.add(new OpenSearchPluginModule());
Expand All @@ -213,7 +230,7 @@ public Collection<Object> createComponents(
});

injector = modules.createInjector();
return ImmutableList.of(dataSourceService);
return ImmutableList.of(dataSourceService, jobExecutorService);
}

@Override
Expand Down Expand Up @@ -270,4 +287,34 @@ private DataSourceServiceImpl createDataSourceService() {
dataSourceMetadataStorage,
dataSourceUserAuthorizationHelper);
}

/**
 * Assembles the {@link JobExecutorService}: an OpenSearch-backed job metadata store, a Spark query
 * dispatcher built on a freshly created EMR Serverless client and the plugin's datasource
 * service, and the plugin settings.
 *
 * @return a new {@link JobExecutorServiceImpl} wired with those collaborators.
 */
private JobExecutorService createJobManagementService() {
  final JobMetadataStorageService metadataStore =
      new OpensearchJobMetadataStorageService(client, clusterService);
  final SparkQueryDispatcher dispatcher =
      new SparkQueryDispatcher(createEMRServerlessClient(), this.dataSourceService);
  return new JobExecutorServiceImpl(metadataStore, dispatcher, pluginSettings);
}

/**
 * Creates the {@link EmrServerlessClient} backed by an AWS EMR Serverless SDK client.
 *
 * <p>The Spark execution engine config string is read from the plugin settings up front. Parsing
 * it and building the AWS client (region taken from the parsed config, credentials from the
 * default AWS provider chain) happen inside a privileged action.
 *
 * <p>NOTE(review): the privileged block is presumably required so the AWS SDK can use the
 * permissions granted in plugin-security.policy — confirm. Also confirm how the config parser
 * behaves when the setting has not been configured.
 *
 * @return a new {@link EmrServerlessClientImpl}.
 */
private EmrServerlessClient createEMRServerlessClient() {
  final String engineConfigValue =
      this.pluginSettings.getSettingValue(
          org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG);
  final PrivilegedAction<EmrServerlessClient> buildClient =
      () -> {
        SparkExecutionEngineConfig engineConfig =
            SparkExecutionEngineConfig.toSparkExecutionEngineConfig(engineConfigValue);
        AWSEMRServerless emrServerless =
            AWSEMRServerlessClientBuilder.standard()
                .withRegion(engineConfig.getRegion())
                .withCredentials(new DefaultAWSCredentialsProviderChain())
                .build();
        SparkResponseReader responseReader =
            new SparkResponseReader(client, null, STEP_ID_FIELD);
        return new EmrServerlessClientImpl(emrServerless, responseReader);
      };
  return AccessController.doPrivileged(buildClient);
}
}
9 changes: 9 additions & 0 deletions plugin/src/main/plugin-metadata/plugin-security.policy
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,13 @@ grant {

// ml-commons client
permission java.lang.RuntimePermission "setContextClassLoader";

// aws credentials
permission java.io.FilePermission "${user.home}${/}.aws${/}*", "read";

// Permissions for the AWS EMR Serverless SDK
permission javax.management.MBeanServerPermission "createMBeanServer";
permission javax.management.MBeanServerPermission "findMBeanServer";
permission javax.management.MBeanPermission "com.amazonaws.metrics.*", "*";
permission javax.management.MBeanTrustPermission "register";
};
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.Map;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.model.ExprValueUtils;
import org.opensearch.sql.executor.ExecutionEngine;
Expand All @@ -25,6 +26,8 @@
@RequiredArgsConstructor
public class QueryResult implements Iterable<Object[]> {

@Setter @Getter private String status;

@Getter private final ExecutionEngine.Schema schema;

/** Results which are collection of expression. */
Expand Down
5 changes: 4 additions & 1 deletion spark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ repositories {

dependencies {
api project(':core')
implementation project(':protocol')
implementation project(':datasources')

implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}"
implementation group: 'org.json', name: 'json', version: '20230227'
implementation group: 'com.amazonaws', name: 'aws-java-sdk-emr', version: '1.12.1'
api group: 'com.amazonaws', name: 'aws-java-sdk-emr', version: '1.12.545'
api group: 'com.amazonaws', name: 'aws-java-sdk-emrserverless', version: '1.12.545'
implementation group: 'commons-io', name: 'commons-io', version: '2.8.0'

testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.2.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,41 +23,42 @@
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
import org.opensearch.sql.spark.helper.FlintHelper;
import org.opensearch.sql.spark.response.SparkResponse;
import org.opensearch.sql.spark.response.SparkResponseReader;

public class EmrClientImpl implements SparkClient {
private final AmazonElasticMapReduce emr;
private final String emrCluster;
private final FlintHelper flint;
private final String sparkApplicationJar;
private static final Logger logger = LogManager.getLogger(EmrClientImpl.class);
private SparkResponse sparkResponse;
private SparkResponseReader sparkResponseReader;

/**
* Constructor for EMR Client Implementation.
*
* @param emr EMR helper
* @param flint Opensearch args for flint integration jar
* @param sparkResponse Response object to help with retrieving results from Opensearch index
* @param sparkResponseReader Response object to help with retrieving results from Opensearch
* index
*/
public EmrClientImpl(
AmazonElasticMapReduce emr,
String emrCluster,
FlintHelper flint,
SparkResponse sparkResponse,
SparkResponseReader sparkResponseReader,
String sparkApplicationJar) {
this.emr = emr;
this.emrCluster = emrCluster;
this.flint = flint;
this.sparkResponse = sparkResponse;
this.sparkResponseReader = sparkResponseReader;
this.sparkApplicationJar =
sparkApplicationJar == null ? SPARK_SQL_APPLICATION_JAR : sparkApplicationJar;
}

@Override
public JSONObject sql(String query) throws IOException {
runEmrApplication(query);
return sparkResponse.getResultFromOpensearchIndex();
return sparkResponseReader.getResultFromOpensearchIndex();
}

@VisibleForTesting
Expand Down Expand Up @@ -98,7 +99,7 @@ void runEmrApplication(String query) {
new DescribeStepRequest().withClusterId(emrCluster).withStepId(stepId);

waitForStepExecution(stepRequest);
sparkResponse.setValue(stepId);
sparkResponseReader.setValue(stepId);
}

@SneakyThrows
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.opensearch.sql.spark.client;

import com.amazonaws.services.emrserverless.model.GetJobRunResult;
import org.opensearch.sql.spark.helper.FlintHelper;

/**
* Client abstraction for working with EMR Serverless job runs: one operation to start a job run
* and one to fetch the result object for an existing job.
*/
public interface EmrServerlessClient {

/**
* Starts a job run for the given query.
*
* <p>NOTE(review): the exact semantics are defined by the implementation, which is not visible
* here; the parameter descriptions below are derived from their names — confirm.
*
* @param applicationId presumably the EMR Serverless application to run the job in.
* @param query the query to execute.
* @param datasourceRoleArn presumably the ARN of the role used to access the datasource.
* @param executionRoleArn presumably the ARN of the role the job run executes under.
* @param datasourceName name of the datasource the query targets.
* @param flintHelper helper carrying Flint integration arguments.
* @return a String identifying the started job run (presumably the job run id — confirm).
*/
String startJobRun(
String applicationId,
String query,
String datasourceRoleArn,
String executionRoleArn,
String datasourceName,
FlintHelper flintHelper);

/**
* Fetches the AWS SDK {@link GetJobRunResult} for a job.
*
* @param applicationId presumably the EMR Serverless application the job belongs to.
* @param jobId identifier of the job to look up.
* @return the {@link GetJobRunResult} for that job.
*/
GetJobRunResult getJobResult(String applicationId, String jobId);
}
Loading

0 comments on commit cbb0c14

Please sign in to comment.