Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support dimension sets in config #238

Merged
merged 2 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.ScheduledReporter;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
Expand All @@ -26,6 +31,8 @@
import org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter;
import org.opensearch.flint.core.metrics.reporter.DimensionedName;
import org.opensearch.flint.core.metrics.reporter.InvalidMetricsPropertyException;
import com.fasterxml.jackson.databind.ObjectMapper;


/**
* Implementation of the Spark metrics {@link Sink} interface
Expand All @@ -38,6 +45,7 @@
* @author kmccaw
*/
public class CloudWatchSink implements Sink {
private static final ObjectMapper objectMapper = new ObjectMapper();

private final ScheduledReporter reporter;

Expand Down Expand Up @@ -198,12 +206,26 @@ public CloudWatchSink(
metricFilter = MetricFilter.ALL;
}

final Optional<String> dimensionGroupsProperty = getProperty(properties, PropertyKeys.DIMENSION_GROUPS);
DimensionNameGroups dimensionNameGroups = null;
if (dimensionGroupsProperty.isPresent()) {
try {
dimensionNameGroups = objectMapper.readValue(dimensionGroupsProperty.get(), DimensionNameGroups.class);
} catch (IOException e) {
final String message = String.format(
"Unable to parse value (%s) for the \"%s\" CloudWatchSink metrics property.",
dimensionGroupsProperty.get(),
PropertyKeys.DIMENSION_GROUPS);
throw new InvalidMetricsPropertyException(message, e);
}
}

final AmazonCloudWatchAsync cloudWatchClient = AmazonCloudWatchAsyncClient.asyncBuilder()
.withCredentials(awsCredentialsProvider)
.withRegion(awsRegion)
.build();

this.reporter = DimensionedCloudWatchReporter.forRegistry(metricRegistry, cloudWatchClient, namespaceProperty.get())
DimensionedCloudWatchReporter.Builder builder = DimensionedCloudWatchReporter.forRegistry(metricRegistry, cloudWatchClient, namespaceProperty.get())
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.filter(metricFilter)
Expand All @@ -220,8 +242,13 @@ public CloudWatchSink(
.withStatisticSet()
.withGlobalDimensions()
.withShouldParseDimensionsFromName(shouldParseInlineDimensions)
.withShouldAppendDropwizardTypeDimension(shouldAppendDropwizardTypeDimension)
.build();
.withShouldAppendDropwizardTypeDimension(shouldAppendDropwizardTypeDimension);

if (dimensionNameGroups != null && dimensionNameGroups.getDimensionGroups() != null) {
builder = builder.withDimensionNameGroups(dimensionNameGroups);
}

this.reporter = builder.withDimensionNameGroups(dimensionNameGroups).build();
}

@Override
Expand Down Expand Up @@ -262,6 +289,7 @@ private static class PropertyKeys {
static final String SHOULD_PARSE_INLINE_DIMENSIONS = "shouldParseInlineDimensions";
static final String SHOULD_APPEND_DROPWIZARD_TYPE_DIMENSION = "shouldAppendDropwizardTypeDimension";
static final String METRIC_FILTER_REGEX = "regex";
static final String DIMENSION_GROUPS = "dimensionGroups";
}

/**
Expand All @@ -272,4 +300,45 @@ private static class PropertyDefaults {
static final TimeUnit POLLING_PERIOD_TIME_UNIT = TimeUnit.MINUTES;
static final boolean SHOULD_PARSE_INLINE_DIMENSIONS = false;
}

/**
 * Holder for the dimension name groups configured through the
 * {@code dimensionGroups} CloudWatchSink metrics property.
 *
 * <p>The wrapped map associates a key with a list of groups, and every group is
 * itself a list of dimension names. The map starts out empty and can only be
 * replaced by a non-null map.
 */
public static class DimensionNameGroups {
    // Key -> list of dimension name groups; each group is a list of dimension names.
    private Map<String, List<List<String>>> dimensionGroups = new HashMap<>();

    /**
     * Returns the dimension groups currently held by this instance.
     *
     * <p>The returned map is the instance's own map, not a copy.
     *
     * @return the mapping from key to its list of dimension name groups; an empty
     *         map if {@link #setDimensionGroups(Map)} has never been called.
     */
    public Map<String, List<List<String>>> getDimensionGroups() {
        return this.dimensionGroups;
    }

