From b9694d259c2e130a310a4c6ac8b33a3de6a74f1a Mon Sep 17 00:00:00 2001 From: Vamsi Manohar Date: Mon, 20 Nov 2023 18:11:17 -0800 Subject: [PATCH] Metrics Addition Signed-off-by: Vamsi Manohar --- build.sbt | 9 +- .../spark/metrics/sink/CloudWatchSink.java | 258 ++++++ .../DimensionedCloudWatchReporter.java | 814 ++++++++++++++++++ .../InvalidMetricsPropertyException.java | 15 + .../metrics/sink/CloudWatchSinkTests.java | 79 ++ .../DimensionedCloudWatchReporterTest.java | 539 ++++++++++++ project/plugins.sbt | 1 + 7 files changed, 1714 insertions(+), 1 deletion(-) create mode 100644 flint-core/src/main/java/org/apache/spark/metrics/sink/CloudWatchSink.java create mode 100644 flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java create mode 100644 flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/InvalidMetricsPropertyException.java create mode 100644 flint-core/src/test/java/apache/spark/metrics/sink/CloudWatchSinkTests.java create mode 100644 flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporterTest.java diff --git a/build.sbt b/build.sbt index 938f19a64..567729b93 100644 --- a/build.sbt +++ b/build.sbt @@ -63,7 +63,14 @@ lazy val flintCore = (project in file("flint-core")) "org.scalatest" %% "scalatest" % "3.2.15" % "test", "org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test", "org.scalatestplus" %% "mockito-4-6" % "3.2.15.0" % "test", - "com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test"), + "com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test", + "org.mockito" % "mockito-core" % "2.23.0" % "test", + "org.mockito" % "mockito-junit-jupiter" % "3.12.4" % "test", + "org.junit.jupiter" % "junit-jupiter-api" % "5.9.0" % "test", + "org.junit.jupiter" % "junit-jupiter-engine" % "5.9.0" % "test", + "com.google.truth" % "truth" % "1.1.5" % "test" + ), + libraryDependencies ++= deps(sparkVersion), publish / skip := true) lazy val pplSparkIntegration = (project in file("ppl-spark-integration")) diff --git a/flint-core/src/main/java/org/apache/spark/metrics/sink/CloudWatchSink.java b/flint-core/src/main/java/org/apache/spark/metrics/sink/CloudWatchSink.java new file mode 100644 index 000000000..7af5bd5b6 --- /dev/null +++ b/flint-core/src/main/java/org/apache/spark/metrics/sink/CloudWatchSink.java @@ -0,0 +1,258 @@ +package org.apache.spark.metrics.sink; +// It is required that this class be built into the Spark Metrics package path because +// the Sink interface and SecurityManager class are each private. + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.regions.AwsRegionProvider; +import com.amazonaws.regions.DefaultAwsRegionProviderChain; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsync; +import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsyncClient; +import com.codahale.metrics.MetricFilter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.ScheduledReporter; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import org.apache.spark.SecurityManager; +import org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter; +import org.opensearch.flint.core.metrics.reporter.InvalidMetricsPropertyException; + +/** + * Implementation of the Spark metrics {@link Sink} interface + * for reporting internal Spark metrics into CloudWatch. Spark's metric system uses DropWizard's + * metric library internally, so this class simply wraps the {@link DimensionedCloudWatchReporter} + * with the constructor and methods mandated for Spark metric Sinks. + * + * @see org.apache.spark.metrics.MetricsSystem + * @see ScheduledReporter + * @author kmccaw + */ +public class CloudWatchSink implements Sink { + + private final ScheduledReporter reporter; + + private final long pollingPeriod; + + private final boolean shouldParseInlineDimensions; + + private final boolean shouldAppendDropwizardTypeDimension; + + private final TimeUnit pollingTimeUnit; + + /** + * Constructor with the signature required by Spark, which loads the class through reflection. + * + * @see org.apache.spark.metrics.MetricsSystem + * @param properties Properties for this sink defined in Spark's "metrics.properties" configuration file. + * @param metricRegistry The DropWizard MetricRegistry used by Sparks {@link org.apache.spark.metrics.MetricsSystem} + * @param securityManager Unused argument; required by the Spark sink constructor signature. + */ + public CloudWatchSink( + final Properties properties, + final MetricRegistry metricRegistry, + final SecurityManager securityManager) { + // First extract properties defined in the Spark metrics configuration + + // Extract the required namespace property. This is used as the namespace + // for all metrics reported to CloudWatch + final Optional namespaceProperty = getProperty(properties, PropertyKeys.NAMESPACE); + if (!namespaceProperty.isPresent()) { + final String message = "CloudWatch Spark metrics sink requires '" + + PropertyKeys.NAMESPACE + "' property."; + throw new InvalidMetricsPropertyException(message); + } + + // Extract the optional AWS credentials. If either of the access or secret keys are + // missing in the properties, fall back to using the credentials of the EC2 instance. + final Optional accessKeyProperty = getProperty(properties, PropertyKeys.AWS_ACCESS_KEY_ID); + final Optional secretKeyProperty = getProperty(properties, PropertyKeys.AWS_SECRET_KEY); + final AWSCredentialsProvider awsCredentialsProvider; + if (accessKeyProperty.isPresent() && secretKeyProperty.isPresent()) { + final AWSCredentials awsCredentials = new BasicAWSCredentials( + accessKeyProperty.get(), + secretKeyProperty.get()); + awsCredentialsProvider = new AWSStaticCredentialsProvider(awsCredentials); + } else { + // If the AWS credentials aren't specified in the properties, fall back to using the + // DefaultAWSCredentialsProviderChain, which looks for credentials in the order + // (1) Environment Variables + // (2) Java System Properties + // (3) Credentials file at ~/.aws/credentials + // (4) AWS_CONTAINER_CREDENTIALS_RELATIVE_URI + // (5) EC2 Instance profile credentials + awsCredentialsProvider = DefaultAWSCredentialsProviderChain.getInstance(); + } + + // Extract the AWS region CloudWatch metrics should be reported to. + final Optional regionProperty = getProperty(properties, PropertyKeys.AWS_REGION); + final Regions awsRegion; + if (regionProperty.isPresent()) { + try { + awsRegion = Regions.fromName(regionProperty.get()); + } catch (IllegalArgumentException e) { + final String message = String.format( + "Unable to parse value (%s) for the \"%s\" CloudWatchSink metrics property.", + regionProperty.get(), + PropertyKeys.AWS_REGION); + throw new InvalidMetricsPropertyException(message, e); + } + } else { + final AwsRegionProvider regionProvider = new DefaultAwsRegionProviderChain(); + awsRegion = Regions.fromName(regionProvider.getRegion()); + } + + // Extract the polling period, the interval at which metrics are reported. + final Optional pollingPeriodProperty = getProperty(properties, PropertyKeys.POLLING_PERIOD); + if (pollingPeriodProperty.isPresent()) { + try { + final long parsedPollingPeriod = Long.parseLong(pollingPeriodProperty.get()); + // Confirm that the value of this property is a positive number + if (parsedPollingPeriod <= 0) { + final String message = String.format( + "The value (%s) of the \"%s\" CloudWatchSink metrics property is non-positive.", + pollingPeriodProperty.get(), + PropertyKeys.POLLING_PERIOD); + throw new InvalidMetricsPropertyException(message); + } + pollingPeriod = parsedPollingPeriod; + } catch (NumberFormatException e) { + final String message = String.format( + "Unable to parse value (%s) for the \"%s\" CloudWatchSink metrics property.", + pollingPeriodProperty.get(), + PropertyKeys.POLLING_PERIOD); + throw new InvalidMetricsPropertyException(message, e); + } + } else { + pollingPeriod = PropertyDefaults.POLLING_PERIOD; + } + + final Optional pollingTimeUnitProperty = getProperty(properties, PropertyKeys.POLLING_TIME_UNIT); + if (pollingTimeUnitProperty.isPresent()) { + try { + pollingTimeUnit = TimeUnit.valueOf(pollingTimeUnitProperty.get().toUpperCase()); + } catch (IllegalArgumentException e) { + final String message = String.format( + "Unable to parse value (%s) for the \"%s\" CloudWatchSink metrics property.", + pollingTimeUnitProperty.get(), + PropertyKeys.POLLING_TIME_UNIT); + throw new InvalidMetricsPropertyException(message, e); + } + } else { + pollingTimeUnit = PropertyDefaults.POLLING_PERIOD_TIME_UNIT; + } + + // Extract the inline dimension parsing setting. + final Optional shouldParseInlineDimensionsProperty = getProperty( + properties, + PropertyKeys.SHOULD_PARSE_INLINE_DIMENSIONS); + if (shouldParseInlineDimensionsProperty.isPresent()) { + try { + shouldParseInlineDimensions = Boolean.parseBoolean(shouldParseInlineDimensionsProperty.get()); + } catch (IllegalArgumentException e) { + final String message = String.format( + "Unable to parse value (%s) for the \"%s\" CloudWatchSink metrics property.", + shouldParseInlineDimensionsProperty.get(), + PropertyKeys.SHOULD_PARSE_INLINE_DIMENSIONS); + throw new InvalidMetricsPropertyException(message, e); + } + } else { + shouldParseInlineDimensions = PropertyDefaults.SHOULD_PARSE_INLINE_DIMENSIONS; + } + + // Extract the setting to append dropwizard metrics types as a dimension + final Optional shouldAppendDropwizardTypeDimensionProperty = getProperty( + properties, + PropertyKeys.SHOULD_APPEND_DROPWIZARD_TYPE_DIMENSION); + if (shouldAppendDropwizardTypeDimensionProperty.isPresent()) { + try { + shouldAppendDropwizardTypeDimension = Boolean.parseBoolean(shouldAppendDropwizardTypeDimensionProperty.get()); + } catch (IllegalArgumentException e) { + final String message = String.format( + "Unable to parse value (%s) for the \"%s\" CloudWatchSink metrics property.", + shouldAppendDropwizardTypeDimensionProperty.get(), + PropertyKeys.SHOULD_APPEND_DROPWIZARD_TYPE_DIMENSION); + throw new InvalidMetricsPropertyException(message, e); + } + } else { + shouldAppendDropwizardTypeDimension = PropertyDefaults.SHOULD_PARSE_INLINE_DIMENSIONS; + } + + final AmazonCloudWatchAsync cloudWatchClient = AmazonCloudWatchAsyncClient.asyncBuilder() + .withCredentials(awsCredentialsProvider) + .withRegion(awsRegion) + .build(); + + this.reporter = DimensionedCloudWatchReporter.forRegistry(metricRegistry, cloudWatchClient, namespaceProperty.get()) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .filter(MetricFilter.ALL) + .withPercentiles( + DimensionedCloudWatchReporter.Percentile.P50, + DimensionedCloudWatchReporter.Percentile.P75, + DimensionedCloudWatchReporter.Percentile.P99) + .withOneMinuteMeanRate() + .withFiveMinuteMeanRate() + .withFifteenMinuteMeanRate() + .withMeanRate() + .withArithmeticMean() + .withStdDev() + .withStatisticSet() + .withGlobalDimensions() + .withShouldParseDimensionsFromName(shouldParseInlineDimensions) + .withShouldAppendDropwizardTypeDimension(shouldAppendDropwizardTypeDimension) + .build(); + } + + @Override + public void start() { + reporter.start(pollingPeriod, pollingTimeUnit); + } + + @Override + public void stop() { + reporter.stop(); + } + + @Override + public void report() { + reporter.report(); + } + + /** + * Returns the value for specified property key as an Optional. + * @param properties + * @param key + * @return + */ + private static Optional getProperty(Properties properties, final String key) { + return Optional.ofNullable(properties.getProperty(key)); + } + + /** + * The keys used in the metrics properties configuration file. + */ + private static class PropertyKeys { + static final String NAMESPACE = "namespace"; + static final String AWS_ACCESS_KEY_ID = "awsAccessKeyId"; + static final String AWS_SECRET_KEY = "awsSecretKey"; + static final String AWS_REGION = "awsRegion"; + static final String POLLING_PERIOD = "pollingPeriod"; + static final String POLLING_TIME_UNIT = "pollingTimeUnit"; + static final String SHOULD_PARSE_INLINE_DIMENSIONS = "shouldParseInlineDimensions"; + static final String SHOULD_APPEND_DROPWIZARD_TYPE_DIMENSION = "shouldAppendDropwizardTypeDimension"; + } + + /** + * The default values for optional properties in the metrics properties configuration file. + */ + private static class PropertyDefaults { + static final long POLLING_PERIOD = 1; + static final TimeUnit POLLING_PERIOD_TIME_UNIT = TimeUnit.MINUTES; + static final boolean SHOULD_PARSE_INLINE_DIMENSIONS = false; + } +} diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java new file mode 100644 index 000000000..f6c123851 --- /dev/null +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporter.java @@ -0,0 +1,814 @@ +package org.opensearch.flint.core.metrics.reporter; + +import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsync; +import com.amazonaws.services.cloudwatch.model.Dimension; +import com.amazonaws.services.cloudwatch.model.InvalidParameterValueException; +import com.amazonaws.services.cloudwatch.model.MetricDatum; +import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest; +import com.amazonaws.services.cloudwatch.model.PutMetricDataResult; +import com.amazonaws.services.cloudwatch.model.StandardUnit; +import com.amazonaws.services.cloudwatch.model.StatisticSet; +import com.amazonaws.util.StringUtils; +import com.codahale.metrics.Clock; +import com.codahale.metrics.Counter; +import com.codahale.metrics.Counting; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; +import com.codahale.metrics.Metered; +import com.codahale.metrics.MetricFilter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.ScheduledReporter; +import com.codahale.metrics.Snapshot; +import com.codahale.metrics.Timer; +import com.codahale.metrics.jvm.BufferPoolMetricSet; +import com.codahale.metrics.jvm.ClassLoadingGaugeSet; +import com.codahale.metrics.jvm.FileDescriptorRatioGauge; +import com.codahale.metrics.jvm.GarbageCollectorMetricSet; +import com.codahale.metrics.jvm.MemoryUsageGaugeSet; +import com.codahale.metrics.jvm.ThreadStatesGaugeSet; +import java.lang.management.ManagementFactory; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.LongStream; +import java.util.stream.Stream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Reports metrics to Amazon's CloudWatch periodically. + *

