Skip to content

Commit

Permalink
Asynchronous I/O
Browse files Browse the repository at this point in the history
  • Loading branch information
penberg committed Dec 28, 2023
1 parent 5b8a056 commit 335d1e3
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 26 deletions.
3 changes: 2 additions & 1 deletion core/btree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ impl Cursor {
let page_idx = mem_page.page_idx;
let page = self.pager.read_page(page_idx)?;
assert!(page.is_uptodate());
let page = &page.contents;
let page = page.contents.read().unwrap();
let page = page.as_ref().unwrap();
if mem_page.cell_idx() >= page.cells.len() {
let parent = mem_page.parent.clone();
match page.header.right_most_pointer {
Expand Down
1 change: 1 addition & 0 deletions core/io/linux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ impl File {
}
ring.submit_and_wait(1)?;
let cqe = ring.completion().next().expect("completion queue is empty");
c.complete();
Ok(())
}
}
16 changes: 16 additions & 0 deletions core/io/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,24 @@
use cfg_block::cfg_block;
use std::pin::Pin;

pub type Complete = dyn Fn(&Buffer) + Send + Sync;

pub struct Completion<'a> {
pub buf: &'a mut Buffer,
pub complete: Box<Complete>,
}

impl<'a> Completion<'a> {
pub fn new(buf: &'a mut Buffer, complete: Box<Complete>) -> Self {
Self {
buf,
complete: Box::new(complete),
}
}

pub fn complete(&self) {
(self.complete)(self.buf);
}
}

pub struct Buffer {
Expand Down
23 changes: 14 additions & 9 deletions core/pager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ use crate::Storage;
use concurrent_lru::unsharded::LruCache;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
Arc, Mutex, RwLock,
};

pub struct Page {
flags: AtomicUsize,
pub contents: BTreePage,
pub contents: RwLock<Option<BTreePage>>,
}

/// Page is up-to-date.
Expand All @@ -22,10 +22,10 @@ const PAGE_LOCKED: usize = 0b010;
const PAGE_ERROR: usize = 0b100;

impl Page {
pub fn new(contents: BTreePage) -> Page {
pub fn new() -> Page {
Page {
flags: AtomicUsize::new(0),
contents,
contents: RwLock::new(None),
}
}

Expand Down Expand Up @@ -88,11 +88,16 @@ impl Pager {
pub fn read_page(&self, page_idx: usize) -> anyhow::Result<Arc<Page>> {
let handle = self.page_cache.get_or_try_init(page_idx, 1, |_idx| {
let mut buffer_pool = self.buffer_pool.lock().unwrap();
let page =
sqlite3_ondisk::read_btree_page(&self.storage, &mut buffer_pool, page_idx).unwrap();
let page = Page::new(page);
page.set_uptodate();
Ok::<Arc<Page>, anyhow::Error>(Arc::new(page))
let page = Arc::new(Page::new());
page.set_locked();
sqlite3_ondisk::read_btree_page(
&self.storage,
&mut buffer_pool,
page.clone(),
page_idx,
)
.unwrap();
Ok::<Arc<Page>, anyhow::Error>(page)
})?;
Ok(handle.value().clone())
}
Expand Down
51 changes: 35 additions & 16 deletions core/sqlite3_ondisk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
/// For more information, see: https://www.sqlite.org/fileformat.html
use crate::buffer_pool::BufferPool;
use crate::io::{Buffer, Completion};
use crate::pager::Page;
use crate::types::{Record, Value};
use crate::Storage;
use anyhow::{anyhow, Result};
use std::borrow::BorrowMut;
use std::sync::Arc;

/// The size of the database header in bytes.
pub const DATABASE_HEADER_SIZE: usize = 100;
Expand Down Expand Up @@ -62,8 +64,10 @@ pub struct DatabaseHeader {

pub fn read_database_header(storage: &Storage) -> Result<DatabaseHeader> {
let mut buf = Buffer::allocate(512);
let complete = Box::new(|_buf: &Buffer| {});
let mut c = Completion {
buf: buf.borrow_mut(),
complete,
};
storage.get(1, &mut c)?;
let buf = buf.as_slice();
Expand Down Expand Up @@ -136,44 +140,59 @@ pub struct BTreePage {
pub fn read_btree_page(
storage: &Storage,
buffer_pool: &mut BufferPool,
page: Arc<Page>,
page_idx: usize,
) -> Result<BTreePage> {
) -> Result<()> {
let mut buf = buffer_pool.get();
let page = buf.borrow_mut().data_mut();
let mut c = Completion { buf: page };
let buf = buf.borrow_mut().data_mut();
let complete = Box::new(move |buf: &Buffer| {
let page = page.clone();
let page_idx = page_idx.clone();
read_btree_page_complete(page_idx, buf, page).unwrap();
});
let mut c = Completion { buf, complete };
storage.get(page_idx, &mut c)?;
// TODO: return completion
Ok(())
}

fn read_btree_page_complete(page_idx: usize, buf: &Buffer, page: Arc<Page>) -> Result<()> {
let mut pos = if page_idx == 1 {
DATABASE_HEADER_SIZE
} else {
0
};
let page = page.as_slice();
let buf = buf.as_slice();
let mut header = BTreePageHeader {
page_type: page[pos].try_into()?,
_first_freeblock_offset: u16::from_be_bytes([page[pos + 1], page[pos + 2]]),
num_cells: u16::from_be_bytes([page[pos + 3], page[pos + 4]]),
_cell_content_area: u16::from_be_bytes([page[pos + 5], page[pos + 6]]),
_num_frag_free_bytes: page[pos + 7],
page_type: buf[pos].try_into()?,
_first_freeblock_offset: u16::from_be_bytes([buf[pos + 1], buf[pos + 2]]),
num_cells: u16::from_be_bytes([buf[pos + 3], buf[pos + 4]]),
_cell_content_area: u16::from_be_bytes([buf[pos + 5], buf[pos + 6]]),
_num_frag_free_bytes: buf[pos + 7],
right_most_pointer: None,
};
pos += 8;
if header.page_type == PageType::IndexInterior || header.page_type == PageType::TableInterior {
header.right_most_pointer = Some(u32::from_be_bytes([
page[pos],
page[pos + 1],
page[pos + 2],
page[pos + 3],
buf[pos],
buf[pos + 1],
buf[pos + 2],
buf[pos + 3],
]));
pos += 4;
}
let mut cells = Vec::new();
for _ in 0..header.num_cells {
let cell_pointer = u16::from_be_bytes([page[pos], page[pos + 1]]);
let cell_pointer = u16::from_be_bytes([buf[pos], buf[pos + 1]]);
pos += 2;
let cell = read_btree_cell(page, &header.page_type, cell_pointer as usize)?;
let cell = read_btree_cell(buf, &header.page_type, cell_pointer as usize)?;
cells.push(cell);
}
Ok(BTreePage { header, cells })
let inner = BTreePage { header, cells };
page.contents.write().unwrap().replace(inner);
page.set_uptodate();
page.clear_locked();
Ok(())
}

#[derive(Debug)]
Expand Down
1 change: 1 addition & 0 deletions core/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ impl StorageIO for FileStorage {
assert!((page_size & (page_size - 1)) == 0);
let pos = (page_idx - 1) * page_size;
self.file.pread(pos, c)?;
c.complete();
Ok(())
}
}
Expand Down

0 comments on commit 335d1e3

Please sign in to comment.