Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dimensions to cloudwatch metric and metric filter option for CWSink. #176

Merged
merged 1 commit into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ lazy val flintCore = (project in file("flint-core"))
exclude ("org.apache.logging.log4j", "log4j-api"),
"com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided"
exclude ("com.fasterxml.jackson.core", "jackson-databind"),
"com.amazonaws" % "aws-java-sdk-cloudwatch" % "1.12.593"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any chance this can be provided or not?

meanwhile I'm just thinking any impact on open source user who use Flint as library. Does all classes added have to be in Flint core module?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Provided is not working...cloudwatch is not available.
  • I still didn't get the idea behind flint-core,[I only see OS Client] should I one more package as flint-common-utils?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There won't be any impact to opensource user. We emit metrics but nothing will be pushed unless one configures a sink.

Copy link
Collaborator

@dai-chen dai-chen Nov 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Provided is not working...cloudwatch is not available.
  • I still didn't get the idea behind flint-core,[I only see OS Client] should I one more package as flint-common-utils?

For now we only have FlintOpenSearchClient. We may add FlintFileSystemClient later (for storing Flint index data on S3). I was thinking for user who use Flint library in their own application.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even for them we are actually providing benefit by giving out metrics similar to driver and executor metrics. CWSink they can leverage if they wish, otherwise they can ignore. We can discuss offline on this and will post the summary over here.

exclude("com.fasterxml.jackson.core", "jackson-databind"),
"org.scalactic" %% "scalactic" % "3.2.15" % "test",
"org.scalatest" %% "scalatest" % "3.2.15" % "test",
"org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.spark.SecurityManager;
import org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter;
import org.opensearch.flint.core.metrics.reporter.DimensionedName;
import org.opensearch.flint.core.metrics.reporter.InvalidMetricsPropertyException;

