Add missing tags and MV support #2336

Merged (1 commit) on Oct 23, 2023

Changes from all commits:
common/build.gradle — 2 changes: 1 addition & 1 deletion

@@ -35,7 +35,7 @@ repositories {
 dependencies {
     api "org.antlr:antlr4-runtime:4.7.1"
     api group: 'com.google.guava', name: 'guava', version: '32.0.1-jre'
-    api group: 'org.apache.logging.log4j', name: 'log4j-core', version:'2.21.0'
+    api group: 'org.apache.logging.log4j', name: 'log4j-core', version:"${versions.log4j}"
     api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0'
     api group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.9.3'
     implementation 'com.github.babbel:okhttp-aws-signer:1.0.2'
integ-test/build.gradle — 2 changes: 1 addition & 1 deletion

@@ -175,7 +175,7 @@ dependencies {
     testImplementation group: 'org.opensearch.client', name: 'opensearch-rest-client', version: "${opensearch_version}"
     testImplementation group: 'org.opensearch.driver', name: 'opensearch-sql-jdbc', version: System.getProperty("jdbcDriverVersion", '1.2.0.0')
     testImplementation group: 'org.hamcrest', name: 'hamcrest', version: '2.1'
-    implementation group: 'org.apache.logging.log4j', name: 'log4j-core', version:'2.21.0'
+    implementation group: 'org.apache.logging.log4j', name: 'log4j-core', version:"${versions.log4j}"
     testImplementation project(':opensearch-sql-plugin')
     testImplementation project(':legacy')
     testImplementation('org.junit.jupiter:junit-jupiter-api:5.6.2')
ppl/build.gradle — 2 changes: 1 addition & 1 deletion

@@ -50,7 +50,7 @@ dependencies {
     implementation "org.antlr:antlr4-runtime:4.7.1"
     implementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre'
     api group: 'org.json', name: 'json', version: '20231013'
-    implementation group: 'org.apache.logging.log4j', name: 'log4j-core', version:'2.21.0'
+    implementation group: 'org.apache.logging.log4j', name: 'log4j-core', version:"${versions.log4j}"
     api project(':common')
     api project(':core')
     api project(':protocol')
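All three build files above make the same change: log4j-core is now resolved through a shared ${versions.log4j} property instead of a hard-coded '2.21.0', so a future log4j bump touches one place rather than three. The property's definition is not part of this diff; it is presumably the version map that the OpenSearch build plugins already expose, or a root-project declaration along the lines of ext { versions = [log4j: '2.21.0'] } — treat that exact spelling as an assumption.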
spark/src/main/antlr/FlintSparkSqlExtensions.g4 — 34 changes: 34 additions & 0 deletions

@@ -17,6 +17,7 @@ singleStatement
 statement
     : skippingIndexStatement
     | coveringIndexStatement
+    | materializedViewStatement
     ;
 
 skippingIndexStatement
@@ -76,6 +77,39 @@ dropCoveringIndexStatement
     : DROP INDEX indexName ON tableName
     ;
 
+materializedViewStatement
+    : createMaterializedViewStatement
+    | showMaterializedViewStatement
+    | describeMaterializedViewStatement
+    | dropMaterializedViewStatement
+    ;
+
+createMaterializedViewStatement
+    : CREATE MATERIALIZED VIEW (IF NOT EXISTS)? mvName=multipartIdentifier
+    AS query=materializedViewQuery
+    (WITH LEFT_PAREN propertyList RIGHT_PAREN)?
+    ;
+
+showMaterializedViewStatement
+    : SHOW MATERIALIZED (VIEW | VIEWS) IN catalogDb=multipartIdentifier
+    ;
+
+describeMaterializedViewStatement
+    : (DESC | DESCRIBE) MATERIALIZED VIEW mvName=multipartIdentifier
+    ;
+
+dropMaterializedViewStatement
+    : DROP MATERIALIZED VIEW mvName=multipartIdentifier
+    ;
+
+/*
+ * Match all remaining tokens in non-greedy way
+ * so WITH clause won't be captured by this rule.
+ */
+materializedViewQuery
+    : .+?
+    ;
+
 indexColTypeList
     : indexColType (COMMA indexColType)*
     ;
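For orientation, statements like the following should parse under the new rules (illustrative examples written against the grammar above, not taken from this PR; the catalog, database, and view names are made up):

  CREATE MATERIALIZED VIEW IF NOT EXISTS myglue.default.http_count_view
  AS SELECT COUNT(*) AS cnt FROM myglue.default.http_logs
  WITH (auto_refresh = true)

  SHOW MATERIALIZED VIEWS IN myglue.default

  DESC MATERIALIZED VIEW myglue.default.http_count_view

  DROP MATERIALIZED VIEW myglue.default.http_count_view

Because materializedViewQuery matches .+? non-greedily, the trailing WITH (...) options clause is claimed by createMaterializedViewStatement rather than being swallowed into the query text.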
spark/src/main/antlr/SparkSqlBase.g4 — 5 changes: 5 additions & 0 deletions

@@ -154,21 +154,26 @@ COMMA: ',';
 DOT: '.';
 
 
+AS: 'AS';
 CREATE: 'CREATE';
 DESC: 'DESC';
 DESCRIBE: 'DESCRIBE';
 DROP: 'DROP';
 EXISTS: 'EXISTS';
 FALSE: 'FALSE';
 IF: 'IF';
+IN: 'IN';
 INDEX: 'INDEX';
 INDEXES: 'INDEXES';
+MATERIALIZED: 'MATERIALIZED';
 NOT: 'NOT';
 ON: 'ON';
 PARTITION: 'PARTITION';
 REFRESH: 'REFRESH';
 SHOW: 'SHOW';
 TRUE: 'TRUE';
+VIEW: 'VIEW';
+VIEWS: 'VIEWS';
 WITH: 'WITH';
spark/src/main/antlr/SqlBaseLexer.g4 — 1 change: 1 addition & 0 deletions

@@ -447,6 +447,7 @@ PIPE: '|';
 CONCAT_PIPE: '||';
 HAT: '^';
 COLON: ':';
+DOUBLE_COLON: '::';
 ARROW: '->';
 FAT_ARROW : '=>';
 HENT_START: '/*+';
spark/src/main/antlr/SqlBaseParser.g4 — 1 change: 1 addition & 0 deletions

@@ -957,6 +957,7 @@ primaryExpression
     | CASE whenClause+ (ELSE elseExpression=expression)? END #searchedCase
     | CASE value=expression whenClause+ (ELSE elseExpression=expression)? END #simpleCase
     | name=(CAST | TRY_CAST) LEFT_PAREN expression AS dataType RIGHT_PAREN #cast
+    | primaryExpression DOUBLE_COLON dataType #castByColon
     | STRUCT LEFT_PAREN (argument+=namedExpression (COMMA argument+=namedExpression)*)? RIGHT_PAREN #struct
     | FIRST LEFT_PAREN expression (IGNORE NULLS)? RIGHT_PAREN #first
     | ANY_VALUE LEFT_PAREN expression (IGNORE NULLS)? RIGHT_PAREN #any_value
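The two SqlBase changes together add PostgreSQL-style shorthand casts to the Spark SQL grammar the plugin validates: DOUBLE_COLON tokenizes '::', and the castByColon alternative parses expr::type as a cast. For example (illustrative, not from this diff), SELECT '2023-10-23'::DATE should now be accepted as the equivalent of SELECT CAST('2023-10-23' AS DATE).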
SparkConstants.java

@@ -26,8 +26,6 @@ public class SparkConstants {
   public static final String FLINT_INTEGRATION_JAR =
       "s3://spark-datasource/flint-spark-integration-assembly-0.1.0-SNAPSHOT.jar";
   // TODO should be replaced with mvn jar.
-  public static final String FLINT_CATALOG_JAR =
-      "s3://flint-data-dp-eu-west-1-beta/code/flint/flint-catalog.jar";
   public static final String FLINT_DEFAULT_HOST = "localhost";
   public static final String FLINT_DEFAULT_PORT = "9200";
   public static final String FLINT_DEFAULT_SCHEME = "http";
SparkQueryDispatcher.java

@@ -38,8 +38,8 @@
 import org.opensearch.sql.spark.client.StartJobRequest;
 import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
 import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
-import org.opensearch.sql.spark.dispatcher.model.FullyQualifiedTableName;
 import org.opensearch.sql.spark.dispatcher.model.IndexDetails;
+import org.opensearch.sql.spark.dispatcher.model.JobType;
 import org.opensearch.sql.spark.execution.session.CreateSessionRequest;
 import org.opensearch.sql.spark.execution.session.Session;
 import org.opensearch.sql.spark.execution.session.SessionId;
@@ -59,9 +59,8 @@ public class SparkQueryDispatcher {
 
   public static final String INDEX_TAG_KEY = "index";
   public static final String DATASOURCE_TAG_KEY = "datasource";
-  public static final String SCHEMA_TAG_KEY = "schema";
-  public static final String TABLE_TAG_KEY = "table";
   public static final String CLUSTER_NAME_TAG_KEY = "cluster";
+  public static final String JOB_TYPE_TAG_KEY = "job_type";
 
   private EMRServerlessClient emrServerlessClient;
@@ -111,6 +110,8 @@ private DispatchQueryResponse handleSQLQuery(DispatchQueryRequest dispatchQueryRequest) {
     if (SQLQueryUtils.isIndexQuery(dispatchQueryRequest.getQuery())) {
       IndexDetails indexDetails =
           SQLQueryUtils.extractIndexDetails(dispatchQueryRequest.getQuery());
+      fillMissingDetails(dispatchQueryRequest, indexDetails);
+
       if (indexDetails.isDropIndex()) {
         return handleDropIndexQuery(dispatchQueryRequest, indexDetails);
       } else {
@@ -121,17 +122,29 @@ private DispatchQueryResponse handleSQLQuery(DispatchQueryRequest dispatchQueryRequest) {
     }
   }
 
+  // TODO: Revisit this logic.
+  // Currently, if the datasource is not provided in the query, Spark assumes
+  // the datasource to be the catalog. This is required to handle the drop-index
+  // case properly when the datasource name is not provided.
+  private static void fillMissingDetails(
+      DispatchQueryRequest dispatchQueryRequest, IndexDetails indexDetails) {
+    if (indexDetails.getFullyQualifiedTableName() != null
+        && indexDetails.getFullyQualifiedTableName().getDatasourceName() == null) {
+      indexDetails
+          .getFullyQualifiedTableName()
+          .setDatasourceName(dispatchQueryRequest.getDatasource());
+    }
+  }
+
   private DispatchQueryResponse handleIndexQuery(
       DispatchQueryRequest dispatchQueryRequest, IndexDetails indexDetails) {
-    FullyQualifiedTableName fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName();
     DataSourceMetadata dataSourceMetadata =
         this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource());
     dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata);
     String jobName = dispatchQueryRequest.getClusterName() + ":" + "index-query";
     Map<String, String> tags = getDefaultTagsForJobSubmission(dispatchQueryRequest);
-    tags.put(INDEX_TAG_KEY, indexDetails.getIndexName());
-    tags.put(TABLE_TAG_KEY, fullyQualifiedTableName.getTableName());
-    tags.put(SCHEMA_TAG_KEY, fullyQualifiedTableName.getSchemaName());
+    tags.put(INDEX_TAG_KEY, indexDetails.openSearchIndexName());
+    tags.put(JOB_TYPE_TAG_KEY, JobType.STREAMING.getText());
     StartJobRequest startJobRequest =
         new StartJobRequest(
             dispatchQueryRequest.getQuery(),
@@ -142,12 +155,12 @@ private DispatchQueryResponse handleIndexQuery(
                 .dataSource(
                     dataSourceService.getRawDataSourceMetadata(
                         dispatchQueryRequest.getDatasource()))
-                .structuredStreaming(indexDetails.getAutoRefresh())
+                .structuredStreaming(indexDetails.isAutoRefresh())
                 .extraParameters(dispatchQueryRequest.getExtraSparkSubmitParams())
                 .build()
                 .toString(),
             tags,
-            indexDetails.getAutoRefresh(),
+            indexDetails.isAutoRefresh(),
             dataSourceMetadata.getResultIndex());
     String jobId = emrServerlessClient.startJobRun(startJobRequest);
     return new DispatchQueryResponse(
@@ -178,6 +191,7 @@ private DispatchQueryResponse handleNonIndexQuery(DispatchQueryRequest dispatchQueryRequest) {
       session = createdSession.get();
     } else {
       // create session if not exist
+      tags.put(JOB_TYPE_TAG_KEY, JobType.INTERACTIVE.getText());
       session =
           sessionManager.createSession(
               new CreateSessionRequest(
@@ -204,6 +218,7 @@ private DispatchQueryResponse handleNonIndexQuery(DispatchQueryRequest dispatchQueryRequest) {
           dataSourceMetadata.getResultIndex(),
           session.getSessionId().getSessionId());
     } else {
+      tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText());
       StartJobRequest startJobRequest =
           new StartJobRequest(
               dispatchQueryRequest.getQuery(),
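Net effect on EMR Serverless job tagging: the per-table schema and table tags are gone, the index tag now carries the full OpenSearch index name from IndexDetails.openSearchIndexName(), and every dispatch path stamps a job_type tag (streaming for index builds, interactive for session queries, batch otherwise). A minimal sketch of the tag map a streaming index query would now carry — the datasource and cluster values are hypothetical, and the first two entries are assumed to come from getDefaultTagsForJobSubmission, whose body is not shown in this diff:

  import java.util.HashMap;
  import java.util.Map;

  public class JobTagsSketch {
    public static void main(String[] args) {
      Map<String, String> tags = new HashMap<>();
      // Assumed contribution of getDefaultTagsForJobSubmission(dispatchQueryRequest):
      tags.put("datasource", "my_glue");   // DATASOURCE_TAG_KEY (hypothetical value)
      tags.put("cluster", "my-cluster");   // CLUSTER_NAME_TAG_KEY (hypothetical value)
      // Added by handleIndexQuery after this PR:
      tags.put("index", "flint_my_glue_default_http_logs_skipping_index"); // INDEX_TAG_KEY
      tags.put("job_type", "streaming");   // JOB_TYPE_TAG_KEY = JobType.STREAMING.getText()
      System.out.println(tags);
    }
  }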
IndexDetails.java

@@ -5,56 +5,129 @@
 
 package org.opensearch.sql.spark.dispatcher.model;
 
-import lombok.AllArgsConstructor;
-import lombok.Data;
+import com.google.common.base.Preconditions;
 import lombok.EqualsAndHashCode;
-import lombok.NoArgsConstructor;
+import lombok.Getter;
+import org.apache.commons.lang3.StringUtils;
 import org.opensearch.sql.spark.flint.FlintIndexType;
 
 /** Index details in an async query. */
-@Data
-@AllArgsConstructor
-@NoArgsConstructor
+@Getter
+@EqualsAndHashCode
 public class IndexDetails {
 
+  public static final String STRIP_CHARS = "`";
+
   private String indexName;
   private FullyQualifiedTableName fullyQualifiedTableName;
   // by default, auto_refresh = false;
-  private Boolean autoRefresh = false;
+  private boolean autoRefresh;
   private boolean isDropIndex;
+  // materialized view special case where
+  // table name and mv name are combined.
+  private String mvName;
   private FlintIndexType indexType;
 
+  private IndexDetails() {}
+
+  public static IndexDetailsBuilder builder() {
+    return new IndexDetailsBuilder();
+  }
+
+  // Builder class
+  public static class IndexDetailsBuilder {
+    private final IndexDetails indexDetails;
+
+    public IndexDetailsBuilder() {
+      indexDetails = new IndexDetails();
+    }
+
+    public IndexDetailsBuilder indexName(String indexName) {
+      indexDetails.indexName = indexName;
+      return this;
+    }
+
+    public IndexDetailsBuilder fullyQualifiedTableName(FullyQualifiedTableName tableName) {
+      indexDetails.fullyQualifiedTableName = tableName;
+      return this;
+    }
+
+    public IndexDetailsBuilder autoRefresh(Boolean autoRefresh) {
+      indexDetails.autoRefresh = autoRefresh;
+      return this;
+    }
+
+    public IndexDetailsBuilder isDropIndex(boolean isDropIndex) {
+      indexDetails.isDropIndex = isDropIndex;
+      return this;
+    }
+
+    public IndexDetailsBuilder mvName(String mvName) {
+      indexDetails.mvName = mvName;
+      return this;
+    }
+
+    public IndexDetailsBuilder indexType(FlintIndexType indexType) {
+      indexDetails.indexType = indexType;
+      return this;
+    }
+
+    public IndexDetails build() {
+      Preconditions.checkNotNull(indexDetails.indexType, "Index Type can't be null");
+      switch (indexDetails.indexType) {
+        case COVERING:
+          Preconditions.checkNotNull(
+              indexDetails.indexName, "IndexName can't be null for Covering Index.");
+          Preconditions.checkNotNull(
+              indexDetails.fullyQualifiedTableName, "TableName can't be null for Covering Index.");
+          break;
+        case SKIPPING:
+          Preconditions.checkNotNull(
+              indexDetails.fullyQualifiedTableName, "TableName can't be null for Skipping Index.");
+          break;
+        case MATERIALIZED_VIEW:
+          Preconditions.checkNotNull(indexDetails.mvName, "Materialized view name can't be null");
+          break;
+      }
+
+      return indexDetails;
+    }
+  }
+
   public String openSearchIndexName() {
     FullyQualifiedTableName fullyQualifiedTableName = getFullyQualifiedTableName();
-    if (FlintIndexType.SKIPPING.equals(getIndexType())) {
-      String indexName =
-          "flint"
-              + "_"
-              + fullyQualifiedTableName.getDatasourceName()
-              + "_"
-              + fullyQualifiedTableName.getSchemaName()
-              + "_"
-              + fullyQualifiedTableName.getTableName()
-              + "_"
-              + getIndexType().getSuffix();
-      return indexName.toLowerCase();
-    } else if (FlintIndexType.COVERING.equals(getIndexType())) {
-      String indexName =
-          "flint"
-              + "_"
-              + fullyQualifiedTableName.getDatasourceName()
-              + "_"
-              + fullyQualifiedTableName.getSchemaName()
-              + "_"
-              + fullyQualifiedTableName.getTableName()
-              + "_"
-              + getIndexName()
-              + "_"
-              + getIndexType().getSuffix();
-      return indexName.toLowerCase();
-    } else {
-      throw new UnsupportedOperationException(
-          String.format("Unsupported Index Type : %s", getIndexType()));
-    }
+    String indexName = StringUtils.EMPTY;
+    switch (getIndexType()) {
+      case COVERING:
+        indexName =
+            "flint"
+                + "_"
+                + StringUtils.strip(fullyQualifiedTableName.getDatasourceName(), STRIP_CHARS)
+                + "_"
+                + StringUtils.strip(fullyQualifiedTableName.getSchemaName(), STRIP_CHARS)
+                + "_"
+                + StringUtils.strip(fullyQualifiedTableName.getTableName(), STRIP_CHARS)
+                + "_"
+                + StringUtils.strip(getIndexName(), STRIP_CHARS)
+                + "_"
+                + getIndexType().getSuffix();
+        break;
+      case SKIPPING:
+        indexName =
+            "flint"
+                + "_"
+                + StringUtils.strip(fullyQualifiedTableName.getDatasourceName(), STRIP_CHARS)
+                + "_"
+                + StringUtils.strip(fullyQualifiedTableName.getSchemaName(), STRIP_CHARS)
+                + "_"
+                + StringUtils.strip(fullyQualifiedTableName.getTableName(), STRIP_CHARS)
+                + "_"
+                + getIndexType().getSuffix();
+        break;
+      case MATERIALIZED_VIEW:
+        indexName = "flint" + "_" + StringUtils.strip(getMvName(), STRIP_CHARS).toLowerCase();
+        break;
+    }
+    return indexName.toLowerCase();
   }
 }

Review thread on the MATERIALIZED_VIEW branch (resolved):

vmmusings (Member, author): @dai-chen I am assuming that MV name would contain catalog.db

dai-chen (Collaborator): yes. that's requirement for frontend. cc: @ps48
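To make the builder contract concrete, here is a sketch of the materialized-view path, which needs only mvName and indexType to pass build()'s precondition checks (the catalog.db.name value is hypothetical; the covering and skipping paths additionally require fullyQualifiedTableName and, for covering, indexName):

  import org.opensearch.sql.spark.dispatcher.model.IndexDetails;
  import org.opensearch.sql.spark.flint.FlintIndexType;

  public class IndexNameSketch {
    public static void main(String[] args) {
      IndexDetails mv =
          IndexDetails.builder()
              .mvName("my_glue.default.http_count_view") // hypothetical catalog.db.name
              .indexType(FlintIndexType.MATERIALIZED_VIEW)
              .build();
      // "flint" + "_" + mvName, lower-cased. Note that StringUtils.strip removes
      // backticks only at the ends of the whole string, not around each dotted part.
      System.out.println(mv.openSearchIndexName()); // flint_my_glue.default.http_count_view
    }
  }

One design consequence worth noting: because the MV name keeps its dots, the derived OpenSearch index name embeds catalog.db.name verbatim — which is exactly what the review thread above is confirming.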