Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow awaiting payload in progress #11823

Merged
merged 7 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/e2e-test-utils/src/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl<E: EngineTypes> PayloadTestContext<E> {
) -> eyre::Result<E::PayloadBuilderAttributes> {
self.timestamp += 1;
let attributes: E::PayloadBuilderAttributes = attributes_generator(self.timestamp);
self.payload_builder.new_payload(attributes.clone()).await.unwrap();
self.payload_builder.send_new_payload(attributes.clone()).await.unwrap()?;
Ok(attributes)
}

Expand Down
9 changes: 4 additions & 5 deletions crates/engine/local/src/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use reth_chainspec::EthereumHardforks;
use reth_engine_primitives::EngineTypes;
use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_primitives::{
BuiltPayload, PayloadAttributesBuilder, PayloadBuilder, PayloadTypes,
BuiltPayload, PayloadAttributesBuilder, PayloadBuilder, PayloadKind, PayloadTypes,
};
use reth_provider::{BlockReader, ChainSpecProvider};
use reth_rpc_types_compat::engine::payload::block_to_payload;
Expand Down Expand Up @@ -202,10 +202,9 @@ where

let payload_id = res.payload_id.ok_or_eyre("No payload id")?;

// wait for some time to let the payload be built
tokio::time::sleep(Duration::from_millis(200)).await;

let Some(Ok(payload)) = self.payload_builder.best_payload(payload_id).await else {
let Some(Ok(payload)) =
self.payload_builder.resolve_kind(payload_id, PayloadKind::WaitForPending).await
else {
eyre::bail!("No payload")
};

Expand Down
15 changes: 12 additions & 3 deletions crates/payload/basic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ use reth_chainspec::{ChainSpec, EthereumHardforks};
use reth_payload_builder::{
database::CachedReads, KeepPayloadJobAlive, PayloadId, PayloadJob, PayloadJobGenerator,
};
use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes, PayloadBuilderError};
use reth_payload_primitives::{
BuiltPayload, PayloadBuilderAttributes, PayloadBuilderError, PayloadKind,
};
use reth_primitives::{
constants::{EMPTY_WITHDRAWALS, RETH_CLIENT_VERSION, SLOT_DURATION},
proofs, BlockNumberOrTag, SealedBlock, Withdrawals,
Expand Down Expand Up @@ -473,7 +475,10 @@ where
Ok(self.config.attributes.clone())
}

fn resolve(&mut self) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) {
fn resolve_kind(
&mut self,
kind: PayloadKind,
) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) {
let best_payload = self.best_payload.take();

if best_payload.is_none() && self.pending_block.is_none() {
Expand Down Expand Up @@ -529,7 +534,11 @@ where
};
}

let fut = ResolveBestPayload { best_payload, maybe_better, empty_payload };
let fut = ResolveBestPayload {
best_payload,
maybe_better,
empty_payload: empty_payload.filter(|_| kind != PayloadKind::WaitForPending),
};

(fut, KeepPayloadJobAlive::No)
}
Expand Down
6 changes: 3 additions & 3 deletions crates/payload/builder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
//! use std::pin::Pin;
//! use std::task::{Context, Poll};
//! use alloy_primitives::U256;
//! use reth_payload_builder::{EthBuiltPayload, PayloadBuilderError, KeepPayloadJobAlive, EthPayloadBuilderAttributes, PayloadJob, PayloadJobGenerator};
//! use reth_payload_builder::{EthBuiltPayload, PayloadBuilderError, KeepPayloadJobAlive, EthPayloadBuilderAttributes, PayloadJob, PayloadJobGenerator, PayloadKind};
//! use reth_primitives::{Block, Header};
//!
//! /// The generator type that creates new jobs that builds empty blocks.
Expand Down Expand Up @@ -73,7 +73,7 @@
//! Ok(self.attributes.clone())
//! }
//!
//! fn resolve(&mut self) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) {
//! fn resolve_kind(&mut self, _kind: PayloadKind) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) {
//! let payload = self.best_payload();
//! (futures_util::future::ready(payload), KeepPayloadJobAlive::No)
//! }
Expand Down Expand Up @@ -112,7 +112,7 @@ pub mod noop;
pub mod test_utils;