    /**
     * Replaces the dimension groups held by this instance.
     *
     * @param dimensionGroups the new mapping from key to its list of dimension
     *                        name groups; must not be {@code null}.
     * @throws InvalidMetricsPropertyException if {@code dimensionGroups} is {@code null}.
     */
    public void setDimensionGroups(Map<String, List<List<String>>> dimensionGroups) {
        if (dimensionGroups != null) {
            this.dimensionGroups = dimensionGroups;
            return;
        }
        // A null map means the property was present but carried no usable value.
        throw new InvalidMetricsPropertyException(String.format(
                "Undefined value for the \"%s\" CloudWatchSink metrics property.",
                PropertyKeys.DIMENSION_GROUPS));
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metrics.reporter;

import java.util.Map;
import java.util.function.Function;
import org.apache.commons.lang.StringUtils;

import com.amazonaws.services.cloudwatch.model.Dimension;

/**
* Utility class for creating and managing CloudWatch dimensions for metrics reporting in Flint.
* It facilitates the construction of dimensions based on different system properties and environment
* variables, supporting the dynamic tagging of metrics with relevant information like job ID,
* application ID, and more.
*/
public class DimensionUtils {
private static final String DIMENSION_JOB_ID = "jobId";
private static final String DIMENSION_APPLICATION_ID = "applicationId";
private static final String DIMENSION_APPLICATION_NAME = "applicationName";
private static final String DIMENSION_DOMAIN_ID = "domainId";
private static final String DIMENSION_INSTANCE_ROLE = "instanceRole";
private static final String UNKNOWN = "UNKNOWN";

// Maps dimension names to functions that generate Dimension objects based on specific logic or environment variables
private static final Map<String, Function<String[], Dimension>> dimensionBuilders = Map.of(
DIMENSION_INSTANCE_ROLE, DimensionUtils::getInstanceRoleDimension,
DIMENSION_JOB_ID, ignored -> getEnvironmentVariableDimension("SERVERLESS_EMR_JOB_ID", DIMENSION_JOB_ID),
DIMENSION_APPLICATION_ID, ignored -> getEnvironmentVariableDimension("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", DIMENSION_APPLICATION_ID),
DIMENSION_APPLICATION_NAME, ignored -> getEnvironmentVariableDimension("SERVERLESS_EMR_APPLICATION_NAME", DIMENSION_APPLICATION_NAME),
DIMENSION_DOMAIN_ID, ignored -> getEnvironmentVariableDimension("FLINT_CLUSTER_NAME", DIMENSION_DOMAIN_ID)
);

/**
* Constructs a CloudWatch Dimension object based on the provided dimension name. If a specific
* builder exists for the dimension name, it is used; otherwise, a default dimension is constructed.
*
* @param dimensionName The name of the dimension to construct.
* @param parts Additional information that might be required by specific dimension builders.
* @return A CloudWatch Dimension object.
*/
public static Dimension constructDimension(String dimensionName, String[] metricNameParts) {
if (!doesNameConsistsOfMetricNameSpace(metricNameParts)) {
throw new IllegalArgumentException("The provided metric name parts do not consist of a valid metric namespace.");
}
return dimensionBuilders.getOrDefault(dimensionName, ignored -> getDefaultDimension(dimensionName))
.apply(metricNameParts);
}

// This tries to replicate the logic here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L137
// Since we don't have access to Spark Configuration here: we are relying on the presence of executorId as part of the metricName.
public static boolean doesNameConsistsOfMetricNameSpace(String[] metricNameParts) {
return metricNameParts.length >= 3
&& (metricNameParts[1].equals("driver") || StringUtils.isNumeric(metricNameParts[1]));
}

/**
* Generates a Dimension object representing the instance role (either executor or driver) based on the
* metric name parts provided.
*
* @param parts An array where the second element indicates the role by being numeric (executor) or not (driver).
* @return A Dimension object with the instance role.
*/
private static Dimension getInstanceRoleDimension(String[] parts) {
String value = StringUtils.isNumeric(parts[1]) ? "executor" : parts[1];
penghuo marked this conversation as resolved.
Show resolved Hide resolved
return new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue(value);
}

/**
* Constructs a Dimension object using a system environment variable. If the environment variable is not found,
* it uses a predefined "UNKNOWN" value.
*
* @param envVarName The name of the environment variable to use for the dimension's value.
* @param dimensionName The name of the dimension.
* @return A Dimension object populated with the appropriate name and value.
*/
private static Dimension getEnvironmentVariableDimension(String envVarName, String dimensionName) {
String value = System.getenv().getOrDefault(envVarName, UNKNOWN);
return new Dimension().withName(dimensionName).withValue(value);
}

/**
* Provides a generic mechanism to construct a Dimension object with an environment variable value
* or a default value if the environment variable is not set.
*
* @param dimensionName The name of the dimension for which to retrieve the value.
* @return A Dimension object populated with the dimension name and its corresponding value.
*/
private static Dimension getDefaultDimension(String dimensionName) {
return getEnvironmentVariableDimension(dimensionName, dimensionName);
}
}
Loading
Loading