Skip to content

Commit

Permalink
Feature: feature flag general-snapshot-data
Browse files Browse the repository at this point in the history
Add feature flag `general-snapshot-data`: when enabled, `SnapshotData`
does not have `AsyncSeek + AsyncRead + AsyncWrite` bound.
This enables application to define their own snapshot format.

If this feature flag is not eabled, no changes are required for application to upgrade Openraft.

On the sending end(leader that sends snapshot to follower):

- Without `general-snapshot-data`: `RaftNetwork::snapshot()`
  provides a default implementation that invokes the chunk based API
  `RaftNetwork::install_snapshot()` for transmit.

- With `general-snapshot-data` enabled: `RaftNetwork::snapshot()` must be
  implemented to provide application customized snapshot transmission.
  Application does not also use `RaftNetwork::install_snapshot()` for

On the receiving end(follower):

- `Raft::install_snapshot()` is available only when
  `general-snapshot-data` is disabled.

Add an example `examples/raft-kv-memstore-general-snapshot-data` with
`general-snapshot-data` enabled.
In this example snapshot is transmitted without fragmentation, i.e., via
`RaftNetwork::snapshot()`. The chunk based API
`RaftNetwork::install_snapshot()` is not used.
In a production scenario, a snapshot can be transmitted in arbitrary
manner.

- Fix: databendlabs#606
  • Loading branch information
drmingdrmer committed Feb 19, 2024
1 parent 687fcf2 commit 8ca933f
Show file tree
Hide file tree
Showing 26 changed files with 1,041 additions and 65 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ jobs:
- "nightly"
example:
- "raft-kv-memstore"
- "raft-kv-memstore-general-snapshot-data"
- "raft-kv-memstore-singlethreaded"
- "raft-kv-rocksdb"

Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,6 @@ exclude = [
"stores/rocksstore-v2",
"examples/raft-kv-memstore",
"examples/raft-kv-memstore-singlethreaded",
"examples/raft-kv-memstore-general-snapshot-data",
"examples/raft-kv-rocksdb",
]
5 changes: 5 additions & 0 deletions examples/raft-kv-memstore-general-snapshot-data/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
target
vendor
.idea

