Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tags to the emr jobs created based on the query. #2150

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions docs/user/interfaces/asyncqueryinterface.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,16 @@ Async Query Creation API
======================================
If security plugin is enabled, this API can only be invoked by users with permission ``cluster:admin/opensearch/ql/async_query/create``.

HTTP URI: _plugins/_async_query
HTTP VERB: POST
HTTP URI: ``_plugins/_async_query``

HTTP VERB: ``POST``

Sample Request::

curl --location 'http://localhost:9200/_plugins/_async_query' \
--header 'Content-Type: application/json' \
--data '{
"kind" : "sql",
"lang" : "sql",
"query" : "select * from my_glue.default.http_logs limit 10"
}'

Expand All @@ -60,8 +61,9 @@ Async Query Result API
If security plugin is enabled, this API can only be invoked by users with permission ``cluster:admin/opensearch/ql/async_query/result``.
Async Query Creation and Result Query permissions are orthogonal, so any user with result api permissions and queryId can query the corresponding query results irrespective of the user who created the async query.

HTTP URI: _plugins/_async_query/{queryId}
HTTP VERB: GET
HTTP URI: ``_plugins/_async_query/{queryId}``

HTTP VERB: ``GET``

Sample Request BODY::

Expand All @@ -75,6 +77,7 @@ Sample Response if the Query is in Progress ::
Sample Response If the Query is successful ::

