From 2c878fbc67de1b18052e1184e10700a2b799defa Mon Sep 17 00:00:00 2001 From: Tomoyuki MORITA Date: Mon, 9 Sep 2024 14:56:05 -0700 Subject: [PATCH] Throw exception for RECOVER INDEX JOB query (#2988) Signed-off-by: Tomoyuki Morita --- .../sql/spark/dispatcher/SparkQueryDispatcher.java | 3 +++ .../spark/dispatcher/model/IndexQueryActionType.java | 3 ++- .../org/opensearch/sql/spark/utils/SQLQueryUtils.java | 7 +++++++ .../sql/spark/dispatcher/SparkQueryDispatcherTest.java | 10 ++++++++++ .../opensearch/sql/spark/utils/SQLQueryUtilsTest.java | 9 +++++++++ 5 files changed, 31 insertions(+), 1 deletion(-) diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java index 50e8403d36..732f5f71ab 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java @@ -119,6 +119,9 @@ private AsyncQueryHandler getQueryHandlerForFlintExtensionQuery( } else if (IndexQueryActionType.REFRESH.equals(indexQueryDetails.getIndexQueryActionType())) { // Manual refresh should be handled by batch handler return queryHandlerFactory.getRefreshQueryHandler(dispatchQueryRequest.getAccountId()); + } else if (IndexQueryActionType.RECOVER.equals(indexQueryDetails.getIndexQueryActionType())) { + // RECOVER INDEX JOB should not be executed from async-query-core + throw new IllegalArgumentException("RECOVER INDEX JOB is not allowed."); } else { return getDefaultAsyncQueryHandler(dispatchQueryRequest.getAccountId()); } diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryActionType.java b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryActionType.java index 96e7d159af..51e0832217 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryActionType.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryActionType.java @@ -13,5 +13,6 @@ public enum IndexQueryActionType { SHOW, DROP, VACUUM, - ALTER + ALTER, + RECOVER } diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java b/async-query-core/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java index b1a8c3d4f6..7550de2f1e 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java @@ -26,6 +26,7 @@ import org.opensearch.sql.spark.antlr.parser.FlintSparkSqlExtensionsLexer; import org.opensearch.sql.spark.antlr.parser.FlintSparkSqlExtensionsParser; import org.opensearch.sql.spark.antlr.parser.FlintSparkSqlExtensionsParser.MaterializedViewQueryContext; +import org.opensearch.sql.spark.antlr.parser.FlintSparkSqlExtensionsParser.RecoverIndexJobStatementContext; import org.opensearch.sql.spark.antlr.parser.SqlBaseLexer; import org.opensearch.sql.spark.antlr.parser.SqlBaseParser; import org.opensearch.sql.spark.antlr.parser.SqlBaseParser.IdentifierReferenceContext; @@ -386,6 +387,12 @@ public Void visitMaterializedViewQuery(MaterializedViewQueryContext ctx) { return super.visitMaterializedViewQuery(ctx); } + @Override + public Void visitRecoverIndexJobStatement(RecoverIndexJobStatementContext ctx) { + indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.RECOVER); + return super.visitRecoverIndexJobStatement(ctx); + } + private String propertyKey(FlintSparkSqlExtensionsParser.PropertyKeyContext key) { if (key.STRING() != null) { return key.STRING().getText(); diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index 9f12ddf323..75c0e00337 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -622,6 +622,16 @@ void testDispatchVacuumIndexQuery() { testDispatchBatchQuery("VACUUM INDEX elb_and_requestUri ON my_glue.default.http_logs"); } + @Test + void testDispatchRecoverIndexQuery() { + String query = "RECOVER INDEX JOB `flint_spark_catalog_default_test_skipping_index`"; + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + sparkQueryDispatcher.dispatch( + getBaseDispatchQueryRequest(query), asyncQueryRequestContext)); + } + @Test void testDispatchWithUnSupportedDataSourceType() { when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata( diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java index 4608bce74e..56cab7ce7f 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java @@ -435,6 +435,15 @@ void testAutoRefresh() { .autoRefresh()); } + @Test + void testRecoverIndex() { + String refreshSkippingIndex = + "RECOVER INDEX JOB `flint_spark_catalog_default_test_skipping_index`"; + assertTrue(SQLQueryUtils.isFlintExtensionQuery(refreshSkippingIndex)); + IndexQueryDetails indexDetails = SQLQueryUtils.extractIndexDetails(refreshSkippingIndex); + assertEquals(IndexQueryActionType.RECOVER, indexDetails.getIndexQueryActionType()); + } + @Test void testValidateSparkSqlQuery_ValidQuery() { List errors =