Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow reading Parquet maps that lack a values field #6730

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4065,4 +4065,72 @@ mod tests {
}
}
}

#[test]
fn test_map_no_value() {
let schema = "
message spark_schema {
REQUIRED group my_map (MAP) {
REPEATED group key_value {
REQUIRED INT32 key;
}
}
REQUIRED group my_list (LIST) {
REPEATED group list {
REQUIRED INT32 element;
}
}
}
";
let schema = Arc::new(parse_message_type(schema).unwrap());

// Write Parquet file to buffer
let mut buffer: Vec<u8> = Vec::new();
let mut file_writer =
SerializedFileWriter::new(&mut buffer, schema, Default::default()).unwrap();
let mut row_group_writer = file_writer.next_row_group().unwrap();

// Write column my_map.key_value.key
let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
column_writer
.typed::<Int32Type>()
.write_batch(
&[1, 2, 3, 4, 5, 6, 7, 8, 9],
Some(&[1, 1, 1, 1, 1, 1, 1, 1, 1]),
Some(&[0, 1, 1, 0, 1, 1, 0, 1, 1]),
)
.unwrap();
column_writer.close().unwrap();

// Write column my_list.list.element
let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
column_writer
.typed::<Int32Type>()
.write_batch(
&[1, 2, 3, 4, 5, 6, 7, 8, 9],
Some(&[1, 1, 1, 1, 1, 1, 1, 1, 1]),
Some(&[0, 1, 1, 0, 1, 1, 0, 1, 1]),
)
.unwrap();
column_writer.close().unwrap();

// Finalize Parquet file
row_group_writer.close().unwrap();
file_writer.close().unwrap();
assert_eq!(&buffer[0..4], b"PAR1");

// Read Parquet file from buffer
let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
.unwrap()
.build()
.unwrap();
let out = reader.next().unwrap().unwrap();
assert_eq!(out.num_rows(), 3);
assert_eq!(out.num_columns(), 2);
// map and list columns should now be equivalent
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this means the parquet reader will read MAP without values as an ListArray -- I think this seems like the intention so 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, because an arrow map must have non-null values IIUC.

let c0 = out.column(0).as_list::<i32>();
let c1 = out.column(1).as_list::<i32>();
assert_eq!(c0.len(), c1.len());
c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
}
}
7 changes: 6 additions & 1 deletion parquet/src/arrow/schema/complex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,13 @@ impl Visitor {
return Err(arrow_err!("Child of map field must be repeated"));
}

// According to the specification the values are optional (#1642).
// In this case, return the keys as a list.
if map_key_value.get_fields().len() == 1 {
return self.visit_list(map_type, context);
}

