Commit 66165b3

Merge branch 'main' into support-on-demand-incremental-refresh-rebased

dai-chen committed Feb 8, 2024
2 parents a8f51f0 + 0af83e4
Showing 20 changed files with 1,212 additions and 90 deletions.
6 changes: 5 additions & 1 deletion docs/index.md
@@ -14,6 +14,7 @@ A Flint index is ...
- Partition: skip data scan by maintaining and filtering partitioned column value per file.
- MinMax: skip data scan by maintaining lower and upper bound of the indexed column per file.
- ValueSet: skip data scan by building a unique value set of the indexed column per file.
- BloomFilter: skip data scan by building a bloom filter of the indexed column per file.
- Covering Index: create index for selected columns within the source dataset to improve query performance.
- Materialized View: enhance query performance by storing precomputed and aggregated data from the source dataset.

@@ -23,7 +24,8 @@ Please see the following example in which Index Building Logic and Query Rewrite
|----------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Partition | CREATE SKIPPING INDEX<br>ON alb_logs<br> (<br>&nbsp;&nbsp;year PARTITION,<br>&nbsp;&nbsp;month PARTITION,<br>&nbsp;&nbsp;day PARTITION,<br>&nbsp;&nbsp;hour PARTITION<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;FIRST(year) AS year,<br>&nbsp;&nbsp;FIRST(month) AS month,<br>&nbsp;&nbsp;FIRST(day) AS day,<br>&nbsp;&nbsp;FIRST(hour) AS hour,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE year = 2023 AND month = 4<br>=><br>SELECT *<br>FROM alb_logs (input_files = <br>&nbsp;&nbsp;SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE year = 2023 AND month = 4<br>)<br>WHERE year = 2023 AND month = 4 |
| ValueSet | CREATE SKIPPING INDEX<br>ON alb_logs<br> (<br>&nbsp;&nbsp;elb_status_code VALUE_SET<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;COLLECT_SET(elb_status_code) AS elb_status_code,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE elb_status_code = 404<br>=><br>SELECT *<br>FROM alb_logs (input_files = <br>&nbsp;&nbsp;SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE ARRAY_CONTAINS(elb_status_code, 404)<br>)<br>WHERE elb_status_code = 404 |
| MinMax | CREATE SKIPPING INDEX<br>ON alb_logs<br> (<br>&nbsp;&nbsp;request_processing_time MIN_MAX<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;MIN(request_processing_time) AS request_processing_time_min,<br>&nbsp;&nbsp;MAX(request_processing_time) AS request_processing_time_max,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE request_processing_time = 100<br>=><br>SELECT *<br>FROM alb_logs (input_files = <br> SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE request_processing_time_min <= 100<br>&nbsp;&nbsp;&nbsp;&nbsp;AND 100 <= request_processing_time_max<br>)<br>WHERE request_processing_time = 100 |
| BloomFilter | CREATE SKIPPING INDEX<br>ON alb_logs<br> (<br>&nbsp;&nbsp;client_ip BLOOM_FILTER<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;BLOOM_FILTER_AGG(client_ip) AS client_ip,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE client_ip = '127.0.0.1'<br>=><br>SELECT *<br>FROM alb_logs (input_files = <br>&nbsp;&nbsp;SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE BLOOM_FILTER_MIGHT_CONTAIN(client_ip, '127.0.0.1') = true<br>)<br>WHERE client_ip = '127.0.0.1' |

### Flint Index Refresh

@@ -73,6 +75,7 @@ For now, Flint Index doesn't define its own data type and uses OpenSearch field
| **FlintDataType** |
|-------------------|
| boolean |
| binary |
| long |
| integer |
| short |
@@ -469,6 +472,7 @@ flint.skippingIndex()
.addPartitions("year", "month", "day")
.addValueSet("elb_status_code")
.addMinMax("request_processing_time")
.addBloomFilter("client_ip")
.create()

flint.refreshIndex("flint_spark_catalog_default_alb_logs_skipping_index")
CloudWatchSink.java
@@ -18,6 +18,11 @@
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.ScheduledReporter;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
@@ -26,6 +31,8 @@
import org.opensearch.flint.core.metrics.reporter.DimensionedCloudWatchReporter;
import org.opensearch.flint.core.metrics.reporter.DimensionedName;
import org.opensearch.flint.core.metrics.reporter.InvalidMetricsPropertyException;
import com.fasterxml.jackson.databind.ObjectMapper;


/**
* Implementation of the Spark metrics {@link Sink} interface
@@ -38,6 +45,7 @@
* @author kmccaw
*/
public class CloudWatchSink implements Sink {
private static final ObjectMapper objectMapper = new ObjectMapper();

private final ScheduledReporter reporter;

@@ -198,12 +206,26 @@ public CloudWatchSink(
metricFilter = MetricFilter.ALL;
}

final Optional<String> dimensionGroupsProperty = getProperty(properties, PropertyKeys.DIMENSION_GROUPS);
DimensionNameGroups dimensionNameGroups = null;
if (dimensionGroupsProperty.isPresent()) {
try {
dimensionNameGroups = objectMapper.readValue(dimensionGroupsProperty.get(), DimensionNameGroups.class);
} catch (IOException e) {
final String message = String.format(
"Unable to parse value (%s) for the \"%s\" CloudWatchSink metrics property.",
dimensionGroupsProperty.get(),
PropertyKeys.DIMENSION_GROUPS);
throw new InvalidMetricsPropertyException(message, e);
}
}

final AmazonCloudWatchAsync cloudWatchClient = AmazonCloudWatchAsyncClient.asyncBuilder()
.withCredentials(awsCredentialsProvider)
.withRegion(awsRegion)
.build();

this.reporter = DimensionedCloudWatchReporter.forRegistry(metricRegistry, cloudWatchClient, namespaceProperty.get())
DimensionedCloudWatchReporter.Builder builder = DimensionedCloudWatchReporter.forRegistry(metricRegistry, cloudWatchClient, namespaceProperty.get())
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.filter(metricFilter)
@@ -220,8 +242,13 @@ public CloudWatchSink(
.withStatisticSet()
.withGlobalDimensions()
.withShouldParseDimensionsFromName(shouldParseInlineDimensions)
.withShouldAppendDropwizardTypeDimension(shouldAppendDropwizardTypeDimension)
.build();
.withShouldAppendDropwizardTypeDimension(shouldAppendDropwizardTypeDimension);

if (dimensionNameGroups != null && dimensionNameGroups.getDimensionGroups() != null) {
builder = builder.withDimensionNameGroups(dimensionNameGroups);
}

this.reporter = builder.build();
}

@Override
@@ -262,6 +289,7 @@ private static class PropertyKeys {
static final String SHOULD_PARSE_INLINE_DIMENSIONS = "shouldParseInlineDimensions";
static final String SHOULD_APPEND_DROPWIZARD_TYPE_DIMENSION = "shouldAppendDropwizardTypeDimension";
static final String METRIC_FILTER_REGEX = "regex";
static final String DIMENSION_GROUPS = "dimensionGroups";
}

/**
@@ -272,4 +300,45 @@ private static class PropertyDefaults {
static final TimeUnit POLLING_PERIOD_TIME_UNIT = TimeUnit.MINUTES;
static final boolean SHOULD_PARSE_INLINE_DIMENSIONS = false;
}

/**
* Represents a container for grouping dimension names used in metrics reporting.
* This class allows for the organization and storage of dimension names into logical groups,
* facilitating the dynamic construction and retrieval of dimension information for metrics.
*/
public static class DimensionNameGroups {
// Holds the grouping of dimension names categorized under different keys.
private Map<String, List<List<String>>> dimensionGroups = new HashMap<>();

/**
* Sets the mapping of dimension groups. Each key in the map represents a category or a type
* of dimension, and the value is a list of dimension name groups, where each group is itself
* a list of dimension names that are logically grouped together.
*
* @param dimensionGroups A map of dimension groups categorized by keys, where each key maps
* to a list of dimension name groups.
*/
public void setDimensionGroups(Map<String, List<List<String>>> dimensionGroups) {
if (dimensionGroups == null) {
final String message = String.format(
"Undefined value for the \"%s\" CloudWatchSink metrics property.",
PropertyKeys.DIMENSION_GROUPS);
throw new InvalidMetricsPropertyException(message);
}
this.dimensionGroups = dimensionGroups;
}

/**
* Retrieves the current mapping of dimension groups. The structure of the returned map is such
* that each key represents a specific category or type of dimension, and the corresponding value
* is a list of dimension name groups. Each group is a list of dimension names that are logically
* grouped together.
*
* @return A map representing the groups of dimension names categorized by keys. Each key maps
* to a list of lists, where each inner list is a group of related dimension names.
*/
public Map<String, List<List<String>>> getDimensionGroups() {
return dimensionGroups;
}
}
}
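
For reference, a minimal sketch of the JSON shape the new "dimensionGroups" property accepts, parsed the same way the sink's constructor does. The key and dimension names below are made up for illustration; the property is supplied through the same mechanism as the sink's other properties (e.g. Spark's metrics configuration).

import com.fasterxml.jackson.databind.ObjectMapper;

public class DimensionGroupsExample {
    public static void main(String[] args) throws Exception {
        // Hypothetical property value: each key maps to a list of dimension
        // groups, and each group is a list of dimension names.
        String json = "{\"dimensionGroups\": {"
            + "\"executorMetrics\": [[\"instanceRole\"], [\"instanceRole\", \"jobId\"]]}}";

        // Deserialized exactly as CloudWatchSink does with its ObjectMapper.
        CloudWatchSink.DimensionNameGroups groups = new ObjectMapper()
            .readValue(json, CloudWatchSink.DimensionNameGroups.class);

        // Prints: {executorMetrics=[[instanceRole], [instanceRole, jobId]]}
        System.out.println(groups.getDimensionGroups());
    }
}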
org/opensearch/flint/core/field/bloomfilter/BloomFilter.java (new file)
@@ -0,0 +1,68 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.field.bloomfilter;

import java.io.IOException;
import java.io.OutputStream;

/**
* Bloom filter interface inspired by [[org.apache.spark.util.sketch.BloomFilter]] but adapted to
* Flint index use, with unnecessary APIs removed.
*/
public interface BloomFilter {

/**
* Bloom filter binary format version.
*/
enum Version {
V1(1);

private final int versionNumber;

Version(int versionNumber) {
this.versionNumber = versionNumber;
}

public int getVersionNumber() {
return versionNumber;
}
}

/**
* @return the number of bits in the underlying bit array.
*/
long bitSize();

/**
* Put an item into this bloom filter.
*
* @param item Long value item to insert
* @return true if any bit changed, meaning the item was definitely added to this bloom filter
* for the first time; false means the item may or may not have been added before.
*/
boolean put(long item);

/**
* Merge this bloom filter with another bloom filter.
*
* @param bloomFilter bloom filter to merge
* @return the merged bloom filter
*/
BloomFilter merge(BloomFilter bloomFilter);

/**
* @param item Long value item to check
* @return true if the item may exist in this bloom filter; false means the item definitely does not exist.
*/
boolean mightContain(long item);

/**
* Serialize this bloom filter and write it to an output stream.
*
* @param out output stream to write
*/
void writeTo(OutputStream out) throws IOException;
}
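
A short sketch of how this contract composes on the index build and query paths. The helper class and method names are illustrative; any concrete BloomFilter implementation can be substituted.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.opensearch.flint.core.field.bloomfilter.BloomFilter;

class BloomFilterUsageSketch {
    // Build side: add every item found in a source file, then serialize the
    // filter so it can be stored per file in the skipping index.
    static byte[] buildAndSerialize(BloomFilter filter, long[] items) throws IOException {
        for (long item : items) {
            filter.put(item); // true only when the item is definitely new
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        filter.writeTo(out);
        return out.toByteArray();
    }

    // Query side: false positives are possible but false negatives are not,
    // so a false result means the file can safely be skipped.
    static boolean mayMatch(BloomFilter filter, long value) {
        return filter.mightContain(value);
    }
}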
org/opensearch/flint/core/field/bloomfilter/classic/BitArray.java (new file)
@@ -0,0 +1,149 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

/*
* This file contains code from the Apache Spark project (original license below).
* It contains modifications, which are licensed as above:
*/

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opensearch.flint.core.field.bloomfilter.classic;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

/**
* Bit array.
*/
class BitArray {
private final long[] data;
private long bitCount;

BitArray(long numBits) {
this(new long[numWords(numBits)]);
}

BitArray(long[] data) {
this.data = data;
long bitCount = 0;
for (long word : data) {
bitCount += Long.bitCount(word);
}
this.bitCount = bitCount;
}

/**
* @return array length in bits
*/
long bitSize() {
return (long) data.length * Long.SIZE;
}

/**
* @param index bit index
* @return whether the bit at the given index is set
*/
boolean get(long index) {
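// index >>> 6 selects the word (index / 64); a long shift in Java uses only
// the low 6 bits of the shift count, so (1L << index) == (1L << (index % 64)).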
return (data[(int) (index >>> 6)] & (1L << index)) != 0;
}

/**
* Set the bit at the given index.
*
* @param index bit index
* @return true if the bit changed (it was previously unset), false otherwise
*/
boolean set(long index) {
if (!get(index)) {
data[(int) (index >>> 6)] |= (1L << index);
bitCount++;
return true;
}
return false;
}

/**
* Merge another bit array into this one via bitwise OR; both arrays must be of equal length.
*
* @param array other bit array
*/
void putAll(BitArray array) {
assert data.length == array.data.length : "BitArrays must be of equal length when merging";
long bitCount = 0;
for (int i = 0; i < data.length; i++) {
data[i] |= array.data[i];
bitCount += Long.bitCount(data[i]);
}
this.bitCount = bitCount;
}

/**
* Serialize and write out this bit array to the given output stream.
*
* @param out output stream
*/
void writeTo(DataOutputStream out) throws IOException {
out.writeInt(data.length);
for (long datum : data) {
out.writeLong(datum);
}
}

/**
* Deserialize and read bit array from the given input stream.
*
* @param in input stream
* @return bit array
*/
static BitArray readFrom(DataInputStream in) throws IOException {
int numWords = in.readInt();
long[] data = new long[numWords];
for (int i = 0; i < numWords; i++) {
data[i] = in.readLong();
}
return new BitArray(data);
}

private static int numWords(long numBits) {
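// e.g. numBits = 100 -> ceil(100 / 64.0) = 2 words, so bitSize() reports 128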
if (numBits <= 0) {
throw new IllegalArgumentException("numBits must be positive, but got " + numBits);
}
long numWords = (long) Math.ceil(numBits / 64.0);
if (numWords > Integer.MAX_VALUE) {
throw new IllegalArgumentException("Can't allocate enough space for " + numBits + " bits");
}
return (int) numWords;
}

@Override
public boolean equals(Object other) {
if (this == other) return true;
if (!(other instanceof BitArray)) return false;
BitArray that = (BitArray) other;
return Arrays.equals(data, that.data);
}

@Override
public int hashCode() {
return Arrays.hashCode(data);
}
}
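
A same-package sketch (BitArray is package-private) of the bit addressing and the serialization round trip; the class name is illustrative.

package org.opensearch.flint.core.field.bloomfilter.classic;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class BitArraySketch {
    public static void main(String[] args) throws IOException {
        BitArray bits = new BitArray(128);   // 2 words = 128 bits
        bits.set(3);                         // word 0, mask 1L << 3
        bits.set(67);                        // word 1 (67 >>> 6), mask 1L << 3 (67 % 64)
        System.out.println(bits.get(67));    // true

        // Wire format: word count as an int, then each word as a long.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        bits.writeTo(new DataOutputStream(bos));
        BitArray copy = BitArray.readFrom(
            new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
        System.out.println(bits.equals(copy)); // true
    }
}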