Skip to content

Commit

Permalink
Add dimensions to cloudwatch metric and metric filter option for CWSink
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vmmusings committed Nov 22, 2023
1 parent be82024 commit 5db289f
Show file tree
Hide file tree
Showing 7 changed files with 223 additions and 41 deletions.
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ lazy val flintCore = (project in file("flint-core"))
exclude ("org.apache.logging.log4j", "log4j-api"),
"com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided"
exclude ("com.fasterxml.jackson.core", "jackson-databind"),
"com.amazonaws" % "aws-java-sdk-cloudwatch" % "1.12.593"
exclude("com.fasterxml.jackson.core", "jackson-databind"),
"org.scalactic" %% "scalactic" % "3.2.15" % "test",
"org.scalatest" %% "scalatest" % "3.2.15" % "test",
"org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.spark.SecurityManager;
import org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter;
import org.opensearch.flint.core.metrics.reporter.DimensionedName;
import org.opensearch.flint.core.metrics.reporter.InvalidMetricsPropertyException;

/**
Expand Down Expand Up @@ -185,6 +187,17 @@ public CloudWatchSink(
shouldAppendDropwizardTypeDimension = PropertyDefaults.SHOULD_PARSE_INLINE_DIMENSIONS;
}

final Optional<String> metricFilterRegex = getProperty(
properties,
PropertyKeys.METRIC_FILTER_REGEX);
MetricFilter metricFilter;
if (metricFilterRegex.isPresent()) {
Pattern pattern = Pattern.compile(metricFilterRegex.get());
metricFilter = (name, metric) -> pattern.matcher(DimensionedName.decode(name).getName()).find();
} else {
metricFilter = MetricFilter.ALL;
}

final AmazonCloudWatchAsync cloudWatchClient = AmazonCloudWatchAsyncClient.asyncBuilder()
.withCredentials(awsCredentialsProvider)
.withRegion(awsRegion)
Expand All @@ -193,7 +206,7 @@ public CloudWatchSink(
this.reporter = DimensionedCloudWatchReporter.forRegistry(metricRegistry, cloudWatchClient, namespaceProperty.get())
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.filter(MetricFilter.ALL)
.filter(metricFilter)
.withPercentiles(
DimensionedCloudWatchReporter.Percentile.P50,
DimensionedCloudWatchReporter.Percentile.P75,
Expand Down Expand Up @@ -248,6 +261,7 @@ private static class PropertyKeys {
static final String POLLING_TIME_UNIT = "pollingTimeUnit";
static final String SHOULD_PARSE_INLINE_DIMENSIONS = "shouldParseInlineDimensions";
static final String SHOULD_APPEND_DROPWIZARD_TYPE_DIMENSION = "shouldAppendDropwizardTypeDimension";
static final String METRIC_FILTER_REGEX = "regex";
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public class DimensionedCloudWatchReporter extends ScheduledReporter {
private final StandardUnit durationUnit;
private final boolean shouldParseDimensionsFromName;
private final boolean shouldAppendDropwizardTypeDimension;
private MetricFilter filter;

private DimensionedCloudWatchReporter(final Builder builder) {
super(builder.metricRegistry, "coda-hale-metrics-cloud-watch-reporter", builder.metricFilter, builder.rateUnit, builder.durationUnit);
Expand All @@ -121,6 +122,7 @@ private DimensionedCloudWatchReporter(final Builder builder) {
this.durationUnit = builder.cwDurationUnit;
this.shouldParseDimensionsFromName = builder.withShouldParseDimensionsFromName;
this.shouldAppendDropwizardTypeDimension = builder.withShouldAppendDropwizardTypeDimension;
this.filter = MetricFilter.ALL;
}

@Override
Expand Down Expand Up @@ -335,39 +337,17 @@ private void stageMetricDatum(final boolean metricConfigured,
final List<MetricDatum> metricData) {
// Only submit metrics that show some data, so let's save some money
if (metricConfigured && (builder.withZeroValuesSubmission || metricValue > 0)) {
final DimensionedName dimensionedName = DimensionedName.decode(metricName);
final Set<Dimension> dimensions = new LinkedHashSet<>(builder.globalDimensions);
final String name;
if (shouldParseDimensionsFromName) {
final String[] nameParts = metricName.split(" ");
final StringBuilder nameBuilder = new StringBuilder(nameParts[0]);
int i = 1;
for (; i < nameParts.length; ++i) {
final String[] dimensionParts = nameParts[i].split("=");
if (dimensionParts.length == 2
&& !StringUtils.isNullOrEmpty(dimensionParts[0])
&& !StringUtils.isNullOrEmpty(dimensionParts[1])) {
final Dimension dimension = new Dimension();
dimension.withName(dimensionParts[0]);
dimension.withValue(dimensionParts[1]);
dimensions.add(dimension);
} else {
nameBuilder.append(" ");
nameBuilder.append(nameParts[i]);
}
}
name = nameBuilder.toString();
} else {
name = metricName;
}

dimensions.addAll(dimensionedName.getDimensions());
if (shouldAppendDropwizardTypeDimension) {
dimensions.add(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(dimensionValue));
}

metricData.add(new MetricDatum()
.withTimestamp(new Date(builder.clock.getTime()))
.withValue(cleanMetricValue(metricValue))
.withMetricName(name)
.withMetricName(dimensionedName.getName())
.withDimensions(dimensions)
.withUnit(standardUnit));
}
Expand All @@ -379,6 +359,7 @@ private void stageMetricDatumWithConvertedSnapshot(final boolean metricConfigure
final StandardUnit standardUnit,
final List<MetricDatum> metricData) {
if (metricConfigured) {
final DimensionedName dimensionedName = DimensionedName.decode(metricName);
double scaledSum = convertDuration(LongStream.of(snapshot.getValues()).sum());
final StatisticSet statisticSet = new StatisticSet()
.withSum(scaledSum)
Expand All @@ -388,10 +369,11 @@ private void stageMetricDatumWithConvertedSnapshot(final boolean metricConfigure

final Set<Dimension> dimensions = new LinkedHashSet<>(builder.globalDimensions);
dimensions.add(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_SNAPSHOT_SUMMARY));
dimensions.addAll(dimensionedName.getDimensions());

metricData.add(new MetricDatum()
.withTimestamp(new Date(builder.clock.getTime()))
.withMetricName(metricName)
.withMetricName(dimensionedName.getName())
.withDimensions(dimensions)
.withStatisticValues(statisticSet)
.withUnit(standardUnit));
Expand All @@ -404,6 +386,7 @@ private void stageMetricDatumWithRawSnapshot(final boolean metricConfigured,
final StandardUnit standardUnit,
final List<MetricDatum> metricData) {
if (metricConfigured) {
final DimensionedName dimensionedName = DimensionedName.decode(metricName);
double total = LongStream.of(snapshot.getValues()).sum();
final StatisticSet statisticSet = new StatisticSet()
.withSum(total)
Expand All @@ -413,10 +396,11 @@ private void stageMetricDatumWithRawSnapshot(final boolean metricConfigured,

final Set<Dimension> dimensions = new LinkedHashSet<>(builder.globalDimensions);
dimensions.add(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_SNAPSHOT_SUMMARY));
dimensions.addAll(dimensionedName.getDimensions());

metricData.add(new MetricDatum()
.withTimestamp(new Date(builder.clock.getTime()))
.withMetricName(metricName)
.withMetricName(dimensionedName.getName())
.withDimensions(dimensions)
.withStatisticValues(statisticSet)
.withUnit(standardUnit));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package org.opensearch.flint.core.metrics.reporter;

import com.amazonaws.services.cloudwatch.model.Dimension;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class DimensionedName {
private static final Pattern dimensionPattern = Pattern.compile("([\\w.-]+)\\[([\\w\\W]+)]");
private final String name;
private final Map<String, Dimension> dimensions;

private String encoded;

DimensionedName(final String name, final Map<String, Dimension> dimensions) {
this.name = name;
this.dimensions = Collections.unmodifiableMap(dimensions);
}

public static DimensionedName decode(final String encodedDimensionedName) {
final Matcher matcher = dimensionPattern.matcher(encodedDimensionedName);
if (matcher.find() && matcher.groupCount() == 2) {
final DimensionedNameBuilder builder = new DimensionedNameBuilder(matcher.group(1).trim());
for (String t : matcher.group(2).split(",")) {
final String[] keyAndValue = t.split(":");
builder.withDimension(keyAndValue[0].trim(), keyAndValue[1].trim());
}
return builder.build();
} else {
return new DimensionedNameBuilder(encodedDimensionedName).build();
}
}

public static DimensionedNameBuilder withName(String name) {
return new DimensionedNameBuilder(name);
}

public DimensionedNameBuilder withDimension(final String name, final String value) {
return new DimensionedNameBuilder(this.name, new HashMap<>(this.dimensions)).withDimension(name, value);
}

public String getName() {
return name;
}

public Set<Dimension> getDimensions() {
return new HashSet<>(dimensions.values());
}

public synchronized String encode() {
if (this.encoded == null) {
if (!dimensions.isEmpty()) {
final StringBuilder sb = new StringBuilder(this.name);
sb.append('[');
sb.append(this.dimensions.values().stream()
.map(dimension -> dimension.getName() + ":" + dimension.getValue())
.collect(Collectors.joining(",")));
sb.append(']');

this.encoded = sb.toString();
} else {
this.encoded = this.name;
}
}
return this.encoded;
}

@Override
public String toString() {
return this.encode();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final DimensionedName that = (DimensionedName) o;
return Objects.equals(name, that.name) &&
Objects.equals(dimensions, that.dimensions);
}

@Override
public int hashCode() {
return Objects.hash(name, dimensions);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.opensearch.flint.core.metrics.reporter;

import com.amazonaws.services.cloudwatch.model.Dimension;
import java.util.HashMap;
import java.util.Map;

public class DimensionedNameBuilder {
private final String name;
private Map<String, Dimension> dimensions;

DimensionedNameBuilder(final String name) {
this(name, new HashMap<>());
}

DimensionedNameBuilder(final String name, final Map<String, Dimension> dimensions) {
this.name = name;
this.dimensions = dimensions;
}

public DimensionedName build() {
return new DimensionedName(this.name, this.dimensions);
}

public DimensionedNameBuilder withDimension(final String name, final String value) {
this.dimensions.put(name, new Dimension().withName(name).withValue(value));
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter;
import org.opensearch.flint.core.metrics.reporter.DimensionedName;

import static com.amazonaws.services.cloudwatch.model.StandardUnit.Count;
import static com.amazonaws.services.cloudwatch.model.StandardUnit.Microseconds;
Expand Down Expand Up @@ -117,19 +118,6 @@ public void reportedCounterShouldContainExpectedDimension() throws Exception {
assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_COUNT));
}

@Test
public void reportedCounterShouldContainDimensionEmbeddedInName() throws Exception {
final String DIMENSION_NAME = "some_dimension";
final String DIMENSION_VALUE = "some_value";

metricRegistry.counter(ARBITRARY_COUNTER_NAME + " " + DIMENSION_NAME + "=" + DIMENSION_VALUE).inc();
reporterBuilder.withShouldParseDimensionsFromName(true).build().report();

final List<Dimension> dimensions = firstMetricDatumDimensionsFromCapturedRequest();

assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME).withValue(DIMENSION_VALUE));
}

@Test
public void reportedGaugeShouldContainExpectedDimension() throws Exception {
metricRegistry.register(ARBITRARY_GAUGE_NAME, (Gauge<Long>) () -> 1L);
Expand Down Expand Up @@ -472,6 +460,21 @@ public void shouldReportHistogramSubsequentSnapshotValues_SumMaxMinValues() thro

}

@Test
public void shouldReportExpectedGlobalAndCustomDimensions() throws Exception {
metricRegistry.counter(DimensionedName.withName(ARBITRARY_COUNTER_NAME)
.withDimension("key1", "value1")
.withDimension("key2", "value2")
.build().encode()).inc();
reporterBuilder.withGlobalDimensions("Region=us-west-2").build().report();

final List<Dimension> dimensions = firstMetricDatumDimensionsFromCapturedRequest();

assertThat(dimensions).contains(new Dimension().withName("Region").withValue("us-west-2"));
assertThat(dimensions).contains(new Dimension().withName("key1").withValue("value1"));
assertThat(dimensions).contains(new Dimension().withName("key2").withValue("value2"));
}

private MetricDatum metricDatumByDimensionFromCapturedRequest(final String dimensionValue) {
final PutMetricDataRequest putMetricDataRequest = metricDataRequestCaptor.getValue();
final List<MetricDatum> metricData = putMetricDataRequest.getMetricData();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package opensearch.flint.core.metrics.reporter;

import static org.hamcrest.CoreMatchers.hasItems;

import com.amazonaws.services.cloudwatch.model.Dimension;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.opensearch.flint.core.metrics.reporter.DimensionedName;

public class DimensionedNameTest {
@Test
public void canDecodeDimensionedString() {
final String dimensioned = "test[key1:val1,key2:val2,key3:val3]";

final DimensionedName dimensionedName = DimensionedName.decode(dimensioned);

Assertions.assertEquals("test", dimensionedName.getName());
Assertions.assertEquals(3, dimensionedName.getDimensions().size());

MatcherAssert.assertThat(dimensionedName.getDimensions(), hasItems(
new Dimension().withName("key1").withValue("val1"),
new Dimension().withName("key2").withValue("val2"),
new Dimension().withName("key3").withValue("val3")));
}

@Test
public void canEncodeDimensionedNameToString() {
final DimensionedName dimensionedName = DimensionedName.withName("test")
.withDimension("key1", "val1")
.withDimension("key2", "val2")
.withDimension("key3", "val3")
.build();

Assertions.assertEquals("test[key1:val1,key2:val2,key3:val3]", dimensionedName.encode());
}

@Test
public void canDeriveDimensionedNameFromCurrent() {
final DimensionedName dimensionedName = DimensionedName.withName("test")
.withDimension("key1", "val1")
.withDimension("key2", "val2")
.withDimension("key3", "val3")
.build();


final DimensionedName derivedDimensionedName = dimensionedName
.withDimension("key3", "new_value")
.withDimension("key4", "val4").build();

Assertions.assertEquals("test[key1:val1,key2:val2,key3:val3]", dimensionedName.encode());
Assertions.assertEquals("test[key1:val1,key2:val2,key3:new_value,key4:val4]",
derivedDimensionedName.encode());
}
}

0 comments on commit 5db289f

Please sign in to comment.