Skip to content

Commit

Permalink
refactor: add mget_id_value_compat()
Browse files Browse the repository at this point in the history
`mget_id_value_compat([names])` fetch a list of `name->id->value` from
meta-service with exact 2 mget() operations.

This method can be used to optimize batch get operations. For example to
get a list of `(DatabaseNameIdent, DatabaseId, DatabaseMeta)`.

This method requires the id type to be a `Id<xxxId>` type.

- Part of databendlabs#16421
  • Loading branch information
drmingdrmer committed Sep 9, 2024
1 parent 9ac1368 commit f2433f2
Showing 1 changed file with 196 additions and 0 deletions.
196 changes: 196 additions & 0 deletions src/meta/api/src/name_id_value_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::future::Future;

use databend_common_meta_app::data_id::DataId;
use databend_common_meta_app::id_generator::IdGenerator;
use databend_common_meta_app::primitive::Id;
use databend_common_meta_app::tenant_key::ident::TIdent;
use databend_common_meta_app::tenant_key::resource::TenantResource;
use databend_common_meta_app::KeyWithTenant;
Expand Down Expand Up @@ -305,3 +306,198 @@ where
IdRsc::ValueType: FromToProto + Send + Sync + 'static,
{
}

/// Similar to `NameIdValueApi`, but is compatibility with non-DataId ids.
#[tonic::async_trait]
pub trait NameIdValueApiCompat<K, IdTyp>: KVApi<Error = MetaError>
where
K: kvapi::Key<ValueType = Id<IdTyp>>,
K: KeyWithTenant,
K: Send + Sync + 'static,
Id<IdTyp>: FromToProto + Send + Sync + 'static,
IdTyp: kvapi::Key + Clone + Send + Sync + 'static,
IdTyp::ValueType: FromToProto + Send + Sync + 'static,
{
/// mget by names, returns a list of `(name, id, value)`
///
/// Returns an iterator of `(name, id, SeqV<value>)` tuples.
/// This function mget all the ids by names,
/// then get the `id->value` mapping and finally zip these two together.
///
/// If `name->id` or `id->value` mapping does not exist, it will be skipped.
/// Thus, the returned iterator may have less items than the input names.
// Using `async fn` does not allow using `impl Iterator` in the return type.
// Thus we use `impl Future` instead.
fn mget_id_value_compat(
&self,
names: impl IntoIterator<Item = K> + Send,
) -> impl Future<
Output = Result<impl Iterator<Item = (K, IdTyp, SeqV<IdTyp::ValueType>)>, MetaError>,
> + Send {
async move {
let strm = self.get_pb_stream(names).await?;
let name_ids = strm.try_collect::<Vec<_>>().await?;

let name_ids = name_ids
.into_iter()
.filter_map(|(k, seq_v)| seq_v.map(|x| (k, x.data.into_inner())))
.collect::<Vec<_>>();

let id_idents = name_ids
.iter()
.map(|(_k, id)| id.clone())
.collect::<Vec<_>>();

let strm = self.get_pb_values(id_idents).await?;
let seq_metas = strm.try_collect::<Vec<_>>().await?;

let name_id_values =
name_ids
.into_iter()
.zip(seq_metas)
.filter_map(|((k, id), opt_seq_meta)| {
opt_seq_meta.map(|seq_meta| (k, id, seq_meta))
});

Ok(name_id_values)
}
}
}

impl<K, IdTyp, T> NameIdValueApiCompat<K, IdTyp> for T
where
T: KVApi<Error = MetaError> + ?Sized,
K: kvapi::Key<ValueType = Id<IdTyp>>,
K: KeyWithTenant,
K: Send + Sync + 'static,
Id<IdTyp>: FromToProto + Send + Sync + 'static,
IdTyp: kvapi::Key + Clone + Send + Sync + 'static,
IdTyp::ValueType: FromToProto + Send + Sync + 'static,
{
}

#[cfg(test)]
mod tests {
use std::collections::BTreeMap;

use async_trait::async_trait;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::DatabaseId;
use databend_common_meta_app::schema::DatabaseMeta;
use databend_common_meta_app::tenant::Tenant;
use databend_common_meta_kvapi::kvapi::KVApi;
use databend_common_meta_kvapi::kvapi::KVStream;
use databend_common_meta_kvapi::kvapi::UpsertKVReply;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::protobuf::StreamItem;
use databend_common_meta_types::seq_value::SeqV;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::TxnReply;
use databend_common_meta_types::TxnRequest;
use databend_common_proto_conv::FromToProto;
use futures::StreamExt;
use prost::Message;

use crate::name_id_value_api::NameIdValueApiCompat;

struct Foo {
kvs: BTreeMap<String, SeqV>,
}

#[async_trait]
impl KVApi for Foo {
type Error = MetaError;

async fn upsert_kv(&self, _req: UpsertKVReq) -> Result<UpsertKVReply, Self::Error> {
unimplemented!()
}

async fn get_kv_stream(
&self,
keys: &[String],
) -> Result<KVStream<Self::Error>, Self::Error> {
let mut res = Vec::with_capacity(keys.len());
for (_i, key) in keys.iter().enumerate() {
let k = key.clone();
let v = self.kvs.get(key).cloned();

let item = StreamItem::new(k, v.map(|v| v.into()));
res.push(Ok(item));
}

let strm = futures::stream::iter(res);
Ok(strm.boxed())
}

async fn list_kv(&self, _prefix: &str) -> Result<KVStream<Self::Error>, Self::Error> {
unimplemented!()
}

async fn transaction(&self, _txn: TxnRequest) -> Result<TxnReply, Self::Error> {
unimplemented!()
}
}

/// If the backend stream returns early, the returned stream should be filled with error item at the end.
#[tokio::test]
async fn test_mget_id_values() -> anyhow::Result<()> {
let db_meta = |i: u64| DatabaseMeta {
engine: i.to_string(),
engine_options: Default::default(),
options: Default::default(),
created_on: Default::default(),
updated_on: Default::default(),
comment: "".to_string(),
drop_on: None,
};

let v = db_meta(1).to_pb()?.encode_to_vec();

let db_id = |i| DatabaseId::new(i).to_string().as_bytes().to_vec();

let foo = Foo {
kvs: vec![
(s("__fd_database/tenant_foo/a"), SeqV::new(1, db_id(1))),
(s("__fd_database/tenant_foo/b"), SeqV::new(2, db_id(2))),
(s("__fd_database/tenant_foo/c"), SeqV::new(3, db_id(3))),
(s("__fd_database_by_id/1"), SeqV::new(4, v.clone())),
(s("__fd_database_by_id/2"), SeqV::new(5, v.clone())),
]
.into_iter()
.collect(),
};

// MGet key value pairs
{
let tenant = Tenant::new_literal("tenant_foo");
let it = foo
.mget_id_value_compat([
DatabaseNameIdent::new(&tenant, "a"),
DatabaseNameIdent::new(&tenant, "b"),
DatabaseNameIdent::new(&tenant, "c"),
DatabaseNameIdent::new(&tenant, "d"), // No such key
])
.await?;

let got = it.collect::<Vec<_>>();
assert_eq!(got, vec![
(
DatabaseNameIdent::new(&tenant, "a"),
DatabaseId::new(1),
SeqV::new(4, db_meta(1))
),
(
DatabaseNameIdent::new(&tenant, "b"),
DatabaseId::new(2),
SeqV::new(5, db_meta(1))
),
]);
}

Ok(())
}

fn s(x: impl ToString) -> String {
x.to_string()
}
}

0 comments on commit f2433f2

Please sign in to comment.