Skip to content

Commit

Permalink
feat: allow awaiting payload in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
klkvr committed Oct 16, 2024
1 parent 7b1b1fc commit 79c6b2b
Show file tree
Hide file tree
Showing 10 changed files with 73 additions and 79 deletions.
2 changes: 1 addition & 1 deletion crates/e2e-test-utils/src/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl<E: EngineTypes> PayloadTestContext<E> {
) -> eyre::Result<E::PayloadBuilderAttributes> {
self.timestamp += 1;
let attributes: E::PayloadBuilderAttributes = attributes_generator(self.timestamp);
self.payload_builder.new_payload(attributes.clone()).await.unwrap();
self.payload_builder.send_new_payload(attributes.clone()).await.unwrap()?;
Ok(attributes)
}

Expand Down
6 changes: 2 additions & 4 deletions crates/engine/local/src/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ where
tokio::select! {
// Wait for the interval or the pool to receive a transaction
_ = &mut self.mode => {
println!("RECEIVED");
if let Err(e) = self.advance().await {
error!(target: "engine::local", "Error advancing the chain: {:?}", e);
}
Expand Down Expand Up @@ -202,10 +203,7 @@ where

let payload_id = res.payload_id.ok_or_eyre("No payload id")?;

// wait for some time to let the payload be built
tokio::time::sleep(Duration::from_millis(200)).await;

let Some(Ok(payload)) = self.payload_builder.best_payload(payload_id).await else {
let Some(Ok(payload)) = self.payload_builder.resolve(payload_id, true).await else {
eyre::bail!("No payload")
};

Expand Down
12 changes: 10 additions & 2 deletions crates/payload/basic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,10 @@ where
Ok(self.config.attributes.clone())
}

fn resolve(&mut self) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) {
fn resolve(
&mut self,
wait_for_pending: bool,
) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) {
let best_payload = self.best_payload.take();

if best_payload.is_none() && self.pending_block.is_none() {
Expand Down Expand Up @@ -529,7 +532,8 @@ where
};
}

let fut = ResolveBestPayload { best_payload, maybe_better, empty_payload };
let fut =
ResolveBestPayload { best_payload, maybe_better, empty_payload, wait_for_pending };

(fut, KeepPayloadJobAlive::No)
}
Expand All @@ -552,6 +556,8 @@ pub struct ResolveBestPayload<Payload> {
pub maybe_better: Option<PendingPayload<Payload>>,
/// The empty payload building job in progress, if any.
pub empty_payload: Option<oneshot::Receiver<Result<Payload, PayloadBuilderError>>>,
/// Whether to wait for the pending payload to finish.
pub wait_for_pending: bool,
}

impl<Payload> ResolveBestPayload<Payload> {
Expand All @@ -577,6 +583,8 @@ where
debug!(target: "payload_builder", "resolving better payload");
return Poll::Ready(Ok(payload))
}
} else if this.wait_for_pending {
return Poll::Pending
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/payload/builder/src/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ where
}
PayloadServiceCommand::BestPayload(_, tx) => tx.send(None).ok(),
PayloadServiceCommand::PayloadAttributes(_, tx) => tx.send(None).ok(),
PayloadServiceCommand::Resolve(_, tx) => tx.send(None).ok(),
PayloadServiceCommand::Resolve(_, _, tx) => tx.send(None).ok(),
PayloadServiceCommand::Subscribe(_) => None,
};
}
Expand Down
69 changes: 29 additions & 40 deletions crates/payload/builder/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ where
pub async fn resolve(
&self,
id: PayloadId,
wait_for_pending: bool,
) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
self.inner.resolve(id).await
self.inner.resolve(id, wait_for_pending).await
}

/// Returns the best payload for the given identifier.
Expand Down Expand Up @@ -110,16 +111,13 @@ where
type PayloadType = T;
type Error = PayloadBuilderError;

async fn send_and_resolve_payload(
fn send_new_payload(
&self,
attr: <Self::PayloadType as PayloadTypes>::PayloadBuilderAttributes,
) -> Result<PayloadFuture<<Self::PayloadType as PayloadTypes>::BuiltPayload>, Self::Error> {
let rx = self.send_new_payload(attr);
let id = rx.await??;

) -> Receiver<Result<PayloadId, Self::Error>> {
let (tx, rx) = oneshot::channel();
let _ = self.to_service.send(PayloadServiceCommand::Resolve(id, tx));
rx.await?.ok_or(PayloadBuilderError::MissingPayload)
let _ = self.to_service.send(PayloadServiceCommand::BuildNewPayload(attr, tx));
rx
}

/// Note: this does not resolve the job if it's still in progress.
Expand All @@ -132,21 +130,17 @@ where
rx.await.ok()?
}

fn send_new_payload(
async fn resolve(
&self,
attr: <Self::PayloadType as PayloadTypes>::PayloadBuilderAttributes,
) -> Receiver<Result<PayloadId, Self::Error>> {
id: PayloadId,
wait_for_pending: bool,
) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
let (tx, rx) = oneshot::channel();
let _ = self.to_service.send(PayloadServiceCommand::BuildNewPayload(attr, tx));
rx
}

