Skip to content

Commit

Permalink
Add flint opensearch metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vmmusings committed Jan 16, 2024
1 parent 9061eb9 commit 5b74a0a
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import com.amazonaws.services.cloudwatch.model.PutMetricDataResult;
import com.amazonaws.services.cloudwatch.model.StandardUnit;
import com.amazonaws.services.cloudwatch.model.StatisticSet;
import com.amazonaws.util.StringUtils;
import com.codahale.metrics.Clock;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Counting;
Expand All @@ -36,6 +35,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -47,6 +47,8 @@
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -83,6 +85,16 @@ public class DimensionedCloudWatchReporter extends ScheduledReporter {
// Visible for testing
public static final String DIMENSION_SNAPSHOT_STD_DEV = "snapshot-std-dev";

public static final String DIMENSION_JOB_ID = "jobId";

public static final String DIMENSION_APPLICATION_ID = "applicationId";

public static final String DIMENSION_DOMAIN_ID = "domainId";

public static final String DIMENSION_INSTANCE_ROLE = "instanceRole";

public static final String UNKNOWN = "unknown";

/**
* Amazon CloudWatch rejects values that are either too small or too large.
* Values must be in the range of 8.515920e-109 to 1.174271e+108 (Base 10) or 2e-360 to 2e360 (Base 2).
Expand Down Expand Up @@ -339,20 +351,58 @@ private void stageMetricDatum(final boolean metricConfigured,
if (metricConfigured && (builder.withZeroValuesSubmission || metricValue > 0)) {
final DimensionedName dimensionedName = DimensionedName.decode(metricName);
final Set<Dimension> dimensions = new LinkedHashSet<>(builder.globalDimensions);
dimensions.addAll(dimensionedName.getDimensions());
MetricInfo metricInfo = getMetricInfo(dimensionedName);
dimensions.addAll(metricInfo.getDimensions());
if (shouldAppendDropwizardTypeDimension) {
dimensions.add(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(dimensionValue));
}

metricData.add(new MetricDatum()
.withTimestamp(new Date(builder.clock.getTime()))
.withValue(cleanMetricValue(metricValue))
.withMetricName(dimensionedName.getName())
.withMetricName(metricInfo.getMetricName())
.withDimensions(dimensions)
.withUnit(standardUnit));
}
}

private MetricInfo getMetricInfo(DimensionedName dimensionedName) {
final String jobId = System.getenv().getOrDefault("SERVERLESS_EMR_JOB_ID", UNKNOWN);
final String applicationId = System.getenv().getOrDefault("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", UNKNOWN);
final String domainId = System.getenv().getOrDefault("FLINT_CLUSTER_NAME", UNKNOWN);
final Dimension jobDimension = new Dimension().withName(DIMENSION_JOB_ID).withValue(jobId);
final Dimension applicationDimension = new Dimension().withName(DIMENSION_APPLICATION_ID).withValue(applicationId);
final Dimension domainIdDimension = new Dimension().withName(DIMENSION_DOMAIN_ID).withValue(domainId);
Dimension instanceRoleDimension = new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue(UNKNOWN);
String metricName = dimensionedName.getName();
String[] parts = metricName.split("\\.");
if (doesNameConsistsOfMetricNameSpace(parts)) {
metricName = Stream.of(parts).skip(2).collect(Collectors.joining("."));
//For executors only id is added to the metric name, that's why the numeric check.
//If it is not numeric then the instance is driver.
if (StringUtils.isNumeric(parts[1])) {
instanceRoleDimension = new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue("executor" + parts[1]);
}
else {
instanceRoleDimension = new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue(parts[1]);
}
}
Set<Dimension> dimensions = new HashSet<>();
dimensions.add(jobDimension);
dimensions.add(applicationDimension);
dimensions.add(instanceRoleDimension);
dimensions.add(domainIdDimension);
dimensions.addAll(dimensionedName.getDimensions());
return new MetricInfo(metricName, dimensions);
}

// This tries to replicate the logic here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L137
// Since we don't have access to Spark Configuration here: we are relying on the presence of executorId as part of the metricName.
private boolean doesNameConsistsOfMetricNameSpace(String[] metricNameParts) {
return metricNameParts.length >= 3
&& (metricNameParts[1].equals("driver") || StringUtils.isNumeric(metricNameParts[1]));
}

private void stageMetricDatumWithConvertedSnapshot(final boolean metricConfigured,
final String metricName,
final Snapshot snapshot,
Expand Down Expand Up @@ -478,6 +528,25 @@ public String getDesc() {
}
}

public static class MetricInfo {
private String metricName;
private Set<Dimension> dimensions;

public MetricInfo(String metricName, Set<Dimension> dimensions) {
this.metricName = metricName;
this.dimensions = dimensions;
}

public String getMetricName() {
return metricName;
}

public Set<Dimension> getDimensions() {
return dimensions;
}
}


public static class Builder {

private final String namespace;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public static DimensionedName decode(final String encodedDimensionedName) {
if (matcher.find() && matcher.groupCount() == 2) {
final DimensionedNameBuilder builder = new DimensionedNameBuilder(matcher.group(1).trim());
for (String t : matcher.group(2).split(",")) {
final String[] keyAndValue = t.split(":");
//## acts as a distinct separator.
final String[] keyAndValue = t.split("##");
builder.withDimension(keyAndValue[0].trim(), keyAndValue[1].trim());
}
return builder.build();
Expand Down Expand Up @@ -59,7 +60,7 @@ public synchronized String encode() {
final StringBuilder sb = new StringBuilder(this.name);
sb.append('[');
sb.append(this.dimensions.values().stream()
.map(dimension -> dimension.getName() + ":" + dimension.getValue())
.map(dimension -> dimension.getName() + "##" + dimension.getValue())
.collect(Collectors.joining(",")));
sb.append(']');

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.apache.spark.metrics.source

import com.codahale.metrics.MetricRegistry

class FlintMetricSource(val sourceName: String) extends Source {
override val metricRegistry: MetricRegistry = new MetricRegistry
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SlidingWindowReservoir;
import java.util.HashSet;
import java.util.Set;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -47,12 +49,17 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_APPLICATION_ID;
import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_COUNT;
import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_DOMAIN_ID;
import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_GAUGE;
import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_INSTANCE_ROLE;
import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_JOB_ID;
import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_NAME_TYPE;
import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_SNAPSHOT_MEAN;
import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_SNAPSHOT_STD_DEV;
import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_SNAPSHOT_SUMMARY;
import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.UNKNOWN;

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
Expand Down Expand Up @@ -104,10 +111,12 @@ public void shouldReportWithoutGlobalDimensionsWhenGlobalDimensionsNotConfigured

final List<Dimension> dimensions = firstMetricDatumDimensionsFromCapturedRequest();

assertThat(dimensions).hasSize(1);
assertThat(dimensions).hasSize(5);
assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_COUNT));
assertDefaultDimensionsWithUnknownValue(dimensions);
}


@Test
public void reportedCounterShouldContainExpectedDimension() throws Exception {
metricRegistry.counter(ARBITRARY_COUNTER_NAME).inc();
Expand All @@ -116,6 +125,7 @@ public void reportedCounterShouldContainExpectedDimension() throws Exception {
final List<Dimension> dimensions = firstMetricDatumDimensionsFromCapturedRequest();

assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_COUNT));
assertDefaultDimensionsWithUnknownValue(dimensions);
}

@Test
Expand All @@ -126,6 +136,7 @@ public void reportedGaugeShouldContainExpectedDimension() throws Exception {
final List<Dimension> dimensions = firstMetricDatumDimensionsFromCapturedRequest();

assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_GAUGE));
assertDefaultDimensionsWithUnknownValue(dimensions);
}

@Test
Expand Down Expand Up @@ -473,8 +484,59 @@ public void shouldReportExpectedGlobalAndCustomDimensions() throws Exception {
assertThat(dimensions).contains(new Dimension().withName("Region").withValue("us-west-2"));
assertThat(dimensions).contains(new Dimension().withName("key1").withValue("value1"));
assertThat(dimensions).contains(new Dimension().withName("key2").withValue("value2"));
assertDefaultDimensionsWithUnknownValue(dimensions);
}

@Test
public void shouldParseDimensionedNamePrefixedWithMetricNameSpaceDriverMetric() throws Exception {
//setting jobId as unknown to invoke name parsing.
metricRegistry.counter(DimensionedName.withName("unknown.driver.LiveListenerBus.listenerProcessingTime.org.apache.spark.HeartbeatReceiver")
.withDimension("key1", "value1")
.withDimension("key2", "value2")
.build().encode()).inc();
reporterBuilder.withGlobalDimensions("Region=us-west-2").build().report();
final DimensionedCloudWatchReporter.MetricInfo metricInfo = firstMetricDatumInfoFromCapturedRequest();
Set<Dimension> dimensions = metricInfo.getDimensions();
assertThat(dimensions).contains(new Dimension().withName("Region").withValue("us-west-2"));
assertThat(dimensions).contains(new Dimension().withName("key1").withValue("value1"));
assertThat(dimensions).contains(new Dimension().withName("key2").withValue("value2"));
assertThat(dimensions).contains(new Dimension().withName(DIMENSION_JOB_ID).withValue(UNKNOWN));
assertThat(dimensions).contains(new Dimension().withName(DIMENSION_DOMAIN_ID).withValue(UNKNOWN));
assertThat(dimensions).contains(new Dimension().withName(DIMENSION_APPLICATION_ID).withValue(UNKNOWN));
assertThat(dimensions).contains(new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue("driver"));
assertThat(metricInfo.getMetricName()).isEqualTo("LiveListenerBus.listenerProcessingTime.org.apache.spark.HeartbeatReceiver");
}
@Test
public void shouldParseDimensionedNamePrefixedWithMetricNameSpaceExecutorMetric() throws Exception {
//setting jobId as unknown to invoke name parsing.
metricRegistry.counter(DimensionedName.withName("unknown.1.NettyBlockTransfer.shuffle-client.usedDirectMemory")
.withDimension("key1", "value1")
.withDimension("key2", "value2")
.build().encode()).inc();
reporterBuilder.withGlobalDimensions("Region=us-west-2").build().report();

final DimensionedCloudWatchReporter.MetricInfo metricInfo = firstMetricDatumInfoFromCapturedRequest();
Set<Dimension> dimensions = metricInfo.getDimensions();
assertThat(dimensions).contains(new Dimension().withName("Region").withValue("us-west-2"));
assertThat(dimensions).contains(new Dimension().withName("key1").withValue("value1"));
assertThat(dimensions).contains(new Dimension().withName("key2").withValue("value2"));
assertThat(dimensions).contains(new Dimension().withName(DIMENSION_JOB_ID).withValue(UNKNOWN));
assertThat(dimensions).contains(new Dimension().withName(DIMENSION_DOMAIN_ID).withValue(UNKNOWN));
assertThat(dimensions).contains(new Dimension().withName(DIMENSION_APPLICATION_ID).withValue(UNKNOWN));
assertThat(dimensions).contains(new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue( "executor1"));
assertThat(metricInfo.getMetricName()).isEqualTo("NettyBlockTransfer.shuffle-client.usedDirectMemory");
}



private void assertDefaultDimensionsWithUnknownValue(List<Dimension> dimensions) {
assertThat(dimensions).contains(new Dimension().withName(DIMENSION_JOB_ID).withValue(UNKNOWN));
assertThat(dimensions).contains(new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue(UNKNOWN));
assertThat(dimensions).contains(new Dimension().withName(DIMENSION_DOMAIN_ID).withValue(UNKNOWN));
assertThat(dimensions).contains(new Dimension().withName(DIMENSION_APPLICATION_ID).withValue(UNKNOWN));
}


private MetricDatum metricDatumByDimensionFromCapturedRequest(final String dimensionValue) {
final PutMetricDataRequest putMetricDataRequest = metricDataRequestCaptor.getValue();
final List<MetricDatum> metricData = putMetricDataRequest.getMetricData();
Expand Down Expand Up @@ -504,6 +566,12 @@ private List<Dimension> firstMetricDatumDimensionsFromCapturedRequest() {
return metricDatum.getDimensions();
}

private DimensionedCloudWatchReporter.MetricInfo firstMetricDatumInfoFromCapturedRequest() {
final PutMetricDataRequest putMetricDataRequest = metricDataRequestCaptor.getValue();
final MetricDatum metricDatum = putMetricDataRequest.getMetricData().get(0);
return new DimensionedCloudWatchReporter.MetricInfo(metricDatum.getMetricName(), new HashSet<>(metricDatum.getDimensions()));
}

private List<Dimension> allDimensionsFromCapturedRequest() {
final PutMetricDataRequest putMetricDataRequest = metricDataRequestCaptor.getValue();
final List<MetricDatum> metricData = putMetricDataRequest.getMetricData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
public class DimensionedNameTest {
@Test
public void canDecodeDimensionedString() {
final String dimensioned = "test[key1:val1,key2:val2,key3:val3]";
final String dimensioned = "test[key1##val1,key2##val2,key3##val3]";

final DimensionedName dimensionedName = DimensionedName.decode(dimensioned);

Expand All @@ -32,7 +32,7 @@ public void canEncodeDimensionedNameToString() {
.withDimension("key3", "val3")
.build();

Assertions.assertEquals("test[key1:val1,key2:val2,key3:val3]", dimensionedName.encode());
Assertions.assertEquals("test[key1##val1,key2##val2,key3##val3]", dimensionedName.encode());
}

@Test
Expand All @@ -48,8 +48,8 @@ public void canDeriveDimensionedNameFromCurrent() {
.withDimension("key3", "new_value")
.withDimension("key4", "val4").build();

Assertions.assertEquals("test[key1:val1,key2:val2,key3:val3]", dimensionedName.encode());
Assertions.assertEquals("test[key1:val1,key2:val2,key3:new_value,key4:val4]",
Assertions.assertEquals("test[key1##val1,key2##val2,key3##val3]", dimensionedName.encode());
Assertions.assertEquals("test[key1##val1,key2##val2,key3##new_value,key4##val4]",
derivedDimensionedName.encode());
}
}

0 comments on commit 5b74a0a

Please sign in to comment.