From 31b2813bf5a5f6902c7c1449a607bf29e281eb64 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Wed, 25 Oct 2023 16:24:47 +0000 Subject: [PATCH] Handle Describe,Refresh and Show Queries Properly (#2357) Signed-off-by: Vamsi Manohar (cherry picked from commit 886c2fcc87c461304a9457653d25255b985c9837) Signed-off-by: github-actions[bot] --- .../src/main/antlr/FlintSparkSqlExtensions.g4 | 5 + .../dispatcher/SparkQueryDispatcher.java | 84 +++++-- .../model/IndexQueryActionType.java | 15 ++ ...dexDetails.java => IndexQueryDetails.java} | 65 ++--- .../execution/session/InteractiveSession.java | 3 + .../spark/flint/FlintIndexMetadataReader.java | 6 +- .../flint/FlintIndexMetadataReaderImpl.java | 6 +- .../sql/spark/utils/SQLQueryUtils.java | 113 +++++++-- .../dispatcher/SparkQueryDispatcherTest.java | 234 +++++++++++++++--- .../session/InteractiveSessionTest.java | 3 +- .../FlintIndexMetadataReaderImplTest.java | 15 +- ...lsTest.java => IndexQueryDetailsTest.java} | 9 +- .../sql/spark/utils/SQLQueryUtilsTest.java | 123 +++++++-- 13 files changed, 519 insertions(+), 162 deletions(-) create mode 100644 spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryActionType.java rename spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/{IndexDetails.java => IndexQueryDetails.java} (55%) rename spark/src/test/java/org/opensearch/sql/spark/flint/{IndexDetailsTest.java => IndexQueryDetailsTest.java} (71%) diff --git a/spark/src/main/antlr/FlintSparkSqlExtensions.g4 b/spark/src/main/antlr/FlintSparkSqlExtensions.g4 index c4af2779d1..f48c276e44 100644 --- a/spark/src/main/antlr/FlintSparkSqlExtensions.g4 +++ b/spark/src/main/antlr/FlintSparkSqlExtensions.g4 @@ -79,6 +79,7 @@ dropCoveringIndexStatement materializedViewStatement : createMaterializedViewStatement + | refreshMaterializedViewStatement | showMaterializedViewStatement | describeMaterializedViewStatement | dropMaterializedViewStatement @@ -90,6 +91,10 @@ createMaterializedViewStatement (WITH LEFT_PAREN propertyList RIGHT_PAREN)? ; +refreshMaterializedViewStatement + : REFRESH MATERIALIZED VIEW mvName=multipartIdentifier + ; + showMaterializedViewStatement : SHOW MATERIALIZED (VIEW | VIEWS) IN catalogDb=multipartIdentifier ; diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java index ff7ccf8c08..6ec67709b8 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java @@ -36,10 +36,7 @@ import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters; import org.opensearch.sql.spark.client.EMRServerlessClient; import org.opensearch.sql.spark.client.StartJobRequest; -import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; -import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse; -import org.opensearch.sql.spark.dispatcher.model.IndexDetails; -import org.opensearch.sql.spark.dispatcher.model.JobType; +import org.opensearch.sql.spark.dispatcher.model.*; import org.opensearch.sql.spark.execution.session.CreateSessionRequest; import org.opensearch.sql.spark.execution.session.Session; import org.opensearch.sql.spark.execution.session.SessionId; @@ -56,11 +53,10 @@ public class SparkQueryDispatcher { private static final Logger LOG = LogManager.getLogger(); - public static final String INDEX_TAG_KEY = "index"; public static final String DATASOURCE_TAG_KEY = "datasource"; public static final String CLUSTER_NAME_TAG_KEY = "cluster"; - public static final String JOB_TYPE_TAG_KEY = "job_type"; + public static final String JOB_TYPE_TAG_KEY = "type"; private EMRServerlessClient emrServerlessClient; @@ -107,15 +103,18 @@ public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) { } private DispatchQueryResponse handleSQLQuery(DispatchQueryRequest dispatchQueryRequest) { - if (SQLQueryUtils.isIndexQuery(dispatchQueryRequest.getQuery())) { - IndexDetails indexDetails = + if (SQLQueryUtils.isFlintExtensionQuery(dispatchQueryRequest.getQuery())) { + IndexQueryDetails indexQueryDetails = SQLQueryUtils.extractIndexDetails(dispatchQueryRequest.getQuery()); - fillMissingDetails(dispatchQueryRequest, indexDetails); + fillMissingDetails(dispatchQueryRequest, indexQueryDetails); - if (indexDetails.isDropIndex()) { - return handleDropIndexQuery(dispatchQueryRequest, indexDetails); + // TODO: refactor this code properly. + if (IndexQueryActionType.DROP.equals(indexQueryDetails.getIndexQueryActionType())) { + return handleDropIndexQuery(dispatchQueryRequest, indexQueryDetails); + } else if (IndexQueryActionType.CREATE.equals(indexQueryDetails.getIndexQueryActionType())) { + return handleStreamingQueries(dispatchQueryRequest, indexQueryDetails); } else { - return handleIndexQuery(dispatchQueryRequest, indexDetails); + return handleFlintNonStreamingQueries(dispatchQueryRequest, indexQueryDetails); } } else { return handleNonIndexQuery(dispatchQueryRequest); @@ -127,24 +126,59 @@ private DispatchQueryResponse handleSQLQuery(DispatchQueryRequest dispatchQueryR // Spark Assumes the datasource to be catalog. // This is required to handle drop index case properly when datasource name is not provided. private static void fillMissingDetails( - DispatchQueryRequest dispatchQueryRequest, IndexDetails indexDetails) { - if (indexDetails.getFullyQualifiedTableName() != null - && indexDetails.getFullyQualifiedTableName().getDatasourceName() == null) { - indexDetails + DispatchQueryRequest dispatchQueryRequest, IndexQueryDetails indexQueryDetails) { + if (indexQueryDetails.getFullyQualifiedTableName() != null + && indexQueryDetails.getFullyQualifiedTableName().getDatasourceName() == null) { + indexQueryDetails .getFullyQualifiedTableName() .setDatasourceName(dispatchQueryRequest.getDatasource()); } } - private DispatchQueryResponse handleIndexQuery( - DispatchQueryRequest dispatchQueryRequest, IndexDetails indexDetails) { + private DispatchQueryResponse handleStreamingQueries( + DispatchQueryRequest dispatchQueryRequest, IndexQueryDetails indexQueryDetails) { + DataSourceMetadata dataSourceMetadata = + this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource()); + dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata); + String jobName = dispatchQueryRequest.getClusterName() + ":" + "index-query"; + Map tags = getDefaultTagsForJobSubmission(dispatchQueryRequest); + tags.put(INDEX_TAG_KEY, indexQueryDetails.openSearchIndexName()); + if (indexQueryDetails.isAutoRefresh()) { + tags.put(JOB_TYPE_TAG_KEY, JobType.STREAMING.getText()); + } + StartJobRequest startJobRequest = + new StartJobRequest( + dispatchQueryRequest.getQuery(), + jobName, + dispatchQueryRequest.getApplicationId(), + dispatchQueryRequest.getExecutionRoleARN(), + SparkSubmitParameters.Builder.builder() + .dataSource( + dataSourceService.getRawDataSourceMetadata( + dispatchQueryRequest.getDatasource())) + .structuredStreaming(indexQueryDetails.isAutoRefresh()) + .extraParameters(dispatchQueryRequest.getExtraSparkSubmitParams()) + .build() + .toString(), + tags, + indexQueryDetails.isAutoRefresh(), + dataSourceMetadata.getResultIndex()); + String jobId = emrServerlessClient.startJobRun(startJobRequest); + return new DispatchQueryResponse( + AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName()), + jobId, + false, + dataSourceMetadata.getResultIndex(), + null); + } + + private DispatchQueryResponse handleFlintNonStreamingQueries( + DispatchQueryRequest dispatchQueryRequest, IndexQueryDetails indexQueryDetails) { DataSourceMetadata dataSourceMetadata = this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource()); dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata); String jobName = dispatchQueryRequest.getClusterName() + ":" + "index-query"; Map tags = getDefaultTagsForJobSubmission(dispatchQueryRequest); - tags.put(INDEX_TAG_KEY, indexDetails.openSearchIndexName()); - tags.put(JOB_TYPE_TAG_KEY, JobType.STREAMING.getText()); StartJobRequest startJobRequest = new StartJobRequest( dispatchQueryRequest.getQuery(), @@ -155,12 +189,11 @@ private DispatchQueryResponse handleIndexQuery( .dataSource( dataSourceService.getRawDataSourceMetadata( dispatchQueryRequest.getDatasource())) - .structuredStreaming(indexDetails.isAutoRefresh()) .extraParameters(dispatchQueryRequest.getExtraSparkSubmitParams()) .build() .toString(), tags, - indexDetails.isAutoRefresh(), + indexQueryDetails.isAutoRefresh(), dataSourceMetadata.getResultIndex()); String jobId = emrServerlessClient.startJobRun(startJobRequest); return new DispatchQueryResponse( @@ -242,11 +275,12 @@ private DispatchQueryResponse handleNonIndexQuery(DispatchQueryRequest dispatchQ } private DispatchQueryResponse handleDropIndexQuery( - DispatchQueryRequest dispatchQueryRequest, IndexDetails indexDetails) { + DispatchQueryRequest dispatchQueryRequest, IndexQueryDetails indexQueryDetails) { DataSourceMetadata dataSourceMetadata = this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource()); dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata); - FlintIndexMetadata indexMetadata = flintIndexMetadataReader.getFlintIndexMetadata(indexDetails); + FlintIndexMetadata indexMetadata = + flintIndexMetadataReader.getFlintIndexMetadata(indexQueryDetails); // if index is created without auto refresh. there is no job to cancel. String status = JobRunState.FAILED.toString(); try { @@ -255,7 +289,7 @@ private DispatchQueryResponse handleDropIndexQuery( dispatchQueryRequest.getApplicationId(), indexMetadata.getJobId()); } } finally { - String indexName = indexDetails.openSearchIndexName(); + String indexName = indexQueryDetails.openSearchIndexName(); try { AcknowledgedResponse response = client.admin().indices().delete(new DeleteIndexRequest().indices(indexName)).get(); diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryActionType.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryActionType.java new file mode 100644 index 0000000000..2c96511d2a --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryActionType.java @@ -0,0 +1,15 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.dispatcher.model; + +/** Enum for Index Action in the given query.* */ +public enum IndexQueryActionType { + CREATE, + REFRESH, + DESCRIBE, + SHOW, + DROP +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDetails.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryDetails.java similarity index 55% rename from spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDetails.java rename to spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryDetails.java index 42e2905e67..5b4326a10e 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexDetails.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryDetails.java @@ -5,7 +5,6 @@ package org.opensearch.sql.spark.dispatcher.model; -import com.google.common.base.Preconditions; import lombok.EqualsAndHashCode; import lombok.Getter; import org.apache.commons.lang3.StringUtils; @@ -14,7 +13,7 @@ /** Index details in an async query. */ @Getter @EqualsAndHashCode -public class IndexDetails { +public class IndexQueryDetails { public static final String STRIP_CHARS = "`"; @@ -22,75 +21,59 @@ public class IndexDetails { private FullyQualifiedTableName fullyQualifiedTableName; // by default, auto_refresh = false; private boolean autoRefresh; - private boolean isDropIndex; + private IndexQueryActionType indexQueryActionType; // materialized view special case where // table name and mv name are combined. private String mvName; private FlintIndexType indexType; - private IndexDetails() {} + private IndexQueryDetails() {} - public static IndexDetailsBuilder builder() { - return new IndexDetailsBuilder(); + public static IndexQueryDetailsBuilder builder() { + return new IndexQueryDetailsBuilder(); } // Builder class - public static class IndexDetailsBuilder { - private final IndexDetails indexDetails; + public static class IndexQueryDetailsBuilder { + private final IndexQueryDetails indexQueryDetails; - public IndexDetailsBuilder() { - indexDetails = new IndexDetails(); + public IndexQueryDetailsBuilder() { + indexQueryDetails = new IndexQueryDetails(); } - public IndexDetailsBuilder indexName(String indexName) { - indexDetails.indexName = indexName; + public IndexQueryDetailsBuilder indexName(String indexName) { + indexQueryDetails.indexName = indexName; return this; } - public IndexDetailsBuilder fullyQualifiedTableName(FullyQualifiedTableName tableName) { - indexDetails.fullyQualifiedTableName = tableName; + public IndexQueryDetailsBuilder fullyQualifiedTableName(FullyQualifiedTableName tableName) { + indexQueryDetails.fullyQualifiedTableName = tableName; return this; } - public IndexDetailsBuilder autoRefresh(Boolean autoRefresh) { - indexDetails.autoRefresh = autoRefresh; + public IndexQueryDetailsBuilder autoRefresh(Boolean autoRefresh) { + indexQueryDetails.autoRefresh = autoRefresh; return this; } - public IndexDetailsBuilder isDropIndex(boolean isDropIndex) { - indexDetails.isDropIndex = isDropIndex; + public IndexQueryDetailsBuilder indexQueryActionType( + IndexQueryActionType indexQueryActionType) { + indexQueryDetails.indexQueryActionType = indexQueryActionType; return this; } - public IndexDetailsBuilder mvName(String mvName) { - indexDetails.mvName = mvName; + public IndexQueryDetailsBuilder mvName(String mvName) { + indexQueryDetails.mvName = mvName; return this; } - public IndexDetailsBuilder indexType(FlintIndexType indexType) { - indexDetails.indexType = indexType; + public IndexQueryDetailsBuilder indexType(FlintIndexType indexType) { + indexQueryDetails.indexType = indexType; return this; } - public IndexDetails build() { - Preconditions.checkNotNull(indexDetails.indexType, "Index Type can't be null"); - switch (indexDetails.indexType) { - case COVERING: - Preconditions.checkNotNull( - indexDetails.indexName, "IndexName can't be null for Covering Index."); - Preconditions.checkNotNull( - indexDetails.fullyQualifiedTableName, "TableName can't be null for Covering Index."); - break; - case SKIPPING: - Preconditions.checkNotNull( - indexDetails.fullyQualifiedTableName, "TableName can't be null for Skipping Index."); - break; - case MATERIALIZED_VIEW: - Preconditions.checkNotNull(indexDetails.mvName, "Materialized view name can't be null"); - break; - } - - return indexDetails; + public IndexQueryDetails build() { + return indexQueryDetails; } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java index a2e7cfe6ee..956275b04a 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java @@ -34,6 +34,8 @@ public class InteractiveSession implements Session { private static final Logger LOG = LogManager.getLogger(); + public static final String SESSION_ID_TAG_KEY = "sid"; + private final SessionId sessionId; private final StateStore stateStore; private final EMRServerlessClient serverlessClient; @@ -46,6 +48,7 @@ public void open(CreateSessionRequest createSessionRequest) { createSessionRequest .getSparkSubmitParametersBuilder() .sessionExecution(sessionId.getSessionId(), createSessionRequest.getDatasourceName()); + createSessionRequest.getTags().put(SESSION_ID_TAG_KEY, sessionId.getSessionId()); String jobID = serverlessClient.startJobRun(createSessionRequest.getStartJobRequest()); String applicationId = createSessionRequest.getStartJobRequest().getApplicationId(); diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReader.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReader.java index e4a5e92035..d4a8e7ddbf 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReader.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReader.java @@ -1,6 +1,6 @@ package org.opensearch.sql.spark.flint; -import org.opensearch.sql.spark.dispatcher.model.IndexDetails; +import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails; /** Interface for FlintIndexMetadataReader */ public interface FlintIndexMetadataReader { @@ -8,8 +8,8 @@ public interface FlintIndexMetadataReader { /** * Given Index details, get the streaming job Id. * - * @param indexDetails indexDetails. + * @param indexQueryDetails indexDetails. * @return FlintIndexMetadata. */ - FlintIndexMetadata getFlintIndexMetadata(IndexDetails indexDetails); + FlintIndexMetadata getFlintIndexMetadata(IndexQueryDetails indexQueryDetails); } diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java index 5f712e65cd..a16d0b9138 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java @@ -5,7 +5,7 @@ import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.opensearch.client.Client; import org.opensearch.cluster.metadata.MappingMetadata; -import org.opensearch.sql.spark.dispatcher.model.IndexDetails; +import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails; /** Implementation of {@link FlintIndexMetadataReader} */ @AllArgsConstructor @@ -14,8 +14,8 @@ public class FlintIndexMetadataReaderImpl implements FlintIndexMetadataReader { private final Client client; @Override - public FlintIndexMetadata getFlintIndexMetadata(IndexDetails indexDetails) { - String indexName = indexDetails.openSearchIndexName(); + public FlintIndexMetadata getFlintIndexMetadata(IndexQueryDetails indexQueryDetails) { + String indexName = indexQueryDetails.openSearchIndexName(); GetMappingsResponse mappingsResponse = client.admin().indices().prepareGetMappings(indexName).get(); try { diff --git a/spark/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java b/spark/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java index 4816f1c2cd..c1f3f02576 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java +++ b/spark/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java @@ -20,7 +20,8 @@ import org.opensearch.sql.spark.antlr.parser.SqlBaseParser; import org.opensearch.sql.spark.antlr.parser.SqlBaseParserBaseVisitor; import org.opensearch.sql.spark.dispatcher.model.FullyQualifiedTableName; -import org.opensearch.sql.spark.dispatcher.model.IndexDetails; +import org.opensearch.sql.spark.dispatcher.model.IndexQueryActionType; +import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails; import org.opensearch.sql.spark.flint.FlintIndexType; /** @@ -42,7 +43,7 @@ public static FullyQualifiedTableName extractFullyQualifiedTableName(String sqlQ return sparkSqlTableNameVisitor.getFullyQualifiedTableName(); } - public static IndexDetails extractIndexDetails(String sqlQuery) { + public static IndexQueryDetails extractIndexDetails(String sqlQuery) { FlintSparkSqlExtensionsParser flintSparkSqlExtensionsParser = new FlintSparkSqlExtensionsParser( new CommonTokenStream( @@ -52,10 +53,10 @@ public static IndexDetails extractIndexDetails(String sqlQuery) { flintSparkSqlExtensionsParser.statement(); FlintSQLIndexDetailsVisitor flintSQLIndexDetailsVisitor = new FlintSQLIndexDetailsVisitor(); statementContext.accept(flintSQLIndexDetailsVisitor); - return flintSQLIndexDetailsVisitor.getIndexDetailsBuilder().build(); + return flintSQLIndexDetailsVisitor.getIndexQueryDetailsBuilder().build(); } - public static boolean isIndexQuery(String sqlQuery) { + public static boolean isFlintExtensionQuery(String sqlQuery) { FlintSparkSqlExtensionsParser flintSparkSqlExtensionsParser = new FlintSparkSqlExtensionsParser( new CommonTokenStream( @@ -117,29 +118,29 @@ public Void visitCreateTableHeader(SqlBaseParser.CreateTableHeaderContext ctx) { public static class FlintSQLIndexDetailsVisitor extends FlintSparkSqlExtensionsBaseVisitor { - @Getter private final IndexDetails.IndexDetailsBuilder indexDetailsBuilder; + @Getter private final IndexQueryDetails.IndexQueryDetailsBuilder indexQueryDetailsBuilder; public FlintSQLIndexDetailsVisitor() { - this.indexDetailsBuilder = new IndexDetails.IndexDetailsBuilder(); + this.indexQueryDetailsBuilder = new IndexQueryDetails.IndexQueryDetailsBuilder(); } @Override public Void visitIndexName(FlintSparkSqlExtensionsParser.IndexNameContext ctx) { - indexDetailsBuilder.indexName(ctx.getText()); + indexQueryDetailsBuilder.indexName(ctx.getText()); return super.visitIndexName(ctx); } @Override public Void visitTableName(FlintSparkSqlExtensionsParser.TableNameContext ctx) { - indexDetailsBuilder.fullyQualifiedTableName(new FullyQualifiedTableName(ctx.getText())); + indexQueryDetailsBuilder.fullyQualifiedTableName(new FullyQualifiedTableName(ctx.getText())); return super.visitTableName(ctx); } @Override public Void visitCreateSkippingIndexStatement( FlintSparkSqlExtensionsParser.CreateSkippingIndexStatementContext ctx) { - indexDetailsBuilder.isDropIndex(false); - indexDetailsBuilder.indexType(FlintIndexType.SKIPPING); + indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.CREATE); + indexQueryDetailsBuilder.indexType(FlintIndexType.SKIPPING); visitPropertyList(ctx.propertyList()); return super.visitCreateSkippingIndexStatement(ctx); } @@ -147,8 +148,8 @@ public Void visitCreateSkippingIndexStatement( @Override public Void visitCreateCoveringIndexStatement( FlintSparkSqlExtensionsParser.CreateCoveringIndexStatementContext ctx) { - indexDetailsBuilder.isDropIndex(false); - indexDetailsBuilder.indexType(FlintIndexType.COVERING); + indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.CREATE); + indexQueryDetailsBuilder.indexType(FlintIndexType.COVERING); visitPropertyList(ctx.propertyList()); return super.visitCreateCoveringIndexStatement(ctx); } @@ -156,9 +157,9 @@ public Void visitCreateCoveringIndexStatement( @Override public Void visitCreateMaterializedViewStatement( FlintSparkSqlExtensionsParser.CreateMaterializedViewStatementContext ctx) { - indexDetailsBuilder.isDropIndex(false); - indexDetailsBuilder.indexType(FlintIndexType.MATERIALIZED_VIEW); - indexDetailsBuilder.mvName(ctx.mvName.getText()); + indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.CREATE); + indexQueryDetailsBuilder.indexType(FlintIndexType.MATERIALIZED_VIEW); + indexQueryDetailsBuilder.mvName(ctx.mvName.getText()); visitPropertyList(ctx.propertyList()); return super.visitCreateMaterializedViewStatement(ctx); } @@ -166,28 +167,94 @@ public Void visitCreateMaterializedViewStatement( @Override public Void visitDropCoveringIndexStatement( FlintSparkSqlExtensionsParser.DropCoveringIndexStatementContext ctx) { - indexDetailsBuilder.isDropIndex(true); - indexDetailsBuilder.indexType(FlintIndexType.COVERING); + indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.DROP); + indexQueryDetailsBuilder.indexType(FlintIndexType.COVERING); return super.visitDropCoveringIndexStatement(ctx); } @Override public Void visitDropSkippingIndexStatement( FlintSparkSqlExtensionsParser.DropSkippingIndexStatementContext ctx) { - indexDetailsBuilder.isDropIndex(true); - indexDetailsBuilder.indexType(FlintIndexType.SKIPPING); + indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.DROP); + indexQueryDetailsBuilder.indexType(FlintIndexType.SKIPPING); return super.visitDropSkippingIndexStatement(ctx); } @Override public Void visitDropMaterializedViewStatement( FlintSparkSqlExtensionsParser.DropMaterializedViewStatementContext ctx) { - indexDetailsBuilder.isDropIndex(true); - indexDetailsBuilder.indexType(FlintIndexType.MATERIALIZED_VIEW); - indexDetailsBuilder.mvName(ctx.mvName.getText()); + indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.DROP); + indexQueryDetailsBuilder.indexType(FlintIndexType.MATERIALIZED_VIEW); + indexQueryDetailsBuilder.mvName(ctx.mvName.getText()); return super.visitDropMaterializedViewStatement(ctx); } + @Override + public Void visitDescribeCoveringIndexStatement( + FlintSparkSqlExtensionsParser.DescribeCoveringIndexStatementContext ctx) { + indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.DESCRIBE); + indexQueryDetailsBuilder.indexType(FlintIndexType.COVERING); + return super.visitDescribeCoveringIndexStatement(ctx); + } + + @Override + public Void visitDescribeSkippingIndexStatement( + FlintSparkSqlExtensionsParser.DescribeSkippingIndexStatementContext ctx) { + indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.DESCRIBE); + indexQueryDetailsBuilder.indexType(FlintIndexType.SKIPPING); + return super.visitDescribeSkippingIndexStatement(ctx); + } + + @Override + public Void visitDescribeMaterializedViewStatement( + FlintSparkSqlExtensionsParser.DescribeMaterializedViewStatementContext ctx) { + indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.DESCRIBE); + indexQueryDetailsBuilder.indexType(FlintIndexType.MATERIALIZED_VIEW); + indexQueryDetailsBuilder.mvName(ctx.mvName.getText()); + return super.visitDescribeMaterializedViewStatement(ctx); + } + + @Override + public Void visitShowCoveringIndexStatement( + FlintSparkSqlExtensionsParser.ShowCoveringIndexStatementContext ctx) { + indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.SHOW); + indexQueryDetailsBuilder.indexType(FlintIndexType.COVERING); + return super.visitShowCoveringIndexStatement(ctx); + } + + @Override + public Void visitShowMaterializedViewStatement( + FlintSparkSqlExtensionsParser.ShowMaterializedViewStatementContext ctx) { + indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.SHOW); + indexQueryDetailsBuilder.indexType(FlintIndexType.MATERIALIZED_VIEW); + return super.visitShowMaterializedViewStatement(ctx); + } + + @Override + public Void visitRefreshCoveringIndexStatement( + FlintSparkSqlExtensionsParser.RefreshCoveringIndexStatementContext ctx) { + indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.REFRESH); + indexQueryDetailsBuilder.indexType(FlintIndexType.COVERING); + return super.visitRefreshCoveringIndexStatement(ctx); + } + + @Override + public Void visitRefreshSkippingIndexStatement( + FlintSparkSqlExtensionsParser.RefreshSkippingIndexStatementContext ctx) { + indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.REFRESH); + indexQueryDetailsBuilder.indexType(FlintIndexType.SKIPPING); + return super.visitRefreshSkippingIndexStatement(ctx); + } + + @Override + public Void visitRefreshMaterializedViewStatement( + FlintSparkSqlExtensionsParser.RefreshMaterializedViewStatementContext ctx) { + indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.REFRESH); + indexQueryDetailsBuilder.indexType(FlintIndexType.MATERIALIZED_VIEW); + indexQueryDetailsBuilder.mvName(ctx.mvName.getText()); + return super.visitRefreshMaterializedViewStatement(ctx); + } + @Override public Void visitPropertyList(FlintSparkSqlExtensionsParser.PropertyListContext ctx) { if (ctx != null) { @@ -199,7 +266,7 @@ public Void visitPropertyList(FlintSparkSqlExtensionsParser.PropertyListContext // https://github.com/apache/spark/blob/v3.5.0/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkParserUtils.scala#L35 to unescape string literal if (propertyKey(property.key).toLowerCase(Locale.ROOT).contains("auto_refresh")) { if (propertyValue(property.value).toLowerCase(Locale.ROOT).contains("true")) { - indexDetailsBuilder.autoRefresh(true); + indexQueryDetailsBuilder.autoRefresh(true); } } }); diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index a69c6e2b1a..fc8623d51a 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -63,11 +63,7 @@ import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; import org.opensearch.sql.spark.client.EMRServerlessClient; import org.opensearch.sql.spark.client.StartJobRequest; -import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; -import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse; -import org.opensearch.sql.spark.dispatcher.model.FullyQualifiedTableName; -import org.opensearch.sql.spark.dispatcher.model.IndexDetails; -import org.opensearch.sql.spark.dispatcher.model.JobType; +import org.opensearch.sql.spark.dispatcher.model.*; import org.opensearch.sql.spark.execution.session.Session; import org.opensearch.sql.spark.execution.session.SessionId; import org.opensearch.sql.spark.execution.session.SessionManager; @@ -126,7 +122,7 @@ void testDispatchSelectQuery() { HashMap tags = new HashMap<>(); tags.put("datasource", "my_glue"); tags.put("cluster", TEST_CLUSTER_NAME); - tags.put("job_type", JobType.BATCH.getText()); + tags.put("type", JobType.BATCH.getText()); String query = "select * from my_glue.default.http_logs"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -181,7 +177,7 @@ void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() { HashMap tags = new HashMap<>(); tags.put("datasource", "my_glue"); tags.put("cluster", TEST_CLUSTER_NAME); - tags.put("job_type", JobType.BATCH.getText()); + tags.put("type", JobType.BATCH.getText()); String query = "select * from my_glue.default.http_logs"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -237,7 +233,7 @@ void testDispatchSelectQueryWithNoAuthIndexStoreDatasource() { HashMap tags = new HashMap<>(); tags.put("datasource", "my_glue"); tags.put("cluster", TEST_CLUSTER_NAME); - tags.put("job_type", JobType.BATCH.getText()); + tags.put("type", JobType.BATCH.getText()); String query = "select * from my_glue.default.http_logs"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -372,7 +368,7 @@ void testDispatchIndexQuery() { tags.put("datasource", "my_glue"); tags.put("index", "flint_my_glue_default_http_logs_elb_and_requesturi_index"); tags.put("cluster", TEST_CLUSTER_NAME); - tags.put("job_type", JobType.STREAMING.getText()); + tags.put("type", JobType.STREAMING.getText()); String query = "CREATE INDEX elb_and_requestUri ON my_glue.default.http_logs(l_orderkey, l_quantity) WITH" + " (auto_refresh = true)"; @@ -430,7 +426,7 @@ void testDispatchWithPPLQuery() { HashMap tags = new HashMap<>(); tags.put("datasource", "my_glue"); tags.put("cluster", TEST_CLUSTER_NAME); - tags.put("job_type", JobType.BATCH.getText()); + tags.put("type", JobType.BATCH.getText()); String query = "source = my_glue.default.http_logs"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -485,7 +481,7 @@ void testDispatchQueryWithoutATableAndDataSourceName() { HashMap tags = new HashMap<>(); tags.put("datasource", "my_glue"); tags.put("cluster", TEST_CLUSTER_NAME); - tags.put("job_type", JobType.BATCH.getText()); + tags.put("type", JobType.BATCH.getText()); String query = "show tables"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -541,7 +537,7 @@ void testDispatchIndexQueryWithoutADatasourceName() { tags.put("datasource", "my_glue"); tags.put("index", "flint_my_glue_default_http_logs_elb_and_requesturi_index"); tags.put("cluster", TEST_CLUSTER_NAME); - tags.put("job_type", JobType.STREAMING.getText()); + tags.put("type", JobType.STREAMING.getText()); String query = "CREATE INDEX elb_and_requestUri ON default.http_logs(l_orderkey, l_quantity) WITH" + " (auto_refresh = true)"; @@ -600,7 +596,7 @@ void testDispatchMaterializedViewQuery() { tags.put("datasource", "my_glue"); tags.put("index", "flint_mv_1"); tags.put("cluster", TEST_CLUSTER_NAME); - tags.put("job_type", JobType.STREAMING.getText()); + tags.put("type", JobType.STREAMING.getText()); String query = "CREATE MATERIALIZED VIEW mv_1 AS query=select * from my_glue.default.logs WITH" + " (auto_refresh = true)"; @@ -653,6 +649,168 @@ void testDispatchMaterializedViewQuery() { verifyNoInteractions(flintIndexMetadataReader); } + @Test + void testDispatchShowMVQuery() { + HashMap tags = new HashMap<>(); + tags.put("datasource", "my_glue"); + tags.put("cluster", TEST_CLUSTER_NAME); + String query = "SHOW MATERIALIZED VIEW IN mys3.default"; + String sparkSubmitParameters = + constructExpectedSparkSubmitParameterString( + "sigv4", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); + } + }); + when(emrServerlessClient.startJobRun( + new StartJobRequest( + query, + "TEST_CLUSTER:index-query", + EMRS_APPLICATION_ID, + EMRS_EXECUTION_ROLE, + sparkSubmitParameters, + tags, + false, + any()))) + .thenReturn(EMR_JOB_ID); + DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); + when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); + doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); + DispatchQueryResponse dispatchQueryResponse = + sparkQueryDispatcher.dispatch( + new DispatchQueryRequest( + EMRS_APPLICATION_ID, + query, + "my_glue", + LangType.SQL, + EMRS_EXECUTION_ROLE, + TEST_CLUSTER_NAME)); + verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); + StartJobRequest expected = + new StartJobRequest( + query, + "TEST_CLUSTER:index-query", + EMRS_APPLICATION_ID, + EMRS_EXECUTION_ROLE, + sparkSubmitParameters, + tags, + false, + null); + Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); + Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); + Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); + verifyNoInteractions(flintIndexMetadataReader); + } + + @Test + void testRefreshIndexQuery() { + HashMap tags = new HashMap<>(); + tags.put("datasource", "my_glue"); + tags.put("cluster", TEST_CLUSTER_NAME); + String query = "REFRESH SKIPPING INDEX ON my_glue.default.http_logs"; + String sparkSubmitParameters = + constructExpectedSparkSubmitParameterString( + "sigv4", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); + } + }); + when(emrServerlessClient.startJobRun( + new StartJobRequest( + query, + "TEST_CLUSTER:index-query", + EMRS_APPLICATION_ID, + EMRS_EXECUTION_ROLE, + sparkSubmitParameters, + tags, + false, + any()))) + .thenReturn(EMR_JOB_ID); + DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); + when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); + doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); + DispatchQueryResponse dispatchQueryResponse = + sparkQueryDispatcher.dispatch( + new DispatchQueryRequest( + EMRS_APPLICATION_ID, + query, + "my_glue", + LangType.SQL, + EMRS_EXECUTION_ROLE, + TEST_CLUSTER_NAME)); + verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); + StartJobRequest expected = + new StartJobRequest( + query, + "TEST_CLUSTER:index-query", + EMRS_APPLICATION_ID, + EMRS_EXECUTION_ROLE, + sparkSubmitParameters, + tags, + false, + null); + Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); + Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); + Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); + verifyNoInteractions(flintIndexMetadataReader); + } + + @Test + void testDispatchDescribeIndexQuery() { + HashMap tags = new HashMap<>(); + tags.put("datasource", "my_glue"); + tags.put("cluster", TEST_CLUSTER_NAME); + String query = "DESCRIBE SKIPPING INDEX ON mys3.default.http_logs"; + String sparkSubmitParameters = + constructExpectedSparkSubmitParameterString( + "sigv4", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); + } + }); + when(emrServerlessClient.startJobRun( + new StartJobRequest( + query, + "TEST_CLUSTER:index-query", + EMRS_APPLICATION_ID, + EMRS_EXECUTION_ROLE, + sparkSubmitParameters, + tags, + false, + any()))) + .thenReturn(EMR_JOB_ID); + DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); + when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); + doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); + DispatchQueryResponse dispatchQueryResponse = + sparkQueryDispatcher.dispatch( + new DispatchQueryRequest( + EMRS_APPLICATION_ID, + query, + "my_glue", + LangType.SQL, + EMRS_EXECUTION_ROLE, + TEST_CLUSTER_NAME)); + verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); + StartJobRequest expected = + new StartJobRequest( + query, + "TEST_CLUSTER:index-query", + EMRS_APPLICATION_ID, + EMRS_EXECUTION_ROLE, + sparkSubmitParameters, + tags, + false, + null); + Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); + Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); + Assertions.assertFalse(dispatchQueryResponse.isDropIndexQuery()); + verifyNoInteractions(flintIndexMetadataReader); + } + @Test void testDispatchWithWrongURI() { when(dataSourceService.getRawDataSourceMetadata("my_glue")) @@ -903,15 +1061,15 @@ void testGetQueryResponseOfDropIndex() { @Test void testDropIndexQuery() throws ExecutionException, InterruptedException { String query = "DROP INDEX size_year ON my_glue.default.http_logs"; - IndexDetails indexDetails = - IndexDetails.builder() + IndexQueryDetails indexQueryDetails = + IndexQueryDetails.builder() .indexName("size_year") .fullyQualifiedTableName(new FullyQualifiedTableName("my_glue.default.http_logs")) .autoRefresh(false) - .isDropIndex(true) + .indexQueryActionType(IndexQueryActionType.DROP) .indexType(FlintIndexType.COVERING) .build(); - when(flintIndexMetadataReader.getFlintIndexMetadata(indexDetails)) + when(flintIndexMetadataReader.getFlintIndexMetadata(indexQueryDetails)) .thenReturn(flintIndexMetadata); when(flintIndexMetadata.getJobId()).thenReturn(EMR_JOB_ID); // auto_refresh == true @@ -940,7 +1098,7 @@ void testDropIndexQuery() throws ExecutionException, InterruptedException { TEST_CLUSTER_NAME)); verify(emrServerlessClient, times(1)).cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID); verify(dataSourceUserAuthorizationHelper, times(1)).authorizeDataSource(dataSourceMetadata); - verify(flintIndexMetadataReader, times(1)).getFlintIndexMetadata(indexDetails); + verify(flintIndexMetadataReader, times(1)).getFlintIndexMetadata(indexQueryDetails); SparkQueryDispatcher.DropIndexResult dropIndexResult = SparkQueryDispatcher.DropIndexResult.fromJobId(dispatchQueryResponse.getJobId()); Assertions.assertEquals(JobRunState.SUCCESS.toString(), dropIndexResult.getStatus()); @@ -950,14 +1108,14 @@ void testDropIndexQuery() throws ExecutionException, InterruptedException { @Test void testDropSkippingIndexQuery() throws ExecutionException, InterruptedException { String query = "DROP SKIPPING INDEX ON my_glue.default.http_logs"; - IndexDetails indexDetails = - IndexDetails.builder() + IndexQueryDetails indexQueryDetails = + IndexQueryDetails.builder() .fullyQualifiedTableName(new FullyQualifiedTableName("my_glue.default.http_logs")) .autoRefresh(false) - .isDropIndex(true) + .indexQueryActionType(IndexQueryActionType.DROP) .indexType(FlintIndexType.SKIPPING) .build(); - when(flintIndexMetadataReader.getFlintIndexMetadata(indexDetails)) + when(flintIndexMetadataReader.getFlintIndexMetadata(indexQueryDetails)) .thenReturn(flintIndexMetadata); when(flintIndexMetadata.getJobId()).thenReturn(EMR_JOB_ID); when(flintIndexMetadata.isAutoRefresh()).thenReturn(true); @@ -984,7 +1142,7 @@ void testDropSkippingIndexQuery() throws ExecutionException, InterruptedExceptio TEST_CLUSTER_NAME)); verify(emrServerlessClient, times(1)).cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID); verify(dataSourceUserAuthorizationHelper, times(1)).authorizeDataSource(dataSourceMetadata); - verify(flintIndexMetadataReader, times(1)).getFlintIndexMetadata(indexDetails); + verify(flintIndexMetadataReader, times(1)).getFlintIndexMetadata(indexQueryDetails); SparkQueryDispatcher.DropIndexResult dropIndexResult = SparkQueryDispatcher.DropIndexResult.fromJobId(dispatchQueryResponse.getJobId()); Assertions.assertEquals(JobRunState.SUCCESS.toString(), dropIndexResult.getStatus()); @@ -995,14 +1153,14 @@ void testDropSkippingIndexQuery() throws ExecutionException, InterruptedExceptio void testDropSkippingIndexQueryAutoRefreshFalse() throws ExecutionException, InterruptedException { String query = "DROP SKIPPING INDEX ON my_glue.default.http_logs"; - IndexDetails indexDetails = - IndexDetails.builder() + IndexQueryDetails indexQueryDetails = + IndexQueryDetails.builder() .fullyQualifiedTableName(new FullyQualifiedTableName("my_glue.default.http_logs")) .autoRefresh(false) - .isDropIndex(true) + .indexQueryActionType(IndexQueryActionType.DROP) .indexType(FlintIndexType.SKIPPING) .build(); - when(flintIndexMetadataReader.getFlintIndexMetadata(indexDetails)) + when(flintIndexMetadataReader.getFlintIndexMetadata(indexQueryDetails)) .thenReturn(flintIndexMetadata); when(flintIndexMetadata.isAutoRefresh()).thenReturn(false); @@ -1023,7 +1181,7 @@ void testDropSkippingIndexQueryAutoRefreshFalse() TEST_CLUSTER_NAME)); verify(emrServerlessClient, times(0)).cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID); verify(dataSourceUserAuthorizationHelper, times(1)).authorizeDataSource(dataSourceMetadata); - verify(flintIndexMetadataReader, times(1)).getFlintIndexMetadata(indexDetails); + verify(flintIndexMetadataReader, times(1)).getFlintIndexMetadata(indexQueryDetails); SparkQueryDispatcher.DropIndexResult dropIndexResult = SparkQueryDispatcher.DropIndexResult.fromJobId(dispatchQueryResponse.getJobId()); Assertions.assertEquals(JobRunState.SUCCESS.toString(), dropIndexResult.getStatus()); @@ -1034,14 +1192,14 @@ void testDropSkippingIndexQueryAutoRefreshFalse() void testDropSkippingIndexQueryDeleteIndexException() throws ExecutionException, InterruptedException { String query = "DROP SKIPPING INDEX ON my_glue.default.http_logs"; - IndexDetails indexDetails = - IndexDetails.builder() + IndexQueryDetails indexQueryDetails = + IndexQueryDetails.builder() .fullyQualifiedTableName(new FullyQualifiedTableName("my_glue.default.http_logs")) .autoRefresh(false) - .isDropIndex(true) + .indexQueryActionType(IndexQueryActionType.DROP) .indexType(FlintIndexType.SKIPPING) .build(); - when(flintIndexMetadataReader.getFlintIndexMetadata(indexDetails)) + when(flintIndexMetadataReader.getFlintIndexMetadata(indexQueryDetails)) .thenReturn(flintIndexMetadata); when(flintIndexMetadata.isAutoRefresh()).thenReturn(false); @@ -1063,7 +1221,7 @@ void testDropSkippingIndexQueryDeleteIndexException() TEST_CLUSTER_NAME)); verify(emrServerlessClient, times(0)).cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID); verify(dataSourceUserAuthorizationHelper, times(1)).authorizeDataSource(dataSourceMetadata); - verify(flintIndexMetadataReader, times(1)).getFlintIndexMetadata(indexDetails); + verify(flintIndexMetadataReader, times(1)).getFlintIndexMetadata(indexQueryDetails); SparkQueryDispatcher.DropIndexResult dropIndexResult = SparkQueryDispatcher.DropIndexResult.fromJobId(dispatchQueryResponse.getJobId()); Assertions.assertEquals(JobRunState.FAILED.toString(), dropIndexResult.getStatus()); @@ -1076,14 +1234,14 @@ void testDropSkippingIndexQueryDeleteIndexException() @Test void testDropMVQuery() throws ExecutionException, InterruptedException { String query = "DROP MATERIALIZED VIEW mv_1"; - IndexDetails indexDetails = - IndexDetails.builder() + IndexQueryDetails indexQueryDetails = + IndexQueryDetails.builder() .mvName("mv_1") - .isDropIndex(true) + .indexQueryActionType(IndexQueryActionType.DROP) .fullyQualifiedTableName(null) .indexType(FlintIndexType.MATERIALIZED_VIEW) .build(); - when(flintIndexMetadataReader.getFlintIndexMetadata(indexDetails)) + when(flintIndexMetadataReader.getFlintIndexMetadata(indexQueryDetails)) .thenReturn(flintIndexMetadata); when(flintIndexMetadata.getJobId()).thenReturn(EMR_JOB_ID); // auto_refresh == true @@ -1112,7 +1270,7 @@ void testDropMVQuery() throws ExecutionException, InterruptedException { TEST_CLUSTER_NAME)); verify(emrServerlessClient, times(1)).cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID); verify(dataSourceUserAuthorizationHelper, times(1)).authorizeDataSource(dataSourceMetadata); - verify(flintIndexMetadataReader, times(1)).getFlintIndexMetadata(indexDetails); + verify(flintIndexMetadataReader, times(1)).getFlintIndexMetadata(indexQueryDetails); SparkQueryDispatcher.DropIndexResult dropIndexResult = SparkQueryDispatcher.DropIndexResult.fromJobId(dispatchQueryResponse.getJobId()); Assertions.assertEquals(JobRunState.SUCCESS.toString(), dropIndexResult.getStatus()); diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java index 06a8d8c73c..14ccaf7708 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java @@ -13,7 +13,6 @@ import com.amazonaws.services.emrserverless.model.CancelJobRunResult; import com.amazonaws.services.emrserverless.model.GetJobRunResult; -import com.google.common.collect.ImmutableMap; import java.util.HashMap; import java.util.Optional; import lombok.RequiredArgsConstructor; @@ -194,7 +193,7 @@ public static CreateSessionRequest createSessionRequest() { "appId", "arn", SparkSubmitParameters.Builder.builder(), - ImmutableMap.of(), + new HashMap<>(), "resultIndex", DS_NAME); } diff --git a/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImplTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImplTest.java index 3cc40e0df5..4d809c31dc 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImplTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImplTest.java @@ -25,7 +25,8 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.sql.spark.dispatcher.model.FullyQualifiedTableName; -import org.opensearch.sql.spark.dispatcher.model.IndexDetails; +import org.opensearch.sql.spark.dispatcher.model.IndexQueryActionType; +import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails; @ExtendWith(MockitoExtension.class) public class FlintIndexMetadataReaderImplTest { @@ -44,10 +45,10 @@ void testGetJobIdFromFlintSkippingIndexMetadata() { FlintIndexMetadataReader flintIndexMetadataReader = new FlintIndexMetadataReaderImpl(client); FlintIndexMetadata indexMetadata = flintIndexMetadataReader.getFlintIndexMetadata( - IndexDetails.builder() + IndexQueryDetails.builder() .fullyQualifiedTableName(new FullyQualifiedTableName("mys3.default.http_logs")) .autoRefresh(false) - .isDropIndex(true) + .indexQueryActionType(IndexQueryActionType.DROP) .indexType(FlintIndexType.SKIPPING) .build()); Assertions.assertEquals("00fdmvv9hp8u0o0q", indexMetadata.getJobId()); @@ -64,11 +65,11 @@ void testGetJobIdFromFlintCoveringIndexMetadata() { FlintIndexMetadataReader flintIndexMetadataReader = new FlintIndexMetadataReaderImpl(client); FlintIndexMetadata indexMetadata = flintIndexMetadataReader.getFlintIndexMetadata( - IndexDetails.builder() + IndexQueryDetails.builder() .indexName("cv1") .fullyQualifiedTableName(new FullyQualifiedTableName("mys3.default.http_logs")) .autoRefresh(false) - .isDropIndex(true) + .indexQueryActionType(IndexQueryActionType.DROP) .indexType(FlintIndexType.COVERING) .build()); Assertions.assertEquals("00fdmvv9hp8u0o0q", indexMetadata.getJobId()); @@ -87,12 +88,12 @@ void testGetJobIDWithNPEException() { IllegalArgumentException.class, () -> flintIndexMetadataReader.getFlintIndexMetadata( - IndexDetails.builder() + IndexQueryDetails.builder() .indexName("cv1") .fullyQualifiedTableName( new FullyQualifiedTableName("mys3.default.http_logs")) .autoRefresh(false) - .isDropIndex(true) + .indexQueryActionType(IndexQueryActionType.DROP) .indexType(FlintIndexType.COVERING) .build())); Assertions.assertEquals("Provided Index doesn't exist", illegalArgumentException.getMessage()); diff --git a/spark/src/test/java/org/opensearch/sql/spark/flint/IndexDetailsTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/IndexQueryDetailsTest.java similarity index 71% rename from spark/src/test/java/org/opensearch/sql/spark/flint/IndexDetailsTest.java rename to spark/src/test/java/org/opensearch/sql/spark/flint/IndexQueryDetailsTest.java index cf6b5f8f2b..e725ddc21e 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/flint/IndexDetailsTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/flint/IndexQueryDetailsTest.java @@ -9,18 +9,19 @@ import org.junit.jupiter.api.Test; import org.opensearch.sql.spark.dispatcher.model.FullyQualifiedTableName; -import org.opensearch.sql.spark.dispatcher.model.IndexDetails; +import org.opensearch.sql.spark.dispatcher.model.IndexQueryActionType; +import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails; -public class IndexDetailsTest { +public class IndexQueryDetailsTest { @Test public void skippingIndexName() { assertEquals( "flint_mys3_default_http_logs_skipping_index", - IndexDetails.builder() + IndexQueryDetails.builder() .indexName("invalid") .fullyQualifiedTableName(new FullyQualifiedTableName("mys3.default.http_logs")) .autoRefresh(false) - .isDropIndex(true) + .indexQueryActionType(IndexQueryActionType.DROP) .indexType(FlintIndexType.SKIPPING) .build() .openSearchIndexName()); diff --git a/spark/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java b/spark/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java index 01759c2bdd..c86d7656d6 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java @@ -15,7 +15,9 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.sql.spark.dispatcher.model.FullyQualifiedTableName; -import org.opensearch.sql.spark.dispatcher.model.IndexDetails; +import org.opensearch.sql.spark.dispatcher.model.IndexQueryActionType; +import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails; +import org.opensearch.sql.spark.flint.FlintIndexType; @ExtendWith(MockitoExtension.class) public class SQLQueryUtilsTest { @@ -25,13 +27,13 @@ void testExtractionOfTableNameFromSQLQueries() { String sqlQuery = "select * from my_glue.default.http_logs"; FullyQualifiedTableName fullyQualifiedTableName = SQLQueryUtils.extractFullyQualifiedTableName(sqlQuery); - Assertions.assertFalse(SQLQueryUtils.isIndexQuery(sqlQuery)); + Assertions.assertFalse(SQLQueryUtils.isFlintExtensionQuery(sqlQuery)); Assertions.assertEquals("my_glue", fullyQualifiedTableName.getDatasourceName()); Assertions.assertEquals("default", fullyQualifiedTableName.getSchemaName()); Assertions.assertEquals("http_logs", fullyQualifiedTableName.getTableName()); sqlQuery = "select * from my_glue.db.http_logs"; - Assertions.assertFalse(SQLQueryUtils.isIndexQuery(sqlQuery)); + Assertions.assertFalse(SQLQueryUtils.isFlintExtensionQuery(sqlQuery)); fullyQualifiedTableName = SQLQueryUtils.extractFullyQualifiedTableName(sqlQuery); Assertions.assertEquals("my_glue", fullyQualifiedTableName.getDatasourceName()); Assertions.assertEquals("db", fullyQualifiedTableName.getSchemaName()); @@ -39,28 +41,28 @@ void testExtractionOfTableNameFromSQLQueries() { sqlQuery = "select * from my_glue.http_logs"; fullyQualifiedTableName = SQLQueryUtils.extractFullyQualifiedTableName(sqlQuery); - Assertions.assertFalse(SQLQueryUtils.isIndexQuery(sqlQuery)); + Assertions.assertFalse(SQLQueryUtils.isFlintExtensionQuery(sqlQuery)); Assertions.assertEquals("my_glue", fullyQualifiedTableName.getSchemaName()); Assertions.assertNull(fullyQualifiedTableName.getDatasourceName()); Assertions.assertEquals("http_logs", fullyQualifiedTableName.getTableName()); sqlQuery = "select * from http_logs"; fullyQualifiedTableName = SQLQueryUtils.extractFullyQualifiedTableName(sqlQuery); - Assertions.assertFalse(SQLQueryUtils.isIndexQuery(sqlQuery)); + Assertions.assertFalse(SQLQueryUtils.isFlintExtensionQuery(sqlQuery)); Assertions.assertNull(fullyQualifiedTableName.getDatasourceName()); Assertions.assertNull(fullyQualifiedTableName.getSchemaName()); Assertions.assertEquals("http_logs", fullyQualifiedTableName.getTableName()); sqlQuery = "DROP TABLE myS3.default.alb_logs"; fullyQualifiedTableName = SQLQueryUtils.extractFullyQualifiedTableName(sqlQuery); - Assertions.assertFalse(SQLQueryUtils.isIndexQuery(sqlQuery)); + Assertions.assertFalse(SQLQueryUtils.isFlintExtensionQuery(sqlQuery)); Assertions.assertEquals("myS3", fullyQualifiedTableName.getDatasourceName()); Assertions.assertEquals("default", fullyQualifiedTableName.getSchemaName()); Assertions.assertEquals("alb_logs", fullyQualifiedTableName.getTableName()); sqlQuery = "DESCRIBE TABLE myS3.default.alb_logs"; fullyQualifiedTableName = SQLQueryUtils.extractFullyQualifiedTableName(sqlQuery); - Assertions.assertFalse(SQLQueryUtils.isIndexQuery(sqlQuery)); + Assertions.assertFalse(SQLQueryUtils.isFlintExtensionQuery(sqlQuery)); Assertions.assertEquals("myS3", fullyQualifiedTableName.getDatasourceName()); Assertions.assertEquals("default", fullyQualifiedTableName.getSchemaName()); Assertions.assertEquals("alb_logs", fullyQualifiedTableName.getTableName()); @@ -73,7 +75,7 @@ void testExtractionOfTableNameFromSQLQueries() { + "STORED AS file_format\n" + "LOCATION { 's3://bucket/folder/' }"; fullyQualifiedTableName = SQLQueryUtils.extractFullyQualifiedTableName(sqlQuery); - Assertions.assertFalse(SQLQueryUtils.isIndexQuery(sqlQuery)); + Assertions.assertFalse(SQLQueryUtils.isFlintExtensionQuery(sqlQuery)); Assertions.assertEquals("myS3", fullyQualifiedTableName.getDatasourceName()); Assertions.assertEquals("default", fullyQualifiedTableName.getSchemaName()); Assertions.assertEquals("alb_logs", fullyQualifiedTableName.getTableName()); @@ -92,7 +94,7 @@ void testErrorScenarios() { sqlQuery = "DESCRIBE TABLE FROM myS3.default.alb_logs"; fullyQualifiedTableName = SQLQueryUtils.extractFullyQualifiedTableName(sqlQuery); - Assertions.assertFalse(SQLQueryUtils.isIndexQuery(sqlQuery)); + Assertions.assertFalse(SQLQueryUtils.isFlintExtensionQuery(sqlQuery)); Assertions.assertEquals("FROM", fullyQualifiedTableName.getFullyQualifiedName()); Assertions.assertNull(fullyQualifiedTableName.getSchemaName()); Assertions.assertEquals("FROM", fullyQualifiedTableName.getTableName()); @@ -104,10 +106,12 @@ void testExtractionFromFlintIndexQueries() { String createCoveredIndexQuery = "CREATE INDEX elb_and_requestUri ON myS3.default.alb_logs(l_orderkey, l_quantity) WITH" + " (auto_refresh = true)"; - Assertions.assertTrue(SQLQueryUtils.isIndexQuery(createCoveredIndexQuery)); - IndexDetails indexDetails = SQLQueryUtils.extractIndexDetails(createCoveredIndexQuery); - FullyQualifiedTableName fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName(); - Assertions.assertEquals("elb_and_requestUri", indexDetails.getIndexName()); + Assertions.assertTrue(SQLQueryUtils.isFlintExtensionQuery(createCoveredIndexQuery)); + IndexQueryDetails indexQueryDetails = + SQLQueryUtils.extractIndexDetails(createCoveredIndexQuery); + FullyQualifiedTableName fullyQualifiedTableName = + indexQueryDetails.getFullyQualifiedTableName(); + Assertions.assertEquals("elb_and_requestUri", indexQueryDetails.getIndexName()); Assertions.assertEquals("myS3", fullyQualifiedTableName.getDatasourceName()); Assertions.assertEquals("default", fullyQualifiedTableName.getSchemaName()); Assertions.assertEquals("alb_logs", fullyQualifiedTableName.getTableName()); @@ -118,12 +122,99 @@ void testExtractionFromFlintMVQuery() { String createCoveredIndexQuery = "CREATE MATERIALIZED VIEW mv_1 AS query=select * from my_glue.default.logs WITH" + " (auto_refresh = true)"; - Assertions.assertTrue(SQLQueryUtils.isIndexQuery(createCoveredIndexQuery)); - IndexDetails indexDetails = SQLQueryUtils.extractIndexDetails(createCoveredIndexQuery); + Assertions.assertTrue(SQLQueryUtils.isFlintExtensionQuery(createCoveredIndexQuery)); + IndexQueryDetails indexQueryDetails = + SQLQueryUtils.extractIndexDetails(createCoveredIndexQuery); + FullyQualifiedTableName fullyQualifiedTableName = + indexQueryDetails.getFullyQualifiedTableName(); + Assertions.assertNull(indexQueryDetails.getIndexName()); + Assertions.assertNull(fullyQualifiedTableName); + Assertions.assertEquals("mv_1", indexQueryDetails.getMvName()); + } + + @Test + void testDescIndex() { + String descSkippingIndex = "DESC SKIPPING INDEX ON mys3.default.http_logs"; + Assertions.assertTrue(SQLQueryUtils.isFlintExtensionQuery(descSkippingIndex)); + IndexQueryDetails indexDetails = SQLQueryUtils.extractIndexDetails(descSkippingIndex); + FullyQualifiedTableName fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName(); + Assertions.assertNull(indexDetails.getIndexName()); + Assertions.assertNotNull(fullyQualifiedTableName); + Assertions.assertEquals(FlintIndexType.SKIPPING, indexDetails.getIndexType()); + Assertions.assertEquals(IndexQueryActionType.DESCRIBE, indexDetails.getIndexQueryActionType()); + + String descCoveringIndex = "DESC INDEX cv1 ON mys3.default.http_logs"; + Assertions.assertTrue(SQLQueryUtils.isFlintExtensionQuery(descCoveringIndex)); + indexDetails = SQLQueryUtils.extractIndexDetails(descCoveringIndex); + fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName(); + Assertions.assertEquals("cv1", indexDetails.getIndexName()); + Assertions.assertNotNull(fullyQualifiedTableName); + Assertions.assertEquals(FlintIndexType.COVERING, indexDetails.getIndexType()); + Assertions.assertEquals(IndexQueryActionType.DESCRIBE, indexDetails.getIndexQueryActionType()); + + String descMv = "DESC MATERIALIZED VIEW mv1"; + Assertions.assertTrue(SQLQueryUtils.isFlintExtensionQuery(descMv)); + indexDetails = SQLQueryUtils.extractIndexDetails(descMv); + fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName(); + Assertions.assertNull(indexDetails.getIndexName()); + Assertions.assertEquals("mv1", indexDetails.getMvName()); + Assertions.assertNull(fullyQualifiedTableName); + Assertions.assertEquals(FlintIndexType.MATERIALIZED_VIEW, indexDetails.getIndexType()); + Assertions.assertEquals(IndexQueryActionType.DESCRIBE, indexDetails.getIndexQueryActionType()); + } + + @Test + void testShowIndex() { + String showCoveringIndex = " SHOW INDEX ON myS3.default.http_logs"; + Assertions.assertTrue(SQLQueryUtils.isFlintExtensionQuery(showCoveringIndex)); + IndexQueryDetails indexDetails = SQLQueryUtils.extractIndexDetails(showCoveringIndex); FullyQualifiedTableName fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName(); Assertions.assertNull(indexDetails.getIndexName()); + Assertions.assertNull(indexDetails.getMvName()); + Assertions.assertNotNull(fullyQualifiedTableName); + Assertions.assertEquals(FlintIndexType.COVERING, indexDetails.getIndexType()); + Assertions.assertEquals(IndexQueryActionType.SHOW, indexDetails.getIndexQueryActionType()); + + String showMV = "SHOW MATERIALIZED VIEW IN my_glue.default"; + Assertions.assertTrue(SQLQueryUtils.isFlintExtensionQuery(showMV)); + indexDetails = SQLQueryUtils.extractIndexDetails(showMV); + fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName(); + Assertions.assertNull(indexDetails.getIndexName()); + Assertions.assertNull(indexDetails.getMvName()); + Assertions.assertNull(fullyQualifiedTableName); + Assertions.assertEquals(FlintIndexType.MATERIALIZED_VIEW, indexDetails.getIndexType()); + Assertions.assertEquals(IndexQueryActionType.SHOW, indexDetails.getIndexQueryActionType()); + } + + @Test + void testRefreshIndex() { + String refreshSkippingIndex = "REFRESH SKIPPING INDEX ON mys3.default.http_logs"; + Assertions.assertTrue(SQLQueryUtils.isFlintExtensionQuery(refreshSkippingIndex)); + IndexQueryDetails indexDetails = SQLQueryUtils.extractIndexDetails(refreshSkippingIndex); + FullyQualifiedTableName fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName(); + Assertions.assertNull(indexDetails.getIndexName()); + Assertions.assertNotNull(fullyQualifiedTableName); + Assertions.assertEquals(FlintIndexType.SKIPPING, indexDetails.getIndexType()); + Assertions.assertEquals(IndexQueryActionType.REFRESH, indexDetails.getIndexQueryActionType()); + + String refreshCoveringIndex = "REFRESH INDEX cv1 ON mys3.default.http_logs"; + Assertions.assertTrue(SQLQueryUtils.isFlintExtensionQuery(refreshCoveringIndex)); + indexDetails = SQLQueryUtils.extractIndexDetails(refreshCoveringIndex); + fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName(); + Assertions.assertEquals("cv1", indexDetails.getIndexName()); + Assertions.assertNotNull(fullyQualifiedTableName); + Assertions.assertEquals(FlintIndexType.COVERING, indexDetails.getIndexType()); + Assertions.assertEquals(IndexQueryActionType.REFRESH, indexDetails.getIndexQueryActionType()); + + String refreshMV = "REFRESH MATERIALIZED VIEW mv1"; + Assertions.assertTrue(SQLQueryUtils.isFlintExtensionQuery(refreshMV)); + indexDetails = SQLQueryUtils.extractIndexDetails(refreshMV); + fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName(); + Assertions.assertNull(indexDetails.getIndexName()); + Assertions.assertEquals("mv1", indexDetails.getMvName()); Assertions.assertNull(fullyQualifiedTableName); - Assertions.assertEquals("mv_1", indexDetails.getMvName()); + Assertions.assertEquals(FlintIndexType.MATERIALIZED_VIEW, indexDetails.getIndexType()); + Assertions.assertEquals(IndexQueryActionType.REFRESH, indexDetails.getIndexQueryActionType()); } /** https://github.com/opensearch-project/sql/issues/2206 */