+ * Use {@link Builder} to construct instances of this class. The {@link Builder} + * allows to configure what aggregated metrics will be reported as a single {@link MetricDatum} to CloudWatch. + *

+ * There are a bunch of {@code with*} methods that provide a sufficient fine-grained control over what metrics + * should be reported. + * + * Forked from https://github.com/azagniotov/codahale-aggregated-metrics-cloudwatch-reporter. + */ +public class DimensionedCloudWatchReporter extends ScheduledReporter { + + private static final Logger LOGGER = LoggerFactory.getLogger(DimensionedCloudWatchReporter.class); + + // Visible for testing + public static final String DIMENSION_NAME_TYPE = "Type"; + + // Visible for testing + public static final String DIMENSION_GAUGE = "gauge"; + + // Visible for testing + public static final String DIMENSION_COUNT = "count"; + + // Visible for testing + public static final String DIMENSION_SNAPSHOT_SUMMARY = "snapshot-summary"; + + // Visible for testing + public static final String DIMENSION_SNAPSHOT_MEAN = "snapshot-mean"; + + // Visible for testing + public static final String DIMENSION_SNAPSHOT_STD_DEV = "snapshot-std-dev"; + + /** + * Amazon CloudWatch rejects values that are either too small or too large. + * Values must be in the range of 8.515920e-109 to 1.174271e+108 (Base 10) or 2e-360 to 2e360 (Base 2). + *

