From 9f254c41544352e96203cd9621755646073f33d7 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Thu, 5 Dec 2024 15:02:31 -0500 Subject: [PATCH] DF repartition by file sizes instead of number of files (#1572) --- vortex-datafusion/src/persistent/execution.rs | 53 +++++++++++++------ 1 file changed, 38 insertions(+), 15 deletions(-) diff --git a/vortex-datafusion/src/persistent/execution.rs b/vortex-datafusion/src/persistent/execution.rs index 4530b6f26d..3b4ce3edf9 100644 --- a/vortex-datafusion/src/persistent/execution.rs +++ b/vortex-datafusion/src/persistent/execution.rs @@ -136,7 +136,7 @@ impl ExecutionPlan for VortexExec { ) -> DFResult>> { let file_groups = self.file_scan_config.file_groups.clone(); - let repartitioned_file_groups = repartition_by_count(file_groups, target_partitions); + let repartitioned_file_groups = repartition_by_size(file_groups, target_partitions); let mut new_plan = self.clone(); @@ -150,20 +150,43 @@ impl ExecutionPlan for VortexExec { } } -fn repartition_by_count( +fn repartition_by_size( file_groups: Vec>, desired_partitions: usize, ) -> Vec> { let all_files = file_groups.into_iter().concat(); + let total_file_count = all_files.len(); + let total_size = all_files.iter().map(|f| f.object_meta.size).sum::(); + let target_partition_size = total_size / (desired_partitions + 1); - let approx_files_per_partition = all_files.len().div_ceil(desired_partitions); - let mut repartitioned_file_groups = Vec::default(); + let mut partitions = Vec::with_capacity(desired_partitions); - for chunk in &all_files.into_iter().chunks(approx_files_per_partition) { - repartitioned_file_groups.push(chunk.collect::>()); + let mut curr_partition_size = 0; + let mut curr_partition = Vec::default(); + + for file in all_files.into_iter() { + curr_partition_size += file.object_meta.size; + curr_partition.push(file); + + if curr_partition_size > target_partition_size { + curr_partition_size = 0; + partitions.push(std::mem::take(&mut curr_partition)); + } } - repartitioned_file_groups + // if there's anything left, we shove it into existing partitions + for (idx, file) in curr_partition.into_iter().enumerate() { + let part_idx = idx % partitions.len(); + partitions[part_idx].push(file); + } + + assert_eq!( + partitions.len(), + usize::min(total_file_count, desired_partitions), + "The final number of partitions should be smallest between the total number of files and the desired partition count." + ); + + partitions } #[cfg(test)] @@ -172,16 +195,16 @@ mod tests { #[test] fn basic_repartition_test() { - let input_file_groups = vec![vec![ - PartitionedFile::new("a", 0), - PartitionedFile::new("b", 0), - PartitionedFile::new("c", 0), - PartitionedFile::new("d", 0), - PartitionedFile::new("e", 0), + let file_groups = vec![vec![ + PartitionedFile::new("a", 100), + PartitionedFile::new("b", 25), + PartitionedFile::new("c", 25), + PartitionedFile::new("d", 25), + PartitionedFile::new("e", 50), ]]; - let file_groups = repartition_by_count(input_file_groups, 2); + let output = repartition_by_size(file_groups, 2); - assert_eq!(file_groups.len(), 2); + assert_eq!(output.len(), 2); } }