From 7f1098b1db00620a265bbe7f93542efb1dc838a6 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Fri, 29 Nov 2024 16:14:21 -0800 Subject: [PATCH] update to scan_file expression --- kernel/src/table_changes/log_replay.rs | 37 +++++++++----------------- 1 file changed, 12 insertions(+), 25 deletions(-) diff --git a/kernel/src/table_changes/log_replay.rs b/kernel/src/table_changes/log_replay.rs index f77d34828..6d0eab000 100644 --- a/kernel/src/table_changes/log_replay.rs +++ b/kernel/src/table_changes/log_replay.rs @@ -10,10 +10,9 @@ use crate::actions::{ METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME, }; use crate::engine_data::TypedGetData; -use crate::expressions::{column_expr, column_name, Expression}; +use crate::expressions::column_name; use crate::path::ParsedLogPath; use crate::scan::data_skipping::DataSkippingFilter; -use crate::scan::scan_row_schema; use crate::scan::state::DvInfo; use crate::schema::{ArrayType, ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructType}; use crate::table_properties::TableProperties; @@ -24,6 +23,8 @@ use crate::{ }; use itertools::Itertools; +use super::scan_file::{scan_row_schema, transform_to_scan_row_expression}; + pub struct TableChangesScanData { pub data: Box, pub selection_vector: Vec, @@ -62,21 +63,6 @@ pub(crate) fn table_changes_action_iter( Ok(result) } -// Gets the expression for generating the engine data in [`TableChangesScanData`]. -// -// TODO: This expression is temporary. In the future it will also select `cdc` and `remove` actions -// fields. -fn get_add_transform_expr() -> Expression { - Expression::Struct(vec![ - column_expr!("add.path"), - column_expr!("add.size"), - column_expr!("add.modificationTime"), - column_expr!("add.stats"), - column_expr!("add.deletionVector"), - Expression::Struct(vec![column_expr!("add.partitionValues")]), - ]) -} - /// Processes a single commit file from the log to generate an iterator of [`TableChangesScanData`]. /// The scanner operates in two phases that _must_ be performed in the following order: /// 1. Prepare phase [`LogReplayScanner::prepare_phase`]: This performs one iteration over every @@ -203,7 +189,7 @@ impl LogReplayScanner { remove_dvs, commit_file, json_handler, - timestamp: _, + timestamp, filter, expression_handler, schema: _, @@ -214,6 +200,13 @@ impl LogReplayScanner { let action_iter = json_handler.read_json_files(&[commit_file.location.clone()], schema, None)?; + let commit_version = commit_file.version.try_into().map_err(Error::generic)?; + let evaluator = expression_handler.get_evaluator( + get_log_add_schema().clone(), + transform_to_scan_row_expression(timestamp, commit_version), + scan_row_schema().into(), + ); + let result = action_iter.map(move |actions| -> DeltaResult<_> { let actions = actions?; @@ -229,13 +222,7 @@ impl LogReplayScanner { FileActionSelectionVisitor::new(&remove_dvs, selection_vector, has_cdc_action); visitor.visit_rows_of(actions.as_ref())?; - let data = expression_handler - .get_evaluator( - get_log_add_schema().clone(), - get_add_transform_expr(), - scan_row_schema().into(), - ) - .evaluate(actions.as_ref())?; + let data = evaluator.evaluate(actions.as_ref())?; Ok(TableChangesScanData { data, selection_vector: visitor.selection_vector,