diff --git a/kernel/src/table_changes/data_read.rs b/kernel/src/table_changes/data_read.rs index 83b853d26..83af67961 100644 --- a/kernel/src/table_changes/data_read.rs +++ b/kernel/src/table_changes/data_read.rs @@ -1,8 +1,9 @@ use std::collections::HashMap; -use std::iter::once; +use std::iter::{empty, once}; use std::sync::Arc; use itertools::{Either, Itertools}; +use roaring::RoaringTreemap; use url::Url; use crate::actions::deletion_vector::{split_vector, treemap_to_bools}; @@ -14,16 +15,15 @@ use crate::{DeltaResult, Engine, Error, Expression, FileMeta}; use super::scan::GlobalScanState; use super::scan_file::{ScanFile, ScanFileType}; -use super::CDF_FIELDS; type ResolvedScanFile = (ScanFile, Option>); -pub(crate) struct DataReader { +pub(crate) struct ScanFileReader { global_scan_state: GlobalScanState, scan_file: ScanFile, selection_vector: Option>, } -impl DataReader { +impl ScanFileReader { pub(crate) fn new( global_scan_state: GlobalScanState, scan_file: ScanFile, @@ -127,7 +127,7 @@ impl DataReader { self.global_scan_state.read_schema.clone() } } - pub fn into_data( + pub(crate) fn into_data( mut self, engine: &dyn Engine, ) -> DeltaResult>> { @@ -165,8 +165,7 @@ impl DataReader { // trying to return a captured variable. We're going to reassign `selection_vector` // to `rest` in a moment anyway let mut sv = selection_vector.take(); - let rest = split_vector(sv.as_mut(), len, None); // TODO: Fill in len and extend - // value + let rest = split_vector(sv.as_mut(), len, None); let result = ScanResult { raw_data: logical, raw_mask: sv, @@ -185,6 +184,23 @@ pub(crate) fn resolve_scan_file_dv( let remove_dv = remove_dv.as_ref().and_then(|map| map.get(&scan_file.path)); match (&scan_file.tpe, remove_dv) { (ScanFileType::Add, Some(rm_dv)) => { + // Helper function to convert a treemap to (scan_file, selection_vector) pair. + // This returns an empty iterator if nothing is selected. + fn treemap_to_iter( + treemap: RoaringTreemap, + mut scan_file: ScanFile, + tpe: ScanFileType, + ) -> impl Iterator>)> { + if treemap.is_empty() { + Either::Left(empty()) + } else { + let added_dv = treemap_to_bools(treemap); + scan_file.tpe = tpe; + Either::Right(once((scan_file, Some(added_dv)))) + } + } + + // Retrieve the deletion vector from the add action and remove action let add_dv = scan_file .dv_info .get_treemap(engine, table_root)? @@ -193,15 +209,8 @@ pub(crate) fn resolve_scan_file_dv( .get_treemap(engine, table_root)? .unwrap_or(Default::default()); - let added_dv = treemap_to_bools(&rm_dv - &add_dv); - let added = once((scan_file.clone(), Some(added_dv))); - - let removed_dv = treemap_to_bools(add_dv - rm_dv); - let rm_scanfile = ScanFile { - tpe: ScanFileType::Remove, - ..scan_file - }; - let removed = once((rm_scanfile, Some(removed_dv))); + let added = treemap_to_iter(&rm_dv - &add_dv, scan_file.clone(), ScanFileType::Add); + let removed = treemap_to_iter(add_dv - rm_dv, scan_file, ScanFileType::Remove); Ok(Either::Right(added.chain(removed))) } @@ -215,102 +224,3 @@ pub(crate) fn resolve_scan_file_dv( (ScanFileType::Cdc, None) => Ok(Either::Left(once((scan_file, None)))), } } - -// -//// We have this function because `execute` can save `all_fields` and `have_partition_cols` in the -//// scan, and then reuse them for each batch transform -//#[allow(clippy::too_many_arguments)] // TEMPORARY -//pub(crate) fn transform_to_logical_internal( -// engine: &dyn Engine, -// data: Box, -// global_state: &GlobalScanState, -// partition_values: &std::collections::HashMap, -// all_fields: &[ColumnType], -// _have_partition_cols: bool, -// generated_columns: &HashMap, -// read_schema: Arc, -//) -> DeltaResult> { -// // need to add back partition cols and/or fix-up mapped columns -// let all_fields: Vec = all_fields -// .iter() -// .map(|field| match field { -// ColumnType::Partition(field_idx) => { -// let field = global_state.logical_schema.fields.get_index(*field_idx); -// let Some((_, field)) = field else { -// return Err(Error::generic( -// "logical schema did not contain expected field, can't transform data", -// )); -// }; -// let name = field.physical_name(global_state.column_mapping_mode)?; -// let value_expression = -// parse_partition_value(partition_values.get(name), field.data_type())?; -// Ok(value_expression.into()) -// } -// ColumnType::Selected(field_name) => Ok(ColumnName::new([field_name]).into()), -// ColumnType::InsertedColumn(field_idx) => { -// let field = global_state.logical_schema.fields.get_index(*field_idx); -// let Some((_, field)) = field else { -// return Err(Error::generic( -// "logical schema did not contain expected field, can't transform data", -// )); -// }; -// let Some(expr) = generated_columns.get(field.name()) else { -// return Err(Error::generic( -// "Got unexpected inserted field , can't transform data", -// )); -// }; -// Ok(expr.clone()) -// } -// }) -// .try_collect()?; -// let read_expression = Expression::Struct(all_fields); -// -// let result = engine -// .get_expression_handler() -// .get_evaluator( -// read_schema, -// read_expression, -// global_state.logical_schema.clone().into(), -// ) -// .evaluate(data.as_ref())?; -// Ok(result) -//} -// -//pub(crate) fn get_generated_columns( -// timestamp: i64, -// tpe: ScanFileType, -// commit_version: i64, -//) -> Result>, crate::Error> { -// // Both in-commit timestamps and file metadata are in milliseconds -// // -// // See: -// // [`FileMeta`] -// // [In-Commit Timestamps] : https://github.com/delta-io/delta/blob/master/PROTOCOL.md#writer-requirements-for-in-commit-timestampsa -// let timestamp = Scalar::timestamp_from_millis(timestamp)?; -// let expressions = match tpe { -// ScanFileType::Cdc => [ -// column_expr!("_change_type"), -// Expression::literal(commit_version), -// timestamp.into(), -// ], -// ScanFileType::Add => [ -// "insert".into(), -// Expression::literal(commit_version), -// timestamp.into(), -// ], -// -// ScanFileType::Remove => [ -// "delete".into(), -// Expression::literal(commit_version), -// timestamp.into(), -// ], -// }; -// let generated_columns: Arc> = Arc::new( -// CDF_GENERATED_COLUMNS -// .iter() -// .map(ToString::to_string) -// .zip(expressions) -// .collect(), -// ); -// Ok(generated_columns) -//} diff --git a/kernel/src/table_changes/scan.rs b/kernel/src/table_changes/scan.rs index 386b73f2c..60c3c6a73 100644 --- a/kernel/src/table_changes/scan.rs +++ b/kernel/src/table_changes/scan.rs @@ -1,21 +1,16 @@ -use std::collections::HashMap; -use std::iter::{self, empty, once}; use std::sync::Arc; use itertools::Itertools; -use serde::{Deserialize, Serialize}; use tracing::debug; -use crate::scan::state::DvInfo; use crate::scan::{ColumnType, ScanResult}; use crate::schema::{SchemaRef, StructType}; -use crate::table_changes::scan_file::ScanFileType; use crate::table_features::ColumnMappingMode; -use crate::{DeltaResult, Engine, Error, ExpressionRef}; +use crate::{DeltaResult, Engine, ExpressionRef}; -use super::data_read::{resolve_scan_file_dv, DataReader}; +use super::data_read::{resolve_scan_file_dv, ScanFileReader}; use super::log_replay::{table_changes_action_iter, TableChangesScanData}; -use super::scan_file::{scan_data_to_scan_file, ScanFile}; +use super::scan_file::scan_data_to_scan_file; use super::{TableChanges, CDF_FIELDS}; /// The result of building a [`TableChanges`] scan over a table. This can be used to get a change @@ -239,7 +234,7 @@ impl TableChangesScan { let (scan_file, selection_vector) = x?; let engine = engine.clone(); let reader = - DataReader::new(global_scan_state.clone(), scan_file, selection_vector); + ScanFileReader::new(global_scan_state.clone(), scan_file, selection_vector); reader.into_data(engine.as_ref()) }) .flatten_ok()