Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for DELETE query planning #538

Merged
merged 14 commits into from
Dec 27, 2024
4 changes: 2 additions & 2 deletions core/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ pub use storage::wal::WalFile;
pub use storage::wal::WalFileShared;
use util::parse_schema_rows;

use translate::optimizer::optimize_plan;
use translate::planner::prepare_select_plan;

pub use error::LimboError;
pub type Result<T> = std::result::Result<T, error::LimboError>;

use crate::translate::optimizer::optimize_select_plan;
pub use io::OpenFlags;
#[cfg(feature = "fs")]
pub use io::PlatformIO;
Expand Down Expand Up @@ -267,7 +267,7 @@ impl Connection {
match stmt {
ast::Stmt::Select(select) => {
let plan = prepare_select_plan(&*self.schema.borrow(), select)?;
let plan = optimize_plan(plan)?;
let plan = optimize_select_plan(plan)?;
println!("{}", plan);
}
_ => todo!(),
Expand Down
4 changes: 4 additions & 0 deletions core/pseudo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ impl Cursor for PseudoCursor {
Ok(CursorResult::Ok(()))
}

fn delete(&mut self) -> Result<CursorResult<()>> {
unimplemented!()
}

fn get_null_flag(&self) -> bool {
false
}
Expand Down
5 changes: 5 additions & 0 deletions core/storage/btree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1753,6 +1753,11 @@ impl Cursor for BTreeCursor {
Ok(CursorResult::Ok(()))
}

fn delete(&mut self) -> Result<CursorResult<()>> {
println!("rowid: {:?}", self.rowid.borrow());
Ok(CursorResult::Ok(()))
}

fn set_null_flag(&mut self, flag: bool) {
self.null_flag = flag;
}
Expand Down
21 changes: 21 additions & 0 deletions core/translate/delete.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use crate::translate::emitter::emit_program_for_delete;
use crate::translate::optimizer::optimize_delete_plan;
use crate::translate::planner::prepare_delete_plan;
use crate::{schema::Schema, storage::sqlite3_ondisk::DatabaseHeader, vdbe::Program};
use crate::{Connection, Result};
use sqlite3_parser::ast::{Expr, QualifiedName, ResultColumn};
use std::rc::Weak;
use std::{cell::RefCell, rc::Rc};

pub fn translate_delete(
schema: &Schema,
tbl_name: &QualifiedName,
where_clause: Option<Expr>,
_returning: &Option<Vec<ResultColumn>>,
database_header: Rc<RefCell<DatabaseHeader>>,
connection: Weak<Connection>,
) -> Result<Program> {
let delete_plan = prepare_delete_plan(schema, tbl_name, where_clause)?;
let optimized_plan = optimize_delete_plan(delete_plan)?;
emit_program_for_delete(database_header, optimized_plan, connection)
}
211 changes: 187 additions & 24 deletions core/translate/emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ pub struct Metadata {
pub result_columns_to_skip_in_orderby_sorter: Option<Vec<usize>>,
}

/// Used to distinguish database operations
#[derive(Debug, Clone)]
pub enum OperationMode {
SELECT,
INSERT,
UPDATE,
DELETE,
}

/// Initialize the program with basic setup and return initial metadata and labels
fn prologue() -> Result<(ProgramBuilder, Metadata, BranchOffset, BranchOffset)> {
let mut program = ProgramBuilder::new();
Expand Down Expand Up @@ -201,7 +210,12 @@ pub fn emit_program(
if let Some(ref mut group_by) = plan.group_by {
init_group_by(&mut program, group_by, &plan.aggregates, &mut metadata)?;
}
init_source(&mut program, &plan.source, &mut metadata)?;
init_source(
&mut program,
&plan.source,
&mut metadata,
&OperationMode::SELECT,
)?;

// Set up main query execution loop
open_loop(
Expand Down Expand Up @@ -272,6 +286,63 @@ pub fn emit_program(
Ok(program.build(database_header, connection))
}

pub fn emit_program_for_delete(
database_header: Rc<RefCell<DatabaseHeader>>,
mut plan: Plan,
connection: Weak<Connection>,
) -> Result<Program> {
let (mut program, mut metadata, init_label, start_offset) = prologue()?;

// No rows will be read from source table loops if there is a constant false condition eg. WHERE 0
let skip_loops_label = if plan.contains_constant_false_condition {
let skip_loops_label = program.allocate_label();
program.emit_insn_with_label_dependency(
Insn::Goto {
target_pc: skip_loops_label,
},
skip_loops_label,
);
Some(skip_loops_label)
} else {
None
};

// Initialize cursors and other resources needed for query execution
init_source(
&mut program,
&plan.source,
&mut metadata,
&OperationMode::DELETE,
)?;

// Set up main query execution loop
open_loop(
&mut program,
&mut plan.source,
&plan.referenced_tables,
&mut metadata,
)?;

emit_delete_insns(&mut program, &plan.source)?;

// Clean up and close the main execution loop
close_loop(
&mut program,
&plan.source,
&mut metadata,
&plan.referenced_tables,
)?;

if let Some(skip_loops_label) = skip_loops_label {
program.resolve_label(skip_loops_label, program.offset());
}

// Finalize program
epilogue(&mut program, &mut metadata, init_label, start_offset)?;

Ok(program.build(database_header, connection))
}

/// Initialize resources needed for ORDER BY processing
fn init_order_by(
program: &mut ProgramBuilder,
Expand Down Expand Up @@ -385,6 +456,7 @@ fn init_source(
program: &mut ProgramBuilder,
source: &SourceOperator,
metadata: &mut Metadata,
mode: &OperationMode,
) -> Result<()> {
match source {
SourceOperator::Join {
Expand All @@ -402,10 +474,10 @@ fn init_source(
};
metadata.left_joins.insert(*id, lj_metadata);
}
init_source(program, left, metadata)?;
init_source(program, right, metadata)?;
init_source(program, left, metadata, mode)?;
init_source(program, right, metadata, mode)?;

return Ok(());
Ok(())
}
SourceOperator::Scan {
id,
Expand All @@ -419,13 +491,28 @@ fn init_source(
let root_page = table_reference.table.root_page;
let next_row_label = program.allocate_label();
metadata.next_row_labels.insert(*id, next_row_label);
program.emit_insn(Insn::OpenReadAsync {
cursor_id,
root_page,
});
program.emit_insn(Insn::OpenReadAwait);

return Ok(());
match mode {
OperationMode::SELECT => {
program.emit_insn(Insn::OpenReadAsync {
cursor_id,
root_page,
});
program.emit_insn(Insn::OpenReadAwait {});
}
OperationMode::DELETE => {
program.emit_insn(Insn::OpenWriteAsync {
cursor_id,
root_page,
});
program.emit_insn(Insn::OpenWriteAwait {});
}
_ => {
unimplemented!()
}
}

Ok(())
}
SourceOperator::Search {
id,
Expand All @@ -442,27 +529,54 @@ fn init_source(

metadata.next_row_labels.insert(*id, next_row_label);

program.emit_insn(Insn::OpenReadAsync {
cursor_id: table_cursor_id,
root_page: table_reference.table.root_page,
});
program.emit_insn(Insn::OpenReadAwait);
match mode {
OperationMode::SELECT => {
program.emit_insn(Insn::OpenReadAsync {
cursor_id: table_cursor_id,
root_page: table_reference.table.root_page,
});
program.emit_insn(Insn::OpenReadAwait {});
}
OperationMode::DELETE => {
program.emit_insn(Insn::OpenWriteAsync {
cursor_id: table_cursor_id,
root_page: table_reference.table.root_page,
});
program.emit_insn(Insn::OpenWriteAwait {});
}
_ => {
unimplemented!()
}
}

if let Search::IndexSearch { index, .. } = search {
let index_cursor_id = program
.alloc_cursor_id(Some(index.name.clone()), Some(Table::Index(index.clone())));
program.emit_insn(Insn::OpenReadAsync {
cursor_id: index_cursor_id,
root_page: index.root_page,
});
program.emit_insn(Insn::OpenReadAwait);

match mode {
OperationMode::SELECT => {
program.emit_insn(Insn::OpenReadAsync {
cursor_id: index_cursor_id,
root_page: index.root_page,
});
program.emit_insn(Insn::OpenReadAwait);
}
OperationMode::DELETE => {
program.emit_insn(Insn::OpenWriteAsync {
cursor_id: index_cursor_id,
root_page: index.root_page,
});
program.emit_insn(Insn::OpenWriteAwait {});
}
_ => {
unimplemented!()
}
}
}

return Ok(());
}
SourceOperator::Nothing => {
return Ok(());
Ok(())
}
SourceOperator::Nothing => Ok(()),
}
}

Expand Down Expand Up @@ -1121,6 +1235,55 @@ fn close_loop(
}
}

fn emit_delete_insns(program: &mut ProgramBuilder, source: &SourceOperator) -> Result<()> {
match source {
SourceOperator::Scan {
id,
table_reference,
iter_dir,
..
} => {
let cursor_id = program.resolve_cursor_id(&table_reference.table_identifier);

// Emit the instructions to delete the row
let key_reg = program.alloc_register();
program.emit_insn(Insn::RowId {
cursor_id,
dest: key_reg,
});
program.emit_insn(Insn::DeleteAsync { cursor_id });
program.emit_insn(Insn::DeleteAwait { cursor_id });

Ok(())
}
SourceOperator::Search {
id,
table_reference,
search,
..
} => {
let cursor_id = match search {
Search::RowidEq { .. } | Search::RowidSearch { .. } => {
program.resolve_cursor_id(&table_reference.table_identifier)
}
Search::IndexSearch { index, .. } => program.resolve_cursor_id(&index.name),
};

// Emit the instructions to delete the row
let key_reg = program.alloc_register();
program.emit_insn(Insn::RowId {
cursor_id,
dest: key_reg,
});
program.emit_insn(Insn::DeleteAsync { cursor_id });
program.emit_insn(Insn::DeleteAwait { cursor_id });

Ok(())
}
_ => Ok(()),
}
}

/// Emits the bytecode for processing a GROUP BY clause.
/// This is called when the main query execution loop has finished processing,
/// and we now have data in the GROUP BY sorter.
Expand Down
19 changes: 18 additions & 1 deletion core/translate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
//! a SELECT statement will be translated into a sequence of instructions that
//! will read rows from the database and filter them according to a WHERE clause.

pub(crate) mod delete;
pub(crate) mod emitter;
pub(crate) mod expr;
pub(crate) mod insert;
Expand All @@ -23,6 +24,7 @@ use std::str::FromStr;
use crate::schema::Schema;
use crate::storage::pager::Pager;
use crate::storage::sqlite3_ondisk::{DatabaseHeader, MIN_PAGE_CACHE_SIZE};
use crate::translate::delete::translate_delete;
use crate::vdbe::{builder::ProgramBuilder, Insn, Program};
use crate::{bail_parse_error, Connection, Result};
use insert::translate_insert;
Expand Down Expand Up @@ -68,7 +70,22 @@ pub fn translate(
ast::Stmt::CreateVirtualTable { .. } => {
bail_parse_error!("CREATE VIRTUAL TABLE not supported yet")
}
ast::Stmt::Delete { .. } => bail_parse_error!("DELETE not supported yet"),
ast::Stmt::Delete {
with,
tbl_name,
indexed,
where_clause,
returning,
order_by,
limit,
} => translate_delete(
schema,
&tbl_name,
where_clause,
&returning,
database_header,
connection,
),
ast::Stmt::Detach(_) => bail_parse_error!("DETACH not supported yet"),
ast::Stmt::DropIndex { .. } => bail_parse_error!("DROP INDEX not supported yet"),
ast::Stmt::DropTable { .. } => bail_parse_error!("DROP TABLE not supported yet"),
Expand Down
Loading
Loading