+ * In addition, special values (e.g., NaN, +Infinity, -Infinity) are not supported. + */ + private static final double SMALLEST_SENDABLE_VALUE = 8.515920e-109; + private static final double LARGEST_SENDABLE_VALUE = 1.174271e+108; + + /** + * Each CloudWatch API request may contain at maximum 20 datums + */ + private static final int MAXIMUM_DATUMS_PER_REQUEST = 20; + + /** + * We only submit the difference in counters since the last submission. This way we don't have to reset + * the counters within this application. + */ + private final Map lastPolledCounts; + + private final Builder builder; + private final String namespace; + private final AmazonCloudWatchAsync cloudWatchAsyncClient; + private final StandardUnit rateUnit; + private final StandardUnit durationUnit; + private final boolean shouldParseDimensionsFromName; + private final boolean shouldAppendDropwizardTypeDimension; + + private DimensionedCloudWatchReporter(final Builder builder) { + super(builder.metricRegistry, "coda-hale-metrics-cloud-watch-reporter", builder.metricFilter, builder.rateUnit, builder.durationUnit); + this.builder = builder; + this.namespace = builder.namespace; + this.cloudWatchAsyncClient = builder.cloudWatchAsyncClient; + this.lastPolledCounts = new ConcurrentHashMap<>(); + this.rateUnit = builder.cwRateUnit; + this.durationUnit = builder.cwDurationUnit; + this.shouldParseDimensionsFromName = builder.withShouldParseDimensionsFromName; + this.shouldAppendDropwizardTypeDimension = builder.withShouldAppendDropwizardTypeDimension; + } + + @Override + public void report(final SortedMap gauges, + final SortedMap counters, + final SortedMap histograms, + final SortedMap meters, + final SortedMap timers) { + + if (builder.withDryRun) { + LOGGER.warn("** Reporter is running in 'DRY RUN' mode **"); + } + + try { + final List metricData = new ArrayList<>( + gauges.size() + counters.size() + 10 * histograms.size() + 10 * timers.size()); + + for (final Map.Entry gaugeEntry : gauges.entrySet()) { + processGauge(gaugeEntry.getKey(), gaugeEntry.getValue(), metricData); + } + + for (final Map.Entry counterEntry : counters.entrySet()) { + processCounter(counterEntry.getKey(), counterEntry.getValue(), metricData); + } + + for (final Map.Entry histogramEntry : histograms.entrySet()) { + processCounter(histogramEntry.getKey(), histogramEntry.getValue(), metricData); + processHistogram(histogramEntry.getKey(), histogramEntry.getValue(), metricData); + } + + for (final Map.Entry meterEntry : meters.entrySet()) { + processCounter(meterEntry.getKey(), meterEntry.getValue(), metricData); + processMeter(meterEntry.getKey(), meterEntry.getValue(), metricData); + } + + for (final Map.Entry timerEntry : timers.entrySet()) { + processCounter(timerEntry.getKey(), timerEntry.getValue(), metricData); + processMeter(timerEntry.getKey(), timerEntry.getValue(), metricData); + processTimer(timerEntry.getKey(), timerEntry.getValue(), metricData); + } + + final Collection> metricDataPartitions = partition(metricData, MAXIMUM_DATUMS_PER_REQUEST); + final List> cloudWatchFutures = new ArrayList<>(metricData.size()); + + for (final List partition : metricDataPartitions) { + final PutMetricDataRequest putMetricDataRequest = new PutMetricDataRequest() + .withNamespace(namespace) + .withMetricData(partition); + + if (builder.withDryRun) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Dry run - constructed PutMetricDataRequest: {}", putMetricDataRequest); + } + } else { + cloudWatchFutures.add(cloudWatchAsyncClient.putMetricDataAsync(putMetricDataRequest)); + } + } + + for (final Future cloudWatchFuture : cloudWatchFutures) { + try { + cloudWatchFuture.get(); + } catch (final Exception e) { + LOGGER.error("Error reporting metrics to CloudWatch. The data in this CloudWatch API request " + + "may have been discarded, did not make it to CloudWatch.", e); + } + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Sent {} metric datums to CloudWatch. Namespace: {}, metric data {}", metricData.size(), namespace, metricData); + } + + } catch (final RuntimeException e) { + LOGGER.error("Error marshalling CloudWatch metrics.", e); + } + } + + @Override + public void stop() { + try { + super.stop(); + } catch (final Exception e) { + LOGGER.error("Error when stopping the reporter.", e); + } finally { + if (!builder.withDryRun) { + try { + cloudWatchAsyncClient.shutdown(); + } catch (final Exception e) { + LOGGER.error("Error shutting down AmazonCloudWatchAsync", cloudWatchAsyncClient, e); + } + } + } + } + + private void processGauge(final String metricName, final Gauge gauge, final List metricData) { + if (gauge.getValue() instanceof Number) { + final Number number = (Number) gauge.getValue(); + stageMetricDatum(true, metricName, number.doubleValue(), StandardUnit.None, DIMENSION_GAUGE, metricData); + } + } + + private void processCounter(final String metricName, final Counting counter, final List metricData) { + long currentCount = counter.getCount(); + Long lastCount = lastPolledCounts.get(counter); + lastPolledCounts.put(counter, currentCount); + + if (lastCount == null) { + lastCount = 0L; + } + + // Only submit metrics that have changed - let's save some money! + final long delta = currentCount - lastCount; + stageMetricDatum(true, metricName, delta, StandardUnit.Count, DIMENSION_COUNT, metricData); + } + + /** + * The rates of {@link Metered} are reported after being converted using the rate factor, which is deduced from + * the set rate unit + * + * @see Timer#getSnapshot + * @see #getRateUnit + * @see #convertRate(double) + */ + private void processMeter(final String metricName, final Metered meter, final List metricData) { + final String formattedRate = String.format("-rate [per-%s]", getRateUnit()); + stageMetricDatum(builder.withOneMinuteMeanRate, metricName, convertRate(meter.getOneMinuteRate()), rateUnit, "1-min-mean" + formattedRate, metricData); + stageMetricDatum(builder.withFiveMinuteMeanRate, metricName, convertRate(meter.getFiveMinuteRate()), rateUnit, "5-min-mean" + formattedRate, metricData); + stageMetricDatum(builder.withFifteenMinuteMeanRate, metricName, convertRate(meter.getFifteenMinuteRate()), rateUnit, "15-min-mean" + formattedRate, metricData); + stageMetricDatum(builder.withMeanRate, metricName, convertRate(meter.getMeanRate()), rateUnit, "mean" + formattedRate, metricData); + } + + /** + * The {@link Snapshot} values of {@link Timer} are reported as {@link StatisticSet} after conversion. The + * conversion is done using the duration factor, which is deduced from the set duration unit. + *

+ * Please note, the reported values submitted only if they show some data (greater than zero) in order to: + *

+ * 1. save some money + * 2. prevent com.amazonaws.services.cloudwatch.model.InvalidParameterValueException if empty {@link Snapshot} + * is submitted + *

+ * If {@link Builder#withZeroValuesSubmission()} is {@code true}, then all values will be submitted + * + * @see Timer#getSnapshot + * @see #getDurationUnit + * @see #convertDuration(double) + */ + private void processTimer(final String metricName, final Timer timer, final List metricData) { + final Snapshot snapshot = timer.getSnapshot(); + + if (builder.withZeroValuesSubmission || snapshot.size() > 0) { + for (final Percentile percentile : builder.percentiles) { + final double convertedDuration = convertDuration(snapshot.getValue(percentile.getQuantile())); + stageMetricDatum(true, metricName, convertedDuration, durationUnit, percentile.getDesc(), metricData); + } + } + + // prevent empty snapshot from causing InvalidParameterValueException + if (snapshot.size() > 0) { + final String formattedDuration = String.format(" [in-%s]", getDurationUnit()); + stageMetricDatum(builder.withArithmeticMean, metricName, convertDuration(snapshot.getMean()), durationUnit, DIMENSION_SNAPSHOT_MEAN + formattedDuration, metricData); + stageMetricDatum(builder.withStdDev, metricName, convertDuration(snapshot.getStdDev()), durationUnit, DIMENSION_SNAPSHOT_STD_DEV + formattedDuration, metricData); + stageMetricDatumWithConvertedSnapshot(builder.withStatisticSet, metricName, snapshot, durationUnit, metricData); + } + } + + /** + * The {@link Snapshot} values of {@link Histogram} are reported as {@link StatisticSet} raw. In other words, the + * conversion using the duration factor does NOT apply. + *

+ * Please note, the reported values submitted only if they show some data (greater than zero) in order to: + *

+ * 1. save some money + * 2. prevent com.amazonaws.services.cloudwatch.model.InvalidParameterValueException if empty {@link Snapshot} + * is submitted + *

+ * If {@link Builder#withZeroValuesSubmission()} is {@code true}, then all values will be submitted + * + * @see Histogram#getSnapshot + */ + private void processHistogram(final String metricName, final Histogram histogram, final List metricData) { + final Snapshot snapshot = histogram.getSnapshot(); + + if (builder.withZeroValuesSubmission || snapshot.size() > 0) { + for (final Percentile percentile : builder.percentiles) { + final double value = snapshot.getValue(percentile.getQuantile()); + stageMetricDatum(true, metricName, value, StandardUnit.None, percentile.getDesc(), metricData); + } + } + + // prevent empty snapshot from causing InvalidParameterValueException + if (snapshot.size() > 0) { + stageMetricDatum(builder.withArithmeticMean, metricName, snapshot.getMean(), StandardUnit.None, DIMENSION_SNAPSHOT_MEAN, metricData); + stageMetricDatum(builder.withStdDev, metricName, snapshot.getStdDev(), StandardUnit.None, DIMENSION_SNAPSHOT_STD_DEV, metricData); + stageMetricDatumWithRawSnapshot(builder.withStatisticSet, metricName, snapshot, StandardUnit.None, metricData); + } + } + + /** + * Please note, the reported values submitted only if they show some data (greater than zero) in order to: + *

+ * 1. save some money + * 2. prevent com.amazonaws.services.cloudwatch.model.InvalidParameterValueException if empty {@link Snapshot} + * is submitted + *

+ * If {@link Builder#withZeroValuesSubmission()} is {@code true}, then all values will be submitted + */ + private void stageMetricDatum(final boolean metricConfigured, + final String metricName, + final double metricValue, + final StandardUnit standardUnit, + final String dimensionValue, + final List metricData) { + // Only submit metrics that show some data, so let's save some money + if (metricConfigured && (builder.withZeroValuesSubmission || metricValue > 0)) { + final Set dimensions = new LinkedHashSet<>(builder.globalDimensions); + final String name; + if (shouldParseDimensionsFromName) { + final String[] nameParts = metricName.split(" "); + final StringBuilder nameBuilder = new StringBuilder(nameParts[0]); + int i = 1; + for (; i < nameParts.length; ++i) { + final String[] dimensionParts = nameParts[i].split("="); + if (dimensionParts.length == 2 + && !StringUtils.isNullOrEmpty(dimensionParts[0]) + && !StringUtils.isNullOrEmpty(dimensionParts[1])) { + final Dimension dimension = new Dimension(); + dimension.withName(dimensionParts[0]); + dimension.withValue(dimensionParts[1]); + dimensions.add(dimension); + } else { + nameBuilder.append(" "); + nameBuilder.append(nameParts[i]); + } + } + name = nameBuilder.toString(); + } else { + name = metricName; + } + + if (shouldAppendDropwizardTypeDimension) { + dimensions.add(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(dimensionValue)); + } + + metricData.add(new MetricDatum() + .withTimestamp(new Date(builder.clock.getTime())) + .withValue(cleanMetricValue(metricValue)) + .withMetricName(name) + .withDimensions(dimensions) + .withUnit(standardUnit)); + } + } + + private void stageMetricDatumWithConvertedSnapshot(final boolean metricConfigured, + final String metricName, + final Snapshot snapshot, + final StandardUnit standardUnit, + final List metricData) { + if (metricConfigured) { + double scaledSum = convertDuration(LongStream.of(snapshot.getValues()).sum()); + final StatisticSet statisticSet = new StatisticSet() + .withSum(scaledSum) + .withSampleCount((double) snapshot.size()) + .withMinimum(convertDuration(snapshot.getMin())) + .withMaximum(convertDuration(snapshot.getMax())); + + final Set dimensions = new LinkedHashSet<>(builder.globalDimensions); + dimensions.add(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_SNAPSHOT_SUMMARY)); + + metricData.add(new MetricDatum() + .withTimestamp(new Date(builder.clock.getTime())) + .withMetricName(metricName) + .withDimensions(dimensions) + .withStatisticValues(statisticSet) + .withUnit(standardUnit)); + } + } + + private void stageMetricDatumWithRawSnapshot(final boolean metricConfigured, + final String metricName, + final Snapshot snapshot, + final StandardUnit standardUnit, + final List metricData) { + if (metricConfigured) { + double total = LongStream.of(snapshot.getValues()).sum(); + final StatisticSet statisticSet = new StatisticSet() + .withSum(total) + .withSampleCount((double) snapshot.size()) + .withMinimum((double) snapshot.getMin()) + .withMaximum((double) snapshot.getMax()); + + final Set dimensions = new LinkedHashSet<>(builder.globalDimensions); + dimensions.add(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_SNAPSHOT_SUMMARY)); + + metricData.add(new MetricDatum() + .withTimestamp(new Date(builder.clock.getTime())) + .withMetricName(metricName) + .withDimensions(dimensions) + .withStatisticValues(statisticSet) + .withUnit(standardUnit)); + } + } + + private double cleanMetricValue(final double metricValue) { + double absoluteValue = Math.abs(metricValue); + if (absoluteValue < SMALLEST_SENDABLE_VALUE) { + // Allow 0 through untouched, everything else gets rounded to SMALLEST_SENDABLE_VALUE + if (absoluteValue > 0) { + if (metricValue < 0) { + return -SMALLEST_SENDABLE_VALUE; + } else { + return SMALLEST_SENDABLE_VALUE; + } + } + } else if (absoluteValue > LARGEST_SENDABLE_VALUE) { + if (metricValue < 0) { + return -LARGEST_SENDABLE_VALUE; + } else { + return LARGEST_SENDABLE_VALUE; + } + } + return metricValue; + } + + private static Collection> partition(final Collection wholeCollection, final int partitionSize) { + final int[] itemCounter = new int[]{0}; + + return wholeCollection.stream() + .collect(Collectors.groupingBy(item -> itemCounter[0]++ / partitionSize)) + .values(); + } + + /** + * Creates a new {@link Builder} that sends values from the given {@link MetricRegistry} to the given namespace + * using the given CloudWatch client. + * + * @param metricRegistry {@link MetricRegistry} instance + * @param client {@link AmazonCloudWatchAsync} instance + * @param namespace the namespace. Must be non-null and not empty. + * @return {@link Builder} instance + */ + public static Builder forRegistry( + final MetricRegistry metricRegistry, + final AmazonCloudWatchAsync client, + final String namespace) { + return new Builder(metricRegistry, client, namespace); + } + + public enum Percentile { + P50(0.50, "50%"), + P75(0.75, "75%"), + P95(0.95, "95%"), + P98(0.98, "98%"), + P99(0.99, "99%"), + P995(0.995, "99.5%"), + P999(0.999, "99.9%"); + + private final double quantile; + private final String desc; + + Percentile(final double quantile, final String desc) { + this.quantile = quantile; + this.desc = desc; + } + + public double getQuantile() { + return quantile; + } + + public String getDesc() { + return desc; + } + } + + public static class Builder { + + private final String namespace; + private final AmazonCloudWatchAsync cloudWatchAsyncClient; + private final MetricRegistry metricRegistry; + + private Percentile[] percentiles; + private boolean withOneMinuteMeanRate; + private boolean withFiveMinuteMeanRate; + private boolean withFifteenMinuteMeanRate; + private boolean withMeanRate; + private boolean withArithmeticMean; + private boolean withStdDev; + private boolean withDryRun; + private boolean withZeroValuesSubmission; + private boolean withStatisticSet; + private boolean withJvmMetrics; + private boolean withShouldParseDimensionsFromName; + private boolean withShouldAppendDropwizardTypeDimension=true; + private MetricFilter metricFilter; + private TimeUnit rateUnit; + private TimeUnit durationUnit; + private StandardUnit cwRateUnit; + private StandardUnit cwDurationUnit; + private Set globalDimensions; + private final Clock clock; + + private Builder( + final MetricRegistry metricRegistry, + final AmazonCloudWatchAsync cloudWatchAsyncClient, + final String namespace) { + this.metricRegistry = metricRegistry; + this.cloudWatchAsyncClient = cloudWatchAsyncClient; + this.namespace = namespace; + this.percentiles = new Percentile[]{Percentile.P75, Percentile.P95, Percentile.P999}; + this.metricFilter = MetricFilter.ALL; + this.rateUnit = TimeUnit.SECONDS; + this.durationUnit = TimeUnit.MILLISECONDS; + this.globalDimensions = new LinkedHashSet<>(); + this.cwRateUnit = toStandardUnit(rateUnit); + this.cwDurationUnit = toStandardUnit(durationUnit); + this.clock = Clock.defaultClock(); + } + + /** + * Convert rates to the given time unit. + * + * @param rateUnit a unit of time + * @return {@code this} + */ + public Builder convertRatesTo(final TimeUnit rateUnit) { + this.rateUnit = rateUnit; + return this; + } + + /** + * Convert durations to the given time unit. + * + * @param durationUnit a unit of time + * @return {@code this} + */ + public Builder convertDurationsTo(final TimeUnit durationUnit) { + this.durationUnit = durationUnit; + return this; + } + + /** + * Only report metrics which match the given filter. + * + * @param metricFilter a {@link MetricFilter} + * @return {@code this} + */ + public Builder filter(final MetricFilter metricFilter) { + this.metricFilter = metricFilter; + return this; + } + + /** + * If the one minute rate should be sent for {@link Meter} and {@link Timer}. {@code false} by default. + *

+ * The rate values are converted before reporting based on the rate unit set + * + * @return {@code this} + * @see ScheduledReporter#convertRate(double) + * @see Meter#getOneMinuteRate() + * @see Timer#getOneMinuteRate() + */ + public Builder withOneMinuteMeanRate() { + withOneMinuteMeanRate = true; + return this; + } + + /** + * If the five minute rate should be sent for {@link Meter} and {@link Timer}. {@code false} by default. + *

+ * The rate values are converted before reporting based on the rate unit set + * + * @return {@code this} + * @see ScheduledReporter#convertRate(double) + * @see Meter#getFiveMinuteRate() + * @see Timer#getFiveMinuteRate() + */ + public Builder withFiveMinuteMeanRate() { + withFiveMinuteMeanRate = true; + return this; + } + + /** + * If the fifteen minute rate should be sent for {@link Meter} and {@link Timer}. {@code false} by default. + *

+ * The rate values are converted before reporting based on the rate unit set + * + * @return {@code this} + * @see ScheduledReporter#convertRate(double) + * @see Meter#getFifteenMinuteRate() + * @see Timer#getFifteenMinuteRate() + */ + public Builder withFifteenMinuteMeanRate() { + withFifteenMinuteMeanRate = true; + return this; + } + + /** + * If the mean rate should be sent for {@link Meter} and {@link Timer}. {@code false} by default. + *

+ * The rate values are converted before reporting based on the rate unit set + * + * @return {@code this} + * @see ScheduledReporter#convertRate(double) + * @see Meter#getMeanRate() + * @see Timer#getMeanRate() + */ + public Builder withMeanRate() { + withMeanRate = true; + return this; + } + + /** + * If the arithmetic mean of {@link Snapshot} values in {@link Histogram} and {@link Timer} should be sent. + * {@code false} by default. + *

+ * The {@link Timer#getSnapshot()} values are converted before reporting based on the duration unit set + * The {@link Histogram#getSnapshot()} values are reported as is + * + * @return {@code this} + * @see ScheduledReporter#convertDuration(double) + * @see Snapshot#getMean() + */ + public Builder withArithmeticMean() { + withArithmeticMean = true; + return this; + } + + /** + * If the standard deviation of {@link Snapshot} values in {@link Histogram} and {@link Timer} should be sent. + * {@code false} by default. + *

+ * The {@link Timer#getSnapshot()} values are converted before reporting based on the duration unit set + * The {@link Histogram#getSnapshot()} values are reported as is + * + * @return {@code this} + * @see ScheduledReporter#convertDuration(double) + * @see Snapshot#getStdDev() + */ + public Builder withStdDev() { + withStdDev = true; + return this; + } + + /** + * If lifetime {@link Snapshot} summary of {@link Histogram} and {@link Timer} should be translated + * to {@link StatisticSet} in the most direct way possible and reported. {@code false} by default. + *

+ * The {@link Snapshot} duration values are converted before reporting based on the duration unit set + * + * @return {@code this} + * @see ScheduledReporter#convertDuration(double) + */ + public Builder withStatisticSet() { + withStatisticSet = true; + return this; + } + + /** + * If JVM statistic should be reported. Supported metrics include: + *

+ * - Run count and elapsed times for all supported garbage collectors + * - Memory usage for all memory pools, including off-heap memory + * - Breakdown of thread states, including deadlocks + * - File descriptor usage + * - Buffer pool sizes and utilization (Java 7 only) + *

+ * {@code false} by default. + * + * @return {@code this} + */ + public Builder withJvmMetrics() { + withJvmMetrics = true; + return this; + } + + /** + * If CloudWatch dimensions should be parsed off the the metric name: + * + * {@code false} by default. + * + * @return {@code this} + */ + public Builder withShouldParseDimensionsFromName(final boolean value) { + withShouldParseDimensionsFromName = value; + return this; + } + + /** + * If the Dropwizard metric type should be reported as a CloudWatch dimension. + * + * {@code false} by default. + * + * @return {@code this} + */ + public Builder withShouldAppendDropwizardTypeDimension(final boolean value) { + withShouldAppendDropwizardTypeDimension = value; + return this; + } + + /** + * Does not actually POST to CloudWatch, logs the {@link PutMetricDataRequest putMetricDataRequest} instead. + * {@code false} by default. + * + * @return {@code this} + */ + public Builder withDryRun() { + withDryRun = true; + return this; + } + + /** + * POSTs to CloudWatch all values. Otherwise, the reporter does not POST values which are zero in order to save + * costs. Also, some users have been experiencing {@link InvalidParameterValueException} when submitting zero + * values. Please refer to: + * https://github.com/azagniotov/codahale-aggregated-metrics-cloudwatch-reporter/issues/4 + *

+ * {@code false} by default. + * + * @return {@code this} + */ + public Builder withZeroValuesSubmission() { + withZeroValuesSubmission = true; + return this; + } + + /** + * The {@link Histogram} and {@link Timer} percentiles to send. If 0.5 is included, it'll be + * reported as median.This defaults to 0.75, 0.95 and 0.999. + *

+ * The {@link Timer#getSnapshot()} percentile values are converted before reporting based on the duration unit + * The {@link Histogram#getSnapshot()} percentile values are reported as is + * + * @param percentiles the percentiles to send. Replaces the default percentiles. + * @return {@code this} + */ + public Builder withPercentiles(final Percentile... percentiles) { + if (percentiles.length > 0) { + this.percentiles = percentiles; + } + return this; + } + + /** + * Global {@link Set} of {@link Dimension} to send with each {@link MetricDatum}. A dimension is a name/value + * pair that helps you to uniquely identify a metric. Every metric has specific characteristics that describe + * it, and you can think of dimensions as categories for those characteristics. + *

+ * Whenever you add a unique name/value pair to one of your metrics, you are creating a new metric. + * Defaults to {@code empty} {@link Set}. + * + * @param dimensions arguments in a form of {@code name=value}. The number of arguments is variable and may be + * zero. The maximum number of arguments is limited by the maximum dimension of a Java array + * as defined by the Java Virtual Machine Specification. Each {@code name=value} string + * will be converted to an instance of {@link Dimension} + * @return {@code this} + */ + public Builder withGlobalDimensions(final String... dimensions) { + for (final String pair : dimensions) { + final List splitted = Stream.of(pair.split("=")).map(String::trim).collect(Collectors.toList()); + this.globalDimensions.add(new Dimension().withName(splitted.get(0)).withValue(splitted.get(1))); + } + return this; + } + + public DimensionedCloudWatchReporter build() { + + if (withJvmMetrics) { + metricRegistry.register("jvm.uptime", (Gauge) () -> ManagementFactory.getRuntimeMXBean().getUptime()); + metricRegistry.register("jvm.current_time", (Gauge) clock::getTime); + metricRegistry.register("jvm.classes", new ClassLoadingGaugeSet()); + metricRegistry.register("jvm.fd_usage", new FileDescriptorRatioGauge()); + metricRegistry.register("jvm.buffers", new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer())); + metricRegistry.register("jvm.gc", new GarbageCollectorMetricSet()); + metricRegistry.register("jvm.memory", new MemoryUsageGaugeSet()); + metricRegistry.register("jvm.thread-states", new ThreadStatesGaugeSet()); + } + + cwRateUnit = toStandardUnit(rateUnit); + cwDurationUnit = toStandardUnit(durationUnit); + + return new DimensionedCloudWatchReporter(this); + } + + private StandardUnit toStandardUnit(final TimeUnit timeUnit) { + switch (timeUnit) { + case SECONDS: + return StandardUnit.Seconds; + case MILLISECONDS: + return StandardUnit.Milliseconds; + case MICROSECONDS: + return StandardUnit.Microseconds; + default: + throw new IllegalArgumentException("Unsupported TimeUnit: " + timeUnit); + } + } + } +} diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/InvalidMetricsPropertyException.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/InvalidMetricsPropertyException.java new file mode 100644 index 000000000..2ea27eefa --- /dev/null +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/reporter/InvalidMetricsPropertyException.java @@ -0,0 +1,15 @@ +package org.opensearch.flint.core.metrics.reporter; + + +import java.io.Serializable; + +public class InvalidMetricsPropertyException extends RuntimeException implements Serializable { + + public InvalidMetricsPropertyException(final String message) { + super(message); + } + + public InvalidMetricsPropertyException(final String message, final Throwable cause) { + super(message, cause); + } +} diff --git a/flint-core/src/test/java/apache/spark/metrics/sink/CloudWatchSinkTests.java b/flint-core/src/test/java/apache/spark/metrics/sink/CloudWatchSinkTests.java new file mode 100644 index 000000000..143561779 --- /dev/null +++ b/flint-core/src/test/java/apache/spark/metrics/sink/CloudWatchSinkTests.java @@ -0,0 +1,79 @@ +package apache.spark.metrics.sink; + +import org.apache.spark.SecurityManager; +import com.codahale.metrics.MetricRegistry; +import org.apache.spark.metrics.sink.CloudWatchSink; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.function.Executable; +import org.mockito.Mockito; + +import java.util.Properties; +import org.opensearch.flint.core.metrics.reporter.InvalidMetricsPropertyException; + +class CloudWatchSinkTests { + private final MetricRegistry metricRegistry = Mockito.mock(MetricRegistry.class); + private final SecurityManager securityManager = Mockito.mock(SecurityManager.class); + + @Test + void should_throwException_when_namespacePropertyIsNotSet() { + final Properties properties = getDefaultValidProperties(); + properties.remove("namespace"); + final Executable executable = () -> { + final CloudWatchSink + cloudWatchSink = new CloudWatchSink(properties, metricRegistry, securityManager); + }; + Assertions.assertThrows(InvalidMetricsPropertyException.class, executable); + } + + @Test + void should_throwException_when_awsPropertyIsInvalid() { + final Properties properties = getDefaultValidProperties(); + properties.setProperty("awsRegion", "someInvalidRegion"); + final Executable executable = () -> { + final CloudWatchSink cloudWatchSink = new CloudWatchSink(properties, metricRegistry, securityManager); + }; + Assertions.assertThrows(InvalidMetricsPropertyException.class, executable); + } + + @Test + void should_throwException_when_pollingPeriodPropertyIsNotANumber() { + final Properties properties = getDefaultValidProperties(); + properties.setProperty("pollingPeriod", "notANumber"); + final Executable executable = () -> { + final CloudWatchSink cloudWatchSink = new CloudWatchSink(properties, metricRegistry, securityManager); + }; + Assertions.assertThrows(InvalidMetricsPropertyException.class, executable); + } + + @Test + void should_throwException_when_pollingPeriodPropertyIsNegative() { + final Properties properties = getDefaultValidProperties(); + properties.setProperty("pollingPeriod", "-5"); + final Executable executable = () -> { + final CloudWatchSink cloudWatchSink = new CloudWatchSink(properties, metricRegistry, securityManager); + }; + Assertions.assertThrows(InvalidMetricsPropertyException.class, executable); + } + + @Test + void should_throwException_when_pollingTimeUnitPropertyIsInvalid() { + final Properties properties = getDefaultValidProperties(); + properties.setProperty("pollingTimeUnit", "notATimeUnitValue"); + final Executable executable = () -> { + final CloudWatchSink cloudWatchSink = new CloudWatchSink(properties, metricRegistry, securityManager); + }; + Assertions.assertThrows(InvalidMetricsPropertyException.class, executable); + } + + private Properties getDefaultValidProperties() { + final Properties properties = new Properties(); + properties.setProperty("namespace", "namespaceValue"); + properties.setProperty("awsAccessKeyId", "awsAccessKeyIdValue"); + properties.setProperty("awsSecretKey", "awsSecretKeyValue"); + properties.setProperty("awsRegion", "us-east-1"); + properties.setProperty("pollingPeriod", "1"); + properties.setProperty("pollingTimeUnit", "MINUTES"); + return properties; + } +} diff --git a/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporterTest.java b/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporterTest.java new file mode 100644 index 000000000..2743472e5 --- /dev/null +++ b/flint-core/src/test/java/opensearch/flint/core/metrics/reporter/DimensionedCloudWatchReporterTest.java @@ -0,0 +1,539 @@ +package opensearch.flint.core.metrics.reporter; + +import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsyncClient; +import com.amazonaws.services.cloudwatch.model.Dimension; +import com.amazonaws.services.cloudwatch.model.MetricDatum; +import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest; +import com.amazonaws.services.cloudwatch.model.PutMetricDataResult; +import com.codahale.metrics.EWMA; +import com.codahale.metrics.ExponentialMovingAverages; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.SlidingWindowReservoir; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; + +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; +import org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter; + +import static com.amazonaws.services.cloudwatch.model.StandardUnit.Count; +import static com.amazonaws.services.cloudwatch.model.StandardUnit.Microseconds; +import static com.amazonaws.services.cloudwatch.model.StandardUnit.Milliseconds; +import static com.amazonaws.services.cloudwatch.model.StandardUnit.None; +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_COUNT; +import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_GAUGE; +import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_NAME_TYPE; +import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_SNAPSHOT_MEAN; +import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_SNAPSHOT_STD_DEV; +import static org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter.DIMENSION_SNAPSHOT_SUMMARY; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +public class DimensionedCloudWatchReporterTest { + + private static final String NAMESPACE = "namespace"; + private static final String ARBITRARY_COUNTER_NAME = "TheCounter"; + private static final String ARBITRARY_METER_NAME = "TheMeter"; + private static final String ARBITRARY_HISTOGRAM_NAME = "TheHistogram"; + private static final String ARBITRARY_TIMER_NAME = "TheTimer"; + private static final String ARBITRARY_GAUGE_NAME = "TheGauge"; + + @Mock + private AmazonCloudWatchAsyncClient mockAmazonCloudWatchAsyncClient; + + @Mock + private Future mockPutMetricDataResultFuture; + + @Captor + private ArgumentCaptor metricDataRequestCaptor; + + private MetricRegistry metricRegistry; + private DimensionedCloudWatchReporter.Builder reporterBuilder; + + @BeforeAll + public static void beforeClass() throws Exception { + reduceExponentialMovingAveragesDefaultTickInterval(); + } + + @BeforeEach + public void setUp() throws Exception { + metricRegistry = new MetricRegistry(); + reporterBuilder = DimensionedCloudWatchReporter.forRegistry(metricRegistry, mockAmazonCloudWatchAsyncClient, NAMESPACE); + when(mockAmazonCloudWatchAsyncClient.putMetricDataAsync(metricDataRequestCaptor.capture())).thenReturn(mockPutMetricDataResultFuture); + } + + @Test + public void shouldNotInvokeCloudWatchClientInDryRunMode() { + metricRegistry.counter(ARBITRARY_COUNTER_NAME).inc(); + reporterBuilder.withDryRun().build().report(); + + verify(mockAmazonCloudWatchAsyncClient, never()).putMetricDataAsync(any(PutMetricDataRequest.class)); + } + + @Test + public void shouldReportWithoutGlobalDimensionsWhenGlobalDimensionsNotConfigured() throws Exception { + metricRegistry.counter(ARBITRARY_COUNTER_NAME).inc(); + reporterBuilder.build().report(); // When 'withGlobalDimensions' was not called + + final List dimensions = firstMetricDatumDimensionsFromCapturedRequest(); + + assertThat(dimensions).hasSize(1); + assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_COUNT)); + } + + @Test + public void reportedCounterShouldContainExpectedDimension() throws Exception { + metricRegistry.counter(ARBITRARY_COUNTER_NAME).inc(); + reporterBuilder.build().report(); + + final List dimensions = firstMetricDatumDimensionsFromCapturedRequest(); + + assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_COUNT)); + } + + @Test + public void reportedCounterShouldContainDimensionEmbeddedInName() throws Exception { + final String DIMENSION_NAME = "some_dimension"; + final String DIMENSION_VALUE = "some_value"; + + metricRegistry.counter(ARBITRARY_COUNTER_NAME + " " + DIMENSION_NAME + "=" + DIMENSION_VALUE).inc(); + reporterBuilder.withShouldParseDimensionsFromName(true).build().report(); + + final List dimensions = firstMetricDatumDimensionsFromCapturedRequest(); + + assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME).withValue(DIMENSION_VALUE)); + } + + @Test + public void reportedGaugeShouldContainExpectedDimension() throws Exception { + metricRegistry.register(ARBITRARY_GAUGE_NAME, (Gauge) () -> 1L); + reporterBuilder.build().report(); + + final List dimensions = firstMetricDatumDimensionsFromCapturedRequest(); + + assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_GAUGE)); + } + + @Test + public void shouldNotReportGaugeWhenMetricValueNotOfTypeNumber() throws Exception { + metricRegistry.register(ARBITRARY_GAUGE_NAME, (Gauge) () -> "bad value type"); + reporterBuilder.build().report(); + + verify(mockAmazonCloudWatchAsyncClient, never()).putMetricDataAsync(any(PutMetricDataRequest.class)); + } + + @Test + public void neverReportMetersCountersGaugesWithZeroValues() throws Exception { + metricRegistry.register(ARBITRARY_GAUGE_NAME, (Gauge) () -> 0L); + metricRegistry.meter(ARBITRARY_METER_NAME).mark(0); + metricRegistry.counter(ARBITRARY_COUNTER_NAME).inc(0); + + buildReportWithSleep(reporterBuilder + .withArithmeticMean() + .withOneMinuteMeanRate() + .withFiveMinuteMeanRate() + .withFifteenMinuteMeanRate() + .withMeanRate()); + + verify(mockAmazonCloudWatchAsyncClient, never()).putMetricDataAsync(any(PutMetricDataRequest.class)); + } + + @Test + public void reportMetersCountersGaugesWithZeroValuesOnlyWhenConfigured() throws Exception { + metricRegistry.register(ARBITRARY_GAUGE_NAME, (Gauge) () -> 0L); + metricRegistry.meter(ARBITRARY_METER_NAME).mark(0); + metricRegistry.counter(ARBITRARY_COUNTER_NAME).inc(0); + metricRegistry.timer(ARBITRARY_TIMER_NAME).update(-1L, TimeUnit.NANOSECONDS); + + buildReportWithSleep(reporterBuilder + .withArithmeticMean() + .withOneMinuteMeanRate() + .withFiveMinuteMeanRate() + .withFifteenMinuteMeanRate() + .withZeroValuesSubmission() + .withMeanRate()); + + verify(mockAmazonCloudWatchAsyncClient, times(1)).putMetricDataAsync(metricDataRequestCaptor.capture()); + + final PutMetricDataRequest putMetricDataRequest = metricDataRequestCaptor.getValue(); + final List metricData = putMetricDataRequest.getMetricData(); + for (final MetricDatum metricDatum : metricData) { + assertThat(metricDatum.getValue()).isEqualTo(0.0); + } + } + + @Test + public void reportedMeterShouldContainExpectedOneMinuteMeanRateDimension() throws Exception { + metricRegistry.meter(ARBITRARY_METER_NAME).mark(1); + buildReportWithSleep(reporterBuilder.withOneMinuteMeanRate()); + + final List dimensions = allDimensionsFromCapturedRequest(); + + assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME_TYPE).withValue("1-min-mean-rate [per-second]")); + } + + @Test + public void reportedMeterShouldContainExpectedFiveMinuteMeanRateDimension() throws Exception { + metricRegistry.meter(ARBITRARY_METER_NAME).mark(1); + buildReportWithSleep(reporterBuilder.withFiveMinuteMeanRate()); + + final List dimensions = allDimensionsFromCapturedRequest(); + + assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME_TYPE).withValue("5-min-mean-rate [per-second]")); + } + + @Test + public void reportedMeterShouldContainExpectedFifteenMinuteMeanRateDimension() throws Exception { + metricRegistry.meter(ARBITRARY_METER_NAME).mark(1); + buildReportWithSleep(reporterBuilder.withFifteenMinuteMeanRate()); + + final List dimensions = allDimensionsFromCapturedRequest(); + + assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME_TYPE).withValue("15-min-mean-rate [per-second]")); + } + + @Test + public void reportedMeterShouldContainExpectedMeanRateDimension() throws Exception { + metricRegistry.meter(ARBITRARY_METER_NAME).mark(1); + reporterBuilder.withMeanRate().build().report(); + + final List dimensions = allDimensionsFromCapturedRequest(); + + assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME_TYPE).withValue("mean-rate [per-second]")); + } + + @Test + public void reportedHistogramShouldContainExpectedArithmeticMeanDimension() throws Exception { + metricRegistry.histogram(ARBITRARY_HISTOGRAM_NAME).update(1); + reporterBuilder.withArithmeticMean().build().report(); + + final List dimensions = allDimensionsFromCapturedRequest(); + + assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_SNAPSHOT_MEAN)); + } + + @Test + public void reportedHistogramShouldContainExpectedStdDevDimension() throws Exception { + metricRegistry.histogram(DimensionedCloudWatchReporterTest.ARBITRARY_HISTOGRAM_NAME).update(1); + metricRegistry.histogram(DimensionedCloudWatchReporterTest.ARBITRARY_HISTOGRAM_NAME).update(2); + reporterBuilder.withStdDev().build().report(); + + final List dimensions = allDimensionsFromCapturedRequest(); + + assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_SNAPSHOT_STD_DEV)); + } + + @Test + public void reportedTimerShouldContainExpectedArithmeticMeanDimension() throws Exception { + metricRegistry.timer(ARBITRARY_TIMER_NAME).update(3, TimeUnit.MILLISECONDS); + reporterBuilder.withArithmeticMean().build().report(); + + final List dimensions = allDimensionsFromCapturedRequest(); + + assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME_TYPE).withValue("snapshot-mean [in-milliseconds]")); + } + + @Test + public void reportedTimerShouldContainExpectedStdDevDimension() throws Exception { + metricRegistry.timer(ARBITRARY_TIMER_NAME).update(1, TimeUnit.MILLISECONDS); + metricRegistry.timer(ARBITRARY_TIMER_NAME).update(3, TimeUnit.MILLISECONDS); + reporterBuilder.withStdDev().withShouldAppendDropwizardTypeDimension(false).build().report(); + + final List dimensions = allDimensionsFromCapturedRequest(); + + assertThat(dimensions).contains(new Dimension().withName(DIMENSION_NAME_TYPE).withValue("snapshot-std-dev [in-milliseconds]")); + } + + @Test + public void shouldReportExpectedSingleGlobalDimension() throws Exception { + metricRegistry.counter(ARBITRARY_COUNTER_NAME).inc(); + reporterBuilder.withGlobalDimensions("Region=us-west-2").build().report(); + + final List dimensions = firstMetricDatumDimensionsFromCapturedRequest(); + + assertThat(dimensions).contains(new Dimension().withName("Region").withValue("us-west-2")); + } + + @Test + public void shouldReportExpectedMultipleGlobalDimensions() throws Exception { + metricRegistry.counter(ARBITRARY_COUNTER_NAME).inc(); + reporterBuilder.withGlobalDimensions("Region=us-west-2", "Instance=stage").build().report(); + + final List dimensions = firstMetricDatumDimensionsFromCapturedRequest(); + + assertThat(dimensions).contains(new Dimension().withName("Region").withValue("us-west-2")); + assertThat(dimensions).contains(new Dimension().withName("Instance").withValue("stage")); + } + + @Test + public void shouldNotReportDuplicateGlobalDimensions() throws Exception { + metricRegistry.counter(ARBITRARY_COUNTER_NAME).inc(); + reporterBuilder.withGlobalDimensions("Region=us-west-2", "Region=us-west-2").build().report(); + + final List dimensions = firstMetricDatumDimensionsFromCapturedRequest(); + + assertThat(dimensions).containsNoDuplicates(); + } + + @Test + public void shouldReportExpectedCounterValue() throws Exception { + metricRegistry.counter(ARBITRARY_COUNTER_NAME).inc(); + reporterBuilder.build().report(); + + final MetricDatum metricDatum = firstMetricDatumFromCapturedRequest(); + + assertThat(metricDatum.getValue()).isWithin(1.0); + assertThat(metricDatum.getUnit()).isEqualTo(Count.toString()); + } + + @Test + public void shouldNotReportUnchangedCounterValue() throws Exception { + metricRegistry.counter(ARBITRARY_COUNTER_NAME).inc(); + final DimensionedCloudWatchReporter dimensionedCloudWatchReporter = reporterBuilder.build(); + + dimensionedCloudWatchReporter.report(); + MetricDatum metricDatum = firstMetricDatumFromCapturedRequest(); + assertThat(metricDatum.getValue().intValue()).isEqualTo(1); + metricDataRequestCaptor.getAllValues().clear(); + + dimensionedCloudWatchReporter.report(); + + verify(mockAmazonCloudWatchAsyncClient, times(1)).putMetricDataAsync(any(PutMetricDataRequest.class)); + } + + @Test + public void shouldReportCounterValueDelta() throws Exception { + metricRegistry.counter(ARBITRARY_COUNTER_NAME).inc(); + metricRegistry.counter(ARBITRARY_COUNTER_NAME).inc(); + final DimensionedCloudWatchReporter dimensionedCloudWatchReporter = reporterBuilder.build(); + + dimensionedCloudWatchReporter.report(); + MetricDatum metricDatum = firstMetricDatumFromCapturedRequest(); + assertThat(metricDatum.getValue().intValue()).isEqualTo(2); + metricDataRequestCaptor.getAllValues().clear(); + + metricRegistry.counter(ARBITRARY_COUNTER_NAME).inc(); + metricRegistry.counter(ARBITRARY_COUNTER_NAME).inc(); + metricRegistry.counter(ARBITRARY_COUNTER_NAME).inc(); + metricRegistry.counter(ARBITRARY_COUNTER_NAME).inc(); + metricRegistry.counter(ARBITRARY_COUNTER_NAME).inc(); + metricRegistry.counter(ARBITRARY_COUNTER_NAME).inc(); + + dimensionedCloudWatchReporter.report(); + metricDatum = firstMetricDatumFromCapturedRequest(); + assertThat(metricDatum.getValue().intValue()).isEqualTo(6); + + verify(mockAmazonCloudWatchAsyncClient, times(2)).putMetricDataAsync(any(PutMetricDataRequest.class)); + } + + @Test + public void shouldReportArithmeticMeanAfterConversionByDefaultDurationWhenReportingTimer() throws Exception { + metricRegistry.timer(ARBITRARY_TIMER_NAME).update(1_000_000, TimeUnit.NANOSECONDS); + reporterBuilder.withArithmeticMean().build().report(); + + final MetricDatum metricData = metricDatumByDimensionFromCapturedRequest("snapshot-mean [in-milliseconds]"); + + assertThat(metricData.getValue().intValue()).isEqualTo(1); + assertThat(metricData.getUnit()).isEqualTo(Milliseconds.toString()); + } + + @Test + public void shouldReportStdDevAfterConversionByDefaultDurationWhenReportingTimer() throws Exception { + metricRegistry.timer(ARBITRARY_TIMER_NAME).update(1_000_000, TimeUnit.NANOSECONDS); + metricRegistry.timer(ARBITRARY_TIMER_NAME).update(2_000_000, TimeUnit.NANOSECONDS); + metricRegistry.timer(ARBITRARY_TIMER_NAME).update(3_000_000, TimeUnit.NANOSECONDS); + metricRegistry.timer(ARBITRARY_TIMER_NAME).update(30_000_000, TimeUnit.NANOSECONDS); + reporterBuilder.withStdDev().build().report(); + + final MetricDatum metricData = metricDatumByDimensionFromCapturedRequest("snapshot-std-dev [in-milliseconds]"); + + assertThat(metricData.getValue().intValue()).isEqualTo(12); + assertThat(metricData.getUnit()).isEqualTo(Milliseconds.toString()); + } + + @Test + public void shouldReportSnapshotValuesAfterConversionByCustomDurationWhenReportingTimer() throws Exception { + metricRegistry.timer(ARBITRARY_TIMER_NAME).update(1, TimeUnit.SECONDS); + metricRegistry.timer(ARBITRARY_TIMER_NAME).update(2, TimeUnit.SECONDS); + metricRegistry.timer(ARBITRARY_TIMER_NAME).update(3, TimeUnit.SECONDS); + metricRegistry.timer(ARBITRARY_TIMER_NAME).update(30, TimeUnit.SECONDS); + reporterBuilder.withStatisticSet().convertDurationsTo(TimeUnit.MICROSECONDS).build().report(); + + final MetricDatum metricData = metricDatumByDimensionFromCapturedRequest(DIMENSION_SNAPSHOT_SUMMARY); + + assertThat(metricData.getStatisticValues().getSum().intValue()).isEqualTo(36_000_000); + assertThat(metricData.getStatisticValues().getMaximum().intValue()).isEqualTo(30_000_000); + assertThat(metricData.getStatisticValues().getMinimum().intValue()).isEqualTo(1_000_000); + assertThat(metricData.getStatisticValues().getSampleCount().intValue()).isEqualTo(4); + assertThat(metricData.getUnit()).isEqualTo(Microseconds.toString()); + } + + @Test + public void shouldReportArithmeticMeanWithoutConversionWhenReportingHistogram() throws Exception { + metricRegistry.histogram(DimensionedCloudWatchReporterTest.ARBITRARY_HISTOGRAM_NAME).update(1); + reporterBuilder.withArithmeticMean().build().report(); + + final MetricDatum metricData = metricDatumByDimensionFromCapturedRequest(DIMENSION_SNAPSHOT_MEAN); + + assertThat(metricData.getValue().intValue()).isEqualTo(1); + assertThat(metricData.getUnit()).isEqualTo(None.toString()); + } + + @Test + public void shouldReportStdDevWithoutConversionWhenReportingHistogram() throws Exception { + metricRegistry.histogram(DimensionedCloudWatchReporterTest.ARBITRARY_HISTOGRAM_NAME).update(1); + metricRegistry.histogram(DimensionedCloudWatchReporterTest.ARBITRARY_HISTOGRAM_NAME).update(2); + metricRegistry.histogram(DimensionedCloudWatchReporterTest.ARBITRARY_HISTOGRAM_NAME).update(3); + metricRegistry.histogram(DimensionedCloudWatchReporterTest.ARBITRARY_HISTOGRAM_NAME).update(30); + reporterBuilder.withStdDev().build().report(); + + final MetricDatum metricData = metricDatumByDimensionFromCapturedRequest(DIMENSION_SNAPSHOT_STD_DEV); + + assertThat(metricData.getValue().intValue()).isEqualTo(12); + assertThat(metricData.getUnit()).isEqualTo(None.toString()); + } + + @Test + public void shouldReportSnapshotValuesWithoutConversionWhenReportingHistogram() throws Exception { + metricRegistry.histogram(DimensionedCloudWatchReporterTest.ARBITRARY_HISTOGRAM_NAME).update(1); + metricRegistry.histogram(DimensionedCloudWatchReporterTest.ARBITRARY_HISTOGRAM_NAME).update(2); + metricRegistry.histogram(DimensionedCloudWatchReporterTest.ARBITRARY_HISTOGRAM_NAME).update(3); + metricRegistry.histogram(DimensionedCloudWatchReporterTest.ARBITRARY_HISTOGRAM_NAME).update(30); + reporterBuilder.withStatisticSet().build().report(); + + final MetricDatum metricData = metricDatumByDimensionFromCapturedRequest(DIMENSION_SNAPSHOT_SUMMARY); + + assertThat(metricData.getStatisticValues().getSum().intValue()).isEqualTo(36); + assertThat(metricData.getStatisticValues().getMaximum().intValue()).isEqualTo(30); + assertThat(metricData.getStatisticValues().getMinimum().intValue()).isEqualTo(1); + assertThat(metricData.getStatisticValues().getSampleCount().intValue()).isEqualTo(4); + assertThat(metricData.getUnit()).isEqualTo(None.toString()); + } + + @Test + public void shouldReportHistogramSubsequentSnapshotValues_SumMaxMinValues() throws Exception { + DimensionedCloudWatchReporter reporter = reporterBuilder.withStatisticSet().build(); + + final Histogram slidingWindowHistogram = new Histogram(new SlidingWindowReservoir(4)); + metricRegistry.register("SlidingWindowHistogram", slidingWindowHistogram); + + slidingWindowHistogram.update(1); + slidingWindowHistogram.update(2); + slidingWindowHistogram.update(30); + reporter.report(); + + final MetricDatum metricData = metricDatumByDimensionFromCapturedRequest(DIMENSION_SNAPSHOT_SUMMARY); + + assertThat(metricData.getStatisticValues().getMaximum().intValue()).isEqualTo(30); + assertThat(metricData.getStatisticValues().getMinimum().intValue()).isEqualTo(1); + assertThat(metricData.getStatisticValues().getSampleCount().intValue()).isEqualTo(3); + assertThat(metricData.getStatisticValues().getSum().intValue()).isEqualTo(33); + assertThat(metricData.getUnit()).isEqualTo(None.toString()); + + slidingWindowHistogram.update(4); + slidingWindowHistogram.update(100); + slidingWindowHistogram.update(5); + slidingWindowHistogram.update(6); + reporter.report(); + + final MetricDatum secondMetricData = metricDatumByDimensionFromCapturedRequest(DIMENSION_SNAPSHOT_SUMMARY); + + assertThat(secondMetricData.getStatisticValues().getMaximum().intValue()).isEqualTo(100); + assertThat(secondMetricData.getStatisticValues().getMinimum().intValue()).isEqualTo(4); + assertThat(secondMetricData.getStatisticValues().getSampleCount().intValue()).isEqualTo(4); + assertThat(secondMetricData.getStatisticValues().getSum().intValue()).isEqualTo(115); + assertThat(secondMetricData.getUnit()).isEqualTo(None.toString()); + + } + + private MetricDatum metricDatumByDimensionFromCapturedRequest(final String dimensionValue) { + final PutMetricDataRequest putMetricDataRequest = metricDataRequestCaptor.getValue(); + final List metricData = putMetricDataRequest.getMetricData(); + + final Optional metricDatumOptional = + metricData + .stream() + .filter(metricDatum -> metricDatum.getDimensions() + .contains(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(dimensionValue))) + .findFirst(); + + if (metricDatumOptional.isPresent()) { + return metricDatumOptional.get(); + } + + throw new IllegalStateException("Could not find MetricDatum for Dimension value: " + dimensionValue); + } + + private MetricDatum firstMetricDatumFromCapturedRequest() { + final PutMetricDataRequest putMetricDataRequest = metricDataRequestCaptor.getValue(); + return putMetricDataRequest.getMetricData().get(0); + } + + private List firstMetricDatumDimensionsFromCapturedRequest() { + final PutMetricDataRequest putMetricDataRequest = metricDataRequestCaptor.getValue(); + final MetricDatum metricDatum = putMetricDataRequest.getMetricData().get(0); + return metricDatum.getDimensions(); + } + + private List allDimensionsFromCapturedRequest() { + final PutMetricDataRequest putMetricDataRequest = metricDataRequestCaptor.getValue(); + final List metricData = putMetricDataRequest.getMetricData(); + final List all = new LinkedList<>(); + for (final MetricDatum metricDatum : metricData) { + all.addAll(metricDatum.getDimensions()); + } + return all; + } + + private void buildReportWithSleep(final DimensionedCloudWatchReporter.Builder dimensionedCloudWatchReporterBuilder) throws InterruptedException { + final DimensionedCloudWatchReporter cloudWatchReporter = dimensionedCloudWatchReporterBuilder.build(); + Thread.sleep(10); + cloudWatchReporter.report(); + } + + /** + * This is a very ugly way to fool the {@link EWMA} by reducing the default tick interval + * in {@link ExponentialMovingAverages} from {@code 5} seconds to {@code 1} millisecond in order to ensure that + * exponentially-weighted moving average rates are populated. This helps to verify that all + * the expected {@link Dimension}s are present in {@link MetricDatum}. + * + * @throws NoSuchFieldException + * @throws IllegalAccessException + * @see ExponentialMovingAverages#tickIfNecessary() + * @see MetricDatum#getDimensions() + */ + private static void reduceExponentialMovingAveragesDefaultTickInterval() throws NoSuchFieldException, IllegalAccessException { + setFinalStaticField(ExponentialMovingAverages.class, "TICK_INTERVAL", TimeUnit.MILLISECONDS.toNanos(1)); + } + + private static void setFinalStaticField(final Class clazz, final String fieldName, long value) throws NoSuchFieldException, IllegalAccessException { + final Field field = clazz.getDeclaredField(fieldName); + field.setAccessible(true); + final Field modifiers = field.getClass().getDeclaredField("modifiers"); + modifiers.setAccessible(true); + modifiers.setInt(field, field.getModifiers() & ~Modifier.FINAL); + field.set(null, value); + } + +} diff --git a/project/plugins.sbt b/project/plugins.sbt index 0fe5dd1ab..4a5ef02ce 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -8,3 +8,4 @@ addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0") addSbtPlugin("com.lightbend.sbt" % "sbt-java-formatter" % "0.8.0") addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.0") addSbtPlugin("com.simplytyped" % "sbt-antlr4" % "0.8.3") +addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.10.0-RC1") \ No newline at end of file