Skip to content

Commit

Permalink
WIP: Implement LOAD operations
Browse files Browse the repository at this point in the history
This turned out to be surprisingly tricky. The key insight is that LOAD
and STORE aren't really symmetrical, at least not in this version of the
code. Instead, %STORE(expr) gets manually inserted as a phantom
operator, but %LOAD(var_ref) only occurs as part of a variable
reference.

This means that transforms, `sql_quote!` and all our other rewriting
machinery don't even notice that LOADs exist.

This is a weird decision, and it might not be optimal in the long run,
but it works.

This brings us down to `7 tests failed, 64 passed, 3 pending`, and
all the failing test cases are places we need to expand SELECT wildcards
like `*`, `table.*`, etc., into column lists with explicit STORE
operations.
  • Loading branch information
emk committed Dec 9, 2024
1 parent edb4a05 commit 2f2b3fd
Show file tree
Hide file tree
Showing 4 changed files with 214 additions and 185 deletions.
215 changes: 108 additions & 107 deletions src/ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,8 @@ pub enum Expression {
Literal(Literal),
BoolValue(Keyword),
Null(Keyword),
Name(Name),
Name(NameExpression),
Store(StoreExpression),
Cast(Cast),
Is(IsExpression),
In(InExpression),
Expand Down Expand Up @@ -867,8 +868,6 @@ pub enum Expression {
FunctionCall(FunctionCall),
Index(IndexExpression),
FieldAccess(FieldAccessExpression),
Load(LoadExpression),
Store(StoreExpression),
}

impl Expression {
Expand Down Expand Up @@ -913,6 +912,111 @@ impl DatePart {
}
}

/// A name used as an expression, optionally tagged with a "load" conversion.
///
/// A "load" transforms an SQL value from a "storage" type (eg "VARCHAR") to a
/// "memory" type (eg "UUID"). Used for databases like Trino, where the storage
/// types for a given connector may be more limited than the standard Trino
/// memory types.
///
/// The parser builds these with `load_to_memory_type` set to `None`.
/// NOTE(review): presumably the field is filled in later, while transforming
/// the AST / running type inference — confirm against the inference code.
#[derive(Clone, Debug, Drive, DriveMut, EmitDefault, Spanned, ToTokens)]
pub struct NameExpression {
/// **If** we need to do a load conversion, this will be the inferred memory
/// type. When this is `None`, the name is emitted unchanged.
#[emit(skip)]
#[to_tokens(skip)]
#[drive(skip)]
pub load_to_memory_type: Option<ValueType>,

/// The underlying name being referenced.
pub name: Name,
}

impl Emit for NameExpression {
    /// Emit this name for target `t`, applying a load conversion when needed.
    ///
    /// For a Trino target with `load_to_memory_type` set, the name is wrapped
    /// in the prefix and suffix that the connector's storage transform returns
    /// from `load_prefix_and_suffix`. In every other case the bare name is
    /// emitted.
    ///
    /// # Errors
    ///
    /// Returns an `io::Error` if the memory type cannot be converted to a
    /// `TrinoDataType`, or if writing to `f` fails.
    fn emit(&self, t: Target, f: &mut TokenWriter<'_>) -> ::std::io::Result<()> {
        // No load requested: the name is written as-is for every target.
        let Some(bq_memory_type) = self.load_to_memory_type.as_ref() else {
            return self.name.emit(t, f);
        };

        // Only Trino rewrites loads. Unlike `%STORE(...)`, no `%LOAD(...)`
        // marker is written for BigQuery; it falls through to the bare name.
        let connector_type = match t {
            Target::Trino(connector_type) => connector_type,
            _ => return self.name.emit(t, f),
        };

        let trino_memory_type =
            TrinoDataType::try_from(bq_memory_type).map_err(io::Error::other)?;
        let transform = connector_type.storage_transform_for(&trino_memory_type);
        let (prefix, suffix) = transform.load_prefix_and_suffix();

        // Wrapping the expression in our prefix and suffix.
        // If the expression was col_name containing '[1,2]' in Trino,
        // BQ memory type -> JSON, Trino memory type -> JSON, Trino storage type -> VARCHAR
        // The Trino storage type is dependent on what the connector can support.
        // In this case, the wrapped version would be JSON_PARSE(col_name)
        f.write_token_start(&prefix)?;
        self.name.emit(t, f)?;
        f.write_token_start(&suffix)
    }
}

/// A "store" expression, which transforms an SQL value from a "memory" type
/// (eg "UUID") to a "storage" type (eg "VARCHAR"). Used for databases like
/// Trino, where the storage types for a given connector may be more limited
/// than the standard Trino memory types.
///
/// These are not found in the original parsed AST, but are added while
/// transforming the AST.
#[derive(Clone, Debug, Drive, DriveMut, EmitDefault, Spanned, ToTokens)]
pub struct StoreExpression {
/// Inferred memory type. Emitting for a Trino target panics if this is
/// still `None`.
#[emit(skip)]
#[to_tokens(skip)]
#[drive(skip)]
pub memory_type: Option<ValueType>,

/// Our underlying expression, i.e. the value being converted for storage.
pub expression: Box<Expression>,
}

impl Emit for StoreExpression {
    /// Write this store conversion to `f` for target `t`.
    ///
    /// BigQuery output is the inner expression wrapped in a `%STORE(...)`
    /// marker. Trino output is the inner expression wrapped in the connector's
    /// store prefix and suffix, except when the memory type is NULL, in which
    /// case the inner expression is written unchanged.
    ///
    /// # Panics
    ///
    /// Panics for a Trino target if `memory_type` is still `None`.
    fn emit(&self, t: Target, f: &mut TokenWriter<'_>) -> ::std::io::Result<()> {
        // Handle BigQuery up front; everything below is Trino-only.
        let connector_type = match t {
            Target::BigQuery => {
                f.write_token_start("%STORE(")?;
                self.expression.emit(t, f)?;
                return f.write_token_start(")");
            }
            Target::Trino(connector_type) => connector_type,
        };

        let bq_memory_type = self
            .memory_type
            .as_ref()
            .expect("memory_type should have been filled in by type inference");

        // NULL is NULL in both storage and memory types, and dbcrossbar_trino
        // doesn't support NULL as a memory type, so no transform is applied.
        if matches!(bq_memory_type, ValueType::Simple(SimpleType::Null)) {
            return self.expression.emit(t, f);
        }

        let trino_memory_type =
            TrinoDataType::try_from(bq_memory_type).map_err(io::Error::other)?;
        let transform = connector_type.storage_transform_for(&trino_memory_type);
        let (open, close) = transform.store_prefix_and_suffix();

        f.write_token_start(&open)?;
        self.expression.emit(t, f)?;
        f.write_token_start(&close)
    }
}

/// A cast expression.
#[derive(Clone, Debug, Drive, DriveMut, Emit, EmitDefault, Spanned, ToTokens)]
pub struct Cast {
Expand Down Expand Up @@ -1645,109 +1749,6 @@ pub struct FieldAccessExpression {
pub field_name: Ident,
}

/// A "load" expression, which transforms an SQL value from a "storage" type (eg
/// "VARCHAR") to a "memory" type (eg "UUID"). Used for databases like Trino,
/// where the storage types for a given connector may be more limited than the
/// standard Trino memory types.
///
/// These are not found in the original parsed AST, but are added while
/// transforming the AST.
#[derive(Clone, Debug, Drive, DriveMut, EmitDefault, Spanned, ToTokens)]
pub struct LoadExpression {
/// Inferred memory type. Emitting for a Trino target panics if this is
/// still `None`.
#[emit(skip)]
#[to_tokens(skip)]
#[drive(skip)]
pub memory_type: Option<ValueType>,

/// Our underlying expression, i.e. the stored value being loaded.
pub expression: Box<Expression>,
}

impl Emit for LoadExpression {
    /// Write this load conversion to `f` for target `t`.
    ///
    /// BigQuery output is the inner expression wrapped in a `%LOAD(...)`
    /// marker. Trino output is the inner expression wrapped in the connector's
    /// load prefix and suffix for the inferred memory type.
    ///
    /// # Panics
    ///
    /// Panics for a Trino target if `memory_type` is still `None`.
    fn emit(&self, t: Target, f: &mut TokenWriter<'_>) -> ::std::io::Result<()> {
        // Handle BigQuery up front; everything below is Trino-only.
        let connector_type = match t {
            Target::BigQuery => {
                f.write_token_start("%LOAD(")?;
                self.expression.emit(t, f)?;
                return f.write_token_start(")");
            }
            Target::Trino(connector_type) => connector_type,
        };

        let bq_memory_type = self
            .memory_type
            .as_ref()
            .expect("memory_type should have been filled in by type inference");
        let trino_memory_type =
            TrinoDataType::try_from(bq_memory_type).map_err(io::Error::other)?;
        let transform = connector_type.storage_transform_for(&trino_memory_type);
        let (open, close) = transform.load_prefix_and_suffix();

        // Wrapping the expression in our prefix and suffix.
        // If the expression was col_name containing '[1,2]' in Trino,
        // BQ memory type -> JSON, Trino memory type -> JSON, Trino storage type -> VARCHAR
        // The Trino storage type is dependent on what the connector can support.
        // In this case, the wrapped version would be JSON_PARSE(col_name)
        f.write_token_start(&open)?;
        self.expression.emit(t, f)?;
        f.write_token_start(&close)
    }
}

/// A "store" expression, which transforms an SQL value from a "memory" type
/// (eg "UUID") to a "storage" type (eg "VARCHAR"). Used for databases like
/// Trino, where the storage types for a given connector may be more limited
/// than the standard Trino memory types.
///
/// These are not found in the original parsed AST, but are added while
/// transforming the AST.
#[derive(Clone, Debug, Drive, DriveMut, EmitDefault, Spanned, ToTokens)]
pub struct StoreExpression {
/// Inferred memory type. Emitting for a Trino target panics if this is
/// still `None`.
#[emit(skip)]
#[to_tokens(skip)]
#[drive(skip)]
pub memory_type: Option<ValueType>,

/// Our underlying expression, i.e. the value being converted for storage.
pub expression: Box<Expression>,
}

impl Emit for StoreExpression {
    /// Write this store conversion to `f` for target `t`.
    ///
    /// BigQuery output is the inner expression wrapped in a `%STORE(...)`
    /// marker. Trino output is the inner expression wrapped in the connector's
    /// store prefix and suffix, except when the memory type is NULL, in which
    /// case the inner expression is written unchanged.
    ///
    /// # Panics
    ///
    /// Panics for a Trino target if `memory_type` is still `None`.
    fn emit(&self, t: Target, f: &mut TokenWriter<'_>) -> ::std::io::Result<()> {
        // Handle BigQuery up front; everything below is Trino-only.
        let connector_type = match t {
            Target::BigQuery => {
                f.write_token_start("%STORE(")?;
                self.expression.emit(t, f)?;
                return f.write_token_start(")");
            }
            Target::Trino(connector_type) => connector_type,
        };

        let bq_memory_type = self
            .memory_type
            .as_ref()
            .expect("memory_type should have been filled in by type inference");

        // NULL is NULL in both storage and memory types, and dbcrossbar_trino
        // doesn't support NULL as a memory type, so no transform is applied.
        if matches!(bq_memory_type, ValueType::Simple(SimpleType::Null)) {
            return self.expression.emit(t, f);
        }

        let trino_memory_type =
            TrinoDataType::try_from(bq_memory_type).map_err(io::Error::other)?;
        let transform = connector_type.storage_transform_for(&trino_memory_type);
        let (open, close) = transform.store_prefix_and_suffix();

        f.write_token_start(&open)?;
        self.expression.emit(t, f)?;
        f.write_token_start(&close)
    }
}

/// An `AS` alias.
#[derive(Clone, Debug, Drive, DriveMut, Emit, EmitDefault, Spanned, ToTokens)]
pub struct Alias {
Expand Down Expand Up @@ -2446,7 +2447,7 @@ peg::parser! {
// Things from here down might start with arbitrary identifiers, so
// we need to be careful about the order.
function_call:function_call() { Expression::FunctionCall(function_call) }
column_name:name() { Expression::Name(column_name) }
column_name:name() { Expression::Name(NameExpression { load_to_memory_type: None, name: column_name }) }
}

rule interval_expression() -> IntervalExpression
Expand Down
9 changes: 1 addition & 8 deletions src/infer/contains_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ impl ContainsAggregate for ast::Expression {
ast::Expression::BoolValue(_) => false,
ast::Expression::Null(_) => false,
ast::Expression::Name(_) => false,
ast::Expression::Store(store_expr) => store_expr.contains_aggregate(scope),
ast::Expression::Cast(cast) => cast.contains_aggregate(scope),
ast::Expression::Is(is) => is.contains_aggregate(scope),
ast::Expression::In(in_expr) => in_expr.contains_aggregate(scope),
Expand All @@ -91,8 +92,6 @@ impl ContainsAggregate for ast::Expression {
// Putting an aggregate here would be very weird. Do not allow it
// until forced to do so.
ast::Expression::FieldAccess(_) => false,
ast::Expression::Load(load_expr) => load_expr.contains_aggregate(scope),
ast::Expression::Store(store_expr) => store_expr.contains_aggregate(scope),
}
}
}
Expand Down Expand Up @@ -269,12 +268,6 @@ impl ContainsAggregate for ast::IndexOffset {
}
}

impl ContainsAggregate for ast::LoadExpression {
    /// A load wrapper contains an aggregate exactly when the expression it
    /// wraps does, so the question is delegated to that inner expression.
    fn contains_aggregate(&self, scope: &ColumnSetScope) -> bool {
        let wrapped = &self.expression;
        wrapped.contains_aggregate(scope)
    }
}

impl ContainsAggregate for ast::StoreExpression {
fn contains_aggregate(&self, scope: &ColumnSetScope) -> bool {
self.expression.contains_aggregate(scope)
Expand Down
Loading

0 comments on commit 2f2b3fd

Please sign in to comment.