pub use alloy_rpc_types::engine::PayloadId;
pub use reth_payload_primitives::PayloadBuilderError;
pub use reth_payload_primitives::{PayloadBuilderError, PayloadKind};
pub use service::{
PayloadBuilderHandle, PayloadBuilderService, PayloadServiceCommand, PayloadStore,
};
Expand Down
2 changes: 1 addition & 1 deletion crates/payload/builder/src/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ where
}
PayloadServiceCommand::BestPayload(_, tx) => tx.send(None).ok(),
PayloadServiceCommand::PayloadAttributes(_, tx) => tx.send(None).ok(),
PayloadServiceCommand::Resolve(_, tx) => tx.send(None).ok(),
PayloadServiceCommand::Resolve(_, _, tx) => tx.send(None).ok(),
PayloadServiceCommand::Subscribe(_) => None,
};
}
Expand Down
79 changes: 38 additions & 41 deletions crates/payload/builder/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use alloy_rpc_types::engine::PayloadId;
use futures_util::{future::FutureExt, Stream, StreamExt};
use reth_payload_primitives::{
BuiltPayload, Events, PayloadBuilder, PayloadBuilderAttributes, PayloadBuilderError,
PayloadEvents, PayloadTypes,
PayloadEvents, PayloadKind, PayloadTypes,
};
use reth_provider::CanonStateNotification;
use std::{
Expand Down Expand Up @@ -45,11 +45,20 @@ where
///
/// Note: depending on the installed [`PayloadJobGenerator`], this may or may not terminate the
/// job, See [`PayloadJob::resolve`].
pub async fn resolve_kind(
&self,
id: PayloadId,
kind: PayloadKind,
) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
self.inner.resolve_kind(id, kind).await
}

/// Resolves the payload job and returns the best payload that has been built so far.
pub async fn resolve(
&self,
id: PayloadId,
) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
self.inner.resolve(id).await
self.resolve_kind(id, PayloadKind::Earliest).await
}

/// Returns the best payload for the given identifier.
Expand Down Expand Up @@ -110,16 +119,13 @@ where
type PayloadType = T;
type Error = PayloadBuilderError;

async fn send_and_resolve_payload(
fn send_new_payload(
&self,
attr: <Self::PayloadType as PayloadTypes>::PayloadBuilderAttributes,
) -> Result<PayloadFuture<<Self::PayloadType as PayloadTypes>::BuiltPayload>, Self::Error> {
let rx = self.send_new_payload(attr);
let id = rx.await??;

) -> Receiver<Result<PayloadId, Self::Error>> {
let (tx, rx) = oneshot::channel();
let _ = self.to_service.send(PayloadServiceCommand::Resolve(id, tx));
rx.await?.ok_or(PayloadBuilderError::MissingPayload)
let _ = self.to_service.send(PayloadServiceCommand::BuildNewPayload(attr, tx));
rx
}

/// Note: this does not resolve the job if it's still in progress.
Expand All @@ -132,21 +138,17 @@ where
rx.await.ok()?
}

fn send_new_payload(
async fn resolve_kind(
&self,
attr: <Self::PayloadType as PayloadTypes>::PayloadBuilderAttributes,
) -> Receiver<Result<PayloadId, Self::Error>> {
id: PayloadId,
kind: PayloadKind,
) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
let (tx, rx) = oneshot::channel();
let _ = self.to_service.send(PayloadServiceCommand::BuildNewPayload(attr, tx));
rx
}

/// Note: if there's already payload in progress with same identifier, it will be returned.
async fn new_payload(
&self,
attr: <Self::PayloadType as PayloadTypes>::PayloadBuilderAttributes,
) -> Result<PayloadId, Self::Error> {
self.send_new_payload(attr).await?
self.to_service.send(PayloadServiceCommand::Resolve(id, kind, tx)).ok()?;
match rx.await.transpose()? {
Ok(fut) => Some(fut.await),
Err(e) => Some(Err(e.into())),
}
}

async fn subscribe(&self) -> Result<PayloadEvents<Self::PayloadType>, Self::Error> {
Expand All @@ -168,19 +170,6 @@ where
Self { to_service }
}

/// Resolves the payload job and returns the best payload that has been built so far.
///
/// Note: depending on the installed [`PayloadJobGenerator`], this may or may not terminate the
/// job, See [`PayloadJob::resolve`].
async fn resolve(&self, id: PayloadId) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
let (tx, rx) = oneshot::channel();
self.to_service.send(PayloadServiceCommand::Resolve(id, tx)).ok()?;
match rx.await.transpose()? {
Ok(fut) => Some(fut.await),
Err(e) => Some(Err(e.into())),
}
}

/// Returns the payload attributes associated with the given identifier.
///
/// Note: this returns the attributes of the payload and does not resolve the job.
Expand Down Expand Up @@ -296,11 +285,15 @@ where

