diff --git a/crates/light-client-prover/Cargo.toml b/crates/light-client-prover/Cargo.toml deleted file mode 100644 index df53c7a23..000000000 --- a/crates/light-client-prover/Cargo.toml +++ /dev/null @@ -1,59 +0,0 @@ -[package] -name = "citrea-light-client-prover" -version.workspace = true -authors.workspace = true -edition.workspace = true -homepage.workspace = true -license.workspace = true -publish.workspace = true -repository.workspace = true - -[dependencies] -# Citrea Deps -citrea-common = { path = "../common", optional = true } -citrea-primitives = { path = "../primitives", optional = true } - -# Sov SDK deps -sov-db = { path = "../sovereign-sdk/full-node/db/sov-db", optional = true } -sov-ledger-rpc = { path = "../sovereign-sdk/full-node/sov-ledger-rpc", features = ["client"], optional = true } -sov-modules-api = { path = "../sovereign-sdk/module-system/sov-modules-api", default-features = false } -sov-modules-rollup-blueprint = { path = "../sovereign-sdk/module-system/sov-modules-rollup-blueprint", optional = true } -sov-rollup-interface = { path = "../sovereign-sdk/rollup-interface" } -sov-stf-runner = { path = "../sovereign-sdk/full-node/sov-stf-runner", optional = true } - -# 3rd-party deps -anyhow = { workspace = true, optional = true } -async-trait = { workspace = true, optional = true } -bincode = { workspace = true } -borsh = { workspace = true } -hex = { workspace = true } -jsonrpsee = { workspace = true, optional = true, features = ["http-client", "server", "client"] } -reth-primitives = { workspace = true, optional = true } -tokio = { workspace = true, optional = true } -tokio-util = { workspace = true, optional = true } -tower = { workspace = true, optional = true } -tracing = { workspace = true, optional = true } - -[dev-dependencies] -sov-mock-da = { path = "../sovereign-sdk/adapters/mock-da", features = ["native"] } -sov-mock-zkvm = { path = "../sovereign-sdk/adapters/mock-zkvm" } -tempfile = { workspace = true } - -[features] -default = [] -native = [ - "dep:citrea-primitives", - "dep:citrea-common", - "dep:sov-db", - "dep:sov-modules-rollup-blueprint", - "dep:sov-stf-runner", - "dep:sov-ledger-rpc", - "dep:anyhow", - "dep:async-trait", - "dep:jsonrpsee", - "dep:reth-primitives", - "dep:tokio", - "dep:tokio-util", - "dep:tower", - "dep:tracing", -] diff --git a/crates/light-client-prover/src/da_block_handler.rs b/crates/light-client-prover/src/da_block_handler.rs deleted file mode 100644 index a5c06c829..000000000 --- a/crates/light-client-prover/src/da_block_handler.rs +++ /dev/null @@ -1,394 +0,0 @@ -use std::collections::{HashMap, VecDeque}; -use std::sync::Arc; - -use anyhow::anyhow; -use borsh::BorshDeserialize; -use citrea_common::cache::L1BlockCache; -use citrea_common::da::get_da_block_at_height; -use citrea_common::LightClientProverConfig; -use citrea_primitives::forks::FORKS; -use jsonrpsee::http_client::HttpClient; -use reth_primitives::U64; -use sov_db::ledger_db::{LightClientProverLedgerOps, SharedLedgerOps}; -use sov_db::schema::types::{SlotNumber, StoredLightClientProofOutput}; -use sov_ledger_rpc::LedgerRpcClient; -use sov_modules_api::fork::fork_from_block_number; -use sov_modules_api::{BatchProofCircuitOutput, BlobReaderTrait, DaSpec, Zkvm}; -use sov_rollup_interface::da::{BlockHeaderTrait, DaDataLightClient, DaNamespace}; -use sov_rollup_interface::services::da::{DaService, SlotData}; -use sov_rollup_interface::spec::SpecId; -use sov_rollup_interface::zk::{ - LightClientCircuitInput, LightClientCircuitOutput, Proof, ZkvmHost, -}; -use sov_stf_runner::ProverService; -use tokio::select; -use tokio::sync::{mpsc, Mutex}; -use tokio::time::{sleep, Duration}; -use tokio_util::sync::CancellationToken; -use tracing::{error, info}; - -pub(crate) struct L1BlockHandler -where - Da: DaService, - Vm: ZkvmHost + Zkvm, - DB: LightClientProverLedgerOps + SharedLedgerOps + Clone, - Ps: ProverService, -{ - _prover_config: LightClientProverConfig, - prover_service: Arc, - ledger_db: DB, - da_service: Arc, - batch_prover_da_pub_key: Vec, - batch_proof_code_commitments: HashMap, - light_client_proof_code_commitments: HashMap, - light_client_proof_elfs: HashMap>, - l1_block_cache: Arc>>, - queued_l1_blocks: VecDeque<::FilteredBlock>, - sequencer_client: Arc, -} - -impl L1BlockHandler -where - Da: DaService, - Vm: ZkvmHost + Zkvm, - Ps: ProverService, - DB: LightClientProverLedgerOps + SharedLedgerOps + Clone, -{ - #[allow(clippy::too_many_arguments)] - pub fn new( - prover_config: LightClientProverConfig, - prover_service: Arc, - ledger_db: DB, - da_service: Arc, - batch_prover_da_pub_key: Vec, - batch_proof_code_commitments: HashMap, - light_client_proof_code_commitments: HashMap, - light_client_proof_elfs: HashMap>, - sequencer_client: Arc, - ) -> Self { - Self { - _prover_config: prover_config, - prover_service, - ledger_db, - da_service, - batch_prover_da_pub_key, - batch_proof_code_commitments, - light_client_proof_code_commitments, - light_client_proof_elfs, - l1_block_cache: Arc::new(Mutex::new(L1BlockCache::new())), - queued_l1_blocks: VecDeque::new(), - sequencer_client, - } - } - - pub async fn run(mut self, start_l1_height: u64, cancellation_token: CancellationToken) { - // if self.prover_config.enable_recovery { - // if let Err(e) = self.check_and_recover_ongoing_proving_sessions().await { - // error!("Failed to recover ongoing proving sessions: {:?}", e); - // } - // } else { - // // If recovery is disabled, clear pending proving sessions - // self.ledger_db - // .clear_pending_proving_sessions() - // .expect("Failed to clear pending proving sessions"); - // } - - let (l1_tx, mut l1_rx) = mpsc::channel(1); - let l1_sync_worker = sync_l1( - start_l1_height, - self.da_service.clone(), - l1_tx, - self.l1_block_cache.clone(), - ); - tokio::pin!(l1_sync_worker); - - let mut interval = tokio::time::interval(Duration::from_secs(2)); - interval.tick().await; - loop { - select! { - biased; - _ = cancellation_token.cancelled() => { - return; - } - _ = &mut l1_sync_worker => {}, - Some(l1_block) = l1_rx.recv() => { - self.queued_l1_blocks.push_back(l1_block); - }, - _ = interval.tick() => { - if let Err(e) = self.process_queued_l1_blocks().await { - error!("Could not process queued L1 blocks and generate proof: {:?}", e); - } - }, - } - } - } - - async fn process_queued_l1_blocks(&mut self) -> Result<(), anyhow::Error> { - while !self.queued_l1_blocks.is_empty() { - let l1_block = self - .queued_l1_blocks - .front() - .expect("Pending l1 blocks cannot be empty"); - - self.process_l1_block(l1_block).await?; - - self.queued_l1_blocks.pop_front(); - } - - Ok(()) - } - - async fn process_l1_block(&self, l1_block: &Da::FilteredBlock) -> anyhow::Result<()> { - let l1_hash = l1_block.header().hash().into(); - let l1_height = l1_block.header().height(); - - // Set the l1 height of the l1 hash - self.ledger_db - .set_l1_height_of_l1_hash(l1_hash, l1_height) - .expect("Setting l1 height of l1 hash in ledger db"); - - let (mut da_data, inclusion_proof, completeness_proof) = self - .da_service - .extract_relevant_blobs_with_proof(l1_block, DaNamespace::ToLightClientProver); - - let batch_proofs = self.extract_batch_proofs(&mut da_data, l1_hash).await; - tracing::info!( - "Block {} has {} batch proofs", - l1_height, - batch_proofs.len() - ); - - let mut assumptions = vec![]; - for batch_proof in batch_proofs { - if let DaDataLightClient::Complete(proof) = batch_proof { - let batch_proof_output = Vm::extract_output::< - ::Spec, - BatchProofCircuitOutput<::Spec, [u8; 32]>, - >(&proof) - .map_err(|_| anyhow!("Proof should be deserializable"))?; - let last_l2_height = batch_proof_output.last_l2_height; - let current_spec = fork_from_block_number(FORKS, last_l2_height).spec_id; - let batch_proof_method_id = self - .batch_proof_code_commitments - .get(¤t_spec) - .expect("Batch proof code commitment not found"); - if let Err(e) = Vm::verify(proof.as_slice(), batch_proof_method_id) { - tracing::error!("Failed to verify batch proof: {:?}", e); - continue; - } - assumptions.push(proof); - } - } - let previous_l1_height = l1_height - 1; - let mut light_client_proof_journal = None; - let mut l2_genesis_state_root = None; - let l2_last_height = match self - .ledger_db - .get_light_client_proof_data_by_l1_height(previous_l1_height)? - { - Some(data) => { - let proof = data.proof; - let output = data.light_client_proof_output; - assumptions.push(proof); - light_client_proof_journal = Some(borsh::to_vec(&output)?); - Some(output.last_l2_height) - } - None => { - let soft_confirmation = self - .sequencer_client - .get_soft_confirmation_by_number(U64::from(1)) - .await? - .unwrap(); - let initial_l1_height = soft_confirmation.da_slot_height; - // If the prev block is the block before the first processed l1 block - // then we don't have a previous light client proof, so just give an info - if previous_l1_height == initial_l1_height { - // TODO: Provide genesis state root here to the light client proof circuit input - l2_genesis_state_root = self - .sequencer_client - .get_l2_genesis_state_root() - .await? - .map(|v| v.as_slice().try_into().unwrap()); - - tracing::info!( - "No previous light client proof found for L1 block: {}", - previous_l1_height - ); - } - // If not then we have a problem - else { - panic!( - "No previous light client proof found for L1 block: {}", - previous_l1_height - ); - } - Some(soft_confirmation.l2_height) - } - }; - - let l2_last_height = l2_last_height.ok_or(anyhow!( - "Could not determine the last L2 height for batch proof" - ))?; - let current_fork = fork_from_block_number(FORKS, l2_last_height); - let batch_proof_method_id = self - .batch_proof_code_commitments - .get(¤t_fork.spec_id) - .expect("Fork should have a guest code attached"); - let light_client_proof_code_commitment = self - .light_client_proof_code_commitments - .get(¤t_fork.spec_id) - .expect("Fork should have a guest code attached"); - let light_client_elf = self - .light_client_proof_elfs - .get(¤t_fork.spec_id) - .expect("Fork should have a guest code attached") - .clone(); - - let circuit_input = LightClientCircuitInput { - da_data, - inclusion_proof, - completeness_proof, - da_block_header: l1_block.header().clone(), - batch_prover_da_pub_key: self.batch_prover_da_pub_key.clone(), - batch_proof_method_id: batch_proof_method_id.clone().into(), - light_client_proof_method_id: light_client_proof_code_commitment.clone().into(), - previous_light_client_proof_journal: light_client_proof_journal, - l2_genesis_state_root, - }; - - let proof = self - .prove(light_client_elf, circuit_input, assumptions) - .await?; - - let circuit_output = - Vm::extract_output::>(&proof) - .expect("Should deserialize valid proof"); - - tracing::info!( - "Generated proof for L1 block: {l1_height} output={:?}", - circuit_output - ); - - let stored_proof_output = StoredLightClientProofOutput { - state_root: circuit_output.state_root, - light_client_proof_method_id: circuit_output.light_client_proof_method_id, - da_block_hash: circuit_output.da_block_hash.into(), - da_block_height: circuit_output.da_block_height, - da_total_work: circuit_output.da_total_work, - da_current_target_bits: circuit_output.da_current_target_bits, - da_epoch_start_time: circuit_output.da_epoch_start_time, - da_prev_11_timestamps: circuit_output.da_prev_11_timestamps, - unchained_batch_proofs_info: circuit_output.unchained_batch_proofs_info, - last_l2_height: circuit_output.last_l2_height, - l2_genesis_state_root: circuit_output.l2_genesis_state_root, - }; - - self.ledger_db.insert_light_client_proof_data_by_l1_height( - l1_height, - proof, - stored_proof_output, - )?; - - self.ledger_db - .set_last_scanned_l1_height(SlotNumber(l1_block.header().height())) - .expect("Saving last scanned l1 height to ledger db"); - - Ok(()) - } - - async fn extract_batch_proofs( - &self, - da_data: &mut [<::Spec as DaSpec>::BlobTransaction], - da_slot_hash: [u8; 32], // passing this as an argument is not clever - ) -> Vec { - let mut batch_proofs = Vec::new(); - - da_data.iter_mut().for_each(|tx| { - // Check for commitment - if tx.sender().as_ref() == self.batch_prover_da_pub_key.as_slice() { - let data = DaDataLightClient::try_from_slice(tx.full_data()); - - if let Ok(proof) = data { - batch_proofs.push(proof); - } else { - tracing::warn!( - "Found broken DA data in block 0x{}: {:?}", - hex::encode(da_slot_hash), - data - ); - } - } - }); - batch_proofs - } - - async fn prove( - &self, - light_client_elf: Vec, - circuit_input: LightClientCircuitInput<::Spec>, - assumptions: Vec>, - ) -> Result { - let prover_service = self.prover_service.as_ref(); - - prover_service - .add_proof_data((borsh::to_vec(&circuit_input)?, assumptions)) - .await; - - let proofs = self.prover_service.prove(light_client_elf).await?; - - assert_eq!(proofs.len(), 1); - - Ok(proofs[0].clone()) - } -} - -async fn sync_l1( - start_l1_height: u64, - da_service: Arc, - sender: mpsc::Sender, - l1_block_cache: Arc>>, -) where - Da: DaService, -{ - let mut l1_height = start_l1_height; - info!("Starting to sync from L1 height {}", l1_height); - - 'block_sync: loop { - // TODO: for a node, the da block at slot_height might not have been finalized yet - // should wait for it to be finalized - let last_finalized_l1_block_header = - match da_service.get_last_finalized_block_header().await { - Ok(header) => header, - Err(e) => { - error!("Could not fetch last finalized L1 block header: {}", e); - sleep(Duration::from_secs(2)).await; - continue; - } - }; - - let new_l1_height = last_finalized_l1_block_header.height(); - - for block_number in l1_height + 1..=new_l1_height { - let l1_block = - match get_da_block_at_height(&da_service, block_number, l1_block_cache.clone()) - .await - { - Ok(block) => block, - Err(e) => { - error!("Could not fetch last finalized L1 block: {}", e); - sleep(Duration::from_secs(2)).await; - continue 'block_sync; - } - }; - if block_number > l1_height { - l1_height = block_number; - if let Err(e) = sender.send(l1_block).await { - error!("Could not notify about L1 block: {}", e); - continue 'block_sync; - } - } - } - - sleep(Duration::from_secs(2)).await; - } -} diff --git a/crates/light-client-prover/src/runner.rs b/crates/light-client-prover/src/runner.rs deleted file mode 100644 index 1675eeb5a..000000000 --- a/crates/light-client-prover/src/runner.rs +++ /dev/null @@ -1,265 +0,0 @@ -use std::collections::HashMap; -use std::net::SocketAddr; -use std::sync::Arc; - -use citrea_common::tasks::manager::TaskManager; -use citrea_common::{LightClientProverConfig, RollupPublicKeys, RpcConfig, RunnerConfig}; -use jsonrpsee::http_client::{HttpClient, HttpClientBuilder}; -use jsonrpsee::server::{BatchRequestConfig, ServerBuilder}; -use jsonrpsee::RpcModule; -use reth_primitives::U64; -use sov_db::ledger_db::{LedgerDB, LightClientProverLedgerOps, SharedLedgerOps}; -use sov_db::schema::types::SlotNumber; -use sov_ledger_rpc::LedgerRpcClient; -use sov_modules_rollup_blueprint::RollupBlueprint; -use sov_rollup_interface::services::da::DaService; -use sov_rollup_interface::spec::SpecId; -use sov_rollup_interface::zk::ZkvmHost; -use sov_stf_runner::ProverService; -use tokio::signal; -use tokio::sync::oneshot; -use tracing::{error, info, instrument}; - -use crate::da_block_handler::L1BlockHandler; -use crate::rpc::{create_rpc_module, RpcContext}; - -/// Dependencies needed to run the rollup. -pub struct LightClientProver { - /// The State Transition Runner. - #[allow(clippy::type_complexity)] - pub runner: CitreaLightClientProver, - /// Rpc methods for the rollup. - pub rpc_methods: jsonrpsee::RpcModule<()>, -} - -impl LightClientProver { - /// Runs the rollup. - #[instrument(level = "trace", skip_all, err, ret(level = "error"))] - pub async fn run(self) -> Result<(), anyhow::Error> { - self.run_and_report_rpc_port(None).await - } - - /// Only run the rpc. - pub async fn run_rpc(mut self) -> Result<(), anyhow::Error> { - self.runner.start_rpc_server(self.rpc_methods, None).await?; - Ok(()) - } - - /// Runs the rollup. Reports rpc port to the caller using the provided channel. - pub async fn run_and_report_rpc_port( - self, - channel: Option>, - ) -> Result<(), anyhow::Error> { - let mut runner = self.runner; - runner.start_rpc_server(self.rpc_methods, channel).await?; - - runner.run().await?; - Ok(()) - } -} - -pub struct CitreaLightClientProver -where - Da: DaService + Send + Sync, - Vm: ZkvmHost, - Ps: ProverService, - DB: LightClientProverLedgerOps + SharedLedgerOps + Clone, -{ - _runner_config: RunnerConfig, - public_keys: RollupPublicKeys, - rpc_config: RpcConfig, - da_service: Arc, - ledger_db: DB, - sequencer_client: HttpClient, - prover_service: Arc, - prover_config: LightClientProverConfig, - task_manager: TaskManager<()>, - batch_proof_commitments_by_spec: HashMap, - light_client_proof_commitment: HashMap, - light_client_proof_elfs: HashMap>, -} - -impl CitreaLightClientProver -where - Da: DaService + Send + Sync + 'static, - Vm: ZkvmHost, - Ps: ProverService + Send + Sync + 'static, - DB: LightClientProverLedgerOps + SharedLedgerOps + Clone + 'static, -{ - #[allow(clippy::too_many_arguments)] - pub fn new( - runner_config: RunnerConfig, - public_keys: RollupPublicKeys, - rpc_config: RpcConfig, - da_service: Arc, - ledger_db: DB, - prover_service: Arc, - prover_config: LightClientProverConfig, - batch_proof_commitments_by_spec: HashMap, - light_client_proof_commitment: HashMap, - light_client_proof_elfs: HashMap>, - task_manager: TaskManager<()>, - ) -> Result { - let sequencer_client_url = runner_config.sequencer_client_url.clone(); - Ok(Self { - _runner_config: runner_config, - public_keys, - rpc_config, - da_service, - ledger_db, - sequencer_client: HttpClientBuilder::default().build(sequencer_client_url)?, - prover_service, - prover_config, - task_manager, - batch_proof_commitments_by_spec, - light_client_proof_commitment, - light_client_proof_elfs, - }) - } - - /// Starts a RPC server with provided rpc methods. - pub async fn start_rpc_server( - &mut self, - methods: RpcModule<()>, - channel: Option>, - ) -> anyhow::Result<()> { - let methods = self.register_rpc_methods(methods)?; - let listen_address = SocketAddr::new( - self.rpc_config - .bind_host - .parse() - .map_err(|e| anyhow::anyhow!("Failed to parse bind host: {}", e))?, - self.rpc_config.bind_port, - ); - - let max_connections = self.rpc_config.max_connections; - let max_subscriptions_per_connection = self.rpc_config.max_subscriptions_per_connection; - let max_request_body_size = self.rpc_config.max_request_body_size; - let max_response_body_size = self.rpc_config.max_response_body_size; - let batch_requests_limit = self.rpc_config.batch_requests_limit; - - let middleware = tower::ServiceBuilder::new().layer(citrea_common::rpc::get_cors_layer()); - // .layer(citrea_common::rpc::get_healthcheck_proxy_layer()); - - self.task_manager.spawn(|cancellation_token| async move { - let server = ServerBuilder::default() - .max_connections(max_connections) - .max_subscriptions_per_connection(max_subscriptions_per_connection) - .max_request_body_size(max_request_body_size) - .max_response_body_size(max_response_body_size) - .set_batch_request_config(BatchRequestConfig::Limit(batch_requests_limit)) - .set_http_middleware(middleware) - .build([listen_address].as_ref()) - .await; - - match server { - Ok(server) => { - let bound_address = match server.local_addr() { - Ok(address) => address, - Err(e) => { - error!("{}", e); - return; - } - }; - if let Some(channel) = channel { - if let Err(e) = channel.send(bound_address) { - error!("Could not send bound_address {}: {}", bound_address, e); - return; - } - } - info!("Starting RPC server at {} ", &bound_address); - - let _server_handle = server.start(methods); - cancellation_token.cancelled().await; - } - Err(e) => { - error!("Could not start RPC server: {}", e); - } - } - }); - Ok(()) - } - - /// Runs the rollup. - #[instrument(level = "trace", skip_all, err)] - pub async fn run(&mut self) -> Result<(), anyhow::Error> { - let last_l1_height_scanned = match self.ledger_db.get_last_scanned_l1_height()? { - Some(l1_height) => l1_height, - // If not found, start from the first L2 block's L1 height - None => SlotNumber(get_initial_da_height(&self.sequencer_client).await), - }; - - let prover_config = self.prover_config.clone(); - let prover_service = self.prover_service.clone(); - let ledger_db = self.ledger_db.clone(); - let da_service = self.da_service.clone(); - let batch_prover_da_pub_key = self.public_keys.prover_da_pub_key.clone(); - let batch_proof_commitments_by_spec = self.batch_proof_commitments_by_spec.clone(); - let light_client_proof_commitment = self.light_client_proof_commitment.clone(); - let light_client_proof_elfs = self.light_client_proof_elfs.clone(); - let sequencer_client = self.sequencer_client.clone(); - - self.task_manager.spawn(|cancellation_token| async move { - let l1_block_handler = L1BlockHandler::::new( - prover_config, - prover_service, - ledger_db, - da_service, - batch_prover_da_pub_key, - batch_proof_commitments_by_spec, - light_client_proof_commitment, - light_client_proof_elfs, - Arc::new(sequencer_client), - ); - l1_block_handler - .run(last_l1_height_scanned.0, cancellation_token) - .await - }); - - // Temporary fix - signal::ctrl_c().await.expect("Failed to listen ctrl+c"); - Ok(()) - - // TODO: update this once l2 sync is implemented - // loop { - // select! { - // _ = signal::ctrl_c() => { - // info!("Shutting down"); - // self.task_manager.abort().await; - // return Ok(()); - // } - // } - // } - } - - /// Creates a shared RpcContext with all required data. - fn create_rpc_context(&self) -> RpcContext { - RpcContext { - ledger: self.ledger_db.clone(), - } - } - - /// Updates the given RpcModule with Prover methods. - pub fn register_rpc_methods( - &self, - mut rpc_methods: jsonrpsee::RpcModule<()>, - ) -> Result, jsonrpsee::core::RegisterMethodError> { - let rpc_context = self.create_rpc_context(); - let rpc = create_rpc_module(rpc_context); - rpc_methods.merge(rpc)?; - Ok(rpc_methods) - } -} - -async fn get_initial_da_height(client: &HttpClient) -> u64 { - loop { - match client.get_soft_confirmation_by_number(U64::from(1)).await { - Ok(Some(batch)) => return batch.da_slot_height, - _ => { - // sleep 1 - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - continue; - } - } - } -}