Skip to content
This repository has been archived by the owner on May 30, 2023. It is now read-only.

Implement basic RPC for the demo app #6

Merged
merged 4 commits into from
Apr 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 10 additions & 13 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,17 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
sha2 = "0.10.6"
jupiter = { path ="../Jupiter"}
sov-app-template = { git = "https://github.com/Sovereign-Labs/sovereign.git", rev = "bfdb58159ff9215a84aa60b9a4d3ce1f32136597" }
accounts = { git = "https://github.com/Sovereign-Labs/sovereign.git", rev = "bfdb58159ff9215a84aa60b9a4d3ce1f32136597" }
election = { git = "https://github.com/Sovereign-Labs/sovereign.git", rev = "bfdb58159ff9215a84aa60b9a4d3ce1f32136597" }
sovereign-sdk = { git = "https://github.com/Sovereign-Labs/sovereign.git", rev = "bfdb58159ff9215a84aa60b9a4d3ce1f32136597" }
sov-state = { git = "https://github.com/Sovereign-Labs/sovereign.git", features = ["temp"], rev = "bfdb58159ff9215a84aa60b9a4d3ce1f32136597" }
sov-modules-api = { git = "https://github.com/Sovereign-Labs/sovereign.git", features = ["mocks"], rev = "bfdb58159ff9215a84aa60b9a4d3ce1f32136597" }
sov-modules-macros = { git = "https://github.com/Sovereign-Labs/sovereign.git", rev = "bfdb58159ff9215a84aa60b9a4d3ce1f32136597" }
sovereign-db = { git = "https://github.com/Sovereign-Labs/sovereign.git", features = ["temp"], rev = "bfdb58159ff9215a84aa60b9a4d3ce1f32136597" }
sov-app-template = { git = "https://github.com/Sovereign-Labs/sovereign.git", rev = "abfe1485f9c30e2c4a41ee8c9a18d9d646013c9c" }
accounts = { git = "https://github.com/Sovereign-Labs/sovereign.git", rev = "abfe1485f9c30e2c4a41ee8c9a18d9d646013c9c" }
bank = { git = "https://github.com/Sovereign-Labs/sovereign.git", rev = "abfe1485f9c30e2c4a41ee8c9a18d9d646013c9c" }
election = { git = "https://github.com/Sovereign-Labs/sovereign.git", rev = "abfe1485f9c30e2c4a41ee8c9a18d9d646013c9c" }
sovereign-sdk = { git = "https://github.com/Sovereign-Labs/sovereign.git", rev = "abfe1485f9c30e2c4a41ee8c9a18d9d646013c9c" }
sov-state = { git = "https://github.com/Sovereign-Labs/sovereign.git", features = ["temp"], rev = "abfe1485f9c30e2c4a41ee8c9a18d9d646013c9c" }
sov-modules-api = { git = "https://github.com/Sovereign-Labs/sovereign.git", features = ["mocks"], rev = "abfe1485f9c30e2c4a41ee8c9a18d9d646013c9c" }
sov-modules-macros = { git = "https://github.com/Sovereign-Labs/sovereign.git", rev = "abfe1485f9c30e2c4a41ee8c9a18d9d646013c9c" }
sovereign-db = { git = "https://github.com/Sovereign-Labs/sovereign.git", features = ["temp"], rev = "abfe1485f9c30e2c4a41ee8c9a18d9d646013c9c" }
tracing = "0.1.37"
tracing-subscriber = "0.3.16"

anyhow = "1.0.62"
jsonrpsee = { version = "0.16.2", features = ["http-client"] }

