diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java index 3ad627a98..86137892a 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java @@ -13,7 +13,6 @@ import com.amazonaws.services.cloudwatch.model.PutMetricDataResult; import com.amazonaws.services.cloudwatch.model.StandardUnit; import com.amazonaws.services.cloudwatch.model.StatisticSet; -import com.amazonaws.util.StringUtils; import com.codahale.metrics.Clock; import com.codahale.metrics.Counter; import com.codahale.metrics.Counting; @@ -36,6 +35,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Date; +import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -47,6 +47,8 @@ import java.util.stream.Collectors; import java.util.stream.LongStream; import java.util.stream.Stream; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,6 +85,16 @@ public class DimensionedCloudWatchReporter extends ScheduledReporter { // Visible for testing public static final String DIMENSION_SNAPSHOT_STD_DEV = "snapshot-std-dev"; + public static final String DIMENSION_JOB_ID = "jobId"; + + public static final String DIMENSION_APPLICATION_ID = "applicationId"; + + public static final String DIMENSION_DOMAIN_ID = "domainId"; + + public static final String DIMENSION_INSTANCE_ROLE = "instanceRole"; + + public static final String UNKNOWN = "unknown"; + /** * Amazon CloudWatch rejects values that are either too small or too large. * Values must be in the range of 8.515920e-109 to 1.174271e+108 (Base 10) or 2e-360 to 2e360 (Base 2). @@ -132,10 +144,10 @@ public void report(final SortedMap gauges, final SortedMap meters, final SortedMap timers) { + LOGGER.error("Publishing metrics : {}", new Date().getTime()); if (builder.withDryRun) { LOGGER.warn("** Reporter is running in 'DRY RUN' mode **"); } - try { final List metricData = new ArrayList<>( gauges.size() + counters.size() + 10 * histograms.size() + 10 * timers.size()); @@ -177,6 +189,7 @@ public void report(final SortedMap gauges, LOGGER.debug("Dry run - constructed PutMetricDataRequest: {}", putMetricDataRequest); } } else { + LOGGER.error("Dry run - constructed PutMetricDataRequest: {}", putMetricDataRequest); cloudWatchFutures.add(cloudWatchAsyncClient.putMetricDataAsync(putMetricDataRequest)); } } @@ -335,10 +348,15 @@ private void stageMetricDatum(final boolean metricConfigured, final StandardUnit standardUnit, final String dimensionValue, final List metricData) { + if (metricName.contains("numberAllExecutors")) { + LOGGER.error("Number all executors : {} ", metricValue); + } // Only submit metrics that show some data, so let's save some money if (metricConfigured && (builder.withZeroValuesSubmission || metricValue > 0)) { final DimensionedName dimensionedName = DimensionedName.decode(metricName); final Set dimensions = new LinkedHashSet<>(builder.globalDimensions); + Pair> finalNameAndDefaultDimensions = getFinalMetricNameAndDefaultDimensions(dimensionedName); + dimensions.addAll(finalNameAndDefaultDimensions.getRight()); dimensions.addAll(dimensionedName.getDimensions()); if (shouldAppendDropwizardTypeDimension) { dimensions.add(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(dimensionValue)); @@ -347,12 +365,45 @@ private void stageMetricDatum(final boolean metricConfigured, metricData.add(new MetricDatum() .withTimestamp(new Date(builder.clock.getTime())) .withValue(cleanMetricValue(metricValue)) - .withMetricName(dimensionedName.getName()) + .withMetricName(finalNameAndDefaultDimensions.getLeft()) .withDimensions(dimensions) .withUnit(standardUnit)); } } + private Pair> getFinalMetricNameAndDefaultDimensions(DimensionedName dimensionedName) { + final String jobId = System.getenv().getOrDefault("SERVERLESS_EMR_JOB_ID", UNKNOWN); + final String applicationId = System.getenv().getOrDefault("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", UNKNOWN); + final String domainId = System.getenv().getOrDefault("FLINT_CLUSTER_NAME", UNKNOWN); + final Dimension jobDimension = new Dimension().withName(DIMENSION_JOB_ID).withValue(jobId); + final Dimension applicationDimension = new Dimension().withName(DIMENSION_APPLICATION_ID).withValue(applicationId); + final Dimension domainIdDimension = new Dimension().withName(DIMENSION_DOMAIN_ID).withValue(domainId); + Dimension instanceRoleDimension = new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue(UNKNOWN); + String metricName = dimensionedName.getName(); + String[] parts = metricName.split("\\."); + if (doesNameConsistsOfMetricNameSpace(parts, jobId)) { + metricName = Stream.of(parts).skip(2).collect(Collectors.joining(".")); + //For executors only id is added to the metric name, thats why the numeric check. + //If it is not numeric then the instance is driver. + if (StringUtils.isNumeric(parts[1])) { + instanceRoleDimension = new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue("executor" + parts[1]); + } + else { + instanceRoleDimension = new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue(parts[1]); + } + } + Set dimensions = new HashSet<>(); + dimensions.add(jobDimension); + dimensions.add(applicationDimension); + dimensions.add(instanceRoleDimension); + dimensions.add(domainIdDimension); + return Pair.of(metricName, dimensions); + } + + private boolean doesNameConsistsOfMetricNameSpace(String[] metricNameParts, String jobId) { + return metricNameParts[0].equals(jobId); + } + private void stageMetricDatumWithConvertedSnapshot(final boolean metricConfigured, final String metricName, final Snapshot snapshot, diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedName.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedName.java index 839e4c28f..da3a446d4 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedName.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedName.java @@ -28,7 +28,8 @@ public static DimensionedName decode(final String encodedDimensionedName) { if (matcher.find() && matcher.groupCount() == 2) { final DimensionedNameBuilder builder = new DimensionedNameBuilder(matcher.group(1).trim()); for (String t : matcher.group(2).split(",")) { - final String[] keyAndValue = t.split(":"); + //## acts as a distinct separator. + final String[] keyAndValue = t.split("##"); builder.withDimension(keyAndValue[0].trim(), keyAndValue[1].trim()); } return builder.build(); @@ -59,7 +60,7 @@ public synchronized String encode() { final StringBuilder sb = new StringBuilder(this.name); sb.append('['); sb.append(this.dimensions.values().stream() - .map(dimension -> dimension.getName() + ":" + dimension.getValue()) + .map(dimension -> dimension.getName() + "##" + dimension.getValue()) .collect(Collectors.joining(","))); sb.append(']'); diff --git a/flint-core/src/main/scala/org/apache/spark/metrics/source/FlintMetricSource.scala b/flint-core/src/main/scala/org/apache/spark/metrics/source/FlintMetricSource.scala new file mode 100644 index 000000000..e22a61a51 --- /dev/null +++ b/flint-core/src/main/scala/org/apache/spark/metrics/source/FlintMetricSource.scala @@ -0,0 +1,12 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.apache.spark.metrics.source + +import com.codahale.metrics.MetricRegistry + +class FlintMetricSource(val sourceName: String) extends Source { + override val metricRegistry: MetricRegistry = new MetricRegistry +} diff --git a/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporterTest.java b/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporterTest.java index 83df15067..a658ec140 100644 --- a/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporterTest.java +++ b/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporterTest.java @@ -47,12 +47,17 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_APPLICATION_ID; import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_COUNT; +import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_DOMAIN_ID; import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_GAUGE; +import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_INSTANCE_ROLE; +import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_JOB_ID; import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_NAME_TYPE; import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_SNAPSHOT_MEAN; import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_SNAPSHOT_STD_DEV; import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_SNAPSHOT_SUMMARY; +import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.UNKNOWN; @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) @@ -104,10 +109,12 @@ public void shouldReportWithoutGlobalDimensionsWhenGlobalDimensionsNotConfigured final List dimensions = firstMetricDatumDimensionsFromCapturedRequest(); - assertThat(dimensions).hasSize(1); + assertThat(dimensions).hasSize(5); assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_COUNT)); + assertDefaultDimensionsWithUnknownValue(dimensions); } + @Test public void reportedCounterShouldContainExpectedDimension() throws Exception { metricRegistry.counter(ARBITRARY_COUNTER_NAME).inc(); @@ -116,6 +123,7 @@ public void reportedCounterShouldContainExpectedDimension() throws Exception { final List dimensions = firstMetricDatumDimensionsFromCapturedRequest(); assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_COUNT)); + assertDefaultDimensionsWithUnknownValue(dimensions); } @Test @@ -126,6 +134,7 @@ public void reportedGaugeShouldContainExpectedDimension() throws Exception { final List dimensions = firstMetricDatumDimensionsFromCapturedRequest(); assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_GAUGE)); + assertDefaultDimensionsWithUnknownValue(dimensions); } @Test @@ -475,6 +484,15 @@ public void shouldReportExpectedGlobalAndCustomDimensions() throws Exception { assertThat(dimensions).contains(new Dimension().withName("key2").withValue("value2")); } + + private void assertDefaultDimensionsWithUnknownValue(List dimensions) { + assertThat(dimensions).contains(new Dimension().withName(DIMENSION_JOB_ID).withValue(UNKNOWN)); + assertThat(dimensions).contains(new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue(UNKNOWN)); + assertThat(dimensions).contains(new Dimension().withName(DIMENSION_DOMAIN_ID).withValue(UNKNOWN)); + assertThat(dimensions).contains(new Dimension().withName(DIMENSION_APPLICATION_ID).withValue(UNKNOWN)); + } + + private MetricDatum metricDatumByDimensionFromCapturedRequest(final String dimensionValue) { final PutMetricDataRequest putMetricDataRequest = metricDataRequestCaptor.getValue(); final List metricData = putMetricDataRequest.getMetricData(); diff --git a/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedNameTest.java b/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedNameTest.java index dc3bd2a9c..d6145545d 100644 --- a/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedNameTest.java +++ b/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedNameTest.java @@ -11,7 +11,7 @@ public class DimensionedNameTest { @Test public void canDecodeDimensionedString() { - final String dimensioned = "test[key1:val1,key2:val2,key3:val3]"; + final String dimensioned = "test[key1##val1,key2##val2,key3##val3]"; final DimensionedName dimensionedName = DimensionedName.decode(dimensioned); @@ -32,7 +32,7 @@ public void canEncodeDimensionedNameToString() { .withDimension("key3", "val3") .build(); - Assertions.assertEquals("test[key1:val1,key2:val2,key3:val3]", dimensionedName.encode()); + Assertions.assertEquals("test[key1##val1,key2##val2,key3##val3]", dimensionedName.encode()); } @Test @@ -48,8 +48,8 @@ public void canDeriveDimensionedNameFromCurrent() { .withDimension("key3", "new_value") .withDimension("key4", "val4").build(); - Assertions.assertEquals("test[key1:val1,key2:val2,key3:val3]", dimensionedName.encode()); - Assertions.assertEquals("test[key1:val1,key2:val2,key3:new_value,key4:val4]", + Assertions.assertEquals("test[key1##val1,key2##val2,key3##val3]", dimensionedName.encode()); + Assertions.assertEquals("test[key1##val1,key2##val2,key3##new_value,key4##val4]", derivedDimensionedName.encode()); } } \ No newline at end of file