Skip to content

Commit

Permalink
fluent api
Browse files Browse the repository at this point in the history
  • Loading branch information
adriangb committed Jul 4, 2024
1 parent 85c39c6 commit afa975d
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 72 deletions.
116 changes: 46 additions & 70 deletions parquet/src/file/footer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,36 +109,44 @@ pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
Ok(ParquetMetaData::new(file_metadata, row_groups))
}

/// Encodes the [`ParquetMetaData`] to bytes.
/// The format of the returned bytes is the Thrift compact binary protocol, as
/// specified by the [Parquet Spec].
///
/// # Errors
///
/// Returns [`ParquetError::General`] if the Thrift serialization fails.
///
/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
pub fn encode_metadata(metadata: &ParquetMetaData) -> Result<Vec<u8>> {
    let column_orders = encode_column_orders(metadata.file_metadata().column_orders());
    let schema = types::to_thrift(&metadata.file_metadata().schema().clone())?;

    // Build the Thrift representation of the file-level metadata.
    // Encryption fields are not populated: encrypted footers are not
    // supported by this encoder.
    let t_file_metadata = TFileMetaData {
        version: metadata.file_metadata().version(),
        num_rows: metadata.file_metadata().num_rows(),
        created_by: metadata.file_metadata().created_by().map(str::to_string),
        key_value_metadata: metadata.file_metadata().key_value_metadata().cloned(),
        schema,
        row_groups: metadata
            .row_groups()
            .iter()
            .map(|rg| rg.to_thrift())
            .collect(),
        column_orders,
        encryption_algorithm: None,
        footer_signing_key_metadata: None,
    };
    // Serialize into an in-memory buffer using the Thrift compact protocol.
    let mut buf = Vec::new();
    let mut protocol = TCompactOutputProtocol::new(&mut buf);
    t_file_metadata
        .write_to_out_protocol(&mut protocol)
        .map_err(|e| ParquetError::General(format!("Could not serialize metadata: {e}")))?;
    Ok(buf)
/// Encoder that serializes [`ParquetMetaData`] to an arbitrary writer using
/// the Thrift compact binary protocol, as specified by the [Parquet Spec].
///
/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
pub struct ParquetMetadataEncoder {}

impl Default for ParquetMetadataEncoder {
    fn default() -> Self {
        Self::new()
    }
}

impl ParquetMetadataEncoder {
    /// Creates a new ParquetMetadataEncoder.
    pub fn new() -> Self {
        Self {}
    }

    /// Encodes the [`ParquetMetaData`] to `out`.
    /// The format of the written bytes is the Thrift compact binary protocol,
    /// as specified by the [Parquet Spec].
    ///
    /// # Errors
    ///
    /// Returns [`ParquetError::General`] if the Thrift serialization fails
    /// (e.g. the underlying writer returns an error).
    ///
    /// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
    pub fn encode_metadata<W: std::io::Write>(&self, metadata: &ParquetMetaData, out: W) -> Result<()> {
        let column_orders = encode_column_orders(metadata.file_metadata().column_orders());
        // `schema()` already yields a reference; no clone is needed to
        // convert to the Thrift schema elements.
        let schema = types::to_thrift(metadata.file_metadata().schema())?;

        // Build the Thrift representation of the file-level metadata.
        // Encryption fields are not populated: encrypted footers are not
        // supported by this encoder.
        let t_file_metadata = TFileMetaData {
            version: metadata.file_metadata().version(),
            num_rows: metadata.file_metadata().num_rows(),
            created_by: metadata.file_metadata().created_by().map(str::to_string),
            key_value_metadata: metadata.file_metadata().key_value_metadata().cloned(),
            schema,
            row_groups: metadata
                .row_groups()
                .iter()
                .map(|rg| rg.to_thrift())
                .collect(),
            column_orders,
            encryption_algorithm: None,
            footer_signing_key_metadata: None,
        };
        // Stream the serialized bytes directly into `out` — no intermediate
        // buffer is allocated.
        let mut protocol = TCompactOutputProtocol::new(out);
        t_file_metadata
            .write_to_out_protocol(&mut protocol)
            .map_err(|e| ParquetError::General(format!("Could not serialize metadata: {e}")))?;
        Ok(())
    }
}

/// Decodes the Parquet footer returning the metadata length in bytes
Expand Down Expand Up @@ -333,48 +341,16 @@ mod tests {
}
let data = Bytes::from(buf);
let options = ArrowReaderOptions::new()
// Load the page index when reading metadata to cache
// so it is available to interpret row selections
.with_page_index(true);
// TODO: implement encoding of page index
.with_page_index(false);
let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(data, options).unwrap();
let metadata = reader.metadata().clone();

let encoded = encode_metadata(&metadata).unwrap();
let decoded = decode_metadata(&encoded).unwrap();
assert_eq!(
metadata.file_metadata().version(),
decoded.file_metadata().version()
);
assert_eq!(
metadata.file_metadata().num_rows(),
decoded.file_metadata().num_rows()
);
assert_eq!(
metadata.file_metadata().created_by(),
decoded.file_metadata().created_by()
);
assert_eq!(
metadata.file_metadata().key_value_metadata(),
decoded.file_metadata().key_value_metadata()
);
assert_eq!(
metadata.file_metadata().schema(),
decoded.file_metadata().schema()
);
assert_eq!(
metadata.file_metadata().column_orders(),
decoded.file_metadata().column_orders()
);
assert_eq!(metadata.row_groups().len(), decoded.row_groups().len());
for (a, b) in metadata
.row_groups()
.iter()
.zip(decoded.row_groups().iter())
{
assert_eq!(a, b);
}
// TODO: add encoding and decoding of column and offset indexes (aka page indexes)
// assert_eq!(metadata.column_index(), decoded.column_index());
// assert_eq!(metadata.offset_index(), decoded.offset_index());
let encoder = ParquetMetadataEncoder::new();
let mut encoded = vec![];
encoder.encode_metadata(&metadata, &mut encoded).unwrap();
let decoded = Arc::new(decode_metadata(&encoded).unwrap());

assert_eq!(metadata, decoded);
}
}
4 changes: 2 additions & 2 deletions parquet/src/file/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ pub type ParquetOffsetIndex = Vec<Vec<Vec<PageLocation>>>;
///
/// [`parquet.thrift`]: https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift
/// [`parse_metadata`]: crate::file::footer::parse_metadata
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct ParquetMetaData {
/// File level metadata
file_metadata: FileMetaData,
Expand Down Expand Up @@ -222,7 +222,7 @@ pub type FileMetaDataPtr = Arc<FileMetaData>;
/// File level metadata for a Parquet file.
///
/// Includes the version of the file, metadata, number of rows, schema, and column orders
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct FileMetaData {
version: i32,
num_rows: i64,
Expand Down

0 comments on commit afa975d

Please sign in to comment.