Skip to content

Commit

Permalink
Implement separate l1 l2 sync for prover (#857)
Browse files Browse the repository at this point in the history
* WIP implement separate l1 l2 sync for prover

* Prover l1/l2 sync separated

* Use function in select loop

* Fix bug in l1 sync & all tests work

* Remove old sync

* Lint

* Toml files dprint

* Remove unnecessary prints and comments

* Lint

* Comment out logs

* Remove unnecessary dep

* Pop front instead of finding index

* Handle errors in prover

* Remove unnecessary clone

* Add common da function to primitives

* Reduce l1 block cache

* Nits

* Dprint toml

* Use interval instead of sleep

* Refactor

* Remove unnecessary sort

* Naming nit

* Nits
  • Loading branch information
ercecan authored Jul 12, 2024
1 parent df685ea commit 9848362
Show file tree
Hide file tree
Showing 21 changed files with 1,045 additions and 564 deletions.
13 changes: 12 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions bin/citrea/provers/risc0/guest-bitcoin/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bin/citrea/src/rollup/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ pub trait CitreaRollupBlueprint: RollupBlueprint {
Some(prover_service),
Some(prover_config),
code_commitment,
rollup_config.sync_blocks_count,
)?;

Ok(Prover {
Expand Down
187 changes: 106 additions & 81 deletions bin/citrea/tests/e2e/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use sov_rollup_interface::da::{DaData, DaSpec};
use sov_rollup_interface::rpc::{ProofRpcResponse, SoftConfirmationStatus};
use sov_rollup_interface::services::da::DaService;
use sov_stf_runner::ProverConfig;
use tokio::runtime::Runtime;
use tokio::task::JoinHandle;
use tokio::time::sleep;

Expand Down Expand Up @@ -1281,14 +1282,14 @@ async fn test_prover_sync_with_commitments() -> Result<(), anyhow::Error> {
let prover_node_port = prover_node_port_rx.await.unwrap();
let prover_node_test_client = make_test_client(prover_node_port).await;

// prover should not have any blocks saved
assert_eq!(prover_node_test_client.eth_block_number().await, 0);

// publish 3 soft confirmations, no commitment should be sent
for _ in 0..3 {
seq_test_client.send_publish_batch_request().await;
}

// prover should not have any blocks saved
assert_eq!(prover_node_test_client.eth_block_number().await, 0);

// start l1 height = 1, end = 2
seq_test_client.send_publish_batch_request().await;

Expand All @@ -1307,28 +1308,24 @@ async fn test_prover_sync_with_commitments() -> Result<(), anyhow::Error> {
)
.await;

// prover should have synced all 4 l2 blocks
wait_for_l2_block(&prover_node_test_client, 4, None).await;
assert_eq!(prover_node_test_client.eth_block_number().await, 4);
// prover should have synced all 6 l2 blocks
// ps there are 6 blocks because:
// when a new proof is submitted in mock da a new empty da block is published
// and for every empty da block sequencer publishes a new empty soft confirmation in order to not skip a block
wait_for_l2_block(&prover_node_test_client, 6, None).await;
assert_eq!(prover_node_test_client.eth_block_number().await, 6);

seq_test_client.send_publish_batch_request().await;

da_service.publish_test_block().await.unwrap();
wait_for_l1_block(&da_service, 4, None).await;
// Still should have 4 blocks there are no commitments yet
wait_for_prover_l1_height(
&prover_node_test_client,
4,
Some(Duration::from_secs(DEFAULT_PROOF_WAIT_DURATION)),
)
.await;
assert_eq!(prover_node_test_client.eth_block_number().await, 4);

// Still should have 4 blocks there are no commitments yet
assert_eq!(prover_node_test_client.eth_block_number().await, 4);

da_service.publish_test_block().await.unwrap();
wait_for_l1_block(&da_service, 4, None).await;
wait_for_l1_block(&da_service, 5, None).await;

seq_test_client.send_publish_batch_request().await;

// wait here until we see from prover's rpc that it finished proving
Expand All @@ -1338,13 +1335,14 @@ async fn test_prover_sync_with_commitments() -> Result<(), anyhow::Error> {
Some(Duration::from_secs(DEFAULT_PROOF_WAIT_DURATION)),
)
.await;

wait_for_l2_block(&seq_test_client, 8, None).await;
assert_eq!(seq_test_client.eth_block_number().await, 8);
// Should now have 8 blocks = 2 commitments of blocks 1-4 and 5-9
// there is an extra soft confirmation due to the prover publishing a proof. This causes
// a new MockDa block, which in turn causes the sequencer to publish an extra soft confirmation
// becase it must not skip blocks.
assert_eq!(prover_node_test_client.eth_block_number().await, 4);

wait_for_l2_block(&prover_node_test_client, 8, None).await;
assert_eq!(prover_node_test_client.eth_block_number().await, 8);
// on the 8th DA block, we should have a proof
let mut blobs = da_service.get_block_at(4).await.unwrap().blobs;

Expand Down Expand Up @@ -1401,45 +1399,51 @@ async fn test_reopen_prover() -> Result<(), anyhow::Error> {
let seq_test_client = make_test_client(seq_port).await;

let (prover_node_port_tx, prover_node_port_rx) = tokio::sync::oneshot::channel();
let (thread_kill_sender, thread_kill_receiver) = std::sync::mpsc::channel();

let da_db_dir_cloned = da_db_dir.clone();
let prover_db_dir_cloned = prover_db_dir.clone();
let prover_node_task = tokio::spawn(async move {
start_rollup(
prover_node_port_tx,
GenesisPaths::from_dir(TEST_DATA_GENESIS_PATH),
Some(ProverConfig::default()),
NodeMode::Prover(seq_port),
prover_db_dir_cloned,
da_db_dir_cloned,
4,
true,
None,
None,
Some(true),
DEFAULT_DEPOSIT_MEMPOOL_FETCH_LIMIT,
)
.await;

let _handle = std::thread::spawn(move || {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let _prover_node_task = tokio::spawn(async move {
start_rollup(
prover_node_port_tx,
GenesisPaths::from_dir(TEST_DATA_GENESIS_PATH),
Some(ProverConfig::default()),
NodeMode::Prover(seq_port),
prover_db_dir_cloned,
da_db_dir_cloned,
4,
true,
None,
None,
Some(true),
DEFAULT_DEPOSIT_MEMPOOL_FETCH_LIMIT,
)
.await;
});
});
thread_kill_receiver.recv().unwrap();
});

let prover_node_port = prover_node_port_rx.await.unwrap();
let prover_node_test_client = make_test_client(prover_node_port).await;

// prover should not have any blocks saved
assert_eq!(prover_node_test_client.eth_block_number().await, 0);
// publish 3 soft confirmations, no commitment should be sent
for _ in 0..3 {
seq_test_client.send_publish_batch_request().await;
}
wait_for_l2_block(&seq_test_client, 3, None).await;

// prover should not have any blocks saved
assert_eq!(prover_node_test_client.eth_block_number().await, 0);

da_service.publish_test_block().await.unwrap();
wait_for_l1_block(&da_service, 2, None).await;

seq_test_client.send_publish_batch_request().await;
wait_for_l2_block(&seq_test_client, 4, None).await;

// sequencer commitment should be sent
da_service.publish_test_block().await.unwrap();
wait_for_l1_block(&da_service, 3, None).await;
Expand All @@ -1459,76 +1463,98 @@ async fn test_reopen_prover() -> Result<(), anyhow::Error> {
// prover should have synced all 4 l2 blocks
assert_eq!(prover_node_test_client.eth_block_number().await, 4);

prover_node_task.abort();
// prover_node_task.abort();
thread_kill_sender.send("kill").unwrap();

sleep(Duration::from_secs(1)).await;

let _ = copy_dir_recursive(&prover_db_dir, &storage_dir.path().join("prover_copy"));
sleep(Duration::from_secs(1)).await;

// Reopen prover with the new path
let (prover_node_port_tx, prover_node_port_rx) = tokio::sync::oneshot::channel();
let (thread_kill_sender, thread_kill_receiver) = std::sync::mpsc::channel();

let prover_copy_db_dir = storage_dir.path().join("prover_copy");
let da_db_dir_cloned = da_db_dir.clone();
let prover_node_task = tokio::spawn(async move {
start_rollup(
prover_node_port_tx,
GenesisPaths::from_dir(TEST_DATA_GENESIS_PATH),
Some(ProverConfig::default()),
NodeMode::Prover(seq_port),
prover_copy_db_dir,
da_db_dir_cloned,
4,
true,
None,
None,
Some(true),
DEFAULT_DEPOSIT_MEMPOOL_FETCH_LIMIT,
)
.await;

let _handle = std::thread::spawn(move || {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let _prover_node_task = tokio::spawn(async move {
start_rollup(
prover_node_port_tx,
GenesisPaths::from_dir(TEST_DATA_GENESIS_PATH),
Some(ProverConfig::default()),
NodeMode::Prover(seq_port),
prover_copy_db_dir,
da_db_dir_cloned,
4,
true,
None,
None,
Some(true),
DEFAULT_DEPOSIT_MEMPOOL_FETCH_LIMIT,
)
.await;
});
});

thread_kill_receiver.recv().unwrap();
});

let prover_node_port = prover_node_port_rx.await.unwrap();
let prover_node_test_client = make_test_client(prover_node_port).await;

seq_test_client.send_publish_batch_request().await;
wait_for_l2_block(&seq_test_client, 6, None).await;

// Still should have 4 blocks there are no commitments yet
assert_eq!(prover_node_test_client.eth_block_number().await, 4);
wait_for_l2_block(&prover_node_test_client, 6, None).await;
assert_eq!(prover_node_test_client.eth_block_number().await, 6);

prover_node_task.abort();
thread_kill_sender.send("kill").unwrap();
sleep(Duration::from_secs(2)).await;

seq_test_client.send_publish_batch_request().await;
seq_test_client.send_publish_batch_request().await;
wait_for_l2_block(&seq_test_client, 8, None).await;

let _ = copy_dir_recursive(&prover_db_dir, &storage_dir.path().join("prover_copy2"));

sleep(Duration::from_secs(2)).await;
// Reopen prover with the new path
let (prover_node_port_tx, prover_node_port_rx) = tokio::sync::oneshot::channel();

let (thread_kill_sender, thread_kill_receiver) = std::sync::mpsc::channel();
let prover_copy2_dir_cloned = storage_dir.path().join("prover_copy2");
let da_db_dir_cloned = da_db_dir.clone();
let prover_node_task = tokio::spawn(async move {
start_rollup(
prover_node_port_tx,
GenesisPaths::from_dir(TEST_DATA_GENESIS_PATH),
Some(ProverConfig::default()),
NodeMode::Prover(seq_port),
prover_copy2_dir_cloned,
da_db_dir_cloned,
4,
true,
None,
None,
Some(true),
DEFAULT_DEPOSIT_MEMPOOL_FETCH_LIMIT,
)
.await;

let _handle = std::thread::spawn(move || {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let _prover_node_task = tokio::spawn(async move {
start_rollup(
prover_node_port_tx,
GenesisPaths::from_dir(TEST_DATA_GENESIS_PATH),
Some(ProverConfig::default()),
NodeMode::Prover(seq_port),
prover_copy2_dir_cloned,
da_db_dir_cloned,
4,
true,
None,
None,
Some(true),
DEFAULT_DEPOSIT_MEMPOOL_FETCH_LIMIT,
)
.await;
});
});

thread_kill_receiver.recv().unwrap();
});

let prover_node_port = prover_node_port_rx.await.unwrap();
let prover_node_test_client = make_test_client(prover_node_port).await;

sleep(Duration::from_secs(2)).await;
// Publish a DA to force prover to process new blocks
da_service.publish_test_block().await.unwrap();
wait_for_l1_block(&da_service, 6, None).await;
Expand All @@ -1537,15 +1563,14 @@ async fn test_reopen_prover() -> Result<(), anyhow::Error> {
// and starts proving the second commitment.
wait_for_l2_block(&prover_node_test_client, 8, Some(Duration::from_secs(300))).await;
assert_eq!(prover_node_test_client.eth_block_number().await, 8);

sleep(Duration::from_secs(1)).await;
seq_test_client.send_publish_batch_request().await;
wait_for_l2_block(&seq_test_client, 9, None).await;

da_service.publish_test_block().await.unwrap();
wait_for_l1_block(&da_service, 7, None).await;
sleep(Duration::from_secs(1)).await;
// Commitment is sent
wait_for_l1_block(&da_service, 8, None).await;

// wait here until we see from prover's rpc that it finished proving
wait_for_prover_l1_height(
&prover_node_test_client,
Expand All @@ -1562,7 +1587,7 @@ async fn test_reopen_prover() -> Result<(), anyhow::Error> {
assert!(prover_node_test_client.eth_block_number().await >= 8);
// TODO: Also test with multiple commitments in single Mock DA Block
seq_task.abort();
prover_node_task.abort();
thread_kill_sender.send("kill").unwrap();
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion crates/fullnode/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ repository.workspace = true

[dependencies]
# Citrea Deps
citrea-primitives = { path = "../primitives", features = ["native"] }
sequencer-client = { path = "../sequencer-client" }
shared-backup-db = { path = "../shared-backup-db" }

Expand All @@ -28,7 +29,6 @@ borsh = { workspace = true }
futures = { workspace = true }
hex = { workspace = true }
jsonrpsee = { workspace = true }
lru = { workspace = true }
rand = { workspace = true }
rs_merkle = { workspace = true }
serde = { workspace = true }
Expand Down
Loading

0 comments on commit 9848362

Please sign in to comment.