
Commit 1bb5484

Updated javadoc and IT
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Feb 6, 2024
1 parent cb21548 commit 1bb5484
Showing 3 changed files with 22 additions and 23 deletions.
ClassicBloomFilter.java
@@ -44,8 +44,14 @@
*/
public class ClassicBloomFilter implements BloomFilter {

/**
* Bit array
*/
private final BitArray bits;

/**
* Number of hash functions
*/
private final int numHashFunctions;

public ClassicBloomFilter(int expectedNumItems, double fpp) {
@@ -120,6 +126,12 @@ public void writeTo(OutputStream out) throws IOException {
bits.writeTo(dos);
}

/**
* Deserialize and read a bloom filter from an input stream.
*
@param in input stream to read the serialized bloom filter from
@return the deserialized bloom filter
@throws IOException if an I/O error occurs while reading
*/
public static BloomFilter readFrom(InputStream in) throws IOException {
DataInputStream dis = new DataInputStream(in);
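The remainder of the method is collapsed in this diff view. As a rough sketch only: writeTo() emits two integers (the version number and the number of hash functions, per the comment in BloomFilterAgg.serialize below) followed by the bit array, so deserialization presumably continues along these lines. BitArray.readFrom and the (bits, numHashFunctions) constructor are assumptions for illustration, not shown in this diff:

    // Sketch only: wire format inferred from writeTo(), which emits the
    // version number and hash-function count before the bit array.
    int version = dis.readInt();             // assumed: version number first
    int numHashFunctions = dis.readInt();    // assumed: hash-function count second
    BitArray bits = BitArray.readFrom(dis);  // assumed counterpart of bits.writeTo(dos)
    return new ClassicBloomFilter(bits, numHashFunctions);  // assumed constructor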

BloomFilterAgg.scala
@@ -17,9 +17,12 @@ import org.apache.spark.sql.types.{BinaryType, DataType}

/**
* An aggregate function that builds a bloom filter and serializes it to binary as the result.
* This implementation is a customized version inspired by Spark's built-in BloomFilterAggregate.
* Spark's implementation only accepts number of bits, uses BloomFilterImpl and cannot be extended
* due to Scala case class restriction.
* This implementation is a customized version inspired by Spark's built-in
* [[org.apache.spark.sql.catalyst.expressions.aggregate.BloomFilterAggregate]].
*
The reasons for not reusing Spark's implementation are: it only accepts the expected number
of bits, it is coupled to its own BloomFilterImpl, and most importantly it cannot be extended
due to Scala's case class restriction.
*
* @param child
child expression that generates Long values for creating a bloom filter
@@ -73,8 +76,8 @@ case class BloomFilterAgg(
}

override def serialize(buffer: BloomFilter): Array[Byte] = {
// BloomFilterImpl.writeTo() writes 2 integers (version number and num hash functions), hence
// the +8
// Preallocate space. BloomFilter.writeTo() writes 2 integers (version number and
// num hash functions) first, hence +8
val size = (buffer.bitSize() / 8) + 8
require(size <= Integer.MAX_VALUE, s"actual number of bits is too large $size")
val out = new ByteArrayOutputStream(size.intValue())
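As a worked example of the preallocation arithmetic above (the filter size is hypothetical): a filter backed by a 1,048,576-bit array serializes to 1,048,576 / 8 = 131,072 bytes of bit data, plus 8 bytes for the two leading 4-byte integers, so the buffer is preallocated at 131,080 bytes.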
FlintSparkSkippingIndexITSuite.scala
@@ -335,25 +335,9 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
flint.refreshIndex(testIndex)

// Assert index data
/*
checkAnswer(
flint.queryIndex(testIndex).select("age"),
Seq(Row(20, 30), Row(40, 60)))
*/
flint.queryIndex(testIndex).collect() should have size 2

// Assert query rewrite
/*
val query = sql(s"""
| SELECT name
| FROM $testTable
| WHERE age = 30
|""".stripMargin)
checkAnswer(query, Row("World"))
query.queryExecution.executedPlan should
useFlintSparkSkippingFileIndex(
hasIndexFilter(col("MinMax_age_0") <= 30 && col("MinMax_age_1") >= 30))
*/
// TODO: Assert query rewrite result
}

test("should rewrite applicable query with table name without database specified") {
