Add Tx::count(), Tr::count(), and kvs::count() methods (surreal…
tobiemh authored Jan 9, 2025
1 parent 8738257 commit 4065704
Showing 10 changed files with 213 additions and 98 deletions.
7 changes: 2 additions & 5 deletions crates/core/src/cnf/mod.rs
@@ -42,16 +42,13 @@ pub static DATASTORE_CACHE_SIZE: LazyLock<usize> =
lazy_env_parse!("SURREAL_DATASTORE_CACHE_SIZE", usize, 1_000);

/// The maximum number of keys that should be scanned at once in general queries.
pub static NORMAL_FETCH_SIZE: LazyLock<u32> = lazy_env_parse!("SURREAL_NORMAL_FETCH_SIZE", u32, 50);
pub static NORMAL_FETCH_SIZE: LazyLock<u32> =
lazy_env_parse!("SURREAL_NORMAL_FETCH_SIZE", u32, 500);

/// The maximum number of keys that should be scanned at once for export queries.
pub static EXPORT_BATCH_SIZE: LazyLock<u32> =
lazy_env_parse!("SURREAL_EXPORT_BATCH_SIZE", u32, 1000);

/// The maximum number of keys that should be fetched when streaming range scans in a Scanner.
pub static MAX_STREAM_BATCH_SIZE: LazyLock<u32> =
lazy_env_parse!("SURREAL_MAX_STREAM_BATCH_SIZE", u32, 1000);

/// The maximum number of keys that should be scanned at once per concurrent indexing batch.
pub static INDEXING_BATCH_SIZE: LazyLock<u32> =
lazy_env_parse!("SURREAL_INDEXING_BATCH_SIZE", u32, 250);
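These constants are read from the environment once at startup. As a rough sketch, an env-backed default like NORMAL_FETCH_SIZE amounts to the following; this is an assumed expansion for illustration, not the actual lazy_env_parse! definition:

```rust
use std::sync::LazyLock;

// Assumed expansion of lazy_env_parse!("SURREAL_NORMAL_FETCH_SIZE", u32, 500):
// read the environment variable once, parse it as u32, and fall back to the
// compiled-in default when it is unset or invalid.
pub static NORMAL_FETCH_SIZE: LazyLock<u32> = LazyLock::new(|| {
	std::env::var("SURREAL_NORMAL_FETCH_SIZE")
		.ok()
		.and_then(|v| v.parse::<u32>().ok())
		.unwrap_or(500)
});
```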
117 changes: 90 additions & 27 deletions crates/core/src/kvs/api.rs
@@ -203,9 +203,9 @@ pub trait Transaction {
let end: Key = rng.end.into();
let mut next = Some(beg..end);
while let Some(rng) = next {
let res = self.batch(rng, *NORMAL_FETCH_SIZE, true, version).await?;
let res = self.batch_keys_vals(rng, *NORMAL_FETCH_SIZE, version).await?;
next = res.next;
for v in res.values.into_iter() {
for v in res.result.into_iter() {
out.push(v);
}
}
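Every scan-style helper in this trait follows the same pagination idiom: each call returns a Batch whose next field carries the unconsumed remainder of the range, and the caller loops until it is None. Below is a standalone sketch of that idiom against the new batch_keys_vals signature; the helper name is illustrative, while the trait, constant, and type aliases are the ones used above.

```rust
// Hypothetical helper mirroring the loop above: collect every value in a
// range by following Batch::next until the range is exhausted.
async fn collect_values<T: Transaction>(
	tx: &mut T,
	beg: Key,
	end: Key,
) -> Result<Vec<Val>, Error> {
	let mut out = Vec::new();
	let mut next = Some(beg..end);
	while let Some(rng) = next {
		// Fetch the next page of key-value pairs
		let res = tx.batch_keys_vals(rng, *NORMAL_FETCH_SIZE, None).await?;
		// Remember where the next page starts, if any
		next = res.next;
		// Keep only the values
		out.extend(res.result.into_iter().map(|(_, v)| v));
	}
	Ok(out)
}
```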
@@ -255,9 +255,9 @@ pub trait Transaction {
let end: Key = rng.end.into();
let mut next = Some(beg..end);
while let Some(rng) = next {
let res = self.batch(rng, *NORMAL_FETCH_SIZE, false, None).await?;
let res = self.batch_keys(rng, *NORMAL_FETCH_SIZE, None).await?;
next = res.next;
for (k, _) in res.values.into_iter() {
for k in res.result.into_iter() {
self.del(k).await?;
}
}
@@ -307,15 +307,40 @@ pub trait Transaction {
let end: Key = rng.end.into();
let mut next = Some(beg..end);
while let Some(rng) = next {
let res = self.batch(rng, *NORMAL_FETCH_SIZE, false, None).await?;
let res = self.batch_keys(rng, *NORMAL_FETCH_SIZE, None).await?;
next = res.next;
for (k, _) in res.values.into_iter() {
for k in res.result.into_iter() {
self.clr(k).await?;
}
}
Ok(())
}

/// Count the total number of keys within a range in the datastore.
///
/// This function fetches the total key count from the underlying datastore in grouped batches.
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(rng = rng.sprint()))]
async fn count<K>(&mut self, rng: Range<K>) -> Result<usize, Error>
where
K: Into<Key> + Sprintable + Debug,
{
// Check to see if transaction is closed
if self.closed() {
return Err(Error::TxFinished);
}
// Continue with function logic
let mut len = 0;
let beg: Key = rng.start.into();
let end: Key = rng.end.into();
let mut next = Some(beg..end);
while let Some(rng) = next {
let res = self.batch_keys(rng, 10_000, None).await?;
next = res.next;
len += res.result.len();
}
Ok(len)
}
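A hedged usage sketch for the new method: only the count() call itself comes from the trait above, while the helper name and the byte-key bounds are illustrative assumptions.

```rust
// Hypothetical caller, assuming the Transaction trait, Key, and Error types
// from this file are in scope. Internally count() pages through the range in
// 10_000-key batches, so large ranges are never materialised all at once.
async fn count_prefix<T: Transaction>(tx: &mut T) -> Result<usize, Error> {
	// Illustrative byte-key bounds covering a single prefix
	let beg: Key = b"/example/\x00".to_vec();
	let end: Key = b"/example/\xff".to_vec();
	tx.count(beg..end).await
}
```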

/// Retrieve all the versions for a specific range of keys from the datastore.
///
/// This function fetches all the versions for the full range of key-value pairs, in a single request to the underlying datastore.
@@ -333,15 +358,14 @@ pub trait Transaction {

/// Retrieve a batched scan over a specific range of keys in the datastore.
///
/// This function fetches keys or key-value pairs, in batches, with multiple requests to the underlying datastore.
/// This function fetches keys, in batches, with multiple requests to the underlying datastore.
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(rng = rng.sprint()))]
async fn batch<K>(
async fn batch_keys<K>(
&mut self,
rng: Range<K>,
batch: u32,
values: bool,
version: Option<u64>,
) -> Result<Batch, Error>
) -> Result<Batch<Key>, Error>
where
K: Into<Key> + Sprintable + Debug,
{
@@ -353,21 +377,13 @@ pub trait Transaction {
let beg: Key = rng.start.into();
let end: Key = rng.end.into();
// Scan for the next batch
let res = if values {
self.scan(beg..end.clone(), batch, version).await?
} else {
self.keys(beg..end.clone(), batch, version)
.await?
.into_iter()
.map(|k| (k, vec![]))
.collect::<Vec<(Key, Val)>>()
};
let res = self.keys(beg..end.clone(), batch, version).await?;
// Check if range is consumed
if res.len() < batch as usize && batch > 0 {
Ok(Batch::new(None, res))
Ok(Batch::<Key>::new(None, res))
} else {
match res.last() {
Some((k, _)) => Ok(Batch::new(
Some(k) => Ok(Batch::<Key>::new(
Some(Range {
start: k.clone().add(0x00),
end,
@@ -377,13 +393,21 @@ pub trait Transaction {
// We have checked the length above, so
// there should be a last item in the
// vector, so we shouldn't arrive here
None => Ok(Batch::new(None, res)),
None => Ok(Batch::<Key>::new(None, res)),
}
}
}
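The continuation range above is built by appending a 0x00 byte to the last key returned, which yields the smallest key that sorts strictly after it, so the next batch resumes immediately after the previous one with no overlap. A small sketch of that property follows; it assumes the add call on Key behaves like pushing a byte, as its use here suggests.

```rust
// Assumed equivalent of k.clone().add(0x00): push a 0x00 byte onto the key.
fn next_range_start(last: Vec<u8>) -> Vec<u8> {
	let mut start = last;
	start.push(0x00);
	start
}

#[test]
fn continuation_key_sorts_directly_after_the_last_key() {
	let last = vec![0x61, 0x62];
	// The continuation key is greater than the last key scanned...
	assert!(next_range_start(last.clone()) > last);
	// ...and no byte-string key can sort strictly between the two.
	assert_eq!(next_range_start(last), vec![0x61, 0x62, 0x00]);
}
```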

/// Retrieve a batched scan over a specific range of keys in the datastore.
///
/// This function fetches key-value pairs, in batches, with multiple requests to the underlying datastore.
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(rng = rng.sprint()))]
async fn batch_versions<K>(&mut self, rng: Range<K>, batch: u32) -> Result<Batch, Error>
async fn batch_keys_vals<K>(
&mut self,
rng: Range<K>,
batch: u32,
version: Option<u64>,
) -> Result<Batch<(Key, Val)>, Error>
where
K: Into<Key> + Sprintable + Debug,
{
@@ -394,16 +418,55 @@ pub trait Transaction {
// Continue with function logic
let beg: Key = rng.start.into();
let end: Key = rng.end.into();
// Scan for the next batch
let res = self.scan(beg..end.clone(), batch, version).await?;
// Check if range is consumed
if res.len() < batch as usize && batch > 0 {
Ok(Batch::<(Key, Val)>::new(None, res))
} else {
match res.last() {
Some((k, _)) => Ok(Batch::<(Key, Val)>::new(
Some(Range {
start: k.clone().add(0x00),
end,
}),
res,
)),
// We have checked the length above, so
// there should be a last item in the
// vector, so we shouldn't arrive here
None => Ok(Batch::<(Key, Val)>::new(None, res)),
}
}
}

/// Retrieve a batched scan over a specific range of keys in the datastore.
///
/// This function fetches key-value-version pairs, in batches, with multiple requests to the underlying datastore.
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(rng = rng.sprint()))]
async fn batch_keys_vals_versions<K>(
&mut self,
rng: Range<K>,
batch: u32,
) -> Result<Batch<(Key, Val, Version, bool)>, Error>
where
K: Into<Key> + Sprintable + Debug,
{
// Check to see if transaction is closed
if self.closed() {
return Err(Error::TxFinished);
}
// Continue with function logic
let beg: Key = rng.start.into();
let end: Key = rng.end.into();
// Scan for the next batch
let res = self.scan_all_versions(beg..end.clone(), batch).await?;

// Check if range is consumed
if res.len() < batch as usize && batch > 0 {
Ok(Batch::new_versioned(None, res))
Ok(Batch::<(Key, Val, Version, bool)>::new(None, res))
} else {
match res.last() {
Some((k, _, _, _)) => Ok(Batch::new_versioned(
Some((k, _, _, _)) => Ok(Batch::<(Key, Val, Version, bool)>::new(
Some(Range {
start: k.clone().add(0x00),
end,
@@ -413,7 +476,7 @@ pub trait Transaction {
// We have checked the length above, so
// there should be a last item in the
// vector, so we shouldn't arrive here
None => Ok(Batch::new_versioned(None, res)),
None => Ok(Batch::<(Key, Val, Version, bool)>::new(None, res)),
}
}
}
26 changes: 5 additions & 21 deletions crates/core/src/kvs/batch.rs
@@ -1,35 +1,19 @@
use super::Key;
use super::Val;
use super::Version;
use std::ops::Range;

/// A batch scan result returned from the [`Transaction::batch`] or [`Transactor::batch`] functions.
#[derive(Debug)]
pub struct Batch {
pub struct Batch<T> {
pub next: Option<Range<Key>>,
pub values: Vec<(Key, Val)>,
pub versioned_values: Vec<(Key, Val, Version, bool)>,
pub result: Vec<T>,
}

impl Batch {
impl<T> Batch<T> {
/// Create a new batch scan result.
pub fn new(next: Option<Range<Key>>, values: Vec<(Key, Val)>) -> Self {
pub fn new(next: Option<Range<Key>>, result: Vec<T>) -> Self {
Self {
next,
values,
versioned_values: vec![],
}
}

/// Create a new batch scan result with versioned values.
pub fn new_versioned(
next: Option<Range<Key>>,
versioned_values: Vec<(Key, Val, Version, bool)>,
) -> Self {
Self {
next,
values: vec![],
versioned_values,
result,
}
}
}
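Making Batch generic over its payload lets each scan method return a precisely typed result instead of carrying two parallel vectors. A brief sketch of the three instantiations used in kvs/api.rs; it assumes Key and Val are byte vectors and Version is a numeric alias, as their use in this diff suggests.

```rust
// Illustrative construction of the payload types returned by batch_keys,
// batch_keys_vals, and batch_keys_vals_versions respectively.
fn example_batches() -> (Batch<Key>, Batch<(Key, Val)>, Batch<(Key, Val, Version, bool)>) {
	let keys_only = Batch::new(None, vec![b"a".to_vec()]);
	let keys_vals = Batch::new(None, vec![(b"a".to_vec(), b"1".to_vec())]);
	let versioned = Batch::new(None, vec![(b"a".to_vec(), b"1".to_vec(), 1, false)]);
	(keys_only, keys_vals, versioned)
}
```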
14 changes: 6 additions & 8 deletions crates/core/src/kvs/export.rs
@@ -334,23 +334,21 @@ impl Transaction {

while let Some(rng) = next {
if cfg.versions {
let batch = self.batch_versions(rng, *EXPORT_BATCH_SIZE).await?;
let batch = self.batch_keys_vals_versions(rng, *EXPORT_BATCH_SIZE).await?;
next = batch.next;
let values = batch.versioned_values;
// If there are no versioned values, return early.
if values.is_empty() {
if batch.result.is_empty() {
break;
}
self.export_versioned_data(values, chn).await?;
self.export_versioned_data(batch.result, chn).await?;
} else {
let batch = self.batch(rng, *EXPORT_BATCH_SIZE, true, None).await?;
let batch = self.batch_keys_vals(rng, *EXPORT_BATCH_SIZE, None).await?;
next = batch.next;
// If there are no values, return early.
let values = batch.values;
if values.is_empty() {
if batch.result.is_empty() {
break;
}
self.export_regular_data(values, chn).await?;
self.export_regular_data(batch.result, chn).await?;
}
// Fetch more records
continue;
6 changes: 3 additions & 3 deletions crates/core/src/kvs/index.rs
@@ -303,21 +303,21 @@ impl Building {
while let Some(rng) = next {
// Get the next batch of records
let tx = self.new_read_tx().await?;
let batch = catch!(tx, tx.batch(rng, *INDEXING_BATCH_SIZE, true, None).await);
let batch = catch!(tx, tx.batch_keys_vals(rng, *INDEXING_BATCH_SIZE, None).await);
// We can release the read transaction
drop(tx);
// Set the next scan range
next = batch.next;
// Check there are records
if batch.values.is_empty() {
if batch.result.is_empty() {
// If not, we are with the initial indexing
break;
}
// Create a new context with a write transaction
let ctx = self.new_write_tx_ctx().await?;
let tx = ctx.tx();
// Index the batch
catch!(tx, self.index_initial_batch(&ctx, &tx, batch.values, &mut count).await);
catch!(tx, self.index_initial_batch(&ctx, &tx, batch.result, &mut count).await);
tx.commit().await?;
}
// Second iteration, we index/remove any records that has been added or removed since the initial indexing
13 changes: 9 additions & 4 deletions crates/core/src/kvs/node.rs
@@ -161,9 +161,10 @@ impl Datastore {
// Pause and yield execution
yield_now!();
// Fetch the next batch of keys and values
let res = catch!(txn, txn.batch(rng, *NORMAL_FETCH_SIZE, true, None).await);
let max = *NORMAL_FETCH_SIZE;
let res = catch!(txn, txn.batch_keys_vals(rng, max, None).await);
next = res.next;
for (k, v) in res.values.iter() {
for (k, v) in res.result.iter() {
// Decode the data for this live query
let val: Live = v.into();
// Get the key for this node live query
@@ -249,9 +250,13 @@ impl Datastore {
let end = crate::key::table::lq::suffix(&ns.name, &db.name, &tb.name);
let mut next = Some(beg..end);
while let Some(rng) = next {
let res = catch!(txn, txn.batch(rng, *NORMAL_FETCH_SIZE, true, None).await);
// Pause and yield execution
yield_now!();
// Fetch the next batch of keys and values
let max = *NORMAL_FETCH_SIZE;
let res = catch!(txn, txn.batch_keys_vals(rng, max, None).await);
next = res.next;
for (k, v) in res.values.iter() {
for (k, v) in res.result.iter() {
// Decode the LIVE query statement
let stm: LiveStatement = v.into();
// Get the node id and the live query id
9 changes: 2 additions & 7 deletions crates/core/src/kvs/scanner.rs
@@ -1,7 +1,6 @@
use super::tx::Transaction;
use super::Key;
use super::Val;
use crate::cnf::MAX_STREAM_BATCH_SIZE;
use crate::err::Error;
use futures::stream::Stream;
use futures::Future;
@@ -67,12 +66,10 @@ impl Stream for Scanner<'_, (Key, Val)> {
}
// Check if there is no pending future task
if self.future.is_none() {
// Set the max number of results to fetch
let num = std::cmp::min(*MAX_STREAM_BATCH_SIZE, self.batch);
// Clone the range to use when scanning
let range = self.range.clone();
// Prepare a future to scan for results
self.future = Some(Box::pin(self.store.scan(range, num, self.version)));
self.future = Some(Box::pin(self.store.scan(range, self.batch, self.version)));
}
// Try to resolve the future
match self.future.as_mut().unwrap().poll_unpin(cx) {
@@ -134,12 +131,10 @@ impl Stream for Scanner<'_, Key> {
}
// Check if there is no pending future task
if self.future.is_none() {
// Set the max number of results to fetch
let num = std::cmp::min(*MAX_STREAM_BATCH_SIZE, self.batch);
// Clone the range to use when scanning
let range = self.range.clone();
// Prepare a future to scan for results
self.future = Some(Box::pin(self.store.keys(range, num, self.version)));
self.future = Some(Box::pin(self.store.keys(range, self.batch, self.version)));
}
// Try to resolve the future
match self.future.as_mut().unwrap().poll_unpin(cx) {