Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for DELETE query planning #538

Merged
merged 14 commits into from
Dec 27, 2024
2 changes: 1 addition & 1 deletion core/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ pub use storage::wal::WalFile;
pub use storage::wal::WalFileShared;
use util::parse_schema_rows;

use translate::optimizer::optimize_plan;
use translate::planner::prepare_select_plan;

pub use error::LimboError;
pub type Result<T> = std::result::Result<T, error::LimboError>;

use crate::translate::optimizer::optimize_plan;
pub use io::OpenFlags;
#[cfg(feature = "fs")]
pub use io::PlatformIO;
Expand Down
4 changes: 4 additions & 0 deletions core/pseudo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ impl Cursor for PseudoCursor {
Ok(CursorResult::Ok(()))
}

fn delete(&mut self) -> Result<CursorResult<()>> {
unimplemented!()
}

fn get_null_flag(&self) -> bool {
false
}
Expand Down
5 changes: 5 additions & 0 deletions core/storage/btree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1885,6 +1885,11 @@ impl Cursor for BTreeCursor {
Ok(CursorResult::Ok(()))
}

fn delete(&mut self) -> Result<CursorResult<()>> {
println!("rowid: {:?}", self.rowid.borrow());
Ok(CursorResult::Ok(()))
}

fn set_null_flag(&mut self, flag: bool) {
self.null_flag = flag;
}
Expand Down
21 changes: 21 additions & 0 deletions core/translate/delete.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use crate::translate::emitter::emit_program;
use crate::translate::optimizer::optimize_plan;
use crate::translate::planner::prepare_delete_plan;
use crate::{schema::Schema, storage::sqlite3_ondisk::DatabaseHeader, vdbe::Program};
use crate::{Connection, Result};
use sqlite3_parser::ast::{Expr, Limit, QualifiedName};
use std::rc::Weak;
use std::{cell::RefCell, rc::Rc};

pub fn translate_delete(
schema: &Schema,
tbl_name: &QualifiedName,
where_clause: Option<Expr>,
limit: Option<Limit>,
database_header: Rc<RefCell<DatabaseHeader>>,
connection: Weak<Connection>,
) -> Result<Program> {
let delete_plan = prepare_delete_plan(schema, tbl_name, where_clause, limit)?;
let optimized_plan = optimize_plan(delete_plan)?;
emit_program(database_header, optimized_plan, connection)
}
233 changes: 206 additions & 27 deletions core/translate/emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use sqlite3_parser::ast::{self};

use crate::schema::{Column, PseudoTable, Table};
use crate::storage::sqlite3_ondisk::DatabaseHeader;
use crate::translate::plan::{IterationDirection, Search};
use crate::translate::plan::{DeletePlan, IterationDirection, Plan, Search};
use crate::types::{OwnedRecord, OwnedValue};
use crate::util::exprs_are_equivalent;
use crate::vdbe::builder::ProgramBuilder;
Expand All @@ -20,7 +20,7 @@ use super::expr::{
translate_aggregation, translate_aggregation_groupby, translate_condition_expr, translate_expr,
ConditionMetadata,
};
use super::plan::{Aggregate, BTreeTableReference, Direction, GroupBy, Plan};
use super::plan::{Aggregate, BTreeTableReference, Direction, GroupBy, SelectPlan};
use super::plan::{ResultSetColumn, SourceOperator};

// Metadata for handling LEFT JOIN operations
Expand Down Expand Up @@ -101,6 +101,15 @@ pub struct Metadata {
pub result_columns_to_skip_in_orderby_sorter: Option<Vec<usize>>,
}

/// Used to distinguish database operations
#[derive(Debug, Clone)]
pub enum OperationMode {
SELECT,
INSERT,
UPDATE,
DELETE,
}

