Skip to content

Commit

Permalink
GH-2159 Add accept_block
Browse files Browse the repository at this point in the history
  • Loading branch information
heifner committed Feb 29, 2024
1 parent 80ee495 commit 3c7a014
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 27 deletions.
19 changes: 19 additions & 0 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3409,6 +3409,25 @@ struct controller_impl {
return fork_db.apply<std::optional<block_handle>>(f);
}

template <class BSP>
void accept_block(const BSP& bsp) {
assert(bsp && bsp->block);

// Save the received QC as soon as possible, no matter whether the block itself is valid or not
if constexpr (std::is_same_v<BSP, block_state_ptr>) {
integrate_received_qc_to_block(bsp);
}

auto do_accept_block = [&](auto& forkdb) {
if constexpr (std::is_same_v<BSP, typename std::decay_t<decltype(forkdb.chain_head)>>)
forkdb.add( bsp, mark_valid_t::no, ignore_duplicate_t::no );

emit( accepted_block_header, std::tie(bsp->block, bsp->id()) );
};

fork_db.apply<void>(do_accept_block);
}

template <class BSP>
void push_block( controller::block_report& br,
const BSP& bsp,
Expand Down
7 changes: 5 additions & 2 deletions libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,15 +198,18 @@ namespace eosio::chain {

/**
* @param br returns statistics for block
* @param bt block to push, created by create_block_handle
* @param b block to push, created by create_block_handle
* @param cb calls cb with forked applied transactions for each forked block
* @param trx_lookup user provided lookup function for externally cached transaction_metadata
*/
void push_block( block_report& br,
const block_handle& bt,
const block_handle& b,
const forked_callback_t& cb,
const trx_meta_cache_lookup& trx_lookup );

/// Accept block into fork_database
void accept_block(const block_handle& b);

boost::asio::io_context& get_thread_pool();

const chainbase::database& db()const;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace eosio::chain::plugin_interface {
namespace incoming {
namespace methods {
// synchronously push a block/trx to a single provider, block_state_legacy_ptr may be null
using block_sync = method_decl<chain_plugin_interface, bool(const signed_block_ptr&, const std::optional<block_id_type>&, const std::optional<block_handle>&), first_provider_policy>;
using block_sync = method_decl<chain_plugin_interface, bool(const signed_block_ptr&, const block_id_type&, const std::optional<block_handle>&), first_provider_policy>;
using transaction_async = method_decl<chain_plugin_interface, void(const packed_transaction_ptr&, bool, transaction_metadata::trx_type, bool, next_function<transaction_trace_ptr>), first_provider_policy>;
}
}
Expand Down
3 changes: 2 additions & 1 deletion plugins/chain_plugin/chain_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2025,7 +2025,8 @@ fc::variant read_only::get_block_info(const read_only::get_block_info_params& pa

void read_write::push_block(read_write::push_block_params&& params, next_function<read_write::push_block_results> next) {
try {
app().get_method<incoming::methods::block_sync>()(std::make_shared<signed_block>( std::move(params) ), std::optional<block_id_type>{}, std::optional<block_handle>{});
auto b = std::make_shared<signed_block>( std::move(params) );
app().get_method<incoming::methods::block_sync>()(b, b->calculate_id(), std::optional<block_handle>{});
} catch ( boost::interprocess::bad_alloc& ) {
handle_db_exhaustion();
} catch ( const std::bad_alloc& ) {
Expand Down
2 changes: 1 addition & 1 deletion plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3905,7 +3905,7 @@ namespace eosio {
if( reason == unlinkable || reason == no_reason ) {
dispatcher->add_unlinkable_block( std::move(block), blk_id );
}
// reason==no_reason means accept_block() return false because we are producing, don't call rejected_block which sends handshake
// reason==no_reason means accept_block() return false which is a fatal error, don't call rejected_block which sends handshake
if( reason != no_reason ) {
sync_master->rejected_block( c, blk_num, sync_manager::closing_mode::handshake );
}
Expand Down
52 changes: 30 additions & 22 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -667,38 +667,46 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
_time_tracker.clear();
}

bool on_incoming_block(const signed_block_ptr& block, const std::optional<block_id_type>& block_id, const std::optional<block_handle>& obt) {
auto& chain = chain_plug->chain();
if (in_producing_mode()) {
fc_wlog(_log, "dropped incoming block #${num} id: ${id}", ("num", block->block_num())("id", block_id ? (*block_id).str() : "UNKNOWN"));
return false;
}

// start a new speculative block, speculative start_block may have been interrupted
auto ensure = fc::make_scoped_exit([this]() { schedule_production_loop(); });

bool on_incoming_block(const signed_block_ptr& block, const block_id_type& id, const std::optional<block_handle>& obt) {
auto now = fc::time_point::now();
const auto& id = block_id ? *block_id : block->calculate_id();
auto blk_num = block->block_num();

if (now - block->timestamp < fc::minutes(5) || (blk_num % 1000 == 0)) // only log every 1000 during sync
fc_dlog(_log, "received incoming block ${n} ${id}", ("n", blk_num)("id", id));
auto& chain = chain_plug->chain();
auto blk_num = block->block_num();

_time_tracker.add_idle_time(now);

EOS_ASSERT(block->timestamp < (now + fc::seconds(7)), block_from_the_future, "received a block from the future, ignoring it: ${id}", ("id", id));
// start a new speculative block, speculative start_block may have been interrupted
auto ensure = fc::make_scoped_exit([this]() {
// avoid schedule_production_loop if in_producing_mode(); speculative block was not interrupted and we don't want to abort block
if (!in_producing_mode()) {
schedule_production_loop();
} else {
_time_tracker.add_other_time();
}
});

// de-dupe here... no point in aborting block if we already know the block
// de-dupe here... no point in aborting block if we already know the block; avoid exception in create_block_handle_future
if (chain.block_exists(id)) {
return true; // return true because the block is valid
}
return true; // return true because the block is accepted
}

EOS_ASSERT(block->timestamp < (now + fc::seconds(7)), block_from_the_future, "received a block from the future, ignoring it: ${id}", ("id", id));

// start processing of block
std::future<block_handle> btf;
if (!obt) {
btf = chain.create_block_handle_future(id, block);
}

if (in_producing_mode()) {
fc_ilog(_log, "producing, incoming block #${num} id: ${id}", ("num", blk_num)("id", id));
const block_handle& bh = obt ? *obt : btf.get();
chain.accept_block(bh);
return true; // return true because block was accepted
}

if (now - block->timestamp < fc::minutes(5) || (blk_num % 1000 == 0)) // only log every 1000 during sync
fc_dlog(_log, "received incoming block ${n} ${id}", ("n", blk_num)("id", id));

// abort the pending block
abort_block();

Expand All @@ -711,10 +719,10 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin

controller::block_report br;
try {
const block_handle& bt = obt ? *obt : btf.get();
const block_handle& bh = obt ? *obt : btf.get();
chain.push_block(
br,
bt,
bh,
[this](const transaction_metadata_ptr& trx) { _unapplied_transactions.add_forked(trx); },
[this](const transaction_id_type& id) { return _unapplied_transactions.get_trx(id); });
} catch (const guard_exception& e) {
Expand Down Expand Up @@ -1284,7 +1292,7 @@ void producer_plugin_impl::plugin_initialize(const boost::program_options::varia
ilog("read-only-threads ${s}, max read-only trx time to be enforced: ${t} us", ("s", _ro_thread_pool_size)("t", _ro_max_trx_time_us));

_incoming_block_sync_provider = app().get_method<incoming::methods::block_sync>().register_provider(
[this](const signed_block_ptr& block, const std::optional<block_id_type>& block_id, const std::optional<block_handle>& obt) {
[this](const signed_block_ptr& block, const block_id_type& block_id, const std::optional<block_handle>& obt) {
return on_incoming_block(block, block_id, obt);
});

Expand Down

0 comments on commit 3c7a014

Please sign in to comment.