Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support iterators and views in grpc in firewood #533

Draft
wants to merge 24 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
bee6c15
rename proto
Feb 15, 2024
a21c255
nit newline
Feb 15, 2024
e29e5fa
Merge branch 'main' into danlaine/view-server-proto
Feb 15, 2024
3ebbb71
fix NewViewResponse
Feb 16, 2024
8c93966
Merge branch 'main' into danlaine/view-server-proto
rkuris Feb 16, 2024
6d2e47c
Framework for MerkleService
rkuris Feb 16, 2024
7d3046b
Improve build.rs
rkuris Feb 16, 2024
1f40f0d
Merge branch 'rkuris/improve-build-rs' into rkuris/implement-merkle-grpc
rkuris Feb 16, 2024
fd86e5c
Merge remote-tracking branch 'origin/main' into rkuris/implement-merk…
rkuris Feb 16, 2024
f517585
Merge remote-tracking branch 'origin/main' into rkuris/implement-merk…
rkuris Feb 16, 2024
f6320c0
More implementation of interface
rkuris Feb 16, 2024
1131c07
Implement view_release
rkuris Feb 16, 2024
df9b19b
Merge branch 'main' into rkuris/implement-merkle-grpc
rkuris Feb 17, 2024
ddadd4d
Merge branch 'main' into rkuris/implement-merkle-grpc
rkuris Feb 19, 2024
6d44238
Merge branch 'main' into rkuris/implement-merkle-grpc
rkuris Feb 19, 2024
e779124
Merge remote-tracking branch 'origin/main' into rkuris/implement-merk…
rkuris Feb 20, 2024
6399d5f
Merged upstream changes
rkuris Feb 20, 2024
12d74b1
Use CommitError from the proto
rkuris Feb 20, 2024
6706e63
Merge branch 'main' into rkuris/implement-merkle-grpc
rkuris Feb 21, 2024
abd6bac
Merge branch 'main' into rkuris/implement-merkle-grpc
rkuris Feb 23, 2024
7d5ea3f
MerkleKeyValueStream should implement Debug
rkuris Feb 23, 2024
fe19314
WIP
rkuris Feb 24, 2024
4255e5b
Format
rkuris Feb 24, 2024
6236f2a
Merge branch 'rkuris/mkv-stream-debug-impl' into rkuris/implement-mer…
rkuris Feb 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion firewood/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use std::{
};
use tokio::task::block_in_place;

mod proposal;
pub mod proposal;

use self::proposal::ProposalBase;

Expand Down
2 changes: 1 addition & 1 deletion firewood/src/db/proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub enum ProposalBase {
}

