diff --git a/crates/core/src/cnf/mod.rs b/crates/core/src/cnf/mod.rs
index 619a7969b074..2fa2ccf01baa 100644
--- a/crates/core/src/cnf/mod.rs
+++ b/crates/core/src/cnf/mod.rs
@@ -42,16 +42,13 @@
 pub static DATASTORE_CACHE_SIZE: LazyLock<usize> =
 	lazy_env_parse!("SURREAL_DATASTORE_CACHE_SIZE", usize, 1_000);
 
 /// The maximum number of keys that should be scanned at once in general queries.
-pub static NORMAL_FETCH_SIZE: LazyLock<u32> = lazy_env_parse!("SURREAL_NORMAL_FETCH_SIZE", u32, 50);
+pub static NORMAL_FETCH_SIZE: LazyLock<u32> =
+	lazy_env_parse!("SURREAL_NORMAL_FETCH_SIZE", u32, 500);
 
 /// The maximum number of keys that should be scanned at once for export queries.
 pub static EXPORT_BATCH_SIZE: LazyLock<u32> = lazy_env_parse!("SURREAL_EXPORT_BATCH_SIZE", u32, 1000);
 
-/// The maximum number of keys that should be fetched when streaming range scans in a Scanner.
-pub static MAX_STREAM_BATCH_SIZE: LazyLock<u32> =
-	lazy_env_parse!("SURREAL_MAX_STREAM_BATCH_SIZE", u32, 1000);
-
 /// The maximum number of keys that should be scanned at once per concurrent indexing batch.
 pub static INDEXING_BATCH_SIZE: LazyLock<u32> =
 	lazy_env_parse!("SURREAL_INDEXING_BATCH_SIZE", u32, 250);
diff --git a/crates/core/src/kvs/api.rs b/crates/core/src/kvs/api.rs
index 604f9bf4e3d0..5030db0e1dfa 100644
--- a/crates/core/src/kvs/api.rs
+++ b/crates/core/src/kvs/api.rs
@@ -203,9 +203,9 @@ pub trait Transaction {
 		let end: Key = rng.end.into();
 		let mut next = Some(beg..end);
 		while let Some(rng) = next {
-			let res = self.batch(rng, *NORMAL_FETCH_SIZE, true, version).await?;
+			let res = self.batch_keys_vals(rng, *NORMAL_FETCH_SIZE, version).await?;
 			next = res.next;
-			for v in res.values.into_iter() {
+			for v in res.result.into_iter() {
 				out.push(v);
 			}
 		}
@@ -255,9 +255,9 @@ pub trait Transaction {
 		let end: Key = rng.end.into();
 		let mut next = Some(beg..end);
 		while let Some(rng) = next {
-			let res = self.batch(rng, *NORMAL_FETCH_SIZE, false, None).await?;
+			let res = self.batch_keys(rng, *NORMAL_FETCH_SIZE, None).await?;
 			next = res.next;
-			for (k, _) in res.values.into_iter() {
+			for k in res.result.into_iter() {
 				self.del(k).await?;
 			}
 		}
@@ -307,15 +307,40 @@ pub trait Transaction {
 		let end: Key = rng.end.into();
 		let mut next = Some(beg..end);
 		while let Some(rng) = next {
-			let res = self.batch(rng, *NORMAL_FETCH_SIZE, false, None).await?;
+			let res = self.batch_keys(rng, *NORMAL_FETCH_SIZE, None).await?;
 			next = res.next;
-			for (k, _) in res.values.into_iter() {
+			for k in res.result.into_iter() {
 				self.clr(k).await?;
 			}
 		}
 		Ok(())
 	}
 
+	/// Count the total number of keys within a range in the datastore.
+	///
+	/// This function fetches the total key count from the underlying datastore in grouped batches.
+	#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(rng = rng.sprint()))]
+	async fn count<K>(&mut self, rng: Range<K>) -> Result<usize, Error>
+	where
+		K: Into<Key> + Sprintable + Debug,
+	{
+		// Check to see if transaction is closed
+		if self.closed() {
+			return Err(Error::TxFinished);
+		}
+		// Continue with function logic
+		let mut len = 0;
+		let beg: Key = rng.start.into();
+		let end: Key = rng.end.into();
+		let mut next = Some(beg..end);
+		while let Some(rng) = next {
+			let res = self.batch_keys(rng, 10_000, None).await?;
+			next = res.next;
+			len += res.result.len();
+		}
+		Ok(len)
+	}
+
 	/// Retrieve all the versions for a specific range of keys from the datastore.
 	///
 	/// This function fetches all the versions for the full range of key-value pairs, in a single request to the underlying datastore.
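The new `count` walks the range in key-only pages of 10,000 and sums their lengths, so a caller gets a total without ever pulling values from the backend. A minimal usage sketch, modelled on the `batch()` test in `crates/core/src/kvs/tests/raw.rs` further down; the `ds` datastore handle and the `test1`..`test5` keys are assumptions for illustration, not part of this patch:

```rust
// Assumed setup: `ds` is a Datastore already holding the keys test1..test5,
// as written by the raw.rs batch() test below.
let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
// Count every key in the range without fetching any values
let total = tx.count("test1".as_bytes().."test9".as_bytes()).await.unwrap();
assert_eq!(total, 5);
tx.cancel().await.unwrap();
```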
@@ -333,15 +358,14 @@ pub trait Transaction {
 
 	/// Retrieve a batched scan over a specific range of keys in the datastore.
 	///
-	/// This function fetches keys or key-value pairs, in batches, with multiple requests to the underlying datastore.
+	/// This function fetches keys, in batches, with multiple requests to the underlying datastore.
 	#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(rng = rng.sprint()))]
-	async fn batch<K>(
+	async fn batch_keys<K>(
 		&mut self,
 		rng: Range<K>,
 		batch: u32,
-		values: bool,
 		version: Option<u64>,
-	) -> Result<Batch, Error>
+	) -> Result<Batch<Key>, Error>
 	where
 		K: Into<Key> + Sprintable + Debug,
 	{
@@ -353,21 +377,13 @@ pub trait Transaction {
 		let beg: Key = rng.start.into();
 		let end: Key = rng.end.into();
 		// Scan for the next batch
-		let res = if values {
-			self.scan(beg..end.clone(), batch, version).await?
-		} else {
-			self.keys(beg..end.clone(), batch, version)
-				.await?
-				.into_iter()
-				.map(|k| (k, vec![]))
-				.collect::<Vec<_>>()
-		};
+		let res = self.keys(beg..end.clone(), batch, version).await?;
 		// Check if range is consumed
 		if res.len() < batch as usize && batch > 0 {
-			Ok(Batch::new(None, res))
+			Ok(Batch::<Key>::new(None, res))
 		} else {
 			match res.last() {
-				Some((k, _)) => Ok(Batch::new(
+				Some(k) => Ok(Batch::<Key>::new(
 					Some(Range {
 						start: k.clone().add(0x00),
 						end,
@@ -377,13 +393,21 @@ pub trait Transaction {
 				// We have checked the length above, so
 				// there should be a last item in the
 				// vector, so we shouldn't arrive here
-				None => Ok(Batch::new(None, res)),
+				None => Ok(Batch::<Key>::new(None, res)),
 			}
 		}
 	}
 
+	/// Retrieve a batched scan over a specific range of keys in the datastore.
+	///
+	/// This function fetches key-value pairs, in batches, with multiple requests to the underlying datastore.
 	#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(rng = rng.sprint()))]
-	async fn batch_versions<K>(&mut self, rng: Range<K>, batch: u32) -> Result<Batch, Error>
+	async fn batch_keys_vals<K>(
+		&mut self,
+		rng: Range<K>,
+		batch: u32,
+		version: Option<u64>,
+	) -> Result<Batch<(Key, Val)>, Error>
 	where
 		K: Into<Key> + Sprintable + Debug,
 	{
@@ -394,16 +418,55 @@ pub trait Transaction {
 		// Continue with function logic
 		let beg: Key = rng.start.into();
 		let end: Key = rng.end.into();
+		// Scan for the next batch
+		let res = self.scan(beg..end.clone(), batch, version).await?;
+		// Check if range is consumed
+		if res.len() < batch as usize && batch > 0 {
+			Ok(Batch::<(Key, Val)>::new(None, res))
+		} else {
+			match res.last() {
+				Some((k, _)) => Ok(Batch::<(Key, Val)>::new(
+					Some(Range {
+						start: k.clone().add(0x00),
+						end,
+					}),
+					res,
+				)),
+				// We have checked the length above, so
+				// there should be a last item in the
+				// vector, so we shouldn't arrive here
+				None => Ok(Batch::<(Key, Val)>::new(None, res)),
+			}
+		}
+	}
+
+	/// Retrieve a batched scan over a specific range of keys in the datastore.
+	///
+	/// This function fetches key-value-version pairs, in batches, with multiple requests to the underlying datastore.
+	#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(rng = rng.sprint()))]
+	async fn batch_keys_vals_versions<K>(
+		&mut self,
+		rng: Range<K>,
+		batch: u32,
+	) -> Result<Batch<(Key, Val, Version, bool)>, Error>
+	where
+		K: Into<Key> + Sprintable + Debug,
+	{
+		// Check to see if transaction is closed
+		if self.closed() {
+			return Err(Error::TxFinished);
+		}
+		// Continue with function logic
+		let beg: Key = rng.start.into();
+		let end: Key = rng.end.into();
 		// Scan for the next batch
 		let res = self.scan_all_versions(beg..end.clone(), batch).await?;
-		// Check if range is consumed
 		if res.len() < batch as usize && batch > 0 {
-			Ok(Batch::new_versioned(None, res))
+			Ok(Batch::<(Key, Val, Version, bool)>::new(None, res))
 		} else {
 			match res.last() {
-				Some((k, _, _, _)) => Ok(Batch::new_versioned(
+				Some((k, _, _, _)) => Ok(Batch::<(Key, Val, Version, bool)>::new(
 					Some(Range {
 						start: k.clone().add(0x00),
 						end,
@@ -413,7 +476,7 @@ pub trait Transaction {
 				// We have checked the length above, so
 				// there should be a last item in the
 				// vector, so we shouldn't arrive here
-				None => Ok(Batch::new_versioned(None, res)),
+				None => Ok(Batch::<(Key, Val, Version, bool)>::new(None, res)),
 			}
 		}
 	}
diff --git a/crates/core/src/kvs/batch.rs b/crates/core/src/kvs/batch.rs
index 15fd38efcf49..9e95e9bcc0ed 100644
--- a/crates/core/src/kvs/batch.rs
+++ b/crates/core/src/kvs/batch.rs
@@ -1,35 +1,19 @@
 use super::Key;
-use super::Val;
-use super::Version;
 use std::ops::Range;
 
 /// A batch scan result returned from the [`Transaction::batch`] or [`Transactor::batch`] functions.
 #[derive(Debug)]
-pub struct Batch {
+pub struct Batch<T> {
 	pub next: Option<Range<Key>>,
-	pub values: Vec<(Key, Val)>,
-	pub versioned_values: Vec<(Key, Val, Version, bool)>,
+	pub result: Vec<T>,
 }
 
-impl Batch {
+impl<T> Batch<T> {
 	/// Create a new batch scan result.
-	pub fn new(next: Option<Range<Key>>, values: Vec<(Key, Val)>) -> Self {
+	pub fn new(next: Option<Range<Key>>, result: Vec<T>) -> Self {
 		Self {
 			next,
-			values,
-			versioned_values: vec![],
-		}
-	}
-
-	/// Create a new batch scan result with versioned values.
-	pub fn new_versioned(
-		next: Option<Range<Key>>,
-		versioned_values: Vec<(Key, Val, Version, bool)>,
-	) -> Self {
-		Self {
-			next,
-			values: vec![],
-			versioned_values,
+			result,
 		}
 	}
 }
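With the struct collapsed to a single generic `result` field, the element type now says what a scan returned: `batch_keys` yields `Batch<Key>`, `batch_keys_vals` yields `Batch<(Key, Val)>`, and `batch_keys_vals_versions` yields `Batch<(Key, Val, Version, bool)>`, the final `bool` marking a deleted revision as the old `batch_versions` docs described. A consumer-side sketch of paging through a range; the `tx`, `beg` and `end` bindings are assumed for illustration and are not part of this patch:

```rust
// Drain a key range page by page, following the same shape as the
// getr/delr loops in api.rs above.
let mut next = Some(beg..end);
while let Some(rng) = next {
	let batch = tx.batch_keys_vals(rng, *NORMAL_FETCH_SIZE, None).await?;
	// `next` is Some(..) only while the previous page filled the batch size
	next = batch.next;
	for (key, val) in batch.result {
		// Process each key-value pair here
		println!("{} -> {} bytes", String::from_utf8_lossy(&key), val.len());
	}
}
```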
diff --git a/crates/core/src/kvs/export.rs b/crates/core/src/kvs/export.rs
index 6cd384bf33e4..161928836736 100644
--- a/crates/core/src/kvs/export.rs
+++ b/crates/core/src/kvs/export.rs
@@ -334,23 +334,21 @@ impl Transaction {
 		while let Some(rng) = next {
 			if cfg.versions {
-				let batch = self.batch_versions(rng, *EXPORT_BATCH_SIZE).await?;
+				let batch = self.batch_keys_vals_versions(rng, *EXPORT_BATCH_SIZE).await?;
 				next = batch.next;
-				let values = batch.versioned_values;
 				// If there are no versioned values, return early.
-				if values.is_empty() {
+				if batch.result.is_empty() {
 					break;
 				}
-				self.export_versioned_data(values, chn).await?;
+				self.export_versioned_data(batch.result, chn).await?;
 			} else {
-				let batch = self.batch(rng, *EXPORT_BATCH_SIZE, true, None).await?;
+				let batch = self.batch_keys_vals(rng, *EXPORT_BATCH_SIZE, None).await?;
 				next = batch.next;
 				// If there are no values, return early.
-				let values = batch.values;
-				if values.is_empty() {
+				if batch.result.is_empty() {
 					break;
 				}
-				self.export_regular_data(values, chn).await?;
+				self.export_regular_data(batch.result, chn).await?;
 			}
 			// Fetch more records
 			continue;
diff --git a/crates/core/src/kvs/index.rs b/crates/core/src/kvs/index.rs
index e7c29984f5c7..33081b3ef011 100644
--- a/crates/core/src/kvs/index.rs
+++ b/crates/core/src/kvs/index.rs
@@ -303,13 +303,13 @@ impl Building {
 		while let Some(rng) = next {
 			// Get the next batch of records
 			let tx = self.new_read_tx().await?;
-			let batch = catch!(tx, tx.batch(rng, *INDEXING_BATCH_SIZE, true, None).await);
+			let batch = catch!(tx, tx.batch_keys_vals(rng, *INDEXING_BATCH_SIZE, None).await);
 			// We can release the read transaction
 			drop(tx);
 			// Set the next scan range
 			next = batch.next;
 			// Check there are records
-			if batch.values.is_empty() {
+			if batch.result.is_empty() {
 				// If not, we are with the initial indexing
 				break;
 			}
@@ -317,7 +317,7 @@ impl Building {
 			let ctx = self.new_write_tx_ctx().await?;
 			let tx = ctx.tx();
 			// Index the batch
-			catch!(tx, self.index_initial_batch(&ctx, &tx, batch.values, &mut count).await);
+			catch!(tx, self.index_initial_batch(&ctx, &tx, batch.result, &mut count).await);
 			tx.commit().await?;
 		}
 		// Second iteration, we index/remove any records that has been added or removed since the initial indexing
diff --git a/crates/core/src/kvs/node.rs b/crates/core/src/kvs/node.rs
index 1ddacb521344..202da76cde61 100644
--- a/crates/core/src/kvs/node.rs
+++ b/crates/core/src/kvs/node.rs
@@ -161,9 +161,10 @@ impl Datastore {
 				// Pause and yield execution
 				yield_now!();
 				// Fetch the next batch of keys and values
-				let res = catch!(txn, txn.batch(rng, *NORMAL_FETCH_SIZE, true, None).await);
+				let max = *NORMAL_FETCH_SIZE;
+				let res = catch!(txn, txn.batch_keys_vals(rng, max, None).await);
 				next = res.next;
-				for (k, v) in res.values.iter() {
+				for (k, v) in res.result.iter() {
 					// Decode the data for this live query
 					let val: Live = v.into();
 					// Get the key for this node live query
@@ -249,9 +250,13 @@ impl Datastore {
 			let end = crate::key::table::lq::suffix(&ns.name, &db.name, &tb.name);
 			let mut next = Some(beg..end);
 			while let Some(rng) = next {
-				let res = catch!(txn, txn.batch(rng, *NORMAL_FETCH_SIZE, true, None).await);
+				// Pause and yield execution
+				yield_now!();
+				// Fetch the next batch of keys and values
+				let max = *NORMAL_FETCH_SIZE;
+				let res = catch!(txn, txn.batch_keys_vals(rng, max, None).await);
 				next = res.next;
-				for (k, v) in res.values.iter() {
+				for (k, v) in res.result.iter() {
 					// Decode the LIVE query statement
 					let stm: LiveStatement = v.into();
 					// Get the node id and the live query id
diff --git a/crates/core/src/kvs/scanner.rs b/crates/core/src/kvs/scanner.rs
index 461f049f9e50..ad699f41780b 100644
--- a/crates/core/src/kvs/scanner.rs
+++ b/crates/core/src/kvs/scanner.rs
@@ -1,7 +1,6 @@
 use super::tx::Transaction;
 use super::Key;
 use super::Val;
-use crate::cnf::MAX_STREAM_BATCH_SIZE;
 use crate::err::Error;
 use futures::stream::Stream;
 use futures::Future;
@@ -67,12 +66,10 @@ impl Stream for Scanner<'_, (Key, Val)> {
 		}
 		// Check if there is no pending future task
 		if self.future.is_none() {
-			// Set the max number of results to fetch
-			let num = std::cmp::min(*MAX_STREAM_BATCH_SIZE, self.batch);
 			// Clone the range to use when scanning
 			let range = self.range.clone();
 			// Prepare a future to scan for results
-			self.future = Some(Box::pin(self.store.scan(range, num, self.version)));
+			self.future = Some(Box::pin(self.store.scan(range, self.batch, self.version)));
 		}
 		// Try to resolve the future
 		match self.future.as_mut().unwrap().poll_unpin(cx) {
@@ -134,12 +131,10 @@ impl Stream for Scanner<'_, Key> {
 		}
 		// Check if there is no pending future task
 		if self.future.is_none() {
-			// Set the max number of results to fetch
-			let num = std::cmp::min(*MAX_STREAM_BATCH_SIZE, self.batch);
 			// Clone the range to use when scanning
 			let range = self.range.clone();
 			// Prepare a future to scan for results
-			self.future = Some(Box::pin(self.store.keys(range, num, self.version)));
+			self.future = Some(Box::pin(self.store.keys(range, self.batch, self.version)));
 		}
 		// Try to resolve the future
 		match self.future.as_mut().unwrap().poll_unpin(cx) {
diff --git a/crates/core/src/kvs/tests/raw.rs b/crates/core/src/kvs/tests/raw.rs
index d199dc016ae5..aee8bcbf1737 100644
--- a/crates/core/src/kvs/tests/raw.rs
+++ b/crates/core/src/kvs/tests/raw.rs
@@ -297,8 +297,9 @@ async fn batch() {
 	tx.commit().await.unwrap();
 	// Create a readonly transaction
 	let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
-	let res = tx.batch("test1".as_bytes().."test9".as_bytes(), u32::MAX, true, None).await.unwrap();
-	let val = res.values;
+	let rng = "test1".as_bytes().."test9".as_bytes();
+	let res = tx.batch_keys_vals(rng, u32::MAX, None).await.unwrap();
+	let val = res.result;
 	assert_eq!(val.len(), 5);
 	assert_eq!(val[0].0, b"test1");
 	assert_eq!(val[0].1, b"1");
@@ -313,8 +314,9 @@ async fn batch() {
 	tx.cancel().await.unwrap();
 	// Create a readonly transaction
 	let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
-	let res = tx.batch("test2".as_bytes().."test4".as_bytes(), u32::MAX, true, None).await.unwrap();
-	let val = res.values;
+	let rng = "test2".as_bytes().."test4".as_bytes();
+	let res = tx.batch_keys_vals(rng, u32::MAX, None).await.unwrap();
+	let val = res.result;
 	assert_eq!(val.len(), 2);
 	assert_eq!(val[0].0, b"test2");
 	assert_eq!(val[0].1, b"2");
@@ -323,8 +325,9 @@ async fn batch() {
 	tx.cancel().await.unwrap();
 	// Create a readonly transaction
 	let mut tx = ds.transaction(Read, Optimistic).await.unwrap().inner();
-	let res = tx.batch("test2".as_bytes().."test4".as_bytes(), u32::MAX, true, None).await.unwrap();
-	let val = res.values;
+	let rng = "test2".as_bytes().."test4".as_bytes();
+	let res = tx.batch_keys_vals(rng, u32::MAX, None).await.unwrap();
+	let val = res.result;
 	assert_eq!(val.len(), 2);
 	assert_eq!(val[0].0, b"test2");
 	assert_eq!(val[0].1, b"2");
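Every call site above follows the same mechanical rewrite: the `values: bool` flag disappears, the branch it used to select is spelled out in the method name, and the single `result` field replaces `values`/`versioned_values`. Side by side, taken from the `delr` hunk in api.rs earlier in this patch:

```rust
// Before: one method, behaviour selected by a boolean flag
let res = self.batch(rng, *NORMAL_FETCH_SIZE, false, None).await?;
next = res.next;
for (k, _) in res.values.into_iter() {
	self.del(k).await?;
}

// After: the intent is in the method name, and `result` is a plain Vec<Key>
let res = self.batch_keys(rng, *NORMAL_FETCH_SIZE, None).await?;
next = res.next;
for k in res.result.into_iter() {
	self.del(k).await?;
}
```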
diff --git a/crates/core/src/kvs/tr.rs b/crates/core/src/kvs/tr.rs
index a3792fe072c5..a05108f96008 100644
--- a/crates/core/src/kvs/tr.rs
+++ b/crates/core/src/kvs/tr.rs
@@ -2,6 +2,7 @@
 use super::api::Transaction;
 use super::Key;
 use super::Val;
+use super::Version;
 use crate::cf;
 use crate::dbs::node::Timestamp;
 use crate::doc::CursorValue;
@@ -468,30 +469,68 @@ impl Transactor {
 
 	/// Retrieve a batched scan over a specific range of keys in the datastore.
 	///
-	/// This function fetches keys or key-value pairs, in batches, with multiple requests to the underlying datastore.
+	/// This function fetches keys, in batches, with multiple requests to the underlying datastore.
 	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
-	pub async fn batch<K>(
+	pub async fn batch_keys<K>(
 		&mut self,
 		rng: Range<K>,
 		batch: u32,
-		values: bool,
 		version: Option<u64>,
-	) -> Result<Batch, Error>
+	) -> Result<Batch<Key>, Error>
 	where
 		K: Into<Key> + Debug,
 	{
 		let beg: Key = rng.start.into();
 		let end: Key = rng.end.into();
 		let rng = beg.as_slice()..end.as_slice();
-		trace!(target: TARGET, rng = rng.sprint(), values = values, version = version, "Batch");
-		expand_inner!(&mut self.inner, v => { v.batch(beg..end, batch, values, version).await })
+		trace!(target: TARGET, rng = rng.sprint(), version = version, "Batch");
+		expand_inner!(&mut self.inner, v => { v.batch_keys(beg..end, batch, version).await })
+	}
+
+	/// Count the total number of keys within a range in the datastore.
+	///
+	/// This function fetches the total count, in batches, with multiple requests to the underlying datastore.
+	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
+	pub async fn count<K>(&mut self, rng: Range<K>) -> Result<usize, Error>
+	where
+		K: Into<Key> + Debug,
+	{
+		let beg: Key = rng.start.into();
+		let end: Key = rng.end.into();
+		let rng = beg.as_slice()..end.as_slice();
+		trace!(target: TARGET, rng = rng.sprint(), "Count");
+		expand_inner!(&mut self.inner, v => { v.count(beg..end).await })
+	}
+
+	/// Retrieve a batched scan over a specific range of keys in the datastore.
+	///
+	/// This function fetches key-value pairs, in batches, with multiple requests to the underlying datastore.
+	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
+	pub async fn batch_keys_vals<K>(
+		&mut self,
+		rng: Range<K>,
+		batch: u32,
+		version: Option<u64>,
+	) -> Result<Batch<(Key, Val)>, Error>
+	where
+		K: Into<Key> + Debug,
+	{
+		let beg: Key = rng.start.into();
+		let end: Key = rng.end.into();
+		let rng = beg.as_slice()..end.as_slice();
+		trace!(target: TARGET, rng = rng.sprint(), version = version, "Batch");
+		expand_inner!(&mut self.inner, v => { v.batch_keys_vals(beg..end, batch, version).await })
 	}
 
 	/// Retrieve a batched scan of all versions over a specific range of keys in the datastore.
 	///
-	/// This function fetches (key, value, version and deleted) pairs, in batches, with multiple requests to the underlying datastore.
+	/// This function fetches key-value-version pairs, in batches, with multiple requests to the underlying datastore.
 	#[instrument(level = "trace", target = "surrealdb::core::kvs::tr", skip_all)]
-	pub async fn batch_versions<K>(&mut self, rng: Range<K>, batch: u32) -> Result<Batch, Error>
+	pub async fn batch_keys_vals_versions<K>(
+		&mut self,
+		rng: Range<K>,
+		batch: u32,
+	) -> Result<Batch<(Key, Val, Version, bool)>, Error>
 	where
 		K: Into<Key> + Debug,
 	{
@@ -499,7 +538,7 @@ impl Transactor {
 		let end: Key = rng.end.into();
 		let rng = beg.as_slice()..end.as_slice();
 		trace!(target: TARGET, rng = rng.sprint(), "BatchVersions");
-		expand_inner!(&mut self.inner, v => { v.batch_versions(beg..end, batch).await })
+		expand_inner!(&mut self.inner, v => { v.batch_keys_vals_versions(beg..end, batch).await })
 	}
 
 	/// Obtain a new change timestamp for a key
diff --git a/crates/core/src/kvs/tx.rs b/crates/core/src/kvs/tx.rs
index f615e8cf23c2..1f39ca7dfc71 100644
--- a/crates/core/src/kvs/tx.rs
+++ b/crates/core/src/kvs/tx.rs
@@ -3,6 +3,7 @@ use super::tr::Check;
 use super::Convert;
 use super::Key;
 use super::Val;
+use super::Version;
 use crate::cnf::NORMAL_FETCH_SIZE;
 use crate::dbs::node::Node;
 use crate::err::Error;
@@ -305,32 +306,62 @@ impl Transaction {
 		self.lock().await.scan(rng, limit, version).await
 	}
 
+	/// Count the total number of keys within a range in the datastore.
+	///
+	/// This function fetches the total count, in batches, with multiple requests to the underlying datastore.
+	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
+	pub async fn count<K>(&self, rng: Range<K>) -> Result<usize, Error>
+	where
+		K: Into<Key> + Debug,
+	{
+		self.lock().await.count(rng).await
+	}
+
 	/// Retrieve a batched scan over a specific range of keys in the datastore.
 	///
-	/// This function fetches the key-value pairs in batches, with multiple requests to the underlying datastore.
+	/// This function fetches the keys in batches, with multiple requests to the underlying datastore.
 	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
-	pub async fn batch<K>(
+	pub async fn batch_keys<K>(
 		&self,
 		rng: Range<K>,
 		batch: u32,
-		values: bool,
 		version: Option<u64>,
-	) -> Result<Batch, Error>
+	) -> Result<Batch<Key>, Error>
 	where
 		K: Into<Key> + Debug,
 	{
-		self.lock().await.batch(rng, batch, values, version).await
+		self.lock().await.batch_keys(rng, batch, version).await
 	}
 
-	/// Retrieve a batched scan to scan all versions over a specific range of keys in the datastore.
+	/// Retrieve a batched scan over a specific range of keys in the datastore.
 	///
 	/// This function fetches the key-value pairs in batches, with multiple requests to the underlying datastore.
 	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
-	pub async fn batch_versions<K>(&self, rng: Range<K>, batch: u32) -> Result<Batch, Error>
+	pub async fn batch_keys_vals<K>(
+		&self,
+		rng: Range<K>,
+		batch: u32,
+		version: Option<u64>,
+	) -> Result<Batch<(Key, Val)>, Error>
+	where
+		K: Into<Key> + Debug,
+	{
+		self.lock().await.batch_keys_vals(rng, batch, version).await
+	}
+
+	/// Retrieve a batched scan over a specific range of keys in the datastore.
+	///
+	/// This function fetches the key-value-version pairs in batches, with multiple requests to the underlying datastore.
+	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
+	pub async fn batch_keys_vals_versions<K>(
+		&self,
+		rng: Range<K>,
+		batch: u32,
+	) -> Result<Batch<(Key, Val, Version, bool)>, Error>
 	where
 		K: Into<Key> + Debug,
 	{
-		self.lock().await.batch_versions(rng, batch).await
+		self.lock().await.batch_keys_vals_versions(rng, batch).await
 	}
 
 	/// Retrieve a stream over a specific range of keys in the datastore.
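At the `Datastore` transaction level the renamed methods, plus `count`, simply forward through `lock()` to the `Transactor`. A hedged sketch of the versioned variant from a caller's side, assuming an existing datastore `ds` on a version-capable backend; the key range is illustrative only and not part of this patch:

```rust
// Sketch: reading every stored revision in a key range.
// Each element is (Key, Val, Version, bool), the bool marking a deleted revision.
let tx = ds.transaction(Read, Optimistic).await?;
let batch = tx.batch_keys_vals_versions("test1".as_bytes().."test9".as_bytes(), 1000).await?;
// First page only; loop on batch.next to continue, as in the export loop above.
for (key, val, version, deleted) in batch.result {
	println!("{:?} @ {}: {} bytes, deleted={}", key, version, val.len(), deleted);
}
tx.cancel().await?;
```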