Skip to content

Commit

Permalink
Revert "Handle Describe,Refresh and Show Queries Properly (opensearch…
Browse files Browse the repository at this point in the history
…-project#2357) (opensearch-project#2362)"

This reverts commit 16e2f30.

Signed-off-by: Eric <[email protected]>
  • Loading branch information
mengweieric committed Nov 8, 2023
1 parent 7c311b7 commit 6eb2f0c
Show file tree
Hide file tree
Showing 13 changed files with 162 additions and 519 deletions.
5 changes: 0 additions & 5 deletions spark/src/main/antlr/FlintSparkSqlExtensions.g4
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ dropCoveringIndexStatement

materializedViewStatement
: createMaterializedViewStatement
| refreshMaterializedViewStatement
| showMaterializedViewStatement
| describeMaterializedViewStatement
| dropMaterializedViewStatement
Expand All @@ -91,10 +90,6 @@ createMaterializedViewStatement
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;

refreshMaterializedViewStatement
: REFRESH MATERIALIZED VIEW mvName=multipartIdentifier
;

showMaterializedViewStatement
: SHOW MATERIALIZED (VIEW | VIEWS) IN catalogDb=multipartIdentifier
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@
import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.client.StartJobRequest;
import org.opensearch.sql.spark.dispatcher.model.*;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.dispatcher.model.IndexDetails;
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.execution.session.CreateSessionRequest;
import org.opensearch.sql.spark.execution.session.Session;
import org.opensearch.sql.spark.execution.session.SessionId;
Expand All @@ -53,10 +56,11 @@
public class SparkQueryDispatcher {

private static final Logger LOG = LogManager.getLogger();

public static final String INDEX_TAG_KEY = "index";
public static final String DATASOURCE_TAG_KEY = "datasource";
public static final String CLUSTER_NAME_TAG_KEY = "cluster";
public static final String JOB_TYPE_TAG_KEY = "type";
public static final String JOB_TYPE_TAG_KEY = "job_type";

private EMRServerlessClient emrServerlessClient;

Expand Down Expand Up @@ -103,18 +107,15 @@ public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
}

