From ecc5eddb112ea4927a92f9a24f5b3a05dcdb1de7 Mon Sep 17 00:00:00 2001 From: Vamsi Manohar Date: Sun, 17 Mar 2024 20:17:29 -0700 Subject: [PATCH] FlintStreamingJobCleanerTask Implementation Signed-off-by: Vamsi Manohar --- .../sql/legacy/metrics/MetricName.java | 4 +- .../org/opensearch/sql/plugin/SQLPlugin.java | 9 +- .../cluster/ClusterManagerEventListener.java | 35 ++++- .../cluster/FlintStreamingJobCleanerTask.java | 125 ++++++++++++++++ ...AsyncQueryExecutorServiceImplSpecTest.java | 73 +++++----- .../AsyncQueryExecutorServiceSpec.java | 14 +- .../AsyncQueryGetResultSpecTest.java | 12 +- .../spark/asyncquery/IndexQuerySpecTest.java | 134 ++++++++++-------- .../FlintStreamingJobCleanerTaskTest.java | 130 +++++++++++++++++ 9 files changed, 425 insertions(+), 111 deletions(-) create mode 100644 spark/src/main/java/org/opensearch/sql/spark/cluster/FlintStreamingJobCleanerTask.java create mode 100644 spark/src/test/java/org/opensearch/sql/spark/cluster/FlintStreamingJobCleanerTaskTest.java diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricName.java b/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricName.java index 91ade7b038..4c3f24a9a9 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricName.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/metrics/MetricName.java @@ -47,7 +47,8 @@ public enum MetricName { EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT("emr_cancel_job_request_failure_count"), EMR_STREAMING_QUERY_JOBS_CREATION_COUNT("emr_streaming_jobs_creation_count"), EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT("emr_interactive_jobs_creation_count"), - EMR_BATCH_QUERY_JOBS_CREATION_COUNT("emr_batch_jobs_creation_count"); + EMR_BATCH_QUERY_JOBS_CREATION_COUNT("emr_batch_jobs_creation_count"), + STREAMING_JOB_CLEANER_TASK_FAILURE_COUNT("streaming_job_cleaner_task_failure_count"); private String name; @@ -91,6 +92,7 @@ public static List getNames() { .add(ASYNC_QUERY_CREATE_API_REQUEST_COUNT) .add(ASYNC_QUERY_GET_API_REQUEST_COUNT) .add(ASYNC_QUERY_CANCEL_API_REQUEST_COUNT) + .add(STREAMING_JOB_CLEANER_TASK_FAILURE_COUNT) .build(); public boolean isNumerical() { diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index 2b75a8b2c9..8487f30716 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -79,7 +79,10 @@ import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse; import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory; import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService; +import org.opensearch.sql.spark.client.EMRServerlessClientFactory; import org.opensearch.sql.spark.cluster.ClusterManagerEventListener; +import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.FlintIndexMetadataService; import org.opensearch.sql.spark.rest.RestAsyncQueryManagementAction; import org.opensearch.sql.spark.storage.SparkStorageFactory; import org.opensearch.sql.spark.transport.TransportCancelAsyncQueryRequestAction; @@ -221,7 +224,11 @@ public Collection createComponents( OpenSearchSettings.SESSION_INDEX_TTL_SETTING, OpenSearchSettings.RESULT_INDEX_TTL_SETTING, OpenSearchSettings.AUTO_INDEX_MANAGEMENT_ENABLED_SETTING, - environment.settings()); + environment.settings(), + dataSourceService, + injector.getInstance(FlintIndexMetadataService.class), + injector.getInstance(StateStore.class), + injector.getInstance(EMRServerlessClientFactory.class)); return ImmutableList.of( dataSourceService, injector.getInstance(AsyncQueryExecutorService.class), diff --git a/spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java b/spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java index 3d004b548f..9bea1f4b0d 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java +++ b/spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java @@ -19,17 +19,26 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.spark.client.EMRServerlessClientFactory; +import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.FlintIndexMetadataService; import org.opensearch.threadpool.Scheduler.Cancellable; import org.opensearch.threadpool.ThreadPool; public class ClusterManagerEventListener implements LocalNodeClusterManagerListener { private Cancellable flintIndexRetentionCron; + private Cancellable flintStreamingJobCleanerCron; private ClusterService clusterService; private ThreadPool threadPool; private Client client; private Clock clock; + private DataSourceService dataSourceService; + private FlintIndexMetadataService flintIndexMetadataService; + private StateStore stateStore; + private EMRServerlessClientFactory emrServerlessClientFactory; private Duration sessionTtlDuration; private Duration resultTtlDuration; private boolean isAutoIndexManagementEnabled; @@ -42,13 +51,20 @@ public ClusterManagerEventListener( Setting sessionTtl, Setting resultTtl, Setting isAutoIndexManagementEnabledSetting, - Settings settings) { + Settings settings, + DataSourceService dataSourceService, + FlintIndexMetadataService flintIndexMetadataService, + StateStore stateStore, + EMRServerlessClientFactory emrServerlessClientFactory) { this.clusterService = clusterService; this.threadPool = threadPool; this.client = client; this.clusterService.addLocalNodeClusterManagerListener(this); this.clock = clock; - + this.dataSourceService = dataSourceService; + this.flintIndexMetadataService = flintIndexMetadataService; + this.stateStore = stateStore; + this.emrServerlessClientFactory = emrServerlessClientFactory; this.sessionTtlDuration = toDuration(sessionTtl.get(settings)); this.resultTtlDuration = toDuration(resultTtl.get(settings)); @@ -104,6 +120,19 @@ public void beforeStop() { } }); } + initializeStreamingJobCleanerCron(); + } + + private void initializeStreamingJobCleanerCron() { + flintStreamingJobCleanerCron = + threadPool.scheduleWithFixedDelay( + new FlintStreamingJobCleanerTask( + dataSourceService, + flintIndexMetadataService, + stateStore, + emrServerlessClientFactory), + TimeValue.timeValueMinutes(15), + executorName()); } private void reInitializeFlintIndexRetention() { @@ -125,6 +154,8 @@ private void reInitializeFlintIndexRetention() { public void offClusterManager() { cancel(flintIndexRetentionCron); flintIndexRetentionCron = null; + cancel(flintStreamingJobCleanerCron); + flintStreamingJobCleanerCron = null; } private void cancel(Cancellable cron) { diff --git a/spark/src/main/java/org/opensearch/sql/spark/cluster/FlintStreamingJobCleanerTask.java b/spark/src/main/java/org/opensearch/sql/spark/cluster/FlintStreamingJobCleanerTask.java new file mode 100644 index 0000000000..698d94a11d --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/cluster/FlintStreamingJobCleanerTask.java @@ -0,0 +1,125 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.cluster; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.sql.datasource.DataSourceService; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceStatus; +import org.opensearch.sql.datasource.model.DataSourceType; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.metrics.Metrics; +import org.opensearch.sql.spark.client.EMRServerlessClientFactory; +import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions; +import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.FlintIndexMetadata; +import org.opensearch.sql.spark.flint.FlintIndexMetadataService; +import org.opensearch.sql.spark.flint.operation.FlintIndexOpAlter; +import org.opensearch.sql.spark.flint.operation.FlintIndexOpCancel; + +/** Cleaner task which alters the active streaming jobs of a disabled datasource. */ +@RequiredArgsConstructor +public class FlintStreamingJobCleanerTask implements Runnable { + + private final DataSourceService dataSourceService; + private final FlintIndexMetadataService flintIndexMetadataService; + private final StateStore stateStore; + private final EMRServerlessClientFactory emrServerlessClientFactory; + + private static final Logger LOGGER = LogManager.getLogger(FlintStreamingJobCleanerTask.class); + protected static final AtomicBoolean isRunning = new AtomicBoolean(false); + + @Override + public void run() { + if (!isRunning.compareAndSet(false, true)) { + LOGGER.info("Previous task is still running. Skipping this execution."); + return; + } + try { + LOGGER.info("Starting the cleaner task for disabled and deleted data sources."); + List s3GlueDisabledDataSources = getS3GlueDataSources(); + Set disabledS3DataSources = + s3GlueDisabledDataSources.stream() + .filter( + dataSourceMetadata -> dataSourceMetadata.getStatus() == DataSourceStatus.DISABLED) + .map(DataSourceMetadata::getName) + .collect(Collectors.toSet()); + Set allS3DataSources = + s3GlueDisabledDataSources.stream() + .map(DataSourceMetadata::getName) + .collect(Collectors.toSet()); + Map autoRefreshFlintIndicesMap = getAllAutoRefreshIndices(); + autoRefreshFlintIndicesMap.forEach( + (autoRefreshIndex, flintIndexMetadata) -> { + try { + String datasourceName = getDataSourceName(autoRefreshIndex); + if (disabledS3DataSources.contains(datasourceName)) { + LOGGER.debug("Attempting to alter index: {}", autoRefreshIndex); + FlintIndexOptions flintIndexOptions = new FlintIndexOptions(); + flintIndexOptions.setOption(FlintIndexOptions.AUTO_REFRESH, "false"); + FlintIndexOpAlter flintIndexOpAlter = + new FlintIndexOpAlter( + flintIndexOptions, + stateStore, + datasourceName, + emrServerlessClientFactory.getClient(), + flintIndexMetadataService); + flintIndexOpAlter.apply(flintIndexMetadata); + LOGGER.info("Successfully altered index: {}", autoRefreshIndex); + } else if (!allS3DataSources.contains(datasourceName)) { + // Possibly Replace with VACCUM Operation. + LOGGER.debug("Attempting to cancel auto refresh index: {}", autoRefreshIndex); + FlintIndexOpCancel flintIndexOpCancel = + new FlintIndexOpCancel( + stateStore, datasourceName, emrServerlessClientFactory.getClient()); + flintIndexOpCancel.apply(flintIndexMetadata); + LOGGER.info("Successfully cancelled index: {}", autoRefreshIndex); + } + } catch (Exception exception) { + LOGGER.error( + "Failed to alter/cancel index {}: {}", + autoRefreshIndex, + exception.getMessage(), + exception); + Metrics.getInstance() + .getNumericalMetric(MetricName.STREAMING_JOB_CLEANER_TASK_FAILURE_COUNT) + .increment(); + } + }); + } catch (Throwable error) { + LOGGER.info("Error while running the streaming job cleaner task: {}", error.getMessage()); + } finally { + isRunning.set(false); + } + } + + private String getDataSourceName(String autoRefreshIndex) { + String[] split = autoRefreshIndex.split("_"); + return split.length > 1 ? split[1] : StringUtils.EMPTY; + } + + private Map getAllAutoRefreshIndices() { + Map flintIndexMetadataHashMap = + flintIndexMetadataService.getFlintIndexMetadata("flint_*"); + return flintIndexMetadataHashMap.entrySet().stream() + .filter(entry -> entry.getValue().getFlintIndexOptions().autoRefresh()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + private List getS3GlueDataSources() { + return this.dataSourceService.getDataSourceMetadata(false).stream() + .filter(dataSourceMetadata -> dataSourceMetadata.getConnector() == DataSourceType.S3GLUE) + .collect(Collectors.toList()); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java index 6a6d5982b8..f2d3bb1aa8 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java @@ -58,7 +58,7 @@ public void withoutSessionCreateAsyncQueryThenGetResultThenCancel() { // 1. create async query. CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest("select 1", MYS3_DATASOURCE, LangType.SQL, null)); assertFalse(clusterService().state().routingTable().hasIndex(SPARK_REQUEST_BUFFER_INDEX_NAME)); emrsClient.startJobRunCalled(1); @@ -88,12 +88,12 @@ public void sessionLimitNotImpactBatchQuery() { // 1. create async query. CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest("select 1", MYS3_DATASOURCE, LangType.SQL, null)); emrsClient.startJobRunCalled(1); CreateAsyncQueryResponse resp2 = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest("select 1", MYS3_DATASOURCE, LangType.SQL, null)); emrsClient.startJobRunCalled(2); } @@ -107,7 +107,7 @@ public void createAsyncQueryCreateJobWithCorrectParameters() { enableSession(false); CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest("select 1", MYS3_DATASOURCE, LangType.SQL, null)); String params = emrsClient.getJobRequest().getSparkSubmitParams(); assertNull(response.getSessionId()); assertTrue(params.contains(String.format("--class %s", DEFAULT_CLASS_NAME))); @@ -121,7 +121,7 @@ public void createAsyncQueryCreateJobWithCorrectParameters() { enableSession(true); response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest("select 1", MYS3_DATASOURCE, LangType.SQL, null)); params = emrsClient.getJobRequest().getSparkSubmitParams(); assertTrue(params.contains(String.format("--class %s", FLINT_SESSION_CLASS_NAME))); assertTrue( @@ -141,10 +141,10 @@ public void withSessionCreateAsyncQueryThenGetResultThenCancel() { // 1. create async query. CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest("select 1", MYS3_DATASOURCE, LangType.SQL, null)); assertNotNull(response.getSessionId()); Optional statementModel = - getStatement(stateStore, DATASOURCE).apply(response.getQueryId()); + getStatement(stateStore, MYS3_DATASOURCE).apply(response.getQueryId()); assertTrue(statementModel.isPresent()); assertEquals(StatementState.WAITING, statementModel.get().getStatementState()); @@ -172,14 +172,14 @@ public void reuseSessionWhenCreateAsyncQuery() { // 1. create async query. CreateAsyncQueryResponse first = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest("select 1", MYS3_DATASOURCE, LangType.SQL, null)); assertNotNull(first.getSessionId()); // 2. reuse session id CreateAsyncQueryResponse second = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - "select 1", DATASOURCE, LangType.SQL, first.getSessionId())); + "select 1", MYS3_DATASOURCE, LangType.SQL, first.getSessionId())); assertEquals(first.getSessionId(), second.getSessionId()); assertNotEquals(first.getQueryId(), second.getQueryId()); @@ -199,13 +199,13 @@ public void reuseSessionWhenCreateAsyncQuery() { .must(QueryBuilders.termQuery(SESSION_ID, first.getSessionId())))); Optional firstModel = - getStatement(stateStore, DATASOURCE).apply(first.getQueryId()); + getStatement(stateStore, MYS3_DATASOURCE).apply(first.getQueryId()); assertTrue(firstModel.isPresent()); assertEquals(StatementState.WAITING, firstModel.get().getStatementState()); assertEquals(first.getQueryId(), firstModel.get().getStatementId().getId()); assertEquals(first.getQueryId(), firstModel.get().getQueryId()); Optional secondModel = - getStatement(stateStore, DATASOURCE).apply(second.getQueryId()); + getStatement(stateStore, MYS3_DATASOURCE).apply(second.getQueryId()); assertEquals(StatementState.WAITING, secondModel.get().getStatementState()); assertEquals(second.getQueryId(), secondModel.get().getStatementId().getId()); assertEquals(second.getQueryId(), secondModel.get().getQueryId()); @@ -221,7 +221,7 @@ public void batchQueryHasTimeout() { enableSession(false); CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest("select 1", MYS3_DATASOURCE, LangType.SQL, null)); assertEquals(120L, (long) emrsClient.getJobRequest().executionTimeout()); } @@ -237,7 +237,7 @@ public void interactiveQueryNoTimeout() { enableSession(true); asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest("select 1", MYS3_DATASOURCE, LangType.SQL, null)); assertEquals(0L, (long) emrsClient.getJobRequest().executionTimeout()); } @@ -292,10 +292,10 @@ public void withSessionCreateAsyncQueryFailed() { // 1. create async query. CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("myselect 1", DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest("myselect 1", MYS3_DATASOURCE, LangType.SQL, null)); assertNotNull(response.getSessionId()); Optional statementModel = - getStatement(stateStore, DATASOURCE).apply(response.getQueryId()); + getStatement(stateStore, MYS3_DATASOURCE).apply(response.getQueryId()); assertTrue(statementModel.isPresent()); assertEquals(StatementState.WAITING, statementModel.get().getStatementState()); @@ -319,7 +319,7 @@ public void withSessionCreateAsyncQueryFailed() { .seqNo(submitted.getSeqNo()) .primaryTerm(submitted.getPrimaryTerm()) .build(); - updateStatementState(stateStore, DATASOURCE).apply(mocked, StatementState.FAILED); + updateStatementState(stateStore, MYS3_DATASOURCE).apply(mocked, StatementState.FAILED); AsyncQueryExecutionResponse asyncQueryResults = asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); @@ -343,7 +343,7 @@ public void createSessionMoreThanLimitFailed() { // 1. create async query. CreateAsyncQueryResponse first = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest("select 1", MYS3_DATASOURCE, LangType.SQL, null)); assertNotNull(first.getSessionId()); setSessionState(first.getSessionId(), SessionState.RUNNING); @@ -353,7 +353,7 @@ public void createSessionMoreThanLimitFailed() { ConcurrencyLimitExceededException.class, () -> asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null))); + new CreateAsyncQueryRequest("select 1", MYS3_DATASOURCE, LangType.SQL, null))); assertEquals("domain concurrent active session can not exceed 1", exception.getMessage()); } @@ -371,7 +371,7 @@ public void recreateSessionIfNotReady() { // 1. create async query. CreateAsyncQueryResponse first = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest("select 1", MYS3_DATASOURCE, LangType.SQL, null)); assertNotNull(first.getSessionId()); // set sessionState to FAIL @@ -381,7 +381,7 @@ public void recreateSessionIfNotReady() { CreateAsyncQueryResponse second = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - "select 1", DATASOURCE, LangType.SQL, first.getSessionId())); + "select 1", MYS3_DATASOURCE, LangType.SQL, first.getSessionId())); assertNotEquals(first.getSessionId(), second.getSessionId()); @@ -392,7 +392,7 @@ public void recreateSessionIfNotReady() { CreateAsyncQueryResponse third = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - "select 1", DATASOURCE, LangType.SQL, second.getSessionId())); + "select 1", MYS3_DATASOURCE, LangType.SQL, second.getSessionId())); assertNotEquals(second.getSessionId(), third.getSessionId()); } @@ -410,7 +410,7 @@ public void submitQueryWithDifferentDataSourceSessionWillCreateNewSession() { CreateAsyncQueryResponse first = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - "SHOW SCHEMAS IN " + DATASOURCE, DATASOURCE, LangType.SQL, null)); + "SHOW SCHEMAS IN " + MYS3_DATASOURCE, MYS3_DATASOURCE, LangType.SQL, null)); assertNotNull(first.getSessionId()); // set sessionState to RUNNING @@ -420,7 +420,10 @@ public void submitQueryWithDifferentDataSourceSessionWillCreateNewSession() { CreateAsyncQueryResponse second = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - "SHOW SCHEMAS IN " + DATASOURCE, DATASOURCE, LangType.SQL, first.getSessionId())); + "SHOW SCHEMAS IN " + MYS3_DATASOURCE, + MYS3_DATASOURCE, + LangType.SQL, + first.getSessionId())); assertEquals(first.getSessionId(), second.getSessionId()); @@ -431,7 +434,10 @@ public void submitQueryWithDifferentDataSourceSessionWillCreateNewSession() { CreateAsyncQueryResponse third = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - "SHOW SCHEMAS IN " + DSOTHER, DSOTHER, LangType.SQL, second.getSessionId())); + "SHOW SCHEMAS IN " + MYGLUE_DATASOURCE, + MYGLUE_DATASOURCE, + LangType.SQL, + second.getSessionId())); assertNotEquals(second.getSessionId(), third.getSessionId()); } @@ -448,7 +454,7 @@ public void recreateSessionIfStale() { // 1. create async query. CreateAsyncQueryResponse first = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest("select 1", MYS3_DATASOURCE, LangType.SQL, null)); assertNotNull(first.getSessionId()); // set sessionState to RUNNING @@ -458,7 +464,7 @@ public void recreateSessionIfStale() { CreateAsyncQueryResponse second = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - "select 1", DATASOURCE, LangType.SQL, first.getSessionId())); + "select 1", MYS3_DATASOURCE, LangType.SQL, first.getSessionId())); assertEquals(first.getSessionId(), second.getSessionId()); @@ -476,7 +482,7 @@ public void recreateSessionIfStale() { CreateAsyncQueryResponse third = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - "select 1", DATASOURCE, LangType.SQL, second.getSessionId())); + "select 1", MYS3_DATASOURCE, LangType.SQL, second.getSessionId())); assertNotEquals(second.getSessionId(), third.getSessionId()); } finally { // set timeout setting to 0 @@ -501,11 +507,11 @@ public void submitQueryInInvalidSessionWillCreateNewSession() { enableSession(true); // 1. create async query with invalid sessionId - SessionId invalidSessionId = SessionId.newSessionId(DATASOURCE); + SessionId invalidSessionId = SessionId.newSessionId(MYS3_DATASOURCE); CreateAsyncQueryResponse asyncQuery = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - "select 1", DATASOURCE, LangType.SQL, invalidSessionId.getSessionId())); + "select 1", MYS3_DATASOURCE, LangType.SQL, invalidSessionId.getSessionId())); assertNotNull(asyncQuery.getSessionId()); assertNotEquals(invalidSessionId.getSessionId(), asyncQuery.getSessionId()); } @@ -560,7 +566,7 @@ public void concurrentSessionLimitIsDomainLevel() { // 1. create async query. CreateAsyncQueryResponse first = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest("select 1", MYS3_DATASOURCE, LangType.SQL, null)); assertNotNull(first.getSessionId()); setSessionState(first.getSessionId(), SessionState.RUNNING); @@ -570,7 +576,8 @@ public void concurrentSessionLimitIsDomainLevel() { ConcurrencyLimitExceededException.class, () -> asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DSOTHER, LangType.SQL, null))); + new CreateAsyncQueryRequest( + "select 1", MYGLUE_DATASOURCE, LangType.SQL, null))); assertEquals("domain concurrent active session can not exceed 1", exception.getMessage()); } @@ -583,14 +590,14 @@ public void testDatasourceDisabled() { // Disable Datasource HashMap datasourceMap = new HashMap<>(); - datasourceMap.put("name", DATASOURCE); + datasourceMap.put("name", MYS3_DATASOURCE); datasourceMap.put("status", DataSourceStatus.DISABLED); this.dataSourceService.patchDataSource(datasourceMap); // 1. create async query. try { asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest("select 1", MYS3_DATASOURCE, LangType.SQL, null)); fail("It should have thrown DataSourceDisabledException"); } catch (DatasourceDisabledException exception) { Assertions.assertEquals("Datasource mys3 is disabled.", exception.getMessage()); diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index c1532d5c10..a6410e7d6b 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -72,8 +72,8 @@ import org.opensearch.test.OpenSearchIntegTestCase; public class AsyncQueryExecutorServiceSpec extends OpenSearchIntegTestCase { - public static final String DATASOURCE = "mys3"; - public static final String DSOTHER = "mytest"; + public static final String MYS3_DATASOURCE = "mys3"; + public static final String MYGLUE_DATASOURCE = "my_glue"; protected ClusterService clusterService; protected org.opensearch.sql.common.setting.Settings pluginSettings; @@ -115,7 +115,7 @@ public void setup() { dataSourceService = createDataSourceService(); DataSourceMetadata dm = new DataSourceMetadata.Builder() - .setName(DATASOURCE) + .setName(MYS3_DATASOURCE) .setConnector(DataSourceType.S3GLUE) .setProperties( ImmutableMap.of( @@ -131,7 +131,7 @@ public void setup() { dataSourceService.createDataSource(dm); DataSourceMetadata otherDm = new DataSourceMetadata.Builder() - .setName(DSOTHER) + .setName(MYGLUE_DATASOURCE) .setConnector(DataSourceType.S3GLUE) .setProperties( ImmutableMap.of( @@ -305,7 +305,7 @@ public void setConcurrentRefreshJob(long limit) { int search(QueryBuilder query) { SearchRequest searchRequest = new SearchRequest(); - searchRequest.indices(DATASOURCE_TO_REQUEST_INDEX.apply(DATASOURCE)); + searchRequest.indices(DATASOURCE_TO_REQUEST_INDEX.apply(MYS3_DATASOURCE)); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(query); searchRequest.source(searchSourceBuilder); @@ -315,9 +315,9 @@ int search(QueryBuilder query) { } void setSessionState(String sessionId, SessionState sessionState) { - Optional model = getSession(stateStore, DATASOURCE).apply(sessionId); + Optional model = getSession(stateStore, MYS3_DATASOURCE).apply(sessionId); SessionModel updated = - updateSessionState(stateStore, DATASOURCE).apply(model.get(), sessionState); + updateSessionState(stateStore, MYS3_DATASOURCE).apply(model.get(), sessionState); assertEquals(sessionState, updated.getSessionState()); } diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java index 3acbfc439c..cc36a61de9 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java @@ -53,7 +53,7 @@ public class AsyncQueryGetResultSpecTest extends AsyncQueryExecutorServiceSpec { @Before public void doSetUp() { - mockIndexState = new MockFlintSparkJob(stateStore, mockIndex.latestId, DATASOURCE); + mockIndexState = new MockFlintSparkJob(stateStore, mockIndex.latestId, MYS3_DATASOURCE); } @Test @@ -436,7 +436,7 @@ public JSONObject getResultWithQueryId(String queryId, String resultIndex) { }); this.createQueryResponse = queryService.createAsyncQuery( - new CreateAsyncQueryRequest(query, DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest(query, MYS3_DATASOURCE, LangType.SQL, null)); } AssertionHelper withInteraction(Interaction interaction) { @@ -510,8 +510,8 @@ void emrJobWriteResultDoc(Map resultDoc) { /** Simulate EMR-S updates query_execution_request with state */ void emrJobUpdateStatementState(StatementState newState) { - StatementModel stmt = getStatement(stateStore, DATASOURCE).apply(queryId).get(); - StateStore.updateStatementState(stateStore, DATASOURCE).apply(stmt, newState); + StatementModel stmt = getStatement(stateStore, MYS3_DATASOURCE).apply(queryId).get(); + StateStore.updateStatementState(stateStore, MYS3_DATASOURCE).apply(stmt, newState); } void emrJobUpdateJobState(JobRunState jobState) { @@ -525,7 +525,7 @@ private Map createEmptyResultDoc(String queryId) { document.put("schema", ImmutableList.of()); document.put("jobRunId", "XXX"); document.put("applicationId", "YYY"); - document.put("dataSourceName", DATASOURCE); + document.put("dataSourceName", MYS3_DATASOURCE); document.put("status", "SUCCESS"); document.put("error", ""); document.put("queryId", queryId); @@ -550,7 +550,7 @@ private Map createResultDoc( document.put("schema", schema); document.put("jobRunId", "XXX"); document.put("applicationId", "YYY"); - document.put("dataSourceName", DATASOURCE); + document.put("dataSourceName", MYS3_DATASOURCE); document.put("status", "SUCCESS"); document.put("error", ""); document.put("queryId", queryId); diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java index 25b94f2d11..0334a07626 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java @@ -107,7 +107,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { // 1.drop index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest( + mockDS.query, MYS3_DATASOURCE, LangType.SQL, null)); assertNotNull(response.getQueryId()); assertTrue(clusterService.state().routingTable().hasIndex(mockDS.indexName)); @@ -155,7 +156,8 @@ public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { // 1.drop index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest( + mockDS.query, MYS3_DATASOURCE, LangType.SQL, null)); // 2.fetch result. AsyncQueryExecutionResponse asyncQueryResults = @@ -193,7 +195,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { // 1. drop index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest( + mockDS.query, MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result AsyncQueryExecutionResponse asyncQueryResults = @@ -228,13 +231,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.latestId, MYS3_DATASOURCE); flintIndexJob.refreshing(); // 1.drop index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest( + mockDS.query, MYS3_DATASOURCE, LangType.SQL, null)); assertNotNull(response.getQueryId()); assertTrue(clusterService.state().routingTable().hasIndex(mockDS.indexName)); @@ -284,13 +288,14 @@ public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { mockDS.createIndex(); // Mock index state in refresh state. MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.latestId, MYS3_DATASOURCE); flintIndexJob.refreshing(); // 1.drop index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest( + mockDS.query, MYS3_DATASOURCE, LangType.SQL, null)); // 2.fetch result. AsyncQueryExecutionResponse asyncQueryResults = @@ -328,13 +333,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.latestId, MYS3_DATASOURCE); flintIndexJob.refreshing(); // 1. drop index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest( + mockDS.query, MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result AsyncQueryExecutionResponse asyncQueryResults = @@ -371,13 +377,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.latestId, MYS3_DATASOURCE); flintIndexJob.refreshing(); // 1. drop index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest( + mockDS.query, MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result assertEquals( @@ -419,13 +426,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.latestId, MYS3_DATASOURCE); flintIndexJob.active(); // 1. drop index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest( + mockDS.query, MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result AsyncQueryExecutionResponse asyncQueryExecutionResponse = @@ -464,13 +472,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.latestId, MYS3_DATASOURCE); flintIndexJob.creating(); // 1. drop index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest( + mockDS.query, MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result assertEquals( @@ -509,12 +518,13 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.latestId, MYS3_DATASOURCE); // 1. drop index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest( + mockDS.query, MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result assertEquals( @@ -559,13 +569,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.latestId, MYS3_DATASOURCE); flintIndexJob.deleting(); // 1. drop index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest( + mockDS.query, MYS3_DATASOURCE, LangType.SQL, null)); AsyncQueryExecutionResponse asyncQueryExecutionResponse = asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); @@ -612,7 +623,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { // 1. drop index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest( + mockDS.query, MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result AsyncQueryExecutionResponse asyncQueryResults = @@ -632,7 +644,7 @@ public void concurrentRefreshJobLimitNotApplied() { COVERING.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, COVERING.latestId, DATASOURCE); + new MockFlintSparkJob(stateStore, COVERING.latestId, MYS3_DATASOURCE); flintIndexJob.refreshing(); // query with auto refresh @@ -641,7 +653,7 @@ public void concurrentRefreshJobLimitNotApplied() { + "l_quantity) WITH (auto_refresh = true)"; CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(query, DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest(query, MYS3_DATASOURCE, LangType.SQL, null)); assertNull(response.getSessionId()); } @@ -657,7 +669,7 @@ public void concurrentRefreshJobLimitAppliedToDDLWithAuthRefresh() { COVERING.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, COVERING.latestId, DATASOURCE); + new MockFlintSparkJob(stateStore, COVERING.latestId, MYS3_DATASOURCE); flintIndexJob.refreshing(); // query with auto_refresh = true. @@ -669,7 +681,7 @@ public void concurrentRefreshJobLimitAppliedToDDLWithAuthRefresh() { ConcurrencyLimitExceededException.class, () -> asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(query, DATASOURCE, LangType.SQL, null))); + new CreateAsyncQueryRequest(query, MYS3_DATASOURCE, LangType.SQL, null))); assertEquals("domain concurrent refresh job can not exceed 1", exception.getMessage()); } @@ -685,7 +697,7 @@ public void concurrentRefreshJobLimitAppliedToRefresh() { COVERING.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, COVERING.latestId, DATASOURCE); + new MockFlintSparkJob(stateStore, COVERING.latestId, MYS3_DATASOURCE); flintIndexJob.refreshing(); // query with auto_refresh = true. @@ -695,7 +707,7 @@ public void concurrentRefreshJobLimitAppliedToRefresh() { ConcurrencyLimitExceededException.class, () -> asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(query, DATASOURCE, LangType.SQL, null))); + new CreateAsyncQueryRequest(query, MYS3_DATASOURCE, LangType.SQL, null))); assertEquals("domain concurrent refresh job can not exceed 1", exception.getMessage()); } @@ -712,12 +724,12 @@ public void concurrentRefreshJobLimitNotAppliedToDDL() { COVERING.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, COVERING.latestId, DATASOURCE); + new MockFlintSparkJob(stateStore, COVERING.latestId, MYS3_DATASOURCE); flintIndexJob.refreshing(); CreateAsyncQueryResponse asyncQueryResponse = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(query, DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest(query, MYS3_DATASOURCE, LangType.SQL, null)); assertNotNull(asyncQueryResponse.getSessionId()); } @@ -748,7 +760,7 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { // 1. submit create / refresh index query CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest(query, DATASOURCE, LangType.SQL, null)); + new CreateAsyncQueryRequest(query, MYS3_DATASOURCE, LangType.SQL, null)); // 2. cancel query IllegalArgumentException exception = @@ -784,13 +796,13 @@ public GetJobRunResult getJobRunResult( mockDS.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.latestId, MYS3_DATASOURCE); // 1. Submit REFRESH statement CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - mockDS.refreshQuery, DATASOURCE, LangType.SQL, null)); + mockDS.refreshQuery, MYS3_DATASOURCE, LangType.SQL, null)); // mock index state. flintIndexJob.refreshing(); @@ -827,13 +839,13 @@ public GetJobRunResult getJobRunResult( mockDS.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.latestId, MYS3_DATASOURCE); // 1. Submit REFRESH statement CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - mockDS.refreshQuery, DATASOURCE, LangType.SQL, null)); + mockDS.refreshQuery, MYS3_DATASOURCE, LangType.SQL, null)); // mock index state. flintIndexJob.active(); @@ -869,14 +881,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockFlintIndex.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, indexName + "_latest_id", DATASOURCE); + new MockFlintSparkJob(stateStore, indexName + "_latest_id", MYS3_DATASOURCE); // 1. Submit REFRESH statement CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( "REFRESH INDEX covering_corrupted ON my_glue.mydb.http_logs", - DATASOURCE, + MYS3_DATASOURCE, LangType.SQL, null)); // mock index state. @@ -934,14 +946,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.updateIndexOptions(existingOptions, false); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.active(); // 1. alter index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - mockDS.getQuery(), DATASOURCE, LangType.SQL, null)); + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result AsyncQueryExecutionResponse asyncQueryExecutionResponse = @@ -1001,14 +1013,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.updateIndexOptions(existingOptions, true); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.active(); // 1. alter index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - mockDS.getQuery(), DATASOURCE, LangType.SQL, null)); + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result AsyncQueryExecutionResponse asyncQueryExecutionResponse = @@ -1080,14 +1092,14 @@ public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { mockDS.updateIndexOptions(existingOptions, false); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.active(); // 1. alter index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - mockDS.getQuery(), DATASOURCE, LangType.SQL, null)); + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result AsyncQueryExecutionResponse asyncQueryExecutionResponse = @@ -1142,14 +1154,14 @@ public void testAlterIndexQueryConvertingToAutoRefresh() { mockDS.updateIndexOptions(existingOptions, false); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.active(); // 1. alter index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - mockDS.getQuery(), DATASOURCE, LangType.SQL, null)); + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result assertEquals( @@ -1206,14 +1218,14 @@ public void testAlterIndexQueryWithOutAnyAutoRefresh() { mockDS.updateIndexOptions(existingOptions, false); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.active(); // 1. alter index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - mockDS.getQuery(), DATASOURCE, LangType.SQL, null)); + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result assertEquals( @@ -1279,14 +1291,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.updateIndexOptions(existingOptions, false); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.active(); // 1. alter index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - mockDS.getQuery(), DATASOURCE, LangType.SQL, null)); + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result AsyncQueryExecutionResponse asyncQueryExecutionResponse = @@ -1352,14 +1364,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.updateIndexOptions(existingOptions, false); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.active(); // 1. alter index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - mockDS.getQuery(), DATASOURCE, LangType.SQL, null)); + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result AsyncQueryExecutionResponse asyncQueryExecutionResponse = @@ -1419,14 +1431,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.updateIndexOptions(existingOptions, true); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.active(); // 1. alter index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - mockDS.getQuery(), DATASOURCE, LangType.SQL, null)); + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result AsyncQueryExecutionResponse asyncQueryExecutionResponse = @@ -1479,14 +1491,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.updateIndexOptions(existingOptions, true); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.active(); // 1. alter index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - mockDS.getQuery(), DATASOURCE, LangType.SQL, null)); + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result AsyncQueryExecutionResponse asyncQueryExecutionResponse = @@ -1541,14 +1553,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.updateIndexOptions(existingOptions, true); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.active(); // 1. alter index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - mockDS.getQuery(), DATASOURCE, LangType.SQL, null)); + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result AsyncQueryExecutionResponse asyncQueryExecutionResponse = @@ -1603,14 +1615,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.updateIndexOptions(existingOptions, true); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.refreshing(); // 1. alter index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - mockDS.getQuery(), DATASOURCE, LangType.SQL, null)); + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result AsyncQueryExecutionResponse asyncQueryExecutionResponse = @@ -1662,14 +1674,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.updateIndexOptions(existingOptions, true); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.refreshing(); // 1. alter index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - mockDS.getQuery(), DATASOURCE, LangType.SQL, null)); + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result AsyncQueryExecutionResponse asyncQueryExecutionResponse = @@ -1719,14 +1731,14 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.updateIndexOptions(existingOptions, false); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), DATASOURCE); + new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.updating(); // 1. alter index CreateAsyncQueryResponse response = asyncQueryExecutorService.createAsyncQuery( new CreateAsyncQueryRequest( - mockDS.getQuery(), DATASOURCE, LangType.SQL, null)); + mockDS.getQuery(), MYS3_DATASOURCE, LangType.SQL, null)); // 2. fetch result AsyncQueryExecutionResponse asyncQueryExecutionResponse = diff --git a/spark/src/test/java/org/opensearch/sql/spark/cluster/FlintStreamingJobCleanerTaskTest.java b/spark/src/test/java/org/opensearch/sql/spark/cluster/FlintStreamingJobCleanerTaskTest.java new file mode 100644 index 0000000000..0ffedb35df --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/cluster/FlintStreamingJobCleanerTaskTest.java @@ -0,0 +1,130 @@ +package org.opensearch.sql.spark.cluster; + +import static org.opensearch.sql.datasource.model.DataSourceStatus.DISABLED; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.util.HashMap; +import java.util.Map; +import org.junit.Before; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceStatus; +import org.opensearch.sql.datasource.model.DataSourceType; +import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorServiceSpec; +import org.opensearch.sql.spark.asyncquery.model.MockFlintIndex; +import org.opensearch.sql.spark.asyncquery.model.MockFlintSparkJob; +import org.opensearch.sql.spark.client.EMRServerlessClientFactory; +import org.opensearch.sql.spark.flint.FlintIndexMetadataService; +import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl; +import org.opensearch.sql.spark.flint.FlintIndexState; +import org.opensearch.sql.spark.flint.FlintIndexType; + +public class FlintStreamingJobCleanerTaskTest extends AsyncQueryExecutorServiceSpec { + + @Before + @Override + public void setup() { + super.setup(); + // Having other type just for extra datasource. + DataSourceMetadata prometheusDataSource = + new DataSourceMetadata.Builder() + .setName("prometheus") + .setConnector(DataSourceType.PROMETHEUS) + .setProperties(ImmutableMap.of("prometheus.uri", "http://localhost:9080")) + .build(); + dataSourceService.createDataSource(prometheusDataSource); + } + + @Test + public void testStreamingJobCleanerWhenDataSourceDisabled() { + MockFlintIndex SKIPPING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_skipping_index", + FlintIndexType.SKIPPING, + "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=true, output_mode=\"complete\")"); + MockFlintIndex COVERING = + new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_covering_index", + FlintIndexType.COVERING, + "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=true, output_mode=\"complete\")"); + MockFlintIndex MV = + new MockFlintIndex( + client, + "flint_my_glue_mydb_mv", + FlintIndexType.MATERIALIZED_VIEW, + "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," + + " incremental_refresh=true, output_mode=\"complete\") "); + ImmutableList.of(SKIPPING, COVERING, MV) + .forEach( + INDEX -> { + INDEX.createIndex(); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, INDEX.getLatestId(), MYGLUE_DATASOURCE); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + // Making Index Auto Refresh + INDEX.updateIndexOptions(existingOptions, false); + flintIndexJob.active(); + }); + changeDataSourceStatus(MYGLUE_DATASOURCE, DISABLED); + LocalEMRSClient localEMRSClient = new LocalEMRSClient(); + EMRServerlessClientFactory emrServerlessClientFactory = () -> localEMRSClient; + FlintIndexMetadataService flintIndexMetadataService = new FlintIndexMetadataServiceImpl(client); + FlintStreamingJobCleanerTask flintStreamingJobCleanerTask = + new FlintStreamingJobCleanerTask( + dataSourceService, flintIndexMetadataService, stateStore, emrServerlessClientFactory); + Thread thread = new Thread(flintStreamingJobCleanerTask); + thread.start(); + ImmutableList.of(SKIPPING, COVERING, MV) + .forEach( + INDEX -> { + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, INDEX.getLatestId(), MYGLUE_DATASOURCE); + flintIndexJob.assertState(FlintIndexState.ACTIVE); + Map mappings = INDEX.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("true", options.get("auto_refresh")); + }); + } + + @Test + public void testMultipleStreamingJobCleanerWhenDataSourceDisabled() {} + + @Test + public void testStreamingJobClearnerWhenDataSourceIsDeleted() {} + + @Test + public void testStreamingJobCleanerWhenDataSourceIsNeitherDisabledNorDeleted() {} + + @Test + public void testStreamingJobCleanerWhenADatasourceIsDeletedAndAnotherIsDisabled() {} + + @Test + public void testStreamingJobCleanerWhenS3GlueDataSourcesAreNotAvailable() {} + + @Test + public void testStreamingJobCleanerWhenS3GlueIsDisabledButNotStreamingJobQueries() {} + + @Test + public void testStreamingJobCleanerWhenFlintIndexIsCorruptedWithNoMetadata() {} + + @Test + public void testStreamingJobCleanerWhenMultipleDataSourcesAreDisabled() {} + + @Test + public void testStreamingJobCleanerWhenConcurrentJobsAreInitiated() {} + + private void changeDataSourceStatus(String dataSourceName, DataSourceStatus dataSourceStatus) { + HashMap datasourceMap = new HashMap<>(); + datasourceMap.put("name", dataSourceName); + datasourceMap.put("status", dataSourceStatus); + this.dataSourceService.patchDataSource(datasourceMap); + } +}