diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java index 3ad627a98..4c4cd1694 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java @@ -13,7 +13,6 @@ import com.amazonaws.services.cloudwatch.model.PutMetricDataResult; import com.amazonaws.services.cloudwatch.model.StandardUnit; import com.amazonaws.services.cloudwatch.model.StatisticSet; -import com.amazonaws.util.StringUtils; import com.codahale.metrics.Clock; import com.codahale.metrics.Counter; import com.codahale.metrics.Counting; @@ -36,6 +35,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Date; +import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -47,6 +47,8 @@ import java.util.stream.Collectors; import java.util.stream.LongStream; import java.util.stream.Stream; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,6 +85,16 @@ public class DimensionedCloudWatchReporter extends ScheduledReporter { // Visible for testing public static final String DIMENSION_SNAPSHOT_STD_DEV = "snapshot-std-dev"; + public static final String DIMENSION_JOB_ID = "jobId"; + + public static final String DIMENSION_APPLICATION_ID = "applicationId"; + + public static final String DIMENSION_DOMAIN_ID = "domainId"; + + public static final String DIMENSION_INSTANCE_ROLE = "instanceRole"; + + public static final String UNKNOWN = "unknown"; + /** * Amazon CloudWatch rejects values that are either too small or too large. * Values must be in the range of 8.515920e-109 to 1.174271e+108 (Base 10) or 2e-360 to 2e360 (Base 2). @@ -339,7 +351,8 @@ private void stageMetricDatum(final boolean metricConfigured, if (metricConfigured && (builder.withZeroValuesSubmission || metricValue > 0)) { final DimensionedName dimensionedName = DimensionedName.decode(metricName); final Set dimensions = new LinkedHashSet<>(builder.globalDimensions); - dimensions.addAll(dimensionedName.getDimensions()); + MetricInfo metricInfo = getMetricInfo(dimensionedName); + dimensions.addAll(metricInfo.getDimensions()); if (shouldAppendDropwizardTypeDimension) { dimensions.add(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(dimensionValue)); } @@ -347,12 +360,49 @@ private void stageMetricDatum(final boolean metricConfigured, metricData.add(new MetricDatum() .withTimestamp(new Date(builder.clock.getTime())) .withValue(cleanMetricValue(metricValue)) - .withMetricName(dimensionedName.getName()) + .withMetricName(metricInfo.getMetricName()) .withDimensions(dimensions) .withUnit(standardUnit)); } } + private MetricInfo getMetricInfo(DimensionedName dimensionedName) { + final String jobId = System.getenv().getOrDefault("SERVERLESS_EMR_JOB_ID", UNKNOWN); + final String applicationId = System.getenv().getOrDefault("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", UNKNOWN); + final String domainId = System.getenv().getOrDefault("FLINT_CLUSTER_NAME", UNKNOWN); + final Dimension jobDimension = new Dimension().withName(DIMENSION_JOB_ID).withValue(jobId); + final Dimension applicationDimension = new Dimension().withName(DIMENSION_APPLICATION_ID).withValue(applicationId); + final Dimension domainIdDimension = new Dimension().withName(DIMENSION_DOMAIN_ID).withValue(domainId); + Dimension instanceRoleDimension = new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue(UNKNOWN); + String metricName = dimensionedName.getName(); + String[] parts = metricName.split("\\."); + if (doesNameConsistsOfMetricNameSpace(parts)) { + metricName = Stream.of(parts).skip(2).collect(Collectors.joining(".")); + //For executors only id is added to the metric name, that's why the numeric check. + //If it is not numeric then the instance is driver. + if (StringUtils.isNumeric(parts[1])) { + instanceRoleDimension = new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue("executor" + parts[1]); + } + else { + instanceRoleDimension = new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue(parts[1]); + } + } + Set dimensions = new HashSet<>(); + dimensions.add(jobDimension); + dimensions.add(applicationDimension); + dimensions.add(instanceRoleDimension); + dimensions.add(domainIdDimension); + dimensions.addAll(dimensionedName.getDimensions()); + return new MetricInfo(metricName, dimensions); + } + + // This tries to replicate the logic here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L137 + // Since we don't have access to Spark Configuration here: we are relying on the presence of executorId as part of the metricName. + private boolean doesNameConsistsOfMetricNameSpace(String[] metricNameParts) { + return metricNameParts.length >= 3 + && (metricNameParts[1].equals("driver") || StringUtils.isNumeric(metricNameParts[1])); + } + private void stageMetricDatumWithConvertedSnapshot(final boolean metricConfigured, final String metricName, final Snapshot snapshot, @@ -478,6 +528,25 @@ public String getDesc() { } } + public static class MetricInfo { + private String metricName; + private Set dimensions; + + public MetricInfo(String metricName, Set dimensions) { + this.metricName = metricName; + this.dimensions = dimensions; + } + + public String getMetricName() { + return metricName; + } + + public Set getDimensions() { + return dimensions; + } + } + + public static class Builder { private final String namespace; diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedName.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedName.java index 839e4c28f..da3a446d4 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedName.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedName.java @@ -28,7 +28,8 @@ public static DimensionedName decode(final String encodedDimensionedName) { if (matcher.find() && matcher.groupCount() == 2) { final DimensionedNameBuilder builder = new DimensionedNameBuilder(matcher.group(1).trim()); for (String t : matcher.group(2).split(",")) { - final String[] keyAndValue = t.split(":"); + //## acts as a distinct separator. + final String[] keyAndValue = t.split("##"); builder.withDimension(keyAndValue[0].trim(), keyAndValue[1].trim()); } return builder.build(); @@ -59,7 +60,7 @@ public synchronized String encode() { final StringBuilder sb = new StringBuilder(this.name); sb.append('['); sb.append(this.dimensions.values().stream() - .map(dimension -> dimension.getName() + ":" + dimension.getValue()) + .map(dimension -> dimension.getName() + "##" + dimension.getValue()) .collect(Collectors.joining(","))); sb.append(']'); diff --git a/flint-core/src/main/scala/org/apache/spark/metrics/source/FlintMetricSource.scala b/flint-core/src/main/scala/org/apache/spark/metrics/source/FlintMetricSource.scala new file mode 100644 index 000000000..e22a61a51 --- /dev/null +++ b/flint-core/src/main/scala/org/apache/spark/metrics/source/FlintMetricSource.scala @@ -0,0 +1,12 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.spark.metrics.source + +import com.codahale.metrics.MetricRegistry + +class FlintMetricSource(val sourceName: String) extends Source { + override val metricRegistry: MetricRegistry = new MetricRegistry +} diff --git a/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporterTest.java b/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporterTest.java index 83df15067..2a875db2d 100644 --- a/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporterTest.java +++ b/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporterTest.java @@ -16,6 +16,8 @@ import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SlidingWindowReservoir; +import java.util.HashSet; +import java.util.Set; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -47,12 +49,17 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_APPLICATION_ID; import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_COUNT; +import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_DOMAIN_ID; import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_GAUGE; +import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_INSTANCE_ROLE; +import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_JOB_ID; import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_NAME_TYPE; import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_SNAPSHOT_MEAN; import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_SNAPSHOT_STD_DEV; import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_SNAPSHOT_SUMMARY; +import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.UNKNOWN; @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) @@ -104,10 +111,12 @@ public void shouldReportWithoutGlobalDimensionsWhenGlobalDimensionsNotConfigured final List dimensions = firstMetricDatumDimensionsFromCapturedRequest(); - assertThat(dimensions).hasSize(1); + assertThat(dimensions).hasSize(5); assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_COUNT)); + assertDefaultDimensionsWithUnknownValue(dimensions); } + @Test public void reportedCounterShouldContainExpectedDimension() throws Exception { metricRegistry.counter(ARBITRARY_COUNTER_NAME).inc(); @@ -116,6 +125,7 @@ public void reportedCounterShouldContainExpectedDimension() throws Exception { final List dimensions = firstMetricDatumDimensionsFromCapturedRequest(); assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_COUNT)); + assertDefaultDimensionsWithUnknownValue(dimensions); } @Test @@ -126,6 +136,7 @@ public void reportedGaugeShouldContainExpectedDimension() throws Exception { final List dimensions = firstMetricDatumDimensionsFromCapturedRequest(); assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_GAUGE)); + assertDefaultDimensionsWithUnknownValue(dimensions); } @Test @@ -473,8 +484,59 @@ public void shouldReportExpectedGlobalAndCustomDimensions() throws Exception { assertThat(dimensions).contains(new Dimension().withName("Region").withValue("us-west-2")); assertThat(dimensions).contains(new Dimension().withName("key1").withValue("value1")); assertThat(dimensions).contains(new Dimension().withName("key2").withValue("value2")); + assertDefaultDimensionsWithUnknownValue(dimensions); + } + + @Test + public void shouldParseDimensionedNamePrefixedWithMetricNameSpaceDriverMetric() throws Exception { + //setting jobId as unknown to invoke name parsing. + metricRegistry.counter(DimensionedName.withName("unknown.driver.LiveListenerBus.listenerProcessingTime.org.apache.spark.HeartbeatReceiver") + .withDimension("key1", "value1") + .withDimension("key2", "value2") + .build().encode()).inc(); + reporterBuilder.withGlobalDimensions("Region=us-west-2").build().report(); + final DimensionedCloudWatchReporter.MetricInfo metricInfo = firstMetricDatumInfoFromCapturedRequest(); + Set dimensions = metricInfo.getDimensions(); + assertThat(dimensions).contains(new Dimension().withName("Region").withValue("us-west-2")); + assertThat(dimensions).contains(new Dimension().withName("key1").withValue("value1")); + assertThat(dimensions).contains(new Dimension().withName("key2").withValue("value2")); + assertThat(dimensions).contains(new Dimension().withName(DIMENSION_JOB_ID).withValue(UNKNOWN)); + assertThat(dimensions).contains(new Dimension().withName(DIMENSION_DOMAIN_ID).withValue(UNKNOWN)); + assertThat(dimensions).contains(new Dimension().withName(DIMENSION_APPLICATION_ID).withValue(UNKNOWN)); + assertThat(dimensions).contains(new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue("driver")); + assertThat(metricInfo.getMetricName()).isEqualTo("LiveListenerBus.listenerProcessingTime.org.apache.spark.HeartbeatReceiver"); + } + @Test + public void shouldParseDimensionedNamePrefixedWithMetricNameSpaceExecutorMetric() throws Exception { + //setting jobId as unknown to invoke name parsing. + metricRegistry.counter(DimensionedName.withName("unknown.1.NettyBlockTransfer.shuffle-client.usedDirectMemory") + .withDimension("key1", "value1") + .withDimension("key2", "value2") + .build().encode()).inc(); + reporterBuilder.withGlobalDimensions("Region=us-west-2").build().report(); + + final DimensionedCloudWatchReporter.MetricInfo metricInfo = firstMetricDatumInfoFromCapturedRequest(); + Set dimensions = metricInfo.getDimensions(); + assertThat(dimensions).contains(new Dimension().withName("Region").withValue("us-west-2")); + assertThat(dimensions).contains(new Dimension().withName("key1").withValue("value1")); + assertThat(dimensions).contains(new Dimension().withName("key2").withValue("value2")); + assertThat(dimensions).contains(new Dimension().withName(DIMENSION_JOB_ID).withValue(UNKNOWN)); + assertThat(dimensions).contains(new Dimension().withName(DIMENSION_DOMAIN_ID).withValue(UNKNOWN)); + assertThat(dimensions).contains(new Dimension().withName(DIMENSION_APPLICATION_ID).withValue(UNKNOWN)); + assertThat(dimensions).contains(new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue( "executor1")); + assertThat(metricInfo.getMetricName()).isEqualTo("NettyBlockTransfer.shuffle-client.usedDirectMemory"); + } + + + + private void assertDefaultDimensionsWithUnknownValue(List dimensions) { + assertThat(dimensions).contains(new Dimension().withName(DIMENSION_JOB_ID).withValue(UNKNOWN)); + assertThat(dimensions).contains(new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue(UNKNOWN)); + assertThat(dimensions).contains(new Dimension().withName(DIMENSION_DOMAIN_ID).withValue(UNKNOWN)); + assertThat(dimensions).contains(new Dimension().withName(DIMENSION_APPLICATION_ID).withValue(UNKNOWN)); } + private MetricDatum metricDatumByDimensionFromCapturedRequest(final String dimensionValue) { final PutMetricDataRequest putMetricDataRequest = metricDataRequestCaptor.getValue(); final List metricData = putMetricDataRequest.getMetricData(); @@ -504,6 +566,12 @@ private List firstMetricDatumDimensionsFromCapturedRequest() { return metricDatum.getDimensions(); } + private DimensionedCloudWatchReporter.MetricInfo firstMetricDatumInfoFromCapturedRequest() { + final PutMetricDataRequest putMetricDataRequest = metricDataRequestCaptor.getValue(); + final MetricDatum metricDatum = putMetricDataRequest.getMetricData().get(0); + return new DimensionedCloudWatchReporter.MetricInfo(metricDatum.getMetricName(), new HashSet<>(metricDatum.getDimensions())); + } + private List allDimensionsFromCapturedRequest() { final PutMetricDataRequest putMetricDataRequest = metricDataRequestCaptor.getValue(); final List metricData = putMetricDataRequest.getMetricData(); diff --git a/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedNameTest.java b/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedNameTest.java index dc3bd2a9c..d6145545d 100644 --- a/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedNameTest.java +++ b/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedNameTest.java @@ -11,7 +11,7 @@ public class DimensionedNameTest { @Test public void canDecodeDimensionedString() { - final String dimensioned = "test[key1:val1,key2:val2,key3:val3]"; + final String dimensioned = "test[key1##val1,key2##val2,key3##val3]"; final DimensionedName dimensionedName = DimensionedName.decode(dimensioned); @@ -32,7 +32,7 @@ public void canEncodeDimensionedNameToString() { .withDimension("key3", "val3") .build(); - Assertions.assertEquals("test[key1:val1,key2:val2,key3:val3]", dimensionedName.encode()); + Assertions.assertEquals("test[key1##val1,key2##val2,key3##val3]", dimensionedName.encode()); } @Test @@ -48,8 +48,8 @@ public void canDeriveDimensionedNameFromCurrent() { .withDimension("key3", "new_value") .withDimension("key4", "val4").build(); - Assertions.assertEquals("test[key1:val1,key2:val2,key3:val3]", dimensionedName.encode()); - Assertions.assertEquals("test[key1:val1,key2:val2,key3:new_value,key4:val4]", + Assertions.assertEquals("test[key1##val1,key2##val2,key3##val3]", dimensionedName.encode()); + Assertions.assertEquals("test[key1##val1,key2##val2,key3##new_value,key4##val4]", derivedDimensionedName.encode()); } } \ No newline at end of file