feat(katana): retain transactions in pool until mined #2630

Merged · 5 commits · Nov 5, 2024

Changes from all commits
1 change: 1 addition & 0 deletions Cargo.lock

Generated file; diff not rendered by default.

75 changes: 43 additions & 32 deletions crates/katana/core/src/service/mod.rs
@@ -1,17 +1,14 @@
// TODO: remove the messaging feature flag
// TODO: move the tasks to a separate module

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use block_producer::BlockProductionError;
use futures::channel::mpsc::Receiver;
use futures::stream::{Fuse, Stream, StreamExt};
use futures::stream::StreamExt;
use katana_executor::ExecutorFactory;
use katana_pool::ordering::PoolOrd;
use katana_pool::pending::PendingTransactions;
use katana_pool::{TransactionPool, TxPool};
use katana_primitives::transaction::ExecutableTxWithHash;
use katana_primitives::Felt;
use tracing::{error, info};

use self::block_producer::BlockProducer;
@@ -30,24 +27,40 @@ pub(crate) const LOG_TARGET: &str = "node";
/// to construct a new block.
#[must_use = "BlockProductionTask does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct BlockProductionTask<EF: ExecutorFactory> {
pub struct BlockProductionTask<EF, O>
where
EF: ExecutorFactory,
O: PoolOrd<Transaction = ExecutableTxWithHash>,
{
/// creates new blocks
pub(crate) block_producer: BlockProducer<EF>,
/// the miner responsible for selecting transactions from the `pool`
pub(crate) miner: TransactionMiner,
pub(crate) miner: TransactionMiner<O>,
/// the pool that holds all transactions
pub(crate) pool: TxPool,
/// Metrics for recording the service operations
metrics: BlockProducerMetrics,
}

impl<EF: ExecutorFactory> BlockProductionTask<EF> {
pub fn new(pool: TxPool, miner: TransactionMiner, block_producer: BlockProducer<EF>) -> Self {
impl<EF, O> BlockProductionTask<EF, O>
where
EF: ExecutorFactory,
O: PoolOrd<Transaction = ExecutableTxWithHash>,
{
pub fn new(
pool: TxPool,
miner: TransactionMiner<O>,
block_producer: BlockProducer<EF>,
) -> Self {
Self { block_producer, miner, pool, metrics: BlockProducerMetrics::default() }
}
}

impl<EF: ExecutorFactory> Future for BlockProductionTask<EF> {
impl<EF, O> Future for BlockProductionTask<EF, O>
where
EF: ExecutorFactory,
O: PoolOrd<Transaction = ExecutableTxWithHash>,
{
type Output = Result<(), BlockProductionError>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@@ -65,6 +78,9 @@ impl<EF: ExecutorFactory> Future for BlockProductionTask<EF> {
let steps_used = outcome.stats.cairo_steps_used;
this.metrics.l1_gas_processed_total.increment(gas_used as u64);
this.metrics.cairo_steps_processed_total.increment(steps_used as u64);

// remove mined transactions from the pool
this.pool.remove_transactions(&outcome.txs);
}

Err(error) => {
@@ -74,7 +90,7 @@ impl<EF: ExecutorFactory> Future for BlockProductionTask<EF> {
}
}

if let Poll::Ready(pool_txs) = this.miner.poll(&this.pool, cx) {
if let Poll::Ready(pool_txs) = this.miner.poll(cx) {
// miner returned a set of transactions that we feed to the producer
this.block_producer.queue(pool_txs);
} else {
@@ -89,37 +105,32 @@ impl<EF: ExecutorFactory> Future for BlockProductionTask<EF> {

/// The type which takes transactions from the pool and feeds them to the block producer.
#[derive(Debug)]
pub struct TransactionMiner {
/// stores whether there are pending transacions (if known)
has_pending_txs: Option<bool>,
/// Receives hashes of transactions that are ready from the pool
rx: Fuse<Receiver<Felt>>,
pub struct TransactionMiner<O>
where
O: PoolOrd<Transaction = ExecutableTxWithHash>,
{
pending_txs: PendingTransactions<ExecutableTxWithHash, O>,
}

impl TransactionMiner {
pub fn new(rx: Receiver<Felt>) -> Self {
Self { rx: rx.fuse(), has_pending_txs: None }
impl<O> TransactionMiner<O>
where
O: PoolOrd<Transaction = ExecutableTxWithHash>,
{
pub fn new(pending_txs: PendingTransactions<ExecutableTxWithHash, O>) -> Self {
Self { pending_txs }
}

fn poll(&mut self, pool: &TxPool, cx: &mut Context<'_>) -> Poll<Vec<ExecutableTxWithHash>> {
// drain the notification stream
while let Poll::Ready(Some(_)) = Pin::new(&mut self.rx).poll_next(cx) {
self.has_pending_txs = Some(true);
}
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Vec<ExecutableTxWithHash>> {
let mut transactions = Vec::new();

if self.has_pending_txs == Some(false) {
return Poll::Pending;
while let Poll::Ready(Some(tx)) = self.pending_txs.poll_next_unpin(cx) {
transactions.push(tx.tx.as_ref().clone());
}

// take all the transactions from the pool
let transactions =
pool.take_transactions().map(|tx| tx.tx.as_ref().clone()).collect::<Vec<_>>();

if transactions.is_empty() {
return Poll::Pending;
}

self.has_pending_txs = Some(false);
Poll::Ready(transactions)
}
}
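
With this change, the miner no longer drains a notification channel and then calls `take_transactions` on the pool; it polls a stream of ready transactions and leaves removal to the production task once the block outcome is known. Below is a minimal, self-contained sketch of the same polling pattern, assuming only the `futures` crate; `Tx` and `Miner` are hypothetical stand-ins, not katana's `ExecutableTxWithHash` or `TransactionMiner`.

// Sketch only: `Tx` and `Miner` are simplified stand-ins for
// `ExecutableTxWithHash` and `TransactionMiner`.
use std::task::{Context, Poll};

use futures::stream::{Stream, StreamExt};

/// Hypothetical transaction type.
#[derive(Clone, Debug)]
struct Tx(u64);

/// Drains every transaction the stream can yield right now and hands the
/// batch to the caller; yields `Pending` when nothing is ready.
struct Miner<S> {
    pending: S,
}

impl<S> Miner<S>
where
    S: Stream<Item = Tx> + Unpin,
{
    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Vec<Tx>> {
        let mut batch = Vec::new();
        // Keep polling until the stream has nothing more to give right now.
        while let Poll::Ready(Some(tx)) = self.pending.poll_next_unpin(cx) {
            batch.push(tx);
        }
        if batch.is_empty() { Poll::Pending } else { Poll::Ready(batch) }
    }
}

fn main() {
    let mut miner = Miner { pending: futures::stream::iter(vec![Tx(1), Tx(2)]) };
    let batch = futures::executor::block_on(futures::future::poll_fn(|cx| miner.poll(cx)));
    println!("mined batch: {batch:?}");
}

Returning `Poll::Pending` when the batch is empty relies on the underlying stream having registered the waker during its last `poll_next` call, which is also what the real `PendingTransactions` stream is expected to do.
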
7 changes: 3 additions & 4 deletions crates/katana/pipeline/src/stage/sequencing.rs
@@ -53,11 +53,10 @@ impl<EF: ExecutorFactory> Sequencing<EF> {
}

fn run_block_production(&self) -> TaskHandle<Result<(), BlockProductionError>> {
let pool = self.pool.clone();
let miner = TransactionMiner::new(pool.add_listener());
// Create a new transaction miner with a subscription to the pool's pending transactions.
let miner = TransactionMiner::new(self.pool.pending_transactions());
let block_producer = self.block_producer.clone();

let service = BlockProductionTask::new(pool, miner, block_producer);
let service = BlockProductionTask::new(self.pool.clone(), miner, block_producer);
self.task_spawner.build_task().name("Block production").spawn(service)
}
}
3 changes: 2 additions & 1 deletion crates/katana/pool/Cargo.toml
@@ -13,8 +13,9 @@ katana-primitives.workspace = true
katana-provider.workspace = true
parking_lot.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = [ "sync" ] }
tracing.workspace = true

[dev-dependencies]
futures-util.workspace = true
rand.workspace = true
tokio.workspace = true
14 changes: 10 additions & 4 deletions crates/katana/pool/src/lib.rs
@@ -1,7 +1,9 @@
#![cfg_attr(not(test), warn(unused_crate_dependencies))]

pub mod ordering;
pub mod pending;
pub mod pool;
pub mod subscription;
pub mod tx;
pub mod validation;

@@ -10,8 +12,9 @@ use std::sync::Arc;
use futures::channel::mpsc::Receiver;
use katana_primitives::transaction::{ExecutableTxWithHash, TxHash};
use ordering::{FiFo, PoolOrd};
use pending::PendingTransactions;
use pool::Pool;
use tx::{PendingTx, PoolTransaction};
use tx::PoolTransaction;
use validation::error::InvalidTransactionError;
use validation::stateful::TxValidator;
use validation::Validator;
@@ -44,9 +47,9 @@ pub trait TransactionPool {
/// Add a new transaction to the pool.
fn add_transaction(&self, tx: Self::Transaction) -> PoolResult<TxHash>;

fn take_transactions(
&self,
) -> impl Iterator<Item = PendingTx<Self::Transaction, Self::Ordering>>;
/// Returns a [`Stream`](futures::Stream) which yields pending transactions - transactions that
/// can be executed - from the pool.
fn pending_transactions(&self) -> PendingTransactions<Self::Transaction, Self::Ordering>;

/// Check if the pool contains a transaction with the given hash.
fn contains(&self, hash: TxHash) -> bool;
@@ -56,6 +59,9 @@

fn add_listener(&self) -> Receiver<TxHash>;

/// Removes a list of transactions from the pool according to their hashes.
fn remove_transactions(&self, hashes: &[TxHash]);

/// Get the total number of transactions in the pool.
fn size(&self) -> usize;

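
Taken together, the two methods above give the pool its retain-until-mined lifecycle: `pending_transactions` lets a consumer observe executable transactions without taking them out of the pool, and `remove_transactions` drops them only once their hashes come back from a mined block. Below is a minimal sketch of that lifecycle, using a hypothetical hash-map-backed pool rather than the real `Pool` (which adds validation, ordering, and a subscription-based stream on top).

// Sketch only: a hypothetical in-memory pool illustrating the
// retain-until-mined lifecycle, not katana's actual `Pool`.
use std::collections::HashMap;

type TxHash = u64;

#[derive(Clone, Debug)]
struct Tx {
    hash: TxHash,
}

#[derive(Default)]
struct MiniPool {
    txs: HashMap<TxHash, Tx>,
}

impl MiniPool {
    /// Adding a transaction only inserts it; nothing is removed here.
    fn add_transaction(&mut self, tx: Tx) -> TxHash {
        let hash = tx.hash;
        self.txs.insert(hash, tx);
        hash
    }

    /// Pending transactions are observed (cloned) rather than taken out of the pool.
    fn pending_transactions(&self) -> Vec<Tx> {
        self.txs.values().cloned().collect()
    }

    /// Mined transactions are removed explicitly, by hash.
    fn remove_transactions(&mut self, hashes: &[TxHash]) {
        for hash in hashes {
            self.txs.remove(hash);
        }
    }
}

fn main() {
    let mut pool = MiniPool::default();
    let hash = pool.add_transaction(Tx { hash: 1 });

    // The transaction stays in the pool while the block is being produced.
    assert_eq!(pool.pending_transactions().len(), 1);

    // Once the block that includes it is mined, it is removed by hash.
    pool.remove_transactions(&[hash]);
    assert_eq!(pool.pending_transactions().len(), 0);
}

The intent, as the PR title says, is that a transaction only leaves the pool once it has actually been mined; if block production fails, it remains available to be picked up again.
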
66 changes: 38 additions & 28 deletions crates/katana/pool/src/ordering.rs
@@ -125,14 +125,16 @@ impl<T> Default for TipOrdering<T> {
#[cfg(test)]
mod tests {

use futures::StreamExt;

use crate::ordering::{self, FiFo};
use crate::pool::test_utils::*;
use crate::tx::PoolTransaction;
use crate::validation::NoopValidator;
use crate::{Pool, TransactionPool};

#[test]
fn fifo_ordering() {
#[tokio::test]
async fn fifo_ordering() {
// Create mock transactions
let txs = [PoolTx::new(), PoolTx::new(), PoolTx::new(), PoolTx::new(), PoolTx::new()];

@@ -145,16 +147,17 @@ mod tests {
});

// Get pending transactions
let pendings = pool.take_transactions().collect::<Vec<_>>();
let mut pendings = pool.pending_transactions();

// Assert that the transactions are in the order they were added (first to last)
pendings.iter().zip(txs).for_each(|(pending, tx)| {
for tx in txs {
let pending = pendings.next().await.unwrap();
assert_eq!(pending.tx.as_ref(), &tx);
});
}
}

#[test]
fn tip_based_ordering() {
#[tokio::test]
async fn tip_based_ordering() {
// Create mock transactions with different tips and in random order
let txs = [
PoolTx::new().with_tip(2),
@@ -176,36 +179,43 @@ mod tests {
let _ = pool.add_transaction(tx.clone());
});

// Get pending transactions
let pending = pool.take_transactions().collect::<Vec<_>>();
assert_eq!(pending.len(), txs.len());
let mut pending = pool.pending_transactions();

// Assert that the transactions are ordered by tip (highest to lowest)
assert_eq!(pending[0].tx.tip(), 7);
assert_eq!(pending[0].tx.hash(), txs[8].hash());
let tx = pending.next().await.unwrap();
assert_eq!(tx.tx.tip(), 7);
assert_eq!(tx.tx.hash(), txs[8].hash());

assert_eq!(pending[1].tx.tip(), 6);
assert_eq!(pending[1].tx.hash(), txs[2].hash());
let tx = pending.next().await.unwrap();
assert_eq!(tx.tx.tip(), 6);
assert_eq!(tx.tx.hash(), txs[2].hash());

assert_eq!(pending[2].tx.tip(), 5);
assert_eq!(pending[2].tx.hash(), txs[6].hash());
let tx = pending.next().await.unwrap();
assert_eq!(tx.tx.tip(), 5);
assert_eq!(tx.tx.hash(), txs[6].hash());

assert_eq!(pending[3].tx.tip(), 4);
assert_eq!(pending[3].tx.hash(), txs[7].hash());
let tx = pending.next().await.unwrap();
assert_eq!(tx.tx.tip(), 4);
assert_eq!(tx.tx.hash(), txs[7].hash());

assert_eq!(pending[4].tx.tip(), 3);
assert_eq!(pending[4].tx.hash(), txs[3].hash());
let tx = pending.next().await.unwrap();
assert_eq!(tx.tx.tip(), 3);
assert_eq!(tx.tx.hash(), txs[3].hash());

assert_eq!(pending[5].tx.tip(), 2);
assert_eq!(pending[5].tx.hash(), txs[0].hash());
let tx = pending.next().await.unwrap();
assert_eq!(tx.tx.tip(), 2);
assert_eq!(tx.tx.hash(), txs[0].hash());

assert_eq!(pending[6].tx.tip(), 2);
assert_eq!(pending[6].tx.hash(), txs[4].hash());
let tx = pending.next().await.unwrap();
assert_eq!(tx.tx.tip(), 2);
assert_eq!(tx.tx.hash(), txs[4].hash());

assert_eq!(pending[7].tx.tip(), 2);
assert_eq!(pending[7].tx.hash(), txs[5].hash());
let tx = pending.next().await.unwrap();
assert_eq!(tx.tx.tip(), 2);
assert_eq!(tx.tx.hash(), txs[5].hash());

assert_eq!(pending[8].tx.tip(), 1);
assert_eq!(pending[8].tx.hash(), txs[1].hash());
let tx = pending.next().await.unwrap();
assert_eq!(tx.tx.tip(), 1);
assert_eq!(tx.tx.hash(), txs[1].hash());
}
}