Skip to content

Commit

Permalink
Merge branch 'main' into implement-bloom-filter-query-rewrite-no-push…
Browse files Browse the repository at this point in the history
…down

Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Feb 8, 2024
2 parents 6f2aceb + f446de0 commit cf3ff2a
Show file tree
Hide file tree
Showing 27 changed files with 969 additions and 231 deletions.
24 changes: 22 additions & 2 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,25 @@ Please see the following example in which Index Building Logic and Query Rewrite
| MinMax | CREATE SKIPPING INDEX<br>ON alb_logs<br> (<br>&nbsp;&nbsp;request_processing_time MIN_MAX<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;MIN(request_processing_time) AS request_processing_time_min,<br>&nbsp;&nbsp;MAX(request_processing_time) AS request_processing_time_max,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE request_processing_time = 100<br>=><br>SELECT *<br>FROM alb_logs (input_files = <br> SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE request_processing_time_min <= 100<br>&nbsp;&nbsp;&nbsp;&nbsp;AND 100 <= request_processing_time_max<br>)<br>WHERE request_processing_time = 100 |
| BloomFilter | CREATE SKIPPING INDEX<br>ON alb_logs<br> (<br>&nbsp;&nbsp;client_ip BLOOM_FILTER<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;BLOOM_FILTER_AGG(client_ip) AS client_ip,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE client_ip = '127.0.0.1'<br>=><br>SELECT *<br>FROM alb_logs (input_files = <br>&nbsp;&nbsp;SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE BLOOM_FILTER_MIGHT_CONTAIN(client_ip, '127.0.0.1') = true<br>)<br>WHERE client_ip = '127.0.0.1' |

### Flint Index Refresh

- **Auto Refresh:**
- This feature allows the Flint Index to automatically refresh. Users can configure such as frequency of auto-refresh based on their preferences.
- **Manual Refresh:**
- Users have the option to manually trigger a refresh for the Flint Index. This provides flexibility and control over when the refresh occurs.
- **Full Refresh:**
- Initiates a comprehensive update of the Flint Index, fetching all available data and ensuring the most up-to-date information is displayed.
- **Incremental Refresh:**
- Performs an incremental update by fetching only the new data since the last refresh. This is useful for optimizing the refresh process and reducing resource usage.

The refresh mode is influenced by the index options specified during index creation, particularly the `auto_refresh` and `incremental_refresh` options. These options collectively define the behavior of the refresh mode when creating an index as below. Find more details in [Create Index Options](#create-index-options).

| Refresh Mode | auto_refresh | incremental_refresh |
|---------------------|--------------|---------------------|
| Auto Refresh | true | |
| Full Refresh | false | false |
| Incremental Refresh | false | true |

### Flint Index Specification

#### Metadata
Expand Down Expand Up @@ -263,9 +282,10 @@ VACUUM MATERIALIZED VIEW alb_logs_metrics

User can provide the following options in `WITH` clause of create statement:

+ `auto_refresh`: triggers Incremental Refresh immediately after index create complete if true. Otherwise, user has to trigger Full Refresh by `REFRESH` statement manually.
+ `auto_refresh`: default value is false. Automatically refresh the index if set to true. Otherwise, user has to trigger refresh by `REFRESH` statement manually.
+ `refresh_interval`: a string as the time interval for incremental refresh, e.g. 1 minute, 10 seconds. This is only applicable when auto refresh enabled. Please check `org.apache.spark.unsafe.types.CalendarInterval` for valid duration identifiers. By default, next micro batch will be generated as soon as the previous one complete processing.
+ `checkpoint_location`: a string as the location path for incremental refresh job checkpoint. The location has to be a path in an HDFS compatible file system and only applicable when auto refresh enabled. If unspecified, temporary checkpoint directory will be used and may result in checkpoint data lost upon restart.
+ `incremental_refresh`: default value is false. incrementally refresh the index if set to true. Otherwise, fully refresh the entire index. This only applicable when auto refresh disabled.
+ `checkpoint_location`: a string as the location path for refresh job checkpoint (auto or incremental). The location has to be a path in an HDFS compatible file system and only applicable when auto refresh enabled. If unspecified, temporary checkpoint directory will be used and may result in checkpoint data lost upon restart.
+ `watermark_delay`: a string as time expression for how late data can come and still be processed, e.g. 1 minute, 10 seconds. This is required by incremental refresh on materialized view if it has aggregation in the query.
+ `output_mode`: a mode string that describes how data will be written to streaming sink. If unspecified, default append mode will be applied.
+ `index_settings`: a JSON string as index settings for OpenSearch index that will be created. Please follow the format in OpenSearch documentation. If unspecified, default OpenSearch index settings will be applied.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.ScheduledReporter;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
Expand All @@ -26,6 +31,8 @@
import org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter;
import org.opensearch.flint.core.metrics.reporter.DimensionedName;
import org.opensearch.flint.core.metrics.reporter.InvalidMetricsPropertyException;
import com.fasterxml.jackson.databind.ObjectMapper;


/**
* Implementation of the Spark metrics {@link Sink} interface
Expand All @@ -38,6 +45,7 @@
* @author kmccaw
*/
public class CloudWatchSink implements Sink {
private static final ObjectMapper objectMapper = new ObjectMapper();

private final ScheduledReporter reporter;

Expand Down Expand Up @@ -198,12 +206,26 @@ public CloudWatchSink(
metricFilter = MetricFilter.ALL;
}

final Optional<String> dimensionGroupsProperty = getProperty(properties, PropertyKeys.DIMENSION_GROUPS);
DimensionNameGroups dimensionNameGroups = null;
if (dimensionGroupsProperty.isPresent()) {
try {
dimensionNameGroups = objectMapper.readValue(dimensionGroupsProperty.get(), DimensionNameGroups.class);
} catch (IOException e) {
final String message = String.format(
"Unable to parse value (%s) for the \"%s\" CloudWatchSink metrics property.",
dimensionGroupsProperty.get(),
PropertyKeys.DIMENSION_GROUPS);
throw new InvalidMetricsPropertyException(message, e);
}
}

final AmazonCloudWatchAsync cloudWatchClient = AmazonCloudWatchAsyncClient.asyncBuilder()
.withCredentials(awsCredentialsProvider)
.withRegion(awsRegion)
.build();

this.reporter = DimensionedCloudWatchReporter.forRegistry(metricRegistry, cloudWatchClient, namespaceProperty.get())
DimensionedCloudWatchReporter.Builder builder = DimensionedCloudWatchReporter.forRegistry(metricRegistry, cloudWatchClient, namespaceProperty.get())
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.filter(metricFilter)
Expand All @@ -220,8 +242,13 @@ public CloudWatchSink(
.withStatisticSet()
.withGlobalDimensions()
.withShouldParseDimensionsFromName(shouldParseInlineDimensions)
.withShouldAppendDropwizardTypeDimension(shouldAppendDropwizardTypeDimension)
.build();
.withShouldAppendDropwizardTypeDimension(shouldAppendDropwizardTypeDimension);

if (dimensionNameGroups != null && dimensionNameGroups.getDimensionGroups() != null) {
builder = builder.withDimensionNameGroups(dimensionNameGroups);
}

this.reporter = builder.withDimensionNameGroups(dimensionNameGroups).build();
}

@Override
Expand Down Expand Up @@ -262,6 +289,7 @@ private static class PropertyKeys {
static final String SHOULD_PARSE_INLINE_DIMENSIONS = "shouldParseInlineDimensions";
static final String SHOULD_APPEND_DROPWIZARD_TYPE_DIMENSION = "shouldAppendDropwizardTypeDimension";
static final String METRIC_FILTER_REGEX = "regex";
static final String DIMENSION_GROUPS = "dimensionGroups";
}

/**
Expand All @@ -272,4 +300,45 @@ private static class PropertyDefaults {
static final TimeUnit POLLING_PERIOD_TIME_UNIT = TimeUnit.MINUTES;
static final boolean SHOULD_PARSE_INLINE_DIMENSIONS = false;
}

/**
* Represents a container for grouping dimension names used in metrics reporting.
* This class allows for the organization and storage of dimension names into logical groups,
* facilitating the dynamic construction and retrieval of dimension information for metrics.
*/
public static class DimensionNameGroups {
// Holds the grouping of dimension names categorized under different keys.
private Map<String, List<List<String>>> dimensionGroups = new HashMap<>();

/**
* Sets the mapping of dimension groups. Each key in the map represents a category or a type
* of dimension, and the value is a list of dimension name groups, where each group is itself
* a list of dimension names that are logically grouped together.
*
* @param dimensionGroups A map of dimension groups categorized by keys, where each key maps
* to a list of dimension name groups.
*/
public void setDimensionGroups(Map<String, List<List<String>>> dimensionGroups) {
if (dimensionGroups == null) {
final String message = String.format(
"Undefined value for the \"%s\" CloudWatchSink metrics property.",
PropertyKeys.DIMENSION_GROUPS);
throw new InvalidMetricsPropertyException(message);
}
this.dimensionGroups = dimensionGroups;
}

/**
* Retrieves the current mapping of dimension groups. The structure of the returned map is such
* that each key represents a specific category or type of dimension, and the corresponding value
* is a list of dimension name groups. Each group is a list of dimension names that are logically
* grouped together.
*
* @return A map representing the groups of dimension names categorized by keys. Each key maps
* to a list of lists, where each inner list is a group of related dimension names.
*/
public Map<String, List<List<String>>> getDimensionGroups() {
return dimensionGroups;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metrics.reporter;

import java.util.Map;
import java.util.function.Function;
import org.apache.commons.lang.StringUtils;

import com.amazonaws.services.cloudwatch.model.Dimension;

/**
* Utility class for creating and managing CloudWatch dimensions for metrics reporting in Flint.
* It facilitates the construction of dimensions based on different system properties and environment
* variables, supporting the dynamic tagging of metrics with relevant information like job ID,
* application ID, and more.
*/
public class DimensionUtils {
private static final String DIMENSION_JOB_ID = "jobId";
private static final String DIMENSION_APPLICATION_ID = "applicationId";
private static final String DIMENSION_APPLICATION_NAME = "applicationName";
private static final String DIMENSION_DOMAIN_ID = "domainId";
private static final String DIMENSION_INSTANCE_ROLE = "instanceRole";
private static final String UNKNOWN = "UNKNOWN";

// Maps dimension names to functions that generate Dimension objects based on specific logic or environment variables
private static final Map<String, Function<String[], Dimension>> dimensionBuilders = Map.of(
DIMENSION_INSTANCE_ROLE, DimensionUtils::getInstanceRoleDimension,
DIMENSION_JOB_ID, ignored -> getEnvironmentVariableDimension("SERVERLESS_EMR_JOB_ID", DIMENSION_JOB_ID),
DIMENSION_APPLICATION_ID, ignored -> getEnvironmentVariableDimension("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", DIMENSION_APPLICATION_ID),
DIMENSION_APPLICATION_NAME, ignored -> getEnvironmentVariableDimension("SERVERLESS_EMR_APPLICATION_NAME", DIMENSION_APPLICATION_NAME),
DIMENSION_DOMAIN_ID, ignored -> getEnvironmentVariableDimension("FLINT_CLUSTER_NAME", DIMENSION_DOMAIN_ID)
);

/**
* Constructs a CloudWatch Dimension object based on the provided dimension name. If a specific
* builder exists for the dimension name, it is used; otherwise, a default dimension is constructed.
*
* @param dimensionName The name of the dimension to construct.
* @param parts Additional information that might be required by specific dimension builders.
* @return A CloudWatch Dimension object.
*/
public static Dimension constructDimension(String dimensionName, String[] metricNameParts) {
if (!doesNameConsistsOfMetricNameSpace(metricNameParts)) {
throw new IllegalArgumentException("The provided metric name parts do not consist of a valid metric namespace.");
}
return dimensionBuilders.getOrDefault(dimensionName, ignored -> getDefaultDimension(dimensionName))
.apply(metricNameParts);
}

// This tries to replicate the logic here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L137
// Since we don't have access to Spark Configuration here: we are relying on the presence of executorId as part of the metricName.
public static boolean doesNameConsistsOfMetricNameSpace(String[] metricNameParts) {
return metricNameParts.length >= 3
&& (metricNameParts[1].equals("driver") || StringUtils.isNumeric(metricNameParts[1]));
}

/**
* Generates a Dimension object representing the instance role (either executor or driver) based on the
* metric name parts provided.
*
* @param parts An array where the second element indicates the role by being numeric (executor) or not (driver).
* @return A Dimension object with the instance role.
*/
private static Dimension getInstanceRoleDimension(String[] parts) {
String value = StringUtils.isNumeric(parts[1]) ? "executor" : parts[1];
return new Dimension().withName(DIMENSION_INSTANCE_ROLE).withValue(value);
}

/**
* Constructs a Dimension object using a system environment variable. If the environment variable is not found,
* it uses a predefined "UNKNOWN" value.
*
* @param envVarName The name of the environment variable to use for the dimension's value.
* @param dimensionName The name of the dimension.
* @return A Dimension object populated with the appropriate name and value.
*/
private static Dimension getEnvironmentVariableDimension(String envVarName, String dimensionName) {
String value = System.getenv().getOrDefault(envVarName, UNKNOWN);
return new Dimension().withName(dimensionName).withValue(value);
}

/**
* Provides a generic mechanism to construct a Dimension object with an environment variable value
* or a default value if the environment variable is not set.
*
* @param dimensionName The name of the dimension for which to retrieve the value.
* @return A Dimension object populated with the dimension name and its corresponding value.
*/
private static Dimension getDefaultDimension(String dimensionName) {
return getEnvironmentVariableDimension(dimensionName, dimensionName);
}
}
Loading

0 comments on commit cf3ff2a

Please sign in to comment.