Skip to content

Commit

Permalink
add scan file visitor test
Browse files Browse the repository at this point in the history
  • Loading branch information
OussamaSaoudi-db committed Nov 29, 2024
1 parent 8c85361 commit e384be6
Showing 1 changed file with 153 additions and 25 deletions.
178 changes: 153 additions & 25 deletions kernel/src/table_changes/scan_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,20 @@ use crate::utils::require;
use crate::{DeltaResult, EngineData, Error, RowVisitor};

// The type of action associated with a [`ScanFile`]
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum ScanFileType {
Add,
Remove,
Cdc,
}

/// Represents all the metadata needed to read a Change Data Feed. It has the following fields:
#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub(crate) struct ScanFile {
/// The type of action this file belongs to. This may be one of add, remove, or cdc.
pub tpe: ScanFileType,
/// a `&str` which is the path to the file
pub path: String,
/// an `i64` which is the size of the file
pub size: i64,
/// a [`DvInfo`] struct, which allows getting the selection vector for this file
pub dv_info: DvInfo,
/// a `HashMap<String, String>` which are partition values
Expand Down Expand Up @@ -94,7 +92,7 @@ struct ScanFileVisitor<'a, T> {

impl<T> RowVisitor for ScanFileVisitor<'_, T> {
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
let expected_getters = 21;
let expected_getters = 18;
require!(
getters.len() == expected_getters,
Error::InternalError(format!(
Expand All @@ -107,53 +105,47 @@ impl<T> RowVisitor for ScanFileVisitor<'_, T> {
// skip skipped rows
continue;
}
let timestamp = getters[19].get(row_index, "scanFile.timestamp")?;
let commit_version: i64 = getters[20].get(row_index, "scanFile.commit_version")?;
let timestamp = getters[16].get(row_index, "scanFile.timestamp")?;
let commit_version: i64 = getters[17].get(row_index, "scanFile.commit_version")?;
let commit_version: u64 = commit_version.try_into().map_err(Error::generic)?;

// Since path column is required, use it to detect presence of an Add action
if let Some(path) = getters[0].get_opt(row_index, "scanFile.add.path")? {
let size = getters[1].get(row_index, "scanFile.add.size")?;
let deletion_vector = visit_deletion_vector_at(row_index, &getters[2..=6])?;
let deletion_vector = visit_deletion_vector_at(row_index, &getters[1..=5])?;
let dv_info = DvInfo { deletion_vector };
let partition_values =
getters[7].get(row_index, "scanFile.add.fileConstantValues.partitionValues")?;
getters[6].get(row_index, "scanFile.add.fileConstantValues.partitionValues")?;
let scan_file = ScanFile {
tpe: ScanFileType::Add,
path,
size,
dv_info,
partition_values,
commit_version,
timestamp,
};
(self.callback)(&mut self.context, scan_file)
} else if let Some(path) = getters[8].get_opt(row_index, "scanFile.remove.path")? {
let size = getters[9].get(row_index, "scanFile.remove.size")?;
let deletion_vector = visit_deletion_vector_at(row_index, &getters[10..=14])?;
} else if let Some(path) = getters[7].get_opt(row_index, "scanFile.remove.path")? {
let deletion_vector = visit_deletion_vector_at(row_index, &getters[8..=12])?;
let dv_info = DvInfo { deletion_vector };
let partition_values = getters[15].get(
let partition_values = getters[13].get(
row_index,
"scanFile.remove.fileConstantValues.partitionValues",
)?;
let scan_file = ScanFile {
tpe: ScanFileType::Remove,
path,
size,
dv_info,
partition_values,
commit_version,
timestamp,
};
(self.callback)(&mut self.context, scan_file)
} else if let Some(path) = getters[16].get_opt(row_index, "scanFile.cdc.path")? {
let size = getters[17].get(row_index, "scanFile.cdc.size")?;
let partition_values = getters[18]
} else if let Some(path) = getters[14].get_opt(row_index, "scanFile.cdc.path")? {
let partition_values = getters[15]
.get(row_index, "scanFile.cdc.fileConstantValues.partitionValues")?;
let scan_file = ScanFile {
tpe: ScanFileType::Cdc,
path,
size,
dv_info: DvInfo {
deletion_vector: None,
},
Expand Down Expand Up @@ -187,7 +179,6 @@ impl<T> RowVisitor for ScanFileVisitor<'_, T> {
/// {
/// add: {
/// path: string,
/// size: long,
/// deletionVector: {
/// storageType: string,
/// pathOrInlineDv: string,
Expand All @@ -201,7 +192,6 @@ impl<T> RowVisitor for ScanFileVisitor<'_, T> {
/// }
/// remove: {
/// path: string,
/// size: long,
/// deletionVector: {
/// storageType: string,
/// pathOrInlineDv: string,
Expand All @@ -215,7 +205,6 @@ impl<T> RowVisitor for ScanFileVisitor<'_, T> {
/// }
/// cdc: {
/// path: string,
/// size: long,
/// fileConstantValues: {
/// partitionValues: map<string, string>
/// }
Expand Down Expand Up @@ -280,7 +269,6 @@ pub(crate) fn transform_to_scan_row_expression(timestamp: i64, commit_number: i6
]),
Expression::struct_from([
column_expr!("cdc.path"),
column_expr!("cdc.size"),
Expression::struct_from([column_expr!("cdc.partitionValues")]),
]),
timestamp.into(),
Expand All @@ -289,4 +277,144 @@ pub(crate) fn transform_to_scan_row_expression(timestamp: i64, commit_number: i6
}

#[cfg(test)]
mod tests {}
mod tests {
use std::collections::HashMap;

use itertools::Itertools;

use super::ScanFileType;
use super::{
scan_row_schema, transform_to_scan_row_expression, visit_scan_files, ScanCallback, ScanFile,
};
use crate::actions::deletion_vector::DeletionVectorDescriptor;
use crate::actions::{get_log_schema, Add, Cdc, Remove};
use crate::engine::sync::SyncEngine;
use crate::log_segment::LogSegment;
use crate::scan::state::DvInfo;
use crate::utils::test_utils::MockTable;
use crate::{DeltaResult, Engine};

#[tokio::test]
async fn schema_transform_correct() {
let engine = SyncEngine::new();
let mut mock_table = MockTable::new();

let add_dv = DeletionVectorDescriptor {
storage_type: "u".to_string(),
path_or_inline_dv: "vBn[lx{q8@P<9BNH/isA".to_string(),
offset: Some(1),
size_in_bytes: 36,
cardinality: 2,
};
let add_partition_values = HashMap::from([("a".to_string(), "b".to_string())]);
let add = Add {
path: "fake_path_1".into(),
deletion_vector: Some(add_dv.clone()),
partition_values: add_partition_values,
..Default::default()
};

let rm_dv = DeletionVectorDescriptor {
storage_type: "u".to_string(),
path_or_inline_dv: "U5OWRz5k%CFT.Td}yCPW".to_string(),
offset: Some(1),
size_in_bytes: 38,
cardinality: 3,
};
let rm_partition_values = Some(HashMap::from([("c".to_string(), "d".to_string())]));

let remove = Remove {
path: "fake_path_2".into(),
deletion_vector: Some(rm_dv),
partition_values: rm_partition_values,
..Default::default()
};

let cdc_partition_values = HashMap::from([("x".to_string(), "y".to_string())]);
let cdc = Cdc {
path: "fake_path_3".into(),
partition_values: cdc_partition_values,
..Default::default()
};

mock_table
.commit(&[
add.clone().into(),
remove.clone().into(),
cdc.clone().into(),
])
.await;

let table_root = url::Url::from_directory_path(mock_table.table_root()).unwrap();
let log_root = table_root.join("_delta_log/").unwrap();
let log_segment =
LogSegment::for_table_changes(engine.get_file_system_client().as_ref(), log_root, 0, 0)
.unwrap();
let commit = log_segment.ascending_commit_files[0].clone();

let actions = engine
.get_json_handler()
.read_json_files(&[commit.location.clone()], get_log_schema().clone(), None)
.unwrap();

let timestamp = 1234_i64;
let commit_version = 42_u64;

let scan_file: Vec<_> = actions
.map_ok(|actions| {
engine
.get_expression_handler()
.get_evaluator(
get_log_schema().clone(),
transform_to_scan_row_expression(1234, 42),
scan_row_schema().into(),
)
.evaluate(actions.as_ref())
.unwrap()
})
.map(|data| -> DeltaResult<_> {
let data = data?;
let selection_vector = vec![true; data.len()];
let callback: ScanCallback<Vec<ScanFile>> =
|context, scan_file| context.push(scan_file);
visit_scan_files(data.as_ref(), &selection_vector, vec![], callback)
})
.flatten_ok()
.try_collect()
.unwrap();

let expected_scan_files = vec![
ScanFile {
tpe: ScanFileType::Add,
path: add.path,
dv_info: DvInfo {
deletion_vector: add.deletion_vector,
},
partition_values: add.partition_values,
commit_version,
timestamp,
},
ScanFile {
tpe: ScanFileType::Remove,
path: remove.path,
dv_info: DvInfo {
deletion_vector: remove.deletion_vector,
},
partition_values: remove.partition_values.unwrap(),
commit_version,
timestamp,
},
ScanFile {
tpe: ScanFileType::Cdc,
path: cdc.path,
dv_info: DvInfo {
deletion_vector: None,
},
partition_values: cdc.partition_values,
commit_version,
timestamp,
},
];
assert_eq!(expected_scan_files, scan_files);
}
}

0 comments on commit e384be6

Please sign in to comment.