Merge pull request #39 from cube-js/parquet-encryption
Parquet encryption
srh authored Sep 5, 2024
2 parents ba5455c + 681c30b commit d6081a5
Showing 11 changed files with 774 additions and 84 deletions.
8 changes: 6 additions & 2 deletions parquet/Cargo.toml
@@ -30,8 +30,10 @@ edition = "2018"

[dependencies]
# update note: pin `parquet-format` to specific version until it does not break at minor
# version, see ARROW-11187.
parquet-format = "~2.6.1"
# version, see ARROW-11187. update: since this comment, it is now pinned at ~4.0.0 and
# upstream arrow-rs parquet vendors it
parquet-format = "~4.0.0"
aes-gcm = "0.10.3"
byteorder = "1"
thrift = "0.13"
snap = { version = "1.0", optional = true }
@@ -45,7 +47,9 @@ arrow = { path = "../arrow", version = "5.0.0", optional = true }
base64 = { version = "0.13", optional = true }
clap = { version = "2.33.3", optional = true }
serde_json = { version = "1.0", features = ["preserve_order"], optional = true }
serde = { version = "1.0.115", features = ["derive"] }
rand = "0.8"
sha3 = "0.10.8"

[dev-dependencies]
criterion = "0.3"
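The new `aes-gcm` dependency backs the page encryption added in this PR. For a rough sense of scale, here is a hedged sketch (not this crate's code) using aes-gcm 0.10: AES-256-GCM appends a 16-byte authentication tag, and storing the 12-byte nonce next to the ciphertext brings the per-page overhead to roughly 28 bytes on top of the compressed payload. The `USUAL_ENCRYPTION_OVERHEAD` constant referenced later in this diff presumably accounts for overhead of this kind.

```rust
use aes_gcm::{
    aead::{Aead, AeadCore, KeyInit, OsRng},
    Aes256Gcm,
};

fn main() {
    // Illustration only: encrypt a stand-in "compressed page" and measure the overhead.
    let key = Aes256Gcm::generate_key(OsRng);
    let cipher = Aes256Gcm::new(&key);
    let nonce = Aes256Gcm::generate_nonce(&mut OsRng); // 96-bit nonce, unique per page

    let compressed_page = vec![0u8; 1024];
    let ciphertext = cipher
        .encrypt(&nonce, compressed_page.as_slice())
        .expect("encryption failed");

    // GCM appends a 16-byte tag; persisting the nonce alongside adds 12 more bytes.
    assert_eq!(ciphertext.len(), compressed_page.len() + 16);
    let stored = nonce.len() + ciphertext.len();
    println!("per-page overhead: {} bytes", stored - compressed_page.len()); // 28
}
```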
11 changes: 11 additions & 0 deletions parquet/src/basic.rs
@@ -250,6 +250,15 @@ pub enum Encoding {
///
/// The ids are encoded using the RLE encoding.
RLE_DICTIONARY,

/// Encoding for floating-point data.
///
/// K byte-streams are created where K is the size in bytes of the data type.
/// The individual bytes of an FP value are scattered to the corresponding stream and
/// the streams are concatenated.
/// This itself does not reduce the size of the data but can lead to better compression
/// afterwards.
BYTE_STREAM_SPLIT,
}

// ----------------------------------------------------------------------
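For intuition, the BYTE_STREAM_SPLIT transform described in the new doc comment can be sketched in a few lines. This is not this crate's encoder, just an illustration of the byte shuffle for `f32` values (K = 4 streams), written against the spec wording above.

```rust
/// Illustration of BYTE_STREAM_SPLIT for f32 (K = 4): byte i of every value
/// goes into stream i, and the streams are concatenated. The output is the same
/// length as the input; the gain comes from compressing the result afterwards.
fn byte_stream_split_f32(values: &[f32]) -> Vec<u8> {
    let mut streams = vec![Vec::with_capacity(values.len()); 4];
    for v in values {
        for (i, b) in v.to_le_bytes().iter().enumerate() {
            streams[i].push(*b);
        }
    }
    streams.concat()
}

#[test]
fn splits_without_changing_size() {
    let data = [1.0f32, 2.5, -3.75];
    assert_eq!(byte_stream_split_f32(&data).len(), data.len() * 4);
}
```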
@@ -701,6 +710,7 @@ impl convert::From<parquet::Encoding> for Encoding {
parquet::Encoding::DeltaLengthByteArray => Encoding::DELTA_LENGTH_BYTE_ARRAY,
parquet::Encoding::DeltaByteArray => Encoding::DELTA_BYTE_ARRAY,
parquet::Encoding::RleDictionary => Encoding::RLE_DICTIONARY,
parquet::Encoding::ByteStreamSplit => Encoding::BYTE_STREAM_SPLIT,
}
}
}
@@ -716,6 +726,7 @@ impl convert::From<Encoding> for parquet::Encoding {
Encoding::DELTA_LENGTH_BYTE_ARRAY => parquet::Encoding::DeltaLengthByteArray,
Encoding::DELTA_BYTE_ARRAY => parquet::Encoding::DeltaByteArray,
Encoding::RLE_DICTIONARY => parquet::Encoding::RleDictionary,
Encoding::BYTE_STREAM_SPLIT => parquet::Encoding::ByteStreamSplit,
}
}
}
15 changes: 9 additions & 6 deletions parquet/src/column/page.rs
@@ -139,11 +139,14 @@ impl CompressedPage {
self.uncompressed_size
}

/// Returns compressed size in bytes.
/// Returns compressed size (but not encrypted size) in bytes.
///
/// Note that it is assumed that buffer is compressed, but it may not be. In this
/// case compressed size will be equal to uncompressed size.
pub fn compressed_size(&self) -> usize {
/// Note that it is assumed that buffer is compressed, but it may not be. In this case
/// compressed size will be equal to uncompressed size.
///
/// Other so-called "(total_)?compressed_size" fields include encryption overhead, when
/// applicable, which this does not.
pub fn compressed_unencrypted_size(&self) -> usize {
self.compressed_page.buffer().len()
}
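
In other words (a hedged reading of the rename, with a hypothetical overhead parameter rather than this crate's real constant): the size recorded in column-chunk metadata exceeds this method's value by the encryption overhead whenever the page is encrypted.

```rust
// Sketch only: `encryption_overhead` is a hypothetical per-page figure
// (nonce + authentication tag for an AEAD cipher), not a value from this crate.
fn metadata_compressed_size(compressed_unencrypted: usize, encryption_overhead: Option<usize>) -> usize {
    compressed_unencrypted + encryption_overhead.unwrap_or(0)
}
```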

@@ -206,7 +209,7 @@ pub trait PageWriter {
///
/// This method is called for every compressed page we write into underlying buffer,
/// either data page or dictionary page.
fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec>;
fn write_page(&mut self, page: CompressedPage, aad_page_ordinal: Option<u16>) -> Result<PageWriteSpec>;

/// Writes column chunk metadata into the output stream/sink.
///
@@ -299,7 +302,7 @@ mod tests {

assert_eq!(cpage.page_type(), PageType::DATA_PAGE);
assert_eq!(cpage.uncompressed_size(), 5);
assert_eq!(cpage.compressed_size(), 3);
assert_eq!(cpage.compressed_unencrypted_size(), 3);
assert_eq!(cpage.num_values(), 10);
assert_eq!(cpage.encoding(), Encoding::PLAIN);
assert_eq!(cpage.data(), &[0, 1, 2]);
67 changes: 54 additions & 13 deletions parquet/src/column/writer.rs
@@ -168,6 +168,7 @@ pub struct ColumnWriterImpl<T: DataType> {
descr: ColumnDescPtr,
props: WriterPropertiesPtr,
page_writer: Box<dyn PageWriter>,
page_ordinal: usize,
has_dictionary: bool,
dict_encoder: Option<DictEncoder<T>>,
encoder: Box<dyn Encoder<T>>,
@@ -185,6 +186,8 @@ pub struct ColumnWriterImpl<T: DataType> {
total_bytes_written: u64,
total_rows_written: u64,
total_uncompressed_size: u64,
// Includes encryption overhead -- the thrift definition field includes encryption overhead, and
// we keep its name here.
total_compressed_size: u64,
total_num_values: u64,
dictionary_page_offset: Option<u64>,
@@ -231,10 +234,14 @@ impl<T: DataType> ColumnWriterImpl<T> {
)
.unwrap();

// We start counting pages from zero.
let page_ordinal: usize = 0;

Self {
descr,
props,
page_writer,
page_ordinal,
has_dictionary,
dict_encoder,
encoder: fallback_encoder,
@@ -824,7 +831,10 @@ impl<T: DataType> ColumnWriterImpl<T> {
/// Writes compressed data page into underlying sink and updates global metrics.
#[inline]
fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
let page_spec = self.page_writer.write_page(page)?;
let page_ordinal = self.page_ordinal;
let aad_page_ordinal: Option<u16> = Some(page_ordinal as u16);
self.page_ordinal += 1;
let page_spec = self.page_writer.write_page(page, aad_page_ordinal)?;
self.update_metrics_for_page(page_spec);
Ok(())
}
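
The `aad_page_ordinal` threaded through `write_page` exists because Parquet modular encryption binds each encrypted module to a distinct AAD (additional authenticated data). Per the format's encryption spec, the module AAD suffix concatenates the module type, the row-group ordinal, the column ordinal, and — for data pages — the page ordinal as 2-byte little-endian values. The sketch below shows that construction under the assumption that this crate follows the spec's layout; the helper name and module-type value are illustrative, not this crate's API.

```rust
/// Hedged sketch of a per-page AAD suffix in the style of the Parquet modular
/// encryption spec: module type (1 byte) || row-group ordinal (2-byte LE) ||
/// column ordinal (2-byte LE) || page ordinal (2-byte LE, data pages only).
fn page_aad_suffix(
    module_type: u8,
    row_group_ordinal: i16,
    column_ordinal: u16,
    page_ordinal: Option<u16>,
) -> Vec<u8> {
    let mut aad = Vec::with_capacity(7);
    aad.push(module_type);
    aad.extend_from_slice(&row_group_ordinal.to_le_bytes());
    aad.extend_from_slice(&column_ordinal.to_le_bytes());
    if let Some(ordinal) = page_ordinal {
        // Dictionary pages are written with `None` above and carry no page ordinal.
        aad.extend_from_slice(&ordinal.to_le_bytes());
    }
    aad
}
```

This also matches why the dictionary-page path below passes `None`: a column chunk has at most one dictionary page, so no per-page counter is needed for it.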
@@ -858,7 +868,7 @@ impl<T: DataType> ColumnWriterImpl<T> {
CompressedPage::new(dict_page, uncompressed_size)
};

let page_spec = self.page_writer.write_page(compressed_page)?;
let page_spec = self.page_writer.write_page(compressed_page, None)?;
self.update_metrics_for_page(page_spec);
Ok(())
}
@@ -1026,10 +1036,10 @@ fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
mod tests {
use rand::distributions::uniform::SampleUniform;

use crate::column::{
use crate::{column::{
page::PageReader,
reader::{get_column_reader, get_typed_column_reader, ColumnReaderImpl},
};
}, file::encryption::USUAL_ENCRYPTION_OVERHEAD};
use crate::file::{
properties::WriterProperties, reader::SerializedPageReader,
writer::SerializedPageWriter,
@@ -1642,21 +1652,24 @@ mod tests {
);
}

const TEST_ROW_GROUP_ORDINAL: i16 = 1234;
const TEST_COLUMN_ORDINAL: u16 = 135;

#[test]
fn test_column_writer_add_data_pages_with_dict() {
// ARROW-5129: Test verifies that we add data page in case of dictionary encoding
// and no fallback occurred so far.
let file = get_temp_file("test_column_writer_add_data_pages_with_dict", &[]);
let sink = FileSink::new(&file);
let page_writer = Box::new(SerializedPageWriter::new(sink));
let page_writer = Box::new(SerializedPageWriter::new(sink, None, TEST_ROW_GROUP_ORDINAL, TEST_COLUMN_ORDINAL));
let props = Arc::new(
WriterProperties::builder()
.set_data_pagesize_limit(15) // actually each page will have size 15-18 bytes
.set_write_batch_size(3) // write 3 values at a time
.build(),
);
let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props.clone());
writer.write_batch(data, None, None).unwrap();
let (bytes_written, _, _) = writer.close().unwrap();

@@ -1665,8 +1678,12 @@ mod tests {
let mut page_reader = Box::new(
SerializedPageReader::new(
source,
None,
TEST_ROW_GROUP_ORDINAL,
TEST_COLUMN_ORDINAL,
data.len() as i64,
Compression::UNCOMPRESSED,
props.dictionary_enabled(&ColumnPath::from("col")),
Int32Type::get_physical_type(),
)
.unwrap(),
@@ -1803,7 +1820,7 @@ mod tests {
) {
let file = get_temp_file(file_name, &[]);
let sink = FileSink::new(&file);
let page_writer = Box::new(SerializedPageWriter::new(sink));
let page_writer = Box::new(SerializedPageWriter::new(sink, None, TEST_ROW_GROUP_ORDINAL, TEST_COLUMN_ORDINAL));

let max_def_level = match def_levels {
Some(buf) => *buf.iter().max().unwrap_or(&0i16),
@@ -1823,11 +1840,12 @@ mod tests {
max_batch_size = cmp::max(max_batch_size, levels.len());
}

let props = Arc::new(props);
let mut writer = get_test_column_writer::<T>(
page_writer,
max_def_level,
max_rep_level,
Arc::new(props),
props.clone(),
);

let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
@@ -1838,8 +1856,12 @@ mod tests {
let page_reader = Box::new(
SerializedPageReader::new(
source,
None,
TEST_ROW_GROUP_ORDINAL,
TEST_COLUMN_ORDINAL,
column_metadata.num_values(),
column_metadata.compression(),
props.dictionary_enabled(&ColumnPath::from("col")),
T::get_physical_type(),
)
.unwrap(),
@@ -1977,20 +1999,39 @@ mod tests {

/// Returns page writer that collects pages without serializing them.
fn get_test_page_writer() -> Box<dyn PageWriter> {
Box::new(TestPageWriter {})
Box::new(TestPageWriter {simulate_encrypted: false, last_page_ordinal: None})
}

struct TestPageWriter {}
struct TestPageWriter {
/// Always false, currently -- enabling would just affect return values that get fed into
/// test assertions.
simulate_encrypted: bool,
last_page_ordinal: Option<u16>,
}

impl PageWriter for TestPageWriter {
fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
fn write_page(&mut self, page: CompressedPage, aad_page_ordinal: Option<u16>) -> Result<PageWriteSpec> {
// We're a bit loose in this assertion -- the caller could write or not write a dictionary page.
match aad_page_ordinal {
Some(n) if n != 0 => {
assert_eq!(self.last_page_ordinal, Some(n - 1));
}
_ => {
assert_eq!(None, self.last_page_ordinal);
}
}
self.last_page_ordinal = aad_page_ordinal;

// Note, the normal PageWriteSpec result would include PageMetaData overhead, and these
// values are thus not perfectly faked, but the only thing that looks at them are test
// assertions.
let mut res = PageWriteSpec::new();
res.page_type = page.page_type();
res.uncompressed_size = page.uncompressed_size();
res.compressed_size = page.compressed_size();
res.compressed_size = self.simulate_encrypted as usize * USUAL_ENCRYPTION_OVERHEAD + page.compressed_unencrypted_size();
res.num_values = page.num_values();
res.offset = 0;
res.bytes_written = page.data().len() as u64;
res.bytes_written = (self.simulate_encrypted as usize * USUAL_ENCRYPTION_OVERHEAD + page.data().len()) as u64;
Ok(res)
}
