Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changes for adding default dimensions in CWSink. #209

Merged
merged 1 commit into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import com.amazonaws.services.cloudwatch.model.PutMetricDataResult;
import com.amazonaws.services.cloudwatch.model.StandardUnit;
import com.amazonaws.services.cloudwatch.model.StatisticSet;
import com.amazonaws.util.StringUtils;
import com.codahale.metrics.Clock;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Counting;
Expand All @@ -36,6 +35,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -47,6 +47,8 @@
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -83,6 +85,16 @@ public class DimensionedCloudWatchReporter extends ScheduledReporter {
// Visible for testing
public static final String DIMENSION_SNAPSHOT_STD_DEV = "snapshot-std-dev";

public static final String DIMENSION_JOB_ID = "jobId";

public static final String DIMENSION_APPLICATION_ID = "applicationId";

public static final String DIMENSION_DOMAIN_ID = "domainId";

public static final String DIMENSION_INSTANCE_ROLE = "instanceRole";
Comment on lines +88 to +94
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

although it's good to provide insights, DIMENSION_JOB_ID / DIMENSION_APPLICATION_ID / DIMENSION_DOMAIN_ID / DIMENSION_INSTANCE_ROLE are actually high cardinality dimensions which considered as seperate metrics. It will bring extra cost setting them as default dimensions. Ref: https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch_billing.html


public static final String UNKNOWN = "unknown";

/**
* Amazon CloudWatch rejects values that are either too small or too large.
* Values must be in the range of 8.515920e-109 to 1.174271e+108 (Base 10) or 2e-360 to 2e360 (Base 2).
Expand Down Expand Up @@ -339,20 +351,58 @@ private void stageMetricDatum(final boolean metricConfigured,
if (metricConfigured && (builder.withZeroValuesSubmission || metricValue > 0)) {
final DimensionedName dimensionedName = DimensionedName.decode(metricName);
final Set<Dimension> dimensions = new LinkedHashSet<>(builder.globalDimensions);
dimensions.addAll(dimensionedName.getDimensions());
MetricInfo metricInfo = getMetricInfo(dimensionedName);
dimensions.addAll(metricInfo.getDimensions());
if (shouldAppendDropwizardTypeDimension) {
dimensions.add(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(dimensionValue));
}

metricData.add(new MetricDatum()
.withTimestamp(new Date(builder.clock.getTime()))
.withValue(cleanMetricValue(metricValue))
.withMetricName(dimensionedName.getName())
.withMetricName(metricInfo.getMetricName())
.withDimensions(dimensions)
.withUnit(standardUnit));
}
}

private MetricInfo getMetricInfo(DimensionedName dimensionedName) {
final String jobId = System.getenv().getOrDefault("SERVERLESS_EMR_JOB_ID", UNKNOWN);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the cons to add more dimensions in CW?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vamsi-amazon @penghuo can we assume "SERVERLESS_EMR_JOB_ID" is isolated at job level?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@penghuo Didn't get your question?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can add more dimensions in the DimensionedName object while publishing the metric.

final String applicationId = System.getenv().getOrDefault("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", UNKNOWN);
final String domainId = System.getenv().getOrDefault("FLINT_CLUSTER_NAME", UNKNOWN);
Copy link
Collaborator

@noCharger noCharger Jan 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@noCharger Didn't get you.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vamsi-amazon do we persist cluster name in index meta field?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure. I guess no. What are you trying to get to?

final Dimension jobDimension = new Dimension().withName(DIMENSION_JOB_ID).withValue(jobId);
final Dimension applicationDimension = new Dimension().withName(DIMENSION_APPLICATION_ID).withValue(applicationId);
final Dimension domainIdDimension = new Dimension().withName(DIMENSION_DOMAIN_ID).withValue(domainId);
Dimension instanceRoleDimension = new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue(UNKNOWN);
String metricName = dimensionedName.getName();
String[] parts = metricName.split("\\.");
if (doesNameConsistsOfMetricNameSpace(parts)) {
metricName = Stream.of(parts).skip(2).collect(Collectors.joining("."));
//For executors only id is added to the metric name, that's why the numeric check.
//If it is not numeric then the instance is driver.
if (StringUtils.isNumeric(parts[1])) {
instanceRoleDimension = new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue("executor" + parts[1]);
}
else {
instanceRoleDimension = new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue(parts[1]);
}
}
Set<Dimension> dimensions = new HashSet<>();
dimensions.add(jobDimension);
dimensions.add(applicationDimension);
dimensions.add(instanceRoleDimension);
dimensions.add(domainIdDimension);
dimensions.addAll(dimensionedName.getDimensions());
return new MetricInfo(metricName, dimensions);
}

// This tries to replicate the logic here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L137
// Since we don't have access to Spark Configuration here: we are relying on the presence of executorId as part of the metricName.
private boolean doesNameConsistsOfMetricNameSpace(String[] metricNameParts) {
return metricNameParts.length >= 3
&& (metricNameParts[1].equals("driver") || StringUtils.isNumeric(metricNameParts[1]));
}

private void stageMetricDatumWithConvertedSnapshot(final boolean metricConfigured,
final String metricName,
final Snapshot snapshot,
Expand Down Expand Up @@ -478,6 +528,25 @@ public String getDesc() {
}
}

public static class MetricInfo {
private String metricName;
private Set<Dimension> dimensions;

public MetricInfo(String metricName, Set<Dimension> dimensions) {
this.metricName = metricName;
this.dimensions = dimensions;
}

public String getMetricName() {
return metricName;
}

public Set<Dimension> getDimensions() {
return dimensions;
}
}


public static class Builder {

private final String namespace;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public static DimensionedName decode(final String encodedDimensionedName) {
if (matcher.find() && matcher.groupCount() == 2) {
final DimensionedNameBuilder builder = new DimensionedNameBuilder(matcher.group(1).trim());
for (String t : matcher.group(2).split(",")) {
final String[] keyAndValue = t.split(":");
//## acts as a distinct separator.
final String[] keyAndValue = t.split("##");
vmmusings marked this conversation as resolved.
Show resolved Hide resolved
builder.withDimension(keyAndValue[0].trim(), keyAndValue[1].trim());
}
return builder.build();
Expand Down Expand Up @@ -59,7 +60,7 @@ public synchronized String encode() {
final StringBuilder sb = new StringBuilder(this.name);
sb.append('[');
sb.append(this.dimensions.values().stream()
.map(dimension -> dimension.getName() + ":" + dimension.getValue())
.map(dimension -> dimension.getName() + "##" + dimension.getValue())
.collect(Collectors.joining(",")));
sb.append(']');

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SlidingWindowReservoir;
import java.util.HashSet;
import java.util.Set;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -47,12 +49,17 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_APPLICATION_ID;
import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_COUNT;
import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_DOMAIN_ID;
import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_GAUGE;
import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_INSTANCE_ROLE;
import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_JOB_ID;
import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_NAME_TYPE;
import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_SNAPSHOT_MEAN;
import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_SNAPSHOT_STD_DEV;
import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_SNAPSHOT_SUMMARY;
import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.UNKNOWN;

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
Expand Down Expand Up @@ -104,10 +111,12 @@ public void shouldReportWithoutGlobalDimensionsWhenGlobalDimensionsNotConfigured

final List<Dimension> dimensions = firstMetricDatumDimensionsFromCapturedRequest();

assertThat(dimensions).hasSize(1);
assertThat(dimensions).hasSize(5);
assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_COUNT));
assertDefaultDimensionsWithUnknownValue(dimensions);
}


@Test
public void reportedCounterShouldContainExpectedDimension() throws Exception {
metricRegistry.counter(ARBITRARY_COUNTER_NAME).inc();
Expand All @@ -116,6 +125,7 @@ public void reportedCounterShouldContainExpectedDimension() throws Exception {
final List<Dimension> dimensions = firstMetricDatumDimensionsFromCapturedRequest();

assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_COUNT));
assertDefaultDimensionsWithUnknownValue(dimensions);
}

@Test
Expand All @@ -126,6 +136,7 @@ public void reportedGaugeShouldContainExpectedDimension() throws Exception {
final List<Dimension> dimensions = firstMetricDatumDimensionsFromCapturedRequest();

assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_GAUGE));
assertDefaultDimensionsWithUnknownValue(dimensions);
}

@Test
Expand Down Expand Up @@ -473,8 +484,59 @@ public void shouldReportExpectedGlobalAndCustomDimensions() throws Exception {
assertThat(dimensions).contains(new Dimension().withName("Region").withValue("us-west-2"));
assertThat(dimensions).contains(new Dimension().withName("key1").withValue("value1"));
assertThat(dimensions).contains(new Dimension().withName("key2").withValue("value2"));
assertDefaultDimensionsWithUnknownValue(dimensions);
}

@Test
public void shouldParseDimensionedNamePrefixedWithMetricNameSpaceDriverMetric() throws Exception {
//setting jobId as unknown to invoke name parsing.
metricRegistry.counter(DimensionedName.withName("unknown.driver.LiveListenerBus.listenerProcessingTime.org.apache.spark.HeartbeatReceiver")
.withDimension("key1", "value1")
.withDimension("key2", "value2")
.build().encode()).inc();
reporterBuilder.withGlobalDimensions("Region=us-west-2").build().report();
final DimensionedCloudWatchReporter.MetricInfo metricInfo = firstMetricDatumInfoFromCapturedRequest();
Set<Dimension> dimensions = metricInfo.getDimensions();
assertThat(dimensions).contains(new Dimension().withName("Region").withValue("us-west-2"));
assertThat(dimensions).contains(new Dimension().withName("key1").withValue("value1"));
assertThat(dimensions).contains(new Dimension().withName("key2").withValue("value2"));
assertThat(dimensions).contains(new Dimension().withName(DIMENSION_JOB_ID).withValue(UNKNOWN));
assertThat(dimensions).contains(new Dimension().withName(DIMENSION_DOMAIN_ID).withValue(UNKNOWN));
assertThat(dimensions).contains(new Dimension().withName(DIMENSION_APPLICATION_ID).withValue(UNKNOWN));
assertThat(dimensions).contains(new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue("driver"));
assertThat(metricInfo.getMetricName()).isEqualTo("LiveListenerBus.listenerProcessingTime.org.apache.spark.HeartbeatReceiver");
}
@Test
public void shouldParseDimensionedNamePrefixedWithMetricNameSpaceExecutorMetric() throws Exception {
//setting jobId as unknown to invoke name parsing.
metricRegistry.counter(DimensionedName.withName("unknown.1.NettyBlockTransfer.shuffle-client.usedDirectMemory")
.withDimension("key1", "value1")
.withDimension("key2", "value2")
.build().encode()).inc();
reporterBuilder.withGlobalDimensions("Region=us-west-2").build().report();

final DimensionedCloudWatchReporter.MetricInfo metricInfo = firstMetricDatumInfoFromCapturedRequest();
Set<Dimension> dimensions = metricInfo.getDimensions();
assertThat(dimensions).contains(new Dimension().withName("Region").withValue("us-west-2"));
assertThat(dimensions).contains(new Dimension().withName("key1").withValue("value1"));
assertThat(dimensions).contains(new Dimension().withName("key2").withValue("value2"));
assertThat(dimensions).contains(new Dimension().withName(DIMENSION_JOB_ID).withValue(UNKNOWN));
assertThat(dimensions).contains(new Dimension().withName(DIMENSION_DOMAIN_ID).withValue(UNKNOWN));
assertThat(dimensions).contains(new Dimension().withName(DIMENSION_APPLICATION_ID).withValue(UNKNOWN));
assertThat(dimensions).contains(new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue( "executor1"));
assertThat(metricInfo.getMetricName()).isEqualTo("NettyBlockTransfer.shuffle-client.usedDirectMemory");
}



private void assertDefaultDimensionsWithUnknownValue(List<Dimension> dimensions) {
assertThat(dimensions).contains(new Dimension().withName(DIMENSION_JOB_ID).withValue(UNKNOWN));
assertThat(dimensions).contains(new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue(UNKNOWN));
assertThat(dimensions).contains(new Dimension().withName(DIMENSION_DOMAIN_ID).withValue(UNKNOWN));
assertThat(dimensions).contains(new Dimension().withName(DIMENSION_APPLICATION_ID).withValue(UNKNOWN));
}


private MetricDatum metricDatumByDimensionFromCapturedRequest(final String dimensionValue) {
final PutMetricDataRequest putMetricDataRequest = metricDataRequestCaptor.getValue();
final List<MetricDatum> metricData = putMetricDataRequest.getMetricData();
Expand Down Expand Up @@ -504,6 +566,12 @@ private List<Dimension> firstMetricDatumDimensionsFromCapturedRequest() {
return metricDatum.getDimensions();
}

private DimensionedCloudWatchReporter.MetricInfo firstMetricDatumInfoFromCapturedRequest() {
final PutMetricDataRequest putMetricDataRequest = metricDataRequestCaptor.getValue();
final MetricDatum metricDatum = putMetricDataRequest.getMetricData().get(0);
return new DimensionedCloudWatchReporter.MetricInfo(metricDatum.getMetricName(), new HashSet<>(metricDatum.getDimensions()));
}

private List<Dimension> allDimensionsFromCapturedRequest() {
final PutMetricDataRequest putMetricDataRequest = metricDataRequestCaptor.getValue();
final List<MetricDatum> metricData = putMetricDataRequest.getMetricData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
public class DimensionedNameTest {
@Test
public void canDecodeDimensionedString() {
final String dimensioned = "test[key1:val1,key2:val2,key3:val3]";
final String dimensioned = "test[key1##val1,key2##val2,key3##val3]";

final DimensionedName dimensionedName = DimensionedName.decode(dimensioned);

Expand All @@ -32,7 +32,7 @@ public void canEncodeDimensionedNameToString() {
.withDimension("key3", "val3")
.build();

Assertions.assertEquals("test[key1:val1,key2:val2,key3:val3]", dimensionedName.encode());
Assertions.assertEquals("test[key1##val1,key2##val2,key3##val3]", dimensionedName.encode());
}

@Test
Expand All @@ -48,8 +48,8 @@ public void canDeriveDimensionedNameFromCurrent() {
.withDimension("key3", "new_value")
.withDimension("key4", "val4").build();

Assertions.assertEquals("test[key1:val1,key2:val2,key3:val3]", dimensionedName.encode());
Assertions.assertEquals("test[key1:val1,key2:val2,key3:new_value,key4:val4]",
Assertions.assertEquals("test[key1##val1,key2##val2,key3##val3]", dimensionedName.encode());
Assertions.assertEquals("test[key1##val1,key2##val2,key3##new_value,key4##val4]",
derivedDimensionedName.encode());
}
}
Loading