Update javadoc and user manual
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Feb 5, 2024
1 parent d6b3d10 commit 9355532
Showing 7 changed files with 178 additions and 14 deletions.
3 changes: 3 additions & 0 deletions docs/index.md
@@ -14,6 +14,7 @@ A Flint index is ...
- Partition: skip data scan by maintaining and filtering partitioned column value per file.
- MinMax: skip data scan by maintaining lower and upper bound of the indexed column per file.
- ValueSet: skip data scan by building a unique value set of the indexed column per file.
- BloomFilter: skip data scan by building a bloom filter of the indexed column per file.
- Covering Index: create index for selected columns within the source dataset to improve query performance.
- Materialized View: enhance query performance by storing precomputed and aggregated data from the source dataset.

@@ -54,6 +55,7 @@ For now, Flint Index doesn't define its own data type and uses OpenSearch field
| **FlintDataType** |
|-------------------|
| boolean |
| binary |
| long |
| integer |
| short |
@@ -447,6 +449,7 @@ flint.skippingIndex()
.addPartitions("year", "month", "day")
.addValueSet("elb_status_code")
.addMinMax("request_processing_time")
.addBloomFilter("client_ip")
.create()

flint.refreshIndex("flint_spark_catalog_default_alb_logs_skipping_index")
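The builder example above declares the index; what the new skip type does per file can be sketched directly against the classes added in this commit. A minimal sketch using `ClassicBloomFilter`; the `mightContain` method name is assumed from the `BloomFilter` interface, which this diff does not show in full:

```java
import org.opensearch.flint.core.field.bloomfilter.BloomFilter;
import org.opensearch.flint.core.field.bloomfilter.classic.ClassicBloomFilter;

public class BloomFilterSkipSketch {
  public static void main(String[] args) {
    // One filter per source file, sized for ~10K distinct values at 1% FPP.
    BloomFilter filter = new ClassicBloomFilter(10_000, 0.01);

    // Index build time: Flint feeds xxhash64(client_ip) values in as longs.
    filter.put(1234567890L);
    filter.put(987654321L);

    // Query time: "false" proves the value is absent, so the file is skipped;
    // "true" may be a false positive, so the file must still be scanned.
    // (mightContain is assumed from the BloomFilter interface, not shown here.)
    System.out.println(filter.mightContain(1234567890L)); // true
    System.out.println(filter.mightContain(42L));         // false with high probability
  }
}
```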
BitArray.java
@@ -3,12 +3,37 @@
* SPDX-License-Identifier: Apache-2.0
*/

/*
* This file contains code from the Apache Spark project (original license below).
* It contains modifications, which are licensed as above:
*/

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opensearch.flint.core.field.bloomfilter.classic;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

/**
* Bit array.
*/
class BitArray {
private final long[] data;
private long bitCount;
@@ -26,14 +51,27 @@ class BitArray {
this.bitCount = bitCount;
}

/**
* @return array length in bits
*/
long bitSize() {
return (long) data.length * Long.SIZE;
}

/**
* @param index bit index
* @return whether the bit at the given index is set
*/
boolean get(long index) {
return (data[(int) (index >>> 6)] & (1L << index)) != 0;
}

/**
* Set the bit at the given index.
*
* @param index bit index
* @return true if the bit was previously unset and is now set; false if it was already set
*/
boolean set(long index) {
if (!get(index)) {
data[(int) (index >>> 6)] |= (1L << index);
@@ -43,6 +81,11 @@ boolean set(long index) {
return false;
}

/**
* Merge another bit array into this one by unioning its set bits.
*
* @param array other bit array
*/
void putAll(BitArray array) {
assert data.length == array.data.length : "BitArrays must be of equal length when merging";
long bitCount = 0;
@@ -53,13 +96,24 @@ void putAll(BitArray array) {
this.bitCount = bitCount;
}

/**
* Serialize and write out this bit array to the given output stream.
*
* @param out output stream
*/
void writeTo(DataOutputStream out) throws IOException {
out.writeInt(data.length);
for (long datum : data) {
out.writeLong(datum);
}
}

/**
* Deserialize and read a bit array from the given input stream.
*
* @param in input stream
* @return bit array
*/
static BitArray readFrom(DataInputStream in) throws IOException {
int numWords = in.readInt();
long[] data = new long[numWords];
@@ -79,4 +133,17 @@ private static int numWords(long numBits) {
}
return (int) numWords;
}

@Override
public boolean equals(Object other) {
if (this == other) return true;
if (!(other instanceof BitArray)) return false;
BitArray that = (BitArray) other;
return Arrays.equals(data, that.data);
}

@Override
public int hashCode() {
return Arrays.hashCode(data);
}
}
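A note on the indexing arithmetic in `get` and `set`: `index >>> 6` selects the 64-bit word (divide by 64), and `1L << index` selects the bit within it because Java masks a long shift distance to its low six bits, making it equivalent to `1L << (index & 63)`. A standalone sketch of that arithmetic:

```java
public class BitIndexSketch {
  public static void main(String[] args) {
    long index = 130;                 // set bit 130 of a 192-bit array
    int word = (int) (index >>> 6);   // 130 / 64 = word 2
    long mask = 1L << index;          // == 1L << (130 & 63) == 1L << 2

    long[] data = new long[3];
    data[word] |= mask;               // what BitArray.set does

    System.out.println(word);                             // 2
    System.out.println(Long.toBinaryString(data[word]));  // 100
    System.out.println((data[word] & mask) != 0);         // true: what BitArray.get checks
  }
}
```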
ClassicBloomFilter.java
@@ -3,6 +3,27 @@
* SPDX-License-Identifier: Apache-2.0
*/

/*
* This file contains code from the Apache Spark project (original license below).
* It contains modifications, which are licensed as above:
*/

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opensearch.flint.core.field.bloomfilter.classic;

import java.io.DataInputStream;
@@ -13,7 +34,13 @@
import org.opensearch.flint.core.field.bloomfilter.BloomFilter;

/**
* Classic bloom filter implementation.
* Classic bloom filter implementation inspired by {@code org.apache.spark.util.sketch.BloomFilterImpl},
* keeping only minimal functionality. The bloom filter is serialized in the following format:
* <p>
* 1) Version number, always 1 (32 bit)
* 2) Number of hash functions (32 bit)
* 3) Total number of words of the underlying bit array (32 bit)
* 4) The words/longs (numWords * 64 bit)
*/
public class ClassicBloomFilter implements BloomFilter {

@@ -100,7 +127,6 @@ public static BloomFilter readFrom(InputStream in) throws IOException {
if (version != Version.V1.getVersionNumber()) {
throw new IOException("Unexpected Bloom filter version number (" + version + ")");
}

int numHashFunctions = dis.readInt();
BitArray bits = BitArray.readFrom(dis);
return new ClassicBloomFilter(bits, numHashFunctions);
@@ -114,4 +140,21 @@ private static int optimalNumOfHashFunctions(long n, long m) {
private static long optimalNumOfBits(long n, double p) {
return (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
}
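  // Worked example of the two sizing formulas above (illustrative values, not
  // necessarily Flint's defaults): for n = 1,000,000 expected items and
  // false positive probability p = 0.03,
  //   m = -n * ln(p) / (ln 2)^2 ≈ 1,000,000 * 3.5066 / 0.4805 ≈ 7,298,000 bits (~891 KB)
  //   k = max(1, round(m / n * ln 2)) = round(7.298 * 0.6931) = 5 hash functions
  // (k uses the standard formula; Spark's implementation computes it the same way.)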

@Override
public boolean equals(Object other) {
if (other == this) {
return true;
}
if (!(other instanceof ClassicBloomFilter)) {
return false;
}
ClassicBloomFilter that = (ClassicBloomFilter) other;
return this.numHashFunctions == that.numHashFunctions && this.bits.equals(that.bits);
}

@Override
public int hashCode() {
return bits.hashCode() * 31 + numHashFunctions;
}
}
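The serialized layout documented in the class Javadoc is simple enough to produce by hand, which is useful for compatibility checks. A sketch that writes the documented format and round-trips it through `readFrom`; the version constant `1` comes from the Javadoc above, and `writeTo(OutputStream)` is the method exercised by the new test below:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import org.opensearch.flint.core.field.bloomfilter.BloomFilter;
import org.opensearch.flint.core.field.bloomfilter.classic.ClassicBloomFilter;

public class SerializationSketch {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeInt(1);        // 1) version number, always 1
    out.writeInt(3);        // 2) number of hash functions
    out.writeInt(2);        // 3) total number of words of the bit array
    out.writeLong(0b1010L); // 4) the words/longs themselves (numWords longs)
    out.writeLong(0L);

    BloomFilter filter = ClassicBloomFilter.readFrom(new ByteArrayInputStream(bytes.toByteArray()));

    // Writing the filter back out reproduces the same bytes.
    ByteArrayOutputStream copy = new ByteArrayOutputStream();
    filter.writeTo(copy);
    System.out.println(Arrays.equals(bytes.toByteArray(), copy.toByteArray())); // true
  }
}
```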
Murmur3_x86_32.java
@@ -3,12 +3,43 @@
* SPDX-License-Identifier: Apache-2.0
*/

/*
* This file contains code from the Apache Spark project (original license below).
* It contains modifications, which are licensed as above:
*/

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opensearch.flint.core.field.bloomfilter.classic;

/**
* 32-bit Murmur3 hasher. This is based on Guava's Murmur3_32HashFunction.
*/
class Murmur3_x86_32 {
private static final int C1 = 0xcc9e2d51;
private static final int C2 = 0x1b873593;

/**
* Calculate the 32-bit hash of the given long input.
*
* @param input the long value to hash
* @param seed hash seed
* @return hash value
*/
static int hashLong(long input, int seed) {
int low = (int) input;
int high = (int) (input >>> 32);
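The diff truncates the remainder of `hashLong`. For reference, the standard Murmur3 x86 32-bit mixing and finalization steps (as found in Guava's Murmur3_32HashFunction and Spark, which this class is based on) look like the sketch below; this is illustrative, not necessarily the file's exact body:

```java
// Standard Murmur3_x86_32 mixing steps (Guava-style), shown for reference.
class Murmur3Sketch {
  private static final int C1 = 0xcc9e2d51;
  private static final int C2 = 0x1b873593;

  static int hashLong(long input, int seed) {
    int low = (int) input;
    int high = (int) (input >>> 32);
    int k1 = mixK1(low);          // mix the low 32 bits
    int h1 = mixH1(seed, k1);
    k1 = mixK1(high);             // mix the high 32 bits
    h1 = mixH1(h1, k1);
    return fmix(h1, 8);           // 8 = byte length of a long
  }

  private static int mixK1(int k1) {
    k1 *= C1;
    k1 = Integer.rotateLeft(k1, 15);
    k1 *= C2;
    return k1;
  }

  private static int mixH1(int h1, int k1) {
    h1 ^= k1;
    h1 = Integer.rotateLeft(h1, 13);
    return h1 * 5 + 0xe6546b64;
  }

  private static int fmix(int h1, int length) { // final avalanche
    h1 ^= length;
    h1 ^= h1 >>> 16;
    h1 *= 0x85ebca6b;
    h1 ^= h1 >>> 13;
    h1 *= 0xc2b2ae35;
    h1 ^= h1 >>> 16;
    return h1;
  }
}
```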
ClassicBloomFilterTest.java
@@ -5,9 +5,15 @@

package org.opensearch.flint.core.field.bloomfilter.classic;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import org.junit.Test;
import org.opensearch.flint.core.field.bloomfilter.BloomFilter;

public class ClassicBloomFilterTest {

@@ -47,4 +53,17 @@ public void shouldReturnFalsePositiveLessThanConfigured() {
assertTrue(actualFalsePositiveRate <= ACCEPTABLE_FALSE_POSITIVE_RATE,
"Actual false positive rate is higher than expected");
}

@Test
public void shouldBeTheSameAfterWriteToAndReadFrom() throws IOException {
bloomFilter.put(123L);
bloomFilter.put(456L);
bloomFilter.put(789L);

ByteArrayOutputStream out = new ByteArrayOutputStream();
bloomFilter.writeTo(out);
InputStream in = new ByteArrayInputStream(out.toByteArray());
BloomFilter newBloomFilter = ClassicBloomFilter.readFrom(in);
assertEquals(bloomFilter, newBloomFilter);
}
}
BloomFilterAgg.scala
@@ -16,16 +16,17 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{ImperativeAggregate,
import org.apache.spark.sql.types.{BinaryType, DataType}

/**
* Aggregate function that build bloom filter and serialize to binary as result. Copy from Spark
* built-in BloomFilterAggregate because it: 1) it accepts number of bits as argument instead of
* FPP 2) it calls static method BloomFilter.create and thus cannot change to other implementation
* 3) it is a Scala case class that cannot be extend and overridden
* An aggregate function that builds a bloom filter and serializes it to binary as the result.
* This implementation is a customized version inspired by Spark's built-in BloomFilterAggregate.
* Spark's implementation accepts only a number of bits rather than an FPP, hard-codes BloomFilterImpl,
* and cannot be extended because it is a Scala case class.
*
* @param child
* child expression of
* @param bloomFilter
* @param mutableAggBufferOffset
* @param inputAggBufferOffset
* child expression that generates Long values for building the bloom filter
* @param expectedNumItems
* expected maximum number of unique items
* @param fpp
* false positive probability
*/
case class BloomFilterAgg(
child: Expression,
@@ -45,13 +46,13 @@ case class BloomFilterAgg(

override def children: Seq[Expression] = Seq(child)

override def createAggregationBuffer(): BloomFilter =
override def createAggregationBuffer(): BloomFilter = {
new ClassicBloomFilter(expectedNumItems, fpp)
}

override def update(buffer: BloomFilter, inputRow: InternalRow): BloomFilter = {
val value = child.eval(inputRow)
// Ignore null values.
if (value == null) {
if (value == null) { // Ignore null values
return buffer
}
buffer.put(value.asInstanceOf[Long])
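The rest of the file is truncated here, but per the Scaladoc the aggregate's result is the bloom filter's binary serialization. Downstream, a per-file result can therefore be deserialized with `readFrom` for membership checks. A sketch (the `mightContain` name is assumed from the `BloomFilter` interface; the byte array stands in for the `BinaryType` value the aggregate emits):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import org.opensearch.flint.core.field.bloomfilter.BloomFilter;
import org.opensearch.flint.core.field.bloomfilter.classic.ClassicBloomFilter;

public class AggResultSketch {
  /** 'bytes' is one file's aggregated bloom filter; 'hash' is xxhash64 of the probed value. */
  static boolean fileMayContain(byte[] bytes, long hash) throws IOException {
    BloomFilter filter = ClassicBloomFilter.readFrom(new ByteArrayInputStream(bytes));
    return filter.mightContain(hash); // mightContain assumed on the interface
  }
}
```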
BloomFilterSkippingStrategy.scala
@@ -34,7 +34,7 @@ case class BloomFilterSkippingStrategy(
Seq(
new BloomFilterAgg(xxhash64(col(columnName)).expr, expectedNumItems, fpp)
.toAggregateExpression()
) // TODO: always xxhash64 ?
) // Use xxhash64() for now (TODO: revisit hash function choice)
}

override def rewritePredicate(predicate: Expression): Option[Expression] = None