[patch.crates-io]
# Patch borsh until "bytes" support is added
borsh = { git = "https://github.com/preston-evans98/borsh-rs.git", branch = "release-2" }
jsonrpsee = { version = "0.16.2", features = ["http-client", "server"] }
70 changes: 45 additions & 25 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,56 @@
use crate::runtime::GenesisConfig;
use bank::TokenConfig;
use jsonrpsee::http_client::HeaderMap;
use jupiter::{
da_app::{CelestiaApp, TmHash},
da_service::{CelestiaService, FilteredCelestiaBlock},
};
use sha2::{Digest, Sha256};
use sov_app_template::{AppTemplate, Batch};
use sov_modules_api::mocks::MockContext;
use sov_modules_api::{mocks::MockContext, Address};
use sov_state::ProverStorage;
use sovereign_db::{
ledger_db::{LedgerDB, SlotCommitBuilder},
schema::types::{
BatchNumber, DbBytes, DbHash, EventNumber, Status, StoredBatch, StoredSlot,
StoredTransaction, TxNumber,
BatchNumber, DbBytes, EventNumber, Status, StoredBatch, StoredSlot, StoredTransaction,
TxNumber,
},
};
use sovereign_sdk::{
da::BlobTransactionTrait,
serial::Encode,
services::da::{DaService, SlotData},
spec::RollupSpec,
};
use sovereign_sdk::{da::DaLayerTrait, stf::StateTransitionFunction};
use sovereign_sdk::{db::SlotStore, serial::Decode};

use tracing::Level;
use tx_verifier_impl::DemoAppTxVerifier;

use crate::{
data_generation::QueryGenerator, helpers::run_query, runtime::Runtime,
tx_hooks_impl::DemoAppTxHooks,
};
use crate::{runtime::Runtime, tx_hooks_impl::DemoAppTxHooks};

mod data_generation;
mod helpers;
mod rpc;
mod runtime;
mod tx_hooks_impl;
mod tx_verifier_impl;

#[derive(Debug, Clone)]
struct Spec;

impl RollupSpec for Spec {
type SlotData = FilteredCelestiaBlock;

type Stf = DemoApp;

type Hasher = Sha256;
}

type C = MockContext;
type DemoApp = AppTemplate<C, DemoAppTxVerifier<C>, Runtime<C>, DemoAppTxHooks<C>>;
type DemoApp =
AppTemplate<C, DemoAppTxVerifier<C>, Runtime<C>, DemoAppTxHooks<C>, GenesisConfig<C>>;
const CELESTIA_NODE_AUTH_TOKEN: &'static str = "";

const START_HEIGHT: u64 = HEIGHT_OF_FIRST_TXS - 5;
Expand Down Expand Up @@ -69,24 +82,42 @@ const DATA_DIR_LOCATION: &'static str = "demo_data";

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
let sequencer_address: Address = [1u8; 32].into();

let subscriber = tracing_subscriber::fmt()
.with_max_level(Level::INFO)
.with_max_level(Level::WARN)
.finish();
tracing::subscriber::set_global_default(subscriber)
.map_err(|_err| eprintln!("Unable to set global default subscriber"))
.expect("Cannot fail to set subscriber");
let cel_service = default_celestia_service();
let ledger_db = LedgerDB::<FilteredCelestiaBlock>::with_path(DATA_DIR_LOCATION).unwrap();
let ledger_db = LedgerDB::<Spec>::with_path(DATA_DIR_LOCATION).unwrap();
let storage = ProverStorage::with_path(DATA_DIR_LOCATION).unwrap();
let mut demo = DemoApp::new(
storage.clone(),
Runtime::new(),
DemoAppTxVerifier::new(),
DemoAppTxHooks::new(),
GenesisConfig::new(
(),
bank::BankConfig {
tokens: vec![TokenConfig {
token_name: "sovereign".to_string(),
address_and_balances: vec![(sequencer_address, 1000)],
}],
},
accounts::AccountConfig { pub_keys: vec![] },
),
);
let da_app = CelestiaApp {
db: ledger_db.clone(),
};

let rpc_ledger = ledger_db.clone();
let rpc_storage = storage.clone();

let _rpc_handle = rpc::RpcProvider::start(rpc_ledger, rpc_storage).await?;

