diff --git a/libraries/state_history/include/eosio/state_history/log.hpp b/libraries/state_history/include/eosio/state_history/log.hpp index 567294c2a2..0c574476e9 100644 --- a/libraries/state_history/include/eosio/state_history/log.hpp +++ b/libraries/state_history/include/eosio/state_history/log.hpp @@ -73,9 +73,10 @@ struct state_history_log_header { chain::block_id_type block_id = {}; uint64_t payload_size = 0; }; -static const int state_history_log_header_serial_size = sizeof(state_history_log_header::magic) + - sizeof(state_history_log_header::block_id) + - sizeof(state_history_log_header::payload_size); +static constexpr int state_history_log_header_serial_size = sizeof(state_history_log_header::magic) + + sizeof(state_history_log_header::block_id) + + sizeof(state_history_log_header::payload_size); +static_assert(sizeof(state_history_log_header) == state_history_log_header_serial_size); namespace state_history { struct prune_config { @@ -323,7 +324,7 @@ class state_history_log { catalog.open(log_dir, conf.retained_dir, conf.archive_dir, name); catalog.max_retained_files = conf.max_retained_files; if (_end_block == 0) { - _begin_block = _end_block = catalog.last_block_num() +1; + _index_begin_block = _begin_block = _end_block = catalog.last_block_num() +1; } } }, _config); @@ -539,6 +540,7 @@ class state_history_log { "wrote payload with incorrect size to ${name}.log", ("name", name)); fc::raw::pack(log, pos); + index.seek_end(0); fc::raw::pack(index, pos); if (_begin_block == _end_block) _index_begin_block = _begin_block = block_num; @@ -576,10 +578,14 @@ class state_history_log { if (block_num >= _begin_block && block_num < _end_block) { state_history_log_header header; get_entry(block_num, header); + EOS_ASSERT(chain::block_header::num_from_id(header.block_id) == block_num, chain::plugin_exception, + "header id does not match requested ${a} != ${b}", ("a", chain::block_header::num_from_id(header.block_id))("b", block_num)); return header.block_id; } return {}; } + EOS_ASSERT(chain::block_header::num_from_id(*result) == block_num, chain::plugin_exception, + "catalog id does not match requested ${a} != ${b}", ("a", chain::block_header::num_from_id(*result))("b", block_num)); return result; } @@ -894,47 +900,16 @@ class state_history_log { } void split_log() { - - std::filesystem::path log_file_path = log.get_file_path(); - std::filesystem::path index_file_path = index.get_file_path(); - - fc::datastream new_log_file; - fc::datastream new_index_file; - - std::filesystem::path tmp_log_file_path = log_file_path; - tmp_log_file_path.replace_extension("log.tmp"); - std::filesystem::path tmp_index_file_path = index_file_path; - tmp_index_file_path.replace_extension("index.tmp"); - - new_log_file.set_file_path(tmp_log_file_path); - new_index_file.set_file_path(tmp_index_file_path); - - try { - new_log_file.open(fc::cfile::truncate_rw_mode); - new_index_file.open(fc::cfile::truncate_rw_mode); - - } catch (...) { - wlog("Unable to open new state history log or index file for writing during log spliting, " - "continue writing to existing block log file\n"); - return; - } - index.close(); log.close(); catalog.add(_begin_block, _end_block - 1, log.get_file_path().parent_path(), name); - _begin_block = _end_block; + _index_begin_block = _begin_block = _end_block; - using std::swap; - swap(new_log_file, log); - swap(new_index_file, index); - - std::filesystem::rename(tmp_log_file_path, log_file_path); - std::filesystem::rename(tmp_index_file_path, index_file_path); - - log.set_file_path(log_file_path); - index.set_file_path(index_file_path); + log.open(fc::cfile::truncate_rw_mode); + log.seek_end(0); + index.open(fc::cfile::truncate_rw_mode); } }; // state_history_log diff --git a/plugins/state_history_plugin/include/eosio/state_history_plugin/session.hpp b/plugins/state_history_plugin/include/eosio/state_history_plugin/session.hpp index 3d3c920af1..26af991d58 100644 --- a/plugins/state_history_plugin/include/eosio/state_history_plugin/session.hpp +++ b/plugins/state_history_plugin/include/eosio/state_history_plugin/session.hpp @@ -57,11 +57,15 @@ class session_manager { private: using entry_ptr = std::unique_ptr; + boost::asio::io_context& ship_io_context; std::set> session_set; bool sending = false; std::deque, entry_ptr>> send_queue; public: + explicit session_manager(boost::asio::io_context& ship_io_context) + : ship_io_context(ship_io_context) {} + void insert(std::shared_ptr s) { session_set.insert(std::move(s)); } @@ -103,8 +107,12 @@ class session_manager { void pop_entry(bool call_send = true) { send_queue.erase(send_queue.begin()); sending = false; - if (call_send || !send_queue.empty()) - send(); + if (call_send || !send_queue.empty()) { + // avoid blowing the stack + boost::asio::post(ship_io_context, [this]() { + send(); + }); + } } void send_updates() { diff --git a/plugins/state_history_plugin/state_history_plugin.cpp b/plugins/state_history_plugin/state_history_plugin.cpp index f117996f14..a779a50be0 100644 --- a/plugins/state_history_plugin/state_history_plugin.cpp +++ b/plugins/state_history_plugin/state_history_plugin.cpp @@ -62,7 +62,6 @@ struct state_history_plugin_impl : std::enable_shared_from_this thread_pool; + session_manager session_mgr{thread_pool.get_executor()}; + bool plugin_started = false; public: diff --git a/plugins/state_history_plugin/tests/session_test.cpp b/plugins/state_history_plugin/tests/session_test.cpp index c7b7be1ecc..63b260040c 100644 --- a/plugins/state_history_plugin/tests/session_test.cpp +++ b/plugins/state_history_plugin/tests/session_test.cpp @@ -101,20 +101,22 @@ struct mock_state_history_plugin { eosio::state_history::block_position block_head; fc::temp_directory log_dir; - std::optional log; + std::optional trace_log; + std::optional state_log; std::atomic stopping = false; - eosio::session_manager session_mgr; + eosio::session_manager session_mgr{ship_ioc}; constexpr static uint32_t default_frame_size = 1024; - std::optional& get_trace_log() { return log; } - std::optional& get_chain_state_log() { return log; } + std::optional& get_trace_log() { return trace_log; } + std::optional& get_chain_state_log() { return state_log; } fc::sha256 get_chain_id() const { return {}; } boost::asio::io_context& get_ship_executor() { return ship_ioc; } void setup_state_history_log(eosio::state_history_log_config conf = {}) { - log.emplace("ship", log_dir.path(), conf); + trace_log.emplace("ship_trace", log_dir.path(), conf); + state_log.emplace("ship_state", log_dir.path(), conf); } fc::logger logger = fc::logger::get(DEFAULT_LOGGER); @@ -130,7 +132,20 @@ struct mock_state_history_plugin { return fc::time_point{}; } - std::optional get_block_id(uint32_t block_num) { return block_id_for(block_num); } + std::optional get_block_id(uint32_t block_num) { + std::optional id; + if( trace_log ) { + id = trace_log->get_block_id( block_num ); + if( id ) + return id; + } + if( state_log ) { + id = state_log->get_block_id( block_num ); + if( id ) + return id; + } + return block_id_for(block_num); + } eosio::state_history::block_position get_block_head() { return block_head; } eosio::state_history::block_position get_last_irreversible() { return block_head; } @@ -284,13 +299,24 @@ struct state_history_test_fixture { header.payload_size += sizeof(uint64_t); } - server.log->write_entry(header, block_id_for(index - 1), [&](auto& f) { + std::unique_lock gt(server.trace_log->_mx); + server.trace_log->write_entry(header, block_id_for(index - 1), [&](auto& f) { f.write((const char*)&type, sizeof(type)); if (type == 1) { f.write((const char*)&decompressed_byte_count, sizeof(decompressed_byte_count)); } f.write(compressed.data(), compressed.size()); }); + gt.unlock(); + std::unique_lock gs(server.state_log->_mx); + server.state_log->write_entry(header, block_id_for(index - 1), [&](auto& f) { + f.write((const char*)&type, sizeof(type)); + if (type == 1) { + f.write((const char*)&decompressed_byte_count, sizeof(decompressed_byte_count)); + } + f.write(compressed.data(), compressed.size()); + }); + gs.unlock(); if (written_data.size() < index) written_data.resize(index); @@ -428,6 +454,62 @@ BOOST_FIXTURE_TEST_CASE(test_session_no_prune, state_history_test_fixture) { FC_LOG_AND_RETHROW() } +BOOST_FIXTURE_TEST_CASE(test_split_log, state_history_test_fixture) { + try { + // setup block head for the server + constexpr uint32_t head = 1023; + eosio::state_history::partition_config conf; + conf.stride = 25; + server.setup_state_history_log(conf); + uint32_t head_block_num = head; + server.block_head = {head_block_num, block_id_for(head_block_num)}; + + // generate the log data used for traces and deltas + uint32_t n = mock_state_history_plugin::default_frame_size; + add_to_log(1, n * sizeof(uint32_t), generate_data(n)); // original data format + add_to_log(2, 0, generate_data(n)); // format to accommodate the compressed size greater than 4GB + add_to_log(3, 1, generate_data(n)); // format to encode decompressed size to avoid decompress entire data upfront. + for (size_t i = 4; i <= head; ++i) { + add_to_log(i, 1, generate_data(n)); + } + + send_request(eosio::state_history::get_blocks_request_v0{.start_block_num = 1, + .end_block_num = UINT32_MAX, + .max_messages_in_flight = UINT32_MAX, + .have_positions = {}, + .irreversible_only = false, + .fetch_block = true, + .fetch_traces = true, + .fetch_deltas = true}); + + eosio::state_history::state_result result; + // we should get 1023 consecutive block result + eosio::chain::block_id_type prev_id; + for (int i = 0; i < head; ++i) { + receive_result(result); + BOOST_REQUIRE(std::holds_alternative(result)); + auto r = std::get(result); + BOOST_REQUIRE_EQUAL(r.head.block_num, server.block_head.block_num); + if (i > 0) { + BOOST_TEST(prev_id.str() == r.prev_block->block_id.str()); + } + prev_id = r.this_block->block_id; + BOOST_REQUIRE(r.traces.has_value()); + BOOST_REQUIRE(r.deltas.has_value()); + auto traces = r.traces.value(); + auto deltas = r.deltas.value(); + auto& data = written_data[i]; + auto data_size = data.size() * sizeof(int32_t); + BOOST_REQUIRE_EQUAL(traces.size(), data_size); + BOOST_REQUIRE_EQUAL(deltas.size(), data_size); + + BOOST_REQUIRE(std::equal(traces.begin(), traces.end(), (const char*)data.data())); + BOOST_REQUIRE(std::equal(deltas.begin(), deltas.end(), (const char*)data.data())); + } + } + FC_LOG_AND_RETHROW() +} + BOOST_FIXTURE_TEST_CASE(test_session_with_prune, state_history_test_fixture) { try { // setup block head for the server diff --git a/tests/ship_streamer_test.py b/tests/ship_streamer_test.py index e5710c5e36..710ab90db4 100755 --- a/tests/ship_streamer_test.py +++ b/tests/ship_streamer_test.py @@ -70,7 +70,7 @@ def getLatestSnapshot(nodeId): shipNodeNum = 1 specificExtraNodeosArgs={} - specificExtraNodeosArgs[shipNodeNum]="--plugin eosio::state_history_plugin --trace-history --chain-state-history --plugin eosio::net_api_plugin --plugin eosio::producer_api_plugin " + specificExtraNodeosArgs[shipNodeNum]="--plugin eosio::state_history_plugin --trace-history --chain-state-history --state-history-stride 200 --plugin eosio::net_api_plugin --plugin eosio::producer_api_plugin " # producer nodes will be mapped to 0 through totalProducerNodes-1, so the number totalProducerNodes will be the non-producing node specificExtraNodeosArgs[totalProducerNodes]="--plugin eosio::test_control_api_plugin " @@ -206,7 +206,7 @@ def getLatestSnapshot(nodeId): prodNode0.waitForProducer(forkAtProducer) prodNode1.waitForProducer(prodNode1Prod) if nonProdNode.verifyAlive(): - Utils.errorExit("Bridge did not shutdown"); + Utils.errorExit("Bridge did not shutdown") Print("Fork started") forkProgress="defproducer" + chr(ord(forkAtProducer[-1])+3) @@ -215,7 +215,7 @@ def getLatestSnapshot(nodeId): Print("Restore fork") Print("Relaunching the non-producing bridge node to connect the producing nodes again") if nonProdNode.verifyAlive(): - Utils.errorExit("Bridge is already running"); + Utils.errorExit("Bridge is already running") if not nonProdNode.relaunch(): Utils.errorExit(f"Failure - (non-production) node {nonProdNode.nodeNum} should have restarted")