From 19707ef9b9ec578f678105f5d3daa7f847eff241 Mon Sep 17 00:00:00 2001 From: Shannon Wells Date: Tue, 28 Nov 2023 21:51:10 +0000 Subject: [PATCH] Support bloom filters in lists/nested columns (#105) For bloom filters, treat column names as the full path and not just the first part of the path in the schema. Closes #98 with @rlaferla Co-authored-by: Wil Wade --- lib/bloomFilterIO/bloomFilterReader.ts | 4 +- lib/reader.ts | 40 ++- lib/writer.ts | 202 +++++++------- test/bloomFilterIntegration.ts | 368 ++++++++++++++++--------- 4 files changed, 373 insertions(+), 241 deletions(-) diff --git a/lib/bloomFilterIO/bloomFilterReader.ts b/lib/bloomFilterIO/bloomFilterReader.ts index e6f080ad..c396b26e 100644 --- a/lib/bloomFilterIO/bloomFilterReader.ts +++ b/lib/bloomFilterIO/bloomFilterReader.ts @@ -117,11 +117,11 @@ export const siftAllByteOffsets = ( }; export const getBloomFiltersFor = async ( - columnNames: Array, + paths: Array, envelopeReader: InstanceType ) => { const columnChunkDataCollection = envelopeReader.getAllColumnChunkDataFor( - columnNames + paths ); const bloomFilterOffsetData = siftAllByteOffsets(columnChunkDataCollection); const offsetByteValues = bloomFilterOffsetData.map( diff --git a/lib/reader.ts b/lib/reader.ts index 456ca29d..1be3ad48 100644 --- a/lib/reader.ts +++ b/lib/reader.ts @@ -6,11 +6,25 @@ import * as parquet_schema from './schema'; import * as parquet_codec from './codec'; import * as parquet_compression from './compression'; import * as parquet_types from './types'; -import BufferReader , { BufferReaderOptions } from './bufferReader'; +import BufferReader, {BufferReaderOptions} from './bufferReader'; import * as bloomFilterReader from './bloomFilterIO/bloomFilterReader'; import fetch from 'cross-fetch'; -import { ParquetCodec, Parameter,PageData, SchemaDefinition, ParquetType, FieldDefinition, ParquetField, ClientS3, ClientParameters, FileMetaDataExt, NewPageHeader, RowGroupExt, ColumnChunkExt } from './declare'; -import { Cursor, Options } from './codec/types'; +import { + ParquetCodec, + Parameter, + PageData, + SchemaDefinition, + ParquetType, + FieldDefinition, + ParquetField, + ClientS3, + ClientParameters, + FileMetaDataExt, + NewPageHeader, + RowGroupExt, + ColumnChunkExt +} from './declare'; +import {Cursor, Options} from './codec/types'; const { getBloomFiltersFor, @@ -35,7 +49,7 @@ const PARQUET_RDLVL_ENCODING = 'RLE'; /** * A parquet cursor is used to retrieve rows from a parquet file in order */ -class ParquetCursor { +class ParquetCursor { metadata: FileMetaDataExt; envelopeReader: ParquetEnvelopeReader; @@ -44,13 +58,14 @@ class ParquetCursor { rowGroup: Array; rowGroupIndex: number; cursorIndex: number; + /** * Create a new parquet reader from the file metadata and an envelope reader. * It is usually not recommended to call this constructor directly except for * advanced and internal use cases. 
Consider using getCursor() on the * ParquetReader instead */ - constructor( metadata: FileMetaDataExt, envelopeReader: ParquetEnvelopeReader, schema: parquet_schema.ParquetSchema, columnList: Array>) { + constructor(metadata: FileMetaDataExt, envelopeReader: ParquetEnvelopeReader, schema: parquet_schema.ParquetSchema, columnList: Array>) { this.metadata = metadata; this.envelopeReader = envelopeReader; this.schema = schema; @@ -72,9 +87,9 @@ class ParquetCursor { } let rowBuffer = await this.envelopeReader.readRowGroup( - this.schema, - this.metadata.row_groups[this.rowGroupIndex], - this.columnList); + this.schema, + this.metadata.row_groups[this.rowGroupIndex], + this.columnList); this.rowGroup = parquet_shredder.materializeRecords(this.schema, rowBuffer); this.rowGroupIndex++; @@ -363,7 +378,7 @@ export class ParquetReader { } decodePages(buffer: Buffer, opts: Options) { - return decodePages(buffer,opts); + return decodePages(buffer, opts); } } @@ -375,6 +390,7 @@ export class ParquetReader { * rows from a parquet file use the ParquetReader instead */ let ParquetEnvelopeReaderIdCounter = 0; + export class ParquetEnvelopeReader { readFn: (offset: number, length: number, file?: string) => Promise; close: () => unknown; @@ -393,7 +409,7 @@ export class ParquetEnvelopeReader { return Promise.reject('external references are not supported'); } - return parquet_util.fread(fileDescriptor, offset, length); + return parquet_util.fread(fileDescriptor, offset, length); }; let closeFn = parquet_util.fclose.bind(undefined, fileDescriptor); @@ -407,7 +423,7 @@ export class ParquetEnvelopeReader { return Promise.reject('external references are not supported'); } - return Promise.resolve(buffer.slice(offset,offset+length)); + return Promise.resolve(buffer.slice(offset, offset + length)); }; let closeFn = () => ({}); @@ -415,7 +431,7 @@ export class ParquetEnvelopeReader { } static async openS3(client: ClientS3, params: ClientParameters, options?: BufferReaderOptions) { - let fileStat = async () => client.headObject(params).promise().then((d: {ContentLength: number}) => d.ContentLength); + let fileStat = async () => client.headObject(params).promise().then((d: { ContentLength: number }) => d.ContentLength); let readFn = async (offset: number, length: number, file?: string) => { if (file) { diff --git a/lib/writer.ts b/lib/writer.ts index 78830373..64b29812 100644 --- a/lib/writer.ts +++ b/lib/writer.ts @@ -1,14 +1,22 @@ import stream from 'stream' -import parquet_thrift, { ConvertedType } from '../gen-nodejs/parquet_types' +import parquet_thrift, {ConvertedType} from '../gen-nodejs/parquet_types' import * as parquet_shredder from './shred' import * as parquet_util from './util' import * as parquet_codec from './codec' import * as parquet_compression from './compression' import * as parquet_types from './types' import * as bloomFilterWriter from "./bloomFilterIO/bloomFilterWriter" -import { WriterOptions, ParquetCodec, ParquetField, ColumnMetaDataExt, RowGroupExt, Page, FieldDefinition } from './declare' -import { Options } from './codec/types' -import { ParquetSchema } from './schema' +import { + WriterOptions, + ParquetCodec, + ParquetField, + ColumnMetaDataExt, + RowGroupExt, + Page, + FieldDefinition +} from './declare' +import {Options} from './codec/types' +import {ParquetSchema} from './schema' import Int64 from 'node-int64' import SplitBlockBloomFilter from './bloom/sbbf' @@ -67,9 +75,9 @@ export class ParquetWriter { } let envelopeWriter = await ParquetEnvelopeWriter.openStream( - schema, - 
outputStream, - opts); + schema, + outputStream, + opts); return new ParquetWriter(schema, envelopeWriter, opts); } @@ -134,7 +142,7 @@ export class ParquetWriter { if (this.envelopeWriter) { if (this.rowBuffer.rowCount! > 0 || this.rowBuffer.rowCount! >= this.rowGroupSize) { - await encodePages(this.schema, this.rowBuffer, { useDataPageV2: this.envelopeWriter.useDataPageV2, bloomFilters: this.envelopeWriter.bloomFilters}); + await encodePages(this.schema, this.rowBuffer, {useDataPageV2: this.envelopeWriter.useDataPageV2, bloomFilters: this.envelopeWriter.bloomFilters}); await this.envelopeWriter.writeRowGroup(this.rowBuffer); this.rowBuffer = {}; @@ -214,7 +222,7 @@ export class ParquetEnvelopeWriter { this.offset = fileOffset; this.rowCount = new Int64(0); this.rowGroups = []; - this.pageSize = opts.pageSize || PARQUET_DEFAULT_PAGE_SIZE; + this.pageSize = opts.pageSize || PARQUET_DEFAULT_PAGE_SIZE; this.useDataPageV2 = ("useDataPageV2" in opts) ? opts.useDataPageV2! : true; this.pageIndex = opts.pageIndex!; this.bloomFilters = {}; @@ -242,14 +250,14 @@ export class ParquetEnvelopeWriter { */ async writeRowGroup(records: parquet_shredder.RecordBuffer) { let rgroup = await encodeRowGroup( - this.schema, - records, - { - baseOffset: this.offset, - pageSize: this.pageSize, - useDataPageV2: this.useDataPageV2, - pageIndex: this.pageIndex - }); + this.schema, + records, + { + baseOffset: this.offset, + pageSize: this.pageSize, + useDataPageV2: this.useDataPageV2, + pageIndex: this.pageIndex + }); this.rowCount.setValue(this.rowCount.valueOf() + records.rowCount!); this.rowGroups.push(rgroup.metadata); @@ -259,11 +267,16 @@ export class ParquetEnvelopeWriter { writeBloomFilters() { this.rowGroups.forEach(group => { group.columns.forEach(column => { - const columnName = column.meta_data?.path_in_schema[0]; - if (!columnName || columnName in this.bloomFilters === false) return; + if (!column.meta_data?.path_in_schema.length) { + return + } + const filterName = column.meta_data?.path_in_schema.join(','); + if (!(filterName in this.bloomFilters)) { + return; + } const serializedBloomFilterData = - bloomFilterWriter.getSerializedBloomFilterData(this.bloomFilters[columnName]); + bloomFilterWriter.getSerializedBloomFilterData(this.bloomFilters[filterName]); bloomFilterWriter.setFilterOffset(column, this.offset); @@ -276,7 +289,7 @@ export class ParquetEnvelopeWriter { * Write the columnIndices and offsetIndices */ writeIndex() { - this.schema.fieldList.forEach( (c,i) => { + this.schema.fieldList.forEach((c, i) => { this.rowGroups.forEach(group => { let column = group.columns[i]; if (!column) return; @@ -313,7 +326,7 @@ export class ParquetEnvelopeWriter { } return this.writeSection( - encodeFooter(this.schema, this.rowCount, this.rowGroups, userMetadata)); + encodeFooter(this.schema, this.rowCount, this.rowGroups, userMetadata)); }; /** @@ -334,18 +347,19 @@ export class ParquetTransformer extends stream.Transform { writer: ParquetWriter; constructor(schema: ParquetSchema, opts = {}) { - super({ objectMode: true }); + super({objectMode: true}); - let writeProxy = (function(t) { - return function(b: unknown) { + let writeProxy = (function (t) { + return function (b: unknown) { t.push(b); } })(this); this.writer = new ParquetWriter( - schema, - new ParquetEnvelopeWriter(schema, writeProxy, function() {}, new Int64(0), opts), - opts); + schema, + new ParquetEnvelopeWriter(schema, writeProxy, function () { + }, new Int64(0), opts), + opts); } _transform(row: Record, _encoding: string, callback: Function) 
{ @@ -365,7 +379,7 @@ export class ParquetTransformer extends stream.Transform { _flush(callback: (foo: any, bar?: any) => any) { this.writer.close() - .then(d => callback(null, d), callback); + .then(d => callback(null, d), callback); } } @@ -389,13 +403,13 @@ function encodeStatisticsValue(value: any, column: ParquetField | Options) { value = parquet_types.toPrimitive(column.originalType, value, column); } if (column.primitiveType !== 'BYTE_ARRAY') { - value = encodeValues(column.primitiveType!,'PLAIN',[value],column); + value = encodeValues(column.primitiveType!, 'PLAIN', [value], column); } return value; } -function encodeStatistics(statistics: parquet_thrift.Statistics,column: ParquetField | Options) { - statistics = Object.assign({},statistics); +function encodeStatistics(statistics: parquet_thrift.Statistics, column: ParquetField | Options) { + statistics = Object.assign({}, statistics); statistics.min_value = statistics.min_value === undefined ? null : encodeStatisticsValue(statistics.min_value, column); statistics.max_value = statistics.max_value === undefined ? null : encodeStatisticsValue(statistics.max_value, column); @@ -405,7 +419,7 @@ function encodeStatistics(statistics: parquet_thrift.Statistics,column: ParquetF return new parquet_thrift.Statistics(statistics); } -async function encodePages(schema: ParquetSchema, rowBuffer: parquet_shredder.RecordBuffer, opts: {bloomFilters: Record, useDataPageV2: boolean}) {// generic +async function encodePages(schema: ParquetSchema, rowBuffer: parquet_shredder.RecordBuffer, opts: { bloomFilters: Record, useDataPageV2: boolean }) {// generic if (!rowBuffer.pageRowCount) { return; } @@ -416,17 +430,19 @@ async function encodePages(schema: ParquetSchema, rowBuffer: parquet_shredder.Re } let page; - const values = rowBuffer.columnData![field.path.join(',')]; - if (opts.bloomFilters && (field.name in opts.bloomFilters)) { - const splitBlockBloomFilter = opts.bloomFilters[field.name]; + const columnPath = field.path.join(','); + const values = rowBuffer.columnData![columnPath]; + + if (opts.bloomFilters && (columnPath in opts.bloomFilters)) { + const splitBlockBloomFilter = opts.bloomFilters[columnPath]; values.values!.forEach(v => splitBlockBloomFilter.insert(v)); } let statistics: parquet_thrift.Statistics = {}; if (field.statistics !== false) { statistics = {}; - [...values.distinct_values!].forEach( (v,i) => { + [...values.distinct_values!].forEach((v, i) => { if (i === 0 || v > statistics.max_value!) { statistics.max_value = v; } @@ -457,7 +473,7 @@ async function encodePages(schema: ParquetSchema, rowBuffer: parquet_shredder.Re } let pages = rowBuffer.pages![field.path.join(',')]; - let lastPage = pages[pages.length-1]; + let lastPage = pages[pages.length - 1]; let first_row_index = lastPage ? lastPage.first_row_index + lastPage.count! 
: 0; pages.push({ page, @@ -484,37 +500,37 @@ async function encodePages(schema: ParquetSchema, rowBuffer: parquet_shredder.Re async function encodeDataPage(column: ParquetField, values: number[], rlevels: number[], dlevels: number[], statistics: parquet_thrift.Statistics) { /* encode values */ let valuesBuf = encodeValues( - column.primitiveType!, - column.encoding!, - values, { - bitWidth: column.typeLength, - ...column - }); + column.primitiveType!, + column.encoding!, + values, { + bitWidth: column.typeLength, + ...column + }); /* encode repetition and definition levels */ let rLevelsBuf = Buffer.alloc(0); if (column.rLevelMax > 0) { rLevelsBuf = encodeValues( - PARQUET_RDLVL_TYPE, - PARQUET_RDLVL_ENCODING, - rlevels, - { bitWidth: parquet_util.getBitWidth(column.rLevelMax) }); + PARQUET_RDLVL_TYPE, + PARQUET_RDLVL_ENCODING, + rlevels, + {bitWidth: parquet_util.getBitWidth(column.rLevelMax)}); } let dLevelsBuf = Buffer.alloc(0); if (column.dLevelMax > 0) { dLevelsBuf = encodeValues( - PARQUET_RDLVL_TYPE, - PARQUET_RDLVL_ENCODING, - dlevels, - { bitWidth: parquet_util.getBitWidth(column.dLevelMax) }); + PARQUET_RDLVL_TYPE, + PARQUET_RDLVL_ENCODING, + dlevels, + {bitWidth: parquet_util.getBitWidth(column.dLevelMax)}); } /* build page header */ let pageBody = Buffer.concat([rLevelsBuf, dLevelsBuf, valuesBuf]); pageBody = await parquet_compression.deflate( - column.compression!, - pageBody); + column.compression!, + pageBody); let pageHeader = new parquet_thrift.PageHeader(); pageHeader.type = parquet_thrift.PageType['DATA_PAGE']; @@ -528,9 +544,9 @@ async function encodeDataPage(column: ParquetField, values: number[], rlevels: n pageHeader.data_page_header.encoding = parquet_thrift.Encoding[column.encoding!]; pageHeader.data_page_header.definition_level_encoding = - parquet_thrift.Encoding[PARQUET_RDLVL_ENCODING]; + parquet_thrift.Encoding[PARQUET_RDLVL_ENCODING]; pageHeader.data_page_header.repetition_level_encoding = - parquet_thrift.Encoding[PARQUET_RDLVL_ENCODING]; + parquet_thrift.Encoding[PARQUET_RDLVL_ENCODING]; /* concat page header, repetition and definition levels and values */ return Buffer.concat([parquet_util.serializeThrift(pageHeader), pageBody]); @@ -542,38 +558,38 @@ async function encodeDataPage(column: ParquetField, values: number[], rlevels: n async function encodeDataPageV2(column: ParquetField, rowCount: number, values: number[], rlevels: number[], dlevels: number[], statistics: parquet_thrift.Statistics) { /* encode values */ let valuesBuf = encodeValues( - column.primitiveType!, - column.encoding!, - values, { - bitWidth: column.typeLength, - ...column, - }); + column.primitiveType!, + column.encoding!, + values, { + bitWidth: column.typeLength, + ...column, + }); let valuesBufCompressed = await parquet_compression.deflate( - column.compression!, - valuesBuf); + column.compression!, + valuesBuf); /* encode repetition and definition levels */ let rLevelsBuf = Buffer.alloc(0); if (column.rLevelMax > 0) { rLevelsBuf = encodeValues( - PARQUET_RDLVL_TYPE, - PARQUET_RDLVL_ENCODING, - rlevels, { - bitWidth: parquet_util.getBitWidth(column.rLevelMax), - disableEnvelope: true - }); + PARQUET_RDLVL_TYPE, + PARQUET_RDLVL_ENCODING, + rlevels, { + bitWidth: parquet_util.getBitWidth(column.rLevelMax), + disableEnvelope: true + }); } let dLevelsBuf = Buffer.alloc(0); if (column.dLevelMax > 0) { dLevelsBuf = encodeValues( - PARQUET_RDLVL_TYPE, - PARQUET_RDLVL_ENCODING, - dlevels, { - bitWidth: parquet_util.getBitWidth(column.dLevelMax), - disableEnvelope: true - }); + 
PARQUET_RDLVL_TYPE, + PARQUET_RDLVL_ENCODING, + dlevels, { + bitWidth: parquet_util.getBitWidth(column.dLevelMax), + disableEnvelope: true + }); } /* build page header */ @@ -589,35 +605,33 @@ async function encodeDataPageV2(column: ParquetField, rowCount: number, values: } pageHeader.uncompressed_page_size = - rLevelsBuf.length + dLevelsBuf.length + valuesBuf.length; + rLevelsBuf.length + dLevelsBuf.length + valuesBuf.length; pageHeader.compressed_page_size = - rLevelsBuf.length + dLevelsBuf.length + valuesBufCompressed.length; + rLevelsBuf.length + dLevelsBuf.length + valuesBufCompressed.length; pageHeader.data_page_header_v2.encoding = parquet_thrift.Encoding[column.encoding!]; pageHeader.data_page_header_v2.definition_levels_byte_length = dLevelsBuf.length; pageHeader.data_page_header_v2.repetition_levels_byte_length = rLevelsBuf.length; pageHeader.data_page_header_v2.is_compressed = - column.compression !== 'UNCOMPRESSED'; + column.compression !== 'UNCOMPRESSED'; /* concat page header, repetition and definition levels and values */ return Buffer.concat([ - parquet_util.serializeThrift(pageHeader), - rLevelsBuf, - dLevelsBuf, - valuesBufCompressed]); + parquet_util.serializeThrift(pageHeader), + rLevelsBuf, + dLevelsBuf, + valuesBufCompressed]); } - - /** * Encode an array of values into a parquet column chunk */ -async function encodeColumnChunk(pages: Page[], opts: {column: ParquetField, baseOffset: number, pageSize: number, rowCount: number, useDataPageV2: boolean, pageIndex: boolean}) { +async function encodeColumnChunk(pages: Page[], opts: { column: ParquetField, baseOffset: number, pageSize: number, rowCount: number, useDataPageV2: boolean, pageIndex: boolean }) { let pagesBuf = Buffer.concat(pages.map(d => d.page)); - let num_values = pages.reduce((p,d) => p + d.num_values, 0); + let num_values = pages.reduce((p, d) => p + d.num_values, 0); let offset = opts.baseOffset; /* prepare metadata header */ @@ -663,9 +677,9 @@ async function encodeColumnChunk(pages: Page[], opts: {column: ParquetField, bas page.distinct_values.forEach((value: unknown) => distinct_values.add(value)); // If the number of values and the count of nulls are the same, this is a null page - columnIndex.null_pages.push( page.num_values === statistics.null_count.valueOf() ); - columnIndex.max_values.push( encodeStatisticsValue(page.statistics.max_value, opts.column) ); - columnIndex.min_values.push( encodeStatisticsValue(page.statistics.min_value, opts.column) ); + columnIndex.null_pages.push(page.num_values === statistics.null_count.valueOf()); + columnIndex.max_values.push(encodeStatisticsValue(page.statistics.max_value, opts.column)); + columnIndex.min_values.push(encodeStatisticsValue(page.statistics.min_value, opts.column)); } let pageLocation = new parquet_thrift.PageLocation(); @@ -695,14 +709,14 @@ async function encodeColumnChunk(pages: Page[], opts: {column: ParquetField, bas /* concat metadata header and data pages */ let metadataOffset = opts.baseOffset + pagesBuf.length; let body = Buffer.concat([pagesBuf, parquet_util.serializeThrift(metadata)]); - return { body, metadata, metadataOffset }; + return {body, metadata, metadataOffset}; } /** * Encode a list of column values into a parquet row group */ async function encodeRowGroup(schema: ParquetSchema, data: parquet_shredder.RecordBuffer, opts: WriterOptions) { - let metadata: RowGroupExt = new parquet_thrift .RowGroup(); + let metadata: RowGroupExt = new parquet_thrift.RowGroup(); metadata.num_rows = new Int64(data.rowCount!); metadata.columns = []; 
metadata.total_byte_size = new Int64(0); @@ -733,7 +747,7 @@ async function encodeRowGroup(schema: ParquetSchema, data: parquet_shredder.Reco body = Buffer.concat([body, cchunkData.body]); } - return { body, metadata }; + return {body, metadata}; } /** @@ -778,7 +792,7 @@ function encodeFooter(schema: ParquetSchema, rowCount: Int64, rowGroups: RowGrou } // Support Decimal - switch(schemaElem.converted_type) { + switch (schemaElem.converted_type) { case (ConvertedType.DECIMAL): schemaElem.precision = field.precision; schemaElem.scale = field.scale || 0; diff --git a/test/bloomFilterIntegration.ts b/test/bloomFilterIntegration.ts index bc43f096..aa41de17 100644 --- a/test/bloomFilterIntegration.ts +++ b/test/bloomFilterIntegration.ts @@ -1,155 +1,257 @@ -import { assert }from "chai"; +import {assert} from "chai"; import parquet from "../parquet"; + +import parquet_thrift from "../gen-nodejs/parquet_types"; +import {decodeThrift} from "../lib/util"; +import SplitBlockBloomFilter from "../lib/bloom/sbbf"; + const TEST_VTIME = new Date(); -type BloomFilters = { - name: Array - quantity: Array -}; - -const schema = new parquet.ParquetSchema({ - name: { type: "UTF8" }, - quantity: { type: "INT64", optional: true }, - price: { type: "DOUBLE" }, - date: { type: "TIMESTAMP_MICROS" }, - day: { type: "DATE" }, - finger: { type: "FIXED_LEN_BYTE_ARRAY", typeLength: 5 }, - inter: { type: "INTERVAL", statistics: false }, - stock: { - repeated: true, - fields: { - quantity: { type: "INT64", repeated: true }, - warehouse: { type: "UTF8" }, - }, - }, - colour: { type: "UTF8", repeated: true }, - meta_json: { type: "BSON", optional: true, statistics: false }, -}); +const TEST_FILE = 'fruits-bloomfilter.parquet' + +type BloomFilterColumnData = { + sbbf: SplitBlockBloomFilter, + columnName: string, + rowGroupIndex: number, +} + +const sampleColumnHeaders = async (filename: string) => { + let reader = await parquet.ParquetReader.openFile(filename); + + + let column = reader.metadata!.row_groups[0].columns[0]; + let buffer = await reader!.envelopeReader!.read(+column!.meta_data!.data_page_offset, +column!.meta_data!.total_compressed_size); + + let cursor = { + buffer: buffer, + offset: 0, + size: buffer.length + }; + + const pages = []; + + while (cursor.offset < cursor.size) { + const pageHeader = new parquet_thrift.PageHeader(); + cursor.offset += decodeThrift(pageHeader, cursor.buffer.slice(cursor.offset)); + pages.push(pageHeader); + cursor.offset += pageHeader.compressed_page_size; + } + return {column, pages}; +} describe("bloom filter", async function () { let row: any; let reader: any; - let bloomFilters: BloomFilters; - - before(async function () { - const options = { - pageSize: 3, - bloomFilters: [ - { - column: "name", - numFilterBytes: 1024, - }, - { - column: "quantity", - numFilterBytes: 1024, + let bloomFilters: Record>; + + describe("a nested schema", () => { + const schema = new parquet.ParquetSchema({ + name: {type: "UTF8"}, + quantity: {type: "INT64", optional: true}, + price: {type: "DOUBLE"}, + date: {type: "TIMESTAMP_MICROS"}, + day: {type: "DATE"}, + finger: {type: "FIXED_LEN_BYTE_ARRAY", typeLength: 5}, + inter: {type: "INTERVAL", statistics: false}, + stock: { + repeated: true, + fields: { + quantity: {type: "INT64", repeated: true}, + warehouse: {type: "UTF8"}, }, - ], - }; - - let writer = await parquet.ParquetWriter.openFile( - schema, - "fruits-bloomfilter.parquet", - options - ); - - await writer.appendRow({ - name: "apples", - quantity: BigInt(10), - price: 2.6, - day: new 
Date("2017-11-26"), - date: new Date(TEST_VTIME.valueOf() + 1000), - finger: "FNORD", - inter: { months: 10, days: 5, milliseconds: 777 }, - colour: ["green", "red"], + }, + colour: {type: "UTF8", repeated: true}, + meta_json: {type: "BSON", optional: true, statistics: false}, }); + before(async function () { + const options = { + pageSize: 3, + bloomFilters: [ + { + column: "name", + numFilterBytes: 1024, + }, + { + column: "quantity", + numFilterBytes: 1024, + }, + { + column: "stock,warehouse", + numFilterBytes: 1024, + } + ], + }; - await writer.appendRow({ - name: "oranges", - quantity: BigInt(20), - price: 2.7, - day: new Date("2018-03-03"), - date: new Date(TEST_VTIME.valueOf() + 2000), - finger: "ABCDE", - inter: { months: 42, days: 23, milliseconds: 777 }, - colour: ["orange"], + let writer = await parquet.ParquetWriter.openFile(schema, TEST_FILE, options); + + await writer.appendRow({ + name: "apples", + quantity: BigInt(10), + price: 2.6, + day: new Date("2017-11-26"), + date: new Date(TEST_VTIME.valueOf() + 1000), + finger: "FNORD", + inter: {months: 10, days: 5, milliseconds: 777}, + colour: ["green", "red"], + }); + + await writer.appendRow({ + name: "oranges", + quantity: BigInt(20), + price: 2.7, + day: new Date("2018-03-03"), + date: new Date(TEST_VTIME.valueOf() + 2000), + finger: "ABCDE", + inter: {months: 42, days: 23, milliseconds: 777}, + colour: ["orange"], + }); + + await writer.appendRow({ + name: "kiwi", + price: 4.2, + quantity: BigInt(15), + day: new Date("2008-11-26"), + date: new Date(TEST_VTIME.valueOf() + 8000), + finger: "XCVBN", + inter: {months: 60, days: 1, milliseconds: 99}, + stock: [ + {quantity: BigInt(42), warehouse: "f"}, + {quantity: BigInt(21), warehouse: "x"}, + ], + colour: ["green", "brown", "yellow"], + meta_json: {expected_ship_date: TEST_VTIME.valueOf()}, + }); + + await writer.appendRow({ + name: "banana", + price: 3.2, + day: new Date("2017-11-26"), + date: new Date(TEST_VTIME.valueOf() + 6000), + finger: "FNORD", + inter: {months: 1, days: 15, milliseconds: 888}, + colour: ["yellow"], + meta_json: {shape: "curved"}, + }); + + await writer.close(); + reader = await parquet.ParquetReader.openFile(TEST_FILE); + row = reader.metadata.row_groups[0]; + + bloomFilters = await reader.getBloomFiltersFor(["name", "quantity", "stock,warehouse"]); }); - await writer.appendRow({ - name: "kiwi", - price: 4.2, - quantity: BigInt(15), - day: new Date("2008-11-26"), - date: new Date(TEST_VTIME.valueOf() + 8000), - finger: "XCVBN", - inter: { months: 60, days: 1, milliseconds: 99 }, - stock: [ - { quantity: BigInt(42), warehouse: "f" }, - { quantity: BigInt(21), warehouse: "x" }, - ], - colour: ["green", "brown", "yellow"], - meta_json: { expected_ship_date: TEST_VTIME.valueOf() }, + it('contains name and quantity filter', () => { + const columnsFilterNames = Object.keys(bloomFilters); + assert.deepEqual(columnsFilterNames, ['name', 'quantity', 'stock,warehouse']); }); - await writer.appendRow({ - name: "banana", - price: 3.2, - day: new Date("2017-11-26"), - date: new Date(TEST_VTIME.valueOf() + 6000), - finger: "FNORD", - inter: { months: 1, days: 15, milliseconds: 888 }, - colour: ["yellow"], - meta_json: { shape: "curved" }, + it("writes bloom filters for column: name", async function () { + const splitBlockBloomFilter = bloomFilters.name[0].sbbf; + assert.isTrue( + await splitBlockBloomFilter.check(Buffer.from("apples")), + "apples is included in name filter" + ); + assert.isTrue( + await splitBlockBloomFilter.check(Buffer.from("oranges")), + 
"oranges is included in name filter" + ); + assert.isTrue( + await splitBlockBloomFilter.check(Buffer.from("kiwi")), + "kiwi is included" + ); + assert.isTrue( + await splitBlockBloomFilter.check(Buffer.from("banana")), + "banana is included in name filter" + ); + assert.isFalse( + await splitBlockBloomFilter.check(Buffer.from("taco")), + "taco is NOT included in name filter" + ); }); - await writer.close(); - reader = await parquet.ParquetReader.openFile("fruits-bloomfilter.parquet"); - row = reader.metadata.row_groups[0]; + it("writes bloom filters for column: quantity", async function () { + const splitBlockBloomFilter = bloomFilters.quantity[0].sbbf; + assert.isTrue( + await splitBlockBloomFilter.check(BigInt(10)), + "10n is included in quantity filter" + ); + assert.isTrue( + await splitBlockBloomFilter.check(BigInt(15)), + "15n is included in quantity filter" + ); + assert.isFalse( + await splitBlockBloomFilter.check(BigInt(100)), + "100n is NOT included in quantity filter" + ); + }); - bloomFilters = await reader.getBloomFiltersFor(["name", "quantity"]); + it('writes bloom filters for stock,warehouse', async () => { + const splitBlockBloomFilter = bloomFilters['stock,warehouse'][0].sbbf; + assert.isTrue( + await splitBlockBloomFilter.check(Buffer.from('x')), + "x should be in the warehouse filter" + ); + assert.isTrue( + await splitBlockBloomFilter.check(Buffer.from('f')), + "f should be in the warehouse filter" + ); + assert.isFalse( + await splitBlockBloomFilter.check(Buffer.from('foo')), + "foo should not be in the warehouse filter" + ); + }) }); + describe("a simple schema with a nested list", () => { + const nestedListSchema = new parquet.ParquetSchema({ + name: {type: "UTF8"}, + querystring: { + type: "LIST", + fields: { + list: { + repeated: true, + fields: { + element: { + fields: { + key: {type: "UTF8"}, + value: {type: "UTF8"} + } + } + } + } + } + } + }); - it('contains name and quantity filter', () => { - const columnsFilterNames = Object.keys(bloomFilters); - assert.deepEqual(columnsFilterNames, ['name', 'quantity']); - }); + it("can be written, read and checked", async () => { + const file = "/tmp/issue-98.parquet"; + const nestedListFilterColumn = "querystring,list,element,key"; + const writer = await parquet.ParquetWriter.openFile(nestedListSchema, file, { + bloomFilters: [ + {column: "name"}, + {column: nestedListFilterColumn}, + ], + }); - it("writes bloom filters for column: name", async function () { - const splitBlockBloomFilter = bloomFilters.name[0].sbbf; - assert.isTrue( - await splitBlockBloomFilter.check(Buffer.from("apples")), - "apples is included in name filter" - ); - assert.isTrue( - await splitBlockBloomFilter.check(Buffer.from("oranges")), - "oranges is included in name filter" - ); - assert.isTrue( - await splitBlockBloomFilter.check(Buffer.from("kiwi")), - "kiwi is included" - ); - assert.isTrue( - await splitBlockBloomFilter.check(Buffer.from("banana")), - "banana is included in name filter" - ); - assert.isFalse( - await splitBlockBloomFilter.check(Buffer.from("taco")), - "taco is NOT included in name filter" - ); - }); + await writer.appendRow({ + name: "myquery", + querystring: { + list: [ + {element: {key: "foo", value: "bar"}}, + {element: {key: "foo2", value: "bar2"}}, + {element: {key: "foo3", value: "bar3"}} + ] + } + }); + await writer.close(); + const reader = await parquet.ParquetReader.openFile(file); + const bloomFilters: Record> = await reader.getBloomFiltersFor(["name", "querystring,list,element,key"]); + 
assert.isDefined(bloomFilters[nestedListFilterColumn]); - it("writes bloom filters for column: quantity", async function () { - const splitBlockBloomFilter = bloomFilters.quantity[0].sbbf; - assert.isTrue( - await splitBlockBloomFilter.check(BigInt(10)), - "10n is included in quantity filter" - ); - assert.isTrue( - await splitBlockBloomFilter.check(BigInt(15)), - "15n is included in quantity filter" - ); - assert.isFalse( - await splitBlockBloomFilter.check(BigInt(100)), - "100n is NOT included in quantity filter" - ); - }); + const bfForNestedList = bloomFilters[nestedListFilterColumn][0].sbbf; + assert.equal(bfForNestedList.getNumFilterBlocks(), 935); + + const foo2IsThere = await bfForNestedList.check("foo2"); + assert(foo2IsThere); + }) + }) });
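
Usage note: with this change, bloom filter columns are addressed by their full comma-joined path through the schema (e.g. "stock,warehouse" or "querystring,list,element,key"), not just the top-level field name. Below is a minimal sketch of the round trip exercised by the integration tests above; the package name "@dsnp/parquetjs" and the output file path are assumptions for illustration (the tests import "../parquet" directly), while the openFile/appendRow/getBloomFiltersFor/check calls mirror the test code in this patch.

    import parquet from "@dsnp/parquetjs"; // assumed package entry point

    async function nestedBloomFilterExample() {
      const schema = new parquet.ParquetSchema({
        name: { type: "UTF8" },
        stock: {
          repeated: true,
          fields: {
            quantity: { type: "INT64", repeated: true },
            warehouse: { type: "UTF8" },
          },
        },
      });

      // Nested columns are referenced by the full path, joined with commas.
      const writer = await parquet.ParquetWriter.openFile(schema, "fruits.parquet", {
        bloomFilters: [
          { column: "name", numFilterBytes: 1024 },
          { column: "stock,warehouse", numFilterBytes: 1024 },
        ],
      });
      await writer.appendRow({
        name: "kiwi",
        stock: [{ quantity: BigInt(42), warehouse: "f" }],
      });
      await writer.close();

      // Read the filters back using the same full paths; each entry holds one
      // filter per row group.
      const reader = await parquet.ParquetReader.openFile("fruits.parquet");
      const filters = await reader.getBloomFiltersFor(["name", "stock,warehouse"]);
      const sbbf = filters["stock,warehouse"][0].sbbf;
      console.log(await sbbf.check(Buffer.from("f"))); // true for inserted values
    }

The same path string must appear in both the writer's bloomFilters option and the reader's getBloomFiltersFor call: per the writer changes above, only the full path matches a nested column, so the top-level group name alone (e.g. "stock") will not select its children's filters.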