{
"status": "SUCCESS",
"schema": [
{
"name": "indexed_col_name",
Expand Down Expand Up @@ -105,8 +108,9 @@ Async Query Cancellation API
======================================
If security plugin is enabled, this API can only be invoked by users with permission ``cluster:admin/opensearch/ql/jobs/delete``.

HTTP URI: _plugins/_async_query/{queryId}
HTTP VERB: DELETE
HTTP URI: ``_plugins/_async_query/{queryId}``

HTTP VERB: ``DELETE``

Sample Request Body ::

Expand Down
17 changes: 10 additions & 7 deletions plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorServiceImpl;
import org.opensearch.sql.spark.asyncquery.AsyncQueryJobMetadataStorageService;
import org.opensearch.sql.spark.asyncquery.OpensearchAsyncQueryJobMetadataStorageService;
import org.opensearch.sql.spark.client.EmrServerlessClientImpl;
import org.opensearch.sql.spark.client.SparkJobClient;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.client.EmrServerlessClientImplEMR;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfig;
import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
Expand Down Expand Up @@ -297,20 +297,23 @@ private DataSourceServiceImpl createDataSourceService() {
private AsyncQueryExecutorService createAsyncQueryExecutorService() {
AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService =
new OpensearchAsyncQueryJobMetadataStorageService(client, clusterService);
SparkJobClient sparkJobClient = createEMRServerlessClient();
EMRServerlessClient EMRServerlessClient = createEMRServerlessClient();
JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client);
SparkQueryDispatcher sparkQueryDispatcher =
new SparkQueryDispatcher(
sparkJobClient, this.dataSourceService, jobExecutionResponseReader);
EMRServerlessClient,
this.dataSourceService,
new DataSourceUserAuthorizationHelperImpl(client),
jobExecutionResponseReader);
return new AsyncQueryExecutorServiceImpl(
asyncQueryJobMetadataStorageService, sparkQueryDispatcher, pluginSettings);
}

private SparkJobClient createEMRServerlessClient() {
private EMRServerlessClient createEMRServerlessClient() {
String sparkExecutionEngineConfigString =
this.pluginSettings.getSettingValue(SPARK_EXECUTION_ENGINE_CONFIG);
return AccessController.doPrivileged(
(PrivilegedAction<SparkJobClient>)
(PrivilegedAction<EMRServerlessClient>)
() -> {
SparkExecutionEngineConfig sparkExecutionEngineConfig =
SparkExecutionEngineConfig.toSparkExecutionEngineConfig(
Expand All @@ -320,7 +323,7 @@ private SparkJobClient createEMRServerlessClient() {
.withRegion(sparkExecutionEngineConfig.getRegion())
.withCredentials(new DefaultAWSCredentialsProviderChain())
.build();
return new EmrServerlessClientImpl(awsemrServerless);
return new EmrServerlessClientImplEMR(awsemrServerless);
});
}
}
36 changes: 33 additions & 3 deletions spark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,42 @@ plugins {
id 'java-library'
id "io.freefair.lombok"
id 'jacoco'
id 'antlr'
}

repositories {
mavenCentral()
}

tasks.register('downloadG4Files', Exec) {
description = 'Download remote .g4 files from GitHub'

executable 'curl'

// Need to add these back once the grammar issues with indexName and tableName is addressed in flint integration jar.
vamsimanohar marked this conversation as resolved.
Show resolved Hide resolved
// args '-o', 'src/main/antlr/FlintSparkSqlExtensions.g4', 'https://raw.githubusercontent.com/opensearch-project/opensearch-spark/main/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4'
// args '-o', 'src/main/antlr/SparkSqlBase.g4', 'https://raw.githubusercontent.com/opensearch-project/opensearch-spark/main/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4'
args '-o', 'src/main/antlr/SqlBaseParser.g4', 'https://raw.githubusercontent.com/apache/spark/master/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4'
args '-o', 'src/main/antlr/SqlBaseLexer.g4', 'https://raw.githubusercontent.com/apache/spark/master/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4'
}

generateGrammarSource {
arguments += ['-visitor', '-package', 'org.opensearch.sql.spark.antlr.parser']
source = sourceSets.main.antlr
outputDirectory = file("build/generated-src/antlr/main/org/opensearch/sql/spark/antlr/parser")
}
configurations {
compile {
extendsFrom = extendsFrom.findAll { it != configurations.antlr }
}
}

// Make sure the downloadG4File task runs before the generateGrammarSource task
generateGrammarSource.dependsOn downloadG4Files

dependencies {
antlr "org.antlr:antlr4:4.7.1"

api project(':core')
implementation project(':protocol')
implementation project(':datasources')
Expand Down Expand Up @@ -46,7 +75,7 @@ jacocoTestReport {
}
afterEvaluate {
classDirectories.setFrom(files(classDirectories.files.collect {
fileTree(dir: it)
fileTree(dir: it, exclude: ['**/antlr/parser/**'])
}))
}
}
Expand All @@ -61,7 +90,8 @@ jacocoTestCoverageVerification {
'org.opensearch.sql.spark.rest.*',
'org.opensearch.sql.spark.transport.model.*',
'org.opensearch.sql.spark.asyncquery.model.*',
'org.opensearch.sql.spark.asyncquery.exceptions.*'
'org.opensearch.sql.spark.asyncquery.exceptions.*',
'org.opensearch.sql.spark.dispatcher.model.*'
]
limit {
counter = 'LINE'
Expand All @@ -75,7 +105,7 @@ jacocoTestCoverageVerification {
}
afterEvaluate {
classDirectories.setFrom(files(classDirectories.files.collect {
fileTree(dir: it)
fileTree(dir: it, exclude: ['**/antlr/parser/**'])
}))
}
}
Expand Down
91 changes: 91 additions & 0 deletions spark/src/main/antlr/FlintSparkSqlExtensions.g4
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

grammar FlintSparkSqlExtensions;

import SparkSqlBase;


// Flint SQL Syntax Extension

singleStatement
: statement SEMICOLON* EOF
;

statement
: skippingIndexStatement
| coveringIndexStatement
;

skippingIndexStatement
: createSkippingIndexStatement
| refreshSkippingIndexStatement
| describeSkippingIndexStatement
| dropSkippingIndexStatement
;

createSkippingIndexStatement
: CREATE SKIPPING INDEX ON tableName
LEFT_PAREN indexColTypeList RIGHT_PAREN
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;

refreshSkippingIndexStatement
: REFRESH SKIPPING INDEX ON tableName
;

describeSkippingIndexStatement
: (DESC | DESCRIBE) SKIPPING INDEX ON tableName
;

dropSkippingIndexStatement
: DROP SKIPPING INDEX ON tableName
;

coveringIndexStatement
: createCoveringIndexStatement
| refreshCoveringIndexStatement
| showCoveringIndexStatement
| describeCoveringIndexStatement
| dropCoveringIndexStatement
;

createCoveringIndexStatement
: CREATE INDEX indexName ON tableName
LEFT_PAREN indexColumns=multipartIdentifierPropertyList RIGHT_PAREN
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;

refreshCoveringIndexStatement
: REFRESH INDEX indexName ON tableName
;

showCoveringIndexStatement
: SHOW (INDEX | INDEXES) ON tableName
;

describeCoveringIndexStatement
: (DESC | DESCRIBE) INDEX indexName ON tableName
;

dropCoveringIndexStatement
: DROP INDEX indexName ON tableName
;

indexColTypeList
: indexColType (COMMA indexColType)*
;

indexColType
: identifier skipType=(PARTITION | VALUE_SET | MIN_MAX)
;

indexName
: identifier
;

tableName
: multipartIdentifier
;
Loading