From c995ccfba136ba3e0046462b9ba94bd3b5856cf4 Mon Sep 17 00:00:00 2001 From: Tomoyuki MORITA Date: Wed, 30 Oct 2024 21:59:10 -0700 Subject: [PATCH] Add external scheduler metrics (#827) * Add external scheduler metrics Signed-off-by: Tomoyuki Morita * Format code Signed-off-by: Tomoyuki Morita --------- Signed-off-by: Tomoyuki Morita Signed-off-by: Tomoyuki MORITA --- .../opensearch/flint/core/metrics/MetricConstants.java | 5 +++++ .../FlintSparkJobExternalSchedulingService.scala | 9 +++++++++ 2 files changed, 14 insertions(+) diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java index 733b656e7..ef4d01652 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java @@ -160,6 +160,11 @@ public final class MetricConstants { */ public static final String CHECKPOINT_DELETE_TIME_METRIC = "checkpoint.delete.processingTime"; + /** + * Prefix for externalScheduler metrics + */ + public static final String EXTERNAL_SCHEDULER_METRIC_PREFIX = "externalScheduler."; + /** * Metric prefix for tracking the index state transitions */ diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobExternalSchedulingService.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobExternalSchedulingService.scala index d043746c0..197b2f8c7 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobExternalSchedulingService.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/FlintSparkJobExternalSchedulingService.scala @@ -10,6 +10,7 @@ import java.time.Instant import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState import org.opensearch.flint.common.scheduler.AsyncQueryScheduler import org.opensearch.flint.common.scheduler.model.{AsyncQuerySchedulerRequest, LangType} +import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil} import org.opensearch.flint.core.storage.OpenSearchClientUtils import org.opensearch.flint.spark.FlintSparkIndex import org.opensearch.flint.spark.refresh.util.RefreshMetricsAspect @@ -83,8 +84,16 @@ class FlintSparkJobExternalSchedulingService( case AsyncQuerySchedulerAction.REMOVE => flintAsyncQueryScheduler.removeJob(request) case _ => throw new IllegalArgumentException(s"Unsupported action: $action") } + addExternalSchedulerMetrics(action) None // Return None for all cases } } + + private def addExternalSchedulerMetrics(action: AsyncQuerySchedulerAction): Unit = { + val actionName = action.name().toLowerCase() + MetricsUtil.addHistoricGauge( + MetricConstants.EXTERNAL_SCHEDULER_METRIC_PREFIX + actionName + ".count", + 1) + } }