Skip to content

Commit

Permalink
Add scan_data_to_scan_file phase
Browse files Browse the repository at this point in the history
  • Loading branch information
OussamaSaoudi-db committed Nov 29, 2024
1 parent 21b7e85 commit 8d3d843
Showing 1 changed file with 25 additions and 0 deletions.
25 changes: 25 additions & 0 deletions kernel/src/table_changes/scan_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//! [`ScanFileVisitor`]. The visitor may read from a log with the schema [`scan_row_schema`].
//! You can convert a log to this schema using the [`transform_to_scan_row_expression`].

use itertools::Itertools;
use std::collections::HashMap;
use std::sync::{Arc, LazyLock};

Expand All @@ -14,6 +15,30 @@ use crate::schema::{ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructFie
use crate::utils::require;
use crate::{DeltaResult, EngineData, Error, RowVisitor};

use super::log_replay::TableChangesScanData;

#[allow(unused)]
pub(crate) fn scan_data_to_scan_file(
scan_data: impl Iterator<Item = DeltaResult<TableChangesScanData>>,
) -> impl Iterator<Item = DeltaResult<(ScanFile, Option<Arc<HashMap<String, DvInfo>>>)>> {
scan_data
.map(|scan_data| -> DeltaResult<_> {
let scan_data = scan_data?;
let callback: ScanCallback<Vec<ScanFile>> =
|context, scan_file| context.push(scan_file);
let result = visit_scan_files(
scan_data.data.as_ref(),
&scan_data.selection_vector,
vec![],
callback,
)?
.into_iter()
.map(move |scan_file| (scan_file, scan_data.remove_dv.clone()));
Ok(result)
})
.flatten_ok()
}

// The type of action associated with a [`ScanFile`]
#[allow(unused)]
#[derive(Debug, Clone, PartialEq)]
Expand Down

0 comments on commit 8d3d843

Please sign in to comment.