Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/release/5.0' into GH-1677-ship-main
Browse files Browse the repository at this point in the history
  • Loading branch information
heifner committed Oct 19, 2023
2 parents c582096 + 944226b commit 8281aa3
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 52 deletions.
53 changes: 14 additions & 39 deletions libraries/state_history/include/eosio/state_history/log.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,10 @@ struct state_history_log_header {
chain::block_id_type block_id = {};
uint64_t payload_size = 0;
};
static const int state_history_log_header_serial_size = sizeof(state_history_log_header::magic) +
sizeof(state_history_log_header::block_id) +
sizeof(state_history_log_header::payload_size);
static constexpr int state_history_log_header_serial_size = sizeof(state_history_log_header::magic) +
sizeof(state_history_log_header::block_id) +
sizeof(state_history_log_header::payload_size);
static_assert(sizeof(state_history_log_header) == state_history_log_header_serial_size);

namespace state_history {
struct prune_config {
Expand Down Expand Up @@ -323,7 +324,7 @@ class state_history_log {
catalog.open(log_dir, conf.retained_dir, conf.archive_dir, name);
catalog.max_retained_files = conf.max_retained_files;
if (_end_block == 0) {
_begin_block = _end_block = catalog.last_block_num() +1;
_index_begin_block = _begin_block = _end_block = catalog.last_block_num() +1;
}
}
}, _config);
Expand Down Expand Up @@ -539,6 +540,7 @@ class state_history_log {
"wrote payload with incorrect size to ${name}.log", ("name", name));
fc::raw::pack(log, pos);

index.seek_end(0);
fc::raw::pack(index, pos);
if (_begin_block == _end_block)
_index_begin_block = _begin_block = block_num;
Expand Down Expand Up @@ -576,10 +578,14 @@ class state_history_log {
if (block_num >= _begin_block && block_num < _end_block) {
state_history_log_header header;
get_entry(block_num, header);
EOS_ASSERT(chain::block_header::num_from_id(header.block_id) == block_num, chain::plugin_exception,
"header id does not match requested ${a} != ${b}", ("a", chain::block_header::num_from_id(header.block_id))("b", block_num));
return header.block_id;
}
return {};
}
EOS_ASSERT(chain::block_header::num_from_id(*result) == block_num, chain::plugin_exception,
"catalog id does not match requested ${a} != ${b}", ("a", chain::block_header::num_from_id(*result))("b", block_num));
return result;
}

Expand Down Expand Up @@ -894,47 +900,16 @@ class state_history_log {
}

