Skip to content

Commit

Permalink
Create Job API (#2070)
Browse files Browse the repository at this point in the history
* Create Job API

Signed-off-by: Vamsi Manohar <[email protected]>

* Refactor to Async Query API

Signed-off-by: Vamsi Manohar <[email protected]>

---------

Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vamsimanohar committed Sep 22, 2023
1 parent cda01e9 commit 71f4155
Show file tree
Hide file tree
Showing 66 changed files with 2,574 additions and 594 deletions.
4 changes: 2 additions & 2 deletions common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ dependencies {
api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0'
api group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.9.3'
implementation 'com.github.babbel:okhttp-aws-signer:1.0.2'
api group: 'com.amazonaws', name: 'aws-java-sdk-core', version: '1.12.1'
api group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: '1.12.1'
api group: 'com.amazonaws', name: 'aws-java-sdk-core', version: '1.12.545'
api group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: '1.12.545'
implementation "com.github.seancfoley:ipaddress:5.4.0"

testImplementation group: 'junit', name: 'junit', version: '4.13.2'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public enum Key {

METRICS_ROLLING_WINDOW("plugins.query.metrics.rolling_window"),
METRICS_ROLLING_INTERVAL("plugins.query.metrics.rolling_interval"),

SPARK_EXECUTION_ENGINE_CONFIG("plugins.query.executionengine.spark.config"),
CLUSTER_NAME("cluster.name");

@Getter private final String keyValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ public interface DataSourceService {
*/
DataSourceMetadata getDataSourceMetadata(String name);

/**
* Returns dataSourceMetadata object with specific name. The returned objects contain all the
* metadata information without any filtering.
*
* @param name name of the {@link DataSource}.
* @return set of {@link DataSourceMetadata}.
*/
DataSourceMetadata getRawDataSourceMetadata(String name);

/**
* Register {@link DataSource} defined by {@link DataSourceMetadata}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ public DataSourceMetadata getDataSourceMetadata(String name) {
return null;
}

@Override
public DataSourceMetadata getRawDataSourceMetadata(String name) {
return null;
}

@Override
public void createDataSource(DataSourceMetadata metadata) {
throw new UnsupportedOperationException("unsupported operation");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,29 +64,17 @@ public Set<DataSourceMetadata> getDataSourceMetadata(boolean isDefaultDataSource
}

@Override
public DataSourceMetadata getDataSourceMetadata(String datasourceName) {
Optional<DataSourceMetadata> dataSourceMetadataOptional =
getDataSourceMetadataFromName(datasourceName);
if (dataSourceMetadataOptional.isEmpty()) {
throw new IllegalArgumentException(
"DataSource with name: " + datasourceName + " doesn't exist.");
}
removeAuthInfo(dataSourceMetadataOptional.get());
return dataSourceMetadataOptional.get();
public DataSourceMetadata getDataSourceMetadata(String dataSourceName) {
DataSourceMetadata dataSourceMetadata = getRawDataSourceMetadata(dataSourceName);
removeAuthInfo(dataSourceMetadata);
return dataSourceMetadata;
}

@Override
public DataSource getDataSource(String dataSourceName) {
Optional<DataSourceMetadata> dataSourceMetadataOptional =
getDataSourceMetadataFromName(dataSourceName);
if (dataSourceMetadataOptional.isEmpty()) {
throw new DataSourceNotFoundException(
String.format("DataSource with name %s doesn't exist.", dataSourceName));
} else {
DataSourceMetadata dataSourceMetadata = dataSourceMetadataOptional.get();
this.dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata);
return dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata);
}
DataSourceMetadata dataSourceMetadata = getRawDataSourceMetadata(dataSourceName);
this.dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata);
return dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata);
}

@Override
Expand Down Expand Up @@ -146,11 +134,20 @@ private void validateDataSourceMetaData(DataSourceMetadata metadata) {
+ " Properties are required parameters.");
}

private Optional<DataSourceMetadata> getDataSourceMetadataFromName(String dataSourceName) {
@Override
public DataSourceMetadata getRawDataSourceMetadata(String dataSourceName) {
if (dataSourceName.equals(DEFAULT_DATASOURCE_NAME)) {
return Optional.of(DataSourceMetadata.defaultOpenSearchDataSourceMetadata());
return DataSourceMetadata.defaultOpenSearchDataSourceMetadata();

} else {
return this.dataSourceMetadataStorage.getDataSourceMetadata(dataSourceName);
Optional<DataSourceMetadata> dataSourceMetadataOptional =
this.dataSourceMetadataStorage.getDataSourceMetadata(dataSourceName);
if (dataSourceMetadataOptional.isEmpty()) {
throw new DataSourceNotFoundException(
String.format("DataSource with name %s doesn't exist.", dataSourceName));
} else {
return dataSourceMetadataOptional.get();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,11 +359,11 @@ void testRemovalOfAuthorizationInfo() {
@Test
void testGetDataSourceMetadataForNonExistingDataSource() {
when(dataSourceMetadataStorage.getDataSourceMetadata("testDS")).thenReturn(Optional.empty());
IllegalArgumentException exception =
DataSourceNotFoundException exception =
assertThrows(
IllegalArgumentException.class,
DataSourceNotFoundException.class,
() -> dataSourceService.getDataSourceMetadata("testDS"));
assertEquals("DataSource with name: testDS doesn't exist.", exception.getMessage());
assertEquals("DataSource with name testDS doesn't exist.", exception.getMessage());
}

@Test
Expand All @@ -385,4 +385,28 @@ void testGetDataSourceMetadataForSpecificDataSourceName() {
assertFalse(dataSourceMetadata.getProperties().containsKey("prometheus.auth.password"));
verify(dataSourceMetadataStorage, times(1)).getDataSourceMetadata("testDS");
}

@Test
void testGetRawDataSourceMetadata() {
HashMap<String, String> properties = new HashMap<>();
properties.put("prometheus.uri", "https://localhost:9090");
properties.put("prometheus.auth.type", "basicauth");
properties.put("prometheus.auth.username", "username");
properties.put("prometheus.auth.password", "password");
DataSourceMetadata dataSourceMetadata =
new DataSourceMetadata(
"testDS",
DataSourceType.PROMETHEUS,
Collections.singletonList("prometheus_access"),
properties);
when(dataSourceMetadataStorage.getDataSourceMetadata("testDS"))
.thenReturn(Optional.of(dataSourceMetadata));

DataSourceMetadata dataSourceMetadata1 = dataSourceService.getRawDataSourceMetadata("testDS");
assertEquals("testDS", dataSourceMetadata1.getName());
assertEquals(DataSourceType.PROMETHEUS, dataSourceMetadata1.getConnector());
assertTrue(dataSourceMetadata1.getProperties().containsKey("prometheus.auth.type"));
assertTrue(dataSourceMetadata1.getProperties().containsKey("prometheus.auth.username"));
assertTrue(dataSourceMetadata1.getProperties().containsKey("prometheus.auth.password"));
}
}
108 changes: 108 additions & 0 deletions docs/user/interfaces/asyncqueryinterface.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
.. highlight:: sh

=======================
Async Query Interface Endpoints
=======================

.. rubric:: Table of contents

.. contents::
:local:
:depth: 1


Introduction
============

For supporting `S3Glue <../ppl/admin/connector/s3glue_connector.rst>`_ and Cloudwatch datasources connectors, we have introduced a new execution engine on top of Spark.
All the queries to be executed on spark execution engine can only be submitted via Async Query APIs. Below sections will list all the new APIs introduced.


Configuration required for Async Query APIs
======================================
Currently, we only support AWS emr serverless as SPARK execution engine. The details of execution engine should be configured under
``plugins.query.executionengine.spark.config`` cluster setting. The value should be a stringified json comprising of ``applicationId``, ``executionRoleARN``,``region``.
Sample Setting Value ::

plugins.query.executionengine.spark.config: '{"applicationId":"xxxxx", "executionRoleARN":"arn:aws:iam::***********:role/emr-job-execution-role","region":"eu-west-1"}'


If this setting is not configured during bootstrap, Async Query APIs will be disabled and it requires a cluster restart to enable them back again.
We make use of default aws credentials chain to make calls to the emr serverless application and also make sure the default credentials
have pass role permissions for emr-job-execution-role mentioned in the engine configuration.



Async Query Creation API
======================================
If security plugin is enabled, this API can only be invoked by users with permission ``cluster:admin/opensearch/ql/async_query/create``.

HTTP URI: _plugins/_query/_async_query
HTTP VERB: POST



Sample Request::

curl --location 'http://localhost:9200/_plugins/_async_query' \
--header 'Content-Type: application/json' \
--data '{
"kind" : "sql",
"query" : "select * from my_glue.default.http_logs limit 10"
}'

Sample Response::

{
"queryId": "00fd796ut1a7eg0q"
}

Async Query Result API
======================================
If security plugin is enabled, this API can only be invoked by users with permission ``cluster:admin/opensearch/ql/async_query/result``.
Async Query Creation and Result Query permissions are orthogonal, so any user with result api permissions and queryId can query the corresponding query results irrespective of the user who created the async query.


HTTP URI: _plugins/_query/_async_query/{queryId}
HTTP VERB: GET


Sample Request BODY::

curl --location --request GET 'http://localhost:9200/_plugins/_async_query/00fd796ut1a7eg0q' \
--header 'Content-Type: application/json' \
--data '{
"query" : "select * from default.http_logs limit 1"
}'

Sample Response if the Query is in Progress ::

{"status":"RUNNING"}

Sample Response If the Query is successful ::

{
"schema": [
{
"name": "indexed_col_name",
"type": "string"
},
{
"name": "data_type",
"type": "string"
},
{
"name": "skip_type",
"type": "string"
}
],
"datarows": [
[
"status",
"int",
"VALUE_SET"
]
],
"total": 1,
"size": 1
}
1 change: 1 addition & 0 deletions integ-test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ configurations.all {
resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib-jdk7:1.5.31"
resolutionStrategy.force "joda-time:joda-time:2.10.12"
resolutionStrategy.force "org.slf4j:slf4j-api:1.7.36"
resolutionStrategy.force "com.amazonaws:aws-java-sdk-core:1.12.545"
}

configurations {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,12 @@ public void deleteDataSourceTest() {
Assert.assertThrows(
ResponseException.class, () -> client().performRequest(prometheusGetRequest));
Assert.assertEquals(
400, prometheusGetResponseException.getResponse().getStatusLine().getStatusCode());
404, prometheusGetResponseException.getResponse().getStatusLine().getStatusCode());
String prometheusGetResponseString =
getResponseBody(prometheusGetResponseException.getResponse());
JsonObject errorMessage = new Gson().fromJson(prometheusGetResponseString, JsonObject.class);
Assert.assertEquals(
"DataSource with name: delete_prometheus doesn't exist.",
"DataSource with name delete_prometheus doesn't exist.",
errorMessage.get("error").getAsJsonObject().get("details").getAsString());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ public class OpenSearchSettings extends Settings {
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<String> SPARK_EXECUTION_ENGINE_CONFIG =
Setting.simpleString(
Key.SPARK_EXECUTION_ENGINE_CONFIG.getKeyValue(),
Setting.Property.NodeScope,
Setting.Property.Dynamic);

/** Construct OpenSearchSetting. The OpenSearchSetting must be singleton. */
@SuppressWarnings("unchecked")
public OpenSearchSettings(ClusterSettings clusterSettings) {
Expand Down Expand Up @@ -193,6 +199,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) {
Key.DATASOURCES_URI_HOSTS_DENY_LIST,
DATASOURCE_URI_HOSTS_DENY_LIST,
new Updater(Key.DATASOURCES_URI_HOSTS_DENY_LIST));
register(
settingBuilder,
clusterSettings,
Key.SPARK_EXECUTION_ENGINE_CONFIG,
SPARK_EXECUTION_ENGINE_CONFIG,
new Updater(Key.SPARK_EXECUTION_ENGINE_CONFIG));
registerNonDynamicSettings(
settingBuilder, clusterSettings, Key.CLUSTER_NAME, ClusterName.CLUSTER_NAME_SETTING);
defaultSettings = settingBuilder.build();
Expand Down Expand Up @@ -257,6 +269,7 @@ public static List<Setting<?>> pluginSettings() {
.add(METRICS_ROLLING_WINDOW_SETTING)
.add(METRICS_ROLLING_INTERVAL_SETTING)
.add(DATASOURCE_URI_HOSTS_DENY_LIST)
.add(SPARK_EXECUTION_ENGINE_CONFIG)
.build();
}

Expand Down
Loading

0 comments on commit 71f4155

Please sign in to comment.