Skip to content

Commit

Permalink
Add metrics for successful/failed Spark index creation (#837) (#917)
Browse files Browse the repository at this point in the history
* Add index creation metrics



* Update emit metric function location



* Apply scalafmt



* revert FlintSparkIndexFactory metrics emission



* Update OS client to emit metrics



* Fix index kind detection



* Remove cumbersome with() method



* Revert the void change



* Make the code branchless



* Add default case to switches



* Apply PR feedback



---------


(cherry picked from commit 6f37b2d)

Signed-off-by: Simeon Widdis <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent fb82f43 commit 2a78109
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,21 @@ public final class MetricConstants {
*/
public static final String OUTPUT_TOTAL_RECORDS_WRITTEN = "output.totalRecordsWritten.count";

/**
* Metric group related to skipping indices, such as create success and failure
*/
public static final String CREATE_SKIPPING_INDICES = "query.execution.index.skipping";

/**
* Metric group related to covering indices, such as create success and failure
*/
public static final String CREATE_COVERING_INDICES = "query.execution.index.covering";

/**
* Metric group related to materialized view indices, such as create success and failure
*/
public static final String CREATE_MV_INDICES = "query.execution.index.mv";

/**
* Metric for tracking the latency of checkpoint deletion
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import org.opensearch.flint.core.FlintClient;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.IRestHighLevelClient;
import org.opensearch.flint.core.metrics.MetricConstants;
import org.opensearch.flint.core.metrics.MetricsUtil;
import scala.Option;

import java.io.IOException;
Expand All @@ -40,7 +42,13 @@ public FlintOpenSearchClient(FlintOptions options) {
@Override
public void createIndex(String indexName, FlintMetadata metadata) {
LOG.info("Creating Flint index " + indexName + " with metadata " + metadata);
createIndex(indexName, FlintOpenSearchIndexMetadataService.serialize(metadata, false), metadata.indexSettings());
try {
createIndex(indexName, FlintOpenSearchIndexMetadataService.serialize(metadata, false), metadata.indexSettings());
emitIndexCreationSuccessMetric(metadata.kind());
} catch (IllegalStateException ex) {
emitIndexCreationFailureMetric(metadata.kind());
throw ex;
}
}

protected void createIndex(String indexName, String mapping, Option<String> settings) {
Expand Down Expand Up @@ -122,4 +130,28 @@ public IRestHighLevelClient createClient() {
private String sanitizeIndexName(String indexName) {
return OpenSearchClientUtils.sanitizeIndexName(indexName);
}

private void emitIndexCreationSuccessMetric(String indexKind) {
emitIndexCreationMetric(indexKind, "success");
}

private void emitIndexCreationFailureMetric(String indexKind) {
emitIndexCreationMetric(indexKind, "failed");
}

private void emitIndexCreationMetric(String indexKind, String status) {
switch (indexKind) {
case "skipping":
MetricsUtil.addHistoricGauge(String.format("%s.%s.count", MetricConstants.CREATE_SKIPPING_INDICES, status), 1);
break;
case "covering":
MetricsUtil.addHistoricGauge(String.format("%s.%s.count", MetricConstants.CREATE_COVERING_INDICES, status), 1);
break;
case "mv":
MetricsUtil.addHistoricGauge(String.format("%s.%s.count", MetricConstants.CREATE_MV_INDICES, status), 1);
break;
default:
break;
}
}
}

0 comments on commit 2a78109

Please sign in to comment.