diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogService.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogService.java index 1c6f42a6a..a356a456f 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogService.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogService.java @@ -39,4 +39,11 @@ default OptimisticTransaction startTransaction(String indexName) { * @return optional metadata log */ Optional> getIndexMetadataLog(String indexName); + + /** + * Record heartbeat timestamp for index streaming job. + * + * @param indexName index name + */ + void recordHeartbeat(String indexName); } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java index a49e45829..15b294c8b 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java @@ -17,6 +17,7 @@ import org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction; import org.opensearch.flint.core.metadata.log.FlintMetadataLog; import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry; +import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState$; import org.opensearch.flint.core.metadata.log.FlintMetadataLogService; import org.opensearch.flint.core.metadata.log.OptimisticTransaction; @@ -55,6 +56,14 @@ public Optional> getIndexMetadataLog(Str return getIndexMetadataLog(indexName, false); } + @Override + public void recordHeartbeat(String indexName) { + startTransaction(indexName) + .initialLog(latest -> latest.state() == IndexState$.MODULE$.REFRESHING()) + .finalLog(latest -> latest) // timestamp will update automatically + .commit(latest -> null); + } + private Optional> getIndexMetadataLog(String indexName, boolean initIfNotExist) { LOG.info("Getting metadata log for index " + indexName + " and data source " + dataSourceName); try (IRestHighLevelClient client = createOpenSearchClient()) { diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala index 34cae88b5..815dfa71a 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala @@ -156,11 +156,7 @@ class FlintSparkIndexMonitor( try { if (isStreamingJobActive(indexName)) { logInfo("Streaming job is still active") - flintMetadataLogService - .startTransaction(indexName) - .initialLog(latest => latest.state == REFRESHING) - .finalLog(latest => latest) // timestamp will update automatically - .commit(_ => {}) + flintMetadataLogService.recordHeartbeat(indexName) } else { logError("Streaming job is not active. Cancelling monitor task") stopMonitor(indexName)