Skip to content

Commit

Permalink
Add index creation metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Simeon Widdis <[email protected]>
  • Loading branch information
Swiddis committed Oct 30, 2024
1 parent 7bc0927 commit b02e574
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,21 @@ public final class MetricConstants {
*/
public static final String OUTPUT_TOTAL_RECORDS_WRITTEN = "output.totalRecordsWritten.count";

/**
* Metric group related to skipping indices, such as create success and failure
*/
public static final String CREATE_SKIPPING_INDICES = "query.execution.index.skipping";

/**
* Metric group related to covering indices, such as create success and failure
*/
public static final String CREATE_COVERING_INDICES = "query.execution.index.covering";

/**
* Metric group related to materialized view indices, such as create success and failure
*/
public static final String CREATE_MV_INDICES = "query.execution.index.mv";

private MetricConstants() {
// Private constructor to prevent instantiation
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.collection.JavaConverters.mapAsScalaMapConverter

import org.opensearch.flint.common.metadata.FlintMetadata
import org.opensearch.flint.core.metrics.MetricConstants
import org.opensearch.flint.core.metrics.MetricsUtil
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.COVERING_INDEX_TYPE
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
Expand All @@ -26,6 +28,18 @@ import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy

import org.apache.spark.internal.Logging

def emitIndexCreationStatusMetric(metadata: FlintMetadata, success: Boolean): Unit = {
val successSuffix = if (success) ".create_success" else ".create_failed"
metadata.kind match {
case SKIPPING_INDEX_TYPE =>
MetricsUtil.addHistoricGauge(MetricConstants.CREATE_SKIPPING_INDICES + successSuffix + ".count", 1)
case COVERING_INDEX_TYPE =>
MetricsUtil.addHistoricGauge(MetricConstants.CREATE_COVERING_INDICES + successSuffix + ".count", 1)
case MV_INDEX_TYPE =>
MetricsUtil.addHistoricGauge(MetricConstants.CREATE_MV_INDICES + successSuffix + ".count", 1)
}
}

/**
* Flint Spark index factory that encapsulates specific Flint index instance creation. This is for
* internal code use instead of user facing API.
Expand All @@ -42,9 +56,12 @@ object FlintSparkIndexFactory extends Logging {
*/
def create(metadata: FlintMetadata): Option[FlintSparkIndex] = {
try {
Some(doCreate(metadata))
val result = doCreate(metadata)
emitIndexCreationStatusMetric(metadata, success=true)
Some(result)
} catch {
case e: Exception =>
emitIndexCreationStatusMetric(metadata, success=false)
logWarning(s"Failed to create Flint index from metadata $metadata", e)
None
}
Expand Down

0 comments on commit b02e574

Please sign in to comment.