Parquet encryption #39

Merged · merged 4 commits on Sep 5, 2024
Changes from 2 commits
8 changes: 6 additions & 2 deletions parquet/Cargo.toml
@@ -30,8 +30,10 @@ edition = "2018"

[dependencies]
# update note: pin `parquet-format` to specific version until it does not break at minor
# version, see ARROW-11187.
parquet-format = "~2.6.1"
# version, see ARROW-11187. Update: since this comment was written it has been pinned
# at ~4.0.0, and upstream arrow-rs vendors the parquet-format definitions.
parquet-format = "~4.0.0"
aes-gcm = "0.10.3"
byteorder = "1"
thrift = "0.13"
snap = { version = "1.0", optional = true }
@@ -45,7 +47,9 @@ arrow = { path = "../arrow", version = "5.0.0", optional = true }
base64 = { version = "0.13", optional = true }
clap = { version = "2.33.3", optional = true }
serde_json = { version = "1.0", features = ["preserve_order"], optional = true }
serde = { version = "1.0.115", features = ["derive"] }
rand = "0.8"
sha3 = "0.10.8"

[dev-dependencies]
criterion = "0.3"
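The new `aes-gcm` dependency provides the AEAD primitive behind the encryption changes below. As a hedged, self-contained sketch (not this PR's `file/encryption.rs`, whose key handling and AAD layout may differ), AES-256-GCM uses a 12-byte nonce and appends a 16-byte authentication tag, which is where the fixed per-page encryption overhead discussed later in this diff comes from:

```rust
// Sketch only: the aes-gcm 0.10 API and its fixed per-message overhead. Key handling,
// nonce storage, and AAD contents here are illustrative assumptions, not the PR's code.
use aes_gcm::{
    aead::{Aead, AeadCore, KeyInit, OsRng, Payload},
    Aes256Gcm,
};

fn main() {
    let key = Aes256Gcm::generate_key(OsRng);
    let cipher = Aes256Gcm::new(&key);
    let nonce = Aes256Gcm::generate_nonce(&mut OsRng); // 96-bit (12-byte) nonce

    let plaintext = b"compressed page bytes";
    let aad = b"per-page AAD"; // authenticated but not encrypted
    let ciphertext = cipher
        .encrypt(&nonce, Payload { msg: plaintext, aad })
        .expect("encryption failure");

    // GCM appends a 16-byte tag; the nonce (and any length prefix) is stored alongside
    // the ciphertext, so an encrypted page is larger than its compressed buffer.
    assert_eq!(ciphertext.len(), plaintext.len() + 16);
}
```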
11 changes: 11 additions & 0 deletions parquet/src/basic.rs
@@ -250,6 +250,15 @@ pub enum Encoding {
///
/// The ids are encoded using the RLE encoding.
RLE_DICTIONARY,

/// Encoding for floating-point data.
///
/// K byte-streams are created where K is the size in bytes of the data type.
/// The individual bytes of an FP value are scattered to the corresponding stream and
/// the streams are concatenated.
/// This itself does not reduce the size of the data but can lead to better compression
/// afterwards.
BYTE_STREAM_SPLIT,
}

// ----------------------------------------------------------------------
@@ -701,6 +710,7 @@ impl convert::From<parquet::Encoding> for Encoding {
parquet::Encoding::DeltaLengthByteArray => Encoding::DELTA_LENGTH_BYTE_ARRAY,
parquet::Encoding::DeltaByteArray => Encoding::DELTA_BYTE_ARRAY,
parquet::Encoding::RleDictionary => Encoding::RLE_DICTIONARY,
parquet::Encoding::ByteStreamSplit => Encoding::BYTE_STREAM_SPLIT,
}
}
}
@@ -716,6 +726,7 @@ impl convert::From<Encoding> for parquet::Encoding {
Encoding::DELTA_LENGTH_BYTE_ARRAY => parquet::Encoding::DeltaLengthByteArray,
Encoding::DELTA_BYTE_ARRAY => parquet::Encoding::DeltaByteArray,
Encoding::RLE_DICTIONARY => parquet::Encoding::RleDictionary,
Encoding::BYTE_STREAM_SPLIT => parquet::Encoding::ByteStreamSplit,
}
}
}
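The `BYTE_STREAM_SPLIT` variant added above only maps the thrift enum value; the doc comment describes the transform itself. A minimal sketch of that transform for `f32` (so K = 4 streams), assuming little-endian byte order, just to make the scatter concrete — the real encoder/decoder is not part of this diff:

```rust
// Minimal sketch of BYTE_STREAM_SPLIT for f32 (K = 4 byte streams). Illustrates the
// doc comment only; this is not the encoder used by the crate.
fn byte_stream_split_f32(values: &[f32]) -> Vec<u8> {
    let k = std::mem::size_of::<f32>(); // one stream per byte of the type
    let n = values.len();
    let mut out = vec![0u8; n * k];
    for (i, v) in values.iter().enumerate() {
        for (j, b) in v.to_le_bytes().iter().enumerate() {
            out[j * n + i] = *b; // byte j of value i lands in stream j
        }
    }
    out // concatenated streams: [stream 0][stream 1][stream 2][stream 3]
}
```

Similar values share high-order bytes, so each stream is more homogeneous than the original buffer, which is why a general-purpose compressor tends to do better afterwards.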
15 changes: 9 additions & 6 deletions parquet/src/column/page.rs
@@ -139,11 +139,14 @@ impl CompressedPage {
self.uncompressed_size
}

/// Returns compressed size in bytes.
/// Returns compressed size (but not encrypted size) in bytes.
///
/// Note that it is assumed that buffer is compressed, but it may not be. In this
/// case compressed size will be equal to uncompressed size.
pub fn compressed_size(&self) -> usize {
/// Note that it is assumed that buffer is compressed, but it may not be. In this case
/// compressed size will be equal to uncompressed size.
///
/// Other "compressed size" fields (e.g. `total_compressed_size`) include encryption
/// overhead, when applicable, which this value does not.
pub fn compressed_unencrypted_size(&self) -> usize {
self.compressed_page.buffer().len()
}

@@ -206,7 +209,7 @@ pub trait PageWriter {
///
/// This method is called for every compressed page we write into underlying buffer,
/// either data page or dictionary page.
fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec>;
fn write_page(&mut self, page: CompressedPage, aad_page_ordinal: Option<u16>) -> Result<PageWriteSpec>;

/// Writes column chunk metadata into the output stream/sink.
///
@@ -299,7 +302,7 @@ mod tests {

assert_eq!(cpage.page_type(), PageType::DATA_PAGE);
assert_eq!(cpage.uncompressed_size(), 5);
assert_eq!(cpage.compressed_size(), 3);
assert_eq!(cpage.compressed_unencrypted_size(), 3);
assert_eq!(cpage.num_values(), 10);
assert_eq!(cpage.encoding(), Encoding::PLAIN);
assert_eq!(cpage.data(), &[0, 1, 2]);
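The `aad_page_ordinal` parameter added to `PageWriter::write_page` feeds the per-page AAD (additional authenticated data) used by Parquet modular encryption. A hedged sketch of how such an AAD suffix can be assembled from the ordinals this PR threads through; the exact byte layout belongs to the Parquet spec and to this PR's `file/encryption.rs`, so field order and widths below are assumptions:

```rust
// Sketch (assumption): building an AAD suffix from the ordinals passed around in this PR.
// Dictionary pages get no page ordinal, matching the `None` passed by write_dictionary_page.
fn page_aad_suffix(
    file_aad: &[u8],           // per-file unique bytes
    module_type: u8,           // e.g. data page vs. dictionary page vs. page header
    row_group_ordinal: i16,
    column_ordinal: u16,
    page_ordinal: Option<u16>, // None for dictionary pages
) -> Vec<u8> {
    let mut aad = Vec::with_capacity(file_aad.len() + 1 + 6);
    aad.extend_from_slice(file_aad);
    aad.push(module_type);
    aad.extend_from_slice(&row_group_ordinal.to_le_bytes());
    aad.extend_from_slice(&column_ordinal.to_le_bytes());
    if let Some(p) = page_ordinal {
        aad.extend_from_slice(&p.to_le_bytes());
    }
    aad
}
```

Binding the ordinals into the AAD means a ciphertext page cannot be silently replayed at a different row group, column, or page position without failing authentication.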
67 changes: 54 additions & 13 deletions parquet/src/column/writer.rs
@@ -168,6 +168,7 @@ pub struct ColumnWriterImpl<T: DataType> {
descr: ColumnDescPtr,
props: WriterPropertiesPtr,
page_writer: Box<dyn PageWriter>,
page_ordinal: usize,
has_dictionary: bool,
dict_encoder: Option<DictEncoder<T>>,
encoder: Box<dyn Encoder<T>>,
Expand All @@ -185,6 +186,8 @@ pub struct ColumnWriterImpl<T: DataType> {
total_bytes_written: u64,
total_rows_written: u64,
total_uncompressed_size: u64,
// Includes encryption overhead -- the corresponding thrift field includes it, and we
// keep that field's name here.
total_compressed_size: u64,
total_num_values: u64,
dictionary_page_offset: Option<u64>,
@@ -231,10 +234,14 @@ impl<T: DataType> ColumnWriterImpl<T> {
)
.unwrap();

// We start counting pages from zero.
let page_ordinal: usize = 0;

Self {
descr,
props,
page_writer,
page_ordinal,
has_dictionary,
dict_encoder,
encoder: fallback_encoder,
@@ -824,7 +831,10 @@ impl<T: DataType> ColumnWriterImpl<T> {
/// Writes compressed data page into underlying sink and updates global metrics.
#[inline]
fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
let page_spec = self.page_writer.write_page(page)?;
let page_ordinal = self.page_ordinal;
let aad_page_ordinal: Option<u16> = Some(page_ordinal as u16);
self.page_ordinal += 1;
let page_spec = self.page_writer.write_page(page, aad_page_ordinal)?;
self.update_metrics_for_page(page_spec);
Ok(())
}
@@ -858,7 +868,7 @@ impl<T: DataType> ColumnWriterImpl<T> {
CompressedPage::new(dict_page, uncompressed_size)
};

let page_spec = self.page_writer.write_page(compressed_page)?;
let page_spec = self.page_writer.write_page(compressed_page, None)?;
self.update_metrics_for_page(page_spec);
Ok(())
}
@@ -1026,10 +1036,10 @@ fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
mod tests {
use rand::distributions::uniform::SampleUniform;

use crate::column::{
use crate::{column::{
page::PageReader,
reader::{get_column_reader, get_typed_column_reader, ColumnReaderImpl},
};
}, file::encryption::USUAL_ENCRYPTION_OVERHEAD};
use crate::file::{
properties::WriterProperties, reader::SerializedPageReader,
writer::SerializedPageWriter,
@@ -1642,21 +1652,24 @@ mod tests {
);
}

const TEST_ROW_GROUP_ORDINAL: i16 = 1234;
const TEST_COLUMN_ORDINAL: u16 = 135;

#[test]
fn test_column_writer_add_data_pages_with_dict() {
// ARROW-5129: Test verifies that we add data page in case of dictionary encoding
// and no fallback occurred so far.
let file = get_temp_file("test_column_writer_add_data_pages_with_dict", &[]);
let sink = FileSink::new(&file);
let page_writer = Box::new(SerializedPageWriter::new(sink));
let page_writer = Box::new(SerializedPageWriter::new(sink, None, TEST_ROW_GROUP_ORDINAL, TEST_COLUMN_ORDINAL));
let props = Arc::new(
WriterProperties::builder()
.set_data_pagesize_limit(15) // actually each page will have size 15-18 bytes
.set_write_batch_size(3) // write 3 values at a time
.build(),
);
let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props.clone());
writer.write_batch(data, None, None).unwrap();
let (bytes_written, _, _) = writer.close().unwrap();

@@ -1665,8 +1678,12 @@ mod tests {
let mut page_reader = Box::new(
SerializedPageReader::new(
source,
None,
TEST_ROW_GROUP_ORDINAL,
TEST_COLUMN_ORDINAL,
data.len() as i64,
Compression::UNCOMPRESSED,
props.dictionary_enabled(&ColumnPath::from("col")),
Int32Type::get_physical_type(),
)
.unwrap(),
@@ -1803,7 +1820,7 @@ mod tests {
) {
let file = get_temp_file(file_name, &[]);
let sink = FileSink::new(&file);
let page_writer = Box::new(SerializedPageWriter::new(sink));
let page_writer = Box::new(SerializedPageWriter::new(sink, None, TEST_ROW_GROUP_ORDINAL, TEST_COLUMN_ORDINAL));

let max_def_level = match def_levels {
Some(buf) => *buf.iter().max().unwrap_or(&0i16),
@@ -1823,11 +1840,12 @@ mod tests {
max_batch_size = cmp::max(max_batch_size, levels.len());
}

let props = Arc::new(props);
let mut writer = get_test_column_writer::<T>(
page_writer,
max_def_level,
max_rep_level,
Arc::new(props),
props.clone(),
);

let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
@@ -1838,8 +1856,12 @@ mod tests {
let page_reader = Box::new(
SerializedPageReader::new(
source,
None,
TEST_ROW_GROUP_ORDINAL,
TEST_COLUMN_ORDINAL,
column_metadata.num_values(),
column_metadata.compression(),
props.dictionary_enabled(&ColumnPath::from("col")),
T::get_physical_type(),
)
.unwrap(),
@@ -1977,20 +1999,39 @@ mod tests {

/// Returns page writer that collects pages without serializing them.
fn get_test_page_writer() -> Box<dyn PageWriter> {
Box::new(TestPageWriter {})
Box::new(TestPageWriter {simulate_encrypted: false, last_page_ordinal: None})
}

struct TestPageWriter {}
struct TestPageWriter {
/// Always false for now -- enabling it would only change the return values fed into
/// test assertions.
simulate_encrypted: bool,
last_page_ordinal: Option<u16>,
}

impl PageWriter for TestPageWriter {
fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
fn write_page(&mut self, page: CompressedPage, aad_page_ordinal: Option<u16>) -> Result<PageWriteSpec> {
// This assertion is deliberately loose -- the caller may or may not write a dictionary page.
match aad_page_ordinal {
Some(n) if n != 0 => {
assert_eq!(self.last_page_ordinal, Some(n - 1));
}
_ => {
assert_eq!(None, self.last_page_ordinal);
}
}
self.last_page_ordinal = aad_page_ordinal;

// Note: a real PageWriteSpec result would also include PageMetaData overhead, so these
// values are not perfectly faked, but the only consumers are the test assertions.
let mut res = PageWriteSpec::new();
res.page_type = page.page_type();
res.uncompressed_size = page.uncompressed_size();
res.compressed_size = page.compressed_size();
res.compressed_size = self.simulate_encrypted as usize * USUAL_ENCRYPTION_OVERHEAD + page.compressed_unencrypted_size();
res.num_values = page.num_values();
res.offset = 0;
res.bytes_written = page.data().len() as u64;
res.bytes_written = (self.simulate_encrypted as usize * USUAL_ENCRYPTION_OVERHEAD + page.data().len()) as u64;
Ok(res)
}

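To make the fake size bookkeeping in `TestPageWriter::write_page` explicit: the reported compressed size only grows by a fixed per-page overhead when encryption is simulated, while the page buffer itself (what `compressed_unencrypted_size` returns) is unchanged. A small sketch, with `OVERHEAD` standing in for `USUAL_ENCRYPTION_OVERHEAD` (its real value lives in `file/encryption.rs`; the 28 below, a 12-byte nonce plus a 16-byte tag, is only an assumption for illustration):

```rust
// Sketch of the sizing arithmetic used by the test double above; constants are assumed.
const OVERHEAD: usize = 28; // assumed: 12-byte nonce + 16-byte GCM tag

fn reported_compressed_size(buffer_len: usize, simulate_encrypted: bool) -> usize {
    buffer_len + if simulate_encrypted { OVERHEAD } else { 0 }
}

fn main() {
    // With simulate_encrypted == false the reported size equals the buffer length,
    // matching the existing assertion compressed_unencrypted_size() == 3.
    assert_eq!(reported_compressed_size(3, false), 3);
    assert_eq!(reported_compressed_size(3, true), 3 + OVERHEAD);
}
```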