From a07f88f86fa384d94e535f99397e8d0d0402bba0 Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Tue, 29 Oct 2024 16:54:34 -0700 Subject: [PATCH] Write metadata cache data to mappings _meta with refresh time update (#805) * [0.5-nexus] Write mock metadata cache data to mappings _meta (#744) * write mock metadata cache data to mappings _meta Signed-off-by: Sean Kao * Enable write to cache by default Signed-off-by: Sean Kao * bugfix: _meta.latestId missing when create index Signed-off-by: Sean Kao * set and unset config in test suite Signed-off-by: Sean Kao * fix: use member flintSparkConf Signed-off-by: Sean Kao --------- Signed-off-by: Sean Kao * default metadata cache write disabled Signed-off-by: Sean Kao * remove string literal "external" in index builder Signed-off-by: Sean Kao * track refreshInterval and lastRefreshTime Signed-off-by: Sean Kao * add last refresh timestamps to metadata log entry Signed-off-by: Sean Kao * update metadata cache test case: should pass Signed-off-by: Sean Kao * move to spark package; get refresh interval Signed-off-by: Sean Kao * parse refresh interval Signed-off-by: Sean Kao * minor syntax fix on FlintSpark.createIndex Signed-off-by: Sean Kao * strategize cache writer interface Signed-off-by: Sean Kao * update refresh timestamps in FlintSpark Signed-off-by: Sean Kao * add test cases Signed-off-by: Sean Kao * IT test for refresh timestamp update Signed-off-by: Sean Kao * add doc for spark conf Signed-off-by: Sean Kao * change mock table name Signed-off-by: Sean Kao * add IT test at FlintSpark level Signed-off-by: Sean Kao * test with external scheduler Signed-off-by: Sean Kao * refactor refreshIndex method; add test for modes Signed-off-by: Sean Kao * fix typo Signed-off-by: Sean Kao * fix failed test caused by refactoring Signed-off-by: Sean Kao * rename method; add comment Signed-off-by: Sean Kao --------- Signed-off-by: Sean Kao --- docs/index.md | 1 + .../metadata/log/FlintMetadataLogEntry.scala | 36 +- .../log/DefaultOptimisticTransaction.java | 2 + ...ntMetadataLogEntryOpenSearchConverter.java | 6 +- .../storage/FlintOpenSearchMetadataLog.java | 8 +- ...dataLogEntryOpenSearchConverterSuite.scala | 23 +- .../sql/flint/config/FlintSparkConf.scala | 6 + .../opensearch/flint/spark/FlintSpark.scala | 108 +++-- .../FlintDisabledMetadataCacheWriter.scala | 17 + .../metadatacache/FlintMetadataCache.scala | 74 ++++ .../FlintMetadataCacheWriter.scala | 27 ++ .../FlintMetadataCacheWriterBuilder.scala | 18 + .../FlintOpenSearchMetadataCacheWriter.scala | 106 +++++ .../util/IntervalSchedulerParser.java | 19 +- .../util/IntervalSchedulerParserTest.java | 69 ++- .../scala/org/apache/spark/FlintSuite.scala | 20 +- .../ApplyFlintSparkCoveringIndexSuite.scala | 2 + .../FlintMetadataCacheSuite.scala | 79 ++++ .../ApplyFlintSparkSkippingIndexSuite.scala | 2 + .../flint/core/FlintMetadataLogITSuite.scala | 10 +- .../flint/core/FlintTransactionITSuite.scala | 26 +- .../spark/FlintSparkTransactionITSuite.scala | 58 ++- ...OpenSearchMetadataCacheWriterITSuite.scala | 407 ++++++++++++++++++ 23 files changed, 1053 insertions(+), 71 deletions(-) create mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintDisabledMetadataCacheWriter.scala create mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCache.scala create mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCacheWriter.scala create mode 100644 
flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCacheWriterBuilder.scala
 create mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriter.scala
 create mode 100644 flint-spark-integration/src/test/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCacheSuite.scala
 create mode 100644 integ-test/src/integration/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriterITSuite.scala

diff --git a/docs/index.md b/docs/index.md
index bb3121ba6..e76cb387a 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -549,6 +549,7 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i
 - `spark.flint.monitor.initialDelaySeconds`: Initial delay in seconds before starting the monitoring task. Default value is 15.
 - `spark.flint.monitor.intervalSeconds`: Interval in seconds for scheduling the monitoring task. Default value is 60.
 - `spark.flint.monitor.maxErrorCount`: Maximum number of consecutive errors allowed before stopping the monitoring task. Default value is 5.
+- `spark.flint.metadataCacheWrite.enabled`: Enable writing metadata to index mappings `_meta` as a read cache for frontend users to access. Default value is false. Do not use in production; this setting will be removed in a later version.
 
 #### Data Type Mapping
 
diff --git a/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogEntry.scala b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogEntry.scala
index 982b7df23..1cae64b83 100644
--- a/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogEntry.scala
+++ b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/log/FlintMetadataLogEntry.scala
@@ -18,6 +18,10 @@ import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState
  *   log entry id
  * @param state
  *   Flint index state
+ * @param lastRefreshStartTime
+ *   timestamp when last refresh started for manual or external scheduler refresh
+ * @param lastRefreshCompleteTime
+ *   timestamp when last refresh completed for manual or external scheduler refresh
  * @param entryVersion
  *   entry version fields for consistency control
  * @param error
@@ -28,10 +32,12 @@ case class FlintMetadataLogEntry(
     id: String,
     /**
-     * This is currently used as streaming job start time. In future, this should represent the
-     * create timestamp of the log entry
+     * This is currently used as streaming job start time for internal scheduler.
In future, this + * should represent the create timestamp of the log entry */ createTime: Long, + lastRefreshStartTime: Long, + lastRefreshCompleteTime: Long, state: IndexState, entryVersion: Map[String, Any], error: String, @@ -40,26 +46,48 @@ case class FlintMetadataLogEntry( def this( id: String, createTime: Long, + lastRefreshStartTime: Long, + lastRefreshCompleteTime: Long, state: IndexState, entryVersion: JMap[String, Any], error: String, properties: JMap[String, Any]) = { - this(id, createTime, state, entryVersion.asScala.toMap, error, properties.asScala.toMap) + this( + id, + createTime, + lastRefreshStartTime, + lastRefreshCompleteTime, + state, + entryVersion.asScala.toMap, + error, + properties.asScala.toMap) } def this( id: String, createTime: Long, + lastRefreshStartTime: Long, + lastRefreshCompleteTime: Long, state: IndexState, entryVersion: JMap[String, Any], error: String, properties: Map[String, Any]) = { - this(id, createTime, state, entryVersion.asScala.toMap, error, properties) + this( + id, + createTime, + lastRefreshStartTime, + lastRefreshCompleteTime, + state, + entryVersion.asScala.toMap, + error, + properties) } } object FlintMetadataLogEntry { + val EMPTY_TIMESTAMP = 0L + /** * Flint index state enum. */ diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java index e6fed4126..ec1eabf14 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java @@ -86,6 +86,8 @@ public T commit(Function operation) { initialLog = initialLog.copy( initialLog.id(), initialLog.createTime(), + initialLog.lastRefreshStartTime(), + initialLog.lastRefreshCompleteTime(), initialLog.state(), latest.entryVersion(), initialLog.error(), diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintMetadataLogEntryOpenSearchConverter.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintMetadataLogEntryOpenSearchConverter.java index 0b78304d2..f90dda9a0 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintMetadataLogEntryOpenSearchConverter.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintMetadataLogEntryOpenSearchConverter.java @@ -101,7 +101,7 @@ public static String toJson(FlintMetadataLogEntry logEntry) throws JsonProcessin ObjectMapper mapper = new ObjectMapper(); ObjectNode json = mapper.createObjectNode(); - json.put("version", "1.0"); + json.put("version", "1.1"); json.put("latestId", logEntry.id()); json.put("type", "flintindexstate"); json.put("state", logEntry.state().toString()); @@ -109,6 +109,8 @@ public static String toJson(FlintMetadataLogEntry logEntry) throws JsonProcessin json.put("jobId", jobId); json.put("dataSourceName", logEntry.properties().get("dataSourceName").get().toString()); json.put("jobStartTime", logEntry.createTime()); + json.put("lastRefreshStartTime", logEntry.lastRefreshStartTime()); + json.put("lastRefreshCompleteTime", logEntry.lastRefreshCompleteTime()); json.put("lastUpdateTime", lastUpdateTime); json.put("error", logEntry.error()); @@ -138,6 +140,8 @@ public static FlintMetadataLogEntry constructLogEntry( id, /* sourceMap may use Integer or Long even though it's always long in index mapping */ ((Number) sourceMap.get("jobStartTime")).longValue(), + ((Number) 
sourceMap.get("lastRefreshStartTime")).longValue(), + ((Number) sourceMap.get("lastRefreshCompleteTime")).longValue(), FlintMetadataLogEntry.IndexState$.MODULE$.from((String) sourceMap.get("state")), Map.of("seqNo", seqNo, "primaryTerm", primaryTerm), (String) sourceMap.get("error"), diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java index 8c327b664..24c9df492 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java @@ -132,7 +132,9 @@ public void purge() { public FlintMetadataLogEntry emptyLogEntry() { return new FlintMetadataLogEntry( "", - 0L, + FlintMetadataLogEntry.EMPTY_TIMESTAMP(), + FlintMetadataLogEntry.EMPTY_TIMESTAMP(), + FlintMetadataLogEntry.EMPTY_TIMESTAMP(), FlintMetadataLogEntry.IndexState$.MODULE$.EMPTY(), Map.of("seqNo", UNASSIGNED_SEQ_NO, "primaryTerm", UNASSIGNED_PRIMARY_TERM), "", @@ -146,6 +148,8 @@ private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) { logEntry.copy( latestId, logEntry.createTime(), + logEntry.lastRefreshStartTime(), + logEntry.lastRefreshCompleteTime(), logEntry.state(), logEntry.entryVersion(), logEntry.error(), @@ -184,6 +188,8 @@ private FlintMetadataLogEntry writeLogEntry( logEntry = new FlintMetadataLogEntry( logEntry.id(), logEntry.createTime(), + logEntry.lastRefreshStartTime(), + logEntry.lastRefreshCompleteTime(), logEntry.state(), Map.of("seqNo", response.getSeqNo(), "primaryTerm", response.getPrimaryTerm()), logEntry.error(), diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/storage/FlintMetadataLogEntryOpenSearchConverterSuite.scala b/flint-core/src/test/scala/org/opensearch/flint/core/storage/FlintMetadataLogEntryOpenSearchConverterSuite.scala index 577dfc5fc..2708d48e8 100644 --- a/flint-core/src/test/scala/org/opensearch/flint/core/storage/FlintMetadataLogEntryOpenSearchConverterSuite.scala +++ b/flint-core/src/test/scala/org/opensearch/flint/core/storage/FlintMetadataLogEntryOpenSearchConverterSuite.scala @@ -25,6 +25,10 @@ class FlintMetadataLogEntryOpenSearchConverterTest val sourceMap = JMap.of( "jobStartTime", 1234567890123L.asInstanceOf[Object], + "lastRefreshStartTime", + 1234567890123L.asInstanceOf[Object], + "lastRefreshCompleteTime", + 1234567890123L.asInstanceOf[Object], "state", "active".asInstanceOf[Object], "dataSourceName", @@ -36,6 +40,8 @@ class FlintMetadataLogEntryOpenSearchConverterTest when(mockLogEntry.id).thenReturn("id") when(mockLogEntry.state).thenReturn(FlintMetadataLogEntry.IndexState.ACTIVE) when(mockLogEntry.createTime).thenReturn(1234567890123L) + when(mockLogEntry.lastRefreshStartTime).thenReturn(1234567890123L) + when(mockLogEntry.lastRefreshCompleteTime).thenReturn(1234567890123L) when(mockLogEntry.error).thenReturn("") when(mockLogEntry.properties).thenReturn(Map("dataSourceName" -> "testDataSource")) } @@ -45,7 +51,7 @@ class FlintMetadataLogEntryOpenSearchConverterTest val expectedJsonWithoutLastUpdateTime = s""" |{ - | "version": "1.0", + | "version": "1.1", | "latestId": "id", | "type": "flintindexstate", | "state": "active", @@ -53,6 +59,8 @@ class FlintMetadataLogEntryOpenSearchConverterTest | "jobId": "unknown", | "dataSourceName": "testDataSource", | "jobStartTime": 1234567890123, + | "lastRefreshStartTime": 1234567890123, + | "lastRefreshCompleteTime": 
1234567890123, | "error": "" |} |""".stripMargin @@ -67,15 +75,22 @@ class FlintMetadataLogEntryOpenSearchConverterTest logEntry shouldBe a[FlintMetadataLogEntry] logEntry.id shouldBe "id" logEntry.createTime shouldBe 1234567890123L + logEntry.lastRefreshStartTime shouldBe 1234567890123L + logEntry.lastRefreshCompleteTime shouldBe 1234567890123L logEntry.state shouldBe FlintMetadataLogEntry.IndexState.ACTIVE logEntry.error shouldBe "" logEntry.properties.get("dataSourceName").get shouldBe "testDataSource" } - it should "construct log entry with integer jobStartTime value" in { + it should "construct log entry with integer timestamp value" in { + // Use Integer instead of Long for timestamps val testSourceMap = JMap.of( "jobStartTime", - 1234567890.asInstanceOf[Object], // Integer instead of Long + 1234567890.asInstanceOf[Object], + "lastRefreshStartTime", + 1234567890.asInstanceOf[Object], + "lastRefreshCompleteTime", + 1234567890.asInstanceOf[Object], "state", "active".asInstanceOf[Object], "dataSourceName", @@ -87,6 +102,8 @@ class FlintMetadataLogEntryOpenSearchConverterTest logEntry shouldBe a[FlintMetadataLogEntry] logEntry.id shouldBe "id" logEntry.createTime shouldBe 1234567890 + logEntry.lastRefreshStartTime shouldBe 1234567890 + logEntry.lastRefreshCompleteTime shouldBe 1234567890 logEntry.state shouldBe FlintMetadataLogEntry.IndexState.ACTIVE logEntry.error shouldBe "" logEntry.properties.get("dataSourceName").get shouldBe "testDataSource" diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala index 68721d235..bdcc120c0 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala @@ -253,6 +253,10 @@ object FlintSparkConf { FlintConfig("spark.metadata.accessAWSCredentialsProvider") .doc("AWS credentials provider for metadata access permission") .createOptional() + val METADATA_CACHE_WRITE = FlintConfig("spark.flint.metadataCacheWrite.enabled") + .doc("Enable Flint metadata cache write to Flint index mappings") + .createWithDefault("false") + val CUSTOM_SESSION_MANAGER = FlintConfig("spark.flint.job.customSessionManager") .createOptional() @@ -309,6 +313,8 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable def monitorMaxErrorCount(): Int = MONITOR_MAX_ERROR_COUNT.readFrom(reader).toInt + def isMetadataCacheWriteEnabled: Boolean = METADATA_CACHE_WRITE.readFrom(reader).toBoolean + /** * spark.sql.session.timeZone */ diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 532bd8e60..b4412a3d4 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -20,6 +20,7 @@ import org.opensearch.flint.core.metadata.log.FlintMetadataLogServiceBuilder import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._ import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex +import org.opensearch.flint.spark.metadatacache.FlintMetadataCacheWriterBuilder import org.opensearch.flint.spark.mv.FlintSparkMaterializedView import 
org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode._ @@ -56,6 +57,8 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w FlintIndexMetadataServiceBuilder.build(flintSparkConf.flintOptions()) } + private val flintMetadataCacheWriter = FlintMetadataCacheWriterBuilder.build(flintSparkConf) + private val flintAsyncQueryScheduler: AsyncQueryScheduler = { AsyncQuerySchedulerBuilder.build(flintSparkConf.flintOptions()) } @@ -117,7 +120,6 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w throw new IllegalStateException(s"Flint index $indexName already exists") } } else { - val metadata = index.metadata() val jobSchedulingService = FlintSparkJobSchedulingService.create( index, spark, @@ -129,15 +131,18 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w .transientLog(latest => latest.copy(state = CREATING)) .finalLog(latest => latest.copy(state = ACTIVE)) .commit(latest => { - if (latest == null) { // in case transaction capability is disabled - flintClient.createIndex(indexName, metadata) - flintIndexMetadataService.updateIndexMetadata(indexName, metadata) - } else { - logInfo(s"Creating index with metadata log entry ID ${latest.id}") - flintClient.createIndex(indexName, metadata.copy(latestId = Some(latest.id))) - flintIndexMetadataService - .updateIndexMetadata(indexName, metadata.copy(latestId = Some(latest.id))) + val metadata = latest match { + case null => // in case transaction capability is disabled + index.metadata() + case latestEntry => + logInfo(s"Creating index with metadata log entry ID ${latestEntry.id}") + index + .metadata() + .copy(latestId = Some(latestEntry.id), latestLogEntry = Some(latest)) } + flintClient.createIndex(indexName, metadata) + flintIndexMetadataService.updateIndexMetadata(indexName, metadata) + flintMetadataCacheWriter.updateMetadataCache(indexName, metadata) jobSchedulingService.handleJob(index, AsyncQuerySchedulerAction.SCHEDULE) }) } @@ -156,22 +161,10 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w val index = describeIndex(indexName) .getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist")) val indexRefresh = FlintSparkIndexRefresh.create(indexName, index) - tx - .initialLog(latest => latest.state == ACTIVE) - .transientLog(latest => - latest.copy(state = REFRESHING, createTime = System.currentTimeMillis())) - .finalLog(latest => { - // Change state to active if full, otherwise update index state regularly - if (indexRefresh.refreshMode == AUTO) { - logInfo("Scheduling index state monitor") - flintIndexMonitor.startMonitor(indexName) - latest - } else { - logInfo("Updating index state to active") - latest.copy(state = ACTIVE) - } - }) - .commit(_ => indexRefresh.start(spark, flintSparkConf)) + indexRefresh.refreshMode match { + case AUTO => refreshIndexAuto(index, indexRefresh, tx) + case FULL | INCREMENTAL => refreshIndexManual(index, indexRefresh, tx) + } }.flatten /** @@ -520,6 +513,63 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w updatedOptions.isExternalSchedulerEnabled() != originalOptions.isExternalSchedulerEnabled() } + /** + * Handles refresh for refresh mode AUTO, which is used exclusively by auto refresh index with + * internal scheduler. Refresh start time and complete time aren't tracked for streaming job. 
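+   * (Auto refresh with the internal scheduler runs as a continuous streaming job, so there is
+   * no discrete refresh start or complete event to record.)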
+ * TODO: in future, track MicroBatchExecution time for streaming job and update as well + */ + private def refreshIndexAuto( + index: FlintSparkIndex, + indexRefresh: FlintSparkIndexRefresh, + tx: OptimisticTransaction[Option[String]]): Option[String] = { + val indexName = index.name + tx + .initialLog(latest => latest.state == ACTIVE) + .transientLog(latest => + latest.copy(state = REFRESHING, createTime = System.currentTimeMillis())) + .finalLog(latest => { + logInfo("Scheduling index state monitor") + flintIndexMonitor.startMonitor(indexName) + latest + }) + .commit(_ => indexRefresh.start(spark, flintSparkConf)) + } + + /** + * Handles refresh for refresh mode FULL and INCREMENTAL, which is used by full refresh index, + * incremental refresh index, and auto refresh index with external scheduler. Stores refresh + * start time and complete time. + */ + private def refreshIndexManual( + index: FlintSparkIndex, + indexRefresh: FlintSparkIndexRefresh, + tx: OptimisticTransaction[Option[String]]): Option[String] = { + val indexName = index.name + tx + .initialLog(latest => latest.state == ACTIVE) + .transientLog(latest => { + val currentTime = System.currentTimeMillis() + val updatedLatest = latest + .copy(state = REFRESHING, createTime = currentTime, lastRefreshStartTime = currentTime) + flintMetadataCacheWriter + .updateMetadataCache( + indexName, + index.metadata.copy(latestLogEntry = Some(updatedLatest))) + updatedLatest + }) + .finalLog(latest => { + logInfo("Updating index state to active") + val updatedLatest = + latest.copy(state = ACTIVE, lastRefreshCompleteTime = System.currentTimeMillis()) + flintMetadataCacheWriter + .updateMetadataCache( + indexName, + index.metadata.copy(latestLogEntry = Some(updatedLatest))) + updatedLatest + }) + .commit(_ => indexRefresh.start(spark, flintSparkConf)) + } + private def updateIndexAutoToManual( index: FlintSparkIndex, tx: OptimisticTransaction[Option[String]]): Option[String] = { @@ -539,8 +589,10 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w .transientLog(latest => latest.copy(state = UPDATING)) .finalLog(latest => latest.copy(state = jobSchedulingService.stateTransitions.finalStateForUnschedule)) - .commit(_ => { + .commit(latest => { flintIndexMetadataService.updateIndexMetadata(indexName, index.metadata) + flintMetadataCacheWriter + .updateMetadataCache(indexName, index.metadata.copy(latestLogEntry = Some(latest))) logInfo("Update index options complete") jobSchedulingService.handleJob(index, AsyncQuerySchedulerAction.UNSCHEDULE) None @@ -566,8 +618,10 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w .finalLog(latest => { latest.copy(state = jobSchedulingService.stateTransitions.finalStateForUpdate) }) - .commit(_ => { + .commit(latest => { flintIndexMetadataService.updateIndexMetadata(indexName, index.metadata) + flintMetadataCacheWriter + .updateMetadataCache(indexName, index.metadata.copy(latestLogEntry = Some(latest))) logInfo("Update index options complete") jobSchedulingService.handleJob(index, AsyncQuerySchedulerAction.UPDATE) }) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintDisabledMetadataCacheWriter.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintDisabledMetadataCacheWriter.scala new file mode 100644 index 000000000..4099da3ff --- /dev/null +++ 
b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintDisabledMetadataCacheWriter.scala
@@ -0,0 +1,17 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark.metadatacache
+
+import org.opensearch.flint.common.metadata.FlintMetadata
+
+/**
+ * Default implementation of {@link FlintMetadataCacheWriter} that does nothing
+ */
+class FlintDisabledMetadataCacheWriter extends FlintMetadataCacheWriter {
+  override def updateMetadataCache(indexName: String, metadata: FlintMetadata): Unit = {
+    // Do nothing
+  }
+}
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCache.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCache.scala
new file mode 100644
index 000000000..e8a91e1be
--- /dev/null
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCache.scala
@@ -0,0 +1,74 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark.metadatacache
+
+import scala.collection.JavaConverters.mapAsScalaMapConverter
+
+import org.opensearch.flint.common.metadata.FlintMetadata
+import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
+import org.opensearch.flint.spark.FlintSparkIndexOptions
+import org.opensearch.flint.spark.scheduler.util.IntervalSchedulerParser
+
+/**
+ * Flint metadata cache defines the metadata that is stored in the read cache for frontend users
+ * to access.
+ */
+case class FlintMetadataCache(
+    metadataCacheVersion: String,
+    /** Refresh interval for Flint index with auto refresh. Unit: seconds */
+    refreshInterval: Option[Int],
+    /** Source table names for building the Flint index. */
+    sourceTables: Array[String],
+    /** Timestamp when Flint index is last refreshed. Unit: milliseconds */
+    lastRefreshTime: Option[Long]) {
+
+  /**
+   * Convert FlintMetadataCache to a map. Skips a field if its value is not defined.
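+   * For example, a refreshInterval of None produces a map without a "refreshInterval" key.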
+ */ + def toMap: Map[String, AnyRef] = { + val fieldNames = getClass.getDeclaredFields.map(_.getName) + val fieldValues = productIterator.toList + + fieldNames + .zip(fieldValues) + .flatMap { + case (_, None) => List.empty + case (name, Some(value)) => List((name, value)) + case (name, value) => List((name, value)) + } + .toMap + .mapValues(_.asInstanceOf[AnyRef]) + } +} + +object FlintMetadataCache { + + // TODO: constant for version + val mockTableName = + "dataSourceName.default.logGroups(logGroupIdentifier:['arn:aws:logs:us-east-1:123456:test-llt-xa', 'arn:aws:logs:us-east-1:123456:sample-lg-1'])" + + def apply(metadata: FlintMetadata): FlintMetadataCache = { + val indexOptions = FlintSparkIndexOptions( + metadata.options.asScala.mapValues(_.asInstanceOf[String]).toMap) + val refreshInterval = if (indexOptions.autoRefresh()) { + indexOptions + .refreshInterval() + .map(IntervalSchedulerParser.parseAndConvertToMillis) + .map(millis => (millis / 1000).toInt) // convert to seconds + } else { + None + } + val lastRefreshTime: Option[Long] = metadata.latestLogEntry.flatMap { entry => + entry.lastRefreshCompleteTime match { + case FlintMetadataLogEntry.EMPTY_TIMESTAMP => None + case timestamp => Some(timestamp) + } + } + + // TODO: get source tables from metadata + FlintMetadataCache("1.0", refreshInterval, Array(mockTableName), lastRefreshTime) + } +} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCacheWriter.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCacheWriter.scala new file mode 100644 index 000000000..c256463c3 --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCacheWriter.scala @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.metadatacache + +import org.opensearch.flint.common.metadata.{FlintIndexMetadataService, FlintMetadata} + +/** + * Writes {@link FlintMetadataCache} to a storage of choice. This is different from {@link + * FlintIndexMetadataService} which persists the full index metadata to a storage for single + * source of truth. + */ +trait FlintMetadataCacheWriter { + + /** + * Update metadata cache for a Flint index. 
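+   * Where the cache lives is up to the implementation; for example, the OpenSearch
+   * implementation writes it to the index mappings _meta field.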
+   *
+   * @param indexName
+   *   index name
+   * @param metadata
+   *   index metadata to update the cache
+   */
+  def updateMetadataCache(indexName: String, metadata: FlintMetadata): Unit
+
+}
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCacheWriterBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCacheWriterBuilder.scala
new file mode 100644
index 000000000..be821ae25
--- /dev/null
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCacheWriterBuilder.scala
@@ -0,0 +1,18 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark.metadatacache
+
+import org.apache.spark.sql.flint.config.FlintSparkConf
+
+object FlintMetadataCacheWriterBuilder {
+  def build(flintSparkConf: FlintSparkConf): FlintMetadataCacheWriter = {
+    if (flintSparkConf.isMetadataCacheWriteEnabled) {
+      new FlintOpenSearchMetadataCacheWriter(flintSparkConf.flintOptions())
+    } else {
+      new FlintDisabledMetadataCacheWriter
+    }
+  }
+}
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriter.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriter.scala
new file mode 100644
index 000000000..2bc373792
--- /dev/null
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriter.scala
@@ -0,0 +1,106 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark.metadatacache
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.opensearch.client.RequestOptions
+import org.opensearch.client.indices.PutMappingRequest
+import org.opensearch.common.xcontent.XContentType
+import org.opensearch.flint.common.metadata.{FlintIndexMetadataService, FlintMetadata}
+import org.opensearch.flint.core.{FlintOptions, IRestHighLevelClient}
+import org.opensearch.flint.core.metadata.FlintIndexMetadataServiceBuilder
+import org.opensearch.flint.core.metadata.FlintJsonHelper._
+import org.opensearch.flint.core.storage.{FlintOpenSearchIndexMetadataService, OpenSearchClientUtils}
+
+import org.apache.spark.internal.Logging
+
+/**
+ * Writes {@link FlintMetadataCache} to index mappings `_meta` field for frontend users to
+ * access.
+ */
+class FlintOpenSearchMetadataCacheWriter(options: FlintOptions)
+    extends FlintMetadataCacheWriter
+    with Logging {
+
+  /**
+   * Since metadata cache shares the index mappings _meta field with OpenSearch index metadata
+   * storage, this flag allows preserving index metadata that is already stored in _meta when
+   * updating the metadata cache.
+   */
+  private val includeSpec: Boolean =
+    FlintIndexMetadataServiceBuilder
+      .build(options)
+      .isInstanceOf[FlintOpenSearchIndexMetadataService]
+
+  override def updateMetadataCache(indexName: String, metadata: FlintMetadata): Unit = {
+    logInfo(s"Updating metadata cache for $indexName")
+    val osIndexName = OpenSearchClientUtils.sanitizeIndexName(indexName)
+    var client: IRestHighLevelClient = null
+    try {
+      client = OpenSearchClientUtils.createClient(options)
+      val request = new PutMappingRequest(osIndexName)
+      request.source(serialize(metadata), XContentType.JSON)
+      client.updateIndexMapping(request, RequestOptions.DEFAULT)
+    } catch {
+      case e: Exception =>
+        throw new IllegalStateException(
+          s"Failed to update metadata cache for Flint index $osIndexName",
+          e)
+    } finally
+      if (client != null) {
+        client.close()
+      }
+  }
+
+  /**
+   * Serialize FlintMetadataCache from FlintMetadata. Modified from {@link
+   * FlintOpenSearchIndexMetadataService}
+   */
+  private[metadatacache] def serialize(metadata: FlintMetadata): String = {
+    try {
+      buildJson(builder => {
+        objectField(builder, "_meta") {
+          // If _meta is used as index metadata storage, preserve it.
+          if (includeSpec) {
+            builder
+              .field("version", metadata.version.version)
+              .field("name", metadata.name)
+              .field("kind", metadata.kind)
+              .field("source", metadata.source)
+              .field("indexedColumns", metadata.indexedColumns)
+
+            if (metadata.latestId.isDefined) {
+              builder.field("latestId", metadata.latestId.get)
+            }
+            optionalObjectField(builder, "options", metadata.options)
+          }
+
+          optionalObjectField(builder, "properties", buildPropertiesMap(metadata))
+        }
+        builder.field("properties", metadata.schema)
+      })
+    } catch {
+      case e: Exception =>
+        throw new IllegalStateException("Failed to jsonify cache metadata", e)
+    }
+  }
+
+  /**
+   * Since _meta.properties is shared by both index metadata and metadata cache, here we merge
+   * the two maps.
+   */
+  private def buildPropertiesMap(metadata: FlintMetadata): util.Map[String, AnyRef] = {
+    val metadataCacheProperties = FlintMetadataCache(metadata).toMap
+
+    if (includeSpec) {
+      (metadataCacheProperties ++ metadata.properties.asScala).asJava
+    } else {
+      metadataCacheProperties.asJava
+    }
+  }
+}
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/util/IntervalSchedulerParser.java b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/util/IntervalSchedulerParser.java
index 8745681b9..9622b4c64 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/util/IntervalSchedulerParser.java
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/util/IntervalSchedulerParser.java
@@ -18,21 +18,30 @@ public class IntervalSchedulerParser {
 
     /**
-     * Parses a schedule string into an IntervalSchedule.
+     * Parses a schedule string and converts it to milliseconds.
      *
      * @param scheduleStr the schedule string to parse
-     * @return the parsed IntervalSchedule
+     * @return the parsed interval in milliseconds
      * @throws IllegalArgumentException if the schedule string is invalid
      */
-    public static IntervalSchedule parse(String scheduleStr) {
+    public static Long parseAndConvertToMillis(String scheduleStr) {
         if (Strings.isNullOrEmpty(scheduleStr)) {
             throw new IllegalArgumentException("Schedule string must not be null or empty.");
         }
-        Long millis = Triggers.convert(scheduleStr);
+        return Triggers.convert(scheduleStr);
+    }
 
+    /**
+     * Parses a schedule string into an IntervalSchedule.
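+     * The interval is converted to whole minutes, rounding down; for example, "2 hours"
+     * becomes an interval of 120 minutes.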
+ * + * @param scheduleStr the schedule string to parse + * @return the parsed IntervalSchedule + * @throws IllegalArgumentException if the schedule string is invalid + */ + public static IntervalSchedule parse(String scheduleStr) { // Convert milliseconds to minutes (rounding down) - int minutes = (int) (millis / (60 * 1000)); + int minutes = (int) (parseAndConvertToMillis(scheduleStr) / (60 * 1000)); // Use the current time as the start time Instant startTime = Instant.now(); diff --git a/flint-spark-integration/src/test/java/org/opensearch/flint/core/scheduler/util/IntervalSchedulerParserTest.java b/flint-spark-integration/src/test/java/org/opensearch/flint/core/scheduler/util/IntervalSchedulerParserTest.java index 2ad1fea9c..731e1ae5c 100644 --- a/flint-spark-integration/src/test/java/org/opensearch/flint/core/scheduler/util/IntervalSchedulerParserTest.java +++ b/flint-spark-integration/src/test/java/org/opensearch/flint/core/scheduler/util/IntervalSchedulerParserTest.java @@ -23,53 +23,92 @@ public void testParseNull() { IntervalSchedulerParser.parse(null); } + @Test(expected = IllegalArgumentException.class) + public void testParseMillisNull() { + IntervalSchedulerParser.parseAndConvertToMillis(null); + } + @Test(expected = IllegalArgumentException.class) public void testParseEmptyString() { IntervalSchedulerParser.parse(""); } + @Test(expected = IllegalArgumentException.class) + public void testParseMillisEmptyString() { + IntervalSchedulerParser.parseAndConvertToMillis(""); + } + @Test public void testParseString() { - Schedule result = IntervalSchedulerParser.parse("10 minutes"); - assertTrue(result instanceof IntervalSchedule); - IntervalSchedule intervalSchedule = (IntervalSchedule) result; + Schedule schedule = IntervalSchedulerParser.parse("10 minutes"); + assertTrue(schedule instanceof IntervalSchedule); + IntervalSchedule intervalSchedule = (IntervalSchedule) schedule; assertEquals(10, intervalSchedule.getInterval()); assertEquals(ChronoUnit.MINUTES, intervalSchedule.getUnit()); } + @Test + public void testParseMillisString() { + Long millis = IntervalSchedulerParser.parseAndConvertToMillis("10 minutes"); + assertEquals(600000, millis.longValue()); + } + @Test(expected = IllegalArgumentException.class) public void testParseInvalidFormat() { IntervalSchedulerParser.parse("invalid format"); } + @Test(expected = IllegalArgumentException.class) + public void testParseMillisInvalidFormat() { + IntervalSchedulerParser.parseAndConvertToMillis("invalid format"); + } + @Test public void testParseStringScheduleMinutes() { - IntervalSchedule result = IntervalSchedulerParser.parse("5 minutes"); - assertEquals(5, result.getInterval()); - assertEquals(ChronoUnit.MINUTES, result.getUnit()); + IntervalSchedule schedule = IntervalSchedulerParser.parse("5 minutes"); + assertEquals(5, schedule.getInterval()); + assertEquals(ChronoUnit.MINUTES, schedule.getUnit()); + } + + @Test + public void testParseMillisStringScheduleMinutes() { + Long millis = IntervalSchedulerParser.parseAndConvertToMillis("5 minutes"); + assertEquals(300000, millis.longValue()); } @Test public void testParseStringScheduleHours() { - IntervalSchedule result = IntervalSchedulerParser.parse("2 hours"); - assertEquals(120, result.getInterval()); - assertEquals(ChronoUnit.MINUTES, result.getUnit()); + IntervalSchedule schedule = IntervalSchedulerParser.parse("2 hours"); + assertEquals(120, schedule.getInterval()); + assertEquals(ChronoUnit.MINUTES, schedule.getUnit()); + } + + @Test + public void 
testParseMillisStringScheduleHours() { + Long millis = IntervalSchedulerParser.parseAndConvertToMillis("2 hours"); + assertEquals(7200000, millis.longValue()); } @Test public void testParseStringScheduleDays() { - IntervalSchedule result = IntervalSchedulerParser.parse("1 day"); - assertEquals(1440, result.getInterval()); - assertEquals(ChronoUnit.MINUTES, result.getUnit()); + IntervalSchedule schedule = IntervalSchedulerParser.parse("1 day"); + assertEquals(1440, schedule.getInterval()); + assertEquals(ChronoUnit.MINUTES, schedule.getUnit()); + } + + @Test + public void testParseMillisStringScheduleDays() { + Long millis = IntervalSchedulerParser.parseAndConvertToMillis("1 day"); + assertEquals(86400000, millis.longValue()); } @Test public void testParseStringScheduleStartTime() { Instant before = Instant.now(); - IntervalSchedule result = IntervalSchedulerParser.parse("30 minutes"); + IntervalSchedule schedule = IntervalSchedulerParser.parse("30 minutes"); Instant after = Instant.now(); - assertTrue(result.getStartTime().isAfter(before) || result.getStartTime().equals(before)); - assertTrue(result.getStartTime().isBefore(after) || result.getStartTime().equals(after)); + assertTrue(schedule.getStartTime().isAfter(before) || schedule.getStartTime().equals(before)); + assertTrue(schedule.getStartTime().isBefore(after) || schedule.getStartTime().equals(after)); } } \ No newline at end of file diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala index ee8a52d96..e43b0c52c 100644 --- a/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala +++ b/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala @@ -10,7 +10,7 @@ import org.opensearch.flint.spark.FlintSparkExtensions import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation import org.apache.spark.sql.flint.config.FlintConfigEntry -import org.apache.spark.sql.flint.config.FlintSparkConf.HYBRID_SCAN_ENABLED +import org.apache.spark.sql.flint.config.FlintSparkConf.{EXTERNAL_SCHEDULER_ENABLED, HYBRID_SCAN_ENABLED, METADATA_CACHE_WRITE} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession @@ -44,4 +44,22 @@ trait FlintSuite extends SharedSparkSession { setFlintSparkConf(HYBRID_SCAN_ENABLED, "false") } } + + protected def withExternalSchedulerEnabled(block: => Unit): Unit = { + setFlintSparkConf(EXTERNAL_SCHEDULER_ENABLED, "true") + try { + block + } finally { + setFlintSparkConf(EXTERNAL_SCHEDULER_ENABLED, "false") + } + } + + protected def withMetadataCacheWriteEnabled(block: => Unit): Unit = { + setFlintSparkConf(METADATA_CACHE_WRITE, "true") + try { + block + } finally { + setFlintSparkConf(METADATA_CACHE_WRITE, "false") + } + } } diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala index 2c5518778..96c71d94b 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala @@ -246,6 +246,8 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers { new FlintMetadataLogEntry( "id", 0, + 
0, + 0, state, Map("seqNo" -> 0, "primaryTerm" -> 0), "", diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCacheSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCacheSuite.scala new file mode 100644 index 000000000..c6d2cf12a --- /dev/null +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCacheSuite.scala @@ -0,0 +1,79 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.metadatacache + +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry +import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class FlintMetadataCacheSuite extends AnyFlatSpec with Matchers { + val flintMetadataLogEntry = FlintMetadataLogEntry( + "id", + 0L, + 0L, + 1234567890123L, + FlintMetadataLogEntry.IndexState.ACTIVE, + Map.empty[String, Any], + "", + Map.empty[String, Any]) + + it should "construct from FlintMetadata" in { + val content = + """ { + | "_meta": { + | "kind": "test_kind", + | "options": { + | "auto_refresh": "true", + | "refresh_interval": "10 Minutes" + | } + | }, + | "properties": { + | "age": { + | "type": "integer" + | } + | } + | } + |""".stripMargin + val metadata = FlintOpenSearchIndexMetadataService + .deserialize(content) + .copy(latestLogEntry = Some(flintMetadataLogEntry)) + + val metadataCache = FlintMetadataCache(metadata) + metadataCache.metadataCacheVersion shouldBe "1.0" + metadataCache.refreshInterval.get shouldBe 600 + metadataCache.sourceTables shouldBe Array(FlintMetadataCache.mockTableName) + metadataCache.lastRefreshTime.get shouldBe 1234567890123L + } + + it should "construct from FlintMetadata excluding invalid fields" in { + // Set auto_refresh = false and lastRefreshCompleteTime = 0 + val content = + """ { + | "_meta": { + | "kind": "test_kind", + | "options": { + | "refresh_interval": "10 Minutes" + | } + | }, + | "properties": { + | "age": { + | "type": "integer" + | } + | } + | } + |""".stripMargin + val metadata = FlintOpenSearchIndexMetadataService + .deserialize(content) + .copy(latestLogEntry = Some(flintMetadataLogEntry.copy(lastRefreshCompleteTime = 0L))) + + val metadataCache = FlintMetadataCache(metadata) + metadataCache.metadataCacheVersion shouldBe "1.0" + metadataCache.refreshInterval shouldBe empty + metadataCache.sourceTables shouldBe Array(FlintMetadataCache.mockTableName) + metadataCache.lastRefreshTime shouldBe empty + } +} diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala index f03116de9..d56c4e66f 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala @@ -132,6 +132,8 @@ class ApplyFlintSparkSkippingIndexSuite extends FlintSuite with Matchers { new FlintMetadataLogEntry( "id", 0L, + 0L, + 0L, indexState, Map.empty[String, Any], "", diff --git a/integ-test/src/integration/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala 
b/integ-test/src/integration/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala index 9aeba7512..33702f23f 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala @@ -25,10 +25,12 @@ class FlintMetadataLogITSuite extends OpenSearchTransactionSuite with Matchers { val testFlintIndex = "flint_test_index" val testLatestId: String = Base64.getEncoder.encodeToString(testFlintIndex.getBytes) - val testCreateTime = 1234567890123L + val testTimestamp = 1234567890123L val flintMetadataLogEntry = FlintMetadataLogEntry( testLatestId, - testCreateTime, + testTimestamp, + testTimestamp, + testTimestamp, ACTIVE, Map("seqNo" -> UNASSIGNED_SEQ_NO, "primaryTerm" -> UNASSIGNED_PRIMARY_TERM), "", @@ -85,8 +87,10 @@ class FlintMetadataLogITSuite extends OpenSearchTransactionSuite with Matchers { val latest = metadataLog.get.getLatest latest.isPresent shouldBe true latest.get.id shouldBe testLatestId - latest.get.createTime shouldBe testCreateTime + latest.get.createTime shouldBe testTimestamp latest.get.error shouldBe "" + latest.get.lastRefreshStartTime shouldBe testTimestamp + latest.get.lastRefreshCompleteTime shouldBe testTimestamp latest.get.properties.get("dataSourceName").get shouldBe testDataSourceName } diff --git a/integ-test/src/integration/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala index 605e8e7fd..df5f4eec2 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala @@ -24,6 +24,7 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { val testFlintIndex = "flint_test_index" val testLatestId: String = Base64.getEncoder.encodeToString(testFlintIndex.getBytes) + val testTimestamp = 1234567890123L var flintMetadataLogService: FlintMetadataLogService = _ override def beforeAll(): Unit = { @@ -40,6 +41,8 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { latest.id shouldBe testLatestId latest.state shouldBe EMPTY latest.createTime shouldBe 0L + latest.lastRefreshStartTime shouldBe 0L + latest.lastRefreshCompleteTime shouldBe 0L latest.error shouldBe "" latest.properties.get("dataSourceName").get shouldBe testDataSourceName true @@ -49,11 +52,12 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { } test("should preserve original values when transition") { - val testCreateTime = 1234567890123L createLatestLogEntry( FlintMetadataLogEntry( id = testLatestId, - createTime = testCreateTime, + createTime = testTimestamp, + lastRefreshStartTime = testTimestamp, + lastRefreshCompleteTime = testTimestamp, state = ACTIVE, Map("seqNo" -> UNASSIGNED_SEQ_NO, "primaryTerm" -> UNASSIGNED_PRIMARY_TERM), error = "", @@ -63,8 +67,10 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { .startTransaction(testFlintIndex) .initialLog(latest => { latest.id shouldBe testLatestId - latest.createTime shouldBe testCreateTime + latest.createTime shouldBe testTimestamp latest.error shouldBe "" + latest.lastRefreshStartTime shouldBe testTimestamp + latest.lastRefreshCompleteTime shouldBe testTimestamp latest.properties.get("dataSourceName").get shouldBe testDataSourceName true }) @@ -72,8 +78,10 @@ class 
FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { .finalLog(latest => latest.copy(state = DELETED)) .commit(latest => { latest.id shouldBe testLatestId - latest.createTime shouldBe testCreateTime + latest.createTime shouldBe testTimestamp latest.error shouldBe "" + latest.lastRefreshStartTime shouldBe testTimestamp + latest.lastRefreshCompleteTime shouldBe testTimestamp latest.properties.get("dataSourceName").get shouldBe testDataSourceName }) @@ -112,7 +120,9 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { createLatestLogEntry( FlintMetadataLogEntry( id = testLatestId, - createTime = 1234567890123L, + createTime = testTimestamp, + lastRefreshStartTime = testTimestamp, + lastRefreshCompleteTime = testTimestamp, state = ACTIVE, Map("seqNo" -> UNASSIGNED_SEQ_NO, "primaryTerm" -> UNASSIGNED_PRIMARY_TERM), error = "", @@ -198,7 +208,9 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { createLatestLogEntry( FlintMetadataLogEntry( id = testLatestId, - createTime = 1234567890123L, + createTime = testTimestamp, + lastRefreshStartTime = testTimestamp, + lastRefreshCompleteTime = testTimestamp, state = ACTIVE, Map("seqNo" -> UNASSIGNED_SEQ_NO, "primaryTerm" -> UNASSIGNED_PRIMARY_TERM), error = "", @@ -240,6 +252,8 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { latest.id shouldBe testLatestId latest.state shouldBe EMPTY latest.createTime shouldBe 0L + latest.lastRefreshStartTime shouldBe 0L + latest.lastRefreshCompleteTime shouldBe 0L latest.error shouldBe "" latest.properties.get("dataSourceName").get shouldBe testDataSourceName true diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala index debb95370..e16d40f2a 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala @@ -55,6 +55,8 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match latestLogEntry(testLatestId) should (contain("latestId" -> testLatestId) and contain("state" -> "active") and contain("jobStartTime" -> 0) + and contain("lastRefreshStartTime" -> 0) + and contain("lastRefreshCompleteTime" -> 0) and contain("dataSourceName" -> testDataSourceName)) implicit val formats: Formats = Serialization.formats(NoTypeHints) @@ -77,9 +79,25 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match .create() flint.refreshIndex(testFlintIndex) - val latest = latestLogEntry(testLatestId) + var latest = latestLogEntry(testLatestId) + val prevJobStartTime = latest("jobStartTime").asInstanceOf[Number].longValue() + val prevLastRefreshStartTime = latest("lastRefreshStartTime").asInstanceOf[Number].longValue() + val prevLastRefreshCompleteTime = + latest("lastRefreshCompleteTime").asInstanceOf[Number].longValue() latest should contain("state" -> "active") - latest("jobStartTime").asInstanceOf[Number].longValue() should be > 0L + prevJobStartTime should be > 0L + prevLastRefreshStartTime should be > 0L + prevLastRefreshCompleteTime should be > prevLastRefreshStartTime + + flint.refreshIndex(testFlintIndex) + latest = latestLogEntry(testLatestId) + val jobStartTime = latest("jobStartTime").asInstanceOf[Number].longValue() + val lastRefreshStartTime = 
latest("lastRefreshStartTime").asInstanceOf[Number].longValue() + val lastRefreshCompleteTime = + latest("lastRefreshCompleteTime").asInstanceOf[Number].longValue() + jobStartTime should be > prevLastRefreshCompleteTime + lastRefreshStartTime should be > prevLastRefreshCompleteTime + lastRefreshCompleteTime should be > lastRefreshStartTime } test("incremental refresh index") { @@ -97,9 +115,26 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match .create() flint.refreshIndex(testFlintIndex) - val latest = latestLogEntry(testLatestId) + var latest = latestLogEntry(testLatestId) + val prevJobStartTime = latest("jobStartTime").asInstanceOf[Number].longValue() + val prevLastRefreshStartTime = + latest("lastRefreshStartTime").asInstanceOf[Number].longValue() + val prevLastRefreshCompleteTime = + latest("lastRefreshCompleteTime").asInstanceOf[Number].longValue() latest should contain("state" -> "active") - latest("jobStartTime").asInstanceOf[Number].longValue() should be > 0L + prevJobStartTime should be > 0L + prevLastRefreshStartTime should be > 0L + prevLastRefreshCompleteTime should be > prevLastRefreshStartTime + + flint.refreshIndex(testFlintIndex) + latest = latestLogEntry(testLatestId) + val jobStartTime = latest("jobStartTime").asInstanceOf[Number].longValue() + val lastRefreshStartTime = latest("lastRefreshStartTime").asInstanceOf[Number].longValue() + val lastRefreshCompleteTime = + latest("lastRefreshCompleteTime").asInstanceOf[Number].longValue() + jobStartTime should be > prevLastRefreshCompleteTime + lastRefreshStartTime should be > prevLastRefreshCompleteTime + lastRefreshCompleteTime should be > lastRefreshStartTime } } @@ -142,6 +177,8 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match val latest = latestLogEntry(testLatestId) latest should contain("state" -> "refreshing") latest("jobStartTime").asInstanceOf[Number].longValue() should be > 0L + latest("lastRefreshStartTime").asInstanceOf[Number].longValue() shouldBe 0L + latest("lastRefreshCompleteTime").asInstanceOf[Number].longValue() shouldBe 0L } test("update auto refresh index to full refresh index") { @@ -153,13 +190,24 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match .create() flint.refreshIndex(testFlintIndex) + var latest = latestLogEntry(testLatestId) + val prevLastRefreshStartTime = latest("lastRefreshStartTime").asInstanceOf[Number].longValue() + val prevLastRefreshCompleteTime = + latest("lastRefreshCompleteTime").asInstanceOf[Number].longValue() + val index = flint.describeIndex(testFlintIndex).get val updatedIndex = flint .skippingIndex() .copyWithUpdate(index, FlintSparkIndexOptions(Map("auto_refresh" -> "false"))) flint.updateIndex(updatedIndex) - val latest = latestLogEntry(testLatestId) + latest = latestLogEntry(testLatestId) latest should contain("state" -> "active") + latest("lastRefreshStartTime") + .asInstanceOf[Number] + .longValue() shouldBe prevLastRefreshStartTime + latest("lastRefreshCompleteTime") + .asInstanceOf[Number] + .longValue() shouldBe prevLastRefreshCompleteTime } test("delete and vacuum index") { diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriterITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriterITSuite.scala new file mode 100644 index 000000000..c04209f06 --- /dev/null +++ 
b/integ-test/src/integration/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriterITSuite.scala @@ -0,0 +1,407 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.metadatacache + +import java.util.{Base64, List} + +import scala.collection.JavaConverters._ + +import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson +import org.json4s.native.JsonMethods._ +import org.opensearch.flint.common.FlintVersion.current +import org.opensearch.flint.common.metadata.FlintMetadata +import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry +import org.opensearch.flint.core.FlintOptions +import org.opensearch.flint.core.storage.{FlintOpenSearchClient, FlintOpenSearchIndexMetadataService} +import org.opensearch.flint.spark.{FlintSparkIndexOptions, FlintSparkSuite} +import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName +import org.scalatest.Entry +import org.scalatest.matchers.should.Matchers + +import org.apache.spark.sql.flint.config.FlintSparkConf + +class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Matchers { + + /** Lazy initialize after container started. */ + lazy val options = new FlintOptions(openSearchOptions.asJava) + lazy val flintClient = new FlintOpenSearchClient(options) + lazy val flintMetadataCacheWriter = new FlintOpenSearchMetadataCacheWriter(options) + lazy val flintIndexMetadataService = new FlintOpenSearchIndexMetadataService(options) + + private val testTable = "spark_catalog.default.metadatacache_test" + private val testFlintIndex = getSkippingIndexName(testTable) + private val testLatestId: String = Base64.getEncoder.encodeToString(testFlintIndex.getBytes) + private val testLastRefreshCompleteTime = 1234567890123L + private val flintMetadataLogEntry = FlintMetadataLogEntry( + testLatestId, + 0L, + 0L, + testLastRefreshCompleteTime, + FlintMetadataLogEntry.IndexState.ACTIVE, + Map.empty[String, Any], + "", + Map.empty[String, Any]) + + override def beforeAll(): Unit = { + super.beforeAll() + createPartitionedMultiRowAddressTable(testTable) + } + + override def afterAll(): Unit = { + sql(s"DROP TABLE $testTable") + super.afterAll() + } + + override def afterEach(): Unit = { + deleteTestIndex(testFlintIndex) + super.afterEach() + } + + test("build disabled metadata cache writer") { + FlintMetadataCacheWriterBuilder + .build(FlintSparkConf()) shouldBe a[FlintDisabledMetadataCacheWriter] + } + + test("build opensearch metadata cache writer") { + setFlintSparkConf(FlintSparkConf.METADATA_CACHE_WRITE, "true") + withMetadataCacheWriteEnabled { + FlintMetadataCacheWriterBuilder + .build(FlintSparkConf()) shouldBe a[FlintOpenSearchMetadataCacheWriter] + } + } + + test("serialize metadata cache to JSON") { + val expectedMetadataJson: String = s""" + | { + | "_meta": { + | "version": "${current()}", + | "name": "${testFlintIndex}", + | "kind": "test_kind", + | "source": "test_source_table", + | "indexedColumns": [ + | { + | "test_field": "spark_type" + | }], + | "options": { + | "auto_refresh": "true", + | "refresh_interval": "10 Minutes" + | }, + | "properties": { + | "metadataCacheVersion": "1.0", + | "refreshInterval": 600, + | "sourceTables": ["${FlintMetadataCache.mockTableName}"], + | "lastRefreshTime": ${testLastRefreshCompleteTime} + | }, + | "latestId": "${testLatestId}" + | }, + | "properties": { + | "test_field": { + | "type": "os_type" + | } + | } + | } + |""".stripMargin + val builder = new 
+  test("serialize metadata cache to JSON") {
+    val expectedMetadataJson: String = s"""
+      | {
+      |   "_meta": {
+      |     "version": "${current()}",
+      |     "name": "${testFlintIndex}",
+      |     "kind": "test_kind",
+      |     "source": "test_source_table",
+      |     "indexedColumns": [
+      |     {
+      |       "test_field": "spark_type"
+      |     }],
+      |     "options": {
+      |       "auto_refresh": "true",
+      |       "refresh_interval": "10 Minutes"
+      |     },
+      |     "properties": {
+      |       "metadataCacheVersion": "1.0",
+      |       "refreshInterval": 600,
+      |       "sourceTables": ["${FlintMetadataCache.mockTableName}"],
+      |       "lastRefreshTime": ${testLastRefreshCompleteTime}
+      |     },
+      |     "latestId": "${testLatestId}"
+      |   },
+      |   "properties": {
+      |     "test_field": {
+      |       "type": "os_type"
+      |     }
+      |   }
+      | }
+      |""".stripMargin
+    val builder = new FlintMetadata.Builder
+    builder.name(testFlintIndex)
+    builder.kind("test_kind")
+    builder.source("test_source_table")
+    builder.addIndexedColumn(Map[String, AnyRef]("test_field" -> "spark_type").asJava)
+    builder.options(
+      Map("auto_refresh" -> "true", "refresh_interval" -> "10 Minutes")
+        .mapValues(_.asInstanceOf[AnyRef])
+        .asJava)
+    builder.schema(Map[String, AnyRef]("test_field" -> Map("type" -> "os_type").asJava).asJava)
+    builder.latestLogEntry(flintMetadataLogEntry)
+
+    val metadata = builder.build()
+    flintMetadataCacheWriter.serialize(metadata) should matchJson(expectedMetadataJson)
+  }
+
+  test("write metadata cache to index mappings") {
+    val metadata = FlintOpenSearchIndexMetadataService
+      .deserialize("{}")
+      .copy(latestLogEntry = Some(flintMetadataLogEntry))
+    flintClient.createIndex(testFlintIndex, metadata)
+    flintMetadataCacheWriter.updateMetadataCache(testFlintIndex, metadata)
+
+    val properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties
+    properties should have size 3
+    properties should contain allOf (Entry("metadataCacheVersion", "1.0"),
+    Entry("lastRefreshTime", testLastRefreshCompleteTime))
+    properties
+      .get("sourceTables")
+      .asInstanceOf[List[String]]
+      .toArray should contain theSameElementsAs Array(FlintMetadataCache.mockTableName)
+  }
+
+  test("write metadata cache to index mappings with refresh interval") {
+    val content =
+      """ {
+        |   "_meta": {
+        |     "kind": "test_kind",
+        |     "options": {
+        |       "auto_refresh": "true",
+        |       "refresh_interval": "10 Minutes"
+        |     }
+        |   },
+        |   "properties": {
+        |     "age": {
+        |       "type": "integer"
+        |     }
+        |   }
+        | }
+        |""".stripMargin
+    val metadata = FlintOpenSearchIndexMetadataService
+      .deserialize(content)
+      .copy(latestLogEntry = Some(flintMetadataLogEntry))
+    flintClient.createIndex(testFlintIndex, metadata)
+    flintMetadataCacheWriter.updateMetadataCache(testFlintIndex, metadata)
+
+    val properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties
+    properties should have size 4
+    properties should contain allOf (Entry("metadataCacheVersion", "1.0"),
+    Entry("refreshInterval", 600),
+    Entry("lastRefreshTime", testLastRefreshCompleteTime))
+    properties
+      .get("sourceTables")
+      .asInstanceOf[List[String]]
+      .toArray should contain theSameElementsAs Array(FlintMetadataCache.mockTableName)
+  }
+
+  test("exclude refresh interval in metadata cache when auto refresh is false") {
+    val content =
+      """ {
+        |   "_meta": {
+        |     "kind": "test_kind",
+        |     "options": {
+        |       "refresh_interval": "10 Minutes"
+        |     }
+        |   },
+        |   "properties": {
+        |     "age": {
+        |       "type": "integer"
+        |     }
+        |   }
+        | }
+        |""".stripMargin
+    val metadata = FlintOpenSearchIndexMetadataService
+      .deserialize(content)
+      .copy(latestLogEntry = Some(flintMetadataLogEntry))
+    flintClient.createIndex(testFlintIndex, metadata)
+    flintMetadataCacheWriter.updateMetadataCache(testFlintIndex, metadata)
+
+    val properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties
+    properties should have size 3
+    properties should contain allOf (Entry("metadataCacheVersion", "1.0"),
+    Entry("lastRefreshTime", testLastRefreshCompleteTime))
+    properties
+      .get("sourceTables")
+      .asInstanceOf[List[String]]
+      .toArray should contain theSameElementsAs Array(FlintMetadataCache.mockTableName)
+  }
+
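The two interval cases above expect a `refresh_interval` string such as "10 Minutes" to land in the cache as whole seconds (`refreshInterval: 600`), and to be omitted when auto refresh is disabled. For reference, that conversion can be expressed with Spark's own interval parser; a sketch under the assumption that refresh intervals only use day/time units (months are ignored here):

    import java.util.concurrent.TimeUnit

    import org.apache.spark.sql.catalyst.util.IntervalUtils
    import org.apache.spark.unsafe.types.UTF8String

    // Parse a Spark interval string and flatten it to whole seconds.
    def refreshIntervalSeconds(interval: String): Long = {
      val parsed = IntervalUtils.stringToInterval(UTF8String.fromString(interval))
      TimeUnit.DAYS.toSeconds(parsed.days) +
        TimeUnit.MICROSECONDS.toSeconds(parsed.microseconds)
    }

    // refreshIntervalSeconds("10 Minutes") == 600L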
+  test("exclude last refresh time in metadata cache when index has not been refreshed") {
+    val metadata = FlintOpenSearchIndexMetadataService
+      .deserialize("{}")
+      .copy(latestLogEntry = Some(flintMetadataLogEntry.copy(lastRefreshCompleteTime = 0L)))
+    flintClient.createIndex(testFlintIndex, metadata)
+    flintMetadataCacheWriter.updateMetadataCache(testFlintIndex, metadata)
+
+    val properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties
+    properties should have size 2
+    properties should contain(Entry("metadataCacheVersion", "1.0"))
+    properties
+      .get("sourceTables")
+      .asInstanceOf[List[String]]
+      .toArray should contain theSameElementsAs Array(FlintMetadataCache.mockTableName)
+  }
+
+  test("write metadata cache to index mappings and preserve other index metadata") {
+    val content =
+      """ {
+        |   "_meta": {
+        |     "kind": "test_kind"
+        |   },
+        |   "properties": {
+        |     "age": {
+        |       "type": "integer"
+        |     }
+        |   }
+        | }
+        |""".stripMargin
+
+    val metadata = FlintOpenSearchIndexMetadataService
+      .deserialize(content)
+      .copy(latestLogEntry = Some(flintMetadataLogEntry))
+    flintClient.createIndex(testFlintIndex, metadata)
+
+    flintIndexMetadataService.updateIndexMetadata(testFlintIndex, metadata)
+    flintMetadataCacheWriter.updateMetadataCache(testFlintIndex, metadata)
+
+    flintIndexMetadataService.getIndexMetadata(testFlintIndex).kind shouldBe "test_kind"
+    flintIndexMetadataService.getIndexMetadata(testFlintIndex).name shouldBe empty
+    flintIndexMetadataService.getIndexMetadata(testFlintIndex).schema should have size 1
+    var properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties
+    properties should have size 3
+    properties should contain allOf (Entry("metadataCacheVersion", "1.0"),
+    Entry("lastRefreshTime", testLastRefreshCompleteTime))
+    properties
+      .get("sourceTables")
+      .asInstanceOf[List[String]]
+      .toArray should contain theSameElementsAs Array(FlintMetadataCache.mockTableName)
+
+    val newContent =
+      """ {
+        |   "_meta": {
+        |     "kind": "test_kind",
+        |     "name": "test_name"
+        |   },
+        |   "properties": {
+        |     "age": {
+        |       "type": "integer"
+        |     }
+        |   }
+        | }
+        |""".stripMargin
+
+    val newMetadata = FlintOpenSearchIndexMetadataService
+      .deserialize(newContent)
+      .copy(latestLogEntry = Some(flintMetadataLogEntry))
+    flintIndexMetadataService.updateIndexMetadata(testFlintIndex, newMetadata)
+    flintMetadataCacheWriter.updateMetadataCache(testFlintIndex, newMetadata)
+
+    flintIndexMetadataService.getIndexMetadata(testFlintIndex).kind shouldBe "test_kind"
+    flintIndexMetadataService.getIndexMetadata(testFlintIndex).name shouldBe "test_name"
+    flintIndexMetadataService.getIndexMetadata(testFlintIndex).schema should have size 1
+    properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties
+    properties should have size 3
+    properties should contain allOf (Entry("metadataCacheVersion", "1.0"),
+    Entry("lastRefreshTime", testLastRefreshCompleteTime))
+    properties
+      .get("sourceTables")
+      .asInstanceOf[List[String]]
+      .toArray should contain theSameElementsAs Array(FlintMetadataCache.mockTableName)
+  }
+
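The table-driven tests below exercise one refresh mode per tuple and share a placeholder `checkpoint_location` of "s3a://test/" that is swapped for a real temp directory at run time. The substitution in the block below is written with `get(...).map(...).getOrElse(...)`; an equivalent, arguably plainer form (a sketch only, same names as the test body):

    // Replace the placeholder checkpoint location with the temp dir when present.
    val indexOptions = FlintSparkIndexOptions(
      if (optionsMap.contains("checkpoint_location")) {
        optionsMap.updated("checkpoint_location", checkpointDir.getAbsolutePath)
      } else {
        optionsMap
      })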
"true", "checkpoint_location" -> "s3a://test/"), + s""" + | { + | "metadataCacheVersion": "1.0", + | "sourceTables": ["${FlintMetadataCache.mockTableName}"] + | } + |""".stripMargin)).foreach { case (refreshMode, optionsMap, expectedJson) => + test(s"write metadata cache for $refreshMode") { + withExternalSchedulerEnabled { + withMetadataCacheWriteEnabled { + withTempDir { checkpointDir => + // update checkpoint_location if available in optionsMap + val indexOptions = FlintSparkIndexOptions( + optionsMap + .get("checkpoint_location") + .map(_ => + optionsMap.updated("checkpoint_location", checkpointDir.getAbsolutePath)) + .getOrElse(optionsMap)) + + flint + .skippingIndex() + .onTable(testTable) + .addMinMax("age") + .options(indexOptions, testFlintIndex) + .create() + + var index = flint.describeIndex(testFlintIndex) + index shouldBe defined + val propertiesJson = + compact( + render( + parse( + flintMetadataCacheWriter.serialize( + index.get.metadata())) \ "_meta" \ "properties")) + propertiesJson should matchJson(expectedJson) + + flint.refreshIndex(testFlintIndex) + index = flint.describeIndex(testFlintIndex) + index shouldBe defined + val lastRefreshTime = + compact( + render( + parse( + flintMetadataCacheWriter.serialize( + index.get.metadata())) \ "_meta" \ "properties" \ "lastRefreshTime")).toLong + lastRefreshTime should be > 0L + } + } + } + } + } + + test("write metadata cache for auto refresh index with internal scheduler") { + withMetadataCacheWriteEnabled { + withTempDir { checkpointDir => + flint + .skippingIndex() + .onTable(testTable) + .addMinMax("age") + .options( + FlintSparkIndexOptions( + Map( + "auto_refresh" -> "true", + "scheduler_mode" -> "internal", + "refresh_interval" -> "10 Minute", + "checkpoint_location" -> checkpointDir.getAbsolutePath)), + testFlintIndex) + .create() + + var index = flint.describeIndex(testFlintIndex) + index shouldBe defined + val propertiesJson = + compact( + render(parse( + flintMetadataCacheWriter.serialize(index.get.metadata())) \ "_meta" \ "properties")) + propertiesJson should matchJson(s""" + | { + | "metadataCacheVersion": "1.0", + | "refreshInterval": 600, + | "sourceTables": ["${FlintMetadataCache.mockTableName}"] + | } + |""".stripMargin) + + flint.refreshIndex(testFlintIndex) + index = flint.describeIndex(testFlintIndex) + index shouldBe defined + compact(render(parse( + flintMetadataCacheWriter.serialize( + index.get.metadata())) \ "_meta" \ "properties")) should not include "lastRefreshTime" + } + } + } +}