Skip to content

Commit

Permalink
Add scannable trait
Browse files Browse the repository at this point in the history
  • Loading branch information
OussamaSaoudi-db committed Nov 20, 2024
1 parent 463bbca commit 5f6bcfb
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 48 deletions.
76 changes: 30 additions & 46 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! Functionality to create and execute scans (reads) over data stored in a delta table

use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::Arc;

use itertools::Itertools;
Expand All @@ -14,7 +13,6 @@ use crate::expressions::{ColumnName, Expression, ExpressionRef, Scalar};
use crate::scan::state::{DvInfo, Stats};
use crate::schema::{DataType, Schema, SchemaRef, StructField, StructType};
use crate::snapshot::Snapshot;
use crate::table_changes::table_changes_scan::TableChangesScan;
use crate::table_features::ColumnMappingMode;
use crate::{DeltaResult, Engine, EngineData, Error, FileMeta};

Expand All @@ -25,31 +23,22 @@ mod data_skipping;
pub mod log_replay;
pub mod state;

pub mod marker {
use crate::{snapshot::Snapshot, table_changes::TableChanges};

pub trait ScanType {
type Scannable;
}
pub struct TableChangesScanType {}
impl ScanType for TableChangesScan {
type Scannable = TableChanges;
}
pub struct SnapshotScan {}
impl ScanType for SnapshotScan {
type Scannable = Snapshot;
}
/// A source of table data from which a scan can be built.
///
/// [`ScanBuilder`] wraps an `Arc` of a type implementing this trait, collects an optional schema
/// and predicate, and hands them to [`Scannable::build_scan`] when the builder is built.
pub trait Scannable {
/// The scan type that [`Scannable::build_scan`] returns on success.
type ScanType;
/// Build a scan over this source.
///
/// Takes the source as an `Arc<Self>` together with an optional schema and an optional
/// predicate. How a `None` value is treated for either argument is up to the implementation.
fn build_scan(
self: Arc<Self>,
schema: Option<SchemaRef>,
predicate: Option<ExpressionRef>,
) -> DeltaResult<Self::ScanType>;
}

/// Builder to scan a [`Scannable`] source, such as a snapshot of a table.
pub struct ScanBuilder<T: marker::ScanType> {
scannable: Arc<T::Scannable>,
pub struct ScanBuilder<T: Scannable> {
scannable: Arc<T>,
schema: Option<SchemaRef>,
predicate: Option<ExpressionRef>,
scan_type: PhantomData<T>,
}

impl<T: marker::ScanType> std::fmt::Debug for ScanBuilder<T> {
impl<T: Scannable> std::fmt::Debug for ScanBuilder<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
f.debug_struct("ScanBuilder")
.field("schema", &self.schema)
Expand All @@ -58,14 +47,13 @@ impl<T: marker::ScanType> std::fmt::Debug for ScanBuilder<T> {
}
}

impl<T: marker::ScanType> ScanBuilder<T> {
impl<T: Scannable> ScanBuilder<T> {
/// Create a new [`ScanBuilder`] instance.
pub fn new(scannable: impl Into<Arc<T::Scannable>>) -> Self {
pub fn new(scannable: impl Into<Arc<T>>) -> Self {
Self {
scannable: scannable.into(),
schema: None,
predicate: None,
scan_type: Default::default(),
}
}

Expand Down Expand Up @@ -100,52 +88,48 @@ impl<T: marker::ScanType> ScanBuilder<T> {
self.predicate = predicate.into();
self
}

pub fn build(self) -> DeltaResult<T::ScanType> {
self.scannable.build_scan(self.schema, self.predicate)
}
}
impl ScanBuilder<marker::SnapshotScan> {

impl Scannable for Snapshot {
type ScanType = Scan;
/// Build the [`Scan`].
///
/// This does not scan the table at this point, but does do some work to ensure that the
/// provided schema makes sense, and to prepare some metadata that the scan will need. The
/// [`Scan`] type itself can be used to fetch the files and associated metadata required to
/// perform actual data reads.
pub fn build(self) -> DeltaResult<Scan> {
fn build_scan(
self: Arc<Self>,
schema: Option<SchemaRef>,
predicate: Option<ExpressionRef>,
) -> DeltaResult<Scan> {
// if no schema is provided, use snapshot's entire schema (e.g. SELECT *)
let logical_schema = self
.schema
.unwrap_or_else(|| self.scannable.schema().clone().into());
let logical_schema = schema.unwrap_or_else(|| self.schema().clone().into());
let (all_fields, read_fields, have_partition_cols) = get_state_info(
logical_schema.as_ref(),
&self.scannable.metadata().partition_columns,
self.scannable.column_mapping_mode,
&self.metadata().partition_columns,
self.column_mapping_mode,
)?;
let physical_schema = Arc::new(StructType::new(read_fields));

// important! before a read/write to the table we must check it is supported
self.scannable.protocol().ensure_read_supported()?;
self.protocol().ensure_read_supported()?;

Ok(Scan {
snapshot: self.scannable,
snapshot: self,
logical_schema,
physical_schema,
predicate: self.predicate,
predicate,
all_fields,
have_partition_cols,
})
}
}

impl ScanBuilder<marker::TableChangesScanType> {
/// Build the [`TableChangesScan`].
///
/// This consumes the builder and returns an empty [`TableChangesScan`] value. The builder's
/// schema and predicate are not consulted here, and this implementation never returns an
/// error.
pub fn build(self) -> DeltaResult<TableChangesScan> {
Ok(TableChangesScan {})
}
}

/// A vector of this type is returned from calling [`Scan::execute`]. Each [`ScanResult`] contains
/// the raw [`EngineData`] as read by the engine's [`crate::ParquetHandler`], and a boolean
/// mask. Rows can be dropped from a scan due to deletion vectors, so we communicate back both
Expand Down
4 changes: 2 additions & 2 deletions kernel/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,12 @@ impl Snapshot {
}

/// Create a [`ScanBuilder`] for an `Arc<Snapshot>`.
pub fn scan_builder(self: Arc<Self>) -> ScanBuilder {
pub fn scan_builder(self: Arc<Self>) -> ScanBuilder<Self> {
ScanBuilder::new(self)
}

/// Consume this `Snapshot` to create a [`ScanBuilder`].
pub fn into_scan_builder(self) -> ScanBuilder {
pub fn into_scan_builder(self) -> ScanBuilder<Self> {
ScanBuilder::new(self)
}
}
Expand Down
17 changes: 17 additions & 0 deletions kernel/src/table_changes/table_changes_scan.rs
Original file line number Diff line number Diff line change
@@ -1 +1,18 @@
use std::sync::Arc;

use crate::{scan::Scannable, schema::SchemaRef, DeltaResult, ExpressionRef};

use super::TableChanges;

/// The scan type associated with [`TableChanges`] through its [`Scannable`] implementation.
///
/// Currently an empty placeholder: it carries no fields.
pub struct TableChangesScan {}

/// Lets [`TableChanges`] act as the source of a scan, with [`TableChangesScan`] as the scan type.
impl Scannable for TableChanges {
type ScanType = TableChangesScan;
/// Build a [`TableChangesScan`] over this [`TableChanges`].
///
/// Not implemented yet: the schema and predicate arguments are ignored.
///
/// # Panics
///
/// Always panics, because the body is currently `todo!()`.
fn build_scan(
self: Arc<Self>,
_schema: Option<SchemaRef>,
_predicate: Option<ExpressionRef>,
) -> DeltaResult<Self::ScanType> {
todo!()
}
}

0 comments on commit 5f6bcfb

Please sign in to comment.