/// Note: if there's already payload in progress with same identifier, it will be returned.
async fn new_payload(
&self,
attr: <Self::PayloadType as PayloadTypes>::PayloadBuilderAttributes,
) -> Result<PayloadId, Self::Error> {
self.send_new_payload(attr).await?
self.to_service.send(PayloadServiceCommand::Resolve(id, wait_for_pending, tx)).ok()?;
match rx.await.transpose()? {
Ok(fut) => Some(fut.await),
Err(e) => Some(Err(e.into())),
}
}

async fn subscribe(&self) -> Result<PayloadEvents<Self::PayloadType>, Self::Error> {
Expand All @@ -168,19 +162,6 @@ where
Self { to_service }
}

/// Resolves the payload job and returns the best payload that has been built so far.
///
/// Note: depending on the installed [`PayloadJobGenerator`], this may or may not terminate the
/// job, See [`PayloadJob::resolve`].
async fn resolve(&self, id: PayloadId) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
let (tx, rx) = oneshot::channel();
self.to_service.send(PayloadServiceCommand::Resolve(id, tx)).ok()?;
match rx.await.transpose()? {
Ok(fut) => Some(fut.await),
Err(e) => Some(Err(e.into())),
}
}

/// Returns the payload attributes associated with the given identifier.
///
/// Note: this returns the attributes of the payload and does not resolve the job.
Expand Down Expand Up @@ -296,11 +277,15 @@ where

/// Returns the best payload for the given identifier that has been built so far and terminates
/// the job if requested.
fn resolve(&mut self, id: PayloadId) -> Option<PayloadFuture<T::BuiltPayload>> {
fn resolve(
&mut self,
id: PayloadId,
wait_for_pending: bool,
) -> Option<PayloadFuture<T::BuiltPayload>> {
trace!(%id, "resolving payload job");

let job = self.payload_jobs.iter().position(|(_, job_id)| *job_id == id)?;
let (fut, keep_alive) = self.payload_jobs[job].0.resolve();
let (fut, keep_alive) = self.payload_jobs[job].0.resolve(wait_for_pending);

if keep_alive == KeepPayloadJobAlive::No {
let (_, id) = self.payload_jobs.swap_remove(job);
Expand Down Expand Up @@ -437,8 +422,8 @@ where
let attributes = this.payload_attributes(id);
let _ = tx.send(attributes);
}
PayloadServiceCommand::Resolve(id, tx) => {
let _ = tx.send(this.resolve(id));
PayloadServiceCommand::Resolve(id, wait_for_pending, tx) => {
let _ = tx.send(this.resolve(id, wait_for_pending));
}
PayloadServiceCommand::Subscribe(tx) => {
let new_rx = this.payload_events.subscribe();
Expand Down Expand Up @@ -469,7 +454,11 @@ pub enum PayloadServiceCommand<T: PayloadTypes> {
oneshot::Sender<Option<Result<T::PayloadBuilderAttributes, PayloadBuilderError>>>,
),
/// Resolve the payload and return the payload
Resolve(PayloadId, oneshot::Sender<Option<PayloadFuture<T::BuiltPayload>>>),
Resolve(
PayloadId,
/* wait_for_pending: */ bool,
oneshot::Sender<Option<PayloadFuture<T::BuiltPayload>>>,
),
/// Payload service events
Subscribe(oneshot::Sender<broadcast::Receiver<Events<T>>>),
}
Expand All @@ -489,7 +478,7 @@ where
Self::PayloadAttributes(f0, f1) => {
f.debug_tuple("PayloadAttributes").field(&f0).field(&f1).finish()
}
Self::Resolve(f0, _f1) => f.debug_tuple("Resolve").field(&f0).finish(),
Self::Resolve(f0, f1, _f2) => f.debug_tuple("Resolve").field(&f0).field(&f1).finish(),
Self::Subscribe(f0) => f.debug_tuple("Subscribe").field(&f0).finish(),
}
}
Expand Down
5 changes: 4 additions & 1 deletion crates/payload/builder/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,10 @@ impl PayloadJob for TestPayloadJob {
Ok(self.attr.clone())
}

fn resolve(&mut self) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) {
fn resolve(
&mut self,
_wait_for_pending: bool,
) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) {
let fut = futures_util::future::ready(self.best_payload());
(fut, KeepPayloadJobAlive::No)
}
Expand Down
8 changes: 7 additions & 1 deletion crates/payload/builder/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,13 @@ pub trait PayloadJob: Future<Output = Result<(), PayloadBuilderError>> + Send +
/// If this returns [`KeepPayloadJobAlive::Yes`], then the [`PayloadJob`] will be polled
/// once more. If this returns [`KeepPayloadJobAlive::No`] then the [`PayloadJob`] will be
/// dropped after this call.
fn resolve(&mut self) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive);
///
/// If `wait_for_pending` is provided and there's a payload building job in progress, returned
/// future will first poll it until completion to get a potentially better payload.
fn resolve(
&mut self,
wait_for_pending: bool,
) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive);
}

