diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 53aec0c7..45e2de80 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -1,7 +1,6 @@ //! Functionality to create and execute scans (reads) over data stored in a delta table use std::collections::HashMap; -use std::marker::PhantomData; use std::sync::Arc; use itertools::Itertools; @@ -14,7 +13,6 @@ use crate::expressions::{ColumnName, Expression, ExpressionRef, Scalar}; use crate::scan::state::{DvInfo, Stats}; use crate::schema::{DataType, Schema, SchemaRef, StructField, StructType}; use crate::snapshot::Snapshot; -use crate::table_changes::table_changes_scan::TableChangesScan; use crate::table_features::ColumnMappingMode; use crate::{DeltaResult, Engine, EngineData, Error, FileMeta}; @@ -25,31 +23,22 @@ mod data_skipping; pub mod log_replay; pub mod state; -pub mod marker { - use crate::{snapshot::Snapshot, table_changes::TableChanges}; - - pub trait ScanType { - type Scannable; - } - pub struct TableChangesScanType {} - impl ScanType for TableChangesScan { - type Scannable = TableChanges; - } - pub struct SnapshotScan {} - impl ScanType for SnapshotScan { - type Scannable = Snapshot; - } +pub trait Scannable { + type ScanType; + fn build_scan( + self: Arc, + schema: Option, + predicate: Option, + ) -> DeltaResult; } - /// Builder to scan a snapshot of a table. -pub struct ScanBuilder { - scannable: Arc, +pub struct ScanBuilder { + scannable: Arc, schema: Option, predicate: Option, - scan_type: PhantomData, } -impl std::fmt::Debug for ScanBuilder { +impl std::fmt::Debug for ScanBuilder { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { f.debug_struct("ScanBuilder") .field("schema", &self.schema) @@ -58,14 +47,13 @@ impl std::fmt::Debug for ScanBuilder { } } -impl ScanBuilder { +impl ScanBuilder { /// Create a new [`ScanBuilder`] instance. - pub fn new(scannable: impl Into>) -> Self { + pub fn new(scannable: impl Into>) -> Self { Self { scannable: scannable.into(), schema: None, predicate: None, - scan_type: Default::default(), } } @@ -100,52 +88,48 @@ impl ScanBuilder { self.predicate = predicate.into(); self } + + pub fn build(self) -> DeltaResult { + self.scannable.build_scan(self.schema, self.predicate) + } } -impl ScanBuilder { + +impl Scannable for Snapshot { + type ScanType = Scan; /// Build the [`Scan`]. /// /// This does not scan the table at this point, but does do some work to ensure that the /// provided schema make sense, and to prepare some metadata that the scan will need. The /// [`Scan`] type itself can be used to fetch the files and associated metadata required to /// perform actual data reads. - pub fn build(self) -> DeltaResult { + fn build_scan( + self: Arc, + schema: Option, + predicate: Option, + ) -> DeltaResult { // if no schema is provided, use snapshot's entire schema (e.g. SELECT *) - let logical_schema = self - .schema - .unwrap_or_else(|| self.scannable.schema().clone().into()); + let logical_schema = schema.unwrap_or_else(|| self.schema().clone().into()); let (all_fields, read_fields, have_partition_cols) = get_state_info( logical_schema.as_ref(), - &self.scannable.metadata().partition_columns, - self.scannable.column_mapping_mode, + &self.metadata().partition_columns, + self.column_mapping_mode, )?; let physical_schema = Arc::new(StructType::new(read_fields)); // important! before a read/write to the table we must check it is supported - self.scannable.protocol().ensure_read_supported()?; + self.protocol().ensure_read_supported()?; Ok(Scan { - snapshot: self.scannable, + snapshot: self, logical_schema, physical_schema, - predicate: self.predicate, + predicate, all_fields, have_partition_cols, }) } } -impl ScanBuilder { - /// Build the [`Scan`]. - /// - /// This does not scan the table at this point, but does do some work to ensure that the - /// provided schema make sense, and to prepare some metadata that the scan will need. The - /// [`Scan`] type itself can be used to fetch the files and associated metadata required to - /// perform actual data reads. - pub fn build(self) -> DeltaResult { - Ok(TableChangesScan {}) - } -} - /// A vector of this type is returned from calling [`Scan::execute`]. Each [`ScanResult`] contains /// the raw [`EngineData`] as read by the engines [`crate::ParquetHandler`], and a boolean /// mask. Rows can be dropped from a scan due to deletion vectors, so we communicate back both diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index 874ef8d9..112fb1cb 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -129,12 +129,12 @@ impl Snapshot { } /// Create a [`ScanBuilder`] for an `Arc`. - pub fn scan_builder(self: Arc) -> ScanBuilder { + pub fn scan_builder(self: Arc) -> ScanBuilder { ScanBuilder::new(self) } /// Consume this `Snapshot` to create a [`ScanBuilder`] - pub fn into_scan_builder(self) -> ScanBuilder { + pub fn into_scan_builder(self) -> ScanBuilder { ScanBuilder::new(self) } } diff --git a/kernel/src/table_changes/table_changes_scan.rs b/kernel/src/table_changes/table_changes_scan.rs index fafcbaf7..9ba97d1b 100644 --- a/kernel/src/table_changes/table_changes_scan.rs +++ b/kernel/src/table_changes/table_changes_scan.rs @@ -1 +1,18 @@ +use std::sync::Arc; + +use crate::{scan::Scannable, schema::SchemaRef, DeltaResult, ExpressionRef}; + +use super::TableChanges; + pub struct TableChangesScan {} + +impl Scannable for TableChanges { + type ScanType = TableChangesScan; + fn build_scan( + self: Arc, + _schema: Option, + _predicate: Option, + ) -> DeltaResult { + todo!() + } +}