diff --git a/spark/src/main/antlr/FlintSparkSqlExtensions.g4 b/spark/src/main/antlr/FlintSparkSqlExtensions.g4 index 219bbe782b..4ecef6a697 100644 --- a/spark/src/main/antlr/FlintSparkSqlExtensions.g4 +++ b/spark/src/main/antlr/FlintSparkSqlExtensions.g4 @@ -26,6 +26,7 @@ skippingIndexStatement : createSkippingIndexStatement | refreshSkippingIndexStatement | describeSkippingIndexStatement + | alterSkippingIndexStatement | dropSkippingIndexStatement | vacuumSkippingIndexStatement ; @@ -46,6 +47,12 @@ describeSkippingIndexStatement : (DESC | DESCRIBE) SKIPPING INDEX ON tableName ; +alterSkippingIndexStatement + : ALTER SKIPPING INDEX + ON tableName + WITH LEFT_PAREN propertyList RIGHT_PAREN + ; + dropSkippingIndexStatement : DROP SKIPPING INDEX ON tableName ; @@ -59,6 +66,7 @@ coveringIndexStatement | refreshCoveringIndexStatement | showCoveringIndexStatement | describeCoveringIndexStatement + | alterCoveringIndexStatement | dropCoveringIndexStatement | vacuumCoveringIndexStatement ; @@ -83,6 +91,12 @@ describeCoveringIndexStatement : (DESC | DESCRIBE) INDEX indexName ON tableName ; +alterCoveringIndexStatement + : ALTER INDEX indexName + ON tableName + WITH LEFT_PAREN propertyList RIGHT_PAREN + ; + dropCoveringIndexStatement : DROP INDEX indexName ON tableName ; @@ -96,6 +110,7 @@ materializedViewStatement | refreshMaterializedViewStatement | showMaterializedViewStatement | describeMaterializedViewStatement + | alterMaterializedViewStatement | dropMaterializedViewStatement | vacuumMaterializedViewStatement ; @@ -118,6 +133,11 @@ describeMaterializedViewStatement : (DESC | DESCRIBE) MATERIALIZED VIEW mvName=multipartIdentifier ; +alterMaterializedViewStatement + : ALTER MATERIALIZED VIEW mvName=multipartIdentifier + WITH LEFT_PAREN propertyList RIGHT_PAREN + ; + dropMaterializedViewStatement : DROP MATERIALIZED VIEW mvName=multipartIdentifier ; diff --git a/spark/src/main/antlr/SparkSqlBase.g4 b/spark/src/main/antlr/SparkSqlBase.g4 index 01f45016d6..cb58e97e76 100644 --- a/spark/src/main/antlr/SparkSqlBase.g4 +++ b/spark/src/main/antlr/SparkSqlBase.g4 @@ -155,6 +155,7 @@ DOT: '.'; AS: 'AS'; +ALTER: 'ALTER'; CREATE: 'CREATE'; DESC: 'DESC'; DESCRIBE: 'DESCRIBE'; diff --git a/spark/src/main/antlr/SqlBaseLexer.g4 b/spark/src/main/antlr/SqlBaseLexer.g4 index 174887def6..7c376e2268 100644 --- a/spark/src/main/antlr/SqlBaseLexer.g4 +++ b/spark/src/main/antlr/SqlBaseLexer.g4 @@ -79,6 +79,7 @@ COMMA: ','; DOT: '.'; LEFT_BRACKET: '['; RIGHT_BRACKET: ']'; +BANG: '!'; // NOTE: If you add a new token in the list below, you should update the list of keywords // and reserved tag in `docs/sql-ref-ansi-compliance.md#sql-keywords`, and @@ -273,7 +274,7 @@ NANOSECOND: 'NANOSECOND'; NANOSECONDS: 'NANOSECONDS'; NATURAL: 'NATURAL'; NO: 'NO'; -NOT: 'NOT' | '!'; +NOT: 'NOT'; NULL: 'NULL'; NULLS: 'NULLS'; NUMERIC: 'NUMERIC'; @@ -510,8 +511,13 @@ BIGDECIMAL_LITERAL | DECIMAL_DIGITS EXPONENT? 'BD' {isValidDecimal()}? ; +// Generalize the identifier to give a sensible INVALID_IDENTIFIER error message: +// * Unicode letters rather than a-z and A-Z only +// * URI paths for table references using paths +// We then narrow down to ANSI rules in exitUnquotedIdentifier() in the parser. IDENTIFIER - : (LETTER | DIGIT | '_')+ + : (UNICODE_LETTER | DIGIT | '_')+ + | UNICODE_LETTER+ '://' (UNICODE_LETTER | DIGIT | '_' | '/' | '-' | '.' | '?' | '=' | '&' | '#' | '%')+ ; BACKQUOTED_IDENTIFIER @@ -535,6 +541,10 @@ fragment LETTER : [A-Z] ; +fragment UNICODE_LETTER + : [\p{L}] + ; + SIMPLE_COMMENT : '--' ('\\\n' | ~[\r\n])* '\r'? '\n'? -> channel(HIDDEN) ; diff --git a/spark/src/main/antlr/SqlBaseParser.g4 b/spark/src/main/antlr/SqlBaseParser.g4 index 801cc62491..41a5ec241c 100644 --- a/spark/src/main/antlr/SqlBaseParser.g4 +++ b/spark/src/main/antlr/SqlBaseParser.g4 @@ -388,6 +388,7 @@ describeFuncName | comparisonOperator | arithmeticOperator | predicateOperator + | BANG ; describeColName @@ -946,7 +947,7 @@ expressionSeq ; booleanExpression - : NOT booleanExpression #logicalNot + : (NOT | BANG) booleanExpression #logicalNot | EXISTS LEFT_PAREN query RIGHT_PAREN #exists | valueExpression predicate? #predicated | left=booleanExpression operator=AND right=booleanExpression #logicalBinary diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java index f153e94713..60d57c50df 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java @@ -10,7 +10,9 @@ import static org.opensearch.sql.spark.execution.statestore.StateStore.createIndexDMLResult; import com.amazonaws.services.emrserverless.model.JobRunState; +import java.util.Map; import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.json.JSONObject; @@ -27,8 +29,9 @@ import org.opensearch.sql.spark.execution.statement.StatementState; import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadata; -import org.opensearch.sql.spark.flint.FlintIndexMetadataReader; +import org.opensearch.sql.spark.flint.FlintIndexMetadataService; import org.opensearch.sql.spark.flint.operation.FlintIndexOp; +import org.opensearch.sql.spark.flint.operation.FlintIndexOpAlterOff; import org.opensearch.sql.spark.flint.operation.FlintIndexOpCancel; import org.opensearch.sql.spark.flint.operation.FlintIndexOpDelete; import org.opensearch.sql.spark.response.JobExecutionResponseReader; @@ -37,48 +40,61 @@ @RequiredArgsConstructor public class IndexDMLHandler extends AsyncQueryHandler { private static final Logger LOG = LogManager.getLogger(); - public static final String DROP_INDEX_JOB_ID = "dropIndexJobId"; + public static final String DML_QUERY_JOB_ID = "DMLQueryJobId"; private final EMRServerlessClient emrServerlessClient; private final JobExecutionResponseReader jobExecutionResponseReader; - private final FlintIndexMetadataReader flintIndexMetadataReader; + private final FlintIndexMetadataService flintIndexMetadataService; private final Client client; private final StateStore stateStore; public static boolean isIndexDMLQuery(String jobId) { - return DROP_INDEX_JOB_ID.equalsIgnoreCase(jobId); + return DROP_INDEX_JOB_ID.equalsIgnoreCase(jobId) || DML_QUERY_JOB_ID.equalsIgnoreCase(jobId); } @Override public DispatchQueryResponse submit( DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) { DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata(); - IndexQueryDetails indexDetails = context.getIndexQueryDetails(); - FlintIndexMetadata indexMetadata = flintIndexMetadataReader.getFlintIndexMetadata(indexDetails); - // if index is created without auto refresh. there is no job to cancel. - String status = JobRunState.FAILED.toString(); - String error = ""; - long startTime = 0L; + long startTime = System.currentTimeMillis(); try { - FlintIndexOp jobCancelOp = - new FlintIndexOpCancel( - stateStore, dispatchQueryRequest.getDatasource(), emrServerlessClient); - jobCancelOp.apply(indexMetadata); - - FlintIndexOp indexDeleteOp = - new FlintIndexOpDelete(stateStore, dispatchQueryRequest.getDatasource()); - indexDeleteOp.apply(indexMetadata); - status = JobRunState.SUCCESS.toString(); + IndexQueryDetails indexDetails = context.getIndexQueryDetails(); + FlintIndexMetadata indexMetadata = getFlintIndexMetadata(indexDetails); + executeIndexOp(dispatchQueryRequest, indexDetails, indexMetadata); + AsyncQueryId asyncQueryId = + storeIndexDMLResult( + dispatchQueryRequest, + dataSourceMetadata, + JobRunState.SUCCESS.toString(), + StringUtils.EMPTY, + startTime); + return new DispatchQueryResponse( + asyncQueryId, DML_QUERY_JOB_ID, dataSourceMetadata.getResultIndex(), null); } catch (Exception e) { - error = e.getMessage(); LOG.error(e); + AsyncQueryId asyncQueryId = + storeIndexDMLResult( + dispatchQueryRequest, + dataSourceMetadata, + JobRunState.FAILED.toString(), + e.getMessage(), + startTime); + return new DispatchQueryResponse( + asyncQueryId, DML_QUERY_JOB_ID, dataSourceMetadata.getResultIndex(), null); } + } + private AsyncQueryId storeIndexDMLResult( + DispatchQueryRequest dispatchQueryRequest, + DataSourceMetadata dataSourceMetadata, + String status, + String error, + long startTime) { AsyncQueryId asyncQueryId = AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName()); IndexDMLResult indexDMLResult = new IndexDMLResult( @@ -88,10 +104,45 @@ public DispatchQueryResponse submit( dispatchQueryRequest.getDatasource(), System.currentTimeMillis() - startTime, System.currentTimeMillis()); - String resultIndex = dataSourceMetadata.getResultIndex(); - createIndexDMLResult(stateStore, resultIndex).apply(indexDMLResult); + createIndexDMLResult(stateStore, dataSourceMetadata.getResultIndex()).apply(indexDMLResult); + return asyncQueryId; + } - return new DispatchQueryResponse(asyncQueryId, DROP_INDEX_JOB_ID, resultIndex, null); + private void executeIndexOp( + DispatchQueryRequest dispatchQueryRequest, + IndexQueryDetails indexQueryDetails, + FlintIndexMetadata indexMetadata) { + switch (indexQueryDetails.getIndexQueryActionType()) { + case DROP: + FlintIndexOp jobCancelOp = + new FlintIndexOpCancel( + stateStore, dispatchQueryRequest.getDatasource(), emrServerlessClient); + jobCancelOp.apply(indexMetadata); + FlintIndexOp indexDeleteOp = + new FlintIndexOpDelete(stateStore, dispatchQueryRequest.getDatasource()); + indexDeleteOp.apply(indexMetadata); + break; + case ALTER: + FlintIndexOpAlterOff flintIndexOpAlterOff = + new FlintIndexOpAlterOff( + indexQueryDetails.getFlintIndexOptions(), + stateStore, + dispatchQueryRequest.getDatasource(), + emrServerlessClient, + flintIndexMetadataService); + flintIndexOpAlterOff.apply(indexMetadata); + } + } + + private FlintIndexMetadata getFlintIndexMetadata(IndexQueryDetails indexDetails) { + Map indexMetadataMap = + flintIndexMetadataService.getFlintIndexMetadata(indexDetails.openSearchIndexName()); + if (!indexMetadataMap.containsKey(indexDetails.openSearchIndexName())) { + throw new IllegalStateException( + String.format( + "Couldn't fetch flint index: %s details", indexDetails.openSearchIndexName())); + } + return indexMetadataMap.get(indexDetails.openSearchIndexName()); } @Override diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java index 0528a189f0..d55408f62e 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java @@ -5,6 +5,7 @@ package org.opensearch.sql.spark.dispatcher; +import java.util.Map; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; import org.opensearch.sql.spark.client.EMRServerlessClient; @@ -14,7 +15,7 @@ import org.opensearch.sql.spark.dispatcher.model.JobType; import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadata; -import org.opensearch.sql.spark.flint.FlintIndexMetadataReader; +import org.opensearch.sql.spark.flint.FlintIndexMetadataService; import org.opensearch.sql.spark.flint.operation.FlintIndexOp; import org.opensearch.sql.spark.flint.operation.FlintIndexOpCancel; import org.opensearch.sql.spark.leasemanager.LeaseManager; @@ -23,18 +24,18 @@ /** Handle Refresh Query. */ public class RefreshQueryHandler extends BatchQueryHandler { - private final FlintIndexMetadataReader flintIndexMetadataReader; + private final FlintIndexMetadataService flintIndexMetadataService; private final StateStore stateStore; private final EMRServerlessClient emrServerlessClient; public RefreshQueryHandler( EMRServerlessClient emrServerlessClient, JobExecutionResponseReader jobExecutionResponseReader, - FlintIndexMetadataReader flintIndexMetadataReader, + FlintIndexMetadataService flintIndexMetadataService, StateStore stateStore, LeaseManager leaseManager) { super(emrServerlessClient, jobExecutionResponseReader, leaseManager); - this.flintIndexMetadataReader = flintIndexMetadataReader; + this.flintIndexMetadataService = flintIndexMetadataService; this.stateStore = stateStore; this.emrServerlessClient = emrServerlessClient; } @@ -42,8 +43,14 @@ public RefreshQueryHandler( @Override public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) { String datasourceName = asyncQueryJobMetadata.getDatasourceName(); - FlintIndexMetadata indexMetadata = - flintIndexMetadataReader.getFlintIndexMetadata(asyncQueryJobMetadata.getIndexName()); + Map indexMetadataMap = + flintIndexMetadataService.getFlintIndexMetadata(asyncQueryJobMetadata.getIndexName()); + if (!indexMetadataMap.containsKey(asyncQueryJobMetadata.getIndexName())) { + throw new IllegalStateException( + String.format( + "Couldn't fetch flint index: %s details", asyncQueryJobMetadata.getIndexName())); + } + FlintIndexMetadata indexMetadata = indexMetadataMap.get(asyncQueryJobMetadata.getIndexName()); FlintIndexOp jobCancelOp = new FlintIndexOpCancel(stateStore, datasourceName, emrServerlessClient); jobCancelOp.apply(indexMetadata); diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java index 2d6a456a61..62ff392e96 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java @@ -25,7 +25,7 @@ import org.opensearch.sql.spark.dispatcher.model.JobType; import org.opensearch.sql.spark.execution.session.SessionManager; import org.opensearch.sql.spark.execution.statestore.StateStore; -import org.opensearch.sql.spark.flint.FlintIndexMetadataReader; +import org.opensearch.sql.spark.flint.FlintIndexMetadataService; import org.opensearch.sql.spark.leasemanager.LeaseManager; import org.opensearch.sql.spark.response.JobExecutionResponseReader; import org.opensearch.sql.spark.rest.model.LangType; @@ -48,7 +48,7 @@ public class SparkQueryDispatcher { private JobExecutionResponseReader jobExecutionResponseReader; - private FlintIndexMetadataReader flintIndexMetadataReader; + private FlintIndexMetadataService flintIndexMetadataService; private Client client; @@ -81,10 +81,10 @@ public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest) fillMissingDetails(dispatchQueryRequest, indexQueryDetails); contextBuilder.indexQueryDetails(indexQueryDetails); - if (IndexQueryActionType.DROP.equals(indexQueryDetails.getIndexQueryActionType())) { + if (isEligibleForIndexDMLHandling(indexQueryDetails)) { asyncQueryHandler = createIndexDMLHandler(emrServerlessClient); } else if (IndexQueryActionType.CREATE.equals(indexQueryDetails.getIndexQueryActionType()) - && indexQueryDetails.isAutoRefresh()) { + && indexQueryDetails.getFlintIndexOptions().autoRefresh()) { asyncQueryHandler = new StreamingQueryHandler( emrServerlessClient, jobExecutionResponseReader, leaseManager); @@ -94,7 +94,7 @@ public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest) new RefreshQueryHandler( emrServerlessClient, jobExecutionResponseReader, - flintIndexMetadataReader, + flintIndexMetadataService, stateStore, leaseManager); } @@ -102,6 +102,16 @@ public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest) return asyncQueryHandler.submit(dispatchQueryRequest, contextBuilder.build()); } + private boolean isEligibleForIndexDMLHandling(IndexQueryDetails indexQueryDetails) { + return IndexQueryActionType.DROP.equals(indexQueryDetails.getIndexQueryActionType()) + || (IndexQueryActionType.ALTER.equals(indexQueryDetails.getIndexQueryActionType()) + && (indexQueryDetails + .getFlintIndexOptions() + .getSuppliedOptions() + .containsKey("auto_refresh") + && !indexQueryDetails.getFlintIndexOptions().autoRefresh())); + } + public JSONObject getQueryResponse(AsyncQueryJobMetadata asyncQueryJobMetadata) { EMRServerlessClient emrServerlessClient = emrServerlessClientFactory.getClient(); if (asyncQueryJobMetadata.getSessionId() != null) { @@ -128,7 +138,7 @@ public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) { new RefreshQueryHandler( emrServerlessClient, jobExecutionResponseReader, - flintIndexMetadataReader, + flintIndexMetadataService, stateStore, leaseManager); } else if (asyncQueryJobMetadata.getJobType() == JobType.STREAMING) { @@ -145,7 +155,7 @@ private IndexDMLHandler createIndexDMLHandler(EMRServerlessClient emrServerlessC return new IndexDMLHandler( emrServerlessClient, jobExecutionResponseReader, - flintIndexMetadataReader, + flintIndexMetadataService, client, stateStore); } diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java index 8cffa8e24a..8170b41c66 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java @@ -77,7 +77,7 @@ public DispatchQueryResponse submit( .build() .toString(), tags, - indexQueryDetails.isAutoRefresh(), + indexQueryDetails.getFlintIndexOptions().autoRefresh(), dataSourceMetadata.getResultIndex()); String jobId = emrServerlessClient.startJobRun(startJobRequest); MetricUtils.incrementNumericalMetric(MetricName.EMR_STREAMING_QUERY_JOBS_CREATION_COUNT); diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/FlintIndexOptions.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/FlintIndexOptions.java new file mode 100644 index 0000000000..a5bb90d36a --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/FlintIndexOptions.java @@ -0,0 +1,69 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.dispatcher.model; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +public class FlintIndexOptions { + + public static final String AUTO_REFRESH = "auto_refresh"; + public static final String REFRESH_INTERVAL = "refresh_interval"; + public static final String INCREMENTAL_REFRESH = "incremental_refresh"; + public static final String CHECKPOINT_LOCATION = "checkpoint_location"; + public static final String WATERMARK_DELAY = "watermark_delay"; + public static final String OUTPUT_MODE = "output_mode"; + public static final String INDEX_SETTINGS = "index_settings"; + + private Map options = new HashMap<>(); + + public void setOptions(HashMap options) { + for (String key : options.keySet()) { + setOption(key, options.get(key)); + } + } + + public void setOption(String key, String value) { + options.put(key, value); + } + + public Optional getOption(String key) { + return Optional.ofNullable(options.get(key)); + } + + public boolean autoRefresh() { + return Boolean.parseBoolean(getOption(AUTO_REFRESH).orElse("false")); + } + + public Optional refreshInterval() { + return getOption(REFRESH_INTERVAL); + } + + public boolean incrementalRefresh() { + return Boolean.parseBoolean(getOption(INCREMENTAL_REFRESH).orElse("false")); + } + + public Optional checkpointLocation() { + return getOption(CHECKPOINT_LOCATION); + } + + public Optional watermarkDelay() { + return getOption(WATERMARK_DELAY); + } + + public Optional outputMode() { + return getOption(OUTPUT_MODE); + } + + public Optional indexSettings() { + return getOption(INDEX_SETTINGS); + } + + public Map getSuppliedOptions() { + return new HashMap<>(options); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryActionType.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryActionType.java index 2c96511d2a..93e44f00ea 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryActionType.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryActionType.java @@ -11,5 +11,6 @@ public enum IndexQueryActionType { REFRESH, DESCRIBE, SHOW, - DROP + DROP, + ALTER } diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryDetails.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryDetails.java index 576b0772d2..7ecd784792 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryDetails.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryDetails.java @@ -22,8 +22,8 @@ public class IndexQueryDetails { private String indexName; private FullyQualifiedTableName fullyQualifiedTableName; // by default, auto_refresh = false; - private boolean autoRefresh; private IndexQueryActionType indexQueryActionType; + private FlintIndexOptions flintIndexOptions; // materialized view special case where // table name and mv name are combined. private String mvName; @@ -53,17 +53,17 @@ public IndexQueryDetailsBuilder fullyQualifiedTableName(FullyQualifiedTableName return this; } - public IndexQueryDetailsBuilder autoRefresh(Boolean autoRefresh) { - indexQueryDetails.autoRefresh = autoRefresh; - return this; - } - public IndexQueryDetailsBuilder indexQueryActionType( IndexQueryActionType indexQueryActionType) { indexQueryDetails.indexQueryActionType = indexQueryActionType; return this; } + public IndexQueryDetailsBuilder indexOptions(FlintIndexOptions flintIndexOptions) { + indexQueryDetails.flintIndexOptions = flintIndexOptions; + return this; + } + public IndexQueryDetailsBuilder mvName(String mvName) { indexQueryDetails.mvName = mvName; return this; @@ -75,6 +75,9 @@ public IndexQueryDetailsBuilder indexType(FlintIndexType indexType) { } public IndexQueryDetails build() { + if (indexQueryDetails.flintIndexOptions == null) { + indexQueryDetails.flintIndexOptions = new FlintIndexOptions(); + } return indexQueryDetails; } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadata.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadata.java index 1721263bf8..61faf00ac5 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadata.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadata.java @@ -5,43 +5,33 @@ package org.opensearch.sql.spark.flint; -import java.util.Locale; -import java.util.Map; import java.util.Optional; +import lombok.Builder; import lombok.Data; +import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions; @Data +@Builder public class FlintIndexMetadata { - public static final String PROPERTIES_KEY = "properties"; - public static final String ENV_KEY = "env"; - public static final String OPTIONS_KEY = "options"; + public static final String META = "_meta"; + public static final String LATEST_ID = "latestId"; + public static final String KIND = "kind"; + public static final String INDEXED_COLUMNS = "indexedColumns"; + public static final String NAME = "name"; + public static final String OPTIONS = "options"; + public static final String SOURCE = "source"; + public static final String VERSION = "version"; + public static final String PROPERTIES = "properties"; + public static final String ENV_KEY = "env"; public static final String SERVERLESS_EMR_JOB_ID = "SERVERLESS_EMR_JOB_ID"; - public static final String AUTO_REFRESH = "auto_refresh"; - public static final String AUTO_REFRESH_DEFAULT = "false"; - public static final String APP_ID = "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID"; - public static final String FLINT_INDEX_STATE_DOC_ID = "latestId"; + private final String opensearchIndexName; private final String jobId; - private final boolean autoRefresh; private final String appId; private final String latestId; - - public static FlintIndexMetadata fromMetatdata(Map metaMap) { - Map propertiesMap = (Map) metaMap.get(PROPERTIES_KEY); - Map envMap = (Map) propertiesMap.get(ENV_KEY); - Map options = (Map) metaMap.get(OPTIONS_KEY); - String jobId = (String) envMap.get(SERVERLESS_EMR_JOB_ID); - - boolean autoRefresh = - !((String) options.getOrDefault(AUTO_REFRESH, AUTO_REFRESH_DEFAULT)) - .toLowerCase(Locale.ROOT) - .equalsIgnoreCase(AUTO_REFRESH_DEFAULT); - String appId = (String) envMap.getOrDefault(APP_ID, null); - String latestId = (String) metaMap.getOrDefault(FLINT_INDEX_STATE_DOC_ID, null); - return new FlintIndexMetadata(jobId, autoRefresh, appId, latestId); - } + private final FlintIndexOptions flintIndexOptions; public Optional getLatestId() { return Optional.ofNullable(latestId); diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReader.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReader.java deleted file mode 100644 index 8833665570..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReader.java +++ /dev/null @@ -1,23 +0,0 @@ -package org.opensearch.sql.spark.flint; - -import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails; - -/** Interface for FlintIndexMetadataReader */ -public interface FlintIndexMetadataReader { - - /** - * Given Index details, get the streaming job Id. - * - * @param indexQueryDetails indexDetails. - * @return FlintIndexMetadata. - */ - FlintIndexMetadata getFlintIndexMetadata(IndexQueryDetails indexQueryDetails); - - /** - * Given Index name, get the streaming job Id. - * - * @param indexName indexName. - * @return FlintIndexMetadata. - */ - FlintIndexMetadata getFlintIndexMetadata(String indexName); -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java deleted file mode 100644 index d6e07fba8a..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImpl.java +++ /dev/null @@ -1,33 +0,0 @@ -package org.opensearch.sql.spark.flint; - -import java.util.Map; -import lombok.AllArgsConstructor; -import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse; -import org.opensearch.client.Client; -import org.opensearch.cluster.metadata.MappingMetadata; -import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails; - -/** Implementation of {@link FlintIndexMetadataReader} */ -@AllArgsConstructor -public class FlintIndexMetadataReaderImpl implements FlintIndexMetadataReader { - - private final Client client; - - @Override - public FlintIndexMetadata getFlintIndexMetadata(IndexQueryDetails indexQueryDetails) { - return getFlintIndexMetadata(indexQueryDetails.openSearchIndexName()); - } - - @Override - public FlintIndexMetadata getFlintIndexMetadata(String indexName) { - GetMappingsResponse mappingsResponse = - client.admin().indices().prepareGetMappings(indexName).get(); - try { - MappingMetadata mappingMetadata = mappingsResponse.mappings().get(indexName); - Map mappingSourceMap = mappingMetadata.getSourceAsMap(); - return FlintIndexMetadata.fromMetatdata((Map) mappingSourceMap.get("_meta")); - } catch (NullPointerException npe) { - throw new IllegalArgumentException("Provided Index doesn't exist"); - } - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataService.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataService.java new file mode 100644 index 0000000000..c60644be6b --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataService.java @@ -0,0 +1,19 @@ +package org.opensearch.sql.spark.flint; + +import java.util.Map; +import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions; + +/** Interface for FlintIndexMetadataReader */ +public interface FlintIndexMetadataService { + + /** + * Retrieves a map of {@link FlintIndexMetadata} instances matching the specified index pattern. + * + * @param indexPattern indexPattern. + * @return A map of {@link FlintIndexMetadata} instances against indexName, each providing + * metadata access for a matched index. Returns an empty list if no indices match the pattern. + */ + Map getFlintIndexMetadata(String indexPattern); + + void updateIndexToManualRefresh(String indexName, FlintIndexOptions flintIndexOptions); +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataServiceImpl.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataServiceImpl.java new file mode 100644 index 0000000000..e510dc404b --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexMetadataServiceImpl.java @@ -0,0 +1,145 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.flint; + +import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.AUTO_REFRESH; +import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.CHECKPOINT_LOCATION; +import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.INCREMENTAL_REFRESH; +import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.WATERMARK_DELAY; +import static org.opensearch.sql.spark.flint.FlintIndexMetadata.APP_ID; +import static org.opensearch.sql.spark.flint.FlintIndexMetadata.ENV_KEY; +import static org.opensearch.sql.spark.flint.FlintIndexMetadata.LATEST_ID; +import static org.opensearch.sql.spark.flint.FlintIndexMetadata.META; +import static org.opensearch.sql.spark.flint.FlintIndexMetadata.OPTIONS; +import static org.opensearch.sql.spark.flint.FlintIndexMetadata.PROPERTIES; +import static org.opensearch.sql.spark.flint.FlintIndexMetadata.SERVERLESS_EMR_JOB_ID; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import lombok.AllArgsConstructor; +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse; +import org.opensearch.client.Client; +import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions; +import org.opensearch.sql.spark.execution.statestore.StateStore; + +/** Implementation of {@link FlintIndexMetadataService} */ +@AllArgsConstructor +public class FlintIndexMetadataServiceImpl implements FlintIndexMetadataService { + + private static final Logger LOGGER = LogManager.getLogger(FlintIndexMetadataServiceImpl.class); + + private final Client client; + public static final Set alterAllowedOptions = + Set.of(AUTO_REFRESH, INCREMENTAL_REFRESH, WATERMARK_DELAY, CHECKPOINT_LOCATION); + + private final StateStore stateStore; + + @Override + public Map getFlintIndexMetadata(String indexPattern) { + GetMappingsResponse mappingsResponse = + client.admin().indices().prepareGetMappings().setIndices(indexPattern).get(); + Map indexMetadataMap = new HashMap<>(); + mappingsResponse + .getMappings() + .forEach( + (indexName, mappingMetadata) -> { + try { + Map mappingSourceMap = mappingMetadata.getSourceAsMap(); + FlintIndexMetadata metadata = + fromMetadata(indexName, (Map) mappingSourceMap.get(META)); + indexMetadataMap.put(indexName, metadata); + } catch (Exception exception) { + LOGGER.error( + "Exception while building index details for index: {} due to: {}", + indexName, + exception.getMessage()); + } + }); + return indexMetadataMap; + } + + @Override + public void updateIndexToManualRefresh(String indexName, FlintIndexOptions flintIndexOptions) { + GetMappingsResponse mappingsResponse = + client.admin().indices().prepareGetMappings().setIndices(indexName).get(); + Map flintMetadataMap = + mappingsResponse.getMappings().get(indexName).getSourceAsMap(); + Map meta = (Map) flintMetadataMap.get("_meta"); + String kind = (String) meta.get("kind"); + Map options = (Map) meta.get("options"); + Map newOptions = flintIndexOptions.getSuppliedOptions(); + validateFlintIndexOptions(kind, options, newOptions); + for (String key : newOptions.keySet()) { + options.put(key, newOptions.get(key)); + } + client.admin().indices().preparePutMapping(indexName).setSource(flintMetadataMap).get(); + } + + private void validateFlintIndexOptions( + String kind, Map existingOptions, Map newOptions) { + if (existingOptions.containsKey(AUTO_REFRESH) + && !Boolean.parseBoolean((String) existingOptions.get(AUTO_REFRESH))) { + throw new IllegalArgumentException( + "Converting to a manual refresh flint index is not allowed if it is already set to" + + " manual."); + } + HashMap mergedOptions = new HashMap<>(); + mergedOptions.putAll(existingOptions); + mergedOptions.putAll(newOptions); + if (Boolean.parseBoolean((String) existingOptions.get(INCREMENTAL_REFRESH))) { + validateConversionToIncrementalRefresh(kind, mergedOptions); + } + } + + private void validateConversionToIncrementalRefresh( + String kind, HashMap mergedOptions) { + List missingAttributes = new ArrayList<>(); + if (!mergedOptions.containsKey(CHECKPOINT_LOCATION) + || StringUtils.isEmpty((String) mergedOptions.get(CHECKPOINT_LOCATION))) { + missingAttributes.add(CHECKPOINT_LOCATION); + } + if (kind.equals("mv") + && (!mergedOptions.containsKey(WATERMARK_DELAY) + || StringUtils.isEmpty((String) mergedOptions.get(WATERMARK_DELAY)))) { + missingAttributes.add(WATERMARK_DELAY); + } + if (missingAttributes.size() > 0) { + String errorMessage = + "The following attributes are missing for conversion to incremental refresh index: " + + String.join(", ", missingAttributes) + + "."; + LOGGER.error(errorMessage); + throw new IllegalArgumentException(errorMessage); + } + } + + public FlintIndexMetadata fromMetadata(String indexName, Map metaMap) { + FlintIndexMetadata.FlintIndexMetadataBuilder flintIndexMetadataBuilder = + FlintIndexMetadata.builder(); + Map propertiesMap = (Map) metaMap.get(PROPERTIES); + Map envMap = (Map) propertiesMap.get(ENV_KEY); + Map options = (Map) metaMap.get(OPTIONS); + FlintIndexOptions flintIndexOptions = new FlintIndexOptions(); + for (String key : options.keySet()) { + flintIndexOptions.setOption(key, (String) options.get(key)); + } + String jobId = (String) envMap.get(SERVERLESS_EMR_JOB_ID); + String appId = (String) envMap.getOrDefault(APP_ID, null); + String latestId = (String) metaMap.getOrDefault(LATEST_ID, null); + flintIndexMetadataBuilder.jobId(jobId); + flintIndexMetadataBuilder.appId(appId); + flintIndexMetadataBuilder.latestId(latestId); + flintIndexMetadataBuilder.opensearchIndexName(indexName); + flintIndexMetadataBuilder.flintIndexOptions(flintIndexOptions); + return flintIndexMetadataBuilder.build(); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexState.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexState.java index 0ab4d92c17..36ac8fe715 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexState.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexState.java @@ -18,16 +18,22 @@ public enum FlintIndexState { EMPTY("empty"), // transitioning state CREATING("creating"), + // stable state + ACTIVE("active"), // transitioning state REFRESHING("refreshing"), // transitioning state CANCELLING("cancelling"), - // stable state - ACTIVE("active"), // transitioning state DELETING("deleting"), // stable state DELETED("deleted"), + // transitioning state + RECOVERING("recovering"), + // transitioning state + VACUUMING("vacuuming"), + // transitioning state + UPDATING("updating"), // stable state FAILED("failed"), // unknown state, if some state update in Spark side, not reflect in here. diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/model/FlintIndexDetailsRequest.java b/spark/src/main/java/org/opensearch/sql/spark/flint/model/FlintIndexDetailsRequest.java new file mode 100644 index 0000000000..c3d024636e --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/model/FlintIndexDetailsRequest.java @@ -0,0 +1,56 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.flint.model; + +import lombok.Getter; + +/** Request object for fetching Flint Index details. */ +@Getter +public class FlintIndexDetailsRequest { + private String indexPattern; + private boolean isIndexStateRequired; + private String dataSourceName; + + private FlintIndexDetailsRequest(Builder builder) { + this.indexPattern = builder.indexPattern; + this.isIndexStateRequired = builder.isIndexStateRequired; + this.dataSourceName = builder.dataSourceName; + } + + public static class Builder { + private String indexPattern; + private boolean isIndexStateRequired = false; + private String dataSourceName; + + public Builder() {} + + public Builder indexPattern(String indexPattern) { + this.indexPattern = indexPattern; + return this; + } + + public Builder isIndexStateRequired(boolean isIndexStateRequired) { + this.isIndexStateRequired = isIndexStateRequired; + return this; + } + + public Builder dataSourceName(String dataSourceName) { + this.dataSourceName = dataSourceName; + return this; + } + + public FlintIndexDetailsRequest build() { + if (indexPattern == null || indexPattern.isEmpty()) { + throw new IllegalArgumentException("indexPattern cannot be null or empty"); + } + if (isIndexStateRequired && (dataSourceName == null || dataSourceName.isEmpty())) { + throw new IllegalStateException( + "dataSourceName is required when isIndexStateRequired is true"); + } + return new FlintIndexDetailsRequest(this); + } + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOp.java b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOp.java index fb44b27568..ec7bf56258 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOp.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOp.java @@ -13,6 +13,7 @@ import lombok.RequiredArgsConstructor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.NotNull; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadata; @@ -31,66 +32,103 @@ public abstract class FlintIndexOp { public void apply(FlintIndexMetadata metadata) { // todo, remove this logic after IndexState feature is enabled in Flint. Optional latestId = metadata.getLatestId(); + FlintIndexStateModel initialFlintIndexStateModel = getFlintIndexStateModel(latestId); if (latestId.isEmpty()) { - // take action without occ. - FlintIndexStateModel fakeModel = - new FlintIndexStateModel( - FlintIndexState.REFRESHING, - metadata.getAppId(), - metadata.getJobId(), - "", - datasourceName, - System.currentTimeMillis(), - "", - SequenceNumbers.UNASSIGNED_SEQ_NO, - SequenceNumbers.UNASSIGNED_PRIMARY_TERM); - runOp(fakeModel); + takeActionWithoutOCC(metadata); } else { - Optional flintIndexOptional = - getFlintIndexState(stateStore, datasourceName).apply(latestId.get()); - if (flintIndexOptional.isEmpty()) { - String errorMsg = String.format(Locale.ROOT, "no state found. docId: %s", latestId.get()); - LOG.error(errorMsg); - throw new IllegalStateException(errorMsg); - } - FlintIndexStateModel flintIndex = flintIndexOptional.get(); - // 1.validate state. - FlintIndexState currentState = flintIndex.getIndexState(); - if (!validate(currentState)) { - String errorMsg = - String.format(Locale.ROOT, "validate failed. unexpected state: [%s]", currentState); - LOG.debug(errorMsg); + if (!isIndexInValidState(initialFlintIndexStateModel)) { return; } - // 2.begin, move to transitioning state - FlintIndexState transitioningState = transitioningState(); + FlintIndexStateModel transitionedFlintIndexStateModel = + moveToTransitioningState(initialFlintIndexStateModel); + // 3.runOp try { - flintIndex = + runOp(metadata, transitionedFlintIndexStateModel); + commit(transitionedFlintIndexStateModel); + } catch (Throwable e) { + LOG.error("Rolling back transient log due to transaction operation failure", e); + try { + if (transitioningState() != null) { updateFlintIndexState(stateStore, datasourceName) - .apply(flintIndex, transitioningState()); - } catch (Exception e) { - String errorMsg = - String.format( - Locale.ROOT, "begin failed. target transitioning state: [%s]", transitioningState); - LOG.error(errorMsg, e); - throw new IllegalStateException(errorMsg, e); + .apply( + transitionedFlintIndexStateModel, initialFlintIndexStateModel.getIndexState()); + } + } catch (Exception ex) { + LOG.error("Failed to rollback transient log", ex); + } + throw new IllegalStateException("Failed to commit transaction operation"); } + } + } - // 3.runOp - runOp(flintIndex); + @NotNull + private FlintIndexStateModel getFlintIndexStateModel(Optional latestId) { + Optional flintIndexOptional = + getFlintIndexState(stateStore, datasourceName).apply(latestId.get()); + if (flintIndexOptional.isEmpty()) { + String errorMsg = String.format(Locale.ROOT, "no state found. docId: %s", latestId.get()); + LOG.error(errorMsg); + throw new IllegalStateException(errorMsg); + } + return flintIndexOptional.get(); + } - // 4.commit, move to stable state - FlintIndexState stableState = stableState(); - try { - updateFlintIndexState(stateStore, datasourceName).apply(flintIndex, stableState); - } catch (Exception e) { - String errorMsg = - String.format(Locale.ROOT, "commit failed. target stable state: [%s]", stableState); - LOG.error(errorMsg, e); - throw new IllegalStateException(errorMsg, e); - } + private void takeActionWithoutOCC(FlintIndexMetadata metadata) { + // take action without occ. + FlintIndexStateModel fakeModel = + new FlintIndexStateModel( + FlintIndexState.REFRESHING, + metadata.getAppId(), + metadata.getJobId(), + "", + datasourceName, + System.currentTimeMillis(), + "", + SequenceNumbers.UNASSIGNED_SEQ_NO, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM); + runOp(metadata, fakeModel); + } + + private boolean isIndexInValidState(FlintIndexStateModel flintIndex) { + LOG.debug("Validating the state before the transaction."); + FlintIndexState currentState = flintIndex.getIndexState(); + if (!validate(currentState)) { + String errorMsg = + String.format(Locale.ROOT, "validate failed. unexpected state: [%s]", currentState); + LOG.debug(errorMsg); + return false; + } + return true; + } + + private FlintIndexStateModel moveToTransitioningState(FlintIndexStateModel flintIndex) { + LOG.debug("Moving to transitioning state before committing."); + FlintIndexState transitioningState = transitioningState(); + try { + flintIndex = + updateFlintIndexState(stateStore, datasourceName).apply(flintIndex, transitioningState()); + } catch (Exception e) { + String errorMsg = + String.format( + Locale.ROOT, "begin failed. target transitioning state: [%s]", transitioningState); + LOG.error(errorMsg, e); + throw new IllegalStateException(errorMsg, e); + } + return flintIndex; + } + + private void commit(FlintIndexStateModel flintIndex) { + LOG.debug("Committing the transaction and moving to stable state."); + FlintIndexState stableState = stableState(); + try { + updateFlintIndexState(stateStore, datasourceName).apply(flintIndex, stableState); + } catch (Exception e) { + String errorMsg = + String.format(Locale.ROOT, "commit failed. target stable state: [%s]", stableState); + LOG.error(errorMsg, e); + throw new IllegalStateException(errorMsg, e); } } @@ -104,8 +142,10 @@ public void apply(FlintIndexMetadata metadata) { /** get transitioningState */ abstract FlintIndexState transitioningState(); - abstract void runOp(FlintIndexStateModel flintIndex); + abstract void runOp(FlintIndexMetadata flintIndexMetadata, FlintIndexStateModel flintIndex); /** get stableState */ abstract FlintIndexState stableState(); + + /* To identify if an operation is composite or simple*/ } diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpAlterOff.java b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpAlterOff.java new file mode 100644 index 0000000000..9fdaf24c60 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpAlterOff.java @@ -0,0 +1,83 @@ +package org.opensearch.sql.spark.flint.operation; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import lombok.SneakyThrows; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.sql.spark.client.EMRServerlessClient; +import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions; +import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.FlintIndexMetadata; +import org.opensearch.sql.spark.flint.FlintIndexMetadataService; +import org.opensearch.sql.spark.flint.FlintIndexState; +import org.opensearch.sql.spark.flint.FlintIndexStateModel; + +public class FlintIndexOpAlterOff extends FlintIndexOp { + private static final Logger LOG = LogManager.getLogger(FlintIndexOpAlterOff.class); + private final EMRServerlessClient emrServerlessClient; + private final FlintIndexMetadataService flintIndexMetadataService; + private final FlintIndexOptions flintIndexOptions; + + public FlintIndexOpAlterOff( + FlintIndexOptions flintIndexOptions, + StateStore stateStore, + String datasourceName, + EMRServerlessClient emrServerlessClient, + FlintIndexMetadataService flintIndexMetadataService) { + super(stateStore, datasourceName); + this.emrServerlessClient = emrServerlessClient; + this.flintIndexMetadataService = flintIndexMetadataService; + this.flintIndexOptions = flintIndexOptions; + } + + @Override + boolean validate(FlintIndexState state) { + return state == FlintIndexState.ACTIVE || state == FlintIndexState.REFRESHING; + } + + @Override + FlintIndexState transitioningState() { + return FlintIndexState.UPDATING; + } + + @SneakyThrows + @Override + void runOp(FlintIndexMetadata flintIndexMetadata, FlintIndexStateModel flintIndexStateModel) { + this.flintIndexMetadataService.updateIndexToManualRefresh( + flintIndexMetadata.getOpensearchIndexName(), flintIndexOptions); + String applicationId = flintIndexStateModel.getApplicationId(); + String jobId = flintIndexStateModel.getJobId(); + try { + emrServerlessClient.cancelJobRun( + flintIndexStateModel.getApplicationId(), flintIndexStateModel.getJobId()); + } catch (IllegalArgumentException e) { + // handle job does not exist case. + LOG.error(e); + return; + } + + // pull job state until timeout or cancelled. + String jobRunState = ""; + int count = 3; + while (count-- != 0) { + jobRunState = + emrServerlessClient.getJobRunResult(applicationId, jobId).getJobRun().getState(); + if (jobRunState.equalsIgnoreCase("Cancelled")) { + break; + } + TimeUnit.SECONDS.sleep(1); + } + if (!jobRunState.equalsIgnoreCase("Cancelled")) { + String errMsg = + "Cancel job timeout for Application ID: " + applicationId + ", Job ID: " + jobId; + LOG.error(errMsg); + throw new TimeoutException(errMsg); + } + } + + @Override + FlintIndexState stableState() { + return FlintIndexState.ACTIVE; + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpCancel.java b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpCancel.java index ba067e5c03..7d1a74c943 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpCancel.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpCancel.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.Logger; import org.opensearch.sql.spark.client.EMRServerlessClient; import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.FlintIndexMetadata; import org.opensearch.sql.spark.flint.FlintIndexState; import org.opensearch.sql.spark.flint.FlintIndexStateModel; @@ -39,7 +40,7 @@ FlintIndexState transitioningState() { /** cancel EMR-S job, wait cancelled state upto 15s. */ @SneakyThrows @Override - void runOp(FlintIndexStateModel flintIndexStateModel) { + void runOp(FlintIndexMetadata flintIndexMetadata, FlintIndexStateModel flintIndexStateModel) { String applicationId = flintIndexStateModel.getApplicationId(); String jobId = flintIndexStateModel.getJobId(); try { diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDelete.java b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDelete.java index d8b275c621..e4947d0ae3 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDelete.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDelete.java @@ -6,6 +6,7 @@ package org.opensearch.sql.spark.flint.operation; import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.FlintIndexMetadata; import org.opensearch.sql.spark.flint.FlintIndexState; import org.opensearch.sql.spark.flint.FlintIndexStateModel; @@ -28,7 +29,7 @@ FlintIndexState transitioningState() { } @Override - void runOp(FlintIndexStateModel flintIndex) { + void runOp(FlintIndexMetadata indexMetadata, FlintIndexStateModel flintIndex) { // logically delete, do nothing. } diff --git a/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java b/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java index d88c1dd9df..4783588156 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java +++ b/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java @@ -29,7 +29,7 @@ import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher; import org.opensearch.sql.spark.execution.session.SessionManager; import org.opensearch.sql.spark.execution.statestore.StateStore; -import org.opensearch.sql.spark.flint.FlintIndexMetadataReaderImpl; +import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl; import org.opensearch.sql.spark.leasemanager.DefaultLeaseManager; import org.opensearch.sql.spark.response.JobExecutionResponseReader; @@ -70,7 +70,7 @@ public SparkQueryDispatcher sparkQueryDispatcher( DataSourceService dataSourceService, DataSourceUserAuthorizationHelperImpl dataSourceUserAuthorizationHelper, JobExecutionResponseReader jobExecutionResponseReader, - FlintIndexMetadataReaderImpl flintIndexMetadataReader, + FlintIndexMetadataServiceImpl flintIndexMetadataReader, NodeClient client, SessionManager sessionManager, DefaultLeaseManager defaultLeaseManager, @@ -113,8 +113,9 @@ public SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier(Set @Provides @Singleton - public FlintIndexMetadataReaderImpl flintIndexMetadataReader(NodeClient client) { - return new FlintIndexMetadataReaderImpl(client); + public FlintIndexMetadataServiceImpl flintIndexMetadataReader( + NodeClient client, StateStore stateStore) { + return new FlintIndexMetadataServiceImpl(client, stateStore); } @Provides diff --git a/spark/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java b/spark/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java index c1f3f02576..b8fffde947 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java +++ b/spark/src/main/java/org/opensearch/sql/spark/utils/SQLQueryUtils.java @@ -19,6 +19,7 @@ import org.opensearch.sql.spark.antlr.parser.SqlBaseLexer; import org.opensearch.sql.spark.antlr.parser.SqlBaseParser; import org.opensearch.sql.spark.antlr.parser.SqlBaseParserBaseVisitor; +import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions; import org.opensearch.sql.spark.dispatcher.model.FullyQualifiedTableName; import org.opensearch.sql.spark.dispatcher.model.IndexQueryActionType; import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails; @@ -257,23 +258,48 @@ public Void visitRefreshMaterializedViewStatement( @Override public Void visitPropertyList(FlintSparkSqlExtensionsParser.PropertyListContext ctx) { + FlintIndexOptions flintIndexOptions = new FlintIndexOptions(); if (ctx != null) { ctx.property() .forEach( - property -> { - // todo. Currently, we use contains() api to avoid unescape string. In future, we - // should leverage - // https://github.com/apache/spark/blob/v3.5.0/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkParserUtils.scala#L35 to unescape string literal - if (propertyKey(property.key).toLowerCase(Locale.ROOT).contains("auto_refresh")) { - if (propertyValue(property.value).toLowerCase(Locale.ROOT).contains("true")) { - indexQueryDetailsBuilder.autoRefresh(true); - } - } - }); + property -> + flintIndexOptions.setOption( + removeUnwantedQuotes(propertyKey(property.key).toLowerCase(Locale.ROOT)), + removeUnwantedQuotes( + propertyValue(property.value).toLowerCase(Locale.ROOT)))); } + indexQueryDetailsBuilder.indexOptions(flintIndexOptions); return null; } + @Override + public Void visitAlterCoveringIndexStatement( + FlintSparkSqlExtensionsParser.AlterCoveringIndexStatementContext ctx) { + indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.ALTER); + indexQueryDetailsBuilder.indexType(FlintIndexType.COVERING); + visitPropertyList(ctx.propertyList()); + return super.visitAlterCoveringIndexStatement(ctx); + } + + @Override + public Void visitAlterSkippingIndexStatement( + FlintSparkSqlExtensionsParser.AlterSkippingIndexStatementContext ctx) { + indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.ALTER); + indexQueryDetailsBuilder.indexType(FlintIndexType.SKIPPING); + visitPropertyList(ctx.propertyList()); + return super.visitAlterSkippingIndexStatement(ctx); + } + + @Override + public Void visitAlterMaterializedViewStatement( + FlintSparkSqlExtensionsParser.AlterMaterializedViewStatementContext ctx) { + indexQueryDetailsBuilder.indexQueryActionType(IndexQueryActionType.ALTER); + indexQueryDetailsBuilder.indexType(FlintIndexType.MATERIALIZED_VIEW); + indexQueryDetailsBuilder.mvName(ctx.mvName.getText()); + visitPropertyList(ctx.propertyList()); + return super.visitAlterMaterializedViewStatement(ctx); + } + private String propertyKey(FlintSparkSqlExtensionsParser.PropertyKeyContext key) { if (key.STRING() != null) { return key.STRING().getText(); @@ -291,5 +317,15 @@ private String propertyValue(FlintSparkSqlExtensionsParser.PropertyValueContext return value.getText(); } } + + // TODO: Currently escaping is handled partially. + // Full implementation should mirror this: + // https://github.com/apache/spark/blob/v3.5.0/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkParserUtils.scala#L35 + public String removeUnwantedQuotes(String input) { + if (input != null && input.startsWith("\"") && input.endsWith("\"") && input.length() > 1) { + return input.substring(1, input.length() - 1); + } + return input; + } } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index 725080bbcd..26cc7dea1f 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -42,7 +42,6 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.XContentType; import org.opensearch.index.query.QueryBuilder; -import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.plugins.Plugin; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.sql.datasource.model.DataSourceMetadata; @@ -65,9 +64,7 @@ import org.opensearch.sql.spark.execution.session.SessionModel; import org.opensearch.sql.spark.execution.session.SessionState; import org.opensearch.sql.spark.execution.statestore.StateStore; -import org.opensearch.sql.spark.flint.FlintIndexMetadataReaderImpl; -import org.opensearch.sql.spark.flint.FlintIndexState; -import org.opensearch.sql.spark.flint.FlintIndexStateModel; +import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl; import org.opensearch.sql.spark.flint.FlintIndexType; import org.opensearch.sql.spark.leasemanager.DefaultLeaseManager; import org.opensearch.sql.spark.response.JobExecutionResponseReader; @@ -210,7 +207,7 @@ protected AsyncQueryExecutorService createAsyncQueryExecutorService( this.dataSourceService, new DataSourceUserAuthorizationHelperImpl(client), jobExecutionResponseReader, - new FlintIndexMetadataReaderImpl(client), + new FlintIndexMetadataServiceImpl(client, stateStore), client, new SessionManager(stateStore, emrServerlessClientFactory, pluginSettings), new DefaultLeaseManager(pluginSettings, stateStore), @@ -330,64 +327,6 @@ public String loadResultIndexMappings() { return Resources.toString(url, Charsets.UTF_8); } - public class MockFlintSparkJob { - - private FlintIndexStateModel stateModel; - - public MockFlintSparkJob(String latestId) { - assertNotNull(latestId); - stateModel = - new FlintIndexStateModel( - FlintIndexState.EMPTY, - "mockAppId", - "mockJobId", - latestId, - DATASOURCE, - System.currentTimeMillis(), - "", - SequenceNumbers.UNASSIGNED_SEQ_NO, - SequenceNumbers.UNASSIGNED_PRIMARY_TERM); - stateModel = StateStore.createFlintIndexState(stateStore, DATASOURCE).apply(stateModel); - } - - public void refreshing() { - stateModel = - StateStore.updateFlintIndexState(stateStore, DATASOURCE) - .apply(stateModel, FlintIndexState.REFRESHING); - } - - public void cancelling() { - stateModel = - StateStore.updateFlintIndexState(stateStore, DATASOURCE) - .apply(stateModel, FlintIndexState.CANCELLING); - } - - public void active() { - stateModel = - StateStore.updateFlintIndexState(stateStore, DATASOURCE) - .apply(stateModel, FlintIndexState.ACTIVE); - } - - public void deleting() { - stateModel = - StateStore.updateFlintIndexState(stateStore, DATASOURCE) - .apply(stateModel, FlintIndexState.DELETING); - } - - public void deleted() { - stateModel = - StateStore.updateFlintIndexState(stateStore, DATASOURCE) - .apply(stateModel, FlintIndexState.DELETED); - } - - void assertState(FlintIndexState expected) { - Optional stateModelOpt = - StateStore.getFlintIndexState(stateStore, DATASOURCE).apply(stateModel.getId()); - assertTrue((stateModelOpt.isPresent())); - assertEquals(expected, stateModelOpt.get().getIndexState()); - } - } - @RequiredArgsConstructor public class FlintDatasetMock { final String query; diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java index 4ec5d4d80b..3acbfc439c 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java @@ -26,6 +26,7 @@ import org.opensearch.sql.protocol.response.format.ResponseFormatter; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryResult; +import org.opensearch.sql.spark.asyncquery.model.MockFlintSparkJob; import org.opensearch.sql.spark.client.EMRServerlessClientFactory; import org.opensearch.sql.spark.execution.statement.StatementModel; import org.opensearch.sql.spark.execution.statement.StatementState; @@ -52,7 +53,7 @@ public class AsyncQueryGetResultSpecTest extends AsyncQueryExecutorServiceSpec { @Before public void doSetUp() { - mockIndexState = new MockFlintSparkJob(mockIndex.latestId); + mockIndexState = new MockFlintSparkJob(stateStore, mockIndex.latestId, DATASOURCE); } @Test diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java index 9ba15c250e..70169f2ee0 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java @@ -9,11 +9,17 @@ import com.amazonaws.services.emrserverless.model.GetJobRunResult; import com.amazonaws.services.emrserverless.model.JobRun; import com.google.common.collect.ImmutableList; +import java.util.HashMap; +import java.util.Map; import org.junit.Assert; import org.junit.Test; +import org.junit.jupiter.api.Assertions; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse; +import org.opensearch.sql.spark.asyncquery.model.MockFlintIndex; +import org.opensearch.sql.spark.asyncquery.model.MockFlintSparkJob; import org.opensearch.sql.spark.client.EMRServerlessClient; import org.opensearch.sql.spark.client.EMRServerlessClientFactory; +import org.opensearch.sql.spark.client.StartJobRequest; import org.opensearch.sql.spark.flint.FlintIndexState; import org.opensearch.sql.spark.flint.FlintIndexType; import org.opensearch.sql.spark.leasemanager.ConcurrencyLimitExceededException; @@ -246,7 +252,8 @@ public EMRServerlessClient getClient() { // Mock flint index mockDS.createIndex(); // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); flintIndexJob.refreshing(); // 1.drop index @@ -307,7 +314,8 @@ public EMRServerlessClient getClient() { // Mock flint index mockDS.createIndex(); // Mock index state in refresh state. - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); flintIndexJob.refreshing(); // 1.drop index @@ -356,7 +364,8 @@ public EMRServerlessClient getClient() { // Mock flint index mockDS.createIndex(); // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); flintIndexJob.refreshing(); // 1. drop index @@ -404,7 +413,8 @@ public EMRServerlessClient getClient() { // Mock flint index mockDS.createIndex(); // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); flintIndexJob.cancelling(); // 1. drop index @@ -460,7 +470,8 @@ public EMRServerlessClient getClient() { // Mock flint index mockDS.createIndex(); // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); flintIndexJob.active(); // 1. drop index @@ -511,7 +522,8 @@ public EMRServerlessClient getClient() { // Mock flint index mockDS.createIndex(); // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); flintIndexJob.deleted(); // 1. drop index @@ -562,7 +574,8 @@ public EMRServerlessClient getClient() { // Mock flint index mockDS.createIndex(); // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); flintIndexJob.deleting(); // 1. drop index @@ -618,7 +631,8 @@ public EMRServerlessClient getClient() { // Mock flint index mockDS.createIndex(); // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); // 1. drop index CreateAsyncQueryResponse response = @@ -695,7 +709,8 @@ public void concurrentRefreshJobLimitNotApplied() { // Mock flint index COVERING.createIndex(); // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(COVERING.latestId); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, COVERING.latestId, DATASOURCE); flintIndexJob.refreshing(); // query with auto refresh @@ -719,7 +734,8 @@ public void concurrentRefreshJobLimitAppliedToDDLWithAuthRefresh() { // Mock flint index COVERING.createIndex(); // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(COVERING.latestId); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, COVERING.latestId, DATASOURCE); flintIndexJob.refreshing(); // query with auto_refresh = true. @@ -746,7 +762,8 @@ public void concurrentRefreshJobLimitAppliedToRefresh() { // Mock flint index COVERING.createIndex(); // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(COVERING.latestId); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, COVERING.latestId, DATASOURCE); flintIndexJob.refreshing(); // query with auto_refresh = true. @@ -772,7 +789,8 @@ public void concurrentRefreshJobLimitNotAppliedToDDL() { // Mock flint index COVERING.createIndex(); // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(COVERING.latestId); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, COVERING.latestId, DATASOURCE); flintIndexJob.refreshing(); CreateAsyncQueryResponse asyncQueryResponse = @@ -845,7 +863,8 @@ public GetJobRunResult getJobRunResult( // Mock flint index mockDS.createIndex(); // Mock index state - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(mockDS.latestId); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); // 1. Submit REFRESH statement CreateAsyncQueryResponse response = @@ -865,4 +884,340 @@ public GetJobRunResult getJobRunResult( flintIndexJob.assertState(FlintIndexState.ACTIVE); }); } + + @Test + public void cancelRefreshStatementWithFailureInFetchingIndexMetadata() { + String indexName = "flint_my_glue_mydb_http_logs_covering_corrupted_index"; + MockFlintIndex mockFlintIndex = + new MockFlintIndex(client(), indexName, FlintIndexType.COVERING, null); + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService( + () -> + new LocalEMRSClient() { + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + return new GetJobRunResult().withJobRun(new JobRun().withState("Cancelled")); + } + }); + + mockFlintIndex.createIndex(); + // Mock index state + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, indexName + "_latest_id", DATASOURCE); + + // 1. Submit REFRESH statement + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + "REFRESH INDEX covering_corrupted ON my_glue.mydb.http_logs", + DATASOURCE, + LangType.SQL, + null)); + // mock index state. + flintIndexJob.refreshing(); + + // 2. Cancel query + Assertions.assertThrows( + IllegalStateException.class, + () -> asyncQueryExecutorService.cancelQuery(response.getQueryId())); + } + + // ALTER SKIPPING INDEX + // ON tableName + // WITH LEFT_PAREN propertyList RIGHT_PAREN + // ALTER INDEX indexName + // ON tableName + // WITH LEFT_PAREN propertyList RIGHT_PAREN + // ALTER MATERIALIZED VIEW mvName=multipartIdentifier + // WITH LEFT_PAREN propertyList RIGHT_PAREN + public void testAlterIndexQueryConvertingToManualRefresh() { + MockFlintIndex ALTER_SKIPPING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_skipping_index", + FlintIndexType.SKIPPING, + "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=false)"); + MockFlintIndex ALTER_COVERING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_covering_index", + FlintIndexType.COVERING, + "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=false)"); + MockFlintIndex ALTER_MV = + new MockFlintIndex( + client, + "flint_my_glue_mydb_mv", + FlintIndexType.MATERIALIZED_VIEW, + "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," + + " incremental_refresh=false) "); + ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING, ALTER_MV) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public String startJobRun(StartJobRequest startJobRequest) { + return "jobId"; + } + + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + + @Override + public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { + return new CancelJobRunResult() + .withJobRunId(jobId) + .withApplicationId(applicationId); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + // Mock flint index + mockDS.createIndex(); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + mockDS.updateIndexOptions(existingOptions); + // Mock index state + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), DATASOURCE); + flintIndexJob.active(); + + // 1. alter index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + mockDS.getQuery(), DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + assertEquals( + "SUCCESS", + asyncQueryExecutorService + .getAsyncQueryResults(response.getQueryId()) + .getStatus()); + + flintIndexJob.assertState(FlintIndexState.ACTIVE); + Map mappings = mockDS.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("false", options.get("auto_refresh")); + }); + } + + public void testAlterIndexQueryWithRedundantOperation() { + MockFlintIndex ALTER_SKIPPING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_skipping_index", + FlintIndexType.SKIPPING, + "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=false)"); + MockFlintIndex ALTER_COVERING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_covering_index", + FlintIndexType.COVERING, + "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=false)"); + MockFlintIndex ALTER_MV = + new MockFlintIndex( + client, + "flint_my_glue_mydb_mv", + FlintIndexType.MATERIALIZED_VIEW, + "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," + + " incremental_refresh=false) "); + ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING, ALTER_MV) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public String startJobRun(StartJobRequest startJobRequest) { + return "jobId"; + } + + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + + @Override + public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { + return new CancelJobRunResult() + .withJobRunId(jobId) + .withApplicationId(applicationId); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + // Mock flint index + mockDS.createIndex(); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "false"); + mockDS.updateIndexOptions(existingOptions); + // Mock index state + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), DATASOURCE); + flintIndexJob.active(); + + // 1. alter index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + mockDS.getQuery(), DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + assertEquals( + "FAILED", + asyncQueryExecutorService + .getAsyncQueryResults(response.getQueryId()) + .getStatus()); + + flintIndexJob.assertState(FlintIndexState.ACTIVE); + Map mappings = mockDS.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("false", options.get("auto_refresh")); + }); + } + + public void testAlterIndexQueryConvertingToAutoRefresh() { + MockFlintIndex ALTER_SKIPPING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_skipping_index", + FlintIndexType.SKIPPING, + "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=false)"); + MockFlintIndex ALTER_COVERING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_covering_index", + FlintIndexType.COVERING, + "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=false)"); + MockFlintIndex ALTER_MV = + new MockFlintIndex( + client, + "flint_my_glue_mydb_mv", + FlintIndexType.MATERIALIZED_VIEW, + "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," + + " incremental_refresh=false) "); + ImmutableList.of(ALTER_SKIPPING, ALTER_COVERING, ALTER_MV) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public String startJobRun(StartJobRequest startJobRequest) { + return "jobId"; + } + + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + + @Override + public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { + return new CancelJobRunResult() + .withJobRunId(jobId) + .withApplicationId(applicationId); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + // Mock flint index + mockDS.createIndex(); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + mockDS.updateIndexOptions(existingOptions); + // Mock index state + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), DATASOURCE); + flintIndexJob.active(); + + // 1. alter index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + mockDS.getQuery(), DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + assertEquals( + "SUCCESS", + asyncQueryExecutorService + .getAsyncQueryResults(response.getQueryId()) + .getStatus()); + + flintIndexJob.assertState(FlintIndexState.ACTIVE); + Map mappings = mockDS.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("false", options.get("auto_refresh")); + }); + } + + public void testAlterIndexQueryWithOutAnyAutoRefresh() { + ImmutableList.of(SKIPPING, COVERING, MV) + .forEach( + mockDS -> { + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { + Assert.fail("should not call cancelJobRun"); + return null; + } + + @Override + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + Assert.fail("should not call getJobRunResult"); + return null; + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = + new EMRServerlessClientFactory() { + @Override + public EMRServerlessClient getClient() { + return emrsClient; + } + }; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + + // Mock flint index + mockDS.createIndex(); + // Mock index state + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); + flintIndexJob.active(); + + // 1. drop index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + + // 2. fetch result + assertEquals( + "SUCCESS", + asyncQueryExecutorService + .getAsyncQueryResults(response.getQueryId()) + .getStatus()); + + flintIndexJob.assertState(FlintIndexState.DELETED); + }); + } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/model/MockFlintIndex.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/model/MockFlintIndex.java new file mode 100644 index 0000000000..06830b01e8 --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/model/MockFlintIndex.java @@ -0,0 +1,68 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.asyncquery.model; + +import java.util.HashMap; +import java.util.Map; +import lombok.Getter; +import lombok.SneakyThrows; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse; +import org.opensearch.client.Client; +import org.opensearch.sql.spark.flint.FlintIndexType; +import org.opensearch.sql.spark.utils.TestUtils; + +@Getter +public class MockFlintIndex { + private final String indexName; + private final Client client; + private final FlintIndexType flintIndexType; + private final String query; + + public MockFlintIndex( + Client client, String indexName, FlintIndexType flintIndexType, String query) { + this.client = client; + this.indexName = indexName; + this.flintIndexType = flintIndexType; + this.query = query; + } + + public void createIndex() { + String mappingFile = String.format("flint-index-mappings/%s_mapping.json", indexName); + TestUtils.createIndexWithMappings(client, indexName, mappingFile); + } + + public String getLatestId() { + return this.indexName + "_latest_id"; + } + + @SneakyThrows + public void deleteIndex() { + client.admin().indices().delete(new DeleteIndexRequest().indices(indexName)).get(); + } + + public Map getIndexMappings() { + return client + .admin() + .indices() + .prepareGetMappings(indexName) + .get() + .getMappings() + .get(indexName) + .getSourceAsMap(); + } + + public void updateIndexOptions(HashMap newOptions) { + GetMappingsResponse mappingsResponse = + client.admin().indices().prepareGetMappings().setIndices(indexName).get(); + Map flintMetadataMap = + mappingsResponse.getMappings().get(indexName).getSourceAsMap(); + Map meta = (Map) flintMetadataMap.get("_meta"); + Map options = (Map) meta.get("options"); + options.putAll(newOptions); + client.admin().indices().preparePutMapping(indexName).setSource(flintMetadataMap).get(); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/model/MockFlintSparkJob.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/model/MockFlintSparkJob.java new file mode 100644 index 0000000000..459e6e4762 --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/model/MockFlintSparkJob.java @@ -0,0 +1,77 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.asyncquery.model; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Optional; +import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.FlintIndexState; +import org.opensearch.sql.spark.flint.FlintIndexStateModel; + +public class MockFlintSparkJob { + private FlintIndexStateModel stateModel; + private StateStore stateStore; + private String datasource; + + public MockFlintSparkJob(StateStore stateStore, String latestId, String datasource) { + assertNotNull(latestId); + this.stateStore = stateStore; + this.datasource = datasource; + stateModel = + new FlintIndexStateModel( + FlintIndexState.EMPTY, + "mockAppId", + "mockJobId", + latestId, + datasource, + System.currentTimeMillis(), + "", + SequenceNumbers.UNASSIGNED_SEQ_NO, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM); + stateModel = StateStore.createFlintIndexState(stateStore, datasource).apply(stateModel); + } + + public void refreshing() { + stateModel = + StateStore.updateFlintIndexState(stateStore, datasource) + .apply(stateModel, FlintIndexState.REFRESHING); + } + + public void cancelling() { + stateModel = + StateStore.updateFlintIndexState(stateStore, datasource) + .apply(stateModel, FlintIndexState.CANCELLING); + } + + public void active() { + stateModel = + StateStore.updateFlintIndexState(stateStore, datasource) + .apply(stateModel, FlintIndexState.ACTIVE); + } + + public void deleting() { + stateModel = + StateStore.updateFlintIndexState(stateStore, datasource) + .apply(stateModel, FlintIndexState.DELETING); + } + + public void deleted() { + stateModel = + StateStore.updateFlintIndexState(stateStore, datasource) + .apply(stateModel, FlintIndexState.DELETED); + } + + public void assertState(FlintIndexState expected) { + Optional stateModelOpt = + StateStore.getFlintIndexState(stateStore, datasource).apply(stateModel.getId()); + assertTrue((stateModelOpt.isPresent())); + assertEquals(expected, stateModelOpt.get().getIndexState()); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java index ec82488749..efb065707f 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java @@ -6,13 +6,44 @@ package org.opensearch.sql.spark.dispatcher; import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.opensearch.sql.datasource.model.DataSourceStatus.ACTIVE; +import static org.opensearch.sql.spark.constants.TestConstants.EMRS_APPLICATION_ID; +import static org.opensearch.sql.spark.constants.TestConstants.EMRS_EXECUTION_ROLE; +import static org.opensearch.sql.spark.constants.TestConstants.TEST_CLUSTER_NAME; import static org.opensearch.sql.spark.data.constants.SparkConstants.ERROR_FIELD; import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD; +import java.util.HashMap; import org.json.JSONObject; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.client.Client; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; +import org.opensearch.sql.spark.client.EMRServerlessClient; +import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext; +import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; +import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails; +import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.FlintIndexMetadataService; +import org.opensearch.sql.spark.flint.FlintIndexType; +import org.opensearch.sql.spark.response.JobExecutionResponseReader; +import org.opensearch.sql.spark.rest.model.LangType; +@ExtendWith(MockitoExtension.class) class IndexDMLHandlerTest { + + @Mock private EMRServerlessClient emrServerlessClient; + @Mock private Client client; + @Mock private JobExecutionResponseReader jobExecutionResponseReader; + @Mock private FlintIndexMetadataService flintIndexMetadataService; + @Mock private StateStore stateStore; + @Test public void getResponseFromExecutor() { JSONObject result = @@ -21,4 +52,49 @@ public void getResponseFromExecutor() { assertEquals("running", result.getString(STATUS_FIELD)); assertEquals("", result.getString(ERROR_FIELD)); } + + @Test + public void testWhenIndexDetailsAreNotFound() { + IndexDMLHandler indexDMLHandler = + new IndexDMLHandler( + emrServerlessClient, + jobExecutionResponseReader, + flintIndexMetadataService, + client, + stateStore); + DispatchQueryRequest dispatchQueryRequest = + new DispatchQueryRequest( + EMRS_APPLICATION_ID, + "DROP INDEX", + "my_glue", + LangType.SQL, + EMRS_EXECUTION_ROLE, + TEST_CLUSTER_NAME); + DataSourceMetadata metadata = + new DataSourceMetadata.Builder() + .setName("mys3") + .setDescription("test description") + .setConnector(DataSourceType.S3GLUE) + .setDataSourceStatus(ACTIVE) + .build(); + IndexQueryDetails indexQueryDetails = + IndexQueryDetails.builder() + .mvName("mys3.default.http_logs_metrics") + .indexType(FlintIndexType.MATERIALIZED_VIEW) + .build(); + DispatchQueryContext dispatchQueryContext = + DispatchQueryContext.builder() + .dataSourceMetadata(metadata) + .indexQueryDetails(indexQueryDetails) + .build(); + Mockito.when(flintIndexMetadataService.getFlintIndexMetadata(any())) + .thenReturn(new HashMap<>()); + IllegalStateException illegalStateException = + Assertions.assertThrows( + IllegalStateException.class, + () -> indexDMLHandler.submit(dispatchQueryRequest, dispatchQueryContext)); + Assertions.assertEquals( + "Couldn't fetch flint index: flint_mys3_default_http_logs_metrics details", + illegalStateException.getMessage()); + } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index bca1bb229a..d1d5033ee0 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -74,7 +74,7 @@ import org.opensearch.sql.spark.execution.statement.StatementId; import org.opensearch.sql.spark.execution.statement.StatementState; import org.opensearch.sql.spark.execution.statestore.StateStore; -import org.opensearch.sql.spark.flint.FlintIndexMetadataReader; +import org.opensearch.sql.spark.flint.FlintIndexMetadataService; import org.opensearch.sql.spark.leasemanager.LeaseManager; import org.opensearch.sql.spark.response.JobExecutionResponseReader; import org.opensearch.sql.spark.rest.model.LangType; @@ -87,7 +87,7 @@ public class SparkQueryDispatcherTest { @Mock private DataSourceService dataSourceService; @Mock private JobExecutionResponseReader jobExecutionResponseReader; @Mock private DataSourceUserAuthorizationHelperImpl dataSourceUserAuthorizationHelper; - @Mock private FlintIndexMetadataReader flintIndexMetadataReader; + @Mock private FlintIndexMetadataService flintIndexMetadataService; @Mock(answer = RETURNS_DEEP_STUBS) private Client openSearchClient; @@ -118,7 +118,7 @@ void setUp() { dataSourceService, dataSourceUserAuthorizationHelper, jobExecutionResponseReader, - flintIndexMetadataReader, + flintIndexMetadataService, openSearchClient, sessionManager, leaseManager, @@ -168,7 +168,7 @@ void testDispatchSelectQuery() { verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - verifyNoInteractions(flintIndexMetadataReader); + verifyNoInteractions(flintIndexMetadataService); } @Test @@ -213,7 +213,7 @@ void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() { verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - verifyNoInteractions(flintIndexMetadataReader); + verifyNoInteractions(flintIndexMetadataService); } @Test @@ -257,7 +257,7 @@ void testDispatchSelectQueryWithNoAuthIndexStoreDatasource() { verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - verifyNoInteractions(flintIndexMetadataReader); + verifyNoInteractions(flintIndexMetadataService); } @Test @@ -371,7 +371,7 @@ void testDispatchIndexQuery() { verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - verifyNoInteractions(flintIndexMetadataReader); + verifyNoInteractions(flintIndexMetadataService); } @Test @@ -415,7 +415,7 @@ void testDispatchWithPPLQuery() { verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - verifyNoInteractions(flintIndexMetadataReader); + verifyNoInteractions(flintIndexMetadataService); } @Test @@ -460,7 +460,7 @@ void testDispatchQueryWithoutATableAndDataSourceName() { verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - verifyNoInteractions(flintIndexMetadataReader); + verifyNoInteractions(flintIndexMetadataService); } @Test @@ -509,7 +509,7 @@ void testDispatchIndexQueryWithoutADatasourceName() { verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - verifyNoInteractions(flintIndexMetadataReader); + verifyNoInteractions(flintIndexMetadataService); } @Test @@ -558,7 +558,7 @@ void testDispatchMaterializedViewQuery() { verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - verifyNoInteractions(flintIndexMetadataReader); + verifyNoInteractions(flintIndexMetadataService); } @Test @@ -603,7 +603,7 @@ void testDispatchShowMVQuery() { verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - verifyNoInteractions(flintIndexMetadataReader); + verifyNoInteractions(flintIndexMetadataService); } @Test @@ -648,7 +648,7 @@ void testRefreshIndexQuery() { verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - verifyNoInteractions(flintIndexMetadataReader); + verifyNoInteractions(flintIndexMetadataService); } @Test @@ -693,7 +693,7 @@ void testDispatchDescribeIndexQuery() { verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - verifyNoInteractions(flintIndexMetadataReader); + verifyNoInteractions(flintIndexMetadataService); } @Test diff --git a/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImplTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImplTest.java deleted file mode 100644 index 4d809c31dc..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataReaderImplTest.java +++ /dev/null @@ -1,117 +0,0 @@ -package org.opensearch.sql.spark.flint; - -import static org.mockito.Answers.RETURNS_DEEP_STUBS; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import com.google.common.base.Charsets; -import com.google.common.io.Resources; -import java.io.IOException; -import java.net.URL; -import java.util.Map; -import lombok.SneakyThrows; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse; -import org.opensearch.client.Client; -import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.cluster.metadata.MappingMetadata; -import org.opensearch.common.xcontent.XContentType; -import org.opensearch.core.xcontent.DeprecationHandler; -import org.opensearch.core.xcontent.NamedXContentRegistry; -import org.opensearch.core.xcontent.XContentParser; -import org.opensearch.sql.spark.dispatcher.model.FullyQualifiedTableName; -import org.opensearch.sql.spark.dispatcher.model.IndexQueryActionType; -import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails; - -@ExtendWith(MockitoExtension.class) -public class FlintIndexMetadataReaderImplTest { - @Mock(answer = RETURNS_DEEP_STUBS) - private Client client; - - @SneakyThrows - @Test - void testGetJobIdFromFlintSkippingIndexMetadata() { - URL url = - Resources.getResource( - "flint-index-mappings/flint_mys3_default_http_logs_skipping_index.json"); - String mappings = Resources.toString(url, Charsets.UTF_8); - String indexName = "flint_mys3_default_http_logs_skipping_index"; - mockNodeClientIndicesMappings(indexName, mappings); - FlintIndexMetadataReader flintIndexMetadataReader = new FlintIndexMetadataReaderImpl(client); - FlintIndexMetadata indexMetadata = - flintIndexMetadataReader.getFlintIndexMetadata( - IndexQueryDetails.builder() - .fullyQualifiedTableName(new FullyQualifiedTableName("mys3.default.http_logs")) - .autoRefresh(false) - .indexQueryActionType(IndexQueryActionType.DROP) - .indexType(FlintIndexType.SKIPPING) - .build()); - Assertions.assertEquals("00fdmvv9hp8u0o0q", indexMetadata.getJobId()); - } - - @SneakyThrows - @Test - void testGetJobIdFromFlintCoveringIndexMetadata() { - URL url = - Resources.getResource("flint-index-mappings/flint_mys3_default_http_logs_cv1_index.json"); - String mappings = Resources.toString(url, Charsets.UTF_8); - String indexName = "flint_mys3_default_http_logs_cv1_index"; - mockNodeClientIndicesMappings(indexName, mappings); - FlintIndexMetadataReader flintIndexMetadataReader = new FlintIndexMetadataReaderImpl(client); - FlintIndexMetadata indexMetadata = - flintIndexMetadataReader.getFlintIndexMetadata( - IndexQueryDetails.builder() - .indexName("cv1") - .fullyQualifiedTableName(new FullyQualifiedTableName("mys3.default.http_logs")) - .autoRefresh(false) - .indexQueryActionType(IndexQueryActionType.DROP) - .indexType(FlintIndexType.COVERING) - .build()); - Assertions.assertEquals("00fdmvv9hp8u0o0q", indexMetadata.getJobId()); - } - - @SneakyThrows - @Test - void testGetJobIDWithNPEException() { - URL url = Resources.getResource("flint-index-mappings/npe_mapping.json"); - String mappings = Resources.toString(url, Charsets.UTF_8); - String indexName = "flint_mys3_default_http_logs_cv1_index"; - mockNodeClientIndicesMappings(indexName, mappings); - FlintIndexMetadataReader flintIndexMetadataReader = new FlintIndexMetadataReaderImpl(client); - IllegalArgumentException illegalArgumentException = - Assertions.assertThrows( - IllegalArgumentException.class, - () -> - flintIndexMetadataReader.getFlintIndexMetadata( - IndexQueryDetails.builder() - .indexName("cv1") - .fullyQualifiedTableName( - new FullyQualifiedTableName("mys3.default.http_logs")) - .autoRefresh(false) - .indexQueryActionType(IndexQueryActionType.DROP) - .indexType(FlintIndexType.COVERING) - .build())); - Assertions.assertEquals("Provided Index doesn't exist", illegalArgumentException.getMessage()); - } - - @SneakyThrows - public void mockNodeClientIndicesMappings(String indexName, String mappings) { - GetMappingsResponse mockResponse = mock(GetMappingsResponse.class); - when(client.admin().indices().prepareGetMappings(any()).get()).thenReturn(mockResponse); - Map metadata; - metadata = Map.of(indexName, IndexMetadata.fromXContent(createParser(mappings)).mapping()); - when(mockResponse.mappings()).thenReturn(metadata); - } - - private XContentParser createParser(String mappings) throws IOException { - return XContentType.JSON - .xContent() - .createParser( - NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, mappings); - } -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataServiceImplTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataServiceImplTest.java new file mode 100644 index 0000000000..981af48b71 --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataServiceImplTest.java @@ -0,0 +1,198 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.flint; + +import static org.mockito.Answers.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.common.base.Charsets; +import com.google.common.io.Resources; +import java.io.IOException; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; +import lombok.SneakyThrows; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse; +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.MappingMetadata; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.xcontent.DeprecationHandler; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions; +import org.opensearch.sql.spark.dispatcher.model.FullyQualifiedTableName; +import org.opensearch.sql.spark.dispatcher.model.IndexQueryActionType; +import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails; +import org.opensearch.sql.spark.execution.statestore.StateStore; + +@ExtendWith(MockitoExtension.class) +public class FlintIndexMetadataServiceImplTest { + @Mock(answer = RETURNS_DEEP_STUBS) + private Client client; + + @Mock private StateStore stateStore; + + @SneakyThrows + @Test + void testGetJobIdFromFlintSkippingIndexMetadata() { + URL url = + Resources.getResource( + "flint-index-mappings/flint_mys3_default_http_logs_skipping_index.json"); + String mappings = Resources.toString(url, Charsets.UTF_8); + String indexName = "flint_mys3_default_http_logs_skipping_index"; + mockNodeClientIndicesMappings(indexName, mappings); + FlintIndexMetadataService flintIndexMetadataService = + new FlintIndexMetadataServiceImpl(client, stateStore); + IndexQueryDetails indexQueryDetails = + IndexQueryDetails.builder() + .fullyQualifiedTableName(new FullyQualifiedTableName("mys3.default.http_logs")) + .indexOptions(new FlintIndexOptions()) + .indexQueryActionType(IndexQueryActionType.DROP) + .indexType(FlintIndexType.SKIPPING) + .build(); + Map indexMetadataMap = + flintIndexMetadataService.getFlintIndexMetadata(indexQueryDetails.openSearchIndexName()); + Assertions.assertEquals( + "00fhelvq7peuao0", + indexMetadataMap.get(indexQueryDetails.openSearchIndexName()).getJobId()); + } + + @SneakyThrows + @Test + void testGetJobIdFromFlintSkippingIndexMetadataWithIndexState() { + URL url = + Resources.getResource( + "flint-index-mappings/flint_mys3_default_http_logs_skipping_index.json"); + String mappings = Resources.toString(url, Charsets.UTF_8); + String indexName = "flint_mys3_default_http_logs_skipping_index"; + mockNodeClientIndicesMappings(indexName, mappings); + FlintIndexMetadataService flintIndexMetadataService = + new FlintIndexMetadataServiceImpl(client, stateStore); + IndexQueryDetails indexQueryDetails = + IndexQueryDetails.builder() + .fullyQualifiedTableName(new FullyQualifiedTableName("mys3.default.http_logs")) + .indexOptions(new FlintIndexOptions()) + .indexQueryActionType(IndexQueryActionType.DROP) + .indexType(FlintIndexType.SKIPPING) + .build(); + Map indexMetadataMap = + flintIndexMetadataService.getFlintIndexMetadata(indexQueryDetails.openSearchIndexName()); + FlintIndexMetadata metadata = indexMetadataMap.get(indexQueryDetails.openSearchIndexName()); + Assertions.assertEquals("00fhelvq7peuao0", metadata.getJobId()); + } + + @SneakyThrows + @Test + void testGetJobIdFromFlintCoveringIndexMetadata() { + URL url = + Resources.getResource("flint-index-mappings/flint_mys3_default_http_logs_cv1_index.json"); + String mappings = Resources.toString(url, Charsets.UTF_8); + String indexName = "flint_mys3_default_http_logs_cv1_index"; + mockNodeClientIndicesMappings(indexName, mappings); + IndexQueryDetails indexQueryDetails = + IndexQueryDetails.builder() + .indexName("cv1") + .fullyQualifiedTableName(new FullyQualifiedTableName("mys3.default.http_logs")) + .indexOptions(new FlintIndexOptions()) + .indexQueryActionType(IndexQueryActionType.DROP) + .indexType(FlintIndexType.COVERING) + .build(); + FlintIndexMetadataService flintIndexMetadataService = + new FlintIndexMetadataServiceImpl(client, stateStore); + Map indexMetadataMap = + flintIndexMetadataService.getFlintIndexMetadata(indexQueryDetails.openSearchIndexName()); + Assertions.assertEquals( + "00fdmvv9hp8u0o0q", + indexMetadataMap.get(indexQueryDetails.openSearchIndexName()).getJobId()); + } + + @SneakyThrows + @Test + void testGetJobIDWithNPEException() { + URL url = Resources.getResource("flint-index-mappings/npe_mapping.json"); + String mappings = Resources.toString(url, Charsets.UTF_8); + String indexName = "flint_mys3_default_http_logs_cv1_index"; + mockNodeClientIndicesMappings(indexName, mappings); + FlintIndexMetadataService flintIndexMetadataService = + new FlintIndexMetadataServiceImpl(client, stateStore); + IndexQueryDetails indexQueryDetails = + IndexQueryDetails.builder() + .indexName("cv1") + .fullyQualifiedTableName(new FullyQualifiedTableName("mys3.default.http_logs")) + .indexOptions(new FlintIndexOptions()) + .indexQueryActionType(IndexQueryActionType.DROP) + .indexType(FlintIndexType.COVERING) + .build(); + Map flintIndexMetadataMap = + flintIndexMetadataService.getFlintIndexMetadata(indexQueryDetails.openSearchIndexName()); + Assertions.assertFalse( + flintIndexMetadataMap.containsKey("flint_mys3_default_http_logs_cv1_index")); + } + + @SneakyThrows + @Test + void testGetJobIDWithNPEExceptionForMultipleIndices() { + HashMap indexMappingsMap = new HashMap<>(); + URL url = Resources.getResource("flint-index-mappings/npe_mapping.json"); + String mappings = Resources.toString(url, Charsets.UTF_8); + String indexName = "flint_mys3_default_http_logs_cv1_index"; + indexMappingsMap.put(indexName, mappings); + url = + Resources.getResource( + "flint-index-mappings/flint_mys3_default_http_logs_skipping_index.json"); + mappings = Resources.toString(url, Charsets.UTF_8); + indexName = "flint_mys3_default_http_logs_skipping_index"; + indexMappingsMap.put(indexName, mappings); + mockNodeClientIndicesMappings("flint_mys3*", indexMappingsMap); + FlintIndexMetadataService flintIndexMetadataService = + new FlintIndexMetadataServiceImpl(client, stateStore); + Map flintIndexMetadataMap = + flintIndexMetadataService.getFlintIndexMetadata("flint_mys3*"); + Assertions.assertFalse( + flintIndexMetadataMap.containsKey("flint_mys3_default_http_logs_cv1_index")); + Assertions.assertTrue( + flintIndexMetadataMap.containsKey("flint_mys3_default_http_logs_skipping_index")); + } + + @SneakyThrows + public void mockNodeClientIndicesMappings(String indexName, String mappings) { + GetMappingsResponse mockResponse = mock(GetMappingsResponse.class); + when(client.admin().indices().prepareGetMappings().setIndices(indexName).get()) + .thenReturn(mockResponse); + Map metadata; + metadata = Map.of(indexName, IndexMetadata.fromXContent(createParser(mappings)).mapping()); + when(mockResponse.getMappings()).thenReturn(metadata); + } + + @SneakyThrows + public void mockNodeClientIndicesMappings( + String indexPattern, HashMap indexMappingsMap) { + GetMappingsResponse mockResponse = mock(GetMappingsResponse.class); + when(client.admin().indices().prepareGetMappings().setIndices(indexPattern).get()) + .thenReturn(mockResponse); + Map metadataMap = new HashMap<>(); + for (String indexName : indexMappingsMap.keySet()) { + metadataMap.put( + indexName, + IndexMetadata.fromXContent(createParser(indexMappingsMap.get(indexName))).mapping()); + } + when(mockResponse.getMappings()).thenReturn(metadataMap); + } + + private XContentParser createParser(String mappings) throws IOException { + return XContentType.JSON + .xContent() + .createParser( + NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, mappings); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataServiceSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataServiceSpecTest.java new file mode 100644 index 0000000000..1e88d8a2d7 --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataServiceSpecTest.java @@ -0,0 +1,151 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.flint; + +import java.util.Map; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.Before; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.sql.spark.asyncquery.model.MockFlintIndex; +import org.opensearch.sql.spark.asyncquery.model.MockFlintSparkJob; +import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions; +import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.test.OpenSearchIntegTestCase; + +public class FlintIndexMetadataServiceSpecTest extends OpenSearchIntegTestCase { + + private static final Logger LOG = LogManager.getLogger(FlintIndexMetadataServiceSpecTest.class); + protected ClusterService clusterService; + protected NodeClient client; + protected StateStore stateStore; + private MockFlintIndex skippingIndex; + private MockFlintSparkJob skippingIndexState; + private MockFlintIndex coveringIndex; + private MockFlintSparkJob coveringIndexState; + private MockFlintIndex mvIndex; + private MockFlintSparkJob mvIndexState; + + @Before + public void setup() { + clusterService = clusterService(); + client = (NodeClient) cluster().client(); + stateStore = new StateStore(client, clusterService); + + skippingIndex = + new MockFlintIndex( + client, "flint_my_glue_mydb_http_logs_skipping_index", FlintIndexType.SKIPPING, ""); + skippingIndexState = + new MockFlintSparkJob( + stateStore, "flint_my_glue_mydb_http_logs_skipping_index_latest_id", "my_glue"); + coveringIndex = + new MockFlintIndex( + client, "flint_my_glue_mydb_http_logs_covering_index", FlintIndexType.COVERING, ""); + coveringIndexState = + new MockFlintSparkJob( + stateStore, "flint_my_glue_mydb_http_logs_covering_index_latest_id", "my_glue"); + mvIndex = + new MockFlintIndex(client, "flint_my_glue_mydb_mv", FlintIndexType.MATERIALIZED_VIEW, ""); + mvIndexState = new MockFlintSparkJob(stateStore, "flint_my_glue_mydb_mv_latest_id", "my_glue"); + } + + @Test + public void testGetFlintIndexMetadataWithIndexState() { + + skippingIndex.createIndex(); + skippingIndexState.cancelling(); + coveringIndex.createIndex(); + coveringIndexState.refreshing(); + mvIndex.createIndex(); + mvIndexState.active(); + try { + FlintIndexMetadataService flintIndexMetadataService = + new FlintIndexMetadataServiceImpl(client, stateStore); + Map indexMetadataMap = + flintIndexMetadataService.getFlintIndexMetadata("flint_my_glue*"); + Assertions.assertEquals(3, indexMetadataMap.size()); + + Assertions.assertTrue( + indexMetadataMap.containsKey("flint_my_glue_mydb_http_logs_skipping_index")); + Assertions.assertTrue( + indexMetadataMap.containsKey("flint_my_glue_mydb_http_logs_covering_index")); + Assertions.assertTrue(indexMetadataMap.containsKey("flint_my_glue_mydb_mv")); + } finally { + skippingIndex.deleteIndex(); + skippingIndexState.deleted(); + coveringIndex.deleteIndex(); + coveringIndexState.deleted(); + mvIndex.deleteIndex(); + mvIndexState.deleted(); + } + } + + @Test + public void testGetFlintIndexMetadataWithOutIndexState() { + skippingIndex.createIndex(); + skippingIndexState.cancelling(); + coveringIndex.createIndex(); + coveringIndexState.refreshing(); + mvIndex.createIndex(); + mvIndexState.active(); + try { + FlintIndexMetadataService flintIndexMetadataService = + new FlintIndexMetadataServiceImpl(client, stateStore); + Map indexMetadataMap = + flintIndexMetadataService.getFlintIndexMetadata("flint_my_glue*"); + Assertions.assertEquals(3, indexMetadataMap.size()); + + Assertions.assertTrue( + indexMetadataMap.containsKey("flint_my_glue_mydb_http_logs_skipping_index")); + + Assertions.assertTrue( + indexMetadataMap.containsKey("flint_my_glue_mydb_http_logs_covering_index")); + Assertions.assertTrue(indexMetadataMap.containsKey("flint_my_glue_mydb_mv")); + } finally { + skippingIndex.deleteIndex(); + skippingIndexState.deleted(); + coveringIndex.deleteIndex(); + coveringIndexState.deleted(); + mvIndex.deleteIndex(); + mvIndexState.deleted(); + } + } + + @Test + public void testUpdateMapping() { + skippingIndex.createIndex(); + LOG.fatal( + client + .admin() + .indices() + .prepareGetMappings() + .setIndices(skippingIndex.getIndexName()) + .get() + .getMappings() + .get("flint_my_glue_mydb_http_logs_skipping_index") + .getSourceAsMap() + .toString()); + FlintIndexMetadataService flintIndexMetadataService = + new FlintIndexMetadataServiceImpl(client, stateStore); + flintIndexMetadataService.updateIndexToManualRefresh( + skippingIndex.getIndexName(), new FlintIndexOptions()); + LOG.fatal( + client + .admin() + .indices() + .prepareGetMappings() + .setIndices(skippingIndex.getIndexName()) + .get() + .getMappings() + .get("flint_my_glue_mydb_http_logs_skipping_index") + .getSourceAsMap() + .toString()); + skippingIndex.deleteIndex(); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataTest.java deleted file mode 100644 index 808b80766e..0000000000 --- a/spark/src/test/java/org/opensearch/sql/spark/flint/FlintIndexMetadataTest.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.flint; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.opensearch.sql.spark.constants.TestConstants.EMR_JOB_ID; -import static org.opensearch.sql.spark.flint.FlintIndexMetadata.AUTO_REFRESH; -import static org.opensearch.sql.spark.flint.FlintIndexMetadata.ENV_KEY; -import static org.opensearch.sql.spark.flint.FlintIndexMetadata.OPTIONS_KEY; -import static org.opensearch.sql.spark.flint.FlintIndexMetadata.PROPERTIES_KEY; -import static org.opensearch.sql.spark.flint.FlintIndexMetadata.SERVERLESS_EMR_JOB_ID; - -import java.util.HashMap; -import java.util.Map; -import org.junit.jupiter.api.Test; - -public class FlintIndexMetadataTest { - - @Test - public void testAutoRefreshSetToTrue() { - FlintIndexMetadata indexMetadata = - FlintIndexMetadata.fromMetatdata( - new Metadata() - .addEnv(SERVERLESS_EMR_JOB_ID, EMR_JOB_ID) - .addOptions(AUTO_REFRESH, "true") - .metadata()); - assertTrue(indexMetadata.isAutoRefresh()); - } - - @Test - public void testAutoRefreshSetToFalse() { - FlintIndexMetadata indexMetadata = - FlintIndexMetadata.fromMetatdata( - new Metadata() - .addEnv(SERVERLESS_EMR_JOB_ID, EMR_JOB_ID) - .addOptions(AUTO_REFRESH, "false") - .metadata()); - assertFalse(indexMetadata.isAutoRefresh()); - } - - @Test - public void testWithOutAutoRefresh() { - FlintIndexMetadata indexMetadata = - FlintIndexMetadata.fromMetatdata( - new Metadata() - .addEnv(SERVERLESS_EMR_JOB_ID, EMR_JOB_ID) - .addOptions(AUTO_REFRESH, "false") - .metadata()); - assertFalse(indexMetadata.isAutoRefresh()); - } - - static class Metadata { - private final Map properties; - private final Map env; - private final Map options; - - private Metadata() { - properties = new HashMap<>(); - env = new HashMap<>(); - options = new HashMap<>(); - } - - public Metadata addEnv(String key, String value) { - env.put(key, value); - return this; - } - - public Metadata addOptions(String key, String value) { - options.put(key, value); - return this; - } - - public Map metadata() { - Map result = new HashMap<>(); - properties.put(ENV_KEY, env); - result.put(OPTIONS_KEY, options); - result.put(PROPERTIES_KEY, properties); - return result; - } - } -} diff --git a/spark/src/test/java/org/opensearch/sql/spark/flint/IndexQueryDetailsTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/IndexQueryDetailsTest.java index 6299dee0ca..cddc790d5e 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/flint/IndexQueryDetailsTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/flint/IndexQueryDetailsTest.java @@ -8,6 +8,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import org.junit.jupiter.api.Test; +import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions; import org.opensearch.sql.spark.dispatcher.model.FullyQualifiedTableName; import org.opensearch.sql.spark.dispatcher.model.IndexQueryActionType; import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails; @@ -20,7 +21,7 @@ public void skippingIndexName() { IndexQueryDetails.builder() .indexName("invalid") .fullyQualifiedTableName(new FullyQualifiedTableName("mys3.default.http_logs")) - .autoRefresh(false) + .indexOptions(new FlintIndexOptions()) .indexQueryActionType(IndexQueryActionType.DROP) .indexType(FlintIndexType.SKIPPING) .build() diff --git a/spark/src/test/java/org/opensearch/sql/spark/flint/model/FlintIndexDetailsRequestTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/model/FlintIndexDetailsRequestTest.java new file mode 100644 index 0000000000..38623cc19e --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/flint/model/FlintIndexDetailsRequestTest.java @@ -0,0 +1,87 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.flint.model; + +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Test; + +class FlintIndexDetailsRequestTest { + + @Test + void buildWithMinimumRequiredFields() { + FlintIndexDetailsRequest request = + new FlintIndexDetailsRequest.Builder().indexPattern("test-pattern").build(); + + assertNotNull(request); + assertEquals("test-pattern", request.getIndexPattern()); + assertFalse(request.isIndexStateRequired()); + assertNull(request.getDataSourceName()); + } + + @Test + void buildWithAllFields() { + FlintIndexDetailsRequest request = + new FlintIndexDetailsRequest.Builder() + .indexPattern("test-pattern") + .isIndexStateRequired(true) + .dataSourceName("test-source") + .build(); + + assertNotNull(request); + assertEquals("test-pattern", request.getIndexPattern()); + assertTrue(request.isIndexStateRequired()); + assertEquals("test-source", request.getDataSourceName()); + } + + @Test + void throwExceptionWhenIndexPatternIsNull() { + Exception exception = + assertThrows( + IllegalArgumentException.class, + () -> new FlintIndexDetailsRequest.Builder().indexPattern(null).build()); + assertEquals("indexPattern cannot be null or empty", exception.getMessage()); + } + + @Test + void throwExceptionWhenIndexPatternIsEmpty() { + Exception exception = + assertThrows( + IllegalArgumentException.class, + () -> new FlintIndexDetailsRequest.Builder().indexPattern("").build()); + assertEquals("indexPattern cannot be null or empty", exception.getMessage()); + } + + @Test + void throwExceptionWhenDataSourceNameIsRequiredButNull() { + Exception exception = + assertThrows( + IllegalStateException.class, + () -> + new FlintIndexDetailsRequest.Builder() + .indexPattern("test-pattern") + .isIndexStateRequired(true) + .dataSourceName(null) + .build()); + assertEquals( + "dataSourceName is required when isIndexStateRequired is true", exception.getMessage()); + } + + @Test + void throwExceptionWhenDataSourceNameIsRequiredButEmpty() { + Exception exception = + assertThrows( + IllegalStateException.class, + () -> + new FlintIndexDetailsRequest.Builder() + .indexPattern("test-pattern") + .isIndexStateRequired(true) + .dataSourceName("") + .build()); + assertEquals( + "dataSourceName is required when isIndexStateRequired is true", exception.getMessage()); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java b/spark/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java index f5226206ab..1c6fbe5da1 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java @@ -259,50 +259,62 @@ void testRefreshIndex() { @Test void testAutoRefresh() { Assertions.assertFalse( - SQLQueryUtils.extractIndexDetails(skippingIndex().getQuery()).isAutoRefresh()); + SQLQueryUtils.extractIndexDetails(skippingIndex().getQuery()) + .getFlintIndexOptions() + .autoRefresh()); Assertions.assertFalse( SQLQueryUtils.extractIndexDetails( skippingIndex().withProperty("auto_refresh", "false").getQuery()) - .isAutoRefresh()); + .getFlintIndexOptions() + .autoRefresh()); Assertions.assertTrue( SQLQueryUtils.extractIndexDetails( skippingIndex().withProperty("auto_refresh", "true").getQuery()) - .isAutoRefresh()); + .getFlintIndexOptions() + .autoRefresh()); Assertions.assertTrue( SQLQueryUtils.extractIndexDetails( skippingIndex().withProperty("\"auto_refresh\"", "true").getQuery()) - .isAutoRefresh()); + .getFlintIndexOptions() + .autoRefresh()); Assertions.assertTrue( SQLQueryUtils.extractIndexDetails( skippingIndex().withProperty("\"auto_refresh\"", "\"true\"").getQuery()) - .isAutoRefresh()); + .getFlintIndexOptions() + .autoRefresh()); Assertions.assertFalse( SQLQueryUtils.extractIndexDetails( skippingIndex().withProperty("auto_refresh", "1").getQuery()) - .isAutoRefresh()); + .getFlintIndexOptions() + .autoRefresh()); Assertions.assertFalse( SQLQueryUtils.extractIndexDetails(skippingIndex().withProperty("interval", "1").getQuery()) - .isAutoRefresh()); + .getFlintIndexOptions() + .autoRefresh()); - Assertions.assertFalse(SQLQueryUtils.extractIndexDetails(index().getQuery()).isAutoRefresh()); + Assertions.assertFalse( + SQLQueryUtils.extractIndexDetails(index().getQuery()).getFlintIndexOptions().autoRefresh()); Assertions.assertFalse( SQLQueryUtils.extractIndexDetails(index().withProperty("auto_refresh", "false").getQuery()) - .isAutoRefresh()); + .getFlintIndexOptions() + .autoRefresh()); Assertions.assertTrue( SQLQueryUtils.extractIndexDetails(index().withProperty("auto_refresh", "true").getQuery()) - .isAutoRefresh()); + .getFlintIndexOptions() + .autoRefresh()); Assertions.assertTrue( SQLQueryUtils.extractIndexDetails(mv().withProperty("auto_refresh", "true").getQuery()) - .isAutoRefresh()); + .getFlintIndexOptions() + .autoRefresh()); } @Getter diff --git a/spark/src/test/java/org/opensearch/sql/spark/utils/TestUtils.java b/spark/src/test/java/org/opensearch/sql/spark/utils/TestUtils.java index ca77006d9c..4cab6afa9c 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/utils/TestUtils.java +++ b/spark/src/test/java/org/opensearch/sql/spark/utils/TestUtils.java @@ -5,8 +5,15 @@ package org.opensearch.sql.spark.utils; +import com.google.common.base.Charsets; +import com.google.common.io.Resources; import java.io.IOException; +import java.net.URL; import java.util.Objects; +import lombok.SneakyThrows; +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.client.Client; +import org.opensearch.common.xcontent.XContentType; public class TestUtils { @@ -22,4 +29,17 @@ public static String getJson(String filename) throws IOException { return new String( Objects.requireNonNull(classLoader.getResourceAsStream(filename)).readAllBytes()); } + + @SneakyThrows + public static String loadMappings(String path) { + URL url = Resources.getResource(path); + return Resources.toString(url, Charsets.UTF_8); + } + + public static void createIndexWithMappings( + Client client, String indexName, String metadataFileLocation) { + CreateIndexRequest request = new CreateIndexRequest(indexName); + request.mapping(loadMappings(metadataFileLocation), XContentType.JSON); + client.admin().indices().create(request).actionGet(); + } } diff --git a/spark/src/test/resources/flint-index-mappings/flint_my_glue_mydb_http_logs_covering_corrupted_index_mapping.json b/spark/src/test/resources/flint-index-mappings/flint_my_glue_mydb_http_logs_covering_corrupted_index_mapping.json new file mode 100644 index 0000000000..90d37c3e79 --- /dev/null +++ b/spark/src/test/resources/flint-index-mappings/flint_my_glue_mydb_http_logs_covering_corrupted_index_mapping.json @@ -0,0 +1,33 @@ +{ + "_meta": { + "latestId": "flint_my_glue_mydb_http_logs_covering_corrupted_index_latest_id", + "kind": "covering", + "indexedColumns": [ + { + "columnType": "string", + "columnName": "clientip" + }, + { + "columnType": "int", + "columnName": "status" + } + ], + "name": "covering", + "options": { + "auto_refresh": "true", + "incremental_refresh": "false", + "index_settings": "{\"number_of_shards\":5,\"number_of_replicas\":1}", + "checkpoint_location": "s3://vamsicheckpoint/cv/" + }, + "source": "my_glue.mydb.http_logs", + "version": "0.2.0" + }, + "properties": { + "clientip": { + "type": "keyword" + }, + "status": { + "type": "integer" + } + } +} \ No newline at end of file diff --git a/spark/src/test/resources/flint-index-mappings/flint_my_glue_mydb_http_logs_covering_index_mapping.json b/spark/src/test/resources/flint-index-mappings/flint_my_glue_mydb_http_logs_covering_index_mapping.json new file mode 100644 index 0000000000..cb4a6b5366 --- /dev/null +++ b/spark/src/test/resources/flint-index-mappings/flint_my_glue_mydb_http_logs_covering_index_mapping.json @@ -0,0 +1,39 @@ +{ + "_meta": { + "latestId": "flint_my_glue_mydb_http_logs_covering_index_latest_id", + "kind": "covering", + "indexedColumns": [ + { + "columnType": "string", + "columnName": "clientip" + }, + { + "columnType": "int", + "columnName": "status" + } + ], + "name": "covering", + "options": { + "auto_refresh": "true", + "incremental_refresh": "false", + "index_settings": "{\"number_of_shards\":5,\"number_of_replicas\":1}", + "checkpoint_location": "s3://vamsicheckpoint/cv/" + }, + "source": "my_glue.mydb.http_logs", + "version": "0.2.0", + "properties": { + "env": { + "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fhh7frokkf0k0l", + "SERVERLESS_EMR_JOB_ID": "00fhoag6i0671o0m" + } + } + }, + "properties": { + "clientip": { + "type": "keyword" + }, + "status": { + "type": "integer" + } + } +} \ No newline at end of file diff --git a/spark/src/test/resources/flint-index-mappings/flint_my_glue_mydb_http_logs_skipping_index_mapping.json b/spark/src/test/resources/flint-index-mappings/flint_my_glue_mydb_http_logs_skipping_index_mapping.json new file mode 100644 index 0000000000..4ffd73bf9c --- /dev/null +++ b/spark/src/test/resources/flint-index-mappings/flint_my_glue_mydb_http_logs_skipping_index_mapping.json @@ -0,0 +1,39 @@ +{ + "_meta": { + "latestId": "flint_my_glue_mydb_http_logs_skipping_index_latest_id", + "kind": "skipping", + "indexedColumns": [ + { + "columnType": "int", + "kind": "VALUE_SET", + "parameters": { + "max_size": "100" + }, + "columnName": "status" + } + ], + "name": "flint_my_glue_mydb_http_logs_skipping_index", + "options": { + "auto_refresh": "true", + "incremental_refresh": "false", + "index_settings": "{\"number_of_shards\":5, \"number_of_replicas\":1}", + "checkpoint_location": "s3://vamsicheckpoint/skp/" + }, + "source": "my_glue.mydb.http_logs", + "version": "0.3.0", + "properties": { + "env": { + "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fhe6d5jpah090l", + "SERVERLESS_EMR_JOB_ID": "00fhelvq7peuao0m" + } + } + }, + "properties": { + "file_path": { + "type": "keyword" + }, + "status": { + "type": "integer" + } + } +} \ No newline at end of file diff --git a/spark/src/test/resources/flint-index-mappings/flint_my_glue_mydb_mv_mapping.json b/spark/src/test/resources/flint-index-mappings/flint_my_glue_mydb_mv_mapping.json new file mode 100644 index 0000000000..0fcbf299ec --- /dev/null +++ b/spark/src/test/resources/flint-index-mappings/flint_my_glue_mydb_mv_mapping.json @@ -0,0 +1,33 @@ +{ + "_meta": { + "latestId": "flint_my_glue_mydb_mv_latest_id", + "kind": "mv", + "indexedColumns": [ + { + "columnType": "bigint", + "columnName": "counter1" + } + ], + "name": "my_glue.mydb.mv", + "options": { + "auto_refresh": "true", + "incremental_refresh": "false", + "index_settings": "{\"number_of_shards\":5,\"number_of_replicas\":1}", + "checkpoint_location": "s3://vamsicheckpoint/mv/", + "watermark_delay": "10 seconds" + }, + "source": "SELECT count(`@timestamp`) AS `counter1` FROM my_glue.mydb.http_logs GROUP BY TUMBLE (`@timestamp`, '1 second')", + "version": "0.2.0", + "properties": { + "env": { + "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fhh7frokkf0k0l", + "SERVERLESS_EMR_JOB_ID": "00fhob01oa7fu00m" + } + } + }, + "properties": { + "counter1": { + "type": "long" + } + } +} \ No newline at end of file diff --git a/spark/src/test/resources/flint-index-mappings/flint_mys3_default_http_logs_skipping_index.json b/spark/src/test/resources/flint-index-mappings/flint_mys3_default_http_logs_skipping_index.json index 24e14c12ba..1438b257d1 100644 --- a/spark/src/test/resources/flint-index-mappings/flint_mys3_default_http_logs_skipping_index.json +++ b/spark/src/test/resources/flint-index-mappings/flint_mys3_default_http_logs_skipping_index.json @@ -2,23 +2,32 @@ "flint_mys3_default_http_logs_skipping_index": { "mappings": { "_doc": { - "_meta": { + "_meta": { + "latestId": "ZmxpbnRfdmFtc2lfZ2x1ZV92YW1zaWRiX2h0dHBfbG9nc19za2lwcGluZ19pbmRleA==", "kind": "skipping", "indexedColumns": [ { "columnType": "int", "kind": "VALUE_SET", + "parameters": { + "max_size": "100" + }, "columnName": "status" } ], - "name": "flint_mys3_default_http_logs_skipping_index", - "options": {}, - "source": "mys3.default.http_logs", - "version": "0.1.0", + "name": "flint_vamsi_glue_vamsidb_http_logs_skipping_index", + "options": { + "auto_refresh": "true", + "incremental_refresh": "false", + "index_settings": "{\"number_of_shards\":5,\"number_of_replicas\":1}", + "checkpoint_location": "s3://vamsicheckpoint/skp/" + }, + "source": "vamsi_glue.vamsidb.http_logs", + "version": "0.3.0", "properties": { "env": { - "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fd777k3k3ls20p", - "SERVERLESS_EMR_JOB_ID": "00fdmvv9hp8u0o0q" + "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fhe6d5jpah090l", + "SERVERLESS_EMR_JOB_ID": "00fhelvq7peuao0" } } }