Skip to content
This repository has been archived by the owner on Nov 18, 2021. It is now read-only.

Commit

Permalink
Sync, Restore and DB Bugfixes
Browse files Browse the repository at this point in the history
- Fix unit tests
- Auto style updates
- Disable reactor monitor repeated printing for known long operations
- Remove racy second stage merkle search (duplicate from bringup)
- Minor logging update for error cases and for merkle hash search
- Force recreation of bloom filter and add logging for recovery process
- Remove incorrect assertion
- Fix issue where the fixed-size journal was not cleared if the file had been previously opened
- Fix issue where the single object store did not write to the beginning of the file
  • Loading branch information
ejfitzgerald committed Apr 2, 2020
1 parent 5a5a144 commit 62a6db5
Show file tree
Hide file tree
Showing 13 changed files with 145 additions and 104 deletions.
32 changes: 28 additions & 4 deletions libs/constellation/src/constellation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -991,11 +991,35 @@ bool Constellation::CheckStateIntegrity()
return true;
}

// Walk back down the chain until we find a state we could revert to
while (current_block &&
!storage_->HashExists(current_block->merkle_hash, current_block->block_number))
{
current_block = chain_->GetBlock(current_block->previous_hash);
using Clock = std::chrono::steady_clock;
using Timepoint = Clock::time_point;

FETCH_LOG_INFO(LOGGING_NAME, "Searching for starting state hash...");

Timepoint last_notify = Clock::now();

// Walk back down the chain until we find a state we could revert to
while (current_block &&
!storage_->HashExists(current_block->merkle_hash, current_block->block_number))
{
current_block = chain_->GetBlock(current_block->previous_hash);

if (current_block && ((current_block->block_number & 0x3f) == 0))
{
auto const now = Clock::now();
auto const delta = now - last_notify;

if (delta >= std::chrono::seconds{5})
{
FETCH_LOG_INFO(LOGGING_NAME, "Searching for starting state hash... (current block: ",
current_block->block_number, ")");
last_notify = now;
}
}
}

FETCH_LOG_INFO(LOGGING_NAME, "Searching for starting state hash...complete");
}

if (!current_block)
Expand Down
5 changes: 5 additions & 0 deletions libs/core/include/core/runnable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ class Runnable

virtual std::string GetId() const = 0;

// Reports whether this runnable is expected to block while it executes.
// The base implementation always returns false; being virtual, it can be
// overridden by derived runnables that need to report otherwise.
virtual bool IsExpectedToBlock() const
{
return false;
}

virtual std::string GetDebug() const
{
return std::string("No debug info");
Expand Down
10 changes: 10 additions & 0 deletions libs/core/include/core/state_machine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ class StateMachine : public StateMachineInterface, public Runnable
void Execute() override;
std::string GetId() const override;
std::string GetDebug() const override;
// Reports the current value of the atomic `blocking_` flag.
bool IsExpectedToBlock() const override
{
  bool const expected_to_block = blocking_.load();
  return expected_to_block;
}
/// @}

State state() const
Expand All @@ -101,6 +105,11 @@ class StateMachine : public StateMachineInterface, public Runnable
return previous_state_;
}

// Updates the atomic `blocking_` flag, i.e. the value that
// IsExpectedToBlock() subsequently reports.
void SetBlocking(bool blocking)
{
  blocking_.store(blocking);
}

template <typename R, typename P>
void Delay(std::chrono::duration<R, P> const &delay);

