Commit: Working cdf dv resolution

OussamaSaoudi-db committed Nov 30, 2024
1 parent 6e37d99 commit 13cb83c
Showing 2 changed files with 29 additions and 124 deletions.
140 changes: 25 additions & 115 deletions kernel/src/table_changes/data_read.rs
@@ -1,8 +1,9 @@
 use std::collections::HashMap;
-use std::iter::once;
+use std::iter::{empty, once};
 use std::sync::Arc;
 
 use itertools::{Either, Itertools};
+use roaring::RoaringTreemap;
 use url::Url;
 
 use crate::actions::deletion_vector::{split_vector, treemap_to_bools};
@@ -14,16 +15,15 @@ use crate::{DeltaResult, Engine, Error, Expression, FileMeta};
 
 use super::scan::GlobalScanState;
 use super::scan_file::{ScanFile, ScanFileType};
-use super::CDF_FIELDS;
+type ResolvedScanFile = (ScanFile, Option<Vec<bool>>);
 
-pub(crate) struct DataReader {
+pub(crate) struct ScanFileReader {
     global_scan_state: GlobalScanState,
     scan_file: ScanFile,
     selection_vector: Option<Vec<bool>>,
 }
 
-impl DataReader {
+impl ScanFileReader {
     pub(crate) fn new(
         global_scan_state: GlobalScanState,
         scan_file: ScanFile,
Expand Down Expand Up @@ -127,7 +127,7 @@ impl DataReader {
self.global_scan_state.read_schema.clone()
}
}
pub fn into_data(
pub(crate) fn into_data(
mut self,
engine: &dyn Engine,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanResult>>> {
Expand Down Expand Up @@ -165,8 +165,7 @@ impl DataReader {
// trying to return a captured variable. We're going to reassign `selection_vector`
// to `rest` in a moment anyway
let mut sv = selection_vector.take();
let rest = split_vector(sv.as_mut(), len, None); // TODO: Fill in len and extend
// value
let rest = split_vector(sv.as_mut(), len, None);
let result = ScanResult {
raw_data: logical,
raw_mask: sv,
@@ -185,6 +184,23 @@ pub(crate) fn resolve_scan_file_dv(
     let remove_dv = remove_dv.as_ref().and_then(|map| map.get(&scan_file.path));
     match (&scan_file.tpe, remove_dv) {
         (ScanFileType::Add, Some(rm_dv)) => {
+            // Helper function to convert a treemap into a (scan_file, selection_vector) pair.
+            // This returns an empty iterator if nothing is selected.
+            fn treemap_to_iter(
+                treemap: RoaringTreemap,
+                mut scan_file: ScanFile,
+                tpe: ScanFileType,
+            ) -> impl Iterator<Item = (ScanFile, Option<Vec<bool>>)> {
+                if treemap.is_empty() {
+                    Either::Left(empty())
+                } else {
+                    let added_dv = treemap_to_bools(treemap);
+                    scan_file.tpe = tpe;
+                    Either::Right(once((scan_file, Some(added_dv))))
+                }
+            }
+
+            // Retrieve the deletion vectors from the add and remove actions
             let add_dv = scan_file
                 .dv_info
                 .get_treemap(engine, table_root)?
@@ -193,15 +209,8 @@ pub(crate) fn resolve_scan_file_dv(
                 .get_treemap(engine, table_root)?
                 .unwrap_or(Default::default());
 
-            let added_dv = treemap_to_bools(&rm_dv - &add_dv);
-            let added = once((scan_file.clone(), Some(added_dv)));
-
-            let removed_dv = treemap_to_bools(add_dv - rm_dv);
-            let rm_scanfile = ScanFile {
-                tpe: ScanFileType::Remove,
-                ..scan_file
-            };
-            let removed = once((rm_scanfile, Some(removed_dv)));
+            let added = treemap_to_iter(&rm_dv - &add_dv, scan_file.clone(), ScanFileType::Add);
+            let removed = treemap_to_iter(add_dv - rm_dv, scan_file, ScanFileType::Remove);
 
             Ok(Either::Right(added.chain(removed)))
         }
@@ -215,102 +224,3 @@ pub(crate) fn resolve_scan_file_dv(
         (ScanFileType::Cdc, None) => Ok(Either::Left(once((scan_file, None)))),
     }
 }
-
-//
-//// We have this function because `execute` can save `all_fields` and `have_partition_cols` in the
-//// scan, and then reuse them for each batch transform
-//#[allow(clippy::too_many_arguments)] // TEMPORARY
-//pub(crate) fn transform_to_logical_internal(
-//    engine: &dyn Engine,
-//    data: Box<dyn EngineData>,
-//    global_state: &GlobalScanState,
-//    partition_values: &std::collections::HashMap<String, String>,
-//    all_fields: &[ColumnType],
-//    _have_partition_cols: bool,
-//    generated_columns: &HashMap<String, Expression>,
-//    read_schema: Arc<StructType>,
-//) -> DeltaResult<Box<dyn EngineData>> {
-//    // need to add back partition cols and/or fix-up mapped columns
-//    let all_fields: Vec<Expression> = all_fields
-//        .iter()
-//        .map(|field| match field {
-//            ColumnType::Partition(field_idx) => {
-//                let field = global_state.logical_schema.fields.get_index(*field_idx);
-//                let Some((_, field)) = field else {
-//                    return Err(Error::generic(
-//                        "logical schema did not contain expected field, can't transform data",
-//                    ));
-//                };
-//                let name = field.physical_name(global_state.column_mapping_mode)?;
-//                let value_expression =
-//                    parse_partition_value(partition_values.get(name), field.data_type())?;
-//                Ok(value_expression.into())
-//            }
-//            ColumnType::Selected(field_name) => Ok(ColumnName::new([field_name]).into()),
-//            ColumnType::InsertedColumn(field_idx) => {
-//                let field = global_state.logical_schema.fields.get_index(*field_idx);
-//                let Some((_, field)) = field else {
-//                    return Err(Error::generic(
-//                        "logical schema did not contain expected field, can't transform data",
-//                    ));
-//                };
-//                let Some(expr) = generated_columns.get(field.name()) else {
-//                    return Err(Error::generic(
-//                        "Got unexpected inserted field , can't transform data",
-//                    ));
-//                };
-//                Ok(expr.clone())
-//            }
-//        })
-//        .try_collect()?;
-//    let read_expression = Expression::Struct(all_fields);
-//
-//    let result = engine
-//        .get_expression_handler()
-//        .get_evaluator(
-//            read_schema,
-//            read_expression,
-//            global_state.logical_schema.clone().into(),
-//        )
-//        .evaluate(data.as_ref())?;
-//    Ok(result)
-//}
-//
-//pub(crate) fn get_generated_columns(
-//    timestamp: i64,
-//    tpe: ScanFileType,
-//    commit_version: i64,
-//) -> Result<Arc<HashMap<String, Expression>>, crate::Error> {
-//    // Both in-commit timestamps and file metadata are in milliseconds
-//    //
-//    // See:
-//    // [`FileMeta`]
-//    // [In-Commit Timestamps] : https://github.com/delta-io/delta/blob/master/PROTOCOL.md#writer-requirements-for-in-commit-timestampsa
-//    let timestamp = Scalar::timestamp_from_millis(timestamp)?;
-//    let expressions = match tpe {
-//        ScanFileType::Cdc => [
-//            column_expr!("_change_type"),
-//            Expression::literal(commit_version),
-//            timestamp.into(),
-//        ],
-//        ScanFileType::Add => [
-//            "insert".into(),
-//            Expression::literal(commit_version),
-//            timestamp.into(),
-//        ],
-//
-//        ScanFileType::Remove => [
-//            "delete".into(),
-//            Expression::literal(commit_version),
-//            timestamp.into(),
-//        ],
-//    };
-//    let generated_columns: Arc<HashMap<String, Expression>> = Arc::new(
-//        CDF_GENERATED_COLUMNS
-//            .iter()
-//            .map(ToString::to_string)
-//            .zip(expressions)
-//            .collect(),
-//    );
-//    Ok(generated_columns)
-//}
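Aside: the heart of this change is the pair of treemap set-differences. When a file is removed and re-added with an updated deletion vector, rows present only in the old (remove) DV were restored, and rows present only in the new (add) DV were newly deleted. Below is a minimal standalone sketch of that logic; `treemap_to_selection` is a hypothetical stand-in (not the kernel's API) that assumes `true` marks a row to emit as a change row — the kernel's own `treemap_to_bools` may use the opposite polarity.

```rust
use roaring::RoaringTreemap;

/// Hypothetical helper: `true` at every index the treemap contains,
/// so those rows are selected as change rows.
fn treemap_to_selection(treemap: &RoaringTreemap) -> Vec<bool> {
    match treemap.max() {
        Some(max) => {
            let mut bools = vec![false; max as usize + 1];
            for idx in treemap.iter() {
                bools[idx as usize] = true;
            }
            bools
        }
        None => vec![],
    }
}

fn main() {
    // DV carried by the remove action: rows 0 and 3 were deleted before this commit.
    let rm_dv: RoaringTreemap = [0u64, 3].into_iter().collect();
    // DV carried by the add action: rows 3 and 5 are deleted after this commit.
    let add_dv: RoaringTreemap = [3u64, 5].into_iter().collect();

    // Deleted before but not after: the row was restored -> emit as an insert.
    let restored = &rm_dv - &add_dv; // {0}
    // Deleted after but not before: the row is newly gone -> emit as a delete.
    let deleted = &add_dv - &rm_dv; // {5}

    assert_eq!(treemap_to_selection(&restored), vec![true]);
    assert_eq!(
        treemap_to_selection(&deleted),
        vec![false, false, false, false, false, true]
    );
}
```

The `Either::Left(empty())` / `Either::Right(once(...))` branches in the commit's `treemap_to_iter` then simply skip emitting a scan file whose difference is empty — for example, a DV that only grew produces no insert batch.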
13 changes: 4 additions & 9 deletions kernel/src/table_changes/scan.rs
@@ -1,21 +1,16 @@
 use std::collections::HashMap;
-use std::iter::{self, empty, once};
 use std::sync::Arc;
 
 use itertools::Itertools;
 use serde::{Deserialize, Serialize};
 use tracing::debug;
 
 use crate::scan::state::DvInfo;
 use crate::scan::{ColumnType, ScanResult};
 use crate::schema::{SchemaRef, StructType};
-use crate::table_changes::scan_file::ScanFileType;
 use crate::table_features::ColumnMappingMode;
-use crate::{DeltaResult, Engine, Error, ExpressionRef};
+use crate::{DeltaResult, Engine, ExpressionRef};
 
-use super::data_read::{resolve_scan_file_dv, DataReader};
+use super::data_read::{resolve_scan_file_dv, ScanFileReader};
 use super::log_replay::{table_changes_action_iter, TableChangesScanData};
-use super::scan_file::{scan_data_to_scan_file, ScanFile};
+use super::scan_file::scan_data_to_scan_file;
 use super::{TableChanges, CDF_FIELDS};
 
 /// The result of building a [`TableChanges`] scan over a table. This can be used to get a change
@@ -239,7 +234,7 @@ impl TableChangesScan {
                 let (scan_file, selection_vector) = x?;
                 let engine = engine.clone();
                 let reader =
-                    DataReader::new(global_scan_state.clone(), scan_file, selection_vector);
+                    ScanFileReader::new(global_scan_state.clone(), scan_file, selection_vector);
                 reader.into_data(engine.as_ref())
             })
             .flatten_ok()
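The `.flatten_ok()` at the end of this pipeline is what stitches the per-file readers into one stream of results. A small sketch of that itertools behavior, with plain vectors standing in for the per-file `ScanResult` iterators (the names here are illustrative, not kernel API):

```rust
use itertools::Itertools;

fn main() {
    // Each outer item is a fallible per-file batch iterator, mirroring
    // `reader.into_data(...)` returning DeltaResult<impl Iterator<...>>.
    let per_file: Vec<Result<Vec<i32>, String>> =
        vec![Ok(vec![1, 2]), Err("unreadable file".to_string()), Ok(vec![3])];

    // flatten_ok flattens each Ok iterator and passes Err items through,
    // yielding one flat stream of Result<batch, error>.
    let flat: Vec<Result<i32, String>> = per_file.into_iter().flatten_ok().collect();
    assert_eq!(
        flat,
        vec![Ok(1), Ok(2), Err("unreadable file".to_string()), Ok(3)]
    );
}
```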
