Skip to content

Commit

Permalink
Create Job API
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vamsimanohar committed Sep 18, 2023
1 parent a7af359 commit 4976df1
Show file tree
Hide file tree
Showing 45 changed files with 1,947 additions and 60 deletions.
4 changes: 2 additions & 2 deletions common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ dependencies {
api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0'
api group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.9.3'
implementation 'com.github.babbel:okhttp-aws-signer:1.0.2'
api group: 'com.amazonaws', name: 'aws-java-sdk-core', version: '1.12.1'
api group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: '1.12.1'
api group: 'com.amazonaws', name: 'aws-java-sdk-core', version: '1.12.545'
api group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: '1.12.545'
implementation "com.github.seancfoley:ipaddress:5.4.0"

testImplementation group: 'junit', name: 'junit', version: '4.13.2'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public enum Key {

METRICS_ROLLING_WINDOW("plugins.query.metrics.rolling_window"),
METRICS_ROLLING_INTERVAL("plugins.query.metrics.rolling_interval"),

SPARK_EXECUTION_ENGINE_CONFIG("plugins.query.executionengine.spark.config"),
CLUSTER_NAME("cluster.name");

@Getter private final String keyValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ public interface DataSourceService {
*/
DataSourceMetadata getDataSourceMetadata(String name);

/**
 * Returns the {@link DataSourceMetadata} object with the specific name. The returned object
 * contains all the metadata information without any filtering.
 *
 * @param name name of the {@link DataSource}.
 * @return the {@link DataSourceMetadata} for the given name.
 */
DataSourceMetadata getRawDataSourceMetadata(String name);

/**
* Register {@link DataSource} defined by {@link DataSourceMetadata}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ public DataSourceMetadata getDataSourceMetadata(String name) {
return null;
}

// Stub implementation: performs no lookup and always returns null, regardless of the name.
@Override
public DataSourceMetadata getRawDataSourceMetadata(String name) {
return null;
}

@Override
public void createDataSource(DataSourceMetadata metadata) {
throw new UnsupportedOperationException("unsupported operation");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,29 +64,17 @@ public Set<DataSourceMetadata> getDataSourceMetadata(boolean isDefaultDataSource
}

@Override
public DataSourceMetadata getDataSourceMetadata(String datasourceName) {
Optional<DataSourceMetadata> dataSourceMetadataOptional =
getDataSourceMetadataFromName(datasourceName);
if (dataSourceMetadataOptional.isEmpty()) {
throw new IllegalArgumentException(
"DataSource with name: " + datasourceName + " doesn't exist.");
}
removeAuthInfo(dataSourceMetadataOptional.get());
return dataSourceMetadataOptional.get();
public DataSourceMetadata getDataSourceMetadata(String dataSourceName) {
DataSourceMetadata dataSourceMetadata = getRawDataSourceMetadata(dataSourceName);
removeAuthInfo(dataSourceMetadata);
return dataSourceMetadata;
}

@Override
public DataSource getDataSource(String dataSourceName) {
Optional<DataSourceMetadata> dataSourceMetadataOptional =
getDataSourceMetadataFromName(dataSourceName);
if (dataSourceMetadataOptional.isEmpty()) {
throw new DataSourceNotFoundException(
String.format("DataSource with name %s doesn't exist.", dataSourceName));
} else {
DataSourceMetadata dataSourceMetadata = dataSourceMetadataOptional.get();
this.dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata);
return dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata);
}
DataSourceMetadata dataSourceMetadata = getRawDataSourceMetadata(dataSourceName);
this.dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata);
return dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata);
}

@Override
Expand Down Expand Up @@ -146,11 +134,20 @@ private void validateDataSourceMetaData(DataSourceMetadata metadata) {
+ " Properties are required parameters.");
}

private Optional<DataSourceMetadata> getDataSourceMetadataFromName(String dataSourceName) {
@Override
public DataSourceMetadata getRawDataSourceMetadata(String dataSourceName) {
if (dataSourceName.equals(DEFAULT_DATASOURCE_NAME)) {
return Optional.of(DataSourceMetadata.defaultOpenSearchDataSourceMetadata());
return DataSourceMetadata.defaultOpenSearchDataSourceMetadata();

} else {
return this.dataSourceMetadataStorage.getDataSourceMetadata(dataSourceName);
Optional<DataSourceMetadata> dataSourceMetadataOptional =
this.dataSourceMetadataStorage.getDataSourceMetadata(dataSourceName);
if (dataSourceMetadataOptional.isEmpty()) {
throw new DataSourceNotFoundException(
String.format("DataSource with name %s doesn't exist.", dataSourceName));
} else {
return dataSourceMetadataOptional.get();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,11 +359,11 @@ void testRemovalOfAuthorizationInfo() {
@Test
void testGetDataSourceMetadataForNonExistingDataSource() {
when(dataSourceMetadataStorage.getDataSourceMetadata("testDS")).thenReturn(Optional.empty());
IllegalArgumentException exception =
DataSourceNotFoundException exception =
assertThrows(
IllegalArgumentException.class,
DataSourceNotFoundException.class,
() -> dataSourceService.getDataSourceMetadata("testDS"));
assertEquals("DataSource with name: testDS doesn't exist.", exception.getMessage());
assertEquals("DataSource with name testDS doesn't exist.", exception.getMessage());
}

@Test
Expand All @@ -385,4 +385,28 @@ void testGetDataSourceMetadataForSpecificDataSourceName() {
assertFalse(dataSourceMetadata.getProperties().containsKey("prometheus.auth.password"));
verify(dataSourceMetadataStorage, times(1)).getDataSourceMetadata("testDS");
}

/** Raw metadata lookup must return the stored entry with its auth properties still present. */
@Test
void testGetRawDataSourceMetadata() {
  // Arrange: a Prometheus data source carrying basic-auth properties.
  HashMap<String, String> prometheusProperties = new HashMap<>();
  prometheusProperties.put("prometheus.uri", "https://localhost:9090");
  prometheusProperties.put("prometheus.auth.type", "basicauth");
  prometheusProperties.put("prometheus.auth.username", "username");
  prometheusProperties.put("prometheus.auth.password", "password");
  DataSourceMetadata storedMetadata =
      new DataSourceMetadata(
          "testDS",
          DataSourceType.PROMETHEUS,
          Collections.singletonList("prometheus_access"),
          prometheusProperties);
  when(dataSourceMetadataStorage.getDataSourceMetadata("testDS"))
      .thenReturn(Optional.of(storedMetadata));

  // Act
  DataSourceMetadata rawMetadata = dataSourceService.getRawDataSourceMetadata("testDS");

  // Assert: identity fields match and none of the auth properties were stripped.
  assertEquals("testDS", rawMetadata.getName());
  assertEquals(DataSourceType.PROMETHEUS, rawMetadata.getConnector());
  String[] expectedAuthKeys = {
    "prometheus.auth.type", "prometheus.auth.username", "prometheus.auth.password"
  };
  for (String authKey : expectedAuthKeys) {
    assertTrue(rawMetadata.getProperties().containsKey(authKey));
  }
}
}
41 changes: 41 additions & 0 deletions docs/user/interfaces/jobinterface.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
.. highlight:: sh

========
Endpoint
========

.. rubric:: Table of contents

.. contents::
:local:
:depth: 1


Introduction
============

To support S3Glue and CloudWatch datasources, we have introduced a new execution engine on top of Spark.
Queries that run on the Spark execution engine can only be submitted via the Job APIs. The sections below list all the newly introduced APIs.


Configuration required for Job APIs
======================================
Currently, AWS EMR Serverless is the only supported Spark execution engine, so the details of the execution engine must be configured under the ``plugins.query.executionengine.spark.config`` cluster setting.
We use the default AWS credentials chain to make calls to the EMR Serverless application.



Job Execution APIs
======================================






Authorization of Job APIs
==============================================




1 change: 1 addition & 0 deletions integ-test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ configurations.all {
resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib-jdk7:1.5.31"
resolutionStrategy.force "joda-time:joda-time:2.10.12"
resolutionStrategy.force "org.slf4j:slf4j-api:1.7.36"
resolutionStrategy.force "com.amazonaws:aws-java-sdk-core:1.12.545"
}

configurations {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ public class OpenSearchSettings extends Settings {
Setting.Property.NodeScope,
Setting.Property.Dynamic);

/**
 * String-valued setting registered under the {@code Key.SPARK_EXECUTION_ENGINE_CONFIG} key. It is
 * declared with node scope and as dynamic.
 */
public static final Setting<String> SPARK_EXECUTION_ENGINE_CONFIG =
Setting.simpleString(
Key.SPARK_EXECUTION_ENGINE_CONFIG.getKeyValue(),
Setting.Property.NodeScope,
Setting.Property.Dynamic);

/** Construct OpenSearchSetting. The OpenSearchSetting must be singleton. */
@SuppressWarnings("unchecked")
public OpenSearchSettings(ClusterSettings clusterSettings) {
Expand Down Expand Up @@ -193,6 +199,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) {
Key.DATASOURCES_URI_HOSTS_DENY_LIST,
DATASOURCE_URI_HOSTS_DENY_LIST,
new Updater(Key.DATASOURCES_URI_HOSTS_DENY_LIST));
register(
settingBuilder,
clusterSettings,
Key.SPARK_EXECUTION_ENGINE_CONFIG,
SPARK_EXECUTION_ENGINE_CONFIG,
new Updater(Key.SPARK_EXECUTION_ENGINE_CONFIG));
registerNonDynamicSettings(
settingBuilder, clusterSettings, Key.CLUSTER_NAME, ClusterName.CLUSTER_NAME_SETTING);
defaultSettings = settingBuilder.build();
Expand Down Expand Up @@ -257,6 +269,7 @@ public static List<Setting<?>> pluginSettings() {
.add(METRICS_ROLLING_WINDOW_SETTING)
.add(METRICS_ROLLING_INTERVAL_SETTING)
.add(DATASOURCE_URI_HOSTS_DENY_LIST)
.add(SPARK_EXECUTION_ENGINE_CONFIG)
.build();
}

Expand Down
4 changes: 2 additions & 2 deletions plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ dependencies {

testImplementation group: 'net.bytebuddy', name: 'byte-buddy-agent', version: '1.12.13'
testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1'
testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.4.0'
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '5.4.0'
testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.5.0'
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '5.5.0'
testImplementation 'org.junit.jupiter:junit-jupiter:5.6.2'
}

Expand Down
58 changes: 57 additions & 1 deletion plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@

package org.opensearch.sql.plugin;

import static org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG;
import static org.opensearch.sql.datasource.model.DataSourceMetadata.defaultOpenSearchDataSourceMetadata;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.emrserverless.AWSEMRServerless;
import com.amazonaws.services.emrserverless.AWSEMRServerlessClientBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -83,6 +89,15 @@
import org.opensearch.sql.plugin.transport.TransportPPLQueryAction;
import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory;
import org.opensearch.sql.spark.client.EmrServerlessClient;
import org.opensearch.sql.spark.client.EmrServerlessClientImpl;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfig;
import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher;
import org.opensearch.sql.spark.jobs.JobExecutorService;
import org.opensearch.sql.spark.jobs.JobExecutorServiceImpl;
import org.opensearch.sql.spark.jobs.JobMetadataStorageService;
import org.opensearch.sql.spark.jobs.OpensearchJobMetadataStorageService;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
import org.opensearch.sql.spark.rest.RestJobManagementAction;
import org.opensearch.sql.spark.storage.SparkStorageFactory;
import org.opensearch.sql.spark.transport.TransportCreateJobRequestAction;
Expand Down Expand Up @@ -110,6 +125,7 @@ public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin {

private NodeClient client;
private DataSourceServiceImpl dataSourceService;
private JobExecutorService jobExecutorService;
private Injector injector;

public String name() {
Expand Down Expand Up @@ -202,6 +218,16 @@ public Collection<Object> createComponents(
dataSourceService.createDataSource(defaultOpenSearchDataSourceMetadata());
LocalClusterState.state().setClusterService(clusterService);
LocalClusterState.state().setPluginSettings((OpenSearchSettings) pluginSettings);
if (this.pluginSettings.getSettingValue(SPARK_EXECUTION_ENGINE_CONFIG) == null) {
LOGGER.warn(
String.format(
"Job APIs are disabled as %s is not configured in cluster settings. "
+ "Please configure and restart the domain to enable JobAPIs",
SPARK_EXECUTION_ENGINE_CONFIG.getKeyValue()));
this.jobExecutorService = new JobExecutorServiceImpl();
} else {
this.jobExecutorService = createJobExecutorService();
}

ModulesBuilder modules = new ModulesBuilder();
modules.add(new OpenSearchPluginModule());
Expand All @@ -213,7 +239,7 @@ public Collection<Object> createComponents(
});

injector = modules.createInjector();
return ImmutableList.of(dataSourceService);
return ImmutableList.of(dataSourceService, jobExecutorService);
}

@Override
Expand Down Expand Up @@ -270,4 +296,34 @@ private DataSourceServiceImpl createDataSourceService() {
dataSourceMetadataStorage,
dataSourceUserAuthorizationHelper);
}

/**
 * Assembles the {@link JobExecutorService} used by the plugin.
 *
 * <p>Builds the job metadata store, then a {@link SparkQueryDispatcher} backed by a freshly
 * created EMR Serverless client, the plugin's data source service and a response reader, and
 * hands all of them plus the plugin settings to {@link JobExecutorServiceImpl}.
 *
 * @return the fully wired job executor service.
 */
private JobExecutorService createJobExecutorService() {
  JobMetadataStorageService metadataStore =
      new OpensearchJobMetadataStorageService(client, clusterService);
  // Arguments are evaluated left to right: EMR client first, response reader last.
  SparkQueryDispatcher dispatcher =
      new SparkQueryDispatcher(
          createEMRServerlessClient(),
          this.dataSourceService,
          new JobExecutionResponseReader(client));
  return new JobExecutorServiceImpl(metadataStore, dispatcher, pluginSettings);
}

/**
 * Creates the EMR Serverless client wrapper from the Spark execution engine setting.
 *
 * <p>The setting value is read up front; parsing it and building the AWS client (region taken
 * from the parsed config, credentials from the default AWS provider chain) happens inside a
 * privileged action.
 * NOTE(review): the privileged block is presumably needed for the AWS SDK permissions granted in
 * plugin-security.policy; confirm.
 *
 * @return an {@link EmrServerlessClient} wrapping the configured AWS EMR Serverless client.
 */
private EmrServerlessClient createEMRServerlessClient() {
  final String rawEngineConfig =
      this.pluginSettings.getSettingValue(SPARK_EXECUTION_ENGINE_CONFIG);
  PrivilegedAction<EmrServerlessClient> buildClient =
      () -> {
        SparkExecutionEngineConfig engineConfig =
            SparkExecutionEngineConfig.toSparkExecutionEngineConfig(rawEngineConfig);
        AWSEMRServerless emrServerless =
            AWSEMRServerlessClientBuilder.standard()
                .withRegion(engineConfig.getRegion())
                .withCredentials(new DefaultAWSCredentialsProviderChain())
                .build();
        return new EmrServerlessClientImpl(emrServerless);
      };
  return AccessController.doPrivileged(buildClient);
}
}
9 changes: 9 additions & 0 deletions plugin/src/main/plugin-metadata/plugin-security.policy
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,13 @@ grant {

// ml-commons client
permission java.lang.RuntimePermission "setContextClassLoader";

// aws credentials
permission java.io.FilePermission "${user.home}${/}.aws${/}*", "read";

// Permissions for the AWS EMR Serverless SDK
permission javax.management.MBeanServerPermission "createMBeanServer";
permission javax.management.MBeanServerPermission "findMBeanServer";
permission javax.management.MBeanPermission "com.amazonaws.metrics.*", "*";
permission javax.management.MBeanTrustPermission "register";
};
10 changes: 8 additions & 2 deletions spark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ repositories {

dependencies {
api project(':core')
implementation project(':protocol')
implementation project(':datasources')

implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}"
implementation group: 'org.json', name: 'json', version: '20230227'
implementation group: 'com.amazonaws', name: 'aws-java-sdk-emr', version: '1.12.1'
api group: 'com.amazonaws', name: 'aws-java-sdk-emr', version: '1.12.545'
api group: 'com.amazonaws', name: 'aws-java-sdk-emrserverless', version: '1.12.545'
implementation group: 'commons-io', name: 'commons-io', version: '2.8.0'

testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.2.0'
Expand Down Expand Up @@ -56,7 +59,10 @@ jacocoTestCoverageVerification {
excludes = [
'org.opensearch.sql.spark.data.constants.*',
'org.opensearch.sql.spark.rest.*',
'org.opensearch.sql.spark.transport.model.*'
'org.opensearch.sql.spark.transport.model.*',
'org.opensearch.sql.spark.jobs.model.*',
'org.opensearch.sql.spark.jobs.config.*',
'org.opensearch.sql.spark.jobs.execution.*'
]
limit {
counter = 'LINE'
Expand Down
Loading

0 comments on commit 4976df1

Please sign in to comment.