From fa5ff2f966b3d416d410f92c3d8828395b10499f Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Fri, 13 Oct 2023 15:23:22 -0500 Subject: [PATCH 1/8] GH-1677 Avoid stack overflow by posting to ship io_context on recursive send() --- .../include/eosio/state_history_plugin/session.hpp | 12 ++++++++++-- .../state_history_plugin/state_history_plugin.cpp | 3 ++- plugins/state_history_plugin/tests/session_test.cpp | 2 +- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/plugins/state_history_plugin/include/eosio/state_history_plugin/session.hpp b/plugins/state_history_plugin/include/eosio/state_history_plugin/session.hpp index 6b2e80f1d9..f9d656c1d7 100644 --- a/plugins/state_history_plugin/include/eosio/state_history_plugin/session.hpp +++ b/plugins/state_history_plugin/include/eosio/state_history_plugin/session.hpp @@ -57,11 +57,15 @@ class session_manager { private: using entry_ptr = std::unique_ptr; + boost::asio::io_context& ship_io_context; std::set> session_set; bool sending = false; std::deque, entry_ptr>> send_queue; public: + explicit session_manager(boost::asio::io_context& ship_io_context) + : ship_io_context(ship_io_context) {} + void insert(std::shared_ptr s) { session_set.insert(std::move(s)); } @@ -103,8 +107,12 @@ class session_manager { void pop_entry(bool call_send = true) { send_queue.erase(send_queue.begin()); sending = false; - if (call_send || !send_queue.empty()) - send(); + if (call_send || !send_queue.empty()) { + // avoid blowing the stack + boost::asio::post(ship_io_context, [this]() { + send(); + }); + } } void send_updates() { diff --git a/plugins/state_history_plugin/state_history_plugin.cpp b/plugins/state_history_plugin/state_history_plugin.cpp index d32842fd93..fc5d649bdb 100644 --- a/plugins/state_history_plugin/state_history_plugin.cpp +++ b/plugins/state_history_plugin/state_history_plugin.cpp @@ -75,7 +75,6 @@ struct state_history_plugin_impl : std::enable_shared_from_this thread_pool; + session_manager session_mgr{thread_pool.get_executor()}; + bool plugin_started = false; static fc::logger& logger() { return _log; } diff --git a/plugins/state_history_plugin/tests/session_test.cpp b/plugins/state_history_plugin/tests/session_test.cpp index b1f13752b0..ee6e22d5aa 100644 --- a/plugins/state_history_plugin/tests/session_test.cpp +++ b/plugins/state_history_plugin/tests/session_test.cpp @@ -100,7 +100,7 @@ struct mock_state_history_plugin { fc::temp_directory log_dir; std::optional log; std::atomic stopping = false; - eosio::session_manager session_mgr; + eosio::session_manager session_mgr{ship_ioc}; constexpr static uint32_t default_frame_size = 1024; From 1705e35b798d4bedfcb1023f64fe7ff1b0fd5dbe Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Fri, 13 Oct 2023 15:24:44 -0500 Subject: [PATCH 2/8] GH-1677 Update index_begin_block on log split otherwise next lookup in index is at the wrong offset --- .../state_history/include/eosio/state_history/log.hpp | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/libraries/state_history/include/eosio/state_history/log.hpp b/libraries/state_history/include/eosio/state_history/log.hpp index a3a0da67f7..c1da12cd10 100644 --- a/libraries/state_history/include/eosio/state_history/log.hpp +++ b/libraries/state_history/include/eosio/state_history/log.hpp @@ -74,9 +74,10 @@ struct state_history_log_header { chain::block_id_type block_id = {}; uint64_t payload_size = 0; }; -static const int state_history_log_header_serial_size = sizeof(state_history_log_header::magic) + - sizeof(state_history_log_header::block_id) + - sizeof(state_history_log_header::payload_size); +static constexpr int state_history_log_header_serial_size = sizeof(state_history_log_header::magic) + + sizeof(state_history_log_header::block_id) + + sizeof(state_history_log_header::payload_size); +static_assert(sizeof(state_history_log_header) == state_history_log_header_serial_size); namespace state_history { struct prune_config { @@ -324,7 +325,7 @@ class state_history_log { catalog.open(log_dir, conf.retained_dir, conf.archive_dir, name); catalog.max_retained_files = conf.max_retained_files; if (_end_block == 0) { - _begin_block = _end_block = catalog.last_block_num() +1; + _index_begin_block = _begin_block = _end_block = catalog.last_block_num() +1; } } }, _config); @@ -925,7 +926,7 @@ class state_history_log { catalog.add(_begin_block, _end_block - 1, log.get_file_path().parent_path(), name); - _begin_block = _end_block; + _index_begin_block = _begin_block = _end_block; using std::swap; swap(new_log_file, log); From 5723fa927987f74f984f6852b7907835742dcda7 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Mon, 16 Oct 2023 10:16:44 -0500 Subject: [PATCH 3/8] GH-1677 Close files as rename/swap was not working --- .../state_history/include/eosio/state_history/log.hpp | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/libraries/state_history/include/eosio/state_history/log.hpp b/libraries/state_history/include/eosio/state_history/log.hpp index c1da12cd10..d4fe5cb5ed 100644 --- a/libraries/state_history/include/eosio/state_history/log.hpp +++ b/libraries/state_history/include/eosio/state_history/log.hpp @@ -900,8 +900,8 @@ class state_history_log { fc::path log_file_path = log.get_file_path(); fc::path index_file_path = index.get_file_path(); - fc::datastream new_log_file; - fc::datastream new_index_file; + fc::cfile new_log_file; + fc::cfile new_index_file; fc::path tmp_log_file_path = log_file_path; tmp_log_file_path.replace_extension("log.tmp"); @@ -914,13 +914,14 @@ class state_history_log { try { new_log_file.open(fc::cfile::truncate_rw_mode); new_index_file.open(fc::cfile::truncate_rw_mode); - } catch (...) { wlog("Unable to open new state history log or index file for writing during log spliting, " "continue writing to existing block log file\n"); return; } + new_log_file.close(); + new_index_file.close(); index.close(); log.close(); @@ -937,6 +938,10 @@ class state_history_log { log.set_file_path(log_file_path); index.set_file_path(index_file_path); + + log.open(fc::cfile::update_rw_mode); + log.seek_end(0); + index.open(fc::cfile::create_or_update_rw_mode); } }; // state_history_log From b795fee0687ba1d91b433c71a1ead79d70a4aebf Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Mon, 16 Oct 2023 10:33:04 -0500 Subject: [PATCH 4/8] GH-1677 Add assert and error message --- .../state_history/include/eosio/state_history/log.hpp | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/libraries/state_history/include/eosio/state_history/log.hpp b/libraries/state_history/include/eosio/state_history/log.hpp index d4fe5cb5ed..aca21a148c 100644 --- a/libraries/state_history/include/eosio/state_history/log.hpp +++ b/libraries/state_history/include/eosio/state_history/log.hpp @@ -578,10 +578,19 @@ class state_history_log { if (block_num >= _begin_block && block_num < _end_block) { state_history_log_header header; get_entry(block_num, header); + if (chain::block_header::num_from_id(header.block_id) != block_num) { // entry does not match requested block num + elog("id does not match requested ${a} != ${b}", ("a", chain::block_header::num_from_id(header.block_id))("b", block_num)); + assert(false); + } return header.block_id; } return {}; } + if (chain::block_header::num_from_id(*result) != block_num) { // catalog failed to fetch correct block id + elog("id does not match requested ${a} != ${b}", ("a", chain::block_header::num_from_id(*result))("b", block_num)); + assert(false); + } + return result; } From f5b5e64dd6c6f2610c3bb643349fd4bb1f78a994 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Tue, 17 Oct 2023 10:42:03 -0500 Subject: [PATCH 5/8] GH-1677 Add additional tests --- .../tests/session_test.cpp | 94 +++++++++++++++++-- tests/ship_streamer_test.py | 6 +- 2 files changed, 91 insertions(+), 9 deletions(-) diff --git a/plugins/state_history_plugin/tests/session_test.cpp b/plugins/state_history_plugin/tests/session_test.cpp index ee6e22d5aa..83c718ab8d 100644 --- a/plugins/state_history_plugin/tests/session_test.cpp +++ b/plugins/state_history_plugin/tests/session_test.cpp @@ -98,20 +98,22 @@ struct mock_state_history_plugin { eosio::state_history::block_position block_head; fc::temp_directory log_dir; - std::optional log; + std::optional trace_log; + std::optional state_log; std::atomic stopping = false; eosio::session_manager session_mgr{ship_ioc}; constexpr static uint32_t default_frame_size = 1024; - std::optional& get_trace_log() { return log; } - std::optional& get_chain_state_log() { return log; } + std::optional& get_trace_log() { return trace_log; } + std::optional& get_chain_state_log() { return state_log; } fc::sha256 get_chain_id() const { return {}; } boost::asio::io_context& get_ship_executor() { return ship_ioc; } void setup_state_history_log(eosio::state_history_log_config conf = {}) { - log.emplace("ship", log_dir.path(), conf); + trace_log.emplace("ship_trace", log_dir.path(), conf); + state_log.emplace("ship_state", log_dir.path(), conf); } fc::logger logger() { return fc::logger::get(DEFAULT_LOGGER); } @@ -125,7 +127,20 @@ struct mock_state_history_plugin { return fc::time_point{}; } - std::optional get_block_id(uint32_t block_num) { return block_id_for(block_num); } + std::optional get_block_id(uint32_t block_num) { + std::optional id; + if( trace_log ) { + id = trace_log->get_block_id( block_num ); + if( id ) + return id; + } + if( state_log ) { + id = state_log->get_block_id( block_num ); + if( id ) + return id; + } + return block_id_for(block_num); + } eosio::state_history::block_position get_block_head() { return block_head; } eosio::state_history::block_position get_last_irreversible() { return block_head; } @@ -346,13 +361,24 @@ struct state_history_test_fixture { header.payload_size += sizeof(uint64_t); } - server.log->write_entry(header, block_id_for(index - 1), [&](auto& f) { + std::unique_lock gt(server.trace_log->_mx); + server.trace_log->write_entry(header, block_id_for(index - 1), [&](auto& f) { f.write((const char*)&type, sizeof(type)); if (type == 1) { f.write((const char*)&decompressed_byte_count, sizeof(decompressed_byte_count)); } f.write(compressed.data(), compressed.size()); }); + gt.unlock(); + std::unique_lock gs(server.state_log->_mx); + server.state_log->write_entry(header, block_id_for(index - 1), [&](auto& f) { + f.write((const char*)&type, sizeof(type)); + if (type == 1) { + f.write((const char*)&decompressed_byte_count, sizeof(decompressed_byte_count)); + } + f.write(compressed.data(), compressed.size()); + }); + gs.unlock(); if (written_data.size() < index) written_data.resize(index); @@ -490,6 +516,62 @@ BOOST_FIXTURE_TEST_CASE(test_session_no_prune, state_history_test_fixture) { FC_LOG_AND_RETHROW() } +BOOST_FIXTURE_TEST_CASE(test_split_log, state_history_test_fixture) { + try { + // setup block head for the server + constexpr uint32_t head = 1023; + eosio::state_history::partition_config conf; + conf.stride = 25; + server.setup_state_history_log(conf); + uint32_t head_block_num = head; + server.block_head = {head_block_num, block_id_for(head_block_num)}; + + // generate the log data used for traces and deltas + uint32_t n = mock_state_history_plugin::default_frame_size; + add_to_log(1, n * sizeof(uint32_t), generate_data(n)); // original data format + add_to_log(2, 0, generate_data(n)); // format to accommodate the compressed size greater than 4GB + add_to_log(3, 1, generate_data(n)); // format to encode decompressed size to avoid decompress entire data upfront. + for (size_t i = 4; i <= head; ++i) { + add_to_log(i, 1, generate_data(n)); + } + + send_request(eosio::state_history::get_blocks_request_v0{.start_block_num = 1, + .end_block_num = UINT32_MAX, + .max_messages_in_flight = UINT32_MAX, + .have_positions = {}, + .irreversible_only = false, + .fetch_block = true, + .fetch_traces = true, + .fetch_deltas = true}); + + eosio::state_history::state_result result; + // we should get 1023 consecutive block result + eosio::chain::block_id_type prev_id; + for (int i = 0; i < head; ++i) { + receive_result(result); + BOOST_REQUIRE(std::holds_alternative(result)); + auto r = std::get(result); + BOOST_REQUIRE_EQUAL(r.head.block_num, server.block_head.block_num); + if (i > 0) { + BOOST_TEST(prev_id.str() == r.prev_block->block_id.str()); + } + prev_id = r.this_block->block_id; + BOOST_REQUIRE(r.traces.has_value()); + BOOST_REQUIRE(r.deltas.has_value()); + auto traces = r.traces.value(); + auto deltas = r.deltas.value(); + auto& data = written_data[i]; + auto data_size = data.size() * sizeof(int32_t); + BOOST_REQUIRE_EQUAL(traces.size(), data_size); + BOOST_REQUIRE_EQUAL(deltas.size(), data_size); + + BOOST_REQUIRE(std::equal(traces.begin(), traces.end(), (const char*)data.data())); + BOOST_REQUIRE(std::equal(deltas.begin(), deltas.end(), (const char*)data.data())); + } + } + FC_LOG_AND_RETHROW() +} + BOOST_FIXTURE_TEST_CASE(test_session_with_prune, state_history_test_fixture) { try { // setup block head for the server diff --git a/tests/ship_streamer_test.py b/tests/ship_streamer_test.py index 8134e4dc50..893551101c 100755 --- a/tests/ship_streamer_test.py +++ b/tests/ship_streamer_test.py @@ -78,7 +78,7 @@ def getLatestSnapshot(nodeId): shipNodeNum = 1 specificExtraNodeosArgs={} - specificExtraNodeosArgs[shipNodeNum]="--plugin eosio::state_history_plugin --disable-replay-opts --trace-history --chain-state-history --plugin eosio::net_api_plugin --plugin eosio::producer_api_plugin " + specificExtraNodeosArgs[shipNodeNum]="--plugin eosio::state_history_plugin --disable-replay-opts --trace-history --chain-state-history --state-history-stride 200 --plugin eosio::net_api_plugin --plugin eosio::producer_api_plugin " # producer nodes will be mapped to 0 through totalProducerNodes-1, so the number totalProducerNodes will be the non-producing node specificExtraNodeosArgs[totalProducerNodes]="--plugin eosio::test_control_api_plugin " @@ -214,7 +214,7 @@ def getLatestSnapshot(nodeId): prodNode0.waitForProducer(forkAtProducer) prodNode1.waitForProducer(prodNode1Prod) if nonProdNode.verifyAlive(): - Utils.errorExit("Bridge did not shutdown"); + Utils.errorExit("Bridge did not shutdown") Print("Fork started") prodNode0.waitForProducer("defproducerb") # wait for fork to progress a bit @@ -222,7 +222,7 @@ def getLatestSnapshot(nodeId): Print("Restore fork") Print("Relaunching the non-producing bridge node to connect the producing nodes again") if nonProdNode.verifyAlive(): - Utils.errorExit("Bridge is already running"); + Utils.errorExit("Bridge is already running") if not nonProdNode.relaunch(): Utils.errorExit(f"Failure - (non-production) node {nonProdNode.nodeNum} should have restarted") From a396da9c2ccee402cb3f096307e16cd657e50e03 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Wed, 18 Oct 2023 10:33:16 -0500 Subject: [PATCH 6/8] GH-1677 Simplify split_log function as the protection against out of file descriptors seems like overkill and is difficult to follow/verify is correct. --- .../include/eosio/state_history/log.hpp | 40 +------------------ 1 file changed, 2 insertions(+), 38 deletions(-) diff --git a/libraries/state_history/include/eosio/state_history/log.hpp b/libraries/state_history/include/eosio/state_history/log.hpp index aca21a148c..41533cd740 100644 --- a/libraries/state_history/include/eosio/state_history/log.hpp +++ b/libraries/state_history/include/eosio/state_history/log.hpp @@ -905,32 +905,6 @@ class state_history_log { } void split_log() { - - fc::path log_file_path = log.get_file_path(); - fc::path index_file_path = index.get_file_path(); - - fc::cfile new_log_file; - fc::cfile new_index_file; - - fc::path tmp_log_file_path = log_file_path; - tmp_log_file_path.replace_extension("log.tmp"); - fc::path tmp_index_file_path = index_file_path; - tmp_index_file_path.replace_extension("index.tmp"); - - new_log_file.set_file_path(tmp_log_file_path); - new_index_file.set_file_path(tmp_index_file_path); - - try { - new_log_file.open(fc::cfile::truncate_rw_mode); - new_index_file.open(fc::cfile::truncate_rw_mode); - } catch (...) { - wlog("Unable to open new state history log or index file for writing during log spliting, " - "continue writing to existing block log file\n"); - return; - } - - new_log_file.close(); - new_index_file.close(); index.close(); log.close(); @@ -938,19 +912,9 @@ class state_history_log { _index_begin_block = _begin_block = _end_block; - using std::swap; - swap(new_log_file, log); - swap(new_index_file, index); - - fc::rename(tmp_log_file_path, log_file_path); - fc::rename(tmp_index_file_path, index_file_path); - - log.set_file_path(log_file_path); - index.set_file_path(index_file_path); - - log.open(fc::cfile::update_rw_mode); + log.open(fc::cfile::truncate_rw_mode); log.seek_end(0); - index.open(fc::cfile::create_or_update_rw_mode); + index.open(fc::cfile::truncate_rw_mode); } }; // state_history_log From 1cb6a9dd07a40d42188d2f29beb54861574c4b64 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Wed, 18 Oct 2023 10:45:18 -0500 Subject: [PATCH 7/8] GH-1677 Use EOS_ASSERT instead of elog/assert --- .../include/eosio/state_history/log.hpp | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/libraries/state_history/include/eosio/state_history/log.hpp b/libraries/state_history/include/eosio/state_history/log.hpp index 41533cd740..b1ed4f4926 100644 --- a/libraries/state_history/include/eosio/state_history/log.hpp +++ b/libraries/state_history/include/eosio/state_history/log.hpp @@ -578,19 +578,14 @@ class state_history_log { if (block_num >= _begin_block && block_num < _end_block) { state_history_log_header header; get_entry(block_num, header); - if (chain::block_header::num_from_id(header.block_id) != block_num) { // entry does not match requested block num - elog("id does not match requested ${a} != ${b}", ("a", chain::block_header::num_from_id(header.block_id))("b", block_num)); - assert(false); - } + EOS_ASSERT(chain::block_header::num_from_id(header.block_id) == block_num, chain::plugin_exception, + "header id does not match requested ${a} != ${b}", ("a", chain::block_header::num_from_id(header.block_id))("b", block_num)); return header.block_id; } return {}; } - if (chain::block_header::num_from_id(*result) != block_num) { // catalog failed to fetch correct block id - elog("id does not match requested ${a} != ${b}", ("a", chain::block_header::num_from_id(*result))("b", block_num)); - assert(false); - } - + EOS_ASSERT(chain::block_header::num_from_id(*result) == block_num, chain::plugin_exception, + "catalog id does not match requested ${a} != ${b}", ("a", chain::block_header::num_from_id(*result))("b", block_num)); return result; } From aee1c1d59f5533c9a604e1534711736d78803b05 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Thu, 19 Oct 2023 13:59:59 -0500 Subject: [PATCH 8/8] GH-1677 Make sure we append to index file --- libraries/state_history/include/eosio/state_history/log.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/libraries/state_history/include/eosio/state_history/log.hpp b/libraries/state_history/include/eosio/state_history/log.hpp index b1ed4f4926..d3765080d5 100644 --- a/libraries/state_history/include/eosio/state_history/log.hpp +++ b/libraries/state_history/include/eosio/state_history/log.hpp @@ -541,6 +541,7 @@ class state_history_log { "wrote payload with incorrect size to ${name}.log", ("name", name)); fc::raw::pack(log, pos); + index.seek_end(0); fc::raw::pack(index, pos); if (_begin_block == _end_block) _index_begin_block = _begin_block = block_num;