From 2cc0dc3341d6c3758ded697c22dad582ee3636fe Mon Sep 17 00:00:00 2001 From: Vamsi Manohar Date: Tue, 12 Mar 2024 12:28:16 -0700 Subject: [PATCH] FlintIndexMetadataReader refactoring Signed-off-by: Vamsi Manohar --- .../src/main/antlr/FlintSparkSqlExtensions.g4 | 20 ++ spark/src/main/antlr/SparkSqlBase.g4 | 1 + spark/src/main/antlr/SqlBaseLexer.g4 | 14 +- spark/src/main/antlr/SqlBaseParser.g4 | 3 +- .../sql/spark/dispatcher/IndexDMLHandler.java | 14 +- .../spark/dispatcher/RefreshQueryHandler.java | 15 +- .../sql/spark/flint/FlintIndexMetadata.java | 30 +-- .../spark/flint/FlintIndexMetadataReader.java | 21 +- .../flint/FlintIndexMetadataReaderImpl.java | 107 +++++++- .../flint/model/FlintIndexDetailsRequest.java | 56 ++++ .../config/AsyncExecutorServiceModule.java | 5 +- .../AsyncQueryExecutorServiceSpec.java | 63 +---- .../AsyncQueryGetResultSpecTest.java | 3 +- .../spark/asyncquery/IndexQuerySpecTest.java | 78 +++++- .../asyncquery/model/MockFlintIndex.java | 31 +++ .../asyncquery/model/MockFlintSparkJob.java | 77 ++++++ .../spark/dispatcher/IndexDMLHandlerTest.java | 75 ++++++ .../FlintIndexMetadataReaderImplTest.java | 248 +++++++++++++++--- .../FlintIndexMetadataReaderSpecTest.java | 163 ++++++++++++ .../spark/flint/FlintIndexMetadataTest.java | 85 ------ .../model/FlintIndexDetailsRequestTest.java | 82 ++++++ .../opensearch/sql/spark/utils/TestUtils.java | 20 ++ ...logs_covering_corrupted_index_mapping.json | 33 +++ ...mydb_http_logs_covering_index_mapping.json | 39 +++ ...mydb_http_logs_skipping_index_mapping.json | 39 +++ .../flint_my_glue_mydb_mv_mapping.json | 33 +++ ...mys3_default_http_logs_skipping_index.json | 23 +- 27 files changed, 1116 insertions(+), 262 deletions(-) create mode 100644 spark/src/main/java/org/opensearch/sql/spark/flint/model/FlintIndexDetailsRequest.java create mode 100644 spark/src/test/java/org/opensearch/sql/spark/asyncquery/model/MockFlintIndex.java create mode 100644 
spark/src/test/java/org/opensearch/sql/spark/asyncquery/model/MockFlintSparkJob.java create mode 100644 spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderSpecTest.java delete mode 100644 spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataTest.java create mode 100644 spark/src/test/java/org/opensearch/sql/spark/flint/model/FlintIndexDetailsRequestTest.java create mode 100644 spark/src/test/resources/flint-index-mappings/flint_my_glue_mydb_http_logs_covering_corrupted_index_mapping.json create mode 100644 spark/src/test/resources/flint-index-mappings/flint_my_glue_mydb_http_logs_covering_index_mapping.json create mode 100644 spark/src/test/resources/flint-index-mappings/flint_my_glue_mydb_http_logs_skipping_index_mapping.json create mode 100644 spark/src/test/resources/flint-index-mappings/flint_my_glue_mydb_mv_mapping.json diff --git a/spark/src/main/antlr/FlintSparkSqlExtensions.g4 b/spark/src/main/antlr/FlintSparkSqlExtensions.g4 index 219bbe782b..4ecef6a697 100644 --- a/spark/src/main/antlr/FlintSparkSqlExtensions.g4 +++ b/spark/src/main/antlr/FlintSparkSqlExtensions.g4 @@ -26,6 +26,7 @@ skippingIndexStatement : createSkippingIndexStatement | refreshSkippingIndexStatement | describeSkippingIndexStatement + | alterSkippingIndexStatement | dropSkippingIndexStatement | vacuumSkippingIndexStatement ; @@ -46,6 +47,12 @@ describeSkippingIndexStatement : (DESC | DESCRIBE) SKIPPING INDEX ON tableName ; +alterSkippingIndexStatement + : ALTER SKIPPING INDEX + ON tableName + WITH LEFT_PAREN propertyList RIGHT_PAREN + ; + dropSkippingIndexStatement : DROP SKIPPING INDEX ON tableName ; @@ -59,6 +66,7 @@ coveringIndexStatement | refreshCoveringIndexStatement | showCoveringIndexStatement | describeCoveringIndexStatement + | alterCoveringIndexStatement | dropCoveringIndexStatement | vacuumCoveringIndexStatement ; @@ -83,6 +91,12 @@ describeCoveringIndexStatement : (DESC | DESCRIBE) INDEX indexName ON tableName ; 
+alterCoveringIndexStatement + : ALTER INDEX indexName + ON tableName + WITH LEFT_PAREN propertyList RIGHT_PAREN + ; + dropCoveringIndexStatement : DROP INDEX indexName ON tableName ; @@ -96,6 +110,7 @@ materializedViewStatement | refreshMaterializedViewStatement | showMaterializedViewStatement | describeMaterializedViewStatement + | alterMaterializedViewStatement | dropMaterializedViewStatement | vacuumMaterializedViewStatement ; @@ -118,6 +133,11 @@ describeMaterializedViewStatement : (DESC | DESCRIBE) MATERIALIZED VIEW mvName=multipartIdentifier ; +alterMaterializedViewStatement + : ALTER MATERIALIZED VIEW mvName=multipartIdentifier + WITH LEFT_PAREN propertyList RIGHT_PAREN + ; + dropMaterializedViewStatement : DROP MATERIALIZED VIEW mvName=multipartIdentifier ; diff --git a/spark/src/main/antlr/SparkSqlBase.g4 b/spark/src/main/antlr/SparkSqlBase.g4 index 01f45016d6..cb58e97e76 100644 --- a/spark/src/main/antlr/SparkSqlBase.g4 +++ b/spark/src/main/antlr/SparkSqlBase.g4 @@ -155,6 +155,7 @@ DOT: '.'; AS: 'AS'; +ALTER: 'ALTER'; CREATE: 'CREATE'; DESC: 'DESC'; DESCRIBE: 'DESCRIBE'; diff --git a/spark/src/main/antlr/SqlBaseLexer.g4 b/spark/src/main/antlr/SqlBaseLexer.g4 index 174887def6..7c376e2268 100644 --- a/spark/src/main/antlr/SqlBaseLexer.g4 +++ b/spark/src/main/antlr/SqlBaseLexer.g4 @@ -79,6 +79,7 @@ COMMA: ','; DOT: '.'; LEFT_BRACKET: '['; RIGHT_BRACKET: ']'; +BANG: '!'; // NOTE: If you add a new token in the list below, you should update the list of keywords // and reserved tag in `docs/sql-ref-ansi-compliance.md#sql-keywords`, and @@ -273,7 +274,7 @@ NANOSECOND: 'NANOSECOND'; NANOSECONDS: 'NANOSECONDS'; NATURAL: 'NATURAL'; NO: 'NO'; -NOT: 'NOT' | '!'; +NOT: 'NOT'; NULL: 'NULL'; NULLS: 'NULLS'; NUMERIC: 'NUMERIC'; @@ -510,8 +511,13 @@ BIGDECIMAL_LITERAL | DECIMAL_DIGITS EXPONENT? 'BD' {isValidDecimal()}? 
; +// Generalize the identifier to give a sensible INVALID_IDENTIFIER error message: +// * Unicode letters rather than a-z and A-Z only +// * URI paths for table references using paths +// We then narrow down to ANSI rules in exitUnquotedIdentifier() in the parser. IDENTIFIER - : (LETTER | DIGIT | '_')+ + : (UNICODE_LETTER | DIGIT | '_')+ + | UNICODE_LETTER+ '://' (UNICODE_LETTER | DIGIT | '_' | '/' | '-' | '.' | '?' | '=' | '&' | '#' | '%')+ ; BACKQUOTED_IDENTIFIER @@ -535,6 +541,10 @@ fragment LETTER : [A-Z] ; +fragment UNICODE_LETTER + : [\p{L}] + ; + SIMPLE_COMMENT : '--' ('\\\n' | ~[\r\n])* '\r'? '\n'? -> channel(HIDDEN) ; diff --git a/spark/src/main/antlr/SqlBaseParser.g4 b/spark/src/main/antlr/SqlBaseParser.g4 index 801cc62491..41a5ec241c 100644 --- a/spark/src/main/antlr/SqlBaseParser.g4 +++ b/spark/src/main/antlr/SqlBaseParser.g4 @@ -388,6 +388,7 @@ describeFuncName | comparisonOperator | arithmeticOperator | predicateOperator + | BANG ; describeColName @@ -946,7 +947,7 @@ expressionSeq ; booleanExpression - : NOT booleanExpression #logicalNot + : (NOT | BANG) booleanExpression #logicalNot | EXISTS LEFT_PAREN query RIGHT_PAREN #exists | valueExpression predicate? 
#predicated | left=booleanExpression operator=AND right=booleanExpression #logicalBinary diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java index f153e94713..5f49f6eb15 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java @@ -10,6 +10,7 @@ import static org.opensearch.sql.spark.execution.statestore.StateStore.createIndexDMLResult; import com.amazonaws.services.emrserverless.model.JobRunState; +import java.util.Map; import lombok.RequiredArgsConstructor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -28,6 +29,7 @@ import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadata; import org.opensearch.sql.spark.flint.FlintIndexMetadataReader; +import org.opensearch.sql.spark.flint.model.FlintIndexDetailsRequest; import org.opensearch.sql.spark.flint.operation.FlintIndexOp; import org.opensearch.sql.spark.flint.operation.FlintIndexOpCancel; import org.opensearch.sql.spark.flint.operation.FlintIndexOpDelete; @@ -59,7 +61,17 @@ public DispatchQueryResponse submit( DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) { DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata(); IndexQueryDetails indexDetails = context.getIndexQueryDetails(); - FlintIndexMetadata indexMetadata = flintIndexMetadataReader.getFlintIndexMetadata(indexDetails); + Map indexMetadataMap = + flintIndexMetadataReader.getFlintIndexMetadata( + new FlintIndexDetailsRequest.Builder() + .indexPattern(indexDetails.openSearchIndexName()) + .build()); + if (!indexMetadataMap.containsKey(indexDetails.openSearchIndexName())) { + throw new IllegalStateException( + String.format( + "Couldn't fetch flint index: %s details", 
indexDetails.openSearchIndexName())); + } + FlintIndexMetadata indexMetadata = indexMetadataMap.get(indexDetails.openSearchIndexName()); // if index is created without auto refresh. there is no job to cancel. String status = JobRunState.FAILED.toString(); String error = ""; diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java index 0528a189f0..e0e60bfae1 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java @@ -5,6 +5,7 @@ package org.opensearch.sql.spark.dispatcher; +import java.util.Map; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; import org.opensearch.sql.spark.client.EMRServerlessClient; @@ -15,6 +16,7 @@ import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadata; import org.opensearch.sql.spark.flint.FlintIndexMetadataReader; +import org.opensearch.sql.spark.flint.model.FlintIndexDetailsRequest; import org.opensearch.sql.spark.flint.operation.FlintIndexOp; import org.opensearch.sql.spark.flint.operation.FlintIndexOpCancel; import org.opensearch.sql.spark.leasemanager.LeaseManager; @@ -42,8 +44,17 @@ public RefreshQueryHandler( @Override public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) { String datasourceName = asyncQueryJobMetadata.getDatasourceName(); - FlintIndexMetadata indexMetadata = - flintIndexMetadataReader.getFlintIndexMetadata(asyncQueryJobMetadata.getIndexName()); + Map indexMetadataMap = + flintIndexMetadataReader.getFlintIndexMetadata( + new FlintIndexDetailsRequest.Builder() + .indexPattern(asyncQueryJobMetadata.getIndexName()) + .build()); + if (!indexMetadataMap.containsKey(asyncQueryJobMetadata.getIndexName())) { + throw new 
IllegalStateException( + String.format( + "Couldn't fetch flint index: %s details", asyncQueryJobMetadata.getIndexName())); + } + FlintIndexMetadata indexMetadata = indexMetadataMap.get(asyncQueryJobMetadata.getIndexName()); FlintIndexOp jobCancelOp = new FlintIndexOpCancel(stateStore, datasourceName, emrServerlessClient); jobCancelOp.apply(indexMetadata); diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadata.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadata.java index 1721263bf8..45653e9b1c 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadata.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadata.java @@ -5,42 +5,22 @@ package org.opensearch.sql.spark.flint; -import java.util.Locale; -import java.util.Map; import java.util.Optional; +import lombok.Builder; import lombok.Data; @Data +@Builder public class FlintIndexMetadata { - public static final String PROPERTIES_KEY = "properties"; - public static final String ENV_KEY = "env"; - public static final String OPTIONS_KEY = "options"; - - public static final String SERVERLESS_EMR_JOB_ID = "SERVERLESS_EMR_JOB_ID"; - public static final String AUTO_REFRESH = "auto_refresh"; - public static final String AUTO_REFRESH_DEFAULT = "false"; - - public static final String APP_ID = "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID"; - public static final String FLINT_INDEX_STATE_DOC_ID = "latestId"; private final String jobId; private final boolean autoRefresh; private final String appId; private final String latestId; + private final FlintIndexStateModel indexStateModel; - public static FlintIndexMetadata fromMetatdata(Map metaMap) { - Map propertiesMap = (Map) metaMap.get(PROPERTIES_KEY); - Map envMap = (Map) propertiesMap.get(ENV_KEY); - Map options = (Map) metaMap.get(OPTIONS_KEY); - String jobId = (String) envMap.get(SERVERLESS_EMR_JOB_ID); - - boolean autoRefresh = - !((String) options.getOrDefault(AUTO_REFRESH, 
AUTO_REFRESH_DEFAULT)) - .toLowerCase(Locale.ROOT) - .equalsIgnoreCase(AUTO_REFRESH_DEFAULT); - String appId = (String) envMap.getOrDefault(APP_ID, null); - String latestId = (String) metaMap.getOrDefault(FLINT_INDEX_STATE_DOC_ID, null); - return new FlintIndexMetadata(jobId, autoRefresh, appId, latestId); + public Optional getIndexState() { + return Optional.ofNullable(indexStateModel); } public Optional getLatestId() { diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReader.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReader.java index 8833665570..9782ce57e8 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReader.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReader.java @@ -1,23 +1,18 @@ package org.opensearch.sql.spark.flint; -import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails; +import java.util.Map; +import org.opensearch.sql.spark.flint.model.FlintIndexDetailsRequest; /** Interface for FlintIndexMetadataReader */ public interface FlintIndexMetadataReader { /** - * Given Index details, get the streaming job Id. + * Retrieves a map of {@link FlintIndexMetadata} instances matching the specified index pattern. * - * @param indexQueryDetails indexDetails. - * @return FlintIndexMetadata. + * @param flintIndexDetailsRequest {@link FlintIndexDetailsRequest} + * @return A map of {@link FlintIndexMetadata} instances against indexName, each providing + * metadata access for a matched index. Returns an empty list if no indices match the pattern. */ - FlintIndexMetadata getFlintIndexMetadata(IndexQueryDetails indexQueryDetails); - - /** - * Given Index name, get the streaming job Id. - * - * @param indexName indexName. - * @return FlintIndexMetadata. 
- */ - FlintIndexMetadata getFlintIndexMetadata(String indexName); + Map getFlintIndexMetadata( + FlintIndexDetailsRequest flintIndexDetailsRequest); } diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java index d6e07fba8a..768330df1b 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java @@ -1,33 +1,112 @@ package org.opensearch.sql.spark.flint; +import java.util.HashMap; +import java.util.Locale; import java.util.Map; +import java.util.Optional; import lombok.AllArgsConstructor; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.opensearch.client.Client; -import org.opensearch.cluster.metadata.MappingMetadata; -import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails; +import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.model.FlintIndexDetailsRequest; /** Implementation of {@link FlintIndexMetadataReader} */ @AllArgsConstructor public class FlintIndexMetadataReaderImpl implements FlintIndexMetadataReader { + private static final Logger LOGGER = LogManager.getLogger(FlintIndexMetadataReaderImpl.class); + public static final String PROPERTIES_KEY = "properties"; + public static final String ENV_KEY = "env"; + public static final String OPTIONS_KEY = "options"; + + public static final String SERVERLESS_EMR_JOB_ID = "SERVERLESS_EMR_JOB_ID"; + public static final String AUTO_REFRESH = "auto_refresh"; + public static final String AUTO_REFRESH_DEFAULT = "false"; + + public static final String APP_ID = "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID"; + public static final String FLINT_INDEX_STATE_DOC_ID = "latestId"; + private final Client 
client; - @Override - public FlintIndexMetadata getFlintIndexMetadata(IndexQueryDetails indexQueryDetails) { - return getFlintIndexMetadata(indexQueryDetails.openSearchIndexName()); - } + private final StateStore stateStore; @Override - public FlintIndexMetadata getFlintIndexMetadata(String indexName) { + public Map getFlintIndexMetadata( + FlintIndexDetailsRequest flintIndexDetailsRequest) { GetMappingsResponse mappingsResponse = - client.admin().indices().prepareGetMappings(indexName).get(); - try { - MappingMetadata mappingMetadata = mappingsResponse.mappings().get(indexName); - Map mappingSourceMap = mappingMetadata.getSourceAsMap(); - return FlintIndexMetadata.fromMetatdata((Map) mappingSourceMap.get("_meta")); - } catch (NullPointerException npe) { - throw new IllegalArgumentException("Provided Index doesn't exist"); + client + .admin() + .indices() + .prepareGetMappings() + .setIndices(flintIndexDetailsRequest.getIndexPattern()) + .get(); + Map indexMetadataMap = new HashMap<>(); + mappingsResponse + .getMappings() + .forEach( + (indexName, mappingMetadata) -> { + try { + Map mappingSourceMap = mappingMetadata.getSourceAsMap(); + FlintIndexMetadata metadata = + fromMetadata( + (Map) mappingSourceMap.get("_meta"), + flintIndexDetailsRequest.isIndexStateRequired(), + flintIndexDetailsRequest.getDataSourceName()); + indexMetadataMap.put(indexName, metadata); + } catch (Exception exception) { + LOGGER.error( + "Exception while building index details for index: {} due to: {}", + indexName, + exception.getMessage()); + } + }); + return indexMetadataMap; + } + + public FlintIndexMetadata fromMetadata( + Map metaMap, Boolean isIndexStateRequired, String datasourceName) { + FlintIndexMetadata.FlintIndexMetadataBuilder flintIndexMetadataBuilder = + FlintIndexMetadata.builder(); + String latestId = buildFlintIndexMetadataAndReturnLatestId(flintIndexMetadataBuilder, metaMap); + if (isIndexStateRequired) { + buildFlintIndexState(flintIndexMetadataBuilder, datasourceName, 
latestId); + } + return flintIndexMetadataBuilder.build(); + } + + private static String buildFlintIndexMetadataAndReturnLatestId( + FlintIndexMetadata.FlintIndexMetadataBuilder flintIndexMetadataBuilder, + Map metaMap) { + Map propertiesMap = (Map) metaMap.get(PROPERTIES_KEY); + Map envMap = (Map) propertiesMap.get(ENV_KEY); + Map options = (Map) metaMap.get(OPTIONS_KEY); + String jobId = (String) envMap.get(SERVERLESS_EMR_JOB_ID); + boolean autoRefresh = + !((String) options.getOrDefault(AUTO_REFRESH, AUTO_REFRESH_DEFAULT)) + .toLowerCase(Locale.ROOT) + .equalsIgnoreCase(AUTO_REFRESH_DEFAULT); + String appId = (String) envMap.getOrDefault(APP_ID, null); + String latestId = (String) metaMap.getOrDefault(FLINT_INDEX_STATE_DOC_ID, null); + flintIndexMetadataBuilder.jobId(jobId); + flintIndexMetadataBuilder.autoRefresh(autoRefresh); + flintIndexMetadataBuilder.appId(appId); + flintIndexMetadataBuilder.latestId(latestId); + return latestId; + } + + private void buildFlintIndexState( + FlintIndexMetadata.FlintIndexMetadataBuilder flintIndexMetadataBuilder, + String datasourceName, + String latestId) { + Optional flintIndexStateModelOptional = + StateStore.getFlintIndexState(stateStore, datasourceName).apply(latestId); + if (flintIndexStateModelOptional.isPresent()) { + flintIndexMetadataBuilder.indexStateModel(flintIndexStateModelOptional.get()); + } else { + throw new IllegalStateException( + String.format("Couldn't fetch index state of flint index with latestId : %s", latestId)); } } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/model/FlintIndexDetailsRequest.java b/spark/src/main/java/org/opensearch/sql/spark/flint/model/FlintIndexDetailsRequest.java new file mode 100644 index 0000000000..c3d024636e --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/model/FlintIndexDetailsRequest.java @@ -0,0 +1,56 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package 
org.opensearch.sql.spark.flint.model; + +import lombok.Getter; + +/** Request object for fetching Flint Index details. */ +@Getter +public class FlintIndexDetailsRequest { + private String indexPattern; + private boolean isIndexStateRequired; + private String dataSourceName; + + private FlintIndexDetailsRequest(Builder builder) { + this.indexPattern = builder.indexPattern; + this.isIndexStateRequired = builder.isIndexStateRequired; + this.dataSourceName = builder.dataSourceName; + } + + public static class Builder { + private String indexPattern; + private boolean isIndexStateRequired = false; + private String dataSourceName; + + public Builder() {} + + public Builder indexPattern(String indexPattern) { + this.indexPattern = indexPattern; + return this; + } + + public Builder isIndexStateRequired(boolean isIndexStateRequired) { + this.isIndexStateRequired = isIndexStateRequired; + return this; + } + + public Builder dataSourceName(String dataSourceName) { + this.dataSourceName = dataSourceName; + return this; + } + + public FlintIndexDetailsRequest build() { + if (indexPattern == null || indexPattern.isEmpty()) { + throw new IllegalArgumentException("indexPattern cannot be null or empty"); + } + if (isIndexStateRequired && (dataSourceName == null || dataSourceName.isEmpty())) { + throw new IllegalStateException( + "dataSourceName is required when isIndexStateRequired is true"); + } + return new FlintIndexDetailsRequest(this); + } + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java b/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java index d88c1dd9df..865507765d 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java +++ b/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java @@ -113,8 +113,9 @@ public SparkExecutionEngineConfigSupplier 
sparkExecutionEngineConfigSupplier(Set @Provides @Singleton - public FlintIndexMetadataReaderImpl flintIndexMetadataReader(NodeClient client) { - return new FlintIndexMetadataReaderImpl(client); + public FlintIndexMetadataReaderImpl flintIndexMetadataReader( + NodeClient client, StateStore stateStore) { + return new FlintIndexMetadataReaderImpl(client, stateStore); } @Provides diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index 725080bbcd..9131119f00 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -42,7 +42,6 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.XContentType; import org.opensearch.index.query.QueryBuilder; -import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.plugins.Plugin; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.sql.datasource.model.DataSourceMetadata; @@ -66,8 +65,6 @@ import org.opensearch.sql.spark.execution.session.SessionState; import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadataReaderImpl; -import org.opensearch.sql.spark.flint.FlintIndexState; -import org.opensearch.sql.spark.flint.FlintIndexStateModel; import org.opensearch.sql.spark.flint.FlintIndexType; import org.opensearch.sql.spark.leasemanager.DefaultLeaseManager; import org.opensearch.sql.spark.response.JobExecutionResponseReader; @@ -210,7 +207,7 @@ protected AsyncQueryExecutorService createAsyncQueryExecutorService( this.dataSourceService, new DataSourceUserAuthorizationHelperImpl(client), jobExecutionResponseReader, - new FlintIndexMetadataReaderImpl(client), + new FlintIndexMetadataReaderImpl(client, stateStore), 
client, new SessionManager(stateStore, emrServerlessClientFactory, pluginSettings), new DefaultLeaseManager(pluginSettings, stateStore), @@ -330,64 +327,6 @@ public String loadResultIndexMappings() { return Resources.toString(url, Charsets.UTF_8); } - public class MockFlintSparkJob { - - private FlintIndexStateModel stateModel; - - public MockFlintSparkJob(String latestId) { - assertNotNull(latestId); - stateModel = - new FlintIndexStateModel( - FlintIndexState.EMPTY, - "mockAppId", - "mockJobId", - latestId, - DATASOURCE, - System.currentTimeMillis(), - "", - SequenceNumbers.UNASSIGNED_SEQ_NO, - SequenceNumbers.UNASSIGNED_PRIMARY_TERM); - stateModel = StateStore.createFlintIndexState(stateStore, DATASOURCE).apply(stateModel); - } - - public void refreshing() { - stateModel = - StateStore.updateFlintIndexState(stateStore, DATASOURCE) - .apply(stateModel, FlintIndexState.REFRESHING); - } - - public void cancelling() { - stateModel = - StateStore.updateFlintIndexState(stateStore, DATASOURCE) - .apply(stateModel, FlintIndexState.CANCELLING); - } - - public void active() { - stateModel = - StateStore.updateFlintIndexState(stateStore, DATASOURCE) - .apply(stateModel, FlintIndexState.ACTIVE); - } - - public void deleting() { - stateModel = - StateStore.updateFlintIndexState(stateStore, DATASOURCE) - .apply(stateModel, FlintIndexState.DELETING); - } - - public void deleted() { - stateModel = - StateStore.updateFlintIndexState(stateStore, DATASOURCE) - .apply(stateModel, FlintIndexState.DELETED); - } - - void assertState(FlintIndexState expected) { - Optional stateModelOpt = - StateStore.getFlintIndexState(stateStore, DATASOURCE).apply(stateModel.getId()); - assertTrue((stateModelOpt.isPresent())); - assertEquals(expected, stateModelOpt.get().getIndexState()); - } - } - @RequiredArgsConstructor public class FlintDatasetMock { final String query; diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java 
b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java index 4ec5d4d80b..3acbfc439c 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java @@ -26,6 +26,7 @@ import org.opensearch.sql.protocol.response.format.ResponseFormatter; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryResult; +import org.opensearch.sql.spark.asyncquery.model.MockFlintSparkJob; import org.opensearch.sql.spark.client.EMRServerlessClientFactory; import org.opensearch.sql.spark.execution.statement.StatementModel; import org.opensearch.sql.spark.execution.statement.StatementState; @@ -52,7 +53,7 @@ public class AsyncQueryGetResultSpecTest extends AsyncQueryExecutorServiceSpec { @Before public void doSetUp() { - mockIndexState = new MockFlintSparkJob(mockIndex.latestId); + mockIndexState = new MockFlintSparkJob(stateStore, mockIndex.latestId, DATASOURCE); } @Test diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java index 9ba15c250e..5dd1d79057 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java @@ -11,7 +11,10 @@ import com.google.common.collect.ImmutableList; import org.junit.Assert; import org.junit.Test; +import org.junit.jupiter.api.Assertions; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse; +import org.opensearch.sql.spark.asyncquery.model.MockFlintIndex; +import org.opensearch.sql.spark.asyncquery.model.MockFlintSparkJob; import org.opensearch.sql.spark.client.EMRServerlessClient; import org.opensearch.sql.spark.client.EMRServerlessClientFactory; import 
org.opensearch.sql.spark.flint.FlintIndexState; @@ -246,7 +249,8 @@ public EMRServerlessClient getClient() { // Mock flint index mockDS.createIndex(); // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); flintIndexJob.refreshing(); // 1.drop index @@ -307,7 +311,8 @@ public EMRServerlessClient getClient() { // Mock flint index mockDS.createIndex(); // Mock index state in refresh state. - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); flintIndexJob.refreshing(); // 1.drop index @@ -356,7 +361,8 @@ public EMRServerlessClient getClient() { // Mock flint index mockDS.createIndex(); // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); flintIndexJob.refreshing(); // 1. drop index @@ -404,7 +410,8 @@ public EMRServerlessClient getClient() { // Mock flint index mockDS.createIndex(); // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); flintIndexJob.cancelling(); // 1. drop index @@ -460,7 +467,8 @@ public EMRServerlessClient getClient() { // Mock flint index mockDS.createIndex(); // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); flintIndexJob.active(); // 1. 
drop index @@ -511,7 +519,8 @@ public EMRServerlessClient getClient() { // Mock flint index mockDS.createIndex(); // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); flintIndexJob.deleted(); // 1. drop index @@ -562,7 +571,8 @@ public EMRServerlessClient getClient() { // Mock flint index mockDS.createIndex(); // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); flintIndexJob.deleting(); // 1. drop index @@ -618,7 +628,8 @@ public EMRServerlessClient getClient() { // Mock flint index mockDS.createIndex(); // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); // 1. drop index CreateAsyncQueryResponse response = @@ -695,7 +706,8 @@ public void concurrentRefreshJobLimitNotApplied() { // Mock flint index COVERING.createIndex(); // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(COVERING.latestId); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, COVERING.latestId, DATASOURCE); flintIndexJob.refreshing(); // query with auto refresh @@ -719,7 +731,8 @@ public void concurrentRefreshJobLimitAppliedToDDLWithAuthRefresh() { // Mock flint index COVERING.createIndex(); // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(COVERING.latestId); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, COVERING.latestId, DATASOURCE); flintIndexJob.refreshing(); // query with auto_refresh = true. 
@@ -746,7 +759,8 @@ public void concurrentRefreshJobLimitAppliedToRefresh() { // Mock flint index COVERING.createIndex(); // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(COVERING.latestId); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, COVERING.latestId, DATASOURCE); flintIndexJob.refreshing(); // query with auto_refresh = true. @@ -772,7 +786,8 @@ public void concurrentRefreshJobLimitNotAppliedToDDL() { // Mock flint index COVERING.createIndex(); // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(COVERING.latestId); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, COVERING.latestId, DATASOURCE); flintIndexJob.refreshing(); CreateAsyncQueryResponse asyncQueryResponse = @@ -845,7 +860,8 @@ public GetJobRunResult getJobRunResult( // Mock flint index mockDS.createIndex(); // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); // 1. Submit REFRESH statement CreateAsyncQueryResponse response = @@ -865,4 +881,40 @@ public GetJobRunResult getJobRunResult( flintIndexJob.assertState(FlintIndexState.ACTIVE); }); } + + @Test + public void cancelRefreshStatementWithFailureInFetchingIndexMetadata() { + String indexName = "flint_my_glue_mydb_http_logs_covering_corrupted_index"; + MockFlintIndex mockFlintIndex = new MockFlintIndex(client(), indexName); + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService( + () -> + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + return new GetJobRunResult().withJobRun(new JobRun().withState("Cancelled")); + } + }); + + mockFlintIndex.createIndex(); + // Mock index state + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, indexName + "_latest_id", DATASOURCE); + + // 1. 
Submit REFRESH statement + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + "REFRESH INDEX covering_corrupted ON my_glue.mydb.http_logs", + DATASOURCE, + LangType.SQL, + null)); + // mock index state. + flintIndexJob.refreshing(); + + // 2. Cancel query + Assertions.assertThrows( + IllegalStateException.class, + () -> asyncQueryExecutorService.cancelQuery(response.getQueryId())); + } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/model/MockFlintIndex.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/model/MockFlintIndex.java new file mode 100644 index 0000000000..09508572d8 --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/model/MockFlintIndex.java @@ -0,0 +1,31 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.asyncquery.model; + +import lombok.SneakyThrows; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.client.Client; +import org.opensearch.sql.spark.utils.TestUtils; + +public class MockFlintIndex { + private final String indexName; + private final Client client; + + public MockFlintIndex(Client client, String indexName) { + this.client = client; + this.indexName = indexName; + } + + public void createIndex() { + String mappingFile = String.format("flint-index-mappings/%s_mapping.json", indexName); + TestUtils.createIndexWithMappings(client, indexName, mappingFile); + } + + @SneakyThrows + public void deleteIndex() { + client.admin().indices().delete(new DeleteIndexRequest().indices(indexName)).get(); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/model/MockFlintSparkJob.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/model/MockFlintSparkJob.java new file mode 100644 index 0000000000..459e6e4762 --- /dev/null +++ 
b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/model/MockFlintSparkJob.java @@ -0,0 +1,77 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.asyncquery.model; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Optional; +import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.FlintIndexState; +import org.opensearch.sql.spark.flint.FlintIndexStateModel; + +public class MockFlintSparkJob { + private FlintIndexStateModel stateModel; + private StateStore stateStore; + private String datasource; + + public MockFlintSparkJob(StateStore stateStore, String latestId, String datasource) { + assertNotNull(latestId); + this.stateStore = stateStore; + this.datasource = datasource; + stateModel = + new FlintIndexStateModel( + FlintIndexState.EMPTY, + "mockAppId", + "mockJobId", + latestId, + datasource, + System.currentTimeMillis(), + "", + SequenceNumbers.UNASSIGNED_SEQ_NO, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM); + stateModel = StateStore.createFlintIndexState(stateStore, datasource).apply(stateModel); + } + + public void refreshing() { + stateModel = + StateStore.updateFlintIndexState(stateStore, datasource) + .apply(stateModel, FlintIndexState.REFRESHING); + } + + public void cancelling() { + stateModel = + StateStore.updateFlintIndexState(stateStore, datasource) + .apply(stateModel, FlintIndexState.CANCELLING); + } + + public void active() { + stateModel = + StateStore.updateFlintIndexState(stateStore, datasource) + .apply(stateModel, FlintIndexState.ACTIVE); + } + + public void deleting() { + stateModel = + StateStore.updateFlintIndexState(stateStore, datasource) + .apply(stateModel, FlintIndexState.DELETING); + } + + public void 
deleted() { + stateModel = + StateStore.updateFlintIndexState(stateStore, datasource) + .apply(stateModel, FlintIndexState.DELETED); + } + + public void assertState(FlintIndexState expected) { + Optional stateModelOpt = + StateStore.getFlintIndexState(stateStore, datasource).apply(stateModel.getId()); + assertTrue((stateModelOpt.isPresent())); + assertEquals(expected, stateModelOpt.get().getIndexState()); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java index ec82488749..7f36112bd4 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java @@ -6,13 +6,44 @@ package org.opensearch.sql.spark.dispatcher; import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.opensearch.sql.datasource.model.DataSourceStatus.ACTIVE; +import static org.opensearch.sql.spark.constants.TestConstants.EMRS_APPLICATION_ID; +import static org.opensearch.sql.spark.constants.TestConstants.EMRS_EXECUTION_ROLE; +import static org.opensearch.sql.spark.constants.TestConstants.TEST_CLUSTER_NAME; import static org.opensearch.sql.spark.data.constants.SparkConstants.ERROR_FIELD; import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD; +import java.util.HashMap; import org.json.JSONObject; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.client.Client; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; +import org.opensearch.sql.spark.client.EMRServerlessClient; +import 
org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext; +import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; +import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails; +import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.FlintIndexMetadataReader; +import org.opensearch.sql.spark.flint.FlintIndexType; +import org.opensearch.sql.spark.response.JobExecutionResponseReader; +import org.opensearch.sql.spark.rest.model.LangType; +@ExtendWith(MockitoExtension.class) class IndexDMLHandlerTest { + + @Mock private EMRServerlessClient emrServerlessClient; + @Mock private Client client; + @Mock private JobExecutionResponseReader jobExecutionResponseReader; + @Mock private FlintIndexMetadataReader flintIndexMetadataReader; + @Mock private StateStore stateStore; + @Test public void getResponseFromExecutor() { JSONObject result = @@ -21,4 +52,48 @@ public void getResponseFromExecutor() { assertEquals("running", result.getString(STATUS_FIELD)); assertEquals("", result.getString(ERROR_FIELD)); } + + @Test + public void testWhenIndexDetailsAreNotFound() { + IndexDMLHandler indexDMLHandler = + new IndexDMLHandler( + emrServerlessClient, + jobExecutionResponseReader, + flintIndexMetadataReader, + client, + stateStore); + DispatchQueryRequest dispatchQueryRequest = + new DispatchQueryRequest( + EMRS_APPLICATION_ID, + "DROP INDEX", + "my_glue", + LangType.SQL, + EMRS_EXECUTION_ROLE, + TEST_CLUSTER_NAME); + DataSourceMetadata metadata = + new DataSourceMetadata.Builder() + .setName("mys3") + .setDescription("test description") + .setConnector(DataSourceType.S3GLUE) + .setDataSourceStatus(ACTIVE) + .build(); + IndexQueryDetails indexQueryDetails = + IndexQueryDetails.builder() + .mvName("mys3.default.http_logs_metrics") + .indexType(FlintIndexType.MATERIALIZED_VIEW) + .build(); + DispatchQueryContext dispatchQueryContext = + DispatchQueryContext.builder() + .dataSourceMetadata(metadata) + 
.indexQueryDetails(indexQueryDetails) + .build(); + Mockito.when(flintIndexMetadataReader.getFlintIndexMetadata(any())).thenReturn(new HashMap<>()); + IllegalStateException illegalStateException = + Assertions.assertThrows( + IllegalStateException.class, + () -> indexDMLHandler.submit(dispatchQueryRequest, dispatchQueryContext)); + Assertions.assertEquals( + "Couldn't fetch flint index: flint_mys3_default_http_logs_metrics details", + illegalStateException.getMessage()); + } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImplTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImplTest.java index 4d809c31dc..0bf3805a1a 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImplTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImplTest.java @@ -1,15 +1,24 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + package org.opensearch.sql.spark.flint; import static org.mockito.Answers.RETURNS_DEEP_STUBS; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.opensearch.sql.spark.flint.FlintIndexState.ACTIVE; import com.google.common.base.Charsets; import com.google.common.io.Resources; import java.io.IOException; import java.net.URL; +import java.util.HashMap; import java.util.Map; +import java.util.Optional; import lombok.SneakyThrows; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -24,15 +33,20 @@ import org.opensearch.core.xcontent.DeprecationHandler; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.sql.spark.dispatcher.model.FullyQualifiedTableName; import 
org.opensearch.sql.spark.dispatcher.model.IndexQueryActionType; import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails; +import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.model.FlintIndexDetailsRequest; @ExtendWith(MockitoExtension.class) public class FlintIndexMetadataReaderImplTest { @Mock(answer = RETURNS_DEEP_STUBS) private Client client; + @Mock private StateStore stateStore; + @SneakyThrows @Test void testGetJobIdFromFlintSkippingIndexMetadata() { @@ -42,16 +56,131 @@ void testGetJobIdFromFlintSkippingIndexMetadata() { String mappings = Resources.toString(url, Charsets.UTF_8); String indexName = "flint_mys3_default_http_logs_skipping_index"; mockNodeClientIndicesMappings(indexName, mappings); - FlintIndexMetadataReader flintIndexMetadataReader = new FlintIndexMetadataReaderImpl(client); - FlintIndexMetadata indexMetadata = + FlintIndexMetadataReader flintIndexMetadataReader = + new FlintIndexMetadataReaderImpl(client, stateStore); + IndexQueryDetails indexQueryDetails = + IndexQueryDetails.builder() + .fullyQualifiedTableName(new FullyQualifiedTableName("mys3.default.http_logs")) + .autoRefresh(false) + .indexQueryActionType(IndexQueryActionType.DROP) + .indexType(FlintIndexType.SKIPPING) + .build(); + Map indexMetadataMap = + flintIndexMetadataReader.getFlintIndexMetadata( + new FlintIndexDetailsRequest.Builder() + .indexPattern(indexQueryDetails.openSearchIndexName()) + .build()); + Assertions.assertEquals( + "00fhelvq7peuao0", + indexMetadataMap.get(indexQueryDetails.openSearchIndexName()).getJobId()); + } + + @SneakyThrows + @Test + void testGetJobIdFromFlintSkippingIndexMetadataWithIndexState() { + URL url = + Resources.getResource( + "flint-index-mappings/flint_mys3_default_http_logs_skipping_index.json"); + String mappings = Resources.toString(url, Charsets.UTF_8); + String indexName = "flint_mys3_default_http_logs_skipping_index"; + mockNodeClientIndicesMappings(indexName, mappings); + 
FlintIndexMetadataReader flintIndexMetadataReader = + new FlintIndexMetadataReaderImpl(client, stateStore); + IndexQueryDetails indexQueryDetails = + IndexQueryDetails.builder() + .fullyQualifiedTableName(new FullyQualifiedTableName("mys3.default.http_logs")) + .autoRefresh(false) + .indexQueryActionType(IndexQueryActionType.DROP) + .indexType(FlintIndexType.SKIPPING) + .build(); + String latestId = "ZmxpbnRfdmFtc2lfZ2x1ZV92YW1zaWRiX2h0dHBfbG9nc19za2lwcGluZ19pbmRleA=="; + FlintIndexStateModel stateModel = + new FlintIndexStateModel( + ACTIVE, + "mockAppId", + "00fhelvq7peuao0m", + latestId, + "mys3", + System.currentTimeMillis(), + "", + SequenceNumbers.UNASSIGNED_SEQ_NO, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM); + when(stateStore.get(eq(latestId), any(), eq(".query_execution_request_mys3"))) + .thenReturn(Optional.of(stateModel)); + Map indexMetadataMap = + flintIndexMetadataReader.getFlintIndexMetadata( + new FlintIndexDetailsRequest.Builder() + .indexPattern(indexQueryDetails.openSearchIndexName()) + .isIndexStateRequired(true) + .dataSourceName("mys3") + .build()); + FlintIndexMetadata metadata = indexMetadataMap.get(indexQueryDetails.openSearchIndexName()); + Assertions.assertEquals("00fhelvq7peuao0", metadata.getJobId()); + Assertions.assertNotNull(metadata.getIndexState()); + Assertions.assertTrue(metadata.getIndexState().isPresent()); + Assertions.assertEquals(ACTIVE, metadata.getIndexState().get().getIndexState()); + } + + @SneakyThrows + @Test + void testGetJobIdFromFlintSkippingIndexMetadataWithException() { + URL url = + Resources.getResource( + "flint-index-mappings/flint_mys3_default_http_logs_skipping_index.json"); + String mappings = Resources.toString(url, Charsets.UTF_8); + String indexName = "flint_mys3_default_http_logs_skipping_index"; + mockNodeClientIndicesMappings(indexName, mappings); + FlintIndexMetadataReader flintIndexMetadataReader = + new FlintIndexMetadataReaderImpl(client, stateStore); + IndexQueryDetails indexQueryDetails = + 
IndexQueryDetails.builder() + .fullyQualifiedTableName(new FullyQualifiedTableName("mys3.default.http_logs")) + .autoRefresh(false) + .indexQueryActionType(IndexQueryActionType.DROP) + .indexType(FlintIndexType.SKIPPING) + .build(); + String latestId = "ZmxpbnRfdmFtc2lfZ2x1ZV92YW1zaWRiX2h0dHBfbG9nc19za2lwcGluZ19pbmRleA=="; + when(stateStore.get(eq(latestId), any(), eq(".query_execution_request_mys3"))) + .thenThrow(new RuntimeException("Couldn't fetch index state")); + Map indexMetadataMap = + flintIndexMetadataReader.getFlintIndexMetadata( + new FlintIndexDetailsRequest.Builder() + .indexPattern(indexQueryDetails.openSearchIndexName()) + .isIndexStateRequired(true) + .dataSourceName("mys3") + .build()); + Assertions.assertFalse(indexMetadataMap.containsKey(indexQueryDetails.openSearchIndexName())); + } + + @SneakyThrows + @Test + void testGetJobIdFromFlintSkippingIndexMetadataWithEmptyIndexStateModel() { + URL url = + Resources.getResource( + "flint-index-mappings/flint_mys3_default_http_logs_skipping_index.json"); + String mappings = Resources.toString(url, Charsets.UTF_8); + String indexName = "flint_mys3_default_http_logs_skipping_index"; + mockNodeClientIndicesMappings(indexName, mappings); + FlintIndexMetadataReader flintIndexMetadataReader = + new FlintIndexMetadataReaderImpl(client, stateStore); + IndexQueryDetails indexQueryDetails = + IndexQueryDetails.builder() + .fullyQualifiedTableName(new FullyQualifiedTableName("mys3.default.http_logs")) + .autoRefresh(false) + .indexQueryActionType(IndexQueryActionType.DROP) + .indexType(FlintIndexType.SKIPPING) + .build(); + String latestId = "ZmxpbnRfdmFtc2lfZ2x1ZV92YW1zaWRiX2h0dHBfbG9nc19za2lwcGluZ19pbmRleA=="; + when(stateStore.get(eq(latestId), any(), eq(".query_execution_request_mys3"))) + .thenReturn(Optional.empty()); + Map indexMetadataMap = flintIndexMetadataReader.getFlintIndexMetadata( - IndexQueryDetails.builder() - .fullyQualifiedTableName(new FullyQualifiedTableName("mys3.default.http_logs")) - 
.autoRefresh(false) - .indexQueryActionType(IndexQueryActionType.DROP) - .indexType(FlintIndexType.SKIPPING) + new FlintIndexDetailsRequest.Builder() + .indexPattern(indexQueryDetails.openSearchIndexName()) + .isIndexStateRequired(true) + .dataSourceName("mys3") .build()); - Assertions.assertEquals("00fdmvv9hp8u0o0q", indexMetadata.getJobId()); + Assertions.assertFalse(indexMetadataMap.containsKey(indexQueryDetails.openSearchIndexName())); } @SneakyThrows @@ -62,17 +191,24 @@ void testGetJobIdFromFlintCoveringIndexMetadata() { String mappings = Resources.toString(url, Charsets.UTF_8); String indexName = "flint_mys3_default_http_logs_cv1_index"; mockNodeClientIndicesMappings(indexName, mappings); - FlintIndexMetadataReader flintIndexMetadataReader = new FlintIndexMetadataReaderImpl(client); - FlintIndexMetadata indexMetadata = + IndexQueryDetails indexQueryDetails = + IndexQueryDetails.builder() + .indexName("cv1") + .fullyQualifiedTableName(new FullyQualifiedTableName("mys3.default.http_logs")) + .autoRefresh(false) + .indexQueryActionType(IndexQueryActionType.DROP) + .indexType(FlintIndexType.COVERING) + .build(); + FlintIndexMetadataReader flintIndexMetadataReader = + new FlintIndexMetadataReaderImpl(client, stateStore); + Map indexMetadataMap = flintIndexMetadataReader.getFlintIndexMetadata( - IndexQueryDetails.builder() - .indexName("cv1") - .fullyQualifiedTableName(new FullyQualifiedTableName("mys3.default.http_logs")) - .autoRefresh(false) - .indexQueryActionType(IndexQueryActionType.DROP) - .indexType(FlintIndexType.COVERING) + new FlintIndexDetailsRequest.Builder() + .indexPattern(indexQueryDetails.openSearchIndexName()) .build()); - Assertions.assertEquals("00fdmvv9hp8u0o0q", indexMetadata.getJobId()); + Assertions.assertEquals( + "00fdmvv9hp8u0o0q", + indexMetadataMap.get(indexQueryDetails.openSearchIndexName()).getJobId()); } @SneakyThrows @@ -82,30 +218,74 @@ void testGetJobIDWithNPEException() { String mappings = Resources.toString(url, 
Charsets.UTF_8); String indexName = "flint_mys3_default_http_logs_cv1_index"; mockNodeClientIndicesMappings(indexName, mappings); - FlintIndexMetadataReader flintIndexMetadataReader = new FlintIndexMetadataReaderImpl(client); - IllegalArgumentException illegalArgumentException = - Assertions.assertThrows( - IllegalArgumentException.class, - () -> - flintIndexMetadataReader.getFlintIndexMetadata( - IndexQueryDetails.builder() - .indexName("cv1") - .fullyQualifiedTableName( - new FullyQualifiedTableName("mys3.default.http_logs")) - .autoRefresh(false) - .indexQueryActionType(IndexQueryActionType.DROP) - .indexType(FlintIndexType.COVERING) - .build())); - Assertions.assertEquals("Provided Index doesn't exist", illegalArgumentException.getMessage()); + FlintIndexMetadataReader flintIndexMetadataReader = + new FlintIndexMetadataReaderImpl(client, stateStore); + IndexQueryDetails indexQueryDetails = + IndexQueryDetails.builder() + .indexName("cv1") + .fullyQualifiedTableName(new FullyQualifiedTableName("mys3.default.http_logs")) + .autoRefresh(false) + .indexQueryActionType(IndexQueryActionType.DROP) + .indexType(FlintIndexType.COVERING) + .build(); + Map flintIndexMetadataMap = + flintIndexMetadataReader.getFlintIndexMetadata( + new FlintIndexDetailsRequest.Builder() + .indexPattern(indexQueryDetails.openSearchIndexName()) + .build()); + Assertions.assertFalse( + flintIndexMetadataMap.containsKey("flint_mys3_default_http_logs_cv1_index")); + } + + @SneakyThrows + @Test + void testGetJobIDWithNPEExceptionForMultipleIndices() { + HashMap indexMappingsMap = new HashMap<>(); + URL url = Resources.getResource("flint-index-mappings/npe_mapping.json"); + String mappings = Resources.toString(url, Charsets.UTF_8); + String indexName = "flint_mys3_default_http_logs_cv1_index"; + indexMappingsMap.put(indexName, mappings); + url = + Resources.getResource( + "flint-index-mappings/flint_mys3_default_http_logs_skipping_index.json"); + mappings = Resources.toString(url, 
Charsets.UTF_8); + indexName = "flint_mys3_default_http_logs_skipping_index"; + indexMappingsMap.put(indexName, mappings); + mockNodeClientIndicesMappings("flint_mys3*", indexMappingsMap); + FlintIndexMetadataReader flintIndexMetadataReader = + new FlintIndexMetadataReaderImpl(client, stateStore); + Map flintIndexMetadataMap = + flintIndexMetadataReader.getFlintIndexMetadata( + new FlintIndexDetailsRequest.Builder().indexPattern("flint_mys3*").build()); + Assertions.assertFalse( + flintIndexMetadataMap.containsKey("flint_mys3_default_http_logs_cv1_index")); + Assertions.assertTrue( + flintIndexMetadataMap.containsKey("flint_mys3_default_http_logs_skipping_index")); } @SneakyThrows public void mockNodeClientIndicesMappings(String indexName, String mappings) { GetMappingsResponse mockResponse = mock(GetMappingsResponse.class); - when(client.admin().indices().prepareGetMappings(any()).get()).thenReturn(mockResponse); + when(client.admin().indices().prepareGetMappings().setIndices(indexName).get()) + .thenReturn(mockResponse); Map metadata; metadata = Map.of(indexName, IndexMetadata.fromXContent(createParser(mappings)).mapping()); - when(mockResponse.mappings()).thenReturn(metadata); + when(mockResponse.getMappings()).thenReturn(metadata); + } + + @SneakyThrows + public void mockNodeClientIndicesMappings( + String indexPattern, HashMap indexMappingsMap) { + GetMappingsResponse mockResponse = mock(GetMappingsResponse.class); + when(client.admin().indices().prepareGetMappings().setIndices(indexPattern).get()) + .thenReturn(mockResponse); + Map metadataMap = new HashMap<>(); + for (String indexName : indexMappingsMap.keySet()) { + metadataMap.put( + indexName, + IndexMetadata.fromXContent(createParser(indexMappingsMap.get(indexName))).mapping()); + } + when(mockResponse.getMappings()).thenReturn(metadataMap); } private XContentParser createParser(String mappings) throws IOException { diff --git 
a/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderSpecTest.java new file mode 100644 index 0000000000..1e589e38b3 --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderSpecTest.java @@ -0,0 +1,163 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.flint; + +import static org.opensearch.sql.spark.flint.FlintIndexState.ACTIVE; +import static org.opensearch.sql.spark.flint.FlintIndexState.CANCELLING; +import static org.opensearch.sql.spark.flint.FlintIndexState.REFRESHING; + +import java.util.Map; +import org.junit.Before; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.sql.spark.asyncquery.model.MockFlintIndex; +import org.opensearch.sql.spark.asyncquery.model.MockFlintSparkJob; +import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.model.FlintIndexDetailsRequest; +import org.opensearch.test.OpenSearchIntegTestCase; + +public class FlintIndexMetadataReaderSpecTest extends OpenSearchIntegTestCase { + protected ClusterService clusterService; + protected NodeClient client; + protected StateStore stateStore; + private MockFlintIndex skippingIndex; + private MockFlintSparkJob skippingIndexState; + private MockFlintIndex coveringIndex; + private MockFlintSparkJob coveringIndexState; + private MockFlintIndex mvIndex; + private MockFlintSparkJob mvIndexState; + + @Before + public void setup() { + clusterService = clusterService(); + client = (NodeClient) cluster().client(); + stateStore = new StateStore(client, clusterService); + + skippingIndex = new MockFlintIndex(client, "flint_my_glue_mydb_http_logs_skipping_index"); + skippingIndexState = + new 
MockFlintSparkJob( + stateStore, "flint_my_glue_mydb_http_logs_skipping_index_latest_id", "my_glue"); + coveringIndex = new MockFlintIndex(client, "flint_my_glue_mydb_http_logs_covering_index"); + coveringIndexState = + new MockFlintSparkJob( + stateStore, "flint_my_glue_mydb_http_logs_covering_index_latest_id", "my_glue"); + mvIndex = new MockFlintIndex(client, "flint_my_glue_mydb_mv"); + mvIndexState = new MockFlintSparkJob(stateStore, "flint_my_glue_mydb_mv_latest_id", "my_glue"); + } + + @Test + public void testGetFlintIndexMetadataWithIndexState() { + + skippingIndex.createIndex(); + skippingIndexState.cancelling(); + coveringIndex.createIndex(); + coveringIndexState.refreshing(); + mvIndex.createIndex(); + mvIndexState.active(); + try { + FlintIndexMetadataReader flintIndexMetadataReader = + new FlintIndexMetadataReaderImpl(client, stateStore); + Map indexMetadataMap = + flintIndexMetadataReader.getFlintIndexMetadata( + new FlintIndexDetailsRequest.Builder() + .indexPattern("flint_my_glue*") + .isIndexStateRequired(true) + .dataSourceName("my_glue") + .build()); + Assertions.assertEquals(3, indexMetadataMap.size()); + + Assertions.assertTrue( + indexMetadataMap.containsKey("flint_my_glue_mydb_http_logs_skipping_index")); + Assertions.assertTrue( + indexMetadataMap + .get("flint_my_glue_mydb_http_logs_skipping_index") + .getIndexState() + .isPresent()); + Assertions.assertEquals( + CANCELLING, + indexMetadataMap + .get("flint_my_glue_mydb_http_logs_skipping_index") + .getIndexState() + .get() + .getIndexState()); + + Assertions.assertTrue( + indexMetadataMap.containsKey("flint_my_glue_mydb_http_logs_covering_index")); + Assertions.assertTrue( + indexMetadataMap + .get("flint_my_glue_mydb_http_logs_covering_index") + .getIndexState() + .isPresent()); + Assertions.assertEquals( + REFRESHING, + indexMetadataMap + .get("flint_my_glue_mydb_http_logs_covering_index") + .getIndexState() + .get() + .getIndexState()); + + 
Assertions.assertTrue(indexMetadataMap.containsKey("flint_my_glue_mydb_mv")); + Assertions.assertTrue( + indexMetadataMap.get("flint_my_glue_mydb_mv").getIndexState().isPresent()); + Assertions.assertEquals( + ACTIVE, + indexMetadataMap.get("flint_my_glue_mydb_mv").getIndexState().get().getIndexState()); + } finally { + skippingIndex.deleteIndex(); + skippingIndexState.deleted(); + coveringIndex.deleteIndex(); + coveringIndexState.deleted(); + mvIndex.deleteIndex(); + mvIndexState.deleted(); + } + } + + @Test + public void testGetFlintIndexMetadataWithOutIndexState() { + skippingIndex.createIndex(); + skippingIndexState.cancelling(); + coveringIndex.createIndex(); + coveringIndexState.refreshing(); + mvIndex.createIndex(); + mvIndexState.active(); + try { + FlintIndexMetadataReader flintIndexMetadataReader = + new FlintIndexMetadataReaderImpl(client, stateStore); + Map indexMetadataMap = + flintIndexMetadataReader.getFlintIndexMetadata( + new FlintIndexDetailsRequest.Builder().indexPattern("flint_my_glue*").build()); + Assertions.assertEquals(3, indexMetadataMap.size()); + + Assertions.assertTrue( + indexMetadataMap.containsKey("flint_my_glue_mydb_http_logs_skipping_index")); + Assertions.assertFalse( + indexMetadataMap + .get("flint_my_glue_mydb_http_logs_skipping_index") + .getIndexState() + .isPresent()); + + Assertions.assertTrue( + indexMetadataMap.containsKey("flint_my_glue_mydb_http_logs_covering_index")); + Assertions.assertFalse( + indexMetadataMap + .get("flint_my_glue_mydb_http_logs_covering_index") + .getIndexState() + .isPresent()); + Assertions.assertTrue(indexMetadataMap.containsKey("flint_my_glue_mydb_mv")); + Assertions.assertFalse( + indexMetadataMap.get("flint_my_glue_mydb_mv").getIndexState().isPresent()); + } finally { + skippingIndex.deleteIndex(); + skippingIndexState.deleted(); + coveringIndex.deleteIndex(); + coveringIndexState.deleted(); + mvIndex.deleteIndex(); + mvIndexState.deleted(); + } + } +} diff --git 
a/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataTest.java deleted file mode 100644 index 808b80766e..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataTest.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.flint; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.opensearch.sql.spark.constants.TestConstants.EMR_JOB_ID; -import static org.opensearch.sql.spark.flint.FlintIndexMetadata.AUTO_REFRESH; -import static org.opensearch.sql.spark.flint.FlintIndexMetadata.ENV_KEY; -import static org.opensearch.sql.spark.flint.FlintIndexMetadata.OPTIONS_KEY; -import static org.opensearch.sql.spark.flint.FlintIndexMetadata.PROPERTIES_KEY; -import static org.opensearch.sql.spark.flint.FlintIndexMetadata.SERVERLESS_EMR_JOB_ID; - -import java.util.HashMap; -import java.util.Map; -import org.junit.jupiter.api.Test; - -public class FlintIndexMetadataTest { - - @Test - public void testAutoRefreshSetToTrue() { - FlintIndexMetadata indexMetadata = - FlintIndexMetadata.fromMetatdata( - new Metadata() - .addEnv(SERVERLESS_EMR_JOB_ID, EMR_JOB_ID) - .addOptions(AUTO_REFRESH, "true") - .metadata()); - assertTrue(indexMetadata.isAutoRefresh()); - } - - @Test - public void testAutoRefreshSetToFalse() { - FlintIndexMetadata indexMetadata = - FlintIndexMetadata.fromMetatdata( - new Metadata() - .addEnv(SERVERLESS_EMR_JOB_ID, EMR_JOB_ID) - .addOptions(AUTO_REFRESH, "false") - .metadata()); - assertFalse(indexMetadata.isAutoRefresh()); - } - - @Test - public void testWithOutAutoRefresh() { - FlintIndexMetadata indexMetadata = - FlintIndexMetadata.fromMetatdata( - new Metadata() - .addEnv(SERVERLESS_EMR_JOB_ID, EMR_JOB_ID) - .addOptions(AUTO_REFRESH, "false") - .metadata()); - 
assertFalse(indexMetadata.isAutoRefresh()); - } - - static class Metadata { - private final Map properties; - private final Map env; - private final Map options; - - private Metadata() { - properties = new HashMap<>(); - env = new HashMap<>(); - options = new HashMap<>(); - } - - public Metadata addEnv(String key, String value) { - env.put(key, value); - return this; - } - - public Metadata addOptions(String key, String value) { - options.put(key, value); - return this; - } - - public Map metadata() { - Map result = new HashMap<>(); - properties.put(ENV_KEY, env); - result.put(OPTIONS_KEY, options); - result.put(PROPERTIES_KEY, properties); - return result; - } - } -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/flint/model/FlintIndexDetailsRequestTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/model/FlintIndexDetailsRequestTest.java new file mode 100644 index 0000000000..9eb8402eaa --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/flint/model/FlintIndexDetailsRequestTest.java @@ -0,0 +1,82 @@ +package org.opensearch.sql.spark.flint.model; + +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Test; + +class FlintIndexDetailsRequestTest { + + @Test + void buildWithMinimumRequiredFields() { + FlintIndexDetailsRequest request = + new FlintIndexDetailsRequest.Builder().indexPattern("test-pattern").build(); + + assertNotNull(request); + assertEquals("test-pattern", request.getIndexPattern()); + assertFalse(request.isIndexStateRequired()); + assertNull(request.getDataSourceName()); + } + + @Test + void buildWithAllFields() { + FlintIndexDetailsRequest request = + new FlintIndexDetailsRequest.Builder() + .indexPattern("test-pattern") + .isIndexStateRequired(true) + .dataSourceName("test-source") + .build(); + + assertNotNull(request); + assertEquals("test-pattern", request.getIndexPattern()); + assertTrue(request.isIndexStateRequired()); + assertEquals("test-source", request.getDataSourceName()); + 
} + + @Test + void throwExceptionWhenIndexPatternIsNull() { + Exception exception = + assertThrows( + IllegalArgumentException.class, + () -> new FlintIndexDetailsRequest.Builder().indexPattern(null).build()); + assertEquals("indexPattern cannot be null or empty", exception.getMessage()); + } + + @Test + void throwExceptionWhenIndexPatternIsEmpty() { + Exception exception = + assertThrows( + IllegalArgumentException.class, + () -> new FlintIndexDetailsRequest.Builder().indexPattern("").build()); + assertEquals("indexPattern cannot be null or empty", exception.getMessage()); + } + + @Test + void throwExceptionWhenDataSourceNameIsRequiredButNull() { + Exception exception = + assertThrows( + IllegalStateException.class, + () -> + new FlintIndexDetailsRequest.Builder() + .indexPattern("test-pattern") + .isIndexStateRequired(true) + .dataSourceName(null) + .build()); + assertEquals( + "dataSourceName is required when isIndexStateRequired is true", exception.getMessage()); + } + + @Test + void throwExceptionWhenDataSourceNameIsRequiredButEmpty() { + Exception exception = + assertThrows( + IllegalStateException.class, + () -> + new FlintIndexDetailsRequest.Builder() + .indexPattern("test-pattern") + .isIndexStateRequired(true) + .dataSourceName("") + .build()); + assertEquals( + "dataSourceName is required when isIndexStateRequired is true", exception.getMessage()); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/utils/TestUtils.java b/spark/src/test/java/org/opensearch/sql/spark/utils/TestUtils.java index ca77006d9c..4cab6afa9c 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/utils/TestUtils.java +++ b/spark/src/test/java/org/opensearch/sql/spark/utils/TestUtils.java @@ -5,8 +5,15 @@ package org.opensearch.sql.spark.utils; +import com.google.common.base.Charsets; +import com.google.common.io.Resources; import java.io.IOException; +import java.net.URL; import java.util.Objects; +import lombok.SneakyThrows; +import 
org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.client.Client; +import org.opensearch.common.xcontent.XContentType; public class TestUtils { @@ -22,4 +29,17 @@ public static String getJson(String filename) throws IOException { return new String( Objects.requireNonNull(classLoader.getResourceAsStream(filename)).readAllBytes()); } + + @SneakyThrows + public static String loadMappings(String path) { + URL url = Resources.getResource(path); + return Resources.toString(url, Charsets.UTF_8); + } + + public static void createIndexWithMappings( + Client client, String indexName, String metadataFileLocation) { + CreateIndexRequest request = new CreateIndexRequest(indexName); + request.mapping(loadMappings(metadataFileLocation), XContentType.JSON); + client.admin().indices().create(request).actionGet(); + } } diff --git a/spark/src/test/resources/flint-index-mappings/flint_my_glue_mydb_http_logs_covering_corrupted_index_mapping.json b/spark/src/test/resources/flint-index-mappings/flint_my_glue_mydb_http_logs_covering_corrupted_index_mapping.json new file mode 100644 index 0000000000..90d37c3e79 --- /dev/null +++ b/spark/src/test/resources/flint-index-mappings/flint_my_glue_mydb_http_logs_covering_corrupted_index_mapping.json @@ -0,0 +1,33 @@ +{ + "_meta": { + "latestId": "flint_my_glue_mydb_http_logs_covering_corrupted_index_latest_id", + "kind": "covering", + "indexedColumns": [ + { + "columnType": "string", + "columnName": "clientip" + }, + { + "columnType": "int", + "columnName": "status" + } + ], + "name": "covering", + "options": { + "auto_refresh": "true", + "incremental_refresh": "false", + "index_settings": "{\"number_of_shards\":5,\"number_of_replicas\":1}", + "checkpoint_location": "s3://vamsicheckpoint/cv/" + }, + "source": "my_glue.mydb.http_logs", + "version": "0.2.0" + }, + "properties": { + "clientip": { + "type": "keyword" + }, + "status": { + "type": "integer" + } + } +} \ No newline at end of file diff --git 
a/spark/src/test/resources/flint-index-mappings/flint_my_glue_mydb_http_logs_covering_index_mapping.json b/spark/src/test/resources/flint-index-mappings/flint_my_glue_mydb_http_logs_covering_index_mapping.json new file mode 100644 index 0000000000..cb4a6b5366 --- /dev/null +++ b/spark/src/test/resources/flint-index-mappings/flint_my_glue_mydb_http_logs_covering_index_mapping.json @@ -0,0 +1,39 @@ +{ + "_meta": { + "latestId": "flint_my_glue_mydb_http_logs_covering_index_latest_id", + "kind": "covering", + "indexedColumns": [ + { + "columnType": "string", + "columnName": "clientip" + }, + { + "columnType": "int", + "columnName": "status" + } + ], + "name": "covering", + "options": { + "auto_refresh": "true", + "incremental_refresh": "false", + "index_settings": "{\"number_of_shards\":5,\"number_of_replicas\":1}", + "checkpoint_location": "s3://vamsicheckpoint/cv/" + }, + "source": "my_glue.mydb.http_logs", + "version": "0.2.0", + "properties": { + "env": { + "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fhh7frokkf0k0l", + "SERVERLESS_EMR_JOB_ID": "00fhoag6i0671o0m" + } + } + }, + "properties": { + "clientip": { + "type": "keyword" + }, + "status": { + "type": "integer" + } + } +} \ No newline at end of file diff --git a/spark/src/test/resources/flint-index-mappings/flint_my_glue_mydb_http_logs_skipping_index_mapping.json b/spark/src/test/resources/flint-index-mappings/flint_my_glue_mydb_http_logs_skipping_index_mapping.json new file mode 100644 index 0000000000..4ffd73bf9c --- /dev/null +++ b/spark/src/test/resources/flint-index-mappings/flint_my_glue_mydb_http_logs_skipping_index_mapping.json @@ -0,0 +1,39 @@ +{ + "_meta": { + "latestId": "flint_my_glue_mydb_http_logs_skipping_index_latest_id", + "kind": "skipping", + "indexedColumns": [ + { + "columnType": "int", + "kind": "VALUE_SET", + "parameters": { + "max_size": "100" + }, + "columnName": "status" + } + ], + "name": "flint_my_glue_mydb_http_logs_skipping_index", + "options": { + "auto_refresh": "true", + 
"incremental_refresh": "false", + "index_settings": "{\"number_of_shards\":5, \"number_of_replicas\":1}", + "checkpoint_location": "s3://vamsicheckpoint/skp/" + }, + "source": "my_glue.mydb.http_logs", + "version": "0.3.0", + "properties": { + "env": { + "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fhe6d5jpah090l", + "SERVERLESS_EMR_JOB_ID": "00fhelvq7peuao0m" + } + } + }, + "properties": { + "file_path": { + "type": "keyword" + }, + "status": { + "type": "integer" + } + } +} \ No newline at end of file diff --git a/spark/src/test/resources/flint-index-mappings/flint_my_glue_mydb_mv_mapping.json b/spark/src/test/resources/flint-index-mappings/flint_my_glue_mydb_mv_mapping.json new file mode 100644 index 0000000000..0fcbf299ec --- /dev/null +++ b/spark/src/test/resources/flint-index-mappings/flint_my_glue_mydb_mv_mapping.json @@ -0,0 +1,33 @@ +{ + "_meta": { + "latestId": "flint_my_glue_mydb_mv_latest_id", + "kind": "mv", + "indexedColumns": [ + { + "columnType": "bigint", + "columnName": "counter1" + } + ], + "name": "my_glue.mydb.mv", + "options": { + "auto_refresh": "true", + "incremental_refresh": "false", + "index_settings": "{\"number_of_shards\":5,\"number_of_replicas\":1}", + "checkpoint_location": "s3://vamsicheckpoint/mv/", + "watermark_delay": "10 seconds" + }, + "source": "SELECT count(`@timestamp`) AS `counter1` FROM my_glue.mydb.http_logs GROUP BY TUMBLE (`@timestamp`, '1 second')", + "version": "0.2.0", + "properties": { + "env": { + "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fhh7frokkf0k0l", + "SERVERLESS_EMR_JOB_ID": "00fhob01oa7fu00m" + } + } + }, + "properties": { + "counter1": { + "type": "long" + } + } +} \ No newline at end of file diff --git a/spark/src/test/resources/flint-index-mappings/flint_mys3_default_http_logs_skipping_index.json b/spark/src/test/resources/flint-index-mappings/flint_mys3_default_http_logs_skipping_index.json index 24e14c12ba..1438b257d1 100644 --- 
a/spark/src/test/resources/flint-index-mappings/flint_mys3_default_http_logs_skipping_index.json +++ b/spark/src/test/resources/flint-index-mappings/flint_mys3_default_http_logs_skipping_index.json @@ -2,23 +2,32 @@ "flint_mys3_default_http_logs_skipping_index": { "mappings": { "_doc": { - "_meta": { + "_meta": { + "latestId": "ZmxpbnRfdmFtc2lfZ2x1ZV92YW1zaWRiX2h0dHBfbG9nc19za2lwcGluZ19pbmRleA==", "kind": "skipping", "indexedColumns": [ { "columnType": "int", "kind": "VALUE_SET", + "parameters": { + "max_size": "100" + }, "columnName": "status" } ], - "name": "flint_mys3_default_http_logs_skipping_index", - "options": {}, - "source": "mys3.default.http_logs", - "version": "0.1.0", + "name": "flint_vamsi_glue_vamsidb_http_logs_skipping_index", + "options": { + "auto_refresh": "true", + "incremental_refresh": "false", + "index_settings": "{\"number_of_shards\":5,\"number_of_replicas\":1}", + "checkpoint_location": "s3://vamsicheckpoint/skp/" + }, + "source": "vamsi_glue.vamsidb.http_logs", + "version": "0.3.0", "properties": { "env": { - "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fd777k3k3ls20p", - "SERVERLESS_EMR_JOB_ID": "00fdmvv9hp8u0o0q" + "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fhe6d5jpah090l", + "SERVERLESS_EMR_JOB_ID": "00fhelvq7peuao0m" } } }