/// Initialize the program with basic setup and return initial metadata and labels
fn prologue() -> Result<(ProgramBuilder, Metadata, BranchOffset, BranchOffset)> {
let mut program = ProgramBuilder::new();
Expand Down Expand Up @@ -166,6 +175,17 @@ pub fn emit_program(
database_header: Rc<RefCell<DatabaseHeader>>,
mut plan: Plan,
connection: Weak<Connection>,
) -> Result<Program> {
match plan {
Plan::Select(plan) => emit_program_for_select(database_header, plan, connection),
Plan::Delete(plan) => emit_program_for_delete(database_header, plan, connection),
}
}

fn emit_program_for_select(
database_header: Rc<RefCell<DatabaseHeader>>,
mut plan: SelectPlan,
connection: Weak<Connection>,
) -> Result<Program> {
let (mut program, mut metadata, init_label, start_offset) = prologue()?;

Expand Down Expand Up @@ -201,7 +221,12 @@ pub fn emit_program(
if let Some(ref mut group_by) = plan.group_by {
init_group_by(&mut program, group_by, &plan.aggregates, &mut metadata)?;
}
init_source(&mut program, &plan.source, &mut metadata)?;
init_source(
&mut program,
&plan.source,
&mut metadata,
&OperationMode::SELECT,
)?;

// Set up main query execution loop
open_loop(
Expand Down Expand Up @@ -272,6 +297,63 @@ pub fn emit_program(
Ok(program.build(database_header, connection))
}

fn emit_program_for_delete(
database_header: Rc<RefCell<DatabaseHeader>>,
mut plan: DeletePlan,
connection: Weak<Connection>,
) -> Result<Program> {
let (mut program, mut metadata, init_label, start_offset) = prologue()?;

// No rows will be read from source table loops if there is a constant false condition eg. WHERE 0
let skip_loops_label = if plan.contains_constant_false_condition {
let skip_loops_label = program.allocate_label();
program.emit_insn_with_label_dependency(
Insn::Goto {
target_pc: skip_loops_label,
},
skip_loops_label,
);
Some(skip_loops_label)
} else {
None
};

// Initialize cursors and other resources needed for query execution
init_source(
&mut program,
&plan.source,
&mut metadata,
&OperationMode::DELETE,
)?;

// Set up main query execution loop
open_loop(
&mut program,
&mut plan.source,
&plan.referenced_tables,
&mut metadata,
)?;

emit_delete_insns(&mut program, &plan.source, &plan.limit, &metadata)?;

// Clean up and close the main execution loop
close_loop(
&mut program,
&plan.source,
&mut metadata,
&plan.referenced_tables,
)?;

if let Some(skip_loops_label) = skip_loops_label {
program.resolve_label(skip_loops_label, program.offset());
}

// Finalize program
epilogue(&mut program, &mut metadata, init_label, start_offset)?;

Ok(program.build(database_header, connection))
}

/// Initialize resources needed for ORDER BY processing
fn init_order_by(
program: &mut ProgramBuilder,
Expand Down Expand Up @@ -385,6 +467,7 @@ fn init_source(
program: &mut ProgramBuilder,
source: &SourceOperator,
metadata: &mut Metadata,
mode: &OperationMode,
) -> Result<()> {
match source {
SourceOperator::Join {
Expand All @@ -402,10 +485,10 @@ fn init_source(
};
metadata.left_joins.insert(*id, lj_metadata);
}
init_source(program, left, metadata)?;
init_source(program, right, metadata)?;
init_source(program, left, metadata, mode)?;
init_source(program, right, metadata, mode)?;

return Ok(());
Ok(())
}
SourceOperator::Scan {
id,
Expand All @@ -419,13 +502,28 @@ fn init_source(
let root_page = table_reference.table.root_page;
let next_row_label = program.allocate_label();
metadata.next_row_labels.insert(*id, next_row_label);
program.emit_insn(Insn::OpenReadAsync {
cursor_id,
root_page,
});
program.emit_insn(Insn::OpenReadAwait);

return Ok(());
match mode {
OperationMode::SELECT => {
program.emit_insn(Insn::OpenReadAsync {
cursor_id,
root_page,
});
program.emit_insn(Insn::OpenReadAwait {});
}
OperationMode::DELETE => {
program.emit_insn(Insn::OpenWriteAsync {
cursor_id,
root_page,
});
program.emit_insn(Insn::OpenWriteAwait {});
}
_ => {
unimplemented!()
}
}

Ok(())
}
SourceOperator::Search {
id,
Expand All @@ -442,27 +540,54 @@ fn init_source(

metadata.next_row_labels.insert(*id, next_row_label);

program.emit_insn(Insn::OpenReadAsync {
cursor_id: table_cursor_id,
root_page: table_reference.table.root_page,
});
program.emit_insn(Insn::OpenReadAwait);
match mode {
OperationMode::SELECT => {
program.emit_insn(Insn::OpenReadAsync {
cursor_id: table_cursor_id,
root_page: table_reference.table.root_page,
});
program.emit_insn(Insn::OpenReadAwait {});
}
OperationMode::DELETE => {
program.emit_insn(Insn::OpenWriteAsync {
cursor_id: table_cursor_id,
root_page: table_reference.table.root_page,
});
program.emit_insn(Insn::OpenWriteAwait {});
}
_ => {
unimplemented!()
}
}

if let Search::IndexSearch { index, .. } = search {
let index_cursor_id = program
.alloc_cursor_id(Some(index.name.clone()), Some(Table::Index(index.clone())));
program.emit_insn(Insn::OpenReadAsync {
cursor_id: index_cursor_id,
root_page: index.root_page,
});
program.emit_insn(Insn::OpenReadAwait);

match mode {
OperationMode::SELECT => {
program.emit_insn(Insn::OpenReadAsync {
cursor_id: index_cursor_id,
root_page: index.root_page,
});
program.emit_insn(Insn::OpenReadAwait);
}
OperationMode::DELETE => {
program.emit_insn(Insn::OpenWriteAsync {
cursor_id: index_cursor_id,
root_page: index.root_page,
});
program.emit_insn(Insn::OpenWriteAwait {});
}
_ => {
unimplemented!()
}
}
}

return Ok(());
}
SourceOperator::Nothing => {
return Ok(());
Ok(())
}
SourceOperator::Nothing => Ok(()),
}
}

Expand Down Expand Up @@ -811,7 +936,7 @@ pub enum InnerLoopEmitTarget<'a> {
/// At this point the cursors for all tables have been opened and rewound.
fn inner_loop_emit(
program: &mut ProgramBuilder,
plan: &mut Plan,
plan: &mut SelectPlan,
metadata: &mut Metadata,
) -> Result<()> {
// if we have a group by, we emit a record into the group by sorter.
Expand Down Expand Up @@ -1121,6 +1246,60 @@ fn close_loop(
}
}

fn emit_delete_insns(
program: &mut ProgramBuilder,
source: &SourceOperator,
limit: &Option<usize>,
metadata: &Metadata,
) -> Result<()> {
let cursor_id = match source {
SourceOperator::Scan {
table_reference, ..
} => program.resolve_cursor_id(&table_reference.table_identifier),
SourceOperator::Search {
table_reference,
search,
..
} => match search {
Search::RowidEq { .. } | Search::RowidSearch { .. } => {
program.resolve_cursor_id(&table_reference.table_identifier)
}
Search::IndexSearch { index, .. } => program.resolve_cursor_id(&index.name),
},
_ => return Ok(()),
};

// Emit the instructions to delete the row
let key_reg = program.alloc_register();
program.emit_insn(Insn::RowId {
cursor_id,
dest: key_reg,
});
program.emit_insn(Insn::DeleteAsync { cursor_id });
program.emit_insn(Insn::DeleteAwait { cursor_id });
if let Some(limit) = limit {
let limit_reg = program.alloc_register();
program.emit_insn(Insn::Integer {
value: *limit as i64,
dest: limit_reg,
});
program.mark_last_insn_constant();
let jump_label_on_limit_reached = metadata
.termination_label_stack
.last()
.expect("termination_label_stack should not be empty.");
program.emit_insn_with_label_dependency(
Insn::DecrJumpZero {
reg: limit_reg,
target_pc: *jump_label_on_limit_reached,
},
*jump_label_on_limit_reached,
)
}

Ok(())
}

/// Emits the bytecode for processing a GROUP BY clause.
/// This is called when the main query execution loop has finished processing,
/// and we now have data in the GROUP BY sorter.
Expand Down
Loading
Loading