Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 0.6] Add metrics for successful/failed Spark index creation #917

Merged
merged 1 commit into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,21 @@ public final class MetricConstants {
*/
public static final String OUTPUT_TOTAL_RECORDS_WRITTEN = "output.totalRecordsWritten.count";

/**
* Metric group related to skipping indices, such as create success and failure
*/
public static final String CREATE_SKIPPING_INDICES = "query.execution.index.skipping";

/**
* Metric group related to covering indices, such as create success and failure
*/
public static final String CREATE_COVERING_INDICES = "query.execution.index.covering";

/**
* Metric group related to materialized view indices, such as create success and failure
*/
public static final String CREATE_MV_INDICES = "query.execution.index.mv";

/**
* Metric for tracking the latency of checkpoint deletion
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import org.opensearch.flint.core.FlintClient;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.IRestHighLevelClient;
import org.opensearch.flint.core.metrics.MetricConstants;
import org.opensearch.flint.core.metrics.MetricsUtil;
import scala.Option;

import java.io.IOException;
Expand All @@ -40,7 +42,13 @@ public FlintOpenSearchClient(FlintOptions options) {
@Override
public void createIndex(String indexName, FlintMetadata metadata) {
LOG.info("Creating Flint index " + indexName + " with metadata " + metadata);
createIndex(indexName, FlintOpenSearchIndexMetadataService.serialize(metadata, false), metadata.indexSettings());
try {
createIndex(indexName, FlintOpenSearchIndexMetadataService.serialize(metadata, false), metadata.indexSettings());
emitIndexCreationSuccessMetric(metadata.kind());
} catch (IllegalStateException ex) {
emitIndexCreationFailureMetric(metadata.kind());
throw ex;
}
}

protected void createIndex(String indexName, String mapping, Option<String> settings) {
Expand Down Expand Up @@ -122,4 +130,28 @@ public IRestHighLevelClient createClient() {
private String sanitizeIndexName(String indexName) {
return OpenSearchClientUtils.sanitizeIndexName(indexName);
}

private void emitIndexCreationSuccessMetric(String indexKind) {
emitIndexCreationMetric(indexKind, "success");
}

private void emitIndexCreationFailureMetric(String indexKind) {
emitIndexCreationMetric(indexKind, "failed");
}

private void emitIndexCreationMetric(String indexKind, String status) {
switch (indexKind) {
case "skipping":
MetricsUtil.addHistoricGauge(String.format("%s.%s.count", MetricConstants.CREATE_SKIPPING_INDICES, status), 1);
break;
case "covering":
MetricsUtil.addHistoricGauge(String.format("%s.%s.count", MetricConstants.CREATE_COVERING_INDICES, status), 1);
break;
case "mv":
MetricsUtil.addHistoricGauge(String.format("%s.%s.count", MetricConstants.CREATE_MV_INDICES, status), 1);
break;
default:
break;
}
}
}
Loading