if map_key_value.get_fields().len() != 2 {
// According to the specification the values are optional (#1642)
return Err(arrow_err!(
"Child of map field must have two children, found {}",
map_key_value.get_fields().len()
Expand Down
132 changes: 107 additions & 25 deletions parquet/src/record/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,15 @@ impl TreeBuilder {
Repetition::REPEATED,
"Invalid map type: {field:?}"
);
assert_eq!(
key_value_type.get_fields().len(),
2,
"Invalid map type: {field:?}"
);
// Parquet spec allows no value. In that case treat as a list. #1642
if key_value_type.get_fields().len() != 1 {
// If not a list, then there can only be 2 fields in the struct
assert_eq!(
key_value_type.get_fields().len(),
2,
"Invalid map type: {field:?}"
);
}

path.push(String::from(key_value_type.name()));

Expand All @@ -239,25 +243,35 @@ impl TreeBuilder {
row_group_reader,
)?;

let value_type = &key_value_type.get_fields()[1];
let value_reader = self.reader_tree(
value_type.clone(),
path,
curr_def_level + 1,
curr_rep_level + 1,
paths,
row_group_reader,
)?;
if key_value_type.get_fields().len() == 1 {
path.pop();
Reader::RepeatedReader(
field,
curr_def_level,
curr_rep_level,
Box::new(key_reader),
)
} else {
let value_type = &key_value_type.get_fields()[1];
let value_reader = self.reader_tree(
value_type.clone(),
path,
curr_def_level + 1,
curr_rep_level + 1,
paths,
row_group_reader,
)?;

path.pop();
path.pop();

Reader::KeyValueReader(
field,
curr_def_level,
curr_rep_level,
Box::new(key_reader),
Box::new(value_reader),
)
Reader::KeyValueReader(
field,
curr_def_level,
curr_rep_level,
Box::new(key_reader),
Box::new(value_reader),
)
}
}
// A repeated field that is neither contained by a `LIST`- or
// `MAP`-annotated group nor annotated by `LIST` or `MAP`
Expand Down Expand Up @@ -813,7 +827,7 @@ impl Iterator for ReaderIter {
mod tests {
use super::*;

use crate::data_type::Int64Type;
use crate::data_type::{Int32Type, Int64Type};
use crate::file::reader::SerializedFileReader;
use crate::file::writer::SerializedFileWriter;
use crate::record::api::RowAccessor;
Expand Down Expand Up @@ -1459,8 +1473,7 @@ mod tests {
}

#[test]
#[should_panic(expected = "Invalid map type")]
fn test_file_reader_rows_invalid_map_type() {
fn test_file_reader_rows_nested_map_type() {
let schema = "
message spark_schema {
OPTIONAL group a (MAP) {
Expand Down Expand Up @@ -1823,6 +1836,75 @@ mod tests {
assert_eq!(rows, expected_rows);
}

#[test]
fn test_map_no_value() {
let schema = "
message spark_schema {
REQUIRED group my_map (MAP) {
REPEATED group key_value {
REQUIRED INT32 key;
}
}
REQUIRED group my_list (LIST) {
REPEATED group list {
REQUIRED INT32 element;
}
}
}
";
let schema = Arc::new(parse_message_type(schema).unwrap());

// Write Parquet file to buffer
//let mut buffer = std::fs::File::create("/Users/seidl/map_no_value.pq").unwrap();
let mut buffer: Vec<u8> = Vec::new();
let mut file_writer =
SerializedFileWriter::new(&mut buffer, schema, Default::default()).unwrap();
let mut row_group_writer = file_writer.next_row_group().unwrap();

// Write column my_map.key_value.key
let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
column_writer
.typed::<Int32Type>()
.write_batch(
&[1, 2, 3, 4, 5, 6, 7, 8, 9],
Some(&[1, 1, 1, 1, 1, 1, 1, 1, 1]),
Some(&[0, 1, 1, 0, 1, 1, 0, 1, 1]),
)
.unwrap();
column_writer.close().unwrap();

// Write column my_list.list.element
let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
column_writer
.typed::<Int32Type>()
.write_batch(
&[1, 2, 3, 4, 5, 6, 7, 8, 9],
Some(&[1, 1, 1, 1, 1, 1, 1, 1, 1]),
Some(&[0, 1, 1, 0, 1, 1, 0, 1, 1]),
)
.unwrap();
column_writer.close().unwrap();

// Finalize Parquet file
row_group_writer.close().unwrap();
file_writer.close().unwrap();
assert_eq!(&buffer[0..4], b"PAR1");

// Read Parquet file from buffer
let file_reader = SerializedFileReader::new(Bytes::from(buffer)).unwrap();
let rows: Vec<_> = file_reader
.get_row_iter(None)
.unwrap()
.map(|row| row.unwrap())
.collect();

// the two columns should be equivalent lists by this point
for row in rows {
let cols = row.into_columns();
assert_eq!(cols[0].1, cols[1].1);
}
}

fn test_file_reader_rows(file_name: &str, schema: Option<Type>) -> Result<Vec<Row>> {
let file = get_test_file(file_name);
let file_reader: Box<dyn FileReader> = Box::new(SerializedFileReader::new(file)?);
Expand Down
Loading