diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 775ac825a2e4..eec7faf09d06 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -67,6 +67,7 @@ hashbrown = { version = "0.14", default-features = false } twox-hash = { version = "1.6", default-features = false } paste = { version = "1.0" } half = { version = "2.1", default-features = false, features = ["num-traits"] } +sysinfo = { version = "0.30.12", optional = true, default-features = false } [dev-dependencies] base64 = { version = "0.22", default-features = false, features = ["std"] } @@ -114,12 +115,19 @@ async = ["futures", "tokio"] object_store = ["dep:object_store", "async"] # Group Zstd dependencies zstd = ["dep:zstd", "zstd-sys"] +# Display memory in example/write_parquet.rs +sysinfo = ["dep:sysinfo"] [[example]] name = "read_parquet" required-features = ["arrow"] path = "./examples/read_parquet.rs" +[[example]] +name = "write_parquet" +required-features = ["cli", "sysinfo"] +path = "./examples/write_parquet.rs" + [[example]] name = "async_read_parquet" required-features = ["arrow", "async"] diff --git a/parquet/examples/write_parquet.rs b/parquet/examples/write_parquet.rs new file mode 100644 index 000000000000..d2ef550df840 --- /dev/null +++ b/parquet/examples/write_parquet.rs @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fs::File; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use arrow::array::{StructArray, UInt64Builder}; +use arrow::datatypes::DataType::UInt64; +use arrow::datatypes::{Field, Schema}; +use clap::{Parser, ValueEnum}; +use parquet::arrow::ArrowWriter as ParquetWriter; +use parquet::basic::Encoding; +use parquet::errors::Result; +use parquet::file::properties::{BloomFilterPosition, WriterProperties}; +use sysinfo::{MemoryRefreshKind, Pid, ProcessRefreshKind, RefreshKind, System}; + +#[derive(ValueEnum, Clone)] +enum BloomFilterPositionArg { + End, + AfterRowGroup, +} + +#[derive(Parser)] +#[command(version)] +/// Writes sequences of integers, with a Bloom Filter, while logging timing and memory usage. +struct Args { + #[arg(long, default_value_t = 1000)] + /// Number of batches to write + iterations: u64, + + #[arg(long, default_value_t = 1000000)] + /// Number of rows in each batch + batch: u64, + + #[arg(long, value_enum, default_value_t=BloomFilterPositionArg::AfterRowGroup)] + /// Where to write Bloom Filters + bloom_filter_position: BloomFilterPositionArg, + + /// Path to the file to write + path: PathBuf, +} + +fn now() -> String { + chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string() +} + +fn mem(system: &mut System) -> String { + let pid = Pid::from(std::process::id() as usize); + system.refresh_process_specifics(pid, ProcessRefreshKind::new().with_memory()); + system + .process(pid) + .map(|proc| format!("{}MB", proc.memory() / 1_000_000)) + .unwrap_or("N/A".to_string()) +} + +fn main() -> Result<()> { + let args = Args::parse(); + + let bloom_filter_position = match args.bloom_filter_position { + BloomFilterPositionArg::End => BloomFilterPosition::End, + BloomFilterPositionArg::AfterRowGroup => BloomFilterPosition::AfterRowGroup, + }; + + let properties = WriterProperties::builder() + .set_column_bloom_filter_enabled("id".into(), true) + .set_column_encoding("id".into(), Encoding::DELTA_BINARY_PACKED) + .set_bloom_filter_position(bloom_filter_position) + .build(); + let schema = Arc::new(Schema::new(vec![Field::new("id", UInt64, false)])); + // Create parquet file that will be read. + let file = File::create(args.path).unwrap(); + let mut writer = ParquetWriter::try_new(file, schema.clone(), Some(properties))?; + + let mut system = + System::new_with_specifics(RefreshKind::new().with_memory(MemoryRefreshKind::everything())); + eprintln!( + "{} Writing {} batches of {} rows. RSS = {}", + now(), + args.iterations, + args.batch, + mem(&mut system) + ); + + let mut array_builder = UInt64Builder::new(); + let mut last_log = Instant::now(); + for i in 0..args.iterations { + if Instant::now() - last_log > Duration::new(10, 0) { + last_log = Instant::now(); + eprintln!( + "{} Iteration {}/{}. RSS = {}", + now(), + i + 1, + args.iterations, + mem(&mut system) + ); + } + for j in 0..args.batch { + array_builder.append_value(i + j); + } + writer.write( + &StructArray::new( + schema.fields().clone(), + vec![Arc::new(array_builder.finish())], + None, + ) + .into(), + )?; + } + writer.flush()?; + writer.close()?; + + eprintln!("{} Done. RSS = {}", now(), mem(&mut system)); + + Ok(()) +} diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 0beb93f80a5f..800751ff964b 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -43,7 +43,7 @@ use crate::column::writer::{ }; use crate::data_type::{ByteArray, FixedLenByteArray}; use crate::errors::{ParquetError, Result}; -use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaDataPtr}; +use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaData}; use crate::file::properties::{WriterProperties, WriterPropertiesPtr}; use crate::file::reader::{ChunkReader, Length}; use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter}; @@ -199,7 +199,7 @@ impl ArrowWriter { } /// Returns metadata for any flushed row groups - pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] { + pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] { self.writer.flushed_row_groups() } @@ -1053,7 +1053,9 @@ mod tests { use crate::file::metadata::ParquetMetaData; use crate::file::page_index::index::Index; use crate::file::page_index::index_reader::read_pages_locations; - use crate::file::properties::{EnabledStatistics, ReaderProperties, WriterVersion}; + use crate::file::properties::{ + BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion, + }; use crate::file::serialized_reader::ReadOptionsBuilder; use crate::file::{ reader::{FileReader, SerializedFileReader}, @@ -1701,6 +1703,7 @@ mod tests { values: ArrayRef, schema: SchemaRef, bloom_filter: bool, + bloom_filter_position: BloomFilterPosition, } impl RoundTripOptions { @@ -1711,6 +1714,7 @@ mod tests { values, schema: Arc::new(schema), bloom_filter: false, + bloom_filter_position: BloomFilterPosition::AfterRowGroup, } } } @@ -1730,6 +1734,7 @@ mod tests { values, schema, bloom_filter, + bloom_filter_position, } = options; let encodings = match values.data_type() { @@ -1770,6 +1775,7 @@ mod tests { .set_dictionary_page_size_limit(dictionary_size.max(1)) .set_encoding(*encoding) .set_bloom_filter_enabled(bloom_filter) + .set_bloom_filter_position(bloom_filter_position) .build(); files.push(roundtrip_opts(&expected_batch, props)) @@ -2127,6 +2133,22 @@ mod tests { values_required::(many_vecs_iter); } + #[test] + fn i32_column_bloom_filter_at_end() { + let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32)); + let mut options = RoundTripOptions::new(array, false); + options.bloom_filter = true; + options.bloom_filter_position = BloomFilterPosition::End; + + let files = one_column_roundtrip_with_options(options); + check_bloom_filter( + files, + "col".to_string(), + (0..SMALL_SIZE as i32).collect(), + (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(), + ); + } + #[test] fn i32_column_bloom_filter() { let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32)); diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs index 0bedf1fcb731..28efbdc7c66e 100644 --- a/parquet/src/arrow/async_writer/mod.rs +++ b/parquet/src/arrow/async_writer/mod.rs @@ -54,7 +54,7 @@ use crate::{ arrow::arrow_writer::ArrowWriterOptions, arrow::ArrowWriter, errors::{ParquetError, Result}, - file::{metadata::RowGroupMetaDataPtr, properties::WriterProperties}, + file::{metadata::RowGroupMetaData, properties::WriterProperties}, format::{FileMetaData, KeyValue}, }; use arrow_array::RecordBatch; @@ -172,7 +172,7 @@ impl AsyncArrowWriter { } /// Returns metadata for any flushed row groups - pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] { + pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] { self.sync_writer.flushed_row_groups() } diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs index fb8f798fd3ac..255fe1b7b253 100644 --- a/parquet/src/file/metadata.rs +++ b/parquet/src/file/metadata.rs @@ -333,6 +333,11 @@ impl RowGroupMetaData { &self.columns } + /// Returns mutable slice of column chunk metadata. + pub fn columns_mut(&mut self) -> &mut [ColumnChunkMetaData] { + &mut self.columns + } + /// Number of rows in this row group. pub fn num_rows(&self) -> i64 { self.num_rows diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index 87d84cef80aa..7fc73bd56fe2 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -43,6 +43,8 @@ pub const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Pag pub const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096; /// Default value for [`WriterProperties::max_row_group_size`] pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024; +/// Default value for [`WriterProperties::bloom_filter_position`] +pub const DEFAULT_BLOOM_FILTER_POSITION: BloomFilterPosition = BloomFilterPosition::AfterRowGroup; /// Default value for [`WriterProperties::created_by`] pub const DEFAULT_CREATED_BY: &str = concat!("parquet-rs version ", env!("CARGO_PKG_VERSION")); /// Default value for [`WriterProperties::column_index_truncate_length`] @@ -86,6 +88,24 @@ impl FromStr for WriterVersion { } } +/// Where in the file [`ArrowWriter`](crate::arrow::arrow_writer::ArrowWriter) should +/// write Bloom filters +/// +/// Basic constant, which is not part of the Thrift definition. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum BloomFilterPosition { + /// Write Bloom Filters of each row group right after the row group + /// + /// This saves memory by writing it as soon as it is computed, at the cost + /// of data locality for readers + AfterRowGroup, + /// Write Bloom Filters at the end of the file + /// + /// This allows better data locality for readers, at the cost of memory usage + /// for writers. + End, +} + /// Reference counted writer properties. pub type WriterPropertiesPtr = Arc; @@ -130,6 +150,7 @@ pub struct WriterProperties { data_page_row_count_limit: usize, write_batch_size: usize, max_row_group_size: usize, + bloom_filter_position: BloomFilterPosition, writer_version: WriterVersion, created_by: String, pub(crate) key_value_metadata: Option>, @@ -217,6 +238,11 @@ impl WriterProperties { self.max_row_group_size } + /// Returns maximum number of rows in a row group. + pub fn bloom_filter_position(&self) -> BloomFilterPosition { + self.bloom_filter_position + } + /// Returns configured writer version. pub fn writer_version(&self) -> WriterVersion { self.writer_version @@ -338,6 +364,7 @@ pub struct WriterPropertiesBuilder { data_page_row_count_limit: usize, write_batch_size: usize, max_row_group_size: usize, + bloom_filter_position: BloomFilterPosition, writer_version: WriterVersion, created_by: String, key_value_metadata: Option>, @@ -357,6 +384,7 @@ impl WriterPropertiesBuilder { data_page_row_count_limit: usize::MAX, write_batch_size: DEFAULT_WRITE_BATCH_SIZE, max_row_group_size: DEFAULT_MAX_ROW_GROUP_SIZE, + bloom_filter_position: DEFAULT_BLOOM_FILTER_POSITION, writer_version: DEFAULT_WRITER_VERSION, created_by: DEFAULT_CREATED_BY.to_string(), key_value_metadata: None, @@ -376,6 +404,7 @@ impl WriterPropertiesBuilder { data_page_row_count_limit: self.data_page_row_count_limit, write_batch_size: self.write_batch_size, max_row_group_size: self.max_row_group_size, + bloom_filter_position: self.bloom_filter_position, writer_version: self.writer_version, created_by: self.created_by, key_value_metadata: self.key_value_metadata, @@ -487,6 +516,12 @@ impl WriterPropertiesBuilder { self } + /// Sets where in the final file Bloom Filters are written (default `AfterRowGroup`) + pub fn set_bloom_filter_position(mut self, value: BloomFilterPosition) -> Self { + self.bloom_filter_position = value; + self + } + /// Sets "created by" property (defaults to `parquet-rs version `). pub fn set_created_by(mut self, value: String) -> Self { self.created_by = value; @@ -1052,6 +1087,7 @@ mod tests { ); assert_eq!(props.write_batch_size(), DEFAULT_WRITE_BATCH_SIZE); assert_eq!(props.max_row_group_size(), DEFAULT_MAX_ROW_GROUP_SIZE); + assert_eq!(props.bloom_filter_position(), DEFAULT_BLOOM_FILTER_POSITION); assert_eq!(props.writer_version(), DEFAULT_WRITER_VERSION); assert_eq!(props.created_by(), DEFAULT_CREATED_BY); assert_eq!(props.key_value_metadata(), None); diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 7806384cdb52..eb633f31c477 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -34,8 +34,9 @@ use crate::column::{ }; use crate::data_type::DataType; use crate::errors::{ParquetError, Result}; +use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr}; use crate::file::reader::ChunkReader; -use crate::file::{metadata::*, properties::WriterPropertiesPtr, PARQUET_MAGIC}; +use crate::file::{metadata::*, PARQUET_MAGIC}; use crate::schema::types::{self, ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, TypePtr}; /// A wrapper around a [`Write`] that keeps track of the number @@ -115,9 +116,10 @@ pub type OnCloseColumnChunk<'a> = Box Result<() /// - the row group metadata /// - the column index for each column chunk /// - the offset index for each column chunk -pub type OnCloseRowGroup<'a> = Box< +pub type OnCloseRowGroup<'a, W> = Box< dyn FnOnce( - RowGroupMetaDataPtr, + &'a mut TrackedWrite, + RowGroupMetaData, Vec>, Vec>, Vec>, @@ -143,7 +145,7 @@ pub struct SerializedFileWriter { schema: TypePtr, descr: SchemaDescPtr, props: WriterPropertiesPtr, - row_groups: Vec, + row_groups: Vec, bloom_filters: Vec>>, column_indexes: Vec>>, offset_indexes: Vec>>, @@ -197,18 +199,29 @@ impl SerializedFileWriter { self.row_group_index += 1; + let bloom_filter_position = self.properties().bloom_filter_position(); let row_groups = &mut self.row_groups; let row_bloom_filters = &mut self.bloom_filters; let row_column_indexes = &mut self.column_indexes; let row_offset_indexes = &mut self.offset_indexes; - let on_close = - |metadata, row_group_bloom_filter, row_group_column_index, row_group_offset_index| { - row_groups.push(metadata); - row_bloom_filters.push(row_group_bloom_filter); - row_column_indexes.push(row_group_column_index); - row_offset_indexes.push(row_group_offset_index); - Ok(()) + let on_close = move |buf, + mut metadata, + row_group_bloom_filter, + row_group_column_index, + row_group_offset_index| { + row_bloom_filters.push(row_group_bloom_filter); + row_column_indexes.push(row_group_column_index); + row_offset_indexes.push(row_group_offset_index); + // write bloom filters out immediately after the row group if requested + match bloom_filter_position { + BloomFilterPosition::AfterRowGroup => { + write_bloom_filters(buf, row_bloom_filters, &mut metadata)? + } + BloomFilterPosition::End => (), }; + row_groups.push(metadata); + Ok(()) + }; let row_group_writer = SerializedRowGroupWriter::new( self.descr.clone(), @@ -221,7 +234,7 @@ impl SerializedFileWriter { } /// Returns metadata for any flushed row groups - pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] { + pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] { &self.row_groups } @@ -273,34 +286,6 @@ impl SerializedFileWriter { Ok(()) } - /// Serialize all the bloom filter to the file - fn write_bloom_filters(&mut self, row_groups: &mut [RowGroup]) -> Result<()> { - // iter row group - // iter each column - // write bloom filter to the file - for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() { - for (column_idx, column_chunk) in row_group.columns.iter_mut().enumerate() { - match &self.bloom_filters[row_group_idx][column_idx] { - Some(bloom_filter) => { - let start_offset = self.buf.bytes_written(); - bloom_filter.write(&mut self.buf)?; - let end_offset = self.buf.bytes_written(); - // set offset and index for bloom filter - let column_chunk_meta = column_chunk - .meta_data - .as_mut() - .expect("can't have bloom filter without column metadata"); - column_chunk_meta.bloom_filter_offset = Some(start_offset as i64); - column_chunk_meta.bloom_filter_length = - Some((end_offset - start_offset) as i32); - } - None => {} - } - } - } - Ok(()) - } - /// Serialize all the column index to the file fn write_column_indexes(&mut self, row_groups: &mut [RowGroup]) -> Result<()> { // iter row group @@ -331,6 +316,11 @@ impl SerializedFileWriter { self.finished = true; let num_rows = self.row_groups.iter().map(|x| x.num_rows()).sum(); + // write out any remaining bloom filters after all row groups + for row_group in &mut self.row_groups { + write_bloom_filters(&mut self.buf, &mut self.bloom_filters, row_group)?; + } + let mut row_groups = self .row_groups .as_slice() @@ -338,7 +328,6 @@ impl SerializedFileWriter { .map(|v| v.to_thrift()) .collect::>(); - self.write_bloom_filters(&mut row_groups)?; // Write column indexes and offset indexes self.write_column_indexes(&mut row_groups)?; self.write_offset_indexes(&mut row_groups)?; @@ -443,6 +432,40 @@ impl SerializedFileWriter { } } +/// Serialize all the bloom filters of the given row group to the given buffer, +/// and returns the updated row group metadata. +fn write_bloom_filters( + buf: &mut TrackedWrite, + bloom_filters: &mut [Vec>], + row_group: &mut RowGroupMetaData, +) -> Result<()> { + // iter row group + // iter each column + // write bloom filter to the file + + let row_group_idx: u16 = row_group + .ordinal() + .expect("Missing row group ordinal") + .try_into() + .expect("Negative row group ordinal"); + let row_group_idx = row_group_idx as usize; + for (column_idx, column_chunk) in row_group.columns_mut().iter_mut().enumerate() { + if let Some(bloom_filter) = bloom_filters[row_group_idx][column_idx].take() { + let start_offset = buf.bytes_written(); + bloom_filter.write(&mut *buf)?; + let end_offset = buf.bytes_written(); + // set offset and index for bloom filter + *column_chunk = column_chunk + .clone() + .into_builder() + .set_bloom_filter_offset(Some(start_offset as i64)) + .set_bloom_filter_length(Some((end_offset - start_offset) as i32)) + .build()?; + } + } + Ok(()) +} + /// Parquet row group writer API. /// Provides methods to access column writers in an iterator-like fashion, order is /// guaranteed to match the order of schema leaves (column descriptors). @@ -468,7 +491,7 @@ pub struct SerializedRowGroupWriter<'a, W: Write> { offset_indexes: Vec>, row_group_index: i16, file_offset: i64, - on_close: Option>, + on_close: Option>, } impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { @@ -485,7 +508,7 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { properties: WriterPropertiesPtr, buf: &'a mut TrackedWrite, row_group_index: i16, - on_close: Option>, + on_close: Option>, ) -> Self { let num_columns = schema_descr.num_columns(); let file_offset = buf.bytes_written() as i64; @@ -669,12 +692,12 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { .set_file_offset(self.file_offset) .build()?; - let metadata = Arc::new(row_group_metadata); - self.row_group_metadata = Some(metadata.clone()); + self.row_group_metadata = Some(Arc::new(row_group_metadata.clone())); if let Some(on_close) = self.on_close.take() { on_close( - metadata, + self.buf, + row_group_metadata, self.bloom_filters, self.column_indexes, self.offset_indexes, @@ -1446,7 +1469,7 @@ mod tests { assert_eq!(flushed.len(), idx + 1); assert_eq!(Some(idx as i16), last_group.ordinal()); assert_eq!(Some(row_group_file_offset as i64), last_group.file_offset()); - assert_eq!(flushed[idx].as_ref(), last_group.as_ref()); + assert_eq!(&flushed[idx], last_group.as_ref()); } let file_metadata = file_writer.close().unwrap();