void split_log() {

std::filesystem::path log_file_path = log.get_file_path();
std::filesystem::path index_file_path = index.get_file_path();

fc::datastream<fc::cfile> new_log_file;
fc::datastream<fc::cfile> new_index_file;

std::filesystem::path tmp_log_file_path = log_file_path;
tmp_log_file_path.replace_extension("log.tmp");
std::filesystem::path tmp_index_file_path = index_file_path;
tmp_index_file_path.replace_extension("index.tmp");

new_log_file.set_file_path(tmp_log_file_path);
new_index_file.set_file_path(tmp_index_file_path);

try {
new_log_file.open(fc::cfile::truncate_rw_mode);
new_index_file.open(fc::cfile::truncate_rw_mode);

} catch (...) {
wlog("Unable to open new state history log or index file for writing during log spliting, "
"continue writing to existing block log file\n");
return;
}

index.close();
log.close();

catalog.add(_begin_block, _end_block - 1, log.get_file_path().parent_path(), name);

_begin_block = _end_block;
_index_begin_block = _begin_block = _end_block;

using std::swap;
swap(new_log_file, log);
swap(new_index_file, index);

std::filesystem::rename(tmp_log_file_path, log_file_path);
std::filesystem::rename(tmp_index_file_path, index_file_path);

log.set_file_path(log_file_path);
index.set_file_path(index_file_path);
log.open(fc::cfile::truncate_rw_mode);
log.seek_end(0);
index.open(fc::cfile::truncate_rw_mode);
}
}; // state_history_log

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,15 @@ class session_manager {
private:
using entry_ptr = std::unique_ptr<send_queue_entry_base>;

boost::asio::io_context& ship_io_context;
std::set<std::shared_ptr<session_base>> session_set;
bool sending = false;
std::deque<std::pair<std::shared_ptr<session_base>, entry_ptr>> send_queue;

public:
explicit session_manager(boost::asio::io_context& ship_io_context)
: ship_io_context(ship_io_context) {}

void insert(std::shared_ptr<session_base> s) {
session_set.insert(std::move(s));
}
Expand Down Expand Up @@ -103,8 +107,12 @@ class session_manager {
void pop_entry(bool call_send = true) {
send_queue.erase(send_queue.begin());
sending = false;
if (call_send || !send_queue.empty())
send();
if (call_send || !send_queue.empty()) {
// avoid blowing the stack
boost::asio::post(ship_io_context, [this]() {
send();
});
}
}

void send_updates() {
Expand Down
3 changes: 2 additions & 1 deletion plugins/state_history_plugin/state_history_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
string endpoint_address;
string unix_path;
state_history::trace_converter trace_converter;
session_manager session_mgr;

mutable std::mutex mtx;
block_id_type head_id;
Expand All @@ -71,6 +70,8 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl

named_thread_pool<struct ship> thread_pool;

session_manager session_mgr{thread_pool.get_executor()};

bool plugin_started = false;

public:
Expand Down
96 changes: 89 additions & 7 deletions plugins/state_history_plugin/tests/session_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,20 +101,22 @@ struct mock_state_history_plugin {

eosio::state_history::block_position block_head;
fc::temp_directory log_dir;
std::optional<eosio::state_history_log> log;
std::optional<eosio::state_history_log> trace_log;
std::optional<eosio::state_history_log> state_log;
std::atomic<bool> stopping = false;
eosio::session_manager session_mgr;
eosio::session_manager session_mgr{ship_ioc};

constexpr static uint32_t default_frame_size = 1024;

std::optional<eosio::state_history_log>& get_trace_log() { return log; }
std::optional<eosio::state_history_log>& get_chain_state_log() { return log; }
std::optional<eosio::state_history_log>& get_trace_log() { return trace_log; }
std::optional<eosio::state_history_log>& get_chain_state_log() { return state_log; }
fc::sha256 get_chain_id() const { return {}; }

boost::asio::io_context& get_ship_executor() { return ship_ioc; }

void setup_state_history_log(eosio::state_history_log_config conf = {}) {
log.emplace("ship", log_dir.path(), conf);
trace_log.emplace("ship_trace", log_dir.path(), conf);
state_log.emplace("ship_state", log_dir.path(), conf);
}

fc::logger logger = fc::logger::get(DEFAULT_LOGGER);
Expand All @@ -130,7 +132,20 @@ struct mock_state_history_plugin {
return fc::time_point{};
}

std::optional<eosio::chain::block_id_type> get_block_id(uint32_t block_num) { return block_id_for(block_num); }
std::optional<eosio::chain::block_id_type> get_block_id(uint32_t block_num) {
std::optional<eosio::chain::block_id_type> id;
if( trace_log ) {
id = trace_log->get_block_id( block_num );
if( id )
return id;
}
if( state_log ) {
id = state_log->get_block_id( block_num );
if( id )
return id;
}
return block_id_for(block_num);
}

eosio::state_history::block_position get_block_head() { return block_head; }
eosio::state_history::block_position get_last_irreversible() { return block_head; }
Expand Down Expand Up @@ -284,13 +299,24 @@ struct state_history_test_fixture {
header.payload_size += sizeof(uint64_t);
}

server.log->write_entry(header, block_id_for(index - 1), [&](auto& f) {
std::unique_lock gt(server.trace_log->_mx);
server.trace_log->write_entry(header, block_id_for(index - 1), [&](auto& f) {
f.write((const char*)&type, sizeof(type));
if (type == 1) {
f.write((const char*)&decompressed_byte_count, sizeof(decompressed_byte_count));
}
f.write(compressed.data(), compressed.size());
});
gt.unlock();
std::unique_lock gs(server.state_log->_mx);
server.state_log->write_entry(header, block_id_for(index - 1), [&](auto& f) {
f.write((const char*)&type, sizeof(type));
if (type == 1) {
f.write((const char*)&decompressed_byte_count, sizeof(decompressed_byte_count));
}
f.write(compressed.data(), compressed.size());
});
gs.unlock();

if (written_data.size() < index)
written_data.resize(index);
Expand Down Expand Up @@ -428,6 +454,62 @@ BOOST_FIXTURE_TEST_CASE(test_session_no_prune, state_history_test_fixture) {
FC_LOG_AND_RETHROW()
}

BOOST_FIXTURE_TEST_CASE(test_split_log, state_history_test_fixture) {
try {
// setup block head for the server
constexpr uint32_t head = 1023;
eosio::state_history::partition_config conf;
conf.stride = 25;
server.setup_state_history_log(conf);
uint32_t head_block_num = head;
server.block_head = {head_block_num, block_id_for(head_block_num)};

// generate the log data used for traces and deltas
uint32_t n = mock_state_history_plugin::default_frame_size;
add_to_log(1, n * sizeof(uint32_t), generate_data(n)); // original data format
add_to_log(2, 0, generate_data(n)); // format to accommodate the compressed size greater than 4GB
add_to_log(3, 1, generate_data(n)); // format to encode decompressed size to avoid decompress entire data upfront.
for (size_t i = 4; i <= head; ++i) {
add_to_log(i, 1, generate_data(n));
}

send_request(eosio::state_history::get_blocks_request_v0{.start_block_num = 1,
.end_block_num = UINT32_MAX,
.max_messages_in_flight = UINT32_MAX,
.have_positions = {},
.irreversible_only = false,
.fetch_block = true,
.fetch_traces = true,
.fetch_deltas = true});

eosio::state_history::state_result result;
// we should get 1023 consecutive block result
eosio::chain::block_id_type prev_id;
for (int i = 0; i < head; ++i) {
receive_result(result);
BOOST_REQUIRE(std::holds_alternative<eosio::state_history::get_blocks_result_v0>(result));
auto r = std::get<eosio::state_history::get_blocks_result_v0>(result);
BOOST_REQUIRE_EQUAL(r.head.block_num, server.block_head.block_num);
if (i > 0) {
BOOST_TEST(prev_id.str() == r.prev_block->block_id.str());
}
prev_id = r.this_block->block_id;
BOOST_REQUIRE(r.traces.has_value());
BOOST_REQUIRE(r.deltas.has_value());
auto traces = r.traces.value();
auto deltas = r.deltas.value();
auto& data = written_data[i];
auto data_size = data.size() * sizeof(int32_t);
BOOST_REQUIRE_EQUAL(traces.size(), data_size);
BOOST_REQUIRE_EQUAL(deltas.size(), data_size);

BOOST_REQUIRE(std::equal(traces.begin(), traces.end(), (const char*)data.data()));
BOOST_REQUIRE(std::equal(deltas.begin(), deltas.end(), (const char*)data.data()));
}
}
FC_LOG_AND_RETHROW()
}

BOOST_FIXTURE_TEST_CASE(test_session_with_prune, state_history_test_fixture) {
try {
// setup block head for the server
Expand Down
6 changes: 3 additions & 3 deletions tests/ship_streamer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def getLatestSnapshot(nodeId):

shipNodeNum = 1
specificExtraNodeosArgs={}
specificExtraNodeosArgs[shipNodeNum]="--plugin eosio::state_history_plugin --trace-history --chain-state-history --plugin eosio::net_api_plugin --plugin eosio::producer_api_plugin "
specificExtraNodeosArgs[shipNodeNum]="--plugin eosio::state_history_plugin --trace-history --chain-state-history --state-history-stride 200 --plugin eosio::net_api_plugin --plugin eosio::producer_api_plugin "
# producer nodes will be mapped to 0 through totalProducerNodes-1, so the number totalProducerNodes will be the non-producing node
specificExtraNodeosArgs[totalProducerNodes]="--plugin eosio::test_control_api_plugin "

Expand Down Expand Up @@ -206,7 +206,7 @@ def getLatestSnapshot(nodeId):
prodNode0.waitForProducer(forkAtProducer)
prodNode1.waitForProducer(prodNode1Prod)
if nonProdNode.verifyAlive():
Utils.errorExit("Bridge did not shutdown");
Utils.errorExit("Bridge did not shutdown")
Print("Fork started")

forkProgress="defproducer" + chr(ord(forkAtProducer[-1])+3)
Expand All @@ -215,7 +215,7 @@ def getLatestSnapshot(nodeId):
Print("Restore fork")
Print("Relaunching the non-producing bridge node to connect the producing nodes again")
if nonProdNode.verifyAlive():
Utils.errorExit("Bridge is already running");
Utils.errorExit("Bridge is already running")
if not nonProdNode.relaunch():
Utils.errorExit(f"Failure - (non-production) node {nonProdNode.nodeNum} should have restarted")

Expand Down

0 comments on commit 8281aa3

Please sign in to comment.