Skip to content

Commit

Permalink
Add tags to the emr jobs based on the query types
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vamsimanohar committed Sep 27, 2023
1 parent be82714 commit d281e07
Show file tree
Hide file tree
Showing 24 changed files with 3,632 additions and 89 deletions.
18 changes: 11 additions & 7 deletions docs/user/interfaces/asyncqueryinterface.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,16 @@ Async Query Creation API
======================================
If security plugin is enabled, this API can only be invoked by users with permission ``cluster:admin/opensearch/ql/async_query/create``.

HTTP URI: _plugins/_async_query
HTTP VERB: POST
HTTP URI: ``_plugins/_async_query``

HTTP VERB: ``POST``

Sample Request::

curl --location 'http://localhost:9200/_plugins/_async_query' \
--header 'Content-Type: application/json' \
--data '{
"kind" : "sql",
"lang" : "sql",
"query" : "select * from my_glue.default.http_logs limit 10"
}'

Expand All @@ -60,8 +61,9 @@ Async Query Result API
If security plugin is enabled, this API can only be invoked by users with permission ``cluster:admin/opensearch/ql/async_query/result``.
Async Query Creation and Result Query permissions are orthogonal, so any user with result api permissions and queryId can query the corresponding query results irrespective of the user who created the async query.

HTTP URI: _plugins/_async_query/{queryId}
HTTP VERB: GET
HTTP URI: ``_plugins/_async_query/{queryId}``

HTTP VERB: ``GET``

Sample Request BODY::

Expand All @@ -75,6 +77,7 @@ Sample Response if the Query is in Progress ::
Sample Response If the Query is successful ::

{
"status": "SUCCESS",
"schema": [
{
"name": "indexed_col_name",
Expand Down Expand Up @@ -105,8 +108,9 @@ Async Query Cancellation API
======================================
If security plugin is enabled, this API can only be invoked by users with permission ``cluster:admin/opensearch/ql/jobs/delete``.

HTTP URI: _plugins/_async_query/{queryId}
HTTP VERB: DELETE
HTTP URI: ``_plugins/_async_query/{queryId}``

HTTP VERB: ``DELETE``

Sample Request Body ::

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,10 @@ private AsyncQueryExecutorService createAsyncQueryExecutorService() {
JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client);
SparkQueryDispatcher sparkQueryDispatcher =
new SparkQueryDispatcher(
sparkJobClient, this.dataSourceService, jobExecutionResponseReader);
sparkJobClient,
this.dataSourceService,
new DataSourceUserAuthorizationHelperImpl(client),
jobExecutionResponseReader);
return new AsyncQueryExecutorServiceImpl(
asyncQueryJobMetadataStorageService, sparkQueryDispatcher, pluginSettings);
}
Expand Down
36 changes: 33 additions & 3 deletions spark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,42 @@ plugins {
id 'java-library'
id "io.freefair.lombok"
id 'jacoco'
id 'antlr'
}

repositories {
mavenCentral()
}

tasks.register('downloadG4Files', Exec) {
description = 'Download remote .g4 files from GitHub'

executable 'curl'

// Need to add these back once the grammar issues with indexName and tableName is addressed in flint integration jar.
// args '-o', 'src/main/antlr/FlintSparkSqlExtensions.g4', 'https://raw.githubusercontent.com/opensearch-project/opensearch-spark/main/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4'
// args '-o', 'src/main/antlr/SparkSqlBase.g4', 'https://raw.githubusercontent.com/opensearch-project/opensearch-spark/main/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4'
args '-o', 'src/main/antlr/SqlBaseParser.g4', 'https://raw.githubusercontent.com/apache/spark/master/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4'
args '-o', 'src/main/antlr/SqlBaseLexer.g4', 'https://raw.githubusercontent.com/apache/spark/master/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4'
}

generateGrammarSource {
arguments += ['-visitor', '-package', 'org.opensearch.sql.spark.antlr.parser']
source = sourceSets.main.antlr
outputDirectory = file("build/generated-src/antlr/main/org/opensearch/sql/spark/antlr/parser")
}
configurations {
compile {
extendsFrom = extendsFrom.findAll { it != configurations.antlr }
}
}

// Make sure the downloadG4File task runs before the generateGrammarSource task
generateGrammarSource.dependsOn downloadG4Files

dependencies {
antlr "org.antlr:antlr4:4.7.1"

api project(':core')
implementation project(':protocol')
implementation project(':datasources')
Expand Down Expand Up @@ -46,7 +75,7 @@ jacocoTestReport {
}
afterEvaluate {
classDirectories.setFrom(files(classDirectories.files.collect {
fileTree(dir: it)
fileTree(dir: it, exclude: ['**/antlr/parser/**'])
}))
}
}
Expand All @@ -61,7 +90,8 @@ jacocoTestCoverageVerification {
'org.opensearch.sql.spark.rest.*',
'org.opensearch.sql.spark.transport.model.*',
'org.opensearch.sql.spark.asyncquery.model.*',
'org.opensearch.sql.spark.asyncquery.exceptions.*'
'org.opensearch.sql.spark.asyncquery.exceptions.*',
'org.opensearch.sql.spark.dispatcher.model.*'
]
limit {
counter = 'LINE'
Expand All @@ -75,7 +105,7 @@ jacocoTestCoverageVerification {
}
afterEvaluate {
classDirectories.setFrom(files(classDirectories.files.collect {
fileTree(dir: it)
fileTree(dir: it, exclude: ['**/antlr/parser/**'])
}))
}
}
Expand Down
91 changes: 91 additions & 0 deletions spark/src/main/antlr/FlintSparkSqlExtensions.g4
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

grammar FlintSparkSqlExtensions;

import SparkSqlBase;


// Flint SQL Syntax Extension

singleStatement
: statement SEMICOLON* EOF
;

statement
: skippingIndexStatement
| coveringIndexStatement
;

skippingIndexStatement
: createSkippingIndexStatement
| refreshSkippingIndexStatement
| describeSkippingIndexStatement
| dropSkippingIndexStatement
;

createSkippingIndexStatement
: CREATE SKIPPING INDEX ON tableName
LEFT_PAREN indexColTypeList RIGHT_PAREN
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;

refreshSkippingIndexStatement
: REFRESH SKIPPING INDEX ON tableName
;

describeSkippingIndexStatement
: (DESC | DESCRIBE) SKIPPING INDEX ON tableName
;

dropSkippingIndexStatement
: DROP SKIPPING INDEX ON tableName
;

coveringIndexStatement
: createCoveringIndexStatement
| refreshCoveringIndexStatement
| showCoveringIndexStatement
| describeCoveringIndexStatement
| dropCoveringIndexStatement
;

createCoveringIndexStatement
: CREATE INDEX indexName ON tableName
LEFT_PAREN indexColumns=multipartIdentifierPropertyList RIGHT_PAREN
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;

refreshCoveringIndexStatement
: REFRESH INDEX indexName ON tableName
;

showCoveringIndexStatement
: SHOW (INDEX | INDEXES) ON tableName
;

describeCoveringIndexStatement
: (DESC | DESCRIBE) INDEX indexName ON tableName
;

dropCoveringIndexStatement
: DROP INDEX indexName ON tableName
;

indexColTypeList
: indexColType (COMMA indexColType)*
;

indexColType
: identifier skipType=(PARTITION | VALUE_SET | MIN_MAX)
;

indexName
: identifier
;

tableName
: multipartIdentifier
;
Loading

0 comments on commit d281e07

Please sign in to comment.