Add tags to the EMR jobs based on the query types
Signed-off-by: Vamsi Manohar <[email protected]>
vmmusings committed Sep 26, 2023
1 parent be82714 commit 03ab1e6
Showing 23 changed files with 3,537 additions and 83 deletions.
2 changes: 1 addition & 1 deletion docs/user/interfaces/asyncqueryinterface.rst
@@ -44,7 +44,7 @@ Sample Request::
curl --location 'http://localhost:9200/_plugins/_async_query' \
--header 'Content-Type: application/json' \
--data '{
"kind" : "sql",
"lang" : "sql",
"query" : "select * from my_glue.default.http_logs limit 10"
}'
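
A successful submission is acknowledged with a query id that the result
endpoint is then polled with. The response has roughly this shape (the id
value here is illustrative):

{
  "queryId": "00fd796ut1a7eg0q"
}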

@@ -301,7 +301,10 @@ private AsyncQueryExecutorService createAsyncQueryExecutorService() {
JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client);
SparkQueryDispatcher sparkQueryDispatcher =
new SparkQueryDispatcher(
-sparkJobClient, this.dataSourceService, jobExecutionResponseReader);
+sparkJobClient,
+this.dataSourceService,
+new DataSourceUserAuthorizationHelperImpl(client),
+jobExecutionResponseReader);
return new AsyncQueryExecutorServiceImpl(
asyncQueryJobMetadataStorageService, sparkQueryDispatcher, pluginSettings);
}
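
Since the point of this commit is tagging the EMR jobs by query type, here is
a rough sketch of how such tags could be derived. The class name, tag keys,
and the query-type heuristic are illustrative assumptions, not the plugin's
actual API:

import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: derive EMR job tags from the submitted query.
class EmrJobTags {
    static Map<String, String> forQuery(String datasourceName, String query) {
        Map<String, String> tags = new HashMap<>();
        tags.put("datasource", datasourceName);
        // Index statements (CREATE/REFRESH/DROP ... INDEX) run as long-lived
        // streaming jobs; everything else is treated as a batch query.
        boolean isIndexQuery =
            query.trim().toUpperCase().matches("^(CREATE|REFRESH|DROP)\\s+.*INDEX.*");
        tags.put("type", isIndexQuery ? "index" : "batch");
        return tags;
    }
}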
36 changes: 33 additions & 3 deletions spark/build.gradle
@@ -7,13 +7,42 @@ plugins {
id 'java-library'
id "io.freefair.lombok"
id 'jacoco'
id 'antlr'
}

repositories {
mavenCentral()
}

tasks.register('downloadG4Files', Exec) {
description = 'Download remote .g4 files from GitHub'

executable 'curl'

// Need to add these back once the grammar issues with indexName and tableName are addressed in the Flint integration jar.
// args '-o', 'src/main/antlr/FlintSparkSqlExtensions.g4', 'https://raw.githubusercontent.com/opensearch-project/opensearch-spark/main/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4'
// args '-o', 'src/main/antlr/SparkSqlBase.g4', 'https://raw.githubusercontent.com/opensearch-project/opensearch-spark/main/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4'
args '-o', 'src/main/antlr/SqlBaseParser.g4', 'https://raw.githubusercontent.com/apache/spark/master/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4'
args '-o', 'src/main/antlr/SqlBaseLexer.g4', 'https://raw.githubusercontent.com/apache/spark/master/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4'
}
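
Because these URLs track Spark's master branch, the downloaded grammars (and
thus the generated parser) can drift between builds. If reproducibility
matters, the files could be pinned to a release tag instead; a sketch, on the
assumption that the chosen ref keeps the same antlr4 file layout:

// Hypothetical variant pinned to a fixed Spark ref rather than master.
tasks.register('downloadPinnedG4Files', Exec) {
    description = 'Download .g4 files pinned to a Spark release tag'
    executable 'curl'
    def ref = 'v3.5.0' // assumption: this ref has the same file layout
    args '-o', 'src/main/antlr/SqlBaseParser.g4',
            "https://raw.githubusercontent.com/apache/spark/${ref}/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4"
    args '-o', 'src/main/antlr/SqlBaseLexer.g4',
            "https://raw.githubusercontent.com/apache/spark/${ref}/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4"
}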

generateGrammarSource {
arguments += ['-visitor', '-package', 'org.opensearch.sql.spark.antlr.parser']
source = sourceSets.main.antlr
outputDirectory = file("build/generated-src/antlr/main/org/opensearch/sql/spark/antlr/parser")
}

// The antlr plugin adds the ANTLR tool itself to the compile classpath by
// default; strip it so only the runtime library remains a dependency.
configurations {
compile {
extendsFrom = extendsFrom.findAll { it != configurations.antlr }
}
}

// Make sure the downloadG4Files task runs before the generateGrammarSource task
generateGrammarSource.dependsOn downloadG4Files

dependencies {
antlr "org.antlr:antlr4:4.7.1"

api project(':core')
implementation project(':protocol')
implementation project(':datasources')
@@ -46,7 +75,7 @@ jacocoTestReport {
}
afterEvaluate {
classDirectories.setFrom(files(classDirectories.files.collect {
-fileTree(dir: it)
+fileTree(dir: it, exclude: ['**/antlr/parser/**'])
}))
}
}
@@ -61,7 +90,8 @@ jacocoTestCoverageVerification {
'org.opensearch.sql.spark.rest.*',
'org.opensearch.sql.spark.transport.model.*',
'org.opensearch.sql.spark.asyncquery.model.*',
-'org.opensearch.sql.spark.asyncquery.exceptions.*'
+'org.opensearch.sql.spark.asyncquery.exceptions.*',
+'org.opensearch.sql.spark.dispatcher.model.*'
]
limit {
counter = 'LINE'
@@ -75,7 +105,7 @@ }
}
afterEvaluate {
classDirectories.setFrom(files(classDirectories.files.collect {
-fileTree(dir: it)
+fileTree(dir: it, exclude: ['**/antlr/parser/**'])
}))
}
}
91 changes: 91 additions & 0 deletions spark/src/main/antlr/FlintSparkSqlExtensions.g4
@@ -0,0 +1,91 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

grammar FlintSparkSqlExtensions;

import SparkSqlBase;


// Flint SQL Syntax Extension

singleStatement
: statement SEMICOLON* EOF
;

statement
: skippingIndexStatement
| coveringIndexStatement
;

skippingIndexStatement
: createSkippingIndexStatement
| refreshSkippingIndexStatement
| describeSkippingIndexStatement
| dropSkippingIndexStatement
;

createSkippingIndexStatement
: CREATE SKIPPING INDEX ON tableName
LEFT_PAREN indexColTypeList RIGHT_PAREN
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;

refreshSkippingIndexStatement
: REFRESH SKIPPING INDEX ON tableName
;

describeSkippingIndexStatement
: (DESC | DESCRIBE) SKIPPING INDEX ON tableName
;

dropSkippingIndexStatement
: DROP SKIPPING INDEX ON tableName
;

coveringIndexStatement
: createCoveringIndexStatement
| refreshCoveringIndexStatement
| showCoveringIndexStatement
| describeCoveringIndexStatement
| dropCoveringIndexStatement
;

createCoveringIndexStatement
: CREATE INDEX indexName ON tableName
LEFT_PAREN indexColumns=multipartIdentifierPropertyList RIGHT_PAREN
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;

refreshCoveringIndexStatement
: REFRESH INDEX indexName ON tableName
;

showCoveringIndexStatement
: SHOW (INDEX | INDEXES) ON tableName
;

describeCoveringIndexStatement
: (DESC | DESCRIBE) INDEX indexName ON tableName
;

dropCoveringIndexStatement
: DROP INDEX indexName ON tableName
;

indexColTypeList
: indexColType (COMMA indexColType)*
;

indexColType
: identifier skipType=(PARTITION | VALUE_SET | MIN_MAX)
;

indexName
: identifier
;

tableName
: multipartIdentifier
;
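
As a quick sanity check, the generated parser can be exercised against a
statement the grammar accepts. ANTLR derives the FlintSparkSqlExtensionsLexer
and FlintSparkSqlExtensionsParser class names from the grammar name, in the
package configured in build.gradle above; a minimal sketch, not part of this
commit:

import org.antlr.v4.runtime.CharStreams;
import org.antlr.v4.runtime.CommonTokenStream;
import org.opensearch.sql.spark.antlr.parser.FlintSparkSqlExtensionsLexer;
import org.opensearch.sql.spark.antlr.parser.FlintSparkSqlExtensionsParser;

public class FlintGrammarSmokeTest {
    public static void main(String[] args) {
        // Matched by createSkippingIndexStatement above.
        String sql = "CREATE SKIPPING INDEX ON my_glue.default.http_logs (status VALUE_SET)";
        FlintSparkSqlExtensionsLexer lexer =
            new FlintSparkSqlExtensionsLexer(CharStreams.fromString(sql));
        FlintSparkSqlExtensionsParser parser =
            new FlintSparkSqlExtensionsParser(new CommonTokenStream(lexer));
        // singleStatement is the grammar's entry rule; syntax errors are
        // reported through ANTLR's default console error listener.
        parser.singleStatement();
    }
}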