Skip to content

Commit

Permalink
abstract recordHeartbeat method
Browse files Browse the repository at this point in the history
Signed-off-by: Sean Kao <[email protected]>
  • Loading branch information
seankao-az committed Jun 14, 2024
1 parent 9664595 commit 9502ad6
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,11 @@ default <T> OptimisticTransaction<T> startTransaction(String indexName) {
* @return optional metadata log
*/
Optional<FlintMetadataLog<FlintMetadataLogEntry>> getIndexMetadataLog(String indexName);

/**
* Record heartbeat timestamp for index streaming job.
*
* @param indexName index name
*/
void recordHeartbeat(String indexName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction;
import org.opensearch.flint.core.metadata.log.FlintMetadataLog;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState$;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogService;
import org.opensearch.flint.core.metadata.log.OptimisticTransaction;

Expand Down Expand Up @@ -55,6 +56,14 @@ public Optional<FlintMetadataLog<FlintMetadataLogEntry>> getIndexMetadataLog(Str
return getIndexMetadataLog(indexName, false);
}

@Override
public void recordHeartbeat(String indexName) {
startTransaction(indexName)
.initialLog(latest -> latest.state() == IndexState$.MODULE$.REFRESHING())
.finalLog(latest -> latest) // timestamp will update automatically
.commit(latest -> null);
}

private Optional<FlintMetadataLog<FlintMetadataLogEntry>> getIndexMetadataLog(String indexName, boolean initIfNotExist) {
LOG.info("Getting metadata log for index " + indexName + " and data source " + dataSourceName);
try (IRestHighLevelClient client = createOpenSearchClient()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,7 @@ class FlintSparkIndexMonitor(
try {
if (isStreamingJobActive(indexName)) {
logInfo("Streaming job is still active")
flintMetadataLogService
.startTransaction(indexName)
.initialLog(latest => latest.state == REFRESHING)
.finalLog(latest => latest) // timestamp will update automatically
.commit(_ => {})
flintMetadataLogService.recordHeartbeat(indexName)
} else {
logError("Streaming job is not active. Cancelling monitor task")
stopMonitor(indexName)
Expand Down

0 comments on commit 9502ad6

Please sign in to comment.