Skip to content

Commit

Permalink
Address comments from Peng and Chen
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Feb 7, 2024
1 parent 921d7b2 commit 5a4cc5d
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.codahale.metrics.ScheduledReporter;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -44,6 +45,7 @@
* @author kmccaw
*/
public class CloudWatchSink implements Sink {
private static final ObjectMapper objectMapper = new ObjectMapper();

private final ScheduledReporter reporter;

Expand Down Expand Up @@ -208,7 +210,6 @@ public CloudWatchSink(
DimensionNameGroups dimensionNameGroups = null;
if (dimensionGroupsProperty.isPresent()) {
try {
ObjectMapper objectMapper = new ObjectMapper();
dimensionNameGroups = objectMapper.readValue(dimensionGroupsProperty.get(), DimensionNameGroups.class);
} catch (IOException e) {
final String message = String.format(
Expand All @@ -224,7 +225,7 @@ public CloudWatchSink(
.withRegion(awsRegion)
.build();

this.reporter = DimensionedCloudWatchReporter.forRegistry(metricRegistry, cloudWatchClient, namespaceProperty.get())
DimensionedCloudWatchReporter.Builder builder = DimensionedCloudWatchReporter.forRegistry(metricRegistry, cloudWatchClient, namespaceProperty.get())
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.filter(metricFilter)
Expand All @@ -241,9 +242,13 @@ public CloudWatchSink(
.withStatisticSet()
.withGlobalDimensions()
.withShouldParseDimensionsFromName(shouldParseInlineDimensions)
.withShouldAppendDropwizardTypeDimension(shouldAppendDropwizardTypeDimension)
.withDimensionGroups(dimensionNameGroups)
.build();
.withShouldAppendDropwizardTypeDimension(shouldAppendDropwizardTypeDimension);

if (dimensionNameGroups != null && dimensionNameGroups.getDimensionGroups() != null) {
builder = builder.withDimensionNameGroups(dimensionNameGroups);
}

this.reporter = builder.withDimensionNameGroups(dimensionNameGroups).build();
}

@Override
Expand Down Expand Up @@ -303,7 +308,7 @@ private static class PropertyDefaults {
*/
public static class DimensionNameGroups {
// Holds the grouping of dimension names categorized under different keys.
private Map<String, List<List<String>>> dimensionGroups;
private Map<String, List<List<String>>> dimensionGroups = new HashMap<>();

/**
* Sets the mapping of dimension groups. Each key in the map represents a category or a type
Expand All @@ -314,6 +319,12 @@ public static class DimensionNameGroups {
* to a list of dimension name groups.
*/
public void setDimensionGroups(Map<String, List<List<String>>> dimensionGroups) {
if (dimensionGroups == null) {
final String message = String.format(
"Undefined value for the \"%s\" CloudWatchSink metrics property.",
PropertyKeys.DIMENSION_GROUPS);
throw new InvalidMetricsPropertyException(message);
}
this.dimensionGroups = dimensionGroups;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,19 @@ public class DimensionUtils {
* @param parts Additional information that might be required by specific dimension builders.
* @return A CloudWatch Dimension object.
*/
public static Dimension constructDimension(String dimensionName, String[] parts) {
public static Dimension constructDimension(String dimensionName, String[] metricNameParts) {
if (!doesNameConsistsOfMetricNameSpace(metricNameParts)) {
throw new IllegalArgumentException("The provided metric name parts do not consist of a valid metric namespace.");
}
return dimensionBuilders.getOrDefault(dimensionName, ignored -> getDefaultDimension(dimensionName))
.apply(parts);
.apply(metricNameParts);
}

// This tries to replicate the logic here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L137
// Since we don't have access to Spark Configuration here: we are relying on the presence of executorId as part of the metricName.
public static boolean doesNameConsistsOfMetricNameSpace(String[] metricNameParts) {
return metricNameParts.length >= 3
&& (metricNameParts[1].equals("driver") || StringUtils.isNumeric(metricNameParts[1]));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ private MetricInfo getMetricInfo(DimensionedName dimensionedName, Set<Dimension>
String[] parts = metricName.split("\\.");

List<Set<Dimension>> dimensionSets = new ArrayList<>();
if (doesNameConsistsOfMetricNameSpace(parts)) {
if (DimensionUtils.doesNameConsistsOfMetricNameSpace(parts)) {
metricName = constructMetricName(parts);
// Get dimension sets corresponding to a specific metric source
constructDimensionSets(dimensionSets, parts);
Expand Down Expand Up @@ -415,17 +415,14 @@ private MetricInfo getMetricInfo(DimensionedName dimensionedName, Set<Dimension>
*/
private void constructDimensionSets(List<Set<Dimension>> dimensionSets, String[] parts) {
String metricSourceName = parts[2];
if (builder.dimensionNameGroups == null || !builder.dimensionNameGroups.getDimensionGroups().containsKey(metricSourceName)) {
if (builder.dimensionNameGroups == null || builder.dimensionNameGroups.getDimensionGroups() == null || !builder.dimensionNameGroups.getDimensionGroups().containsKey(metricSourceName)) {
return;
}

for (List<String> dimensionNames: builder.dimensionNameGroups.getDimensionGroups().get(metricSourceName)) {
Set<Dimension> dimensions = new LinkedHashSet<>();
for (String dimensionName: dimensionNames) {
if (!constructedDimensions.containsKey(dimensionName)) {
Dimension dimension = constructDimension(dimensionName, parts);
constructedDimensions.put(dimensionName, dimension);
}
constructedDimensions.putIfAbsent(dimensionName, constructDimension(dimensionName, parts));
dimensions.add(constructedDimensions.get(dimensionName));
}
dimensionSets.add(dimensions);
Expand All @@ -446,14 +443,6 @@ private String constructMetricName(String[] metricNameParts) {
return Stream.of(metricNameParts).skip(partsToSkip).collect(Collectors.joining("."));
}


// This tries to replicate the logic here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L137
// Since we don't have access to Spark Configuration here: we are relying on the presence of executorId as part of the metricName.
private boolean doesNameConsistsOfMetricNameSpace(String[] metricNameParts) {
return metricNameParts.length >= 3
&& (metricNameParts[1].equals("driver") || StringUtils.isNumeric(metricNameParts[1]));
}

private void stageMetricDatumWithConvertedSnapshot(final boolean metricConfigured,
final String metricName,
final Snapshot snapshot,
Expand Down Expand Up @@ -824,7 +813,7 @@ public Builder withShouldAppendDropwizardTypeDimension(final boolean value) {
return this;
}

public Builder withDimensionGroups(final DimensionNameGroups dimensionNameGroups) {
public Builder withDimensionNameGroups(final DimensionNameGroups dimensionNameGroups) {
this.dimensionNameGroups = dimensionNameGroups;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,12 @@ void should_throwException_when_DimensionGroupsPropertyIsInvalid() {
final Executable executable = () -> {
final CloudWatchSink cloudWatchSink = new CloudWatchSink(properties, metricRegistry, securityManager);
};
Assertions.assertThrows(InvalidMetricsPropertyException.class, executable);
InvalidMetricsPropertyException exception = Assertions.assertThrows(InvalidMetricsPropertyException.class, executable);
StringBuilder expectedMessageBuilder = new StringBuilder();
expectedMessageBuilder.append("Unable to parse value (")
.append(jsonString)
.append(") for the \"dimensionGroups\" CloudWatchSink metrics property.");
Assertions.assertEquals(expectedMessageBuilder.toString(), exception.getMessage());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,60 +5,65 @@

package org.opensearch.flint.core.metrics.reporter;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.amazonaws.services.cloudwatch.model.Dimension;
import org.junit.jupiter.api.function.Executable;

import java.lang.reflect.Field;
import java.util.Map;

public class DimensionUtilsTest {
private static final String[] parts = {"someMetric", "123", "dummySource"};

@Test
void testConstructDimensionThrowsIllegalArgumentException() {
String dimensionName = "InvalidDimension";
String[] metricNameParts = {};

final Executable executable = () -> {
DimensionUtils.constructDimension(dimensionName, metricNameParts);
};
IllegalArgumentException exception = Assertions.assertThrows(IllegalArgumentException.class, executable);
Assertions.assertEquals("The provided metric name parts do not consist of a valid metric namespace.", exception.getMessage());
}
@Test
public void testGetInstanceRoleDimensionWithExecutor() {
String[] parts = {"someMetric", "123"};
Dimension result = DimensionUtils.constructDimension("instanceRole", parts);
assertEquals("instanceRole", result.getName());
assertEquals("executor", result.getValue());
}

@Test
public void testGetInstanceRoleDimensionWithRoleName() {
String[] parts = {"someMetric", "driver"};
String[] parts = {"someMetric", "driver", "dummySource"};
Dimension result = DimensionUtils.constructDimension("instanceRole", parts);
assertEquals("instanceRole", result.getName());
assertEquals("driver", result.getValue());
}

@Test
public void testGetDefaultDimensionWithUnknown() {
String[] parts = {"someMetric", "123"};
Dimension result = DimensionUtils.constructDimension("nonExistentDimension", parts);
assertEquals("nonExistentDimension", result.getName());
assertEquals("UNKNOWN", result.getValue());
}

@Test
public void testGetDefaultDimensionFromSystemEnv() throws NoSuchFieldException, IllegalAccessException {
Class<?> classOfMap = System.getenv().getClass();
Field field = classOfMap.getDeclaredField("m");
field.setAccessible(true);
Map<String, String> writeableEnvironmentVariables = (Map<String, String>)field.get(System.getenv());
writeableEnvironmentVariables.put("TEST_VAR", "12345");
Dimension result = DimensionUtils.constructDimension("TEST_VAR", new String[]{});
assertEquals("TEST_VAR", result.getName());
assertEquals("12345", result.getValue());
}

@Test
public void testConstructJobIdDimension() throws NoSuchFieldException, IllegalAccessException {
public void testGetDimensionsFromSystemEnv() throws NoSuchFieldException, IllegalAccessException {
Class<?> classOfMap = System.getenv().getClass();
Field field = classOfMap.getDeclaredField("m");
field.setAccessible(true);
Map<String, String> writeableEnvironmentVariables = (Map<String, String>)field.get(System.getenv());
writeableEnvironmentVariables.put("SERVERLESS_EMR_JOB_ID", "12345");
Dimension result = DimensionUtils.constructDimension("jobId", new String[]{});
assertEquals("jobId", result.getName());
assertEquals("12345", result.getValue());
writeableEnvironmentVariables.put("TEST_VAR", "dummy1");
writeableEnvironmentVariables.put("SERVERLESS_EMR_JOB_ID", "dummy2");
Dimension result1 = DimensionUtils.constructDimension("TEST_VAR", parts);
assertEquals("TEST_VAR", result1.getName());
assertEquals("dummy1", result1.getValue());
Dimension result2 = DimensionUtils.constructDimension("jobId", parts);
assertEquals("jobId", result2.getName());
assertEquals("dummy2", result2.getValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ public void shouldConsumeMultipleMetricDatumWithDimensionGroups() throws Excepti

CloudWatchSink.DimensionNameGroups dimensionNameGroups = new CloudWatchSink.DimensionNameGroups();
dimensionNameGroups.setDimensionGroups(dimensionGroups);
reporterBuilder.withDimensionGroups(dimensionNameGroups).build().report();
reporterBuilder.withDimensionNameGroups(dimensionNameGroups).build().report();

final PutMetricDataRequest putMetricDataRequest = metricDataRequestCaptor.getValue();
final List<MetricDatum> metricDatums = putMetricDataRequest.getMetricData();
Expand Down

0 comments on commit 5a4cc5d

Please sign in to comment.