Expand Down Expand Up @@ -129,6 +138,7 @@ class StateMachine : public StateMachineInterface, public Runnable
ProtectedCallbackMap callbacks_{};
std::atomic<State> current_state_;
std::atomic<State> previous_state_{current_state_.load()};
std::atomic<bool> blocking_{false};
Timepoint next_execution_{};
ProtectedStateChangeCallback state_change_callback_{};
telemetry::GaugePtr<uint64_t> state_gauge_;
Expand Down
25 changes: 20 additions & 5 deletions libs/core/src/reactor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,23 +176,38 @@ void Reactor::StopWorkerAndWatcher()
void Reactor::ReactorWatch()
{
uint32_t last_seen_executed = 0;
uint64_t notifications = 0;

while (running_)
{
std::unique_lock<std::mutex> lock(cv_m_);
cv_.wait_for(lock, std::chrono::milliseconds(thread_watcher_check_ms_));

auto runnable_concrete = last_executed_runnable_.lock();
std::string runnable_name = runnable_concrete ? runnable_concrete->GetId() : "nullptr fail";
auto runnable_concrete = last_executed_runnable_.lock();
bool const runnable_expected_to_block =
runnable_concrete ? runnable_concrete->IsExpectedToBlock() : false;
std::string runnable_name = runnable_concrete ? runnable_concrete->GetId() : "nullptr fail";
std::string runnable_debug = runnable_concrete ? runnable_concrete->GetDebug() : "nullptr fail";

if ((last_seen_executed == execution_counter_) && currently_executing_)
{
FETCH_LOG_WARN(LOGGING_NAME,
"Very long execution noticed at execution counter: ", last_seen_executed,
". from runnable: ", runnable_name, " debug: ", runnable_debug);
bool const suppress_log = runnable_expected_to_block && (notifications > 0);

if (!suppress_log)
{
FETCH_LOG_WARN(LOGGING_NAME,
"Very long execution noticed at execution counter: ", last_seen_executed,
". from runnable: ", runnable_name, " debug: ", runnable_debug,
" blocking: ", runnable_expected_to_block);
}

executions_way_too_long_++;
way_too_long_total_->increment();
++notifications;
}
else
{
notifications = 0;
}

last_seen_executed = execution_counter_;
Expand Down
65 changes: 7 additions & 58 deletions libs/ledger/src/chain/block_coordinator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,64 +246,9 @@ BlockCoordinator::State BlockCoordinator::OnReloadState()
current_block_coord_state_->set(static_cast<uint64_t>(state_machine_->state()));
reload_state_count_->increment();

// By default we need to populate this.
current_block_ = MainChain::CreateGenesisBlock();

FETCH_LOG_INFO(LOGGING_NAME, "Loading block coordinator old state...");

auto block = chain_.GetHeaviestBlock();

if (block->IsGenesis())
{
FETCH_LOG_INFO(LOGGING_NAME, "The main chain's heaviest is genesis. Nothing to load.");
return State::RESET;
}

// Walk back down the chain until we find a state we can revert to
while (block && !storage_unit_.HashExists(block->merkle_hash, block->block_number))
{
block = chain_.GetBlock(block->previous_hash);
}

if (!block)
{
FETCH_LOG_WARN(LOGGING_NAME, "Failed to walk back the chain when recovering!");
}

if (block && storage_unit_.HashExists(block->merkle_hash, block->block_number))
{
FETCH_LOG_INFO(LOGGING_NAME, "Found a block to revert to! Block: ", block->block_number,
" hex: 0x", block->hash.ToHex(), " merkle hash: 0x", block->merkle_hash.ToHex());

if (!storage_unit_.RevertToHash(block->merkle_hash, block->block_number))
{
FETCH_LOG_WARN(LOGGING_NAME, "The revert operation failed!");
return State::RESET;
}

FETCH_LOG_INFO(LOGGING_NAME, "Reverted storage unit.");

// Need to revert the DAG too
if (dag_ && !dag_->RevertToEpoch(block->block_number))
{
FETCH_LOG_WARN(LOGGING_NAME, "Reverting the DAG failed!");
return State::RESET;
}

FETCH_LOG_INFO(LOGGING_NAME, "reverted dag.");

// we need to update the execution manager state and also our locally cached state about the
// 'last' block that has been executed
execution_manager_.SetLastProcessedBlock(block->hash);
last_executed_block_.ApplyVoid([&block](auto &digest) { digest = block->hash; });
current_block_ = block;

FETCH_LOG_INFO(LOGGING_NAME, "Success.");
}
else
{
FETCH_LOG_INFO(LOGGING_NAME, "Didn't find any prior merkle state to revert to.");
}
// keep everything in sync
last_executed_block_.ApplyVoid(
[this](auto &digest) { digest = execution_manager_.LastProcessedBlock(); });

return State::RESET;
}
Expand Down Expand Up @@ -398,9 +343,13 @@ BlockCoordinator::State BlockCoordinator::OnSynchronising()

if (blocks_to_common_ancestor_.empty())
{
state_machine_->SetBlocking(true);

lookup_success = chain_.GetPathToCommonAncestor(
blocks_to_common_ancestor_, current_hash, last_processed_block,
COMMON_PATH_TO_ANCESTOR_LENGTH_LIMIT, MainChain::BehaviourWhenLimit::RETURN_LEAST_RECENT);

state_machine_->SetBlocking(false);
}
else
{
Expand Down
46 changes: 27 additions & 19 deletions libs/ledger/src/chain/main_chain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,6 @@ namespace {

constexpr uint64_t DIRTY_TIMEOUT = 600;

/**
 * Map the main chain operating mode onto the corresponding bloom filter mode.
 *
 * IN_MEMORY_DB and CREATE_PERSISTENT_DB select NEW_DATABASE; LOAD_PERSISTENT_DB
 * (and any value not covered by the switch) selects LOAD_DATABASE.
 *
 * @param mode The main chain mode
 * @return The bloom filter mode to use
 */
bloom::HistoricalBloomFilter::Mode SelectMode(MainChain::Mode mode)
{
  using BloomMode = bloom::HistoricalBloomFilter::Mode;

  switch (mode)
  {
  case MainChain::Mode::IN_MEMORY_DB:
  case MainChain::Mode::CREATE_PERSISTENT_DB:
    // return directly rather than assigning and silently falling through into the next case
    return BloomMode::NEW_DATABASE;
  case MainChain::Mode::LOAD_PERSISTENT_DB:
    break;
  }

  // default: reuse the existing on-disk database
  return BloomMode::LOAD_DATABASE;
}

} // namespace

/**
Expand All @@ -81,8 +65,9 @@ MainChain::MainChain(Mode mode, bool dirty_block_functionality)
MainChain::MainChain(Mode mode, Config const &cfg)
: mode_{mode}
, dirty_block_functionality_{cfg.enable_dirty_blocks}
, bloom_filter_{SelectMode(mode), "chain.hbloom.v3.db", "chain.hbloom.meta.v2.db",
cfg.bloom_filter_window, cfg.bloom_filter_cached_buckets}
, bloom_filter_{bloom::HistoricalBloomFilter::Mode::NEW_DATABASE, "chain.hbloom.v3.db",
"chain.hbloom.meta.v3.db", cfg.bloom_filter_window,
cfg.bloom_filter_cached_buckets}
, bloom_filter_queried_bit_count_(telemetry::Registry::Instance().CreateGauge<std::size_t>(
"ledger_main_chain_bloom_filter_queried_bit_number",
"Total number of bits checked during each query to the Ledger Main Chain Bloom filter"))
Expand Down Expand Up @@ -112,7 +97,9 @@ MainChain::MainChain(Mode mode, Config const &cfg)
// create the block store
block_store_ = std::make_unique<BlockStore>();

FETCH_LOG_INFO(LOGGING_NAME, "Starting main chain recovery...");
RecoverFromFile(mode_);
FETCH_LOG_INFO(LOGGING_NAME, "Starting main chain recovery...complete");
}

// create the genesis block and add it to the cache
Expand Down Expand Up @@ -1051,10 +1038,17 @@ void MainChain::RecoverFromFile(Mode mode)
// clear the transaction bloom filter
bloom_filter_.Reset();

using Clock = std::chrono::steady_clock;
using Timepoint = Clock::time_point;
using Duration = Clock::duration;

Timepoint last_notify = Clock::now();

bool recovery_complete{false};
if (!head_block_hash.empty() && LoadBlock(head_block_hash, *head))
{
auto block_index = head->block_number;
auto block_index = head->block_number;
auto const head_index = block_index;

// Copy head block so as to walk down the chain
IntBlockPtr next = std::make_shared<Block>(*head);
Expand All @@ -1072,6 +1066,20 @@ void MainChain::RecoverFromFile(Mode mode)
if ((block_index % bloom_filter_.window_size()) == 0)
{
bloom_filter_.TrimCache();

Timepoint const now = Clock::now();
Duration const delta_notify = now - last_notify;

if (delta_notify >= std::chrono::seconds{10})
{
// calculate the progress %
auto const progress = (100 * (head_index - block_index)) / (head_index + 1);

FETCH_LOG_INFO(LOGGING_NAME, "Chain recovery in progress: ", head_index - block_index,
"/", head_index + 1, " : ", progress, "%");

last_notify = now;
}
}

block_index = next->block_number;
Expand Down
4 changes: 4 additions & 0 deletions libs/ledger/src/chaincode/token_contract.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ bool TokenContract::TransferTokens(chain::Transaction const &tx, chain::Address
WalletRecord from_record{};
if (!GetStateRecord(from_record, tx.from()))
{
FETCH_LOG_WARN(LOGGING_NAME, "Unable to lookup the token state record for ",
tx.from().display());
return false;
}

Expand All @@ -180,6 +182,7 @@ bool TokenContract::TransferTokens(chain::Transaction const &tx, chain::Address
// Verify that current transaction possesses authority to perform the transfer
if (!from_record.deed->Verify(tx, Deed::TRANSFER))
{
FETCH_LOG_WARN(LOGGING_NAME, "Multisig Deed Failure: ", tx.from().display());
return false;
}
}
Expand All @@ -191,6 +194,7 @@ bool TokenContract::TransferTokens(chain::Transaction const &tx, chain::Address
// SOURCE address MUST be present when NO preceding deed exists.
if (!tx.IsSignedByFromAddress())
{
FETCH_LOG_WARN(LOGGING_NAME, "Signature Failure: ", tx.from().display());
return false;
}
}
Expand Down
7 changes: 6 additions & 1 deletion libs/ledger/src/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ Executor::Result Executor::Execute(Digest const &digest, BlockIndex block, Slice
// attempt to retrieve the transaction from the storage
if (!RetrieveTransaction(digest))
{
FETCH_LOG_WARN(LOGGING_NAME, "Unable to retrieve tx: 0x", digest.ToHex());

// signal that the contract failed to be executed
result.status = Status::TX_LOOKUP_FAILURE;
}
Expand All @@ -121,6 +123,8 @@ Executor::Result Executor::Execute(Digest const &digest, BlockIndex block, Slice

if (!success)
{
FETCH_LOG_WARN(LOGGING_NAME, "Failure to execute TX: 0x", digest.ToHex());

// In addition, to avoid indeterminate data being partially flushed, we also clear all the
// cached data when the transaction execution fails.
storage_cache_->Clear();
Expand Down Expand Up @@ -210,6 +214,8 @@ bool Executor::ValidationChecks(Result &result)

if (status != Status::SUCCESS)
{
FETCH_LOG_WARN(LOGGING_NAME, "Basic Validation Checks Failed: ", ToString(status));

result.status = status;
return false;
}
Expand Down Expand Up @@ -238,7 +244,6 @@ bool Executor::ExecuteTransactionContract(Result &result)
contract_id = current_tx_->chain_code();
break;
case ContractMode::NOT_PRESENT:
break;
case ContractMode::SYNERGETIC:
// synergetic contracts are not supported through normal pipeline
break;
Expand Down
8 changes: 4 additions & 4 deletions libs/ledger/src/storage_unit/storage_unit_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,8 @@ byte_array::ConstByteArray StorageUnitClient::Commit(uint64_t const commit_index
}
}

FETCH_LOG_INFO(LOGGING_NAME, "Committing merkle hash at index: ", commit_index, " to stack: 0x",
tree.root().ToHex());
FETCH_LOG_DEBUG(LOGGING_NAME, "Committing merkle hash at index: ", commit_index,
" to stack: 0x", tree.root().ToHex());

permanent_state_merkle_stack_.Push(tree);
permanent_state_merkle_stack_.Flush(false);
Expand All @@ -324,8 +324,8 @@ bool StorageUnitClient::HashInStack(Hash const &hash, uint64_t index)

if (index >= merkle_stack_size)
{
FETCH_LOG_WARN(LOGGING_NAME, "Tried to find hash in merkle tree when index more than stack: ",
merkle_stack_size, " index: ", index);
FETCH_LOG_DEBUG(LOGGING_NAME, "Tried to find hash in merkle tree when index more than stack: ",
merkle_stack_size, " index: ", index);
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,6 @@ TransactionStoreSyncService::State TransactionStoreSyncService::OnResolvingObjec
TransactionStoreSyncService::State TransactionStoreSyncService::OnQuerySubtree()
{
current_tss_state_->set(static_cast<uint64_t>(state_machine_->state()));
assert(!roots_to_sync_.empty());
auto const orig_num_of_roots{roots_to_sync_.size()};

auto const directly_connected_peers = muddle_.GetDirectlyConnectedPeers();
Expand Down
Loading

0 comments on commit 62a6db5

Please sign in to comment.