/**
Expand Down Expand Up @@ -185,6 +187,17 @@ public CloudWatchSink(
shouldAppendDropwizardTypeDimension = PropertyDefaults.SHOULD_PARSE_INLINE_DIMENSIONS;
}

final Optional<String> metricFilterRegex = getProperty(
properties,
PropertyKeys.METRIC_FILTER_REGEX);
MetricFilter metricFilter;
if (metricFilterRegex.isPresent()) {
Pattern pattern = Pattern.compile(metricFilterRegex.get());
metricFilter = (name, metric) -> pattern.matcher(DimensionedName.decode(name).getName()).find();
} else {
metricFilter = MetricFilter.ALL;
}

final AmazonCloudWatchAsync cloudWatchClient = AmazonCloudWatchAsyncClient.asyncBuilder()
.withCredentials(awsCredentialsProvider)
.withRegion(awsRegion)
Expand All @@ -193,7 +206,7 @@ public CloudWatchSink(
this.reporter = DimensionedCloudWatchReporter.forRegistry(metricRegistry, cloudWatchClient, namespaceProperty.get())
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.filter(MetricFilter.ALL)
.filter(metricFilter)
.withPercentiles(
DimensionedCloudWatchReporter.Percentile.P50,
DimensionedCloudWatchReporter.Percentile.P75,
Expand Down Expand Up @@ -248,6 +261,7 @@ private static class PropertyKeys {
static final String POLLING_TIME_UNIT = "pollingTimeUnit";
static final String SHOULD_PARSE_INLINE_DIMENSIONS = "shouldParseInlineDimensions";
static final String SHOULD_APPEND_DROPWIZARD_TYPE_DIMENSION = "shouldAppendDropwizardTypeDimension";
static final String METRIC_FILTER_REGEX = "regex";
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public class DimensionedCloudWatchReporter extends ScheduledReporter {
private final StandardUnit durationUnit;
private final boolean shouldParseDimensionsFromName;
private final boolean shouldAppendDropwizardTypeDimension;
private MetricFilter filter;

private DimensionedCloudWatchReporter(final Builder builder) {
super(builder.metricRegistry, "coda-hale-metrics-cloud-watch-reporter", builder.metricFilter, builder.rateUnit, builder.durationUnit);
Expand All @@ -121,6 +122,7 @@ private DimensionedCloudWatchReporter(final Builder builder) {
this.durationUnit = builder.cwDurationUnit;
this.shouldParseDimensionsFromName = builder.withShouldParseDimensionsFromName;
this.shouldAppendDropwizardTypeDimension = builder.withShouldAppendDropwizardTypeDimension;
this.filter = MetricFilter.ALL;
}

@Override
Expand Down Expand Up @@ -335,39 +337,17 @@ private void stageMetricDatum(final boolean metricConfigured,
final List<MetricDatum> metricData) {
// Only submit metrics that show some data, so let's save some money
if (metricConfigured && (builder.withZeroValuesSubmission || metricValue > 0)) {
final DimensionedName dimensionedName = DimensionedName.decode(metricName);
final Set<Dimension> dimensions = new LinkedHashSet<>(builder.globalDimensions);
final String name;
if (shouldParseDimensionsFromName) {
final String[] nameParts = metricName.split(" ");
final StringBuilder nameBuilder = new StringBuilder(nameParts[0]);
int i = 1;
for (; i < nameParts.length; ++i) {
final String[] dimensionParts = nameParts[i].split("=");
if (dimensionParts.length == 2
&& !StringUtils.isNullOrEmpty(dimensionParts[0])
&& !StringUtils.isNullOrEmpty(dimensionParts[1])) {
final Dimension dimension = new Dimension();
dimension.withName(dimensionParts[0]);
dimension.withValue(dimensionParts[1]);
dimensions.add(dimension);
} else {
nameBuilder.append(" ");
nameBuilder.append(nameParts[i]);
}
}
name = nameBuilder.toString();
} else {
name = metricName;
}

dimensions.addAll(dimensionedName.getDimensions());
if (shouldAppendDropwizardTypeDimension) {
dimensions.add(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(dimensionValue));
}

metricData.add(new MetricDatum()
.withTimestamp(new Date(builder.clock.getTime()))
.withValue(cleanMetricValue(metricValue))
.withMetricName(name)
.withMetricName(dimensionedName.getName())
.withDimensions(dimensions)
.withUnit(standardUnit));
}
Expand All @@ -379,6 +359,7 @@ private void stageMetricDatumWithConvertedSnapshot(final boolean metricConfigure
final StandardUnit standardUnit,
final List<MetricDatum> metricData) {
if (metricConfigured) {
final DimensionedName dimensionedName = DimensionedName.decode(metricName);
double scaledSum = convertDuration(LongStream.of(snapshot.getValues()).sum());
final StatisticSet statisticSet = new StatisticSet()
.withSum(scaledSum)
Expand All @@ -388,10 +369,11 @@ private void stageMetricDatumWithConvertedSnapshot(final boolean metricConfigure

final Set<Dimension> dimensions = new LinkedHashSet<>(builder.globalDimensions);
dimensions.add(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_SNAPSHOT_SUMMARY));
dimensions.addAll(dimensionedName.getDimensions());

metricData.add(new MetricDatum()
.withTimestamp(new Date(builder.clock.getTime()))
.withMetricName(metricName)
.withMetricName(dimensionedName.getName())
.withDimensions(dimensions)
.withStatisticValues(statisticSet)
.withUnit(standardUnit));
Expand All @@ -404,6 +386,7 @@ private void stageMetricDatumWithRawSnapshot(final boolean metricConfigured,
final StandardUnit standardUnit,
final List<MetricDatum> metricData) {
if (metricConfigured) {
final DimensionedName dimensionedName = DimensionedName.decode(metricName);
double total = LongStream.of(snapshot.getValues()).sum();
final StatisticSet statisticSet = new StatisticSet()
.withSum(total)
Expand All @@ -413,10 +396,11 @@ private void stageMetricDatumWithRawSnapshot(final boolean metricConfigured,

final Set<Dimension> dimensions = new LinkedHashSet<>(builder.globalDimensions);
dimensions.add(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_SNAPSHOT_SUMMARY));
dimensions.addAll(dimensionedName.getDimensions());

metricData.add(new MetricDatum()
.withTimestamp(new Date(builder.clock.getTime()))
.withMetricName(metricName)
.withMetricName(dimensionedName.getName())
.withDimensions(dimensions)
.withStatisticValues(statisticSet)
.withUnit(standardUnit));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package org.opensearch.flint.core.metrics.reporter;

import com.amazonaws.services.cloudwatch.model.Dimension;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class DimensionedName {
private static final Pattern dimensionPattern = Pattern.compile("([\\w.-]+)\\[([\\w\\W]+)]");
private final String name;
private final Map<String, Dimension> dimensions;

private String encoded;

DimensionedName(final String name, final Map<String, Dimension> dimensions) {
this.name = name;
this.dimensions = Collections.unmodifiableMap(dimensions);
}

public static DimensionedName decode(final String encodedDimensionedName) {
final Matcher matcher = dimensionPattern.matcher(encodedDimensionedName);
if (matcher.find() && matcher.groupCount() == 2) {
final DimensionedNameBuilder builder = new DimensionedNameBuilder(matcher.group(1).trim());
for (String t : matcher.group(2).split(",")) {
final String[] keyAndValue = t.split(":");
builder.withDimension(keyAndValue[0].trim(), keyAndValue[1].trim());
}
return builder.build();
} else {
return new DimensionedNameBuilder(encodedDimensionedName).build();
}
}

public static DimensionedNameBuilder withName(String name) {
return new DimensionedNameBuilder(name);
}

public DimensionedNameBuilder withDimension(final String name, final String value) {
return new DimensionedNameBuilder(this.name, new HashMap<>(this.dimensions)).withDimension(name, value);
}

public String getName() {
return name;
}

public Set<Dimension> getDimensions() {
return new HashSet<>(dimensions.values());
}

public synchronized String encode() {
if (this.encoded == null) {
if (!dimensions.isEmpty()) {
final StringBuilder sb = new StringBuilder(this.name);
sb.append('[');
sb.append(this.dimensions.values().stream()
.map(dimension -> dimension.getName() + ":" + dimension.getValue())
.collect(Collectors.joining(",")));
sb.append(']');

this.encoded = sb.toString();
} else {
this.encoded = this.name;
}
}
return this.encoded;
}

@Override
public String toString() {
return this.encode();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final DimensionedName that = (DimensionedName) o;
return Objects.equals(name, that.name) &&
Objects.equals(dimensions, that.dimensions);
}

@Override
public int hashCode() {
return Objects.hash(name, dimensions);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.opensearch.flint.core.metrics.reporter;

import com.amazonaws.services.cloudwatch.model.Dimension;
import java.util.HashMap;
import java.util.Map;

public class DimensionedNameBuilder {
private final String name;
private Map<String, Dimension> dimensions;

DimensionedNameBuilder(final String name) {
this(name, new HashMap<>());
}

DimensionedNameBuilder(final String name, final Map<String, Dimension> dimensions) {
this.name = name;
this.dimensions = dimensions;
}

public DimensionedName build() {
return new DimensionedName(this.name, this.dimensions);
}

public DimensionedNameBuilder withDimension(final String name, final String value) {
this.dimensions.put(name, new Dimension().withName(name).withValue(value));
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter;
import org.opensearch.flint.core.metrics.reporter.DimensionedName;

import static com.amazonaws.services.cloudwatch.model.StandardUnit.Count;
import static com.amazonaws.services.cloudwatch.model.StandardUnit.Microseconds;
Expand Down Expand Up @@ -117,19 +118,6 @@ public void reportedCounterShouldContainExpectedDimension() throws Exception {
assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_COUNT));
}

@Test
public void reportedCounterShouldContainDimensionEmbeddedInName() throws Exception {
final String DIMENSION_NAME = "some_dimension";
final String DIMENSION_VALUE = "some_value";

metricRegistry.counter(ARBITRARY_COUNTER_NAME + " " + DIMENSION_NAME + "=" + DIMENSION_VALUE).inc();
reporterBuilder.withShouldParseDimensionsFromName(true).build().report();

final List<Dimension> dimensions = firstMetricDatumDimensionsFromCapturedRequest();

assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME).withValue(DIMENSION_VALUE));
}

@Test
public void reportedGaugeShouldContainExpectedDimension() throws Exception {
metricRegistry.register(ARBITRARY_GAUGE_NAME, (Gauge<Long>) () -> 1L);
Expand Down Expand Up @@ -472,6 +460,21 @@ public void shouldReportHistogramSubsequentSnapshotValues_SumMaxMinValues() thro

}

@Test
public void shouldReportExpectedGlobalAndCustomDimensions() throws Exception {
metricRegistry.counter(DimensionedName.withName(ARBITRARY_COUNTER_NAME)
.withDimension("key1", "value1")
.withDimension("key2", "value2")
.build().encode()).inc();
reporterBuilder.withGlobalDimensions("Region=us-west-2").build().report();

final List<Dimension> dimensions = firstMetricDatumDimensionsFromCapturedRequest();

assertThat(dimensions).contains(new Dimension().withName("Region").withValue("us-west-2"));
assertThat(dimensions).contains(new Dimension().withName("key1").withValue("value1"));
assertThat(dimensions).contains(new Dimension().withName("key2").withValue("value2"));
}

private MetricDatum metricDatumByDimensionFromCapturedRequest(final String dimensionValue) {
final PutMetricDataRequest putMetricDataRequest = metricDataRequestCaptor.getValue();
final List<MetricDatum> metricData = putMetricDataRequest.getMetricData();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package opensearch.flint.core.metrics.reporter;

import static org.hamcrest.CoreMatchers.hasItems;

import com.amazonaws.services.cloudwatch.model.Dimension;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.opensearch.flint.core.metrics.reporter.DimensionedName;

public class DimensionedNameTest {
@Test
public void canDecodeDimensionedString() {
final String dimensioned = "test[key1:val1,key2:val2,key3:val3]";

final DimensionedName dimensionedName = DimensionedName.decode(dimensioned);

Assertions.assertEquals("test", dimensionedName.getName());
Assertions.assertEquals(3, dimensionedName.getDimensions().size());

MatcherAssert.assertThat(dimensionedName.getDimensions(), hasItems(
new Dimension().withName("key1").withValue("val1"),
new Dimension().withName("key2").withValue("val2"),
new Dimension().withName("key3").withValue("val3")));
}

@Test
public void canEncodeDimensionedNameToString() {
final DimensionedName dimensionedName = DimensionedName.withName("test")
.withDimension("key1", "val1")
.withDimension("key2", "val2")
.withDimension("key3", "val3")
.build();

Assertions.assertEquals("test[key1:val1,key2:val2,key3:val3]", dimensionedName.encode());
}

@Test
public void canDeriveDimensionedNameFromCurrent() {
final DimensionedName dimensionedName = DimensionedName.withName("test")
.withDimension("key1", "val1")
.withDimension("key2", "val2")
.withDimension("key3", "val3")
.build();


final DimensionedName derivedDimensionedName = dimensionedName
.withDimension("key3", "new_value")
.withDimension("key4", "val4").build();

Assertions.assertEquals("test[key1:val1,key2:val2,key3:val3]", dimensionedName.encode());
Assertions.assertEquals("test[key1:val1,key2:val2,key3:new_value,key4:val4]",
derivedDimensionedName.encode());
}
}
Loading