private DispatchQueryResponse handleSQLQuery(DispatchQueryRequest dispatchQueryRequest) {
if (SQLQueryUtils.isFlintExtensionQuery(dispatchQueryRequest.getQuery())) {
IndexQueryDetails indexQueryDetails =
if (SQLQueryUtils.isIndexQuery(dispatchQueryRequest.getQuery())) {
IndexDetails indexDetails =
SQLQueryUtils.extractIndexDetails(dispatchQueryRequest.getQuery());
fillMissingDetails(dispatchQueryRequest, indexQueryDetails);
fillMissingDetails(dispatchQueryRequest, indexDetails);

// TODO: refactor this code properly.
if (IndexQueryActionType.DROP.equals(indexQueryDetails.getIndexQueryActionType())) {
return handleDropIndexQuery(dispatchQueryRequest, indexQueryDetails);
} else if (IndexQueryActionType.CREATE.equals(indexQueryDetails.getIndexQueryActionType())) {
return handleStreamingQueries(dispatchQueryRequest, indexQueryDetails);
if (indexDetails.isDropIndex()) {
return handleDropIndexQuery(dispatchQueryRequest, indexDetails);
} else {
return handleFlintNonStreamingQueries(dispatchQueryRequest, indexQueryDetails);
return handleIndexQuery(dispatchQueryRequest, indexDetails);
}
} else {
return handleNonIndexQuery(dispatchQueryRequest);
Expand All @@ -126,59 +127,24 @@ private DispatchQueryResponse handleSQLQuery(DispatchQueryRequest dispatchQueryR
// Spark Assumes the datasource to be catalog.
// This is required to handle drop index case properly when datasource name is not provided.
private static void fillMissingDetails(
DispatchQueryRequest dispatchQueryRequest, IndexQueryDetails indexQueryDetails) {
if (indexQueryDetails.getFullyQualifiedTableName() != null
&& indexQueryDetails.getFullyQualifiedTableName().getDatasourceName() == null) {
indexQueryDetails
DispatchQueryRequest dispatchQueryRequest, IndexDetails indexDetails) {
if (indexDetails.getFullyQualifiedTableName() != null
&& indexDetails.getFullyQualifiedTableName().getDatasourceName() == null) {
indexDetails
.getFullyQualifiedTableName()
.setDatasourceName(dispatchQueryRequest.getDatasource());
}
}

private DispatchQueryResponse handleStreamingQueries(
DispatchQueryRequest dispatchQueryRequest, IndexQueryDetails indexQueryDetails) {
DataSourceMetadata dataSourceMetadata =
this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource());
dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata);
String jobName = dispatchQueryRequest.getClusterName() + ":" + "index-query";
Map<String, String> tags = getDefaultTagsForJobSubmission(dispatchQueryRequest);
tags.put(INDEX_TAG_KEY, indexQueryDetails.openSearchIndexName());
if (indexQueryDetails.isAutoRefresh()) {
tags.put(JOB_TYPE_TAG_KEY, JobType.STREAMING.getText());
}
StartJobRequest startJobRequest =
new StartJobRequest(
dispatchQueryRequest.getQuery(),
jobName,
dispatchQueryRequest.getApplicationId(),
dispatchQueryRequest.getExecutionRoleARN(),
SparkSubmitParameters.Builder.builder()
.dataSource(
dataSourceService.getRawDataSourceMetadata(
dispatchQueryRequest.getDatasource()))
.structuredStreaming(indexQueryDetails.isAutoRefresh())
.extraParameters(dispatchQueryRequest.getExtraSparkSubmitParams())
.build()
.toString(),
tags,
indexQueryDetails.isAutoRefresh(),
dataSourceMetadata.getResultIndex());
String jobId = emrServerlessClient.startJobRun(startJobRequest);
return new DispatchQueryResponse(
AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName()),
jobId,
false,
dataSourceMetadata.getResultIndex(),
null);
}

private DispatchQueryResponse handleFlintNonStreamingQueries(
DispatchQueryRequest dispatchQueryRequest, IndexQueryDetails indexQueryDetails) {
private DispatchQueryResponse handleIndexQuery(
DispatchQueryRequest dispatchQueryRequest, IndexDetails indexDetails) {
DataSourceMetadata dataSourceMetadata =
this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource());
dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata);
String jobName = dispatchQueryRequest.getClusterName() + ":" + "index-query";
Map<String, String> tags = getDefaultTagsForJobSubmission(dispatchQueryRequest);
tags.put(INDEX_TAG_KEY, indexDetails.openSearchIndexName());
tags.put(JOB_TYPE_TAG_KEY, JobType.STREAMING.getText());
StartJobRequest startJobRequest =
new StartJobRequest(
dispatchQueryRequest.getQuery(),
Expand All @@ -189,11 +155,12 @@ private DispatchQueryResponse handleFlintNonStreamingQueries(
.dataSource(
dataSourceService.getRawDataSourceMetadata(
dispatchQueryRequest.getDatasource()))
.structuredStreaming(indexDetails.isAutoRefresh())
.extraParameters(dispatchQueryRequest.getExtraSparkSubmitParams())
.build()
.toString(),
tags,
indexQueryDetails.isAutoRefresh(),
indexDetails.isAutoRefresh(),
dataSourceMetadata.getResultIndex());
String jobId = emrServerlessClient.startJobRun(startJobRequest);
return new DispatchQueryResponse(
Expand Down Expand Up @@ -275,12 +242,11 @@ private DispatchQueryResponse handleNonIndexQuery(DispatchQueryRequest dispatchQ
}

private DispatchQueryResponse handleDropIndexQuery(
DispatchQueryRequest dispatchQueryRequest, IndexQueryDetails indexQueryDetails) {
DispatchQueryRequest dispatchQueryRequest, IndexDetails indexDetails) {
DataSourceMetadata dataSourceMetadata =
this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource());
dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata);
FlintIndexMetadata indexMetadata =
flintIndexMetadataReader.getFlintIndexMetadata(indexQueryDetails);
FlintIndexMetadata indexMetadata = flintIndexMetadataReader.getFlintIndexMetadata(indexDetails);
// if index is created without auto refresh. there is no job to cancel.
String status = JobRunState.FAILED.toString();
try {
Expand All @@ -289,7 +255,7 @@ private DispatchQueryResponse handleDropIndexQuery(
dispatchQueryRequest.getApplicationId(), indexMetadata.getJobId());
}
} finally {
String indexName = indexQueryDetails.openSearchIndexName();
String indexName = indexDetails.openSearchIndexName();
try {
AcknowledgedResponse response =
client.admin().indices().delete(new DeleteIndexRequest().indices(indexName)).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.sql.spark.dispatcher.model;

import com.google.common.base.Preconditions;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -13,67 +14,83 @@
/** Index details in an async query. */
@Getter
@EqualsAndHashCode
public class IndexQueryDetails {
public class IndexDetails {

public static final String STRIP_CHARS = "`";

private String indexName;
private FullyQualifiedTableName fullyQualifiedTableName;
// by default, auto_refresh = false;
private boolean autoRefresh;
private IndexQueryActionType indexQueryActionType;
private boolean isDropIndex;
// materialized view special case where
// table name and mv name are combined.
private String mvName;
private FlintIndexType indexType;

private IndexQueryDetails() {}
private IndexDetails() {}

public static IndexQueryDetailsBuilder builder() {
return new IndexQueryDetailsBuilder();
public static IndexDetailsBuilder builder() {
return new IndexDetailsBuilder();
}

// Builder class
public static class IndexQueryDetailsBuilder {
private final IndexQueryDetails indexQueryDetails;
public static class IndexDetailsBuilder {
private final IndexDetails indexDetails;

public IndexQueryDetailsBuilder() {
indexQueryDetails = new IndexQueryDetails();
public IndexDetailsBuilder() {
indexDetails = new IndexDetails();
}

public IndexQueryDetailsBuilder indexName(String indexName) {
indexQueryDetails.indexName = indexName;
public IndexDetailsBuilder indexName(String indexName) {
indexDetails.indexName = indexName;
return this;
}

public IndexQueryDetailsBuilder fullyQualifiedTableName(FullyQualifiedTableName tableName) {
indexQueryDetails.fullyQualifiedTableName = tableName;
public IndexDetailsBuilder fullyQualifiedTableName(FullyQualifiedTableName tableName) {
indexDetails.fullyQualifiedTableName = tableName;
return this;
}

public IndexQueryDetailsBuilder autoRefresh(Boolean autoRefresh) {
indexQueryDetails.autoRefresh = autoRefresh;
public IndexDetailsBuilder autoRefresh(Boolean autoRefresh) {
indexDetails.autoRefresh = autoRefresh;
return this;
}

public IndexQueryDetailsBuilder indexQueryActionType(
IndexQueryActionType indexQueryActionType) {
indexQueryDetails.indexQueryActionType = indexQueryActionType;
public IndexDetailsBuilder isDropIndex(boolean isDropIndex) {
indexDetails.isDropIndex = isDropIndex;
return this;
}

public IndexQueryDetailsBuilder mvName(String mvName) {
indexQueryDetails.mvName = mvName;
public IndexDetailsBuilder mvName(String mvName) {
indexDetails.mvName = mvName;
return this;
}

public IndexQueryDetailsBuilder indexType(FlintIndexType indexType) {
indexQueryDetails.indexType = indexType;
public IndexDetailsBuilder indexType(FlintIndexType indexType) {
indexDetails.indexType = indexType;
return this;
}

public IndexQueryDetails build() {
return indexQueryDetails;
public IndexDetails build() {
Preconditions.checkNotNull(indexDetails.indexType, "Index Type can't be null");
switch (indexDetails.indexType) {
case COVERING:
Preconditions.checkNotNull(
indexDetails.indexName, "IndexName can't be null for Covering Index.");
Preconditions.checkNotNull(
indexDetails.fullyQualifiedTableName, "TableName can't be null for Covering Index.");
break;
case SKIPPING:
Preconditions.checkNotNull(
indexDetails.fullyQualifiedTableName, "TableName can't be null for Skipping Index.");
break;
case MATERIALIZED_VIEW:
Preconditions.checkNotNull(indexDetails.mvName, "Materialized view name can't be null");
break;
}

return indexDetails;
}
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
public class InteractiveSession implements Session {
private static final Logger LOG = LogManager.getLogger();

public static final String SESSION_ID_TAG_KEY = "sid";

private final SessionId sessionId;
private final StateStore stateStore;
private final EMRServerlessClient serverlessClient;
Expand All @@ -48,7 +46,6 @@ public void open(CreateSessionRequest createSessionRequest) {
createSessionRequest
.getSparkSubmitParametersBuilder()
.sessionExecution(sessionId.getSessionId(), createSessionRequest.getDatasourceName());
createSessionRequest.getTags().put(SESSION_ID_TAG_KEY, sessionId.getSessionId());
String jobID = serverlessClient.startJobRun(createSessionRequest.getStartJobRequest());
String applicationId = createSessionRequest.getStartJobRequest().getApplicationId();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package org.opensearch.sql.spark.flint;

import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails;
import org.opensearch.sql.spark.dispatcher.model.IndexDetails;

/** Interface for FlintIndexMetadataReader */
public interface FlintIndexMetadataReader {

/**
* Given Index details, get the streaming job Id.
*
* @param indexQueryDetails indexDetails.
* @param indexDetails indexDetails.
* @return FlintIndexMetadata.
*/
FlintIndexMetadata getFlintIndexMetadata(IndexQueryDetails indexQueryDetails);
FlintIndexMetadata getFlintIndexMetadata(IndexDetails indexDetails);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails;
import org.opensearch.sql.spark.dispatcher.model.IndexDetails;

/** Implementation of {@link FlintIndexMetadataReader} */
@AllArgsConstructor
Expand All @@ -14,8 +14,8 @@ public class FlintIndexMetadataReaderImpl implements FlintIndexMetadataReader {
private final Client client;

@Override
public FlintIndexMetadata getFlintIndexMetadata(IndexQueryDetails indexQueryDetails) {
String indexName = indexQueryDetails.openSearchIndexName();
public FlintIndexMetadata getFlintIndexMetadata(IndexDetails indexDetails) {
String indexName = indexDetails.openSearchIndexName();
GetMappingsResponse mappingsResponse =
client.admin().indices().prepareGetMappings(indexName).get();
try {
Expand Down
Loading

0 comments on commit 6eb2f0c

Please sign in to comment.