Skip to content

Commit

Permalink
Various changes in node (#1230)
Browse files Browse the repository at this point in the history
* Remove unneeded "wait_neighbours_not_empty"

* Fix "round_started_at" for the first round

* Fix use after move in overlay.cpp

* Move "last gc masterchain state" stat to CellDbIn::prepare_stats

* Fix disabling state serializer
  • Loading branch information
SpyCheese authored Sep 30, 2024
1 parent b781993 commit 6755b83
Show file tree
Hide file tree
Showing 10 changed files with 20 additions and 91 deletions.
11 changes: 3 additions & 8 deletions overlay/overlay-peers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -633,14 +633,9 @@ std::vector<adnl::AdnlNodeIdShort> OverlayImpl::get_neighbours(td::uint32 max_si
}

void OverlayImpl::send_message_to_neighbours(td::BufferSlice data) {
wait_neighbours_not_empty([this, data = std::move(data)](td::Result<td::Unit> R) {
if (R.is_error()) {
return;
}
for (auto &n : peer_list_.neighbours_) {
td::actor::send_closure(manager_, &OverlayManager::send_message, n, local_id_, overlay_id_, data.clone());
}
});
for (auto &n : peer_list_.neighbours_) {
td::actor::send_closure(manager_, &OverlayManager::send_message, n, local_id_, overlay_id_, data.clone());
}
}

size_t OverlayImpl::neighbours_cnt() const {
Expand Down
45 changes: 11 additions & 34 deletions overlay/overlay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ OverlayImpl::OverlayImpl(td::actor::ActorId<keyring::Keyring> keyring, td::actor

VLOG(OVERLAY_INFO) << this << ": creating";

update_root_member_list(std::move(nodes), std::move(root_public_keys), std::move(cert));

update_neighbours(static_cast<td::uint32>(nodes.size()));
auto nodes_size = static_cast<td::uint32>(nodes.size());
OverlayImpl::update_root_member_list(std::move(nodes), std::move(root_public_keys), std::move(cert));
update_neighbours(nodes_size);
}

void OverlayImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::overlay_getRandomPeers &query,
Expand Down Expand Up @@ -457,20 +457,6 @@ void OverlayImpl::bcast_gc() {
CHECK(delivered_broadcasts_.size() == bcast_lru_.size());
}

void OverlayImpl::wait_neighbours_not_empty(td::Promise<td::Unit> promise, int max_retries) {
if (!peer_list_.neighbours_.empty()) {
promise.set_result(td::Unit());
} else if (max_retries > 0) {
delay_action(
[SelfId = actor_id(this), promise = std::move(promise), max_retries]() mutable {
td::actor::send_closure(SelfId, &OverlayImpl::wait_neighbours_not_empty, std::move(promise), max_retries - 1);
},
td::Timestamp::in(0.5));
} else {
promise.set_error(td::Status::Error(ErrorCode::timeout));
}
}

void OverlayImpl::send_broadcast(PublicKeyHash send_as, td::uint32 flags, td::BufferSlice data) {
if (!has_valid_membership_certificate()) {
VLOG(OVERLAY_WARNING) << "member certificate is invalid, valid_until="
Expand All @@ -481,33 +467,23 @@ void OverlayImpl::send_broadcast(PublicKeyHash send_as, td::uint32 flags, td::Bu
VLOG(OVERLAY_WARNING) << "broadcast source certificate is invalid";
return;
}
wait_neighbours_not_empty([this, send_as, flags, data = std::move(data)](td::Result<td::Unit> R) mutable {
if (R.is_error()) {
return;
}
auto S = BroadcastSimple::create_new(actor_id(this), keyring_, send_as, std::move(data), flags);
if (S.is_error()) {
LOG(WARNING) << "failed to send broadcast: " << S;
}
});
auto S = BroadcastSimple::create_new(actor_id(this), keyring_, send_as, std::move(data), flags);
if (S.is_error()) {
LOG(WARNING) << "failed to send broadcast: " << S;
}
}

void OverlayImpl::send_broadcast_fec(PublicKeyHash send_as, td::uint32 flags, td::BufferSlice data) {
if (!has_valid_membership_certificate()) {
VLOG(OVERLAY_WARNING) << "meber certificate ist invalid, valid_until="
VLOG(OVERLAY_WARNING) << "meber certificate is invalid, valid_until="
<< peer_list_.local_cert_is_valid_until_.at_unix();
return;
}
if (!has_valid_broadcast_certificate(send_as, data.size(), true)) {
VLOG(OVERLAY_WARNING) << "broadcast source certificate is invalid";
return;
}
wait_neighbours_not_empty([this, send_as, flags, data = std::move(data)](td::Result<td::Unit> R) mutable {
if (R.is_error()) {
return;
}
OverlayOutboundFecBroadcast::create(std::move(data), flags, actor_id(this), send_as);
});
OverlayOutboundFecBroadcast::create(std::move(data), flags, actor_id(this), send_as);
}

void OverlayImpl::print(td::StringBuilder &sb) {
Expand Down Expand Up @@ -752,7 +728,8 @@ bool OverlayImpl::has_valid_broadcast_certificate(const PublicKeyHash &source, s
return false;
}
auto it = certs_.find(source);
return check_source_eligible(source, it == certs_.end() ? nullptr : it->second.get(), (td::uint32)size, is_fec);
return check_source_eligible(source, it == certs_.end() ? nullptr : it->second.get(), (td::uint32)size, is_fec) !=
BroadcastCheckResult::Forbidden;
}

} // namespace overlay
Expand Down
2 changes: 0 additions & 2 deletions overlay/overlay.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,6 @@ class OverlayImpl : public Overlay {
del_peer(peer_id);
}

void wait_neighbours_not_empty(td::Promise<td::Unit> promise, int max_retries = 20);

private:
template <class T>
void process_query(adnl::AdnlNodeIdShort src, T &query, td::Promise<td::BufferSlice> promise) {
Expand Down
5 changes: 3 additions & 2 deletions validator-session/validator-session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,9 @@ ValidatorSessionImpl::ValidatorSessionImpl(catchain::CatChainSessionId session_i
}

void ValidatorSessionImpl::start() {
round_started_at_ = td::Timestamp::now();
round_debug_at_ = td::Timestamp::in(60.0);
stats_init();
started_ = true;
VLOG(VALIDATOR_SESSION_NOTICE) << this << ": started";

Expand Down Expand Up @@ -1097,8 +1100,6 @@ void ValidatorSessionImpl::start_up() {

check_all();
td::actor::send_closure(rldp_, &rldp::Rldp::add_id, description().get_source_adnl_id(local_idx()));

stats_init();
}

void ValidatorSessionImpl::stats_init() {
Expand Down
9 changes: 1 addition & 8 deletions validator/db/celldb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,6 @@ void CellDbIn::get_cell_db_reader(td::Promise<std::shared_ptr<vm::CellDbReader>>
promise.set_result(boc_->get_cell_db_reader());
}

void CellDbIn::get_last_deleted_mc_state(td::Promise<BlockSeqno> promise) {
promise.set_result(last_deleted_mc_state_);
}

std::vector<std::pair<std::string, std::string>> CellDbIn::prepare_stats() {
TD_PERF_COUNTER(celldb_prepare_stats);
auto r_boc_stats = boc_->get_stats();
Expand Down Expand Up @@ -257,6 +253,7 @@ std::vector<std::pair<std::string, std::string>> CellDbIn::prepare_stats() {
double(celldb_size));
add_stat("max_possible_ram_to_celldb_ratio", double(total_mem_stat.total_ram) / double(celldb_size));
}
stats.emplace_back("last_deleted_mc_state", td::to_string(last_deleted_mc_state_));

return stats;
// do not clear statistics, it is needed for flush_db_stats
Expand Down Expand Up @@ -580,10 +577,6 @@ void CellDb::get_cell_db_reader(td::Promise<std::shared_ptr<vm::CellDbReader>> p
td::actor::send_closure(cell_db_, &CellDbIn::get_cell_db_reader, std::move(promise));
}

void CellDb::get_last_deleted_mc_state(td::Promise<BlockSeqno> promise) {
td::actor::send_closure(cell_db_, &CellDbIn::get_last_deleted_mc_state, std::move(promise));
}

void CellDb::start_up() {
CellDbBase::start_up();
boc_ = vm::DynamicBagOfCellsDb::create();
Expand Down
2 changes: 0 additions & 2 deletions validator/db/celldb.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ class CellDbIn : public CellDbBase {
void load_cell(RootHash hash, td::Promise<td::Ref<vm::DataCell>> promise);
void store_cell(BlockIdExt block_id, td::Ref<vm::Cell> cell, td::Promise<td::Ref<vm::DataCell>> promise);
void get_cell_db_reader(td::Promise<std::shared_ptr<vm::CellDbReader>> promise);
void get_last_deleted_mc_state(td::Promise<BlockSeqno> promise);

void migrate_cell(td::Bits256 hash);

Expand Down Expand Up @@ -190,7 +189,6 @@ class CellDb : public CellDbBase {
in_memory_boc_ = std::move(in_memory_boc);
}
void get_cell_db_reader(td::Promise<std::shared_ptr<vm::CellDbReader>> promise);
void get_last_deleted_mc_state(td::Promise<BlockSeqno> promise);

CellDb(td::actor::ActorId<RootDb> root_db, std::string path, td::Ref<ValidatorManagerOptions> opts)
: root_db_(root_db), path_(path), opts_(opts) {
Expand Down
4 changes: 0 additions & 4 deletions validator/db/rootdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,10 +285,6 @@ void RootDb::get_cell_db_reader(td::Promise<std::shared_ptr<vm::CellDbReader>> p
td::actor::send_closure(cell_db_, &CellDb::get_cell_db_reader, std::move(promise));
}

void RootDb::get_last_deleted_mc_state(td::Promise<BlockSeqno> promise) {
td::actor::send_closure(cell_db_, &CellDb::get_last_deleted_mc_state, std::move(promise));
}

void RootDb::store_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::BufferSlice state,
td::Promise<td::Unit> promise) {
td::actor::send_closure(archive_db_, &ArchiveManager::add_persistent_state, block_id, masterchain_block_id,
Expand Down
1 change: 0 additions & 1 deletion validator/db/rootdb.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ class RootDb : public Db {
td::Promise<td::Ref<ShardState>> promise) override;
void get_block_state(ConstBlockHandle handle, td::Promise<td::Ref<ShardState>> promise) override;
void get_cell_db_reader(td::Promise<std::shared_ptr<vm::CellDbReader>> promise) override;
void get_last_deleted_mc_state(td::Promise<BlockSeqno> promise) override;

void store_block_handle(BlockHandle handle, td::Promise<td::Unit> promise) override;
void get_block_handle(BlockIdExt id, td::Promise<BlockHandle> promise) override;
Expand Down
1 change: 0 additions & 1 deletion validator/interfaces/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ class Db : public td::actor::Actor {
td::Promise<td::Ref<ShardState>> promise) = 0;
virtual void get_block_state(ConstBlockHandle handle, td::Promise<td::Ref<ShardState>> promise) = 0;
virtual void get_cell_db_reader(td::Promise<std::shared_ptr<vm::CellDbReader>> promise) = 0;
virtual void get_last_deleted_mc_state(td::Promise<BlockSeqno> promise) = 0;

virtual void store_persistent_state_file(BlockIdExt block_id, BlockIdExt masterchain_block_id, td::BufferSlice state,
td::Promise<td::Unit> promise) = 0;
Expand Down
31 changes: 2 additions & 29 deletions validator/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2178,20 +2178,10 @@ void ValidatorManagerImpl::update_shards() {
td::actor::send_closure(SelfId, &ValidatorManagerImpl::written_destroyed_validator_sessions, std::move(gc));
});
td::actor::send_closure(db_, &Db::update_destroyed_validator_sessions, gc_list_, std::move(P));

if (!serializer_.empty()) {
td::actor::send_closure(
serializer_, &AsyncStateSerializer::auto_disable_serializer,
validating_masterchain &&
last_masterchain_state_->get_validator_set(ShardIdFull{masterchainId})->export_vector().size() * 2 <=
last_masterchain_state_->get_total_validator_set(0)->export_vector().size());
}
}

if (!serializer_.empty()) {
td::actor::send_closure(
serializer_, &AsyncStateSerializer::auto_disable_serializer,
!validator_groups_.empty() && last_masterchain_state_->get_global_id() == -239); // mainnet only
td::actor::send_closure(serializer_, &AsyncStateSerializer::auto_disable_serializer,
is_validator() && last_masterchain_state_->get_global_id() == -239); // mainnet only
}
}

Expand Down Expand Up @@ -2790,23 +2780,6 @@ void ValidatorManagerImpl::prepare_stats(td::Promise<std::vector<std::pair<std::
vec.emplace_back("rotatemasterchainblock", last_rotate_block_id_.to_str());
//vec.emplace_back("shardclientmasterchainseqno", td::to_string(min_confirmed_masterchain_seqno_));
vec.emplace_back("stateserializermasterchainseqno", td::to_string(state_serializer_masterchain_seqno_));

td::actor::send_closure(db_, &Db::get_last_deleted_mc_state,
[promise = merger.make_promise(""),
gc_seqno = gc_masterchain_handle_->id().seqno()](td::Result<BlockSeqno> R) mutable {
TRY_RESULT_PROMISE(promise, seqno, std::move(R));
std::string s;
if (seqno == 0) {
s = "none";
} else if (seqno <= gc_seqno) {
s = PSTRING() << seqno << " (gc_seqno-" << (gc_seqno - seqno) << ")";
} else {
s = PSTRING() << seqno << " (gc_seqno+" << (seqno - gc_seqno) << ")";
}
std::vector<std::pair<std::string, std::string>> vec;
vec.emplace_back("lastgcdmasterchainstate", std::move(s));
promise.set_value(std::move(vec));
});
}
td::NamedThreadSafeCounter::get_default().for_each([&](auto key, auto value) {
vec.emplace_back("counter." + key, PSTRING() << value);
Expand Down

0 comments on commit 6755b83

Please sign in to comment.