diff --git a/build.sbt b/build.sbt index ccb735ae0..8e2ef4fb5 100644 --- a/build.sbt +++ b/build.sbt @@ -59,6 +59,8 @@ lazy val flintCore = (project in file("flint-core")) exclude ("org.apache.logging.log4j", "log4j-api"), "com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided" exclude ("com.fasterxml.jackson.core", "jackson-databind"), + "com.amazonaws" % "aws-java-sdk-cloudwatch" % "1.12.593" + exclude("com.fasterxml.jackson.core", "jackson-databind"), "org.scalactic" %% "scalactic" % "3.2.15" % "test", "org.scalatest" %% "scalatest" % "3.2.15" % "test", "org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test", diff --git a/flint-core/src/main/java/org/apache/spark/metrics/sink/CloudWatchSink.java b/flint-core/src/main/java/org/apache/spark/metrics/sink/CloudWatchSink.java index 293a05d4a..b69f0e4d0 100644 --- a/flint-core/src/main/java/org/apache/spark/metrics/sink/CloudWatchSink.java +++ b/flint-core/src/main/java/org/apache/spark/metrics/sink/CloudWatchSink.java @@ -21,8 +21,10 @@ import java.util.Optional; import java.util.Properties; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; import org.apache.spark.SecurityManager; import org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter; +import org.opensearch.flint.core.metrics.reporter.DimensionedName; import org.opensearch.flint.core.metrics.reporter.InvalidMetricsPropertyException; /** @@ -185,6 +187,17 @@ public CloudWatchSink( shouldAppendDropwizardTypeDimension = PropertyDefaults.SHOULD_PARSE_INLINE_DIMENSIONS; } + final Optional metricFilterRegex = getProperty( + properties, + PropertyKeys.METRIC_FILTER_REGEX); + MetricFilter metricFilter; + if (metricFilterRegex.isPresent()) { + Pattern pattern = Pattern.compile(metricFilterRegex.get()); + metricFilter = (name, metric) -> pattern.matcher(DimensionedName.decode(name).getName()).find(); + } else { + metricFilter = MetricFilter.ALL; + } + final AmazonCloudWatchAsync cloudWatchClient = AmazonCloudWatchAsyncClient.asyncBuilder() .withCredentials(awsCredentialsProvider) .withRegion(awsRegion) @@ -193,7 +206,7 @@ public CloudWatchSink( this.reporter = DimensionedCloudWatchReporter.forRegistry(metricRegistry, cloudWatchClient, namespaceProperty.get()) .convertRatesTo(TimeUnit.SECONDS) .convertDurationsTo(TimeUnit.MILLISECONDS) - .filter(MetricFilter.ALL) + .filter(metricFilter) .withPercentiles( DimensionedCloudWatchReporter.Percentile.P50, DimensionedCloudWatchReporter.Percentile.P75, @@ -248,6 +261,7 @@ private static class PropertyKeys { static final String POLLING_TIME_UNIT = "pollingTimeUnit"; static final String SHOULD_PARSE_INLINE_DIMENSIONS = "shouldParseInlineDimensions"; static final String SHOULD_APPEND_DROPWIZARD_TYPE_DIMENSION = "shouldAppendDropwizardTypeDimension"; + static final String METRIC_FILTER_REGEX = "regex"; } /** diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java index 450fe0d0d..3ad627a98 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java @@ -110,6 +110,7 @@ public class DimensionedCloudWatchReporter extends ScheduledReporter { private final StandardUnit durationUnit; private final boolean shouldParseDimensionsFromName; private final boolean shouldAppendDropwizardTypeDimension; + private MetricFilter filter; private DimensionedCloudWatchReporter(final Builder builder) { super(builder.metricRegistry, "coda-hale-metrics-cloud-watch-reporter", builder.metricFilter, builder.rateUnit, builder.durationUnit); @@ -121,6 +122,7 @@ private DimensionedCloudWatchReporter(final Builder builder) { this.durationUnit = builder.cwDurationUnit; this.shouldParseDimensionsFromName = builder.withShouldParseDimensionsFromName; this.shouldAppendDropwizardTypeDimension = builder.withShouldAppendDropwizardTypeDimension; + this.filter = MetricFilter.ALL; } @Override @@ -335,31 +337,9 @@ private void stageMetricDatum(final boolean metricConfigured, final List metricData) { // Only submit metrics that show some data, so let's save some money if (metricConfigured && (builder.withZeroValuesSubmission || metricValue > 0)) { + final DimensionedName dimensionedName = DimensionedName.decode(metricName); final Set dimensions = new LinkedHashSet<>(builder.globalDimensions); - final String name; - if (shouldParseDimensionsFromName) { - final String[] nameParts = metricName.split(" "); - final StringBuilder nameBuilder = new StringBuilder(nameParts[0]); - int i = 1; - for (; i < nameParts.length; ++i) { - final String[] dimensionParts = nameParts[i].split("="); - if (dimensionParts.length == 2 - && !StringUtils.isNullOrEmpty(dimensionParts[0]) - && !StringUtils.isNullOrEmpty(dimensionParts[1])) { - final Dimension dimension = new Dimension(); - dimension.withName(dimensionParts[0]); - dimension.withValue(dimensionParts[1]); - dimensions.add(dimension); - } else { - nameBuilder.append(" "); - nameBuilder.append(nameParts[i]); - } - } - name = nameBuilder.toString(); - } else { - name = metricName; - } - + dimensions.addAll(dimensionedName.getDimensions()); if (shouldAppendDropwizardTypeDimension) { dimensions.add(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(dimensionValue)); } @@ -367,7 +347,7 @@ private void stageMetricDatum(final boolean metricConfigured, metricData.add(new MetricDatum() .withTimestamp(new Date(builder.clock.getTime())) .withValue(cleanMetricValue(metricValue)) - .withMetricName(name) + .withMetricName(dimensionedName.getName()) .withDimensions(dimensions) .withUnit(standardUnit)); } @@ -379,6 +359,7 @@ private void stageMetricDatumWithConvertedSnapshot(final boolean metricConfigure final StandardUnit standardUnit, final List metricData) { if (metricConfigured) { + final DimensionedName dimensionedName = DimensionedName.decode(metricName); double scaledSum = convertDuration(LongStream.of(snapshot.getValues()).sum()); final StatisticSet statisticSet = new StatisticSet() .withSum(scaledSum) @@ -388,10 +369,11 @@ private void stageMetricDatumWithConvertedSnapshot(final boolean metricConfigure final Set dimensions = new LinkedHashSet<>(builder.globalDimensions); dimensions.add(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_SNAPSHOT_SUMMARY)); + dimensions.addAll(dimensionedName.getDimensions()); metricData.add(new MetricDatum() .withTimestamp(new Date(builder.clock.getTime())) - .withMetricName(metricName) + .withMetricName(dimensionedName.getName()) .withDimensions(dimensions) .withStatisticValues(statisticSet) .withUnit(standardUnit)); @@ -404,6 +386,7 @@ private void stageMetricDatumWithRawSnapshot(final boolean metricConfigured, final StandardUnit standardUnit, final List metricData) { if (metricConfigured) { + final DimensionedName dimensionedName = DimensionedName.decode(metricName); double total = LongStream.of(snapshot.getValues()).sum(); final StatisticSet statisticSet = new StatisticSet() .withSum(total) @@ -413,10 +396,11 @@ private void stageMetricDatumWithRawSnapshot(final boolean metricConfigured, final Set dimensions = new LinkedHashSet<>(builder.globalDimensions); dimensions.add(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_SNAPSHOT_SUMMARY)); + dimensions.addAll(dimensionedName.getDimensions()); metricData.add(new MetricDatum() .withTimestamp(new Date(builder.clock.getTime())) - .withMetricName(metricName) + .withMetricName(dimensionedName.getName()) .withDimensions(dimensions) .withStatisticValues(statisticSet) .withUnit(standardUnit)); diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedName.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedName.java new file mode 100644 index 000000000..839e4c28f --- /dev/null +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedName.java @@ -0,0 +1,96 @@ +package org.opensearch.flint.core.metrics.reporter; + +import com.amazonaws.services.cloudwatch.model.Dimension; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +public class DimensionedName { + private static final Pattern dimensionPattern = Pattern.compile("([\\w.-]+)\\[([\\w\\W]+)]"); + private final String name; + private final Map dimensions; + + private String encoded; + + DimensionedName(final String name, final Map dimensions) { + this.name = name; + this.dimensions = Collections.unmodifiableMap(dimensions); + } + + public static DimensionedName decode(final String encodedDimensionedName) { + final Matcher matcher = dimensionPattern.matcher(encodedDimensionedName); + if (matcher.find() && matcher.groupCount() == 2) { + final DimensionedNameBuilder builder = new DimensionedNameBuilder(matcher.group(1).trim()); + for (String t : matcher.group(2).split(",")) { + final String[] keyAndValue = t.split(":"); + builder.withDimension(keyAndValue[0].trim(), keyAndValue[1].trim()); + } + return builder.build(); + } else { + return new DimensionedNameBuilder(encodedDimensionedName).build(); + } + } + + public static DimensionedNameBuilder withName(String name) { + return new DimensionedNameBuilder(name); + } + + public DimensionedNameBuilder withDimension(final String name, final String value) { + return new DimensionedNameBuilder(this.name, new HashMap<>(this.dimensions)).withDimension(name, value); + } + + public String getName() { + return name; + } + + public Set getDimensions() { + return new HashSet<>(dimensions.values()); + } + + public synchronized String encode() { + if (this.encoded == null) { + if (!dimensions.isEmpty()) { + final StringBuilder sb = new StringBuilder(this.name); + sb.append('['); + sb.append(this.dimensions.values().stream() + .map(dimension -> dimension.getName() + ":" + dimension.getValue()) + .collect(Collectors.joining(","))); + sb.append(']'); + + this.encoded = sb.toString(); + } else { + this.encoded = this.name; + } + } + return this.encoded; + } + + @Override + public String toString() { + return this.encode(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final DimensionedName that = (DimensionedName) o; + return Objects.equals(name, that.name) && + Objects.equals(dimensions, that.dimensions); + } + + @Override + public int hashCode() { + return Objects.hash(name, dimensions); + } +} diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedNameBuilder.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedNameBuilder.java new file mode 100644 index 000000000..603c58f95 --- /dev/null +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedNameBuilder.java @@ -0,0 +1,28 @@ +package org.opensearch.flint.core.metrics.reporter; + +import com.amazonaws.services.cloudwatch.model.Dimension; +import java.util.HashMap; +import java.util.Map; + +public class DimensionedNameBuilder { + private final String name; + private Map dimensions; + + DimensionedNameBuilder(final String name) { + this(name, new HashMap<>()); + } + + DimensionedNameBuilder(final String name, final Map dimensions) { + this.name = name; + this.dimensions = dimensions; + } + + public DimensionedName build() { + return new DimensionedName(this.name, this.dimensions); + } + + public DimensionedNameBuilder withDimension(final String name, final String value) { + this.dimensions.put(name, new Dimension().withName(name).withValue(value)); + return this; + } +} diff --git a/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporterTest.java b/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporterTest.java index 991fd78b4..83df15067 100644 --- a/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporterTest.java +++ b/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporterTest.java @@ -35,6 +35,7 @@ import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; import org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter; +import org.opensearch.flint.core.metrics.reporter.DimensionedName; import static com.amazonaws.services.cloudwatch.model.StandardUnit.Count; import static com.amazonaws.services.cloudwatch.model.StandardUnit.Microseconds; @@ -117,19 +118,6 @@ public void reportedCounterShouldContainExpectedDimension() throws Exception { assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_COUNT)); } - @Test - public void reportedCounterShouldContainDimensionEmbeddedInName() throws Exception { - final String DIMENSION_NAME = "some_dimension"; - final String DIMENSION_VALUE = "some_value"; - - metricRegistry.counter(ARBITRARY_COUNTER_NAME + " " + DIMENSION_NAME + "=" + DIMENSION_VALUE).inc(); - reporterBuilder.withShouldParseDimensionsFromName(true).build().report(); - - final List dimensions = firstMetricDatumDimensionsFromCapturedRequest(); - - assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME).withValue(DIMENSION_VALUE)); - } - @Test public void reportedGaugeShouldContainExpectedDimension() throws Exception { metricRegistry.register(ARBITRARY_GAUGE_NAME, (Gauge) () -> 1L); @@ -472,6 +460,21 @@ public void shouldReportHistogramSubsequentSnapshotValues_SumMaxMinValues() thro } + @Test + public void shouldReportExpectedGlobalAndCustomDimensions() throws Exception { + metricRegistry.counter(DimensionedName.withName(ARBITRARY_COUNTER_NAME) + .withDimension("key1", "value1") + .withDimension("key2", "value2") + .build().encode()).inc(); + reporterBuilder.withGlobalDimensions("Region=us-west-2").build().report(); + + final List dimensions = firstMetricDatumDimensionsFromCapturedRequest(); + + assertThat(dimensions).contains(new Dimension().withName("Region").withValue("us-west-2")); + assertThat(dimensions).contains(new Dimension().withName("key1").withValue("value1")); + assertThat(dimensions).contains(new Dimension().withName("key2").withValue("value2")); + } + private MetricDatum metricDatumByDimensionFromCapturedRequest(final String dimensionValue) { final PutMetricDataRequest putMetricDataRequest = metricDataRequestCaptor.getValue(); final List metricData = putMetricDataRequest.getMetricData(); diff --git a/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedNameTest.java b/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedNameTest.java new file mode 100644 index 000000000..dc3bd2a9c --- /dev/null +++ b/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedNameTest.java @@ -0,0 +1,55 @@ +package opensearch.flint.core.metrics.reporter; + +import static org.hamcrest.CoreMatchers.hasItems; + +import com.amazonaws.services.cloudwatch.model.Dimension; +import org.hamcrest.MatcherAssert; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.opensearch.flint.core.metrics.reporter.DimensionedName; + +public class DimensionedNameTest { + @Test + public void canDecodeDimensionedString() { + final String dimensioned = "test[key1:val1,key2:val2,key3:val3]"; + + final DimensionedName dimensionedName = DimensionedName.decode(dimensioned); + + Assertions.assertEquals("test", dimensionedName.getName()); + Assertions.assertEquals(3, dimensionedName.getDimensions().size()); + + MatcherAssert.assertThat(dimensionedName.getDimensions(), hasItems( + new Dimension().withName("key1").withValue("val1"), + new Dimension().withName("key2").withValue("val2"), + new Dimension().withName("key3").withValue("val3"))); + } + + @Test + public void canEncodeDimensionedNameToString() { + final DimensionedName dimensionedName = DimensionedName.withName("test") + .withDimension("key1", "val1") + .withDimension("key2", "val2") + .withDimension("key3", "val3") + .build(); + + Assertions.assertEquals("test[key1:val1,key2:val2,key3:val3]", dimensionedName.encode()); + } + + @Test + public void canDeriveDimensionedNameFromCurrent() { + final DimensionedName dimensionedName = DimensionedName.withName("test") + .withDimension("key1", "val1") + .withDimension("key2", "val2") + .withDimension("key3", "val3") + .build(); + + + final DimensionedName derivedDimensionedName = dimensionedName + .withDimension("key3", "new_value") + .withDimension("key4", "val4").build(); + + Assertions.assertEquals("test[key1:val1,key2:val2,key3:val3]", dimensionedName.encode()); + Assertions.assertEquals("test[key1:val1,key2:val2,key3:new_value,key4:val4]", + derivedDimensionedName.encode()); + } +} \ No newline at end of file