Skip to content

Commit

Permalink
Merge branch 'cdf_scan_file' into cdf_read_phase
Browse files Browse the repository at this point in the history
  • Loading branch information
OussamaSaoudi-db committed Nov 30, 2024
2 parents 40dca95 + 7f1098b commit 6e37d99
Showing 1 changed file with 12 additions and 25 deletions.
37 changes: 12 additions & 25 deletions kernel/src/table_changes/log_replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ use crate::actions::{
METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME,
};
use crate::engine_data::TypedGetData;
use crate::expressions::{column_expr, column_name, Expression};
use crate::expressions::column_name;
use crate::path::ParsedLogPath;
use crate::scan::data_skipping::DataSkippingFilter;
use crate::scan::scan_row_schema;
use crate::scan::state::DvInfo;
use crate::schema::{ArrayType, ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructType};
use crate::table_properties::TableProperties;
Expand All @@ -24,6 +23,8 @@ use crate::{
};
use itertools::Itertools;

use super::scan_file::{scan_row_schema, transform_to_scan_row_expression};

pub struct TableChangesScanData {
pub data: Box<dyn EngineData>,
pub selection_vector: Vec<bool>,
Expand Down Expand Up @@ -62,21 +63,6 @@ pub(crate) fn table_changes_action_iter(
Ok(result)
}

// Gets the expression for generating the engine data in [`TableChangesScanData`].
//
// TODO: This expression is temporary. In the future it will also select `cdc` and `remove` actions
// fields.
fn get_add_transform_expr() -> Expression {
Expression::Struct(vec![
column_expr!("add.path"),
column_expr!("add.size"),
column_expr!("add.modificationTime"),
column_expr!("add.stats"),
column_expr!("add.deletionVector"),
Expression::Struct(vec![column_expr!("add.partitionValues")]),
])
}

/// Processes a single commit file from the log to generate an iterator of [`TableChangesScanData`].
/// The scanner operates in two phases that _must_ be performed in the following order:
/// 1. Prepare phase [`LogReplayScanner::prepare_phase`]: This performs one iteration over every
Expand Down Expand Up @@ -203,7 +189,7 @@ impl LogReplayScanner {
remove_dvs,
commit_file,
json_handler,
timestamp: _,
timestamp,
filter,
expression_handler,
schema: _,
Expand All @@ -214,6 +200,13 @@ impl LogReplayScanner {
let action_iter =
json_handler.read_json_files(&[commit_file.location.clone()], schema, None)?;

let commit_version = commit_file.version.try_into().map_err(Error::generic)?;
let evaluator = expression_handler.get_evaluator(
get_log_add_schema().clone(),
transform_to_scan_row_expression(timestamp, commit_version),
scan_row_schema().into(),
);

let result = action_iter.map(move |actions| -> DeltaResult<_> {
let actions = actions?;

Expand All @@ -229,13 +222,7 @@ impl LogReplayScanner {
FileActionSelectionVisitor::new(&remove_dvs, selection_vector, has_cdc_action);
visitor.visit_rows_of(actions.as_ref())?;

let data = expression_handler
.get_evaluator(
get_log_add_schema().clone(),
get_add_transform_expr(),
scan_row_schema().into(),
)
.evaluate(actions.as_ref())?;
let data = evaluator.evaluate(actions.as_ref())?;
Ok(TableChangesScanData {
data,
selection_vector: visitor.selection_vector,
Expand Down

0 comments on commit 6e37d99

Please sign in to comment.