diff --git a/core/btree.rs b/core/btree.rs index a2504ff5..266335c8 100644 --- a/core/btree.rs +++ b/core/btree.rs @@ -99,7 +99,8 @@ impl Cursor { let page_idx = mem_page.page_idx; let page = self.pager.read_page(page_idx)?; assert!(page.is_uptodate()); - let page = &page.contents; + let page = page.contents.read().unwrap(); + let page = page.as_ref().unwrap(); if mem_page.cell_idx() >= page.cells.len() { let parent = mem_page.parent.clone(); match page.header.right_most_pointer { diff --git a/core/io/linux.rs b/core/io/linux.rs index 338f75f4..09aec0a9 100644 --- a/core/io/linux.rs +++ b/core/io/linux.rs @@ -51,6 +51,7 @@ impl File { } ring.submit_and_wait(1)?; let cqe = ring.completion().next().expect("completion queue is empty"); + c.complete(); Ok(()) } } \ No newline at end of file diff --git a/core/io/mod.rs b/core/io/mod.rs index 78f98804..b96c2388 100644 --- a/core/io/mod.rs +++ b/core/io/mod.rs @@ -1,8 +1,24 @@ use cfg_block::cfg_block; use std::pin::Pin; +pub type Complete = dyn Fn(&Buffer) + Send + Sync; + pub struct Completion<'a> { pub buf: &'a mut Buffer, + pub complete: Box, +} + +impl<'a> Completion<'a> { + pub fn new(buf: &'a mut Buffer, complete: Box) -> Self { + Self { + buf, + complete: Box::new(complete), + } + } + + pub fn complete(&self) { + (self.complete)(self.buf); + } } pub struct Buffer { diff --git a/core/pager.rs b/core/pager.rs index 87d200b0..07a6c3e7 100644 --- a/core/pager.rs +++ b/core/pager.rs @@ -6,12 +6,12 @@ use crate::Storage; use concurrent_lru::unsharded::LruCache; use std::sync::{ atomic::{AtomicUsize, Ordering}, - Arc, Mutex, + Arc, Mutex, RwLock, }; pub struct Page { flags: AtomicUsize, - pub contents: BTreePage, + pub contents: RwLock>, } /// Page is up-to-date. @@ -22,10 +22,10 @@ const PAGE_LOCKED: usize = 0b010; const PAGE_ERROR: usize = 0b100; impl Page { - pub fn new(contents: BTreePage) -> Page { + pub fn new() -> Page { Page { flags: AtomicUsize::new(0), - contents, + contents: RwLock::new(None), } } @@ -88,11 +88,16 @@ impl Pager { pub fn read_page(&self, page_idx: usize) -> anyhow::Result> { let handle = self.page_cache.get_or_try_init(page_idx, 1, |_idx| { let mut buffer_pool = self.buffer_pool.lock().unwrap(); - let page = - sqlite3_ondisk::read_btree_page(&self.storage, &mut buffer_pool, page_idx).unwrap(); - let page = Page::new(page); - page.set_uptodate(); - Ok::, anyhow::Error>(Arc::new(page)) + let page = Arc::new(Page::new()); + page.set_locked(); + sqlite3_ondisk::read_btree_page( + &self.storage, + &mut buffer_pool, + page.clone(), + page_idx, + ) + .unwrap(); + Ok::, anyhow::Error>(page) })?; Ok(handle.value().clone()) } diff --git a/core/sqlite3_ondisk.rs b/core/sqlite3_ondisk.rs index e62b1a50..7f5828a6 100644 --- a/core/sqlite3_ondisk.rs +++ b/core/sqlite3_ondisk.rs @@ -25,10 +25,12 @@ /// For more information, see: https://www.sqlite.org/fileformat.html use crate::buffer_pool::BufferPool; use crate::io::{Buffer, Completion}; +use crate::pager::Page; use crate::types::{Record, Value}; use crate::Storage; use anyhow::{anyhow, Result}; use std::borrow::BorrowMut; +use std::sync::Arc; /// The size of the database header in bytes. pub const DATABASE_HEADER_SIZE: usize = 100; @@ -62,8 +64,10 @@ pub struct DatabaseHeader { pub fn read_database_header(storage: &Storage) -> Result { let mut buf = Buffer::allocate(512); + let complete = Box::new(|_buf: &Buffer| {}); let mut c = Completion { buf: buf.borrow_mut(), + complete, }; storage.get(1, &mut c)?; let buf = buf.as_slice(); @@ -136,44 +140,59 @@ pub struct BTreePage { pub fn read_btree_page( storage: &Storage, buffer_pool: &mut BufferPool, + page: Arc, page_idx: usize, -) -> Result { +) -> Result<()> { let mut buf = buffer_pool.get(); - let page = buf.borrow_mut().data_mut(); - let mut c = Completion { buf: page }; + let buf = buf.borrow_mut().data_mut(); + let complete = Box::new(move |buf: &Buffer| { + let page = page.clone(); + let page_idx = page_idx.clone(); + read_btree_page_complete(page_idx, buf, page).unwrap(); + }); + let mut c = Completion { buf, complete }; storage.get(page_idx, &mut c)?; + // TODO: return completion + Ok(()) +} + +fn read_btree_page_complete(page_idx: usize, buf: &Buffer, page: Arc) -> Result<()> { let mut pos = if page_idx == 1 { DATABASE_HEADER_SIZE } else { 0 }; - let page = page.as_slice(); + let buf = buf.as_slice(); let mut header = BTreePageHeader { - page_type: page[pos].try_into()?, - _first_freeblock_offset: u16::from_be_bytes([page[pos + 1], page[pos + 2]]), - num_cells: u16::from_be_bytes([page[pos + 3], page[pos + 4]]), - _cell_content_area: u16::from_be_bytes([page[pos + 5], page[pos + 6]]), - _num_frag_free_bytes: page[pos + 7], + page_type: buf[pos].try_into()?, + _first_freeblock_offset: u16::from_be_bytes([buf[pos + 1], buf[pos + 2]]), + num_cells: u16::from_be_bytes([buf[pos + 3], buf[pos + 4]]), + _cell_content_area: u16::from_be_bytes([buf[pos + 5], buf[pos + 6]]), + _num_frag_free_bytes: buf[pos + 7], right_most_pointer: None, }; pos += 8; if header.page_type == PageType::IndexInterior || header.page_type == PageType::TableInterior { header.right_most_pointer = Some(u32::from_be_bytes([ - page[pos], - page[pos + 1], - page[pos + 2], - page[pos + 3], + buf[pos], + buf[pos + 1], + buf[pos + 2], + buf[pos + 3], ])); pos += 4; } let mut cells = Vec::new(); for _ in 0..header.num_cells { - let cell_pointer = u16::from_be_bytes([page[pos], page[pos + 1]]); + let cell_pointer = u16::from_be_bytes([buf[pos], buf[pos + 1]]); pos += 2; - let cell = read_btree_cell(page, &header.page_type, cell_pointer as usize)?; + let cell = read_btree_cell(buf, &header.page_type, cell_pointer as usize)?; cells.push(cell); } - Ok(BTreePage { header, cells }) + let inner = BTreePage { header, cells }; + page.contents.write().unwrap().replace(inner); + page.set_uptodate(); + page.clear_locked(); + Ok(()) } #[derive(Debug)] diff --git a/core/storage.rs b/core/storage.rs index 7f49e220..869528d3 100644 --- a/core/storage.rs +++ b/core/storage.rs @@ -42,6 +42,7 @@ impl StorageIO for FileStorage { assert!((page_size & (page_size - 1)) == 0); let pos = (page_idx - 1) * page_size; self.file.pread(pos, c)?; + c.complete(); Ok(()) } }