Skip to content

Commit

Permalink
kvdb-rocksdb: support the rocksdb/multi-threaded-cf feature
Browse files Browse the repository at this point in the history
  • Loading branch information
cchudant committed Mar 29, 2024
1 parent d5e9c1d commit 4b18051
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 10 deletions.
4 changes: 2 additions & 2 deletions kvdb-rocksdb/src/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ impl<'a> IterationHandler for &'a DBAndColumns {

fn iter(self, col: u32, read_opts: ReadOptions) -> Self::Iterator {
match self.cf(col as usize) {
Ok(cf) => EitherIter::A(KvdbAdapter(self.db.iterator_cf_opt(cf, read_opts, IteratorMode::Start))),
Ok(cf) => EitherIter::A(KvdbAdapter(self.db.iterator_cf_opt(&cf, read_opts, IteratorMode::Start))),
Err(e) => EitherIter::B(std::iter::once(Err(e))),
}
}

fn iter_with_prefix(self, col: u32, prefix: &[u8], read_opts: ReadOptions) -> Self::Iterator {
match self.cf(col as usize) {
Ok(cf) => EitherIter::A(KvdbAdapter(self.db.iterator_cf_opt(
cf,
&cf,
read_opts,
IteratorMode::From(prefix, Direction::Forward),
))),
Expand Down
17 changes: 9 additions & 8 deletions kvdb-rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::{
};

use rocksdb::{
BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor, Options, ReadOptions, WriteBatch, WriteOptions, DB,
BlockBasedOptions, ColumnFamilyDescriptor, ColumnFamilyRef, Options, ReadOptions, WriteBatch, WriteOptions, DB
};

use kvdb::{DBKeyValue, DBOp, DBTransaction, DBValue, KeyValueDB};
Expand Down Expand Up @@ -251,7 +251,7 @@ struct DBAndColumns {
}

impl DBAndColumns {
fn cf(&self, i: usize) -> io::Result<&ColumnFamily> {
fn cf(&self, i: usize) -> io::Result<ColumnFamilyRef<'_>> {
let name = self.column_names.get(i).ok_or_else(|| invalid_column(i as u32))?;
self.db
.cf_handle(&name)
Expand Down Expand Up @@ -377,6 +377,7 @@ impl Database {
Err(_) => {
// retry and create CFs
match DB::open_cf(&opts, path.as_ref(), &[] as &[&str]) {
#[allow(unused_mut)] // warns when `multi-threaded-cf` feature is enabled on rocksdb, as `create_cf` takes an &self.
Ok(mut db) => {
for (i, name) in column_names.iter().enumerate() {
let _ = db
Expand Down Expand Up @@ -436,23 +437,23 @@ impl Database {
match op {
DBOp::Insert { col: _, key, value } => {
stats_total_bytes += key.len() + value.len();
batch.put_cf(cf, &key, &value);
batch.put_cf(&cf, &key, &value);
},
DBOp::Delete { col: _, key } => {
// We count deletes as writes.
stats_total_bytes += key.len();
batch.delete_cf(cf, &key);
batch.delete_cf(&cf, &key);
},
DBOp::DeletePrefix { col, prefix } => {
let end_prefix = kvdb::end_prefix(&prefix[..]);
let no_end = end_prefix.is_none();
let end_range = end_prefix.unwrap_or_else(|| vec![u8::max_value(); 16]);
batch.delete_range_cf(cf, &prefix[..], &end_range[..]);
batch.delete_range_cf(&cf, &prefix[..], &end_range[..]);
if no_end {
let prefix = if prefix.len() > end_range.len() { &prefix[..] } else { &end_range[..] };
for result in self.iter_with_prefix(col, prefix) {
let (key, _) = result?;
batch.delete_cf(cf, &key[..]);
batch.delete_cf(&cf, &key[..]);
}
}
},
Expand All @@ -470,7 +471,7 @@ impl Database {
self.stats.tally_reads(1);
let value = cfs
.db
.get_pinned_cf_opt(cf, key, &self.read_opts)
.get_pinned_cf_opt(&cf, key, &self.read_opts)
.map(|r| r.map(|v| v.to_vec()))
.map_err(other_io_err);

Expand Down Expand Up @@ -521,7 +522,7 @@ impl Database {
const ESTIMATE_NUM_KEYS: &str = "rocksdb.estimate-num-keys";
let cfs = &self.inner;
let cf = cfs.cf(col as usize)?;
match cfs.db.property_int_value_cf(cf, ESTIMATE_NUM_KEYS) {
match cfs.db.property_int_value_cf(&cf, ESTIMATE_NUM_KEYS) {
Ok(estimate) => Ok(estimate.unwrap_or_default()),
Err(err_string) => Err(other_io_err(err_string)),
}
Expand Down

0 comments on commit 4b18051

Please sign in to comment.