From 57f95cc28224a9e938d029bd92125fdcd3f71c8d Mon Sep 17 00:00:00 2001 From: EmelyanenkoK Date: Wed, 10 Jul 2024 14:58:13 +0300 Subject: [PATCH] Add collator options (#1052) * Set collator options from validator console * Fix compilation error in manager-disk * Defer all messages if out msg queue is too big * Fix checking queue size in collator --------- Co-authored-by: SpyCheese --- crypto/block/block.tlb | 2 +- crypto/block/mc-config.cpp | 1 + crypto/block/mc-config.h | 1 + tl/generate/scheme/ton_api.tl | 7 ++ tl/generate/scheme/ton_api.tlo | Bin 90304 -> 90924 bytes .../validator-engine-console-query.cpp | 39 ++++++++ .../validator-engine-console-query.h | 41 ++++++++ .../validator-engine-console.cpp | 2 + validator-engine/validator-engine.cpp | 91 ++++++++++++++++++ validator-engine/validator-engine.hpp | 6 ++ validator/fabric.h | 4 +- validator/impl/collator-impl.h | 7 +- validator/impl/collator.cpp | 39 +++++--- validator/impl/fabric.cpp | 10 +- validator/impl/validate-query.cpp | 26 ++++- validator/impl/validate-query.hpp | 1 + validator/manager-disk.cpp | 4 +- validator/manager.cpp | 8 +- validator/validator-group.cpp | 4 +- validator/validator-group.hpp | 4 + validator/validator-options.hpp | 7 ++ validator/validator.h | 17 ++++ 22 files changed, 294 insertions(+), 27 deletions(-) diff --git a/crypto/block/block.tlb b/crypto/block/block.tlb index b96b49140..a3684f563 100644 --- a/crypto/block/block.tlb +++ b/crypto/block/block.tlb @@ -801,7 +801,7 @@ size_limits_config#01 max_msg_bits:uint32 max_msg_cells:uint32 max_library_cells max_ext_msg_size:uint32 max_ext_msg_depth:uint16 = SizeLimitsConfig; size_limits_config_v2#02 max_msg_bits:uint32 max_msg_cells:uint32 max_library_cells:uint32 max_vm_data_depth:uint16 max_ext_msg_size:uint32 max_ext_msg_depth:uint16 max_acc_state_cells:uint32 max_acc_state_bits:uint32 - max_acc_public_libraries:uint32 = SizeLimitsConfig; + max_acc_public_libraries:uint32 defer_out_queue_size_limit:uint32 = SizeLimitsConfig; _ SizeLimitsConfig = ConfigParam 43; // key is [ wc:int32 addr:uint256 ] diff --git a/crypto/block/mc-config.cpp b/crypto/block/mc-config.cpp index 1dbfeaedc..6da0f034d 100644 --- a/crypto/block/mc-config.cpp +++ b/crypto/block/mc-config.cpp @@ -1956,6 +1956,7 @@ td::Result Config::do_get_size_limits_config(td::RefA)f6dx8aHf(8H@Y^rYn delta 55 zcmZ2;jP<}lR^CUm^{p77fNdjhu<+&u!Y&GootvK;SI2Ianf#)Mv2Hu70%N$;_A^$D G2^s+I01^8D diff --git a/validator-engine-console/validator-engine-console-query.cpp b/validator-engine-console/validator-engine-console-query.cpp index 5385d2e6c..72fbce2e7 100644 --- a/validator-engine-console/validator-engine-console-query.cpp +++ b/validator-engine-console/validator-engine-console-query.cpp @@ -1203,3 +1203,42 @@ td::Status SetStateSerializerEnabledQuery::receive(td::BufferSlice data) { td::TerminalIO::out() << "success\n"; return td::Status::OK(); } + +td::Status SetCollatorOptionsJsonQuery::run() { + TRY_RESULT_ASSIGN(file_name_, tokenizer_.get_token()); + TRY_STATUS(tokenizer_.check_endl()); + return td::Status::OK(); +} + +td::Status SetCollatorOptionsJsonQuery::send() { + TRY_RESULT(data, td::read_file(file_name_)); + auto b = + ton::create_serialize_tl_object(data.as_slice().str()); + td::actor::send_closure(console_, &ValidatorEngineConsole::envelope_send_query, std::move(b), create_promise()); + return td::Status::OK(); +} + +td::Status SetCollatorOptionsJsonQuery::receive(td::BufferSlice data) { + TRY_RESULT_PREFIX(f, ton::fetch_tl_object(data.as_slice(), true), + "received incorrect answer: "); + td::TerminalIO::out() << "success\n"; + return td::Status::OK(); +} + +td::Status ResetCollatorOptionsQuery::run() { + TRY_STATUS(tokenizer_.check_endl()); + return td::Status::OK(); +} + +td::Status ResetCollatorOptionsQuery::send() { + auto b = ton::create_serialize_tl_object("{}"); + td::actor::send_closure(console_, &ValidatorEngineConsole::envelope_send_query, std::move(b), create_promise()); + return td::Status::OK(); +} + +td::Status ResetCollatorOptionsQuery::receive(td::BufferSlice data) { + TRY_RESULT_PREFIX(f, ton::fetch_tl_object(data.as_slice(), true), + "received incorrect answer: "); + td::TerminalIO::out() << "success\n"; + return td::Status::OK(); +} diff --git a/validator-engine-console/validator-engine-console-query.h b/validator-engine-console/validator-engine-console-query.h index 3047350fe..cb87f2ae6 100644 --- a/validator-engine-console/validator-engine-console-query.h +++ b/validator-engine-console/validator-engine-console-query.h @@ -1229,3 +1229,44 @@ class SetStateSerializerEnabledQuery : public Query { private: bool enabled_; }; + +class SetCollatorOptionsJsonQuery : public Query { + public: + SetCollatorOptionsJsonQuery(td::actor::ActorId console, Tokenizer tokenizer) + : Query(console, std::move(tokenizer)) { + } + td::Status run() override; + td::Status send() override; + td::Status receive(td::BufferSlice data) override; + static std::string get_name() { + return "setcollatoroptionsjson"; + } + static std::string get_help() { + return "setcollatoroptionsjson \tset collator options from file "; + } + std::string name() const override { + return get_name(); + } + + private: + std::string file_name_; +}; + +class ResetCollatorOptionsQuery : public Query { + public: + ResetCollatorOptionsQuery(td::actor::ActorId console, Tokenizer tokenizer) + : Query(console, std::move(tokenizer)) { + } + td::Status run() override; + td::Status send() override; + td::Status receive(td::BufferSlice data) override; + static std::string get_name() { + return "resetcollatoroptions"; + } + static std::string get_help() { + return "resetcollatoroptions\tset collator options to default values"; + } + std::string name() const override { + return get_name(); + } +}; diff --git a/validator-engine-console/validator-engine-console.cpp b/validator-engine-console/validator-engine-console.cpp index 4878a292f..dc92c4761 100644 --- a/validator-engine-console/validator-engine-console.cpp +++ b/validator-engine-console/validator-engine-console.cpp @@ -147,6 +147,8 @@ void ValidatorEngineConsole::run() { add_query_runner(std::make_unique>()); add_query_runner(std::make_unique>()); add_query_runner(std::make_unique>()); + add_query_runner(std::make_unique>()); + add_query_runner(std::make_unique>()); } bool ValidatorEngineConsole::envelope_send_query(td::BufferSlice query, td::Promise promise) { diff --git a/validator-engine/validator-engine.cpp b/validator-engine/validator-engine.cpp index d81dcb8f0..8a46ef530 100644 --- a/validator-engine/validator-engine.cpp +++ b/validator-engine/validator-engine.cpp @@ -1823,6 +1823,7 @@ void ValidatorEngine::started_overlays() { void ValidatorEngine::start_validator() { validator_options_.write().set_allow_blockchain_init(config_.validators.size() > 0); validator_options_.write().set_state_serializer_enabled(config_.state_serializer_enabled); + load_collator_options(); validator_manager_ = ton::validator::ValidatorManagerFactory::create( validator_options_, db_root_, keyring_.get(), adnl_.get(), rldp_.get(), overlay_manager_.get()); @@ -2414,6 +2415,69 @@ void ValidatorEngine::del_custom_overlay_from_config(std::string name, td::Promi promise.set_error(td::Status::Error(PSTRING() << "no overlay \"" << name << "\" in config")); } +static td::Result> parse_collator_options(td::MutableSlice json_str) { + td::Ref ref{true}; + ton::validator::CollatorOptions& opts = ref.write(); + + // Set default values (from_json leaves missing fields as is) + ton::ton_api::engine_validator_collatorOptions f; + f.deferring_enabled_ = opts.deferring_enabled; + f.defer_out_queue_size_limit_ = opts.defer_out_queue_size_limit; + f.defer_messages_after_ = opts.defer_messages_after; + f.dispatch_phase_2_max_total_ = opts.dispatch_phase_2_max_total; + f.dispatch_phase_3_max_total_ = opts.dispatch_phase_3_max_total; + f.dispatch_phase_2_max_per_initiator_ = opts.dispatch_phase_2_max_per_initiator; + f.dispatch_phase_3_max_per_initiator_ = + opts.dispatch_phase_3_max_per_initiator ? opts.dispatch_phase_3_max_per_initiator.value() : -1; + + TRY_RESULT_PREFIX(json, td::json_decode(json_str), "failed to parse json: "); + TRY_STATUS_PREFIX(ton::ton_api::from_json(f, json.get_object()), "json does not fit TL scheme: "); + + if (f.defer_messages_after_ <= 0) { + return td::Status::Error("defer_messages_after should be positive"); + } + if (f.defer_out_queue_size_limit_ < 0) { + return td::Status::Error("defer_out_queue_size_limit should be non-negative"); + } + if (f.dispatch_phase_2_max_total_ < 0) { + return td::Status::Error("dispatch_phase_2_max_total should be non-negative"); + } + if (f.dispatch_phase_3_max_total_ < 0) { + return td::Status::Error("dispatch_phase_3_max_total should be non-negative"); + } + if (f.dispatch_phase_2_max_per_initiator_ < 0) { + return td::Status::Error("dispatch_phase_2_max_per_initiator should be non-negative"); + } + + opts.deferring_enabled = f.deferring_enabled_; + opts.defer_messages_after = f.defer_messages_after_; + opts.defer_out_queue_size_limit = f.defer_out_queue_size_limit_; + opts.dispatch_phase_2_max_total = f.dispatch_phase_2_max_total_; + opts.dispatch_phase_3_max_total = f.dispatch_phase_3_max_total_; + opts.dispatch_phase_2_max_per_initiator = f.dispatch_phase_2_max_per_initiator_; + if (f.dispatch_phase_3_max_per_initiator_ >= 0) { + opts.dispatch_phase_3_max_per_initiator = f.dispatch_phase_3_max_per_initiator_; + } else { + opts.dispatch_phase_3_max_per_initiator = {}; + } + + return ref; +} + +void ValidatorEngine::load_collator_options() { + auto r_data = td::read_file(collator_options_file()); + if (r_data.is_error()) { + return; + } + td::BufferSlice data = r_data.move_as_ok(); + auto r_collator_options = parse_collator_options(data.as_slice()); + if (r_collator_options.is_error()) { + LOG(ERROR) << "Failed to read collator options from file: " << r_collator_options.move_as_error(); + return; + } + validator_options_.write().set_collator_options(r_collator_options.move_as_ok()); +} + void ValidatorEngine::check_key(ton::PublicKeyHash id, td::Promise promise) { if (keys_.count(id) == 1) { promise.set_value(td::Unit()); @@ -3684,6 +3748,33 @@ void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_setStateS }); } +void ValidatorEngine::run_control_query(ton::ton_api::engine_validator_setCollatorOptionsJson &query, + td::BufferSlice data, ton::PublicKeyHash src, td::uint32 perm, + td::Promise promise) { + if (!(perm & ValidatorEnginePermissions::vep_modify)) { + promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::error, "not authorized"))); + return; + } + if (!started_) { + promise.set_value(create_control_query_error(td::Status::Error(ton::ErrorCode::notready, "not started"))); + return; + } + auto r_collator_options = parse_collator_options(query.json_); + if (r_collator_options.is_error()) { + promise.set_value(create_control_query_error(r_collator_options.move_as_error_prefix("failed to parse json: "))); + return; + } + auto S = td::write_file(collator_options_file(), query.json_); + if (S.is_error()) { + promise.set_value(create_control_query_error(r_collator_options.move_as_error_prefix("failed to write file: "))); + return; + } + validator_options_.write().set_collator_options(r_collator_options.move_as_ok()); + td::actor::send_closure(validator_manager_, &ton::validator::ValidatorManagerInterface::update_options, + validator_options_); + promise.set_value(ton::create_serialize_tl_object()); +} + void ValidatorEngine::process_control_query(td::uint16 port, ton::adnl::AdnlNodeIdShort src, ton::adnl::AdnlNodeIdShort dst, td::BufferSlice data, td::Promise promise) { diff --git a/validator-engine/validator-engine.hpp b/validator-engine/validator-engine.hpp index 8adc8d9a7..ad5e479b1 100644 --- a/validator-engine/validator-engine.hpp +++ b/validator-engine/validator-engine.hpp @@ -384,12 +384,16 @@ class ValidatorEngine : public td::actor::Actor { std::string custom_overlays_config_file() const { return db_root_ + "/custom-overlays.json"; } + std::string collator_options_file() const { + return db_root_ + "/collator-options.json"; + } void load_custom_overlays_config(); td::Status write_custom_overlays_config(); void add_custom_overlay_to_config( ton::tl_object_ptr overlay, td::Promise promise); void del_custom_overlay_from_config(std::string name, td::Promise promise); + void load_collator_options(); void check_key(ton::PublicKeyHash id, td::Promise promise); @@ -477,6 +481,8 @@ class ValidatorEngine : public td::actor::Actor { ton::PublicKeyHash src, td::uint32 perm, td::Promise promise); void run_control_query(ton::ton_api::engine_validator_setStateSerializerEnabled &query, td::BufferSlice data, ton::PublicKeyHash src, td::uint32 perm, td::Promise promise); + void run_control_query(ton::ton_api::engine_validator_setCollatorOptionsJson &query, td::BufferSlice data, + ton::PublicKeyHash src, td::uint32 perm, td::Promise promise); template void run_control_query(T &query, td::BufferSlice data, ton::PublicKeyHash src, td::uint32 perm, td::Promise promise) { diff --git a/validator/fabric.h b/validator/fabric.h index 6bb668452..949a6c9ff 100644 --- a/validator/fabric.h +++ b/validator/fabric.h @@ -80,8 +80,8 @@ void run_validate_query(ShardIdFull shard, UnixTime min_ts, BlockIdExt min_maste td::Promise promise, bool is_fake = false); void run_collate_query(ShardIdFull shard, td::uint32 min_ts, const BlockIdExt& min_masterchain_block_id, std::vector prev, Ed25519_PublicKey local_id, td::Ref validator_set, - td::actor::ActorId manager, td::Timestamp timeout, - td::Promise promise); + td::Ref collator_opts, td::actor::ActorId manager, + td::Timestamp timeout, td::Promise promise); void run_collate_hardfork(ShardIdFull shard, const BlockIdExt& min_masterchain_block_id, std::vector prev, td::actor::ActorId manager, td::Timestamp timeout, td::Promise promise); diff --git a/validator/impl/collator-impl.h b/validator/impl/collator-impl.h index edb007219..44286e722 100644 --- a/validator/impl/collator-impl.h +++ b/validator/impl/collator-impl.h @@ -71,6 +71,7 @@ class Collator final : public td::actor::Actor { std::vector> prev_states; std::vector> prev_block_data; Ed25519_PublicKey created_by_; + Ref collator_opts_; Ref validator_set_; td::actor::ActorId manager; td::Timestamp timeout; @@ -90,7 +91,8 @@ class Collator final : public td::actor::Actor { public: Collator(ShardIdFull shard, bool is_hardfork, td::uint32 min_ts, BlockIdExt min_masterchain_block_id, std::vector prev, Ref validator_set, Ed25519_PublicKey collator_id, - td::actor::ActorId manager, td::Timestamp timeout, td::Promise promise); + Ref collator_opts, td::actor::ActorId manager, td::Timestamp timeout, + td::Promise promise); ~Collator() override = default; bool is_busy() const { return busy_; @@ -195,6 +197,7 @@ class Collator final : public td::actor::Actor { std::unique_ptr in_msg_dict, out_msg_dict, out_msg_queue_, sibling_out_msg_queue_; std::map unprocessed_deferred_messages_; // number of messages from dispatch queue in new_msgs td::uint64 out_msg_queue_size_ = 0; + td::uint64 old_out_msg_queue_size_ = 0; bool have_out_msg_queue_size_in_state_ = false; std::unique_ptr ihr_pending; std::shared_ptr processed_upto_, sibling_processed_upto_; @@ -211,6 +214,8 @@ class Collator final : public td::actor::Actor { unsigned dispatch_queue_ops_{0}; std::map last_dispatch_queue_emitted_lt_; bool have_unprocessed_account_dispatch_queue_ = true; + td::uint64 defer_out_queue_size_limit_; + td::uint64 hard_defer_out_queue_size_limit_; bool msg_metadata_enabled_ = false; bool deferring_messages_enabled_ = false; diff --git a/validator/impl/collator.cpp b/validator/impl/collator.cpp index 81ec57561..43b17dfdc 100644 --- a/validator/impl/collator.cpp +++ b/validator/impl/collator.cpp @@ -50,7 +50,6 @@ static const td::uint32 SPLIT_MAX_QUEUE_SIZE = 100000; static const td::uint32 MERGE_MAX_QUEUE_SIZE = 2047; static const td::uint32 SKIP_EXTERNALS_QUEUE_SIZE = 8000; static const int HIGH_PRIORITY_EXTERNAL = 10; // don't skip high priority externals when queue is big -static const int DEFER_MESSAGES_AFTER = 10; // 10'th and later messages from address will be deferred #define DBG(__n) dbg(__n)&& #define DSTART int __dcnt = 0; @@ -72,20 +71,22 @@ static inline bool dbg(int c) { * @param prev A vector of BlockIdExt representing the previous blocks. * @param validator_set A reference to the ValidatorSet. * @param collator_id The public key of the block creator. + * @param collator_opts A reference to CollatorOptions. * @param manager The ActorId of the ValidatorManager. * @param timeout The timeout for the collator. * @param promise The promise to return the result. */ Collator::Collator(ShardIdFull shard, bool is_hardfork, UnixTime min_ts, BlockIdExt min_masterchain_block_id, - std::vector prev, td::Ref validator_set, Ed25519_PublicKey collator_id, - td::actor::ActorId manager, td::Timestamp timeout, - td::Promise promise) + std::vector prev, Ref validator_set, Ed25519_PublicKey collator_id, + Ref collator_opts, td::actor::ActorId manager, + td::Timestamp timeout, td::Promise promise) : shard_(shard) , is_hardfork_(is_hardfork) , min_ts(min_ts) , min_mc_block_id{min_masterchain_block_id} , prev_blocks(std::move(prev)) , created_by_(collator_id) + , collator_opts_(collator_opts) , validator_set_(std::move(validator_set)) , manager(manager) , timeout(timeout) @@ -1786,6 +1787,7 @@ bool Collator::try_collate() { last_proc_int_msg_.second.set_zero(); first_unproc_int_msg_.first = ~0ULL; first_unproc_int_msg_.second.set_ones(); + old_out_msg_queue_size_ = out_msg_queue_size_; if (is_masterchain()) { LOG(DEBUG) << "getting the list of special smart contracts"; auto res = config_->get_special_smartcontracts(); @@ -1970,6 +1972,10 @@ bool Collator::fetch_config_params() { return fatal_error(res.move_as_error()); } compute_phase_cfg_.libraries = std::make_unique(config_->get_libraries_root(), 256); + defer_out_queue_size_limit_ = std::max(collator_opts_->defer_out_queue_size_limit, + compute_phase_cfg_.size_limits.defer_out_queue_size_limit); + // This one is checked in validate-query + hard_defer_out_queue_size_limit_ = compute_phase_cfg_.size_limits.defer_out_queue_size_limit; return true; } @@ -3053,8 +3059,10 @@ int Collator::process_one_new_message(block::NewOutMsg msg, bool enqueue_only, R bool is_special_account = is_masterchain() && config_->is_special_smartcontract(src_addr); bool defer = false; if (!from_dispatch_queue) { - if (deferring_messages_enabled_ && !is_special && !is_special_account && msg.msg_idx != 0) { - if (++sender_generated_messages_count_[src_addr] >= DEFER_MESSAGES_AFTER) { + if (deferring_messages_enabled_ && collator_opts_->deferring_enabled && !is_special && !is_special_account && + msg.msg_idx != 0) { + if (++sender_generated_messages_count_[src_addr] >= collator_opts_->defer_messages_after || + out_msg_queue_size_ > defer_out_queue_size_limit_) { defer = true; } } @@ -3626,18 +3634,24 @@ int Collator::process_external_message(Ref msg) { * @returns True if the processing was successful, false otherwise. */ bool Collator::process_dispatch_queue() { + if (out_msg_queue_size_ > defer_out_queue_size_limit_ && old_out_msg_queue_size_ > hard_defer_out_queue_size_limit_) { + return true; + } have_unprocessed_account_dispatch_queue_ = true; - size_t max_total_count[3] = {1 << 30, 150, 150}; - size_t max_per_initiator[3] = {1 << 30, 20, 0}; - if (out_msg_queue_size_ <= 256) { + size_t max_total_count[3] = {1 << 30, collator_opts_->dispatch_phase_2_max_total, + collator_opts_->dispatch_phase_3_max_total}; + size_t max_per_initiator[3] = {1 << 30, collator_opts_->dispatch_phase_2_max_per_initiator, 0}; + if (collator_opts_->dispatch_phase_3_max_per_initiator) { + max_per_initiator[2] = collator_opts_->dispatch_phase_3_max_per_initiator.value(); + } else if (out_msg_queue_size_ <= 256) { max_per_initiator[2] = 10; } else if (out_msg_queue_size_ <= 512) { max_per_initiator[2] = 2; - } else if (out_msg_queue_size_ <= 2048) { + } else if (out_msg_queue_size_ <= 1500) { max_per_initiator[2] = 1; } for (int iter = 0; iter < 3; ++iter) { - if (max_per_initiator[iter] == 0) { + if (max_per_initiator[iter] == 0 || max_total_count[iter] == 0) { continue; } vm::AugmentedDictionary cur_dispatch_queue{dispatch_queue_->get_root(), 256, block::tlb::aug_DispatchQueue}; @@ -3676,7 +3690,8 @@ bool Collator::process_dispatch_queue() { // Remove message from DispatchQueue bool ok; - if (iter == 0 || (iter == 1 && sender_generated_messages_count_[src_addr] >= DEFER_MESSAGES_AFTER)) { + if (iter == 0 || + (iter == 1 && sender_generated_messages_count_[src_addr] >= collator_opts_->defer_messages_after)) { ok = cur_dispatch_queue.lookup_delete(src_addr).not_null(); } else { dict.lookup_delete(key); diff --git a/validator/impl/fabric.cpp b/validator/impl/fabric.cpp index 997fa9a18..d69492393 100644 --- a/validator/impl/fabric.cpp +++ b/validator/impl/fabric.cpp @@ -213,8 +213,8 @@ void run_validate_query(ShardIdFull shard, UnixTime min_ts, BlockIdExt min_maste void run_collate_query(ShardIdFull shard, td::uint32 min_ts, const BlockIdExt& min_masterchain_block_id, std::vector prev, Ed25519_PublicKey collator_id, td::Ref validator_set, - td::actor::ActorId manager, td::Timestamp timeout, - td::Promise promise) { + td::Ref collator_opts, td::actor::ActorId manager, + td::Timestamp timeout, td::Promise promise) { BlockSeqno seqno = 0; for (auto& p : prev) { if (p.seqno() > seqno) { @@ -223,7 +223,8 @@ void run_collate_query(ShardIdFull shard, td::uint32 min_ts, const BlockIdExt& m } td::actor::create_actor(PSTRING() << "collate" << shard.to_str() << ":" << (seqno + 1), shard, false, min_ts, min_masterchain_block_id, std::move(prev), std::move(validator_set), - collator_id, std::move(manager), timeout, std::move(promise)) + collator_id, std::move(collator_opts), std::move(manager), timeout, + std::move(promise)) .release(); } @@ -238,7 +239,8 @@ void run_collate_hardfork(ShardIdFull shard, const BlockIdExt& min_masterchain_b } td::actor::create_actor(PSTRING() << "collate" << shard.to_str() << ":" << (seqno + 1), shard, true, 0, min_masterchain_block_id, std::move(prev), td::Ref{}, - Ed25519_PublicKey{Bits256::zero()}, std::move(manager), timeout, std::move(promise)) + Ed25519_PublicKey{Bits256::zero()}, td::Ref{true}, + std::move(manager), timeout, std::move(promise)) .release(); } diff --git a/validator/impl/validate-query.cpp b/validator/impl/validate-query.cpp index 5345ed725..8b062ce98 100644 --- a/validator/impl/validate-query.cpp +++ b/validator/impl/validate-query.cpp @@ -3405,7 +3405,7 @@ bool ValidateQuery::check_account_dispatch_queue_update(td::Bits256 addr, Ref 0 && max_removed_lt == 0) { - have_unprocessed_account_dispatch_queue_ = true; + ++processed_account_dispatch_queues_; } return true; } @@ -3432,6 +3432,27 @@ bool ValidateQuery::unpack_dispatch_queue_update() { if (!res) { return reject_query("invalid DispatchQueue dictionary in the new state"); } + + if (old_out_msg_queue_size_ <= compute_phase_cfg_.size_limits.defer_out_queue_size_limit) { + // Check that at least one message was taken from each AccountDispatchQueue + try { + have_unprocessed_account_dispatch_queue_ = false; + td::uint64 total_account_dispatch_queues = 0; + ps_.dispatch_queue_->check_for_each([&](Ref, td::ConstBitPtr, int n) -> bool { + assert(n == 352); + ++total_account_dispatch_queues; + if (total_account_dispatch_queues > processed_account_dispatch_queues_) { + return false; + } + return true; + }); + have_unprocessed_account_dispatch_queue_ = + (total_account_dispatch_queues != processed_account_dispatch_queues_); + } catch (vm::VmVirtError&) { + // VmVirtError can happen if we have only a proof of ShardState + have_unprocessed_account_dispatch_queue_ = true; + } + } } catch (vm::VmError& err) { return reject_query("invalid DispatchQueue dictionary difference between the old and the new state: "s + err.get_msg()); @@ -3694,7 +3715,8 @@ bool ValidateQuery::check_in_msg(td::ConstBitPtr key, Ref in_msg) } if (have_unprocessed_account_dispatch_queue_ && tag != block::gen::InMsg::msg_import_ext && tag != block::gen::InMsg::msg_import_deferred_tr && tag != block::gen::InMsg::msg_import_deferred_fin) { - // Collator is requeired to take at least one message from each AccountDispatchQueue (unless the block is full) + // Collator is requeired to take at least one message from each AccountDispatchQueue + // (unless the block is full or unless out_msg_queue_size is big) // If some AccountDispatchQueue is unporcessed then it's not allowed to import other messages except for externals return reject_query("required DispatchQueue processing is not done, but some other internal messages are imported"); } diff --git a/validator/impl/validate-query.hpp b/validator/impl/validate-query.hpp index ec77d2c89..824afb49d 100644 --- a/validator/impl/validate-query.hpp +++ b/validator/impl/validate-query.hpp @@ -241,6 +241,7 @@ class ValidateQuery : public td::actor::Actor { bool deferring_messages_enabled_ = false; bool store_out_msg_queue_size_ = false; + td::uint64 processed_account_dispatch_queues_ = 0; bool have_unprocessed_account_dispatch_queue_ = false; td::PerfWarningTimer perf_timer_; diff --git a/validator/manager-disk.cpp b/validator/manager-disk.cpp index 17b793c7a..5678408c6 100644 --- a/validator/manager-disk.cpp +++ b/validator/manager-disk.cpp @@ -128,8 +128,8 @@ void ValidatorManagerImpl::sync_complete(td::Promise promise) { } Ed25519_PublicKey created_by{td::Bits256::zero()}; td::as(created_by.as_bits256().data() + 32 - 4) = ((unsigned)std::time(nullptr) >> 8); - run_collate_query(shard_id, 0, last_masterchain_block_id_, prev, created_by, val_set, actor_id(this), - td::Timestamp::in(10.0), std::move(P)); + run_collate_query(shard_id, 0, last_masterchain_block_id_, prev, created_by, val_set, td::Ref{true}, + actor_id(this), td::Timestamp::in(10.0), std::move(P)); } void ValidatorManagerImpl::validate_fake(BlockCandidate candidate, std::vector prev, BlockIdExt last, diff --git a/validator/manager.cpp b/validator/manager.cpp index 2af818e6f..3dbca852d 100644 --- a/validator/manager.cpp +++ b/validator/manager.cpp @@ -3133,10 +3133,16 @@ void ValidatorManagerImpl::get_validator_groups_info_for_litequery( } void ValidatorManagerImpl::update_options(td::Ref opts) { - // Currently options can be updated only to change state_serializer_enabled flag + // Currently options can be updated only to change state_serializer_enabled flag and collator_options if (!serializer_.empty()) { td::actor::send_closure(serializer_, &AsyncStateSerializer::update_options, opts); } + for (auto &group : validator_groups_) { + td::actor::send_closure(group.second.actor, &ValidatorGroup::update_options, opts); + } + for (auto &group : next_validator_groups_) { + td::actor::send_closure(group.second.actor, &ValidatorGroup::update_options, opts); + } opts_ = std::move(opts); } diff --git a/validator/validator-group.cpp b/validator/validator-group.cpp index 68e2b07ec..fc3ebe541 100644 --- a/validator/validator-group.cpp +++ b/validator/validator-group.cpp @@ -52,8 +52,8 @@ void ValidatorGroup::generate_block_candidate( })); run_collate_query( shard_, min_ts_, min_masterchain_block_id_, prev_block_ids_, - Ed25519_PublicKey{local_id_full_.ed25519_value().raw()}, validator_set_, manager_, td::Timestamp::in(10.0), - [SelfId = actor_id(this), cache = cached_collated_block_](td::Result R) { + Ed25519_PublicKey{local_id_full_.ed25519_value().raw()}, validator_set_, opts_->get_collator_options(), manager_, + td::Timestamp::in(10.0), [SelfId = actor_id(this), cache = cached_collated_block_](td::Result R) { td::actor::send_closure(SelfId, &ValidatorGroup::generated_block_candidate, std::move(cache), std::move(R)); }); } diff --git a/validator/validator-group.hpp b/validator/validator-group.hpp index f99402647..3499da9d7 100644 --- a/validator/validator-group.hpp +++ b/validator/validator-group.hpp @@ -64,6 +64,10 @@ class ValidatorGroup : public td::actor::Actor { void get_validator_group_info_for_litequery( td::Promise> promise); + void update_options(td::Ref opts) { + opts_ = std::move(opts); + } + ValidatorGroup(ShardIdFull shard, PublicKeyHash local_id, ValidatorSessionId session_id, td::Ref validator_set, validatorsession::ValidatorSessionOptions config, td::actor::ActorId keyring, td::actor::ActorId adnl, diff --git a/validator/validator-options.hpp b/validator/validator-options.hpp index 37006bdad..9e7767796 100644 --- a/validator/validator-options.hpp +++ b/validator/validator-options.hpp @@ -144,6 +144,9 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions { bool get_state_serializer_enabled() const override { return state_serializer_enabled_; } + td::Ref get_collator_options() const override { + return collator_options_; + } void set_zero_block_id(BlockIdExt block_id) override { zero_block_id_ = block_id; @@ -227,6 +230,9 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions { void set_state_serializer_enabled(bool value) override { state_serializer_enabled_ = value; } + void set_collator_options(td::Ref value) override { + collator_options_ = std::move(value); + } ValidatorManagerOptionsImpl *make_copy() const override { return new ValidatorManagerOptionsImpl(*this); @@ -279,6 +285,7 @@ struct ValidatorManagerOptionsImpl : public ValidatorManagerOptions { bool celldb_preload_all_ = false; td::optional catchain_max_block_delay_; bool state_serializer_enabled_ = true; + td::Ref collator_options_{true}; }; } // namespace validator diff --git a/validator/validator.h b/validator/validator.h index c4082c555..5bbb66475 100644 --- a/validator/validator.h +++ b/validator/validator.h @@ -51,6 +51,21 @@ struct PerfTimerStats { std::deque> stats; // }; +struct CollatorOptions : public td::CntObject { + bool deferring_enabled = true; + + // Defer messages from account after Xth message in block (excluding first messages from transactions) + td::uint32 defer_messages_after = 10; + // Defer all messages if out msg queue size is greater than X (excluding first messages from transactions) + td::uint64 defer_out_queue_size_limit = 2048; + + // See Collator::process_dispatch_queue + td::uint32 dispatch_phase_2_max_total = 150; + td::uint32 dispatch_phase_3_max_total = 150; + td::uint32 dispatch_phase_2_max_per_initiator = 20; + td::optional dispatch_phase_3_max_per_initiator; // Default - depends on out msg queue size +}; + struct ValidatorManagerOptions : public td::CntObject { public: enum class ShardCheckMode { m_monitor, m_validate }; @@ -91,6 +106,7 @@ struct ValidatorManagerOptions : public td::CntObject { virtual bool get_celldb_preload_all() const = 0; virtual td::optional get_catchain_max_block_delay() const = 0; virtual bool get_state_serializer_enabled() const = 0; + virtual td::Ref get_collator_options() const = 0; virtual void set_zero_block_id(BlockIdExt block_id) = 0; virtual void set_init_block_id(BlockIdExt block_id) = 0; @@ -120,6 +136,7 @@ struct ValidatorManagerOptions : public td::CntObject { virtual void set_celldb_preload_all(bool value) = 0; virtual void set_catchain_max_block_delay(double value) = 0; virtual void set_state_serializer_enabled(bool value) = 0; + virtual void set_collator_options(td::Ref value) = 0; static td::Ref create( BlockIdExt zero_block_id, BlockIdExt init_block_id,