Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IF: add get_blocks_result_v1 support to SHiP #2349

Merged
merged 12 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libraries/chain/include/eosio/chain/block_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ struct valid_t {
struct finality_data_t {
uint32_t major_version{light_header_protocol_version_major};
uint32_t minor_version{light_header_protocol_version_minor};
uint32_t active_finalizer_policy_generation{0};
uint32_t active_finalizer_policy_generation{1};
heifner marked this conversation as resolved.
Show resolved Hide resolved
digest_type action_mroot{};
digest_type base_digest{};
};
Expand Down
13 changes: 12 additions & 1 deletion libraries/state_history/abi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ extern const char* const state_history_plugin_abi = R"({
},
{
"name": "get_blocks_result_v0", "fields": [
{ "name": "head", "type": "block_position" },
{ "name": "last_irreversible", "type": "block_position" },
{ "name": "this_block", "type": "block_position?" },
{ "name": "prev_block", "type": "block_position?" },
{ "name": "block", "type": "bytes?" },
{ "name": "traces", "type": "bytes?" },
{ "name": "deltas", "type": "bytes?" }
]
},
{
"name": "get_blocks_result_v1", "fields": [
heifner marked this conversation as resolved.
Show resolved Hide resolved
{ "name": "head", "type": "block_position" },
{ "name": "last_irreversible", "type": "block_position" },
{ "name": "this_block", "type": "block_position?" },
Expand Down Expand Up @@ -576,7 +587,7 @@ extern const char* const state_history_plugin_abi = R"({
],
"variants": [
{ "name": "request", "types": ["get_status_request_v0", "get_blocks_request_v0", "get_blocks_request_v1", "get_blocks_ack_request_v0"] },
{ "name": "result", "types": ["get_status_result_v0", "get_blocks_result_v0"] },
{ "name": "result", "types": ["get_status_result_v0", "get_blocks_result_v0", "get_blocks_result_v1"] },

{ "name": "action_receipt", "types": ["action_receipt_v0"] },
{ "name": "action_trace", "types": ["action_trace_v0", "action_trace_v1"] },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -709,13 +709,15 @@ datastream<ST>& operator<<(datastream<ST>& ds, const history_context_wrapper_sta

template <typename ST>
datastream<ST>& operator<<(datastream<ST>& ds, const eosio::state_history::get_blocks_result_v0& obj) {
fc::raw::pack(ds, obj.head);
fc::raw::pack(ds, obj.last_irreversible);
fc::raw::pack(ds, obj.this_block);
fc::raw::pack(ds, obj.prev_block);
history_pack_big_bytes(ds, obj.block);
ds << static_cast<const eosio::state_history::get_blocks_result_base&>(obj);
history_pack_big_bytes(ds, obj.traces);
history_pack_big_bytes(ds, obj.deltas);
return ds;
}

template <typename ST>
datastream<ST>& operator<<(datastream<ST>& ds, const eosio::state_history::get_blocks_result_v1& obj) {
heifner marked this conversation as resolved.
Show resolved Hide resolved
ds << static_cast<const eosio::state_history::get_blocks_result_v0&>(obj);
history_pack_big_bytes(ds, obj.finality_data);
return ds;
}
Expand Down
9 changes: 7 additions & 2 deletions libraries/state_history/include/eosio/state_history/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,16 @@ struct get_blocks_result_base {
struct get_blocks_result_v0 : get_blocks_result_base {
std::optional<bytes> traces;
std::optional<bytes> deltas;
};

struct get_blocks_result_v1 : get_blocks_result_v0 {
std::optional<bytes> finality_data;
};

using state_request = std::variant<get_status_request_v0, get_blocks_request_v0, get_blocks_request_v1, get_blocks_ack_request_v0>;
using state_result = std::variant<get_status_result_v0, get_blocks_result_v0, get_blocks_result_v1>;
using get_blocks_request = std::variant<get_blocks_request_v0, get_blocks_request_v1>;
using state_result = std::variant<get_status_result_v0, get_blocks_result_v0>;
using get_blocks_result = std::variant<get_blocks_result_v0, get_blocks_result_v1>;

} // namespace state_history
} // namespace eosio
Expand All @@ -142,5 +146,6 @@ FC_REFLECT(eosio::state_history::get_blocks_request_v0, (start_block_num)(end_bl
FC_REFLECT_DERIVED(eosio::state_history::get_blocks_request_v1, (eosio::state_history::get_blocks_request_v0), (fetch_finality_data));
FC_REFLECT(eosio::state_history::get_blocks_ack_request_v0, (num_messages));
FC_REFLECT(eosio::state_history::get_blocks_result_base, (head)(last_irreversible)(this_block)(prev_block)(block));
FC_REFLECT_DERIVED(eosio::state_history::get_blocks_result_v0, (eosio::state_history::get_blocks_result_base), (traces)(deltas)(finality_data));
FC_REFLECT_DERIVED(eosio::state_history::get_blocks_result_v0, (eosio::state_history::get_blocks_result_base), (traces)(deltas));
FC_REFLECT_DERIVED(eosio::state_history::get_blocks_result_v1, (eosio::state_history::get_blocks_result_v0), (finality_data));
// clang-format on
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ class blocks_request_send_queue_entry : public send_queue_entry_base {
template <typename Session>
class blocks_result_send_queue_entry : public send_queue_entry_base, public std::enable_shared_from_this<blocks_result_send_queue_entry<Session>> {
std::shared_ptr<Session> session;
state_history::get_blocks_result_v0 r;
state_history::get_blocks_result result;
std::vector<char> data;
std::optional<locked_decompress_stream> stream;

Expand Down Expand Up @@ -264,45 +264,72 @@ class blocks_result_send_queue_entry : public send_queue_entry_base, public std:
});
}

// last to be sent
// last to be sent if result is get_blocks_result_v1
void send_finality_data() {
assert(std::holds_alternative<state_history::get_blocks_result_v1>(result));
stream.reset();
send_log(session->get_finality_data_log_entry(r, stream), true, [me=this->shared_from_this()]() {
send_log(session->get_finality_data_log_entry(std::get<state_history::get_blocks_result_v1>(result), stream), true, [me=this->shared_from_this()]() {
me->stream.reset();
me->session->session_mgr.pop_entry();
});
}

// second to be sent
// second to be sent if result is get_blocks_result_v1;
// last to be sent if result is get_blocks_result_v0
void send_deltas() {
stream.reset();
send_log(session->get_delta_log_entry(r, stream), false, [me=this->shared_from_this()]() {
me->send_finality_data();
});
std::visit(chain::overloaded{
[&](state_history::get_blocks_result_v0& r) {
send_log(session->get_delta_log_entry(r, stream), true, [me=this->shared_from_this()]() {
me->stream.reset();
me->session->session_mgr.pop_entry();}); },
[&](state_history::get_blocks_result_v1& r) {
send_log(session->get_delta_log_entry(r, stream), false, [me=this->shared_from_this()]() {
me->send_finality_data(); }); }},
greg7mdp marked this conversation as resolved.
Show resolved Hide resolved
result);
}

// first to be sent
void send_traces() {
stream.reset();
send_log(session->get_trace_log_entry(r, stream), false, [me=this->shared_from_this()]() {
send_log(session->get_trace_log_entry(result, stream), false, [me=this->shared_from_this()]() {
me->send_deltas();
});
}

template<typename T>
void pack_result_base(const T& result, uint32_t variant_index) {
// pack the state_result{get_blocks_result} excluding the fields `traces` and `deltas`,
// and `finality_data` if get_blocks_result_v1
fc::datastream<size_t> ss;

fc::raw::pack(ss, fc::unsigned_int(variant_index)); // pack the variant index of state_result{result}
fc::raw::pack(ss, static_cast<const state_history::get_blocks_result_base&>(result));
data.resize(ss.tellp());
fc::datastream<char*> ds(data.data(), data.size());
fc::raw::pack(ds, fc::unsigned_int(variant_index)); // pack the variant index of state_result{result}
fc::raw::pack(ds, static_cast<const state_history::get_blocks_result_base&>(result));
}

public:
blocks_result_send_queue_entry(std::shared_ptr<Session> s, state_history::get_blocks_result_v0&& r)
blocks_result_send_queue_entry(std::shared_ptr<Session> s, state_history::get_blocks_result&& r)
: session(std::move(s)),
r(std::move(r)) {}
result(std::move(r)) {}

void send_entry() override {
// pack the state_result{get_blocks_result} excluding the fields `traces` and `deltas`
fc::datastream<size_t> ss;
fc::raw::pack(ss, fc::unsigned_int(1)); // pack the variant index of state_result{r}
fc::raw::pack(ss, static_cast<const state_history::get_blocks_result_base&>(r));
data.resize(ss.tellp());
fc::datastream<char*> ds(data.data(), data.size());
fc::raw::pack(ds, fc::unsigned_int(1)); // pack the variant index of state_result{r}
fc::raw::pack(ds, static_cast<const state_history::get_blocks_result_base&>(r));
std::visit(
chain::overloaded{
[&](state_history::get_blocks_result_v0& r) {
static_assert(std::is_same_v<state_history::get_blocks_result_v0, std::variant_alternative_t<1, state_history::state_result>>);
pack_result_base(r, 1); // 1 for variant index of get_blocks_result_v0 in state_result
},
[&](state_history::get_blocks_result_v1& r) {
static_assert(std::is_same_v<state_history::get_blocks_result_v1, std::variant_alternative_t<2, state_history::state_result>>);
pack_result_base(r, 2); // 2 for variant index of get_blocks_result_v1 in state_result
}
heifner marked this conversation as resolved.
Show resolved Hide resolved
},
result
);

async_send(false, data, [me=this->shared_from_this()]() {
me->send_traces();
Expand Down Expand Up @@ -394,31 +421,31 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock
}
}

uint64_t get_log_entry_impl(const eosio::state_history::get_blocks_result_v0& result,
uint64_t get_log_entry_impl(const eosio::state_history::get_blocks_result& result,
bool has_value,
std::optional<state_history_log>& optional_log,
std::optional<locked_decompress_stream>& buf) {
if (has_value) {
if( optional_log ) {
buf.emplace( optional_log->create_locked_decompress_stream() );
return optional_log->get_unpacked_entry( result.this_block->block_num, *buf );
return std::visit([&](auto& r) { return optional_log->get_unpacked_entry( r.this_block->block_num, *buf ); }, result);
}
}
return 0;
}

uint64_t get_trace_log_entry(const eosio::state_history::get_blocks_result_v0& result,
uint64_t get_trace_log_entry(const eosio::state_history::get_blocks_result& result,
std::optional<locked_decompress_stream>& buf) {
return get_log_entry_impl(result, result.traces.has_value(), plugin.get_trace_log(), buf);
return std::visit([&](auto& r) { return get_log_entry_impl(r, r.traces.has_value(), plugin.get_trace_log(), buf); }, result);
}

uint64_t get_delta_log_entry(const eosio::state_history::get_blocks_result_v0& result,
uint64_t get_delta_log_entry(const eosio::state_history::get_blocks_result& result,
std::optional<locked_decompress_stream>& buf) {
return get_log_entry_impl(result, result.deltas.has_value(), plugin.get_chain_state_log(), buf);
return std::visit([&](auto& r) { return get_log_entry_impl(r, r.deltas.has_value(), plugin.get_chain_state_log(), buf); }, result);
}

uint64_t get_finality_data_log_entry(const eosio::state_history::get_blocks_result_v0& result,
std::optional<locked_decompress_stream>& buf) {
uint64_t get_finality_data_log_entry(const eosio::state_history::get_blocks_result_v1& result,
std::optional<locked_decompress_stream>& buf) {
return get_log_entry_impl(result, result.finality_data.has_value(), plugin.get_finality_data_log(), buf);
}

Expand Down Expand Up @@ -515,7 +542,8 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock
current_request = std::move(req);
}

void send_update(state_history::get_blocks_request_v0& request, bool fetch_finality_data, state_history::get_blocks_result_v0 result, const chain::signed_block_ptr& block, const chain::block_id_type& id) {
template<typename T> // get_blocks_result_v0 or get_blocks_result_v1
void send_update(state_history::get_blocks_request_v0& request, bool fetch_finality_data, T&& result, const chain::signed_block_ptr& block, const chain::block_id_type& id) {
need_to_send_update = true;

result.last_irreversible = plugin.get_last_irreversible();
Expand Down Expand Up @@ -565,8 +593,10 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock
result.traces.emplace();
if (request.fetch_deltas && plugin.get_chain_state_log())
result.deltas.emplace();
if (fetch_finality_data && plugin.get_finality_data_log()) {
result.finality_data.emplace(); // create finality_data (it's an optional field)
if constexpr (std::is_same_v<T, state_history::get_blocks_result_v1>) {
if (fetch_finality_data && plugin.get_finality_data_log()) {
result.finality_data.emplace();
}
}
}
++to_send_block_num;
Expand Down Expand Up @@ -601,7 +631,7 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock
return !max_messages_in_flight;
}

void send_update(state_history::get_blocks_result_v0 result, const chain::signed_block_ptr& block, const chain::block_id_type& id) {
void send_update(const state_history::block_position& head, const chain::signed_block_ptr& block, const chain::block_id_type& id) {
if (no_request_or_not_max_messages_in_flight()) {
session_mgr.pop_entry(false);
return;
Expand All @@ -613,9 +643,13 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock

std::visit(eosio::chain::overloaded{
[&](eosio::state_history::get_blocks_request_v0& request) {
send_update(request, false, result, block, id); },
state_history::get_blocks_result_v0 result;
result.head = head;
send_update(request, false, std::move(result), block, id); },
[&](eosio::state_history::get_blocks_request_v1& request) {
send_update(request, request.fetch_finality_data, result, block, id); } },
state_history::get_blocks_result_v1 result;
result.head = head;
send_update(request, request.fetch_finality_data, std::move(result), block, id); } },
*current_request);
}

Expand All @@ -626,17 +660,13 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock
}

auto block_num = block->block_num();
state_history::get_blocks_result_v0 result;
result.head = {block_num, id};
to_send_block_num = std::min(block_num, to_send_block_num);
send_update(std::move(result), block, id);
send_update(state_history::block_position{block_num, id}, block, id);
}

void send_update(bool changed) override {
if (changed || need_to_send_update) {
state_history::get_blocks_result_v0 result;
result.head = plugin.get_block_head();
send_update(std::move(result), nullptr, chain::block_id_type{});
send_update(plugin.get_block_head(), nullptr, chain::block_id_type{});
} else {
session_mgr.pop_entry(false);
}
Expand Down
Loading
Loading