diff --git a/kernel/src/table_changes/scan_file.rs b/kernel/src/table_changes/scan_file.rs index e5cdb1b8..fc60cd9a 100644 --- a/kernel/src/table_changes/scan_file.rs +++ b/kernel/src/table_changes/scan_file.rs @@ -15,7 +15,7 @@ use crate::utils::require; use crate::{DeltaResult, EngineData, Error, RowVisitor}; // The type of action associated with a [`ScanFile`] -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub(crate) enum ScanFileType { Add, Remove, @@ -23,14 +23,12 @@ pub(crate) enum ScanFileType { } /// Represents all the metadata needed to read a Change Data Feed. It has the following fields: -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub(crate) struct ScanFile { /// The type of action this file belongs to. This may be one of add, remove, or cdc. pub tpe: ScanFileType, /// a `&str` which is the path to the file pub path: String, - /// an `i64` which is the size of the file - pub size: i64, /// a [`DvInfo`] struct, which allows getting the selection vector for this file pub dv_info: DvInfo, /// a `HashMap` which are partition values @@ -94,7 +92,7 @@ struct ScanFileVisitor<'a, T> { impl RowVisitor for ScanFileVisitor<'_, T> { fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> { - let expected_getters = 21; + let expected_getters = 18; require!( getters.len() == expected_getters, Error::InternalError(format!( @@ -107,53 +105,47 @@ impl RowVisitor for ScanFileVisitor<'_, T> { // skip skipped rows continue; } - let timestamp = getters[19].get(row_index, "scanFile.timestamp")?; - let commit_version: i64 = getters[20].get(row_index, "scanFile.commit_version")?; + let timestamp = getters[16].get(row_index, "scanFile.timestamp")?; + let commit_version: i64 = getters[17].get(row_index, "scanFile.commit_version")?; let commit_version: u64 = commit_version.try_into().map_err(Error::generic)?; // Since path column is required, use it to detect presence of an Add action if let Some(path) = getters[0].get_opt(row_index, "scanFile.add.path")? { - let size = getters[1].get(row_index, "scanFile.add.size")?; - let deletion_vector = visit_deletion_vector_at(row_index, &getters[2..=6])?; + let deletion_vector = visit_deletion_vector_at(row_index, &getters[1..=5])?; let dv_info = DvInfo { deletion_vector }; let partition_values = - getters[7].get(row_index, "scanFile.add.fileConstantValues.partitionValues")?; + getters[6].get(row_index, "scanFile.add.fileConstantValues.partitionValues")?; let scan_file = ScanFile { tpe: ScanFileType::Add, path, - size, dv_info, partition_values, commit_version, timestamp, }; (self.callback)(&mut self.context, scan_file) - } else if let Some(path) = getters[8].get_opt(row_index, "scanFile.remove.path")? { - let size = getters[9].get(row_index, "scanFile.remove.size")?; - let deletion_vector = visit_deletion_vector_at(row_index, &getters[10..=14])?; + } else if let Some(path) = getters[7].get_opt(row_index, "scanFile.remove.path")? { + let deletion_vector = visit_deletion_vector_at(row_index, &getters[8..=12])?; let dv_info = DvInfo { deletion_vector }; - let partition_values = getters[15].get( + let partition_values = getters[13].get( row_index, "scanFile.remove.fileConstantValues.partitionValues", )?; let scan_file = ScanFile { tpe: ScanFileType::Remove, path, - size, dv_info, partition_values, commit_version, timestamp, }; (self.callback)(&mut self.context, scan_file) - } else if let Some(path) = getters[16].get_opt(row_index, "scanFile.cdc.path")? { - let size = getters[17].get(row_index, "scanFile.cdc.size")?; - let partition_values = getters[18] + } else if let Some(path) = getters[14].get_opt(row_index, "scanFile.cdc.path")? { + let partition_values = getters[15] .get(row_index, "scanFile.cdc.fileConstantValues.partitionValues")?; let scan_file = ScanFile { tpe: ScanFileType::Cdc, path, - size, dv_info: DvInfo { deletion_vector: None, }, @@ -187,7 +179,6 @@ impl RowVisitor for ScanFileVisitor<'_, T> { /// { /// add: { /// path: string, -/// size: long, /// deletionVector: { /// storageType: string, /// pathOrInlineDv: string, @@ -201,7 +192,6 @@ impl RowVisitor for ScanFileVisitor<'_, T> { /// } /// remove: { /// path: string, -/// size: long, /// deletionVector: { /// storageType: string, /// pathOrInlineDv: string, @@ -215,7 +205,6 @@ impl RowVisitor for ScanFileVisitor<'_, T> { /// } /// cdc: { /// path: string, -/// size: long, /// fileConstantValues: { /// partitionValues: map /// } @@ -280,7 +269,6 @@ pub(crate) fn transform_to_scan_row_expression(timestamp: i64, commit_number: i6 ]), Expression::struct_from([ column_expr!("cdc.path"), - column_expr!("cdc.size"), Expression::struct_from([column_expr!("cdc.partitionValues")]), ]), timestamp.into(), @@ -289,4 +277,144 @@ pub(crate) fn transform_to_scan_row_expression(timestamp: i64, commit_number: i6 } #[cfg(test)] -mod tests {} +mod tests { + use std::collections::HashMap; + + use itertools::Itertools; + + use super::ScanFileType; + use super::{ + scan_row_schema, transform_to_scan_row_expression, visit_scan_files, ScanCallback, ScanFile, + }; + use crate::actions::deletion_vector::DeletionVectorDescriptor; + use crate::actions::{get_log_schema, Add, Cdc, Remove}; + use crate::engine::sync::SyncEngine; + use crate::log_segment::LogSegment; + use crate::scan::state::DvInfo; + use crate::utils::test_utils::MockTable; + use crate::{DeltaResult, Engine}; + + #[tokio::test] + async fn schema_transform_correct() { + let engine = SyncEngine::new(); + let mut mock_table = MockTable::new(); + + let add_dv = DeletionVectorDescriptor { + storage_type: "u".to_string(), + path_or_inline_dv: "vBn[lx{q8@P<9BNH/isA".to_string(), + offset: Some(1), + size_in_bytes: 36, + cardinality: 2, + }; + let add_partition_values = HashMap::from([("a".to_string(), "b".to_string())]); + let add = Add { + path: "fake_path_1".into(), + deletion_vector: Some(add_dv.clone()), + partition_values: add_partition_values, + ..Default::default() + }; + + let rm_dv = DeletionVectorDescriptor { + storage_type: "u".to_string(), + path_or_inline_dv: "U5OWRz5k%CFT.Td}yCPW".to_string(), + offset: Some(1), + size_in_bytes: 38, + cardinality: 3, + }; + let rm_partition_values = Some(HashMap::from([("c".to_string(), "d".to_string())])); + + let remove = Remove { + path: "fake_path_2".into(), + deletion_vector: Some(rm_dv), + partition_values: rm_partition_values, + ..Default::default() + }; + + let cdc_partition_values = HashMap::from([("x".to_string(), "y".to_string())]); + let cdc = Cdc { + path: "fake_path_3".into(), + partition_values: cdc_partition_values, + ..Default::default() + }; + + mock_table + .commit(&[ + add.clone().into(), + remove.clone().into(), + cdc.clone().into(), + ]) + .await; + + let table_root = url::Url::from_directory_path(mock_table.table_root()).unwrap(); + let log_root = table_root.join("_delta_log/").unwrap(); + let log_segment = + LogSegment::for_table_changes(engine.get_file_system_client().as_ref(), log_root, 0, 0) + .unwrap(); + let commit = log_segment.ascending_commit_files[0].clone(); + + let actions = engine + .get_json_handler() + .read_json_files(&[commit.location.clone()], get_log_schema().clone(), None) + .unwrap(); + + let timestamp = 1234_i64; + let commit_version = 42_u64; + + let scan_file: Vec<_> = actions + .map_ok(|actions| { + engine + .get_expression_handler() + .get_evaluator( + get_log_schema().clone(), + transform_to_scan_row_expression(1234, 42), + scan_row_schema().into(), + ) + .evaluate(actions.as_ref()) + .unwrap() + }) + .map(|data| -> DeltaResult<_> { + let data = data?; + let selection_vector = vec![true; data.len()]; + let callback: ScanCallback> = + |context, scan_file| context.push(scan_file); + visit_scan_files(data.as_ref(), &selection_vector, vec![], callback) + }) + .flatten_ok() + .try_collect() + .unwrap(); + + let expected_scan_files = vec![ + ScanFile { + tpe: ScanFileType::Add, + path: add.path, + dv_info: DvInfo { + deletion_vector: add.deletion_vector, + }, + partition_values: add.partition_values, + commit_version, + timestamp, + }, + ScanFile { + tpe: ScanFileType::Remove, + path: remove.path, + dv_info: DvInfo { + deletion_vector: remove.deletion_vector, + }, + partition_values: remove.partition_values.unwrap(), + commit_version, + timestamp, + }, + ScanFile { + tpe: ScanFileType::Cdc, + path: cdc.path, + dv_info: DvInfo { + deletion_vector: None, + }, + partition_values: cdc.partition_values, + commit_version, + timestamp, + }, + ]; + assert_eq!(expected_scan_files, scan_files); + } +}