/// Returns the best payload for the given identifier that has been built so far and terminates
/// the job if requested.
fn resolve(&mut self, id: PayloadId) -> Option<PayloadFuture<T::BuiltPayload>> {
fn resolve(
&mut self,
id: PayloadId,
kind: PayloadKind,
) -> Option<PayloadFuture<T::BuiltPayload>> {
trace!(%id, "resolving payload job");

let job = self.payload_jobs.iter().position(|(_, job_id)| *job_id == id)?;
let (fut, keep_alive) = self.payload_jobs[job].0.resolve();
let (fut, keep_alive) = self.payload_jobs[job].0.resolve_kind(kind);

if keep_alive == KeepPayloadJobAlive::No {
let (_, id) = self.payload_jobs.swap_remove(job);
Expand Down Expand Up @@ -437,8 +430,8 @@ where
let attributes = this.payload_attributes(id);
let _ = tx.send(attributes);
}
PayloadServiceCommand::Resolve(id, tx) => {
let _ = tx.send(this.resolve(id));
PayloadServiceCommand::Resolve(id, strategy, tx) => {
let _ = tx.send(this.resolve(id, strategy));
}
PayloadServiceCommand::Subscribe(tx) => {
let new_rx = this.payload_events.subscribe();
Expand Down Expand Up @@ -469,7 +462,11 @@ pub enum PayloadServiceCommand<T: PayloadTypes> {
oneshot::Sender<Option<Result<T::PayloadBuilderAttributes, PayloadBuilderError>>>,
),
/// Resolve the payload and return the payload
Resolve(PayloadId, oneshot::Sender<Option<PayloadFuture<T::BuiltPayload>>>),
Resolve(
PayloadId,
/* kind: */ PayloadKind,
oneshot::Sender<Option<PayloadFuture<T::BuiltPayload>>>,
),
/// Payload service events
Subscribe(oneshot::Sender<broadcast::Receiver<Events<T>>>),
}
Expand All @@ -489,7 +486,7 @@ where
Self::PayloadAttributes(f0, f1) => {
f.debug_tuple("PayloadAttributes").field(&f0).field(&f1).finish()
}
Self::Resolve(f0, _f1) => f.debug_tuple("Resolve").field(&f0).finish(),
Self::Resolve(f0, f1, _f2) => f.debug_tuple("Resolve").field(&f0).field(&f1).finish(),
Self::Subscribe(f0) => f.debug_tuple("Subscribe").field(&f0).finish(),
}
}
Expand Down
7 changes: 5 additions & 2 deletions crates/payload/builder/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{

use alloy_primitives::U256;
use reth_chain_state::ExecutedBlock;
use reth_payload_primitives::{PayloadBuilderError, PayloadTypes};
use reth_payload_primitives::{PayloadBuilderError, PayloadKind, PayloadTypes};
use reth_primitives::Block;
use reth_provider::CanonStateNotification;
use std::{
Expand Down Expand Up @@ -96,7 +96,10 @@ impl PayloadJob for TestPayloadJob {
Ok(self.attr.clone())
}

fn resolve(&mut self) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) {
fn resolve_kind(
&mut self,
_kind: PayloadKind,
) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) {
let fut = futures_util::future::ready(self.best_payload());
(fut, KeepPayloadJobAlive::No)
}
Expand Down
20 changes: 18 additions & 2 deletions crates/payload/builder/src/traits.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! Trait abstractions used by the payload crate.

use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes, PayloadBuilderError};
use reth_payload_primitives::{
BuiltPayload, PayloadBuilderAttributes, PayloadBuilderError, PayloadKind,
};
use reth_provider::CanonStateNotification;
use std::future::Future;

Expand Down Expand Up @@ -53,7 +55,21 @@ pub trait PayloadJob: Future<Output = Result<(), PayloadBuilderError>> + Send +
/// If this returns [`KeepPayloadJobAlive::Yes`], then the [`PayloadJob`] will be polled
/// once more. If this returns [`KeepPayloadJobAlive::No`] then the [`PayloadJob`] will be
/// dropped after this call.
fn resolve(&mut self) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive);
///
/// The [`PayloadKind`] determines how the payload should be resolved in the
/// `ResolvePayloadFuture`. [`PayloadKind::Earliest`] should return the earliest available
/// payload (as fast as possible), e.g. racing an empty payload job against a pending job if
/// there's no payload available yet. [`PayloadKind::WaitForPending`] is allowed to wait
/// until a built payload is available.
fn resolve_kind(
&mut self,
kind: PayloadKind,
) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive);

/// Resolves the payload as fast as possible.
fn resolve(&mut self) -> (Self::ResolvePayloadFuture, KeepPayloadJobAlive) {
self.resolve_kind(PayloadKind::Earliest)
}
}

/// Whether the payload job should be kept alive or terminated after the payload was requested by
Expand Down
22 changes: 22 additions & 0 deletions crates/payload/primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,28 @@ pub enum EngineApiMessageVersion {
V4,
}

/// Determines how we should choose the payload to return.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum PayloadKind {
/// Returns the next best available payload (the earliest available payload).
/// This does not wait for a real for pending job to finish if there's no best payload yet and
/// is allowed to race various payload jobs (empty, pending best) against each other and
/// returns whichever job finishes faster.
///
/// This should be used when it's more important to return a valid payload as fast as possible.
/// For example, the engine API timeout for `engine_getPayload` is 1s and clients should rather
/// return an empty payload than indefinitely waiting for the pending payload job to finish and
/// risk missing the deadline.
#[default]
Earliest,
/// Only returns once we have at least one built payload.
///
/// Compared to [`PayloadKind::Earliest`] this does not race an empty payload job against the
/// already in progress one, and returns the best available built payload or awaits the job in
/// progress.
WaitForPending,
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading
Loading