diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java index 733b656e7..ef4d01652 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java @@ -160,6 +160,11 @@ public final class MetricConstants { */ public static final String CHECKPOINT_DELETE_TIME_METRIC = "checkpoint.delete.processingTime"; + /** + * Prefix for externalScheduler metrics + */ + public static final String EXTERNAL_SCHEDULER_METRIC_PREFIX = "externalScheduler."; + /** * Metric prefix for tracking the index state transitions */ diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobExternalSchedulingService.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobExternalSchedulingService.scala index d043746c0..197b2f8c7 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobExternalSchedulingService.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobExternalSchedulingService.scala @@ -10,6 +10,7 @@ import java.time.Instant import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState import org.opensearch.flint.common.scheduler.AsyncQueryScheduler import org.opensearch.flint.common.scheduler.model.{AsyncQuerySchedulerRequest, LangType} +import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil} import org.opensearch.flint.core.storage.OpenSearchClientUtils import org.opensearch.flint.spark.FlintSparkIndex import org.opensearch.flint.spark.refresh.util.RefreshMetricsAspect @@ -83,8 +84,16 @@ class FlintSparkJobExternalSchedulingService( case AsyncQuerySchedulerAction.REMOVE => flintAsyncQueryScheduler.removeJob(request) case _ => throw new IllegalArgumentException(s"Unsupported action: $action") } + addExternalSchedulerMetrics(action) None // Return None for all cases } } + + private def addExternalSchedulerMetrics(action: AsyncQuerySchedulerAction): Unit = { + val actionName = action.name().toLowerCase() + MetricsUtil.addHistoricGauge( + MetricConstants.EXTERNAL_SCHEDULER_METRIC_PREFIX + actionName + ".count", + 1) + } }