/// Whether the payload job should be kept alive or terminated after the payload was requested by
Expand Down
35 changes: 11 additions & 24 deletions crates/payload/primitives/src/traits.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{PayloadBuilderError, PayloadEvents, PayloadTypes};
use crate::{PayloadEvents, PayloadTypes};
use alloy_primitives::{Address, B256, U256};
use alloy_rpc_types::{
engine::{PayloadAttributes as EthPayloadAttributes, PayloadId},
Expand All @@ -7,12 +7,8 @@ use alloy_rpc_types::{
use op_alloy_rpc_types_engine::OpPayloadAttributes;
use reth_chain_state::ExecutedBlock;
use reth_primitives::{SealedBlock, Withdrawals};
use std::{future::Future, pin::Pin};
use tokio::sync::oneshot;

pub(crate) type PayloadFuture<P> =
Pin<Box<dyn Future<Output = Result<P, PayloadBuilderError>> + Send + Sync>>;

/// A type that can request, subscribe to and resolve payloads.
#[async_trait::async_trait]
pub trait PayloadBuilder: Send + Unpin {
Expand All @@ -21,35 +17,26 @@ pub trait PayloadBuilder: Send + Unpin {
/// The error type returned by the builder.
type Error;

/// Sends a message to the service to start building a new payload for the given payload
/// attributes and returns a future that resolves to the payload.
async fn send_and_resolve_payload(
/// Sends a message to the service to start building a new payload for the given payload.
///
/// Returns a receiver that will receive the payload id.
fn send_new_payload(
&self,
attr: <Self::PayloadType as PayloadTypes>::PayloadBuilderAttributes,
) -> Result<PayloadFuture<<Self::PayloadType as PayloadTypes>::BuiltPayload>, Self::Error>;
) -> oneshot::Receiver<Result<PayloadId, Self::Error>>;

/// Returns the best payload for the given identifier.
async fn best_payload(
&self,
id: PayloadId,
) -> Option<Result<<Self::PayloadType as PayloadTypes>::BuiltPayload, Self::Error>>;

/// Sends a message to the service to start building a new payload for the given payload.
///
/// This is the same as [`PayloadBuilder::new_payload`] but does not wait for the result
/// and returns the receiver instead
fn send_new_payload(
&self,
attr: <Self::PayloadType as PayloadTypes>::PayloadBuilderAttributes,
) -> oneshot::Receiver<Result<PayloadId, Self::Error>>;

/// Starts building a new payload for the given payload attributes.
///
/// Returns the identifier of the payload.
async fn new_payload(
/// Resolves the payload job and returns the best payload that has been built so far.
async fn resolve(
&self,
attr: <Self::PayloadType as PayloadTypes>::PayloadBuilderAttributes,
) -> Result<PayloadId, Self::Error>;
id: PayloadId,
wait_for_pending: bool,
) -> Option<Result<<Self::PayloadType as PayloadTypes>::BuiltPayload, Self::Error>>;

/// Sends a message to the service to subscribe to payload events.
/// Returns a receiver that will receive them.
Expand Down
8 changes: 4 additions & 4 deletions crates/rpc/rpc-engine-api/src/engine_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ where
) -> EngineApiResult<EngineT::ExecutionPayloadV1> {
self.inner
.payload_store
.resolve(payload_id)
.resolve(payload_id, false)
.await
.ok_or(EngineApiError::UnknownPayload)?
.map_err(|_| EngineApiError::UnknownPayload)?
Expand Down Expand Up @@ -295,7 +295,7 @@ where
// Now resolve the payload
self.inner
.payload_store
.resolve(payload_id)
.resolve(payload_id, false)
.await
.ok_or(EngineApiError::UnknownPayload)?
.map_err(|_| EngineApiError::UnknownPayload)?
Expand Down Expand Up @@ -330,7 +330,7 @@ where
// Now resolve the payload
self.inner
.payload_store
.resolve(payload_id)
.resolve(payload_id, false)
.await
.ok_or(EngineApiError::UnknownPayload)?
.map_err(|_| EngineApiError::UnknownPayload)?
Expand Down Expand Up @@ -365,7 +365,7 @@ where
// Now resolve the payload
self.inner
.payload_store
.resolve(payload_id)
.resolve(payload_id, false)
.await
.ok_or(EngineApiError::UnknownPayload)?
.map_err(|_| EngineApiError::UnknownPayload)?
Expand Down
5 changes: 4 additions & 1 deletion examples/custom-payload-builder/src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ where
Ok(self.config.attributes.clone())
}

fn resolve(&mut self) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) {
fn resolve(
&mut self,
_wait_for_pending: bool,
) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) {
let payload = self.best_payload();
(futures_util::future::ready(payload), KeepPayloadJobAlive::No)
}
Expand Down

0 comments on commit 79c6b2b

Please sign in to comment.