Skip to content
This repository has been archived by the owner on May 30, 2023. It is now read-only.

Commit

Permalink
Implement basic RPC for the demo app (#6)
Browse files Browse the repository at this point in the history
* Update for latest sov sdk

* Implement bank/slot RPC

* Upgrade to rpc-enabled SDK

* Add ledger RPC endpoint
  • Loading branch information
preston-evans98 authored Apr 25, 2023
1 parent 05f5c71 commit 9ca80dd
Show file tree
Hide file tree
Showing 6 changed files with 233 additions and 44 deletions.
23 changes: 10 additions & 13 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,17 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
sha2 = "0.10.6"
jupiter = { path ="../Jupiter"}
sov-app-template = { git = "https://github.com/Sovereign-Labs/sovereign.git", rev = "bfdb58159ff9215a84aa60b9a4d3ce1f32136597" }
accounts = { git = "https://github.com/Sovereign-Labs/sovereign.git", rev = "bfdb58159ff9215a84aa60b9a4d3ce1f32136597" }
election = { git = "https://github.com/Sovereign-Labs/sovereign.git", rev = "bfdb58159ff9215a84aa60b9a4d3ce1f32136597" }
sovereign-sdk = { git = "https://github.com/Sovereign-Labs/sovereign.git", rev = "bfdb58159ff9215a84aa60b9a4d3ce1f32136597" }
sov-state = { git = "https://github.com/Sovereign-Labs/sovereign.git", features = ["temp"], rev = "bfdb58159ff9215a84aa60b9a4d3ce1f32136597" }
sov-modules-api = { git = "https://github.com/Sovereign-Labs/sovereign.git", features = ["mocks"], rev = "bfdb58159ff9215a84aa60b9a4d3ce1f32136597" }
sov-modules-macros = { git = "https://github.com/Sovereign-Labs/sovereign.git", rev = "bfdb58159ff9215a84aa60b9a4d3ce1f32136597" }
sovereign-db = { git = "https://github.com/Sovereign-Labs/sovereign.git", features = ["temp"], rev = "bfdb58159ff9215a84aa60b9a4d3ce1f32136597" }
sov-app-template = { git = "https://github.com/Sovereign-Labs/sovereign.git", rev = "abfe1485f9c30e2c4a41ee8c9a18d9d646013c9c" }
accounts = { git = "https://github.com/Sovereign-Labs/sovereign.git", rev = "abfe1485f9c30e2c4a41ee8c9a18d9d646013c9c" }
bank = { git = "https://github.com/Sovereign-Labs/sovereign.git", rev = "abfe1485f9c30e2c4a41ee8c9a18d9d646013c9c" }
election = { git = "https://github.com/Sovereign-Labs/sovereign.git", rev = "abfe1485f9c30e2c4a41ee8c9a18d9d646013c9c" }
sovereign-sdk = { git = "https://github.com/Sovereign-Labs/sovereign.git", rev = "abfe1485f9c30e2c4a41ee8c9a18d9d646013c9c" }
sov-state = { git = "https://github.com/Sovereign-Labs/sovereign.git", features = ["temp"], rev = "abfe1485f9c30e2c4a41ee8c9a18d9d646013c9c" }
sov-modules-api = { git = "https://github.com/Sovereign-Labs/sovereign.git", features = ["mocks"], rev = "abfe1485f9c30e2c4a41ee8c9a18d9d646013c9c" }
sov-modules-macros = { git = "https://github.com/Sovereign-Labs/sovereign.git", rev = "abfe1485f9c30e2c4a41ee8c9a18d9d646013c9c" }
sovereign-db = { git = "https://github.com/Sovereign-Labs/sovereign.git", features = ["temp"], rev = "abfe1485f9c30e2c4a41ee8c9a18d9d646013c9c" }
tracing = "0.1.37"
tracing-subscriber = "0.3.16"

anyhow = "1.0.62"
jsonrpsee = { version = "0.16.2", features = ["http-client"] }

[patch.crates-io]
# Patch borsh until "bytes" support is added
borsh = { git = "https://github.com/preston-evans98/borsh-rs.git", branch = "release-2" }
jsonrpsee = { version = "0.16.2", features = ["http-client", "server"] }
70 changes: 45 additions & 25 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,56 @@
use crate::runtime::GenesisConfig;
use bank::TokenConfig;
use jsonrpsee::http_client::HeaderMap;
use jupiter::{
da_app::{CelestiaApp, TmHash},
da_service::{CelestiaService, FilteredCelestiaBlock},
};
use sha2::{Digest, Sha256};
use sov_app_template::{AppTemplate, Batch};
use sov_modules_api::mocks::MockContext;
use sov_modules_api::{mocks::MockContext, Address};
use sov_state::ProverStorage;
use sovereign_db::{
ledger_db::{LedgerDB, SlotCommitBuilder},
schema::types::{
BatchNumber, DbBytes, DbHash, EventNumber, Status, StoredBatch, StoredSlot,
StoredTransaction, TxNumber,
BatchNumber, DbBytes, EventNumber, Status, StoredBatch, StoredSlot, StoredTransaction,
TxNumber,
},
};
use sovereign_sdk::{
da::BlobTransactionTrait,
serial::Encode,
services::da::{DaService, SlotData},
spec::RollupSpec,
};
use sovereign_sdk::{da::DaLayerTrait, stf::StateTransitionFunction};
use sovereign_sdk::{db::SlotStore, serial::Decode};

use tracing::Level;
use tx_verifier_impl::DemoAppTxVerifier;

use crate::{
data_generation::QueryGenerator, helpers::run_query, runtime::Runtime,
tx_hooks_impl::DemoAppTxHooks,
};
use crate::{runtime::Runtime, tx_hooks_impl::DemoAppTxHooks};

mod data_generation;
mod helpers;
mod rpc;
mod runtime;
mod tx_hooks_impl;
mod tx_verifier_impl;

#[derive(Debug, Clone)]
struct Spec;

impl RollupSpec for Spec {
type SlotData = FilteredCelestiaBlock;

type Stf = DemoApp;

type Hasher = Sha256;
}

type C = MockContext;
type DemoApp = AppTemplate<C, DemoAppTxVerifier<C>, Runtime<C>, DemoAppTxHooks<C>>;
type DemoApp =
AppTemplate<C, DemoAppTxVerifier<C>, Runtime<C>, DemoAppTxHooks<C>, GenesisConfig<C>>;
const CELESTIA_NODE_AUTH_TOKEN: &'static str = "";

const START_HEIGHT: u64 = HEIGHT_OF_FIRST_TXS - 5;
Expand Down Expand Up @@ -69,24 +82,42 @@ const DATA_DIR_LOCATION: &'static str = "demo_data";

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
let sequencer_address: Address = [1u8; 32].into();

let subscriber = tracing_subscriber::fmt()
.with_max_level(Level::INFO)
.with_max_level(Level::WARN)
.finish();
tracing::subscriber::set_global_default(subscriber)
.map_err(|_err| eprintln!("Unable to set global default subscriber"))
.expect("Cannot fail to set subscriber");
let cel_service = default_celestia_service();
let ledger_db = LedgerDB::<FilteredCelestiaBlock>::with_path(DATA_DIR_LOCATION).unwrap();
let ledger_db = LedgerDB::<Spec>::with_path(DATA_DIR_LOCATION).unwrap();
let storage = ProverStorage::with_path(DATA_DIR_LOCATION).unwrap();
let mut demo = DemoApp::new(
storage.clone(),
Runtime::new(),
DemoAppTxVerifier::new(),
DemoAppTxHooks::new(),
GenesisConfig::new(
(),
bank::BankConfig {
tokens: vec![TokenConfig {
token_name: "sovereign".to_string(),
address_and_balances: vec![(sequencer_address, 1000)],
}],
},
accounts::AccountConfig { pub_keys: vec![] },
),
);
let da_app = CelestiaApp {
db: ledger_db.clone(),
};

let rpc_ledger = ledger_db.clone();
let rpc_storage = storage.clone();

let _rpc_handle = rpc::RpcProvider::start(rpc_ledger, rpc_storage).await?;

let mut item_numbers = ledger_db.get_next_items_numbers();
if item_numbers.slot_number == 1 {
print!("No history detected. Initializing chain...");
Expand All @@ -103,8 +134,6 @@ async fn main() -> Result<(), anyhow::Error> {
if last_slot_processed_before_shutdown > i {
println!("Slot at {} has already been processed! Skipping", height);
continue;
} else {
println!("Processing slot at {}", height);
}

let filtered_block: FilteredCelestiaBlock = cel_service.get_finalized_at(height).await?;
Expand All @@ -118,7 +147,6 @@ async fn main() -> Result<(), anyhow::Error> {

demo.begin_slot();
let num_batches = batches.len();
println!(" Found {} batches.", num_batches);
for raw_batch in batches {
let mut data = raw_batch.data();
let batch = match Batch::decode(&mut data) {
Expand All @@ -132,7 +160,8 @@ async fn main() -> Result<(), anyhow::Error> {
let tx_start = item_numbers.tx_number;
let num_txs = batch.txs.len();
let mut batch_to_store = StoredBatch {
hash: DbBytes::new(batch_hash.to_vec()),
sender: raw_batch.sender.as_ref().to_vec(),
hash: batch_hash,
extra_data: DbBytes::new(raw_batch.sender.as_ref().to_vec()),
txs: TxNumber(tx_start)..TxNumber(tx_start + num_txs as u64),
status: Status::Skipped,
Expand All @@ -149,7 +178,7 @@ async fn main() -> Result<(), anyhow::Error> {
let end_event_number = start_event_number + events.len() as u64;
item_numbers.event_number = end_event_number;
StoredTransaction {
hash: DbHash::new(sha2(&tx.data[..]).to_vec()),
hash: sha2(&tx.data[..]),
events: EventNumber(start_event_number)..EventNumber(end_event_number),
data: DbBytes::new(tx.data),
status: Status::Applied,
Expand All @@ -170,22 +199,13 @@ async fn main() -> Result<(), anyhow::Error> {

demo.end_slot();
data_to_persist.slot_data = Some(StoredSlot {
hash: DbHash::new(slot_hash.to_vec()),
hash: slot_hash,
extra_data: DbBytes::new(slot_extra_data),
batches: BatchNumber(item_numbers.batch_number)
..BatchNumber(item_numbers.batch_number + num_batches as u64),
});
item_numbers.batch_number += num_batches as u64;
ledger_db.commit_slot(data_to_persist.finalize()?)?;

println!(
"Current state: {}",
run_query(
&mut demo.runtime,
QueryGenerator::generate_query_election_message(),
storage.clone(),
)
);
}

Ok(())
Expand Down
142 changes: 142 additions & 0 deletions src/rpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
use std::net::SocketAddr;

use bank::{
query::{BankRpcImpl, BankRpcServer},
Bank,
};
use jsonrpsee::{server::ServerHandle, RpcModule};
use sov_modules_api::mocks::MockContext;
use sov_state::{mocks::MockStorageSpec, ProverStorage, WorkingSet};
use sovereign_db::ledger_db::LedgerDB;
use sovereign_sdk::rpc::{
BatchIdentifier, EventIdentifier, LedgerRpcProvider, QueryMode, SlotIdentifier, TxIdentifier,
};

use crate::{runtime::Runtime, tx_verifier_impl::DemoAppTxVerifier, Spec};

#[derive(Clone)]
pub(crate) struct RpcProvider {
pub ledger_db: LedgerDB<Spec>,
pub state_db: ProverStorage<MockStorageSpec>,
pub runtime: Runtime<MockContext>,
pub _tx_verifier: DemoAppTxVerifier<MockContext>,
}

impl BankRpcImpl<MockContext> for RpcProvider {
fn get_backing_impl(&self) -> &Bank<MockContext> {
&self.runtime.bank
}

fn get_working_set(&self) -> WorkingSet<ProverStorage<MockStorageSpec>> {
WorkingSet::new(self.state_db.clone())
}
}

impl RpcProvider {
pub async fn start(
ledger_db: LedgerDB<Spec>,
state_db: ProverStorage<MockStorageSpec>,
) -> Result<ServerHandle, jsonrpsee::core::Error> {
let address: SocketAddr = "127.0.0.1:12345".parse().unwrap();
let server = jsonrpsee::server::ServerBuilder::default()
.build([address].as_ref())
.await?;

let runtime = Runtime::new();
let tx_verifier = DemoAppTxVerifier::new();
let bank_rpc = Self {
ledger_db,
state_db,
runtime,
_tx_verifier: tx_verifier,
};

let mut bank_rpc = bank_rpc.into_rpc();

register_ledger_rpc(&mut bank_rpc)?;

server.start(bank_rpc)
}
}

fn register_ledger_rpc(rpc: &mut RpcModule<RpcProvider>) -> Result<(), jsonrpsee::core::Error> {
rpc.register_method("ledger_head", move |_, db| {
db.get_head().map_err(|e| e.into())
})?;

rpc.register_method("ledger_getSlots", move |params, db| {
let ids: Vec<SlotIdentifier>;
let query_mode: QueryMode;
(ids, query_mode) = params.parse()?;
db.get_slots(&ids, query_mode).map_err(|e| e.into())
})?;

rpc.register_method("ledger_getBatches", move |params, db| {
let ids: Vec<BatchIdentifier>;
let query_mode: QueryMode;
(ids, query_mode) = params.parse()?;
db.get_batches(&ids, query_mode).map_err(|e| e.into())
})?;

rpc.register_method("ledger_getTransactions", move |params, db| {
let ids: Vec<TxIdentifier>;
let query_mode: QueryMode;
(ids, query_mode) = params.parse()?;
db.get_transactions(&ids, query_mode).map_err(|e| e.into())
})?;

rpc.register_method("ledger_getEvents", move |params, db| {
let ids: Vec<EventIdentifier> = params.parse()?;
db.get_events(&ids).map_err(|e| e.into())
})?;

Ok(())
}

// TODO: implement TransactionRpcProvider and expose an endpoint for it

/// Delegate all the Ledger RPC methods to the LedgerDB.
impl LedgerRpcProvider for RpcProvider {
type SlotResponse = <LedgerDB<Spec> as LedgerRpcProvider>::SlotResponse;

type BatchResponse = <LedgerDB<Spec> as LedgerRpcProvider>::BatchResponse;

type TxResponse = <LedgerDB<Spec> as LedgerRpcProvider>::TxResponse;

type EventResponse = <LedgerDB<Spec> as LedgerRpcProvider>::EventResponse;

fn get_head(&self) -> Result<Option<Self::SlotResponse>, anyhow::Error> {
self.ledger_db.get_head()
}

fn get_slots(
&self,
slot_ids: &[sovereign_sdk::rpc::SlotIdentifier],
query_mode: sovereign_sdk::rpc::QueryMode,
) -> Result<Vec<Option<Self::SlotResponse>>, anyhow::Error> {
self.ledger_db.get_slots(slot_ids, query_mode)
}

fn get_batches(
&self,
batch_ids: &[sovereign_sdk::rpc::BatchIdentifier],
query_mode: sovereign_sdk::rpc::QueryMode,
) -> Result<Vec<Option<Self::BatchResponse>>, anyhow::Error> {
self.ledger_db.get_batches(batch_ids, query_mode)
}

fn get_transactions(
&self,
tx_ids: &[sovereign_sdk::rpc::TxIdentifier],
query_mode: sovereign_sdk::rpc::QueryMode,
) -> Result<Vec<Option<Self::TxResponse>>, anyhow::Error> {
self.ledger_db.get_transactions(tx_ids, query_mode)
}

fn get_events(
&self,
event_ids: &[sovereign_sdk::rpc::EventIdentifier],
) -> Result<Vec<Option<Self::EventResponse>>, anyhow::Error> {
self.ledger_db.get_events(event_ids)
}
}
9 changes: 8 additions & 1 deletion src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,25 @@ use sov_modules_macros::{DispatchCall, DispatchQuery, Genesis, MessageCodec};
///
/// Similar mechanism works for queries with the difference that queries are submitted by users directly to the rollup node
/// instead of going through the DA layer.
#[derive(Genesis, DispatchCall, DispatchQuery, MessageCodec)]
#[derive(Genesis, DispatchCall, DispatchQuery, MessageCodec, Clone)]
#[serialization(borsh::BorshDeserialize, borsh::BorshSerialize)]
pub(crate) struct Runtime<C: Context> {
/// Definition of the first module in the rollup (must implement the sov_modules_api::Module trait).
#[allow(unused)]
election: election::Election<C>,
#[allow(unused)]
pub(crate) bank: bank::Bank<C>,
#[allow(unused)]
accounts: accounts::Accounts<C>,
}

// TODO add macro to generate the following code.
impl<C: Context> Runtime<C> {
pub(crate) fn new() -> Self {
Self {
election: election::Election::new(),
bank: bank::Bank::new(),
accounts: accounts::Accounts::new(),
}
}
}
Loading

0 comments on commit 9ca80dd

Please sign in to comment.