let mut item_numbers = ledger_db.get_next_items_numbers();
if item_numbers.slot_number == 1 {
print!("No history detected. Initializing chain...");
Expand All @@ -103,8 +134,6 @@ async fn main() -> Result<(), anyhow::Error> {
if last_slot_processed_before_shutdown > i {
println!("Slot at {} has already been processed! Skipping", height);
continue;
} else {
println!("Processing slot at {}", height);
}

let filtered_block: FilteredCelestiaBlock = cel_service.get_finalized_at(height).await?;
Expand All @@ -118,7 +147,6 @@ async fn main() -> Result<(), anyhow::Error> {

demo.begin_slot();
let num_batches = batches.len();
println!(" Found {} batches.", num_batches);
for raw_batch in batches {
let mut data = raw_batch.data();
let batch = match Batch::decode(&mut data) {
Expand All @@ -132,7 +160,8 @@ async fn main() -> Result<(), anyhow::Error> {
let tx_start = item_numbers.tx_number;
let num_txs = batch.txs.len();
let mut batch_to_store = StoredBatch {
hash: DbBytes::new(batch_hash.to_vec()),
sender: raw_batch.sender.as_ref().to_vec(),
hash: batch_hash,
extra_data: DbBytes::new(raw_batch.sender.as_ref().to_vec()),
txs: TxNumber(tx_start)..TxNumber(tx_start + num_txs as u64),
status: Status::Skipped,
Expand All @@ -149,7 +178,7 @@ async fn main() -> Result<(), anyhow::Error> {
let end_event_number = start_event_number + events.len() as u64;
item_numbers.event_number = end_event_number;
StoredTransaction {
hash: DbHash::new(sha2(&tx.data[..]).to_vec()),
hash: sha2(&tx.data[..]),
events: EventNumber(start_event_number)..EventNumber(end_event_number),
data: DbBytes::new(tx.data),
status: Status::Applied,
Expand All @@ -170,22 +199,13 @@ async fn main() -> Result<(), anyhow::Error> {

demo.end_slot();
data_to_persist.slot_data = Some(StoredSlot {
hash: DbHash::new(slot_hash.to_vec()),
hash: slot_hash,
extra_data: DbBytes::new(slot_extra_data),
batches: BatchNumber(item_numbers.batch_number)
..BatchNumber(item_numbers.batch_number + num_batches as u64),
});
item_numbers.batch_number += num_batches as u64;
ledger_db.commit_slot(data_to_persist.finalize()?)?;

println!(
"Current state: {}",
run_query(
&mut demo.runtime,
QueryGenerator::generate_query_election_message(),
storage.clone(),
)
);
}

Ok(())
Expand Down
142 changes: 142 additions & 0 deletions src/rpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
use std::net::SocketAddr;

use bank::{
query::{BankRpcImpl, BankRpcServer},
Bank,
};
use jsonrpsee::{server::ServerHandle, RpcModule};
use sov_modules_api::mocks::MockContext;
use sov_state::{mocks::MockStorageSpec, ProverStorage, WorkingSet};
use sovereign_db::ledger_db::LedgerDB;
use sovereign_sdk::rpc::{
BatchIdentifier, EventIdentifier, LedgerRpcProvider, QueryMode, SlotIdentifier, TxIdentifier,
};

use crate::{runtime::Runtime, tx_verifier_impl::DemoAppTxVerifier, Spec};

#[derive(Clone)]
pub(crate) struct RpcProvider {
pub ledger_db: LedgerDB<Spec>,
pub state_db: ProverStorage<MockStorageSpec>,
pub runtime: Runtime<MockContext>,
pub _tx_verifier: DemoAppTxVerifier<MockContext>,
}

impl BankRpcImpl<MockContext> for RpcProvider {
fn get_backing_impl(&self) -> &Bank<MockContext> {
&self.runtime.bank
}

fn get_working_set(&self) -> WorkingSet<ProverStorage<MockStorageSpec>> {
WorkingSet::new(self.state_db.clone())
}
}

impl RpcProvider {
pub async fn start(
ledger_db: LedgerDB<Spec>,
state_db: ProverStorage<MockStorageSpec>,
) -> Result<ServerHandle, jsonrpsee::core::Error> {
let address: SocketAddr = "127.0.0.1:12345".parse().unwrap();
let server = jsonrpsee::server::ServerBuilder::default()
.build([address].as_ref())
.await?;

let runtime = Runtime::new();
let tx_verifier = DemoAppTxVerifier::new();
let bank_rpc = Self {
ledger_db,
state_db,
runtime,
_tx_verifier: tx_verifier,
};

let mut bank_rpc = bank_rpc.into_rpc();

register_ledger_rpc(&mut bank_rpc)?;

server.start(bank_rpc)
}
}

fn register_ledger_rpc(rpc: &mut RpcModule<RpcProvider>) -> Result<(), jsonrpsee::core::Error> {
rpc.register_method("ledger_head", move |_, db| {
db.get_head().map_err(|e| e.into())
})?;

rpc.register_method("ledger_getSlots", move |params, db| {
let ids: Vec<SlotIdentifier>;
let query_mode: QueryMode;
(ids, query_mode) = params.parse()?;
db.get_slots(&ids, query_mode).map_err(|e| e.into())
})?;

rpc.register_method("ledger_getBatches", move |params, db| {
let ids: Vec<BatchIdentifier>;
let query_mode: QueryMode;
(ids, query_mode) = params.parse()?;
db.get_batches(&ids, query_mode).map_err(|e| e.into())
})?;

rpc.register_method("ledger_getTransactions", move |params, db| {
let ids: Vec<TxIdentifier>;
let query_mode: QueryMode;
(ids, query_mode) = params.parse()?;
db.get_transactions(&ids, query_mode).map_err(|e| e.into())
})?;

rpc.register_method("ledger_getEvents", move |params, db| {
let ids: Vec<EventIdentifier> = params.parse()?;
db.get_events(&ids).map_err(|e| e.into())
})?;

Ok(())
}

// TODO: implement TransactionRpcProvider and expose an endpoint for it

/// Delegate all the Ledger RPC methods to the LedgerDB.
impl LedgerRpcProvider for RpcProvider {
type SlotResponse = <LedgerDB<Spec> as LedgerRpcProvider>::SlotResponse;

type BatchResponse = <LedgerDB<Spec> as LedgerRpcProvider>::BatchResponse;

type TxResponse = <LedgerDB<Spec> as LedgerRpcProvider>::TxResponse;

type EventResponse = <LedgerDB<Spec> as LedgerRpcProvider>::EventResponse;

fn get_head(&self) -> Result<Option<Self::SlotResponse>, anyhow::Error> {
self.ledger_db.get_head()
}

fn get_slots(
&self,
slot_ids: &[sovereign_sdk::rpc::SlotIdentifier],
query_mode: sovereign_sdk::rpc::QueryMode,
) -> Result<Vec<Option<Self::SlotResponse>>, anyhow::Error> {
self.ledger_db.get_slots(slot_ids, query_mode)
}

fn get_batches(
&self,
batch_ids: &[sovereign_sdk::rpc::BatchIdentifier],
query_mode: sovereign_sdk::rpc::QueryMode,
) -> Result<Vec<Option<Self::BatchResponse>>, anyhow::Error> {
self.ledger_db.get_batches(batch_ids, query_mode)
}

fn get_transactions(
&self,
tx_ids: &[sovereign_sdk::rpc::TxIdentifier],
query_mode: sovereign_sdk::rpc::QueryMode,
) -> Result<Vec<Option<Self::TxResponse>>, anyhow::Error> {
self.ledger_db.get_transactions(tx_ids, query_mode)
}

fn get_events(
&self,
event_ids: &[sovereign_sdk::rpc::EventIdentifier],
) -> Result<Vec<Option<Self::EventResponse>>, anyhow::Error> {
self.ledger_db.get_events(event_ids)
}
}
9 changes: 8 additions & 1 deletion src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,25 @@ use sov_modules_macros::{DispatchCall, DispatchQuery, Genesis, MessageCodec};
///
/// Similar mechanism works for queries with the difference that queries are submitted by users directly to the rollup node
/// instead of going through the DA layer.
#[derive(Genesis, DispatchCall, DispatchQuery, MessageCodec)]
#[derive(Genesis, DispatchCall, DispatchQuery, MessageCodec, Clone)]
#[serialization(borsh::BorshDeserialize, borsh::BorshSerialize)]
pub(crate) struct Runtime<C: Context> {
/// Definition of the first module in the rollup (must implement the sov_modules_api::Module trait).
#[allow(unused)]
election: election::Election<C>,
#[allow(unused)]
pub(crate) bank: bank::Bank<C>,
#[allow(unused)]
accounts: accounts::Accounts<C>,
}

// TODO add macro to generate the following code.
impl<C: Context> Runtime<C> {
pub(crate) fn new() -> Self {
Self {
election: election::Election::new(),
bank: bank::Bank::new(),
accounts: accounts::Accounts::new(),
}
}
}
Loading