/*.log
35 changes: 35 additions & 0 deletions examples/raft-kv-memstore-general-snapshot-data/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
[package]
name = "raft-kv-memstore-general-snapshot-data"
version = "0.1.0"
readme = "README.md"

edition = "2021"
authors = [
"drdr xp <[email protected]>",
"Pedro Paulo de Amorim <[email protected]>",
]
categories = ["algorithms", "asynchronous", "data-structures"]
description = "An example distributed key-value store built upon `openraft`."
homepage = "https://github.com/datafuselabs/openraft"
keywords = ["raft", "consensus"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/datafuselabs/openraft"

[dependencies]
openraft = { path = "../../openraft", features = ["serde", "storage-v2", "general-snapshot-data"] }

clap = { version = "4.1.11", features = ["derive", "env"] }
reqwest = { version = "0.11.9", features = ["json"] }
serde = { version = "1.0.114", features = ["derive"] }
serde_json = "1.0.57"
tokio = { version = "1.0", default-features = false, features = ["sync"] }
tracing = "0.1.29"
tracing-subscriber = { version = "0.3.0", features = ["env-filter"] }

[dev-dependencies]
maplit = "1.0.2"

[features]

[package.metadata.docs.rs]
all-features = true
17 changes: 17 additions & 0 deletions examples/raft-kv-memstore-general-snapshot-data/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Example Openraft kv-store with `general-snapshot-data` enabled

With `general-snapshot-data` feature flag enabled, Openraft allows application to use any data type for snapshot data,
instead of a single-file like data format with `AsyncSeek + AsyncRead + AsyncWrite + Unpin` bounds.

This example is similar to the basic raft-kv-memstore example
but focuses on how to handle snapshot with `general-snapshot-data` enabled.
Other aspects are minimized.

To send a complete snapshot, Refer to implementation of `RaftNetwork::snapshot()` in this example.

To receive a complete snapshot, Refer to implementation of `api::snapshot()` in this example.


## Run it

Run it with `cargo test -- --nocaputre`.
104 changes: 104 additions & 0 deletions examples/raft-kv-memstore-general-snapshot-data/src/api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
//! This mod implements a network API for raft node.

use std::collections::BTreeMap;
use std::collections::BTreeSet;

use openraft::error::CheckIsLeaderError;
use openraft::error::Infallible;
use openraft::error::RaftError;
use openraft::BasicNode;
use openraft::RaftMetrics;

use crate::app::App;
use crate::decode;
use crate::encode;
use crate::typ;
use crate::NodeId;

pub async fn write(app: &mut App, req: String) -> String {
let res = app.raft.client_write(decode(&req)).await;
encode(res)
}

pub async fn read(app: &mut App, req: String) -> String {
let key: String = decode(&req);

let ret = app.raft.ensure_linearizable().await;

let res = match ret {
Ok(_) => {
let state_machine = app.state_machine.state_machine.lock().unwrap();
let value = state_machine.data.get(&key).cloned();

let res: Result<String, RaftError<NodeId, CheckIsLeaderError<NodeId, BasicNode>>> =
Ok(value.unwrap_or_default());
res
}
Err(e) => Err(e),
};
encode(res)
}

// Raft API

pub async fn vote(app: &mut App, req: String) -> String {
let res = app.raft.vote(decode(&req)).await;
encode(res)
}

pub async fn append(app: &mut App, req: String) -> String {
let res = app.raft.append_entries(decode(&req)).await;
encode(res)
}

/// Receive a snapshot and install it.
pub async fn snapshot(app: &mut App, req: String) -> String {
let (vote, snapshot_meta, snapshot_data): (typ::Vote, typ::SnapshotMeta, typ::SnapshotData) = decode(&req);
let snapshot = typ::Snapshot {
meta: snapshot_meta,
snapshot: Box::new(snapshot_data),
};
let res = app
.raft
.install_complete_snapshot(vote, snapshot)
.await
.map_err(|e| typ::RaftError::<typ::Infallible>::Fatal(e));
encode(res)
}

// Management API

/// Add a node as **Learner**.
///
/// A Learner receives log replication from the leader but does not vote.
/// This should be done before adding a node as a member into the cluster
/// (by calling `change-membership`)
pub async fn add_learner(app: &mut App, req: String) -> String {
let node_id: NodeId = decode(&req);
let node = BasicNode { addr: "".to_string() };
let res = app.raft.add_learner(node_id, node, true).await;
encode(res)
}

/// Changes specified learners to members, or remove members.
pub async fn change_membership(app: &mut App, req: String) -> String {
let node_ids: BTreeSet<NodeId> = decode(&req);
let res = app.raft.change_membership(node_ids, false).await;
encode(res)
}

/// Initialize a single-node cluster.
pub async fn init(app: &mut App) -> String {
let mut nodes = BTreeMap::new();
nodes.insert(app.id, BasicNode { addr: "".to_string() });
let res = app.raft.initialize(nodes).await;
encode(res)
}

/// Get the latest metrics of the cluster
pub async fn metrics(app: &mut App) -> String {
let metrics = app.raft.metrics().borrow().clone();

let res: Result<RaftMetrics<NodeId, BasicNode>, Infallible> = Ok(metrics);
encode(res)
}
73 changes: 73 additions & 0 deletions examples/raft-kv-memstore-general-snapshot-data/src/app.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use std::sync::Arc;

use tokio::sync::mpsc;
use tokio::sync::oneshot;

use crate::api;
use crate::router::Router;
use crate::typ;
use crate::NodeId;
use crate::StateMachineStore;

pub type Path = String;
pub type Payload = String;
pub type ResponseTx = oneshot::Sender<String>;
pub type RequestTx = mpsc::UnboundedSender<(Path, Payload, ResponseTx)>;

/// Representation of an application state.
pub struct App {
pub id: NodeId,
pub raft: typ::Raft,

/// Receive application requests, Raft protocol request or management requests.
pub rx: mpsc::UnboundedReceiver<(Path, Payload, ResponseTx)>,
pub router: Router,

pub state_machine: Arc<StateMachineStore>,
}

impl App {
pub fn new(id: NodeId, raft: typ::Raft, router: Router, state_machine: Arc<StateMachineStore>) -> Self {
let (tx, rx) = mpsc::unbounded_channel();

{
let mut targets = router.targets.lock().unwrap();
targets.insert(id, tx);
}

Self {
id,
raft,
rx,
router,
state_machine,
}
}

pub async fn run(mut self) -> Option<()> {
loop {
let (path, payload, response_tx) = self.rx.recv().await?;

let res = match path.as_str() {
// Application API
"/app/write" => api::write(&mut self, payload).await,
"/app/read" => api::read(&mut self, payload).await,

// Raft API
"/raft/append" => api::append(&mut self, payload).await,
"/raft/snapshot" => api::snapshot(&mut self, payload).await,
"/raft/vote" => api::vote(&mut self, payload).await,

// Management API
"/mng/add-learner" => api::add_learner(&mut self, payload).await,
"/mng/change-membership" => api::change_membership(&mut self, payload).await,
"/mng/init" => api::init(&mut self).await,
"/mng/metrics" => api::metrics(&mut self).await,

_ => panic!("unknown path: {}", path),
};

response_tx.send(res).unwrap();
}
}
}
107 changes: 107 additions & 0 deletions examples/raft-kv-memstore-general-snapshot-data/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#![allow(clippy::uninlined_format_args)]
#![deny(unused_qualifications)]

use std::sync::Arc;

use openraft::BasicNode;
use openraft::Config;
use openraft::TokioRuntime;

use crate::app::App;
use crate::router::Router;
use crate::store::Request;
use crate::store::Response;
use crate::store::StateMachineData;

pub mod router;

pub mod api;
pub mod app;
pub mod network;
pub mod store;

pub type NodeId = u64;

openraft::declare_raft_types!(
/// Declare the type configuration for example K/V store.
pub TypeConfig:
D = Request,
R = Response,
NodeId = NodeId,
Node = BasicNode,
Entry = openraft::Entry<TypeConfig>,
// In this example, snapshot is just a copy of the state machine.
SnapshotData = StateMachineData,
AsyncRuntime = TokioRuntime
);

pub type LogStore = crate::store::LogStore;
pub type StateMachineStore = crate::store::StateMachineStore;

pub mod typ {
use openraft::BasicNode;

use crate::NodeId;
use crate::TypeConfig;

pub type Raft = openraft::Raft<TypeConfig>;

pub type Vote = openraft::Vote<NodeId>;
pub type SnapshotMeta = openraft::SnapshotMeta<NodeId, BasicNode>;
pub type SnapshotData = <TypeConfig as openraft::RaftTypeConfig>::SnapshotData;
pub type Snapshot = openraft::Snapshot<TypeConfig>;

pub type Infallible = openraft::error::Infallible;
pub type Fatal = openraft::error::Fatal<NodeId>;
pub type RaftError<E = openraft::error::Infallible> = openraft::error::RaftError<NodeId, E>;
pub type RPCError<E = openraft::error::Infallible> = openraft::error::RPCError<NodeId, BasicNode, RaftError<E>>;
pub type StreamingError<E> = openraft::error::StreamingError<TypeConfig, E>;

pub type RaftMetrics = openraft::RaftMetrics<NodeId, BasicNode>;

pub type ClientWriteError = openraft::error::ClientWriteError<NodeId, BasicNode>;
pub type CheckIsLeaderError = openraft::error::CheckIsLeaderError<NodeId, BasicNode>;
pub type ForwardToLeader = openraft::error::ForwardToLeader<NodeId, BasicNode>;
pub type InitializeError = openraft::error::InitializeError<NodeId, BasicNode>;

pub type ClientWriteResponse = openraft::raft::ClientWriteResponse<TypeConfig>;
}

pub fn encode<T: serde::Serialize>(t: T) -> String {
serde_json::to_string(&t).unwrap()
}

pub fn decode<T: serde::de::DeserializeOwned>(s: &str) -> T {
serde_json::from_str(s).unwrap()
}

pub async fn new_raft(node_id: NodeId, router: Router) -> (typ::Raft, App) {
// Create a configuration for the raft instance.
let config = Config {
heartbeat_interval: 500,
election_timeout_min: 1500,
election_timeout_max: 3000,
// Once snapshot is built, delete the logs at once.
max_in_snapshot_log_to_keep: 0,
..Default::default()
};

let config = Arc::new(config.validate().unwrap());

// Create a instance of where the Raft logs will be stored.
let log_store = Arc::new(LogStore::default());

// Create a instance of where the state machine data will be stored.
let state_machine_store = Arc::new(StateMachineStore::default());

// Create a local raft instance.
let raft = openraft::Raft::new(node_id, config, router.clone(), log_store, state_machine_store.clone())
.await
.unwrap();

// Create an application that will store all the instances created above, this will
// later be used on the actix-web services.
let app = App::new(node_id, raft.clone(), router, state_machine_store);

(raft, app)
}
Loading

0 comments on commit 8ca933f

Please sign in to comment.