#[async_trait]
impl crate::v2::api::Proposal for Proposal {
impl api::Proposal for Proposal {
type Proposal = Proposal;

#[allow(clippy::unwrap_used)]
Expand Down
2 changes: 1 addition & 1 deletion grpc-testtool/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::path::PathBuf;

fn main() -> Result<(), Box<dyn std::error::Error>> {
// we want to import these proto files
let import_protos = ["sync", "rpcdb", "process-server"];
let import_protos = ["sync", "rpcdb", "process-server", "merkle"];

let protos: Box<[PathBuf]> = import_protos
.into_iter()
Expand Down
101 changes: 101 additions & 0 deletions grpc-testtool/proto/merkle/merkle.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
syntax = "proto3";

package merkle;

import "google/protobuf/empty.proto";

service Merkle {
rpc NewProposal(NewProposalRequest) returns (NewProposalResponse);
rpc ProposalCommit(ProposalCommitRequest) returns (ProposalCommitResponse);

rpc NewView(NewViewRequest) returns (NewViewResponse);
// The methods below may be called with a view ID that corresponds to either a (committable) proposal
// or (non-committable) historical view.
rpc ViewHas(ViewHasRequest) returns (ViewHasResponse);
rpc ViewGet(ViewGetRequest) returns (ViewGetResponse);
rpc ViewNewIteratorWithStartAndPrefix(ViewNewIteratorWithStartAndPrefixRequest) returns (ViewNewIteratorWithStartAndPrefixResponse);
rpc ViewRelease(ViewReleaseRequest) returns (google.protobuf.Empty);
}

message NewProposalRequest {
// If not given, the parent view is the current database revision.
optional uint32 parent_view_id = 1;
repeated PutRequest puts = 2;
repeated bytes deletes = 3;
}

message NewProposalResponse {
uint32 proposal_id = 1;
}

message ProposalCommitRequest {
uint32 proposal_id = 1;
}

message ProposalCommitResponse {
CommitError err = 1;
}

message NewViewRequest {
bytes root_id = 1;
}

message NewViewResponse {
uint32 view_id = 1;
}

message ViewHasRequest {
uint32 view_id = 1;
bytes key = 2;
}

message ViewHasResponse {
bool has = 1;
}

message ViewGetRequest {
uint32 view_id = 1;
bytes key = 2;
}

message ViewGetResponse {
bytes value = 1;
GetError err = 2;
}

message ViewNewIteratorWithStartAndPrefixRequest {
rkuris marked this conversation as resolved.
Show resolved Hide resolved
uint32 view_id = 1;
bytes start = 2;
bytes prefix = 3;
}

message ViewNewIteratorWithStartAndPrefixResponse {
uint32 iterator_id = 1;
}

message ViewReleaseRequest {
uint32 view_id = 1;
}

// TODO import this from the rpcdb package.
message PutRequest {
bytes key = 1;
bytes value = 2;
}

enum GetError {
// ERROR_UNSPECIFIED_GET is used to indicate that no error occurred.
ERROR_UNSPECIFIED_GET = 0;
ERROR_CLOSED_GET = 1;
ERROR_NOT_FOUND = 2;
}

enum CommitError {
// ERROR_UNSPECIFIED_COMMIT is used to indicate that no error occurred.
ERROR_UNSPECIFIED_COMMIT = 0;
ERROR_CLOSED_COMMIT = 1;
ERROR_INVALID = 2;
ERROR_COMMITTED = 3;
ERROR_PARENT_NOT_DATABASE = 4;
ERROR_NON_PROPOSAL_ID = 5;
}
5 changes: 5 additions & 0 deletions grpc-testtool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ pub mod process_server {
tonic::include_proto!("process");
}

pub mod merkle {
#![allow(clippy::unwrap_used, clippy::missing_const_for_fn)]
tonic::include_proto!("merkle");
}

pub mod service;

pub use service::Database as DatabaseService;
37 changes: 37 additions & 0 deletions grpc-testtool/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use firewood::db::{Db, DbConfig};
use firewood::storage::WalConfig;
use firewood::v2::{api::Db as _, api::Error};

use std::fmt::Debug;
use std::path::Path;
use std::sync::atomic::AtomicU32;
use std::{
collections::HashMap,
ops::Deref,
Expand All @@ -19,6 +21,7 @@ use tonic::Status;

pub mod database;
pub mod db;
pub mod merkle;
pub mod process;

trait IntoStatusResultExt<T> {
Expand All @@ -44,6 +47,39 @@ impl<T> IntoStatusResultExt<T> for Result<T, Error> {
pub struct Database {
db: Db,
iterators: Arc<Mutex<Iterators>>,
views: Arc<Mutex<Views>>,
}
#[derive(Default, Debug)]
struct Views {
map: HashMap<u32, View>,
next_id: AtomicU32,
}

impl Views {
fn insert(&mut self, view: View) -> u32 {
let next_id = self.next_id.fetch_add(1, Ordering::Relaxed);
self.map.insert(next_id, view);
next_id
}

fn delete(&mut self, view_id: u32) -> Option<View> {
self.map.remove(&view_id)
}
}

enum View {
Historical(Arc<<firewood::db::Db as firewood::v2::api::Db>::Historical>),
Proposal(Arc<<firewood::db::Db as firewood::v2::api::Db>::Proposal>),
}

// TODO: We manually implement Debug since Proposal does not, but probably should
impl Debug for View {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Historical(arg0) => f.debug_tuple("Historical").field(arg0).finish(),
Self::Proposal(_arg0) => f.debug_tuple("Proposal").finish(),
}
}
}

impl Database {
Expand All @@ -62,6 +98,7 @@ impl Database {
Ok(Self {
db,
iterators: Default::default(),
views: Default::default(),
})
}
}
Expand Down
132 changes: 132 additions & 0 deletions grpc-testtool/src/service/merkle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
use std::sync::Arc;

use crate::merkle::{
merkle_server::Merkle as MerkleServiceTrait, NewProposalRequest, NewProposalResponse,
NewViewRequest, NewViewResponse, ProposalCommitRequest, ProposalCommitResponse, ViewGetRequest,
ViewGetResponse, ViewHasRequest, ViewHasResponse, ViewNewIteratorWithStartAndPrefixRequest,
ViewNewIteratorWithStartAndPrefixResponse, ViewReleaseRequest,
};
use firewood::{
db::{BatchOp, Proposal},
v2::api::{self, Db},
};
use tonic::{async_trait, Request, Response, Status};

use super::{IntoStatusResultExt, View};

//#[prost(uint32, optional, tag = "1")]
//pub parent_view_id: ::core::option::Option<u32>,
//#[prost(message, repeated, tag = "2")]
//pub puts: ::prost::alloc::vec::Vec<PutRequest>,
//#[prost(bytes = "vec", repeated, tag = "3")]
//pub deletes: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,

#[async_trait]
impl MerkleServiceTrait for super::Database {
async fn new_proposal(
&self,
req: Request<NewProposalRequest>,
) -> Result<Response<NewProposalResponse>, Status> {
let req = req.into_inner();

// convert the provided data into a set of BatchOp Put and Delete operations
let mut data: Vec<_> = req
.puts
.into_iter()
.map(|put| BatchOp::Put {
key: put.key,
value: put.value,
})
.collect();
data.extend(
req.deletes
.into_iter()
.map(|del| BatchOp::Delete { key: del }),
);

let mut views = self.views.lock().await;

// the proposal depends on the parent_view_id. If it's None, then we propose on the db itself
// Otherwise, we're provided a base proposal to base it off of, so go fetch that
// proposal from the views
let proposal = match req.parent_view_id {
None => self.db.propose(data).await,
Some(parent_id) => {
let view = views.map.get(&parent_id);
match view {
None => return Err(Status::invalid_argument("invalid view id")),
Some(View::Proposal(parent)) => {
firewood::v2::api::Proposal::propose(parent.clone(), data).await
}
Some(_) => return Err(Status::invalid_argument("non-proposal id")),
}
}
}
.into_status_result()?;

// compute the next view id
let view_id = views.insert(View::Proposal(Arc::new(proposal)));

let resp = Response::new(NewProposalResponse {
proposal_id: view_id,
});
Ok(resp)
}

async fn proposal_commit(
&self,
req: Request<ProposalCommitRequest>,
) -> Result<Response<ProposalCommitResponse>, Status> {
let mut views = self.views.lock().await;

match views.map.remove(&req.into_inner().proposal_id) {
None => return Err(Status::invalid_argument("invalid view id")),
Some(View::Proposal(proposal)) => proposal.commit(),
Some(_) => return Err(Status::invalid_argument("non-proposal id")),
}
.await
.into_status_result()?;
Ok(Response::new(ProposalCommitResponse { err: 0 }))
}

async fn new_view(
&self,
req: Request<NewViewRequest>,
) -> Result<Response<NewViewResponse>, Status> {
let hash = std::convert::TryInto::<[u8; 32]>::try_into(req.into_inner().root_id)
.map_err(|_| api::Error::InvalidProposal) // TODO: better error here?
.into_status_result()?;
let mut views = self.views.lock().await;
let view = self.db.revision(hash).await.into_status_result()?;
let view_id = views.insert(View::Historical(view));
Ok(Response::new(NewViewResponse { view_id }))
}

async fn view_has(
&self,
_req: Request<ViewHasRequest>,
) -> Result<Response<ViewHasResponse>, Status> {
todo!()
}

async fn view_get(
&self,
_req: Request<ViewGetRequest>,
) -> Result<Response<ViewGetResponse>, Status> {
todo!()
}

async fn view_new_iterator_with_start_and_prefix(
&self,
_req: Request<ViewNewIteratorWithStartAndPrefixRequest>,
) -> Result<Response<ViewNewIteratorWithStartAndPrefixResponse>, Status> {
todo!()
}

async fn view_release(&self, req: Request<ViewReleaseRequest>) -> Result<Response<()>, Status> {
let mut views = self.views.lock().await;
// we don't care if this works :/
views.delete(req.into_inner().view_id);
Ok(Response::new(()))
}
}
Loading