From 615e535a8cc6c1c4c9fc4d0854697f27108e0c60 Mon Sep 17 00:00:00 2001 From: marta-lokhova Date: Thu, 14 Nov 2024 09:30:26 -0800 Subject: [PATCH] Close ledgers in parallel when flag is enabled --- src/bucket/BucketList.cpp | 8 +- src/bucket/BucketList.h | 4 +- src/bucket/BucketListSnapshot.cpp | 2 - src/bucket/BucketManager.h | 10 +- src/bucket/BucketManagerImpl.cpp | 83 +++++---- src/bucket/BucketManagerImpl.h | 12 +- src/bucket/BucketSnapshotManager.cpp | 1 - src/bucket/test/BucketListTests.cpp | 3 +- src/bucket/test/BucketTestUtils.cpp | 10 +- src/catchup/ApplyCheckpointWork.cpp | 2 +- src/catchup/CatchupManagerImpl.cpp | 10 +- src/catchup/CatchupWork.cpp | 5 +- src/catchup/DownloadApplyTxsWork.h | 2 +- src/database/Database.cpp | 40 +++- src/database/Database.h | 14 +- src/herder/HerderImpl.cpp | 5 +- src/herder/LedgerCloseData.h | 2 + src/herder/TxSetFrame.cpp | 2 +- src/herder/test/HerderTests.cpp | 47 +++-- src/herder/test/UpgradesTests.cpp | 18 +- src/history/HistoryManagerImpl.cpp | 2 +- src/history/test/HistoryTestsUtils.cpp | 1 + src/ledger/LedgerHeaderUtils.cpp | 4 +- src/ledger/LedgerHeaderUtils.h | 2 +- src/ledger/LedgerManager.h | 8 +- src/ledger/LedgerManagerImpl.cpp | 175 +++++++++++------- src/ledger/LedgerManagerImpl.h | 11 +- src/ledger/LedgerTxn.cpp | 15 +- src/ledger/LedgerTxnImpl.h | 2 +- src/main/AppConnector.cpp | 13 +- src/main/AppConnector.h | 5 +- src/main/Application.h | 3 + src/main/ApplicationImpl.cpp | 76 +++++++- src/main/ApplicationImpl.h | 10 +- src/main/ApplicationUtils.cpp | 2 +- src/main/Config.cpp | 13 ++ src/main/Config.h | 4 + src/main/ExternalQueue.cpp | 20 +- src/main/test/ApplicationUtilsTests.cpp | 4 +- src/main/test/ConfigTests.cpp | 6 +- src/overlay/OverlayManagerImpl.cpp | 22 +-- src/simulation/LoadGenerator.cpp | 7 + src/simulation/test/LoadGeneratorTests.cpp | 1 - src/test/FuzzerImpl.cpp | 5 +- src/test/TestUtils.cpp | 8 +- src/test/TxTests.cpp | 13 +- .../ExtendFootprintTTLOpFrame.cpp | 6 +- .../InvokeHostFunctionOpFrame.cpp | 5 +- src/transactions/OperationFrame.cpp | 3 +- src/transactions/RestoreFootprintOpFrame.cpp | 12 +- src/transactions/TransactionFrame.cpp | 11 +- src/transactions/TransactionUtils.cpp | 2 +- src/transactions/test/SorobanTxTestUtils.cpp | 2 +- 53 files changed, 482 insertions(+), 271 deletions(-) diff --git a/src/bucket/BucketList.cpp b/src/bucket/BucketList.cpp index e714280a7e..ed701ea7e3 100644 --- a/src/bucket/BucketList.cpp +++ b/src/bucket/BucketList.cpp @@ -11,6 +11,7 @@ #include "crypto/SHA.h" #include "ledger/LedgerManager.h" #include "ledger/LedgerTxn.h" +#include "ledger/NetworkConfig.h" #include "main/Application.h" #include "util/GlobalChecks.h" #include "util/Logging.h" @@ -56,7 +57,6 @@ BucketLevel::getNext() void BucketLevel::setNext(FutureBucket const& fb) { - releaseAssert(threadIsMain()); mNextCurr = fb; } @@ -75,7 +75,6 @@ BucketLevel::getSnap() const void BucketLevel::setCurr(std::shared_ptr b) { - releaseAssert(threadIsMain()); mNextCurr.clear(); mCurr = b; } @@ -761,7 +760,8 @@ void BucketList::scanForEvictionLegacy(Application& app, AbstractLedgerTxn& ltx, uint32_t ledgerSeq, EvictionCounters& counters, - std::shared_ptr stats) + std::shared_ptr stats, + SorobanNetworkConfig& networkConfig) { releaseAssert(stats); @@ -770,8 +770,6 @@ BucketList::scanForEvictionLegacy(Application& app, AbstractLedgerTxn& ltx, return iter.isCurrBucket ? level.getCurr() : level.getSnap(); }; - auto const& networkConfig = - app.getLedgerManager().getSorobanNetworkConfig(); auto const firstScanLevel = networkConfig.stateArchivalSettings().startingEvictionScanLevel; auto evictionIter = networkConfig.evictionIterator(); diff --git a/src/bucket/BucketList.h b/src/bucket/BucketList.h index 09549ac1ad..f9f3b3f728 100644 --- a/src/bucket/BucketList.h +++ b/src/bucket/BucketList.h @@ -349,6 +349,7 @@ struct BucketEntryCounters; class Config; struct EvictionCounters; struct InflationWinner; +class SorobanNetworkConfig; namespace testutil { @@ -478,7 +479,8 @@ class BucketList void scanForEvictionLegacy(Application& app, AbstractLedgerTxn& ltx, uint32_t ledgerSeq, EvictionCounters& counters, - std::shared_ptr stats); + std::shared_ptr stats, + SorobanNetworkConfig& networkConfig); // Restart any merges that might be running on background worker threads, // merging buckets between levels. This needs to be called after forcing a diff --git a/src/bucket/BucketListSnapshot.cpp b/src/bucket/BucketListSnapshot.cpp index 5d26fd8296..ddc8856398 100644 --- a/src/bucket/BucketListSnapshot.cpp +++ b/src/bucket/BucketListSnapshot.cpp @@ -17,8 +17,6 @@ BucketListSnapshot::BucketListSnapshot(BucketList const& bl, LedgerHeader header) : mHeader(std::move(header)) { - releaseAssert(threadIsMain()); - for (uint32_t i = 0; i < BucketList::kNumLevels; ++i) { auto const& level = bl.getLevel(i); diff --git a/src/bucket/BucketManager.h b/src/bucket/BucketManager.h index a64bd9181f..dea63e685d 100644 --- a/src/bucket/BucketManager.h +++ b/src/bucket/BucketManager.h @@ -35,6 +35,7 @@ struct HistoryArchiveState; struct InflationWinner; struct LedgerHeader; struct MergeKey; +class SorobanNetworkConfig; // A fine-grained merge-operation-counter structure for tracking various // events during merges. These are not medida counters because we do not @@ -288,12 +289,15 @@ class BucketManager : NonMovableOrCopyable // pointed to by EvictionIterator. Scans until `maxEntriesToEvict` entries // have been evicted or maxEvictionScanSize bytes have been scanned. virtual void scanForEvictionLegacy(AbstractLedgerTxn& ltx, - uint32_t ledgerSeq) = 0; + uint32_t ledgerSeq, + SorobanNetworkConfig& networkConfig) = 0; - virtual void startBackgroundEvictionScan(uint32_t ledgerSeq) = 0; + virtual void startBackgroundEvictionScan(uint32_t ledgerSeq, + bool callFromLedgerClose) = 0; virtual void resolveBackgroundEvictionScan(AbstractLedgerTxn& ltx, uint32_t ledgerSeq, - LedgerKeySet const& modifiedKeys) = 0; + LedgerKeySet const& modifiedKeys, + SorobanNetworkConfig& networkConfig) = 0; virtual medida::Meter& getBloomMissMeter() const = 0; virtual medida::Meter& getBloomLookupMeter() const = 0; diff --git a/src/bucket/BucketManagerImpl.cpp b/src/bucket/BucketManagerImpl.cpp index b228f93e83..d5b8583157 100644 --- a/src/bucket/BucketManagerImpl.cpp +++ b/src/bucket/BucketManagerImpl.cpp @@ -91,7 +91,7 @@ void BucketManagerImpl::initialize() { ZoneScoped; - std::string d = mApp.getConfig().BUCKET_DIR_PATH; + std::string d = mConfig.BUCKET_DIR_PATH; if (!fs::exists(d)) { @@ -122,17 +122,17 @@ BucketManagerImpl::initialize() mLockedBucketDir = std::make_unique(d); mTmpDirManager = std::make_unique(d + "/tmp"); - if (mApp.getConfig().MODE_ENABLES_BUCKETLIST) + if (mConfig.MODE_ENABLES_BUCKETLIST) { mBucketList = std::make_unique(); - if (mApp.getConfig().isUsingBucketListDB()) + if (mConfig.isUsingBucketListDB()) { mSnapshotManager = std::make_unique( mApp, std::make_unique(*mBucketList, LedgerHeader()), - mApp.getConfig().QUERY_SNAPSHOT_LEDGERS); + mConfig.QUERY_SNAPSHOT_LEDGERS); } } @@ -141,11 +141,10 @@ BucketManagerImpl::initialize() // BUCKET_DIR_PATH, HISTORY_FILE_TYPE_SCP is persisted to the database // so create the remaining ledger header, transactions and results // directories - createPublishDir(FileType::HISTORY_FILE_TYPE_LEDGER, mApp.getConfig()); - createPublishDir(FileType::HISTORY_FILE_TYPE_TRANSACTIONS, - mApp.getConfig()); - createPublishDir(FileType::HISTORY_FILE_TYPE_RESULTS, mApp.getConfig()); - HistoryManager::createPublishQueueDir(mApp.getConfig()); + createPublishDir(FileType::HISTORY_FILE_TYPE_LEDGER, mConfig); + createPublishDir(FileType::HISTORY_FILE_TYPE_TRANSACTIONS, mConfig); + createPublishDir(FileType::HISTORY_FILE_TYPE_RESULTS, mConfig); + HistoryManager::createPublishQueueDir(mConfig); } void @@ -178,6 +177,7 @@ EvictionCounters::EvictionCounters(Application& app) BucketManagerImpl::BucketManagerImpl(Application& app) : mApp(app) + , mConfig(app.getConfig()) , mBucketList(nullptr) , mSnapshotManager(nullptr) , mTmpDirManager(nullptr) @@ -299,7 +299,7 @@ void BucketManagerImpl::deleteEntireBucketDir() { ZoneScoped; - std::string d = mApp.getConfig().BUCKET_DIR_PATH; + std::string d = mConfig.BUCKET_DIR_PATH; if (fs::exists(d)) { // First clean out the contents of the tmpdir, as usual. @@ -332,7 +332,7 @@ BucketManagerImpl::deleteTmpDirAndUnlockBucketDir() // Then delete the lockfile $BUCKET_DIR_PATH/stellar-core.lock if (mLockedBucketDir) { - std::string d = mApp.getConfig().BUCKET_DIR_PATH; + std::string d = mConfig.BUCKET_DIR_PATH; std::string lock = d + "/" + kLockFilename; releaseAssert(fs::exists(lock)); fs::unlockFile(lock); @@ -343,14 +343,14 @@ BucketManagerImpl::deleteTmpDirAndUnlockBucketDir() BucketList& BucketManagerImpl::getBucketList() { - releaseAssertOrThrow(mApp.getConfig().MODE_ENABLES_BUCKETLIST); + releaseAssertOrThrow(mConfig.MODE_ENABLES_BUCKETLIST); return *mBucketList; } BucketSnapshotManager& BucketManagerImpl::getBucketSnapshotManager() const { - releaseAssertOrThrow(mApp.getConfig().isUsingBucketListDB()); + releaseAssertOrThrow(mConfig.isUsingBucketListDB()); releaseAssert(mSnapshotManager); return *mSnapshotManager; } @@ -476,7 +476,7 @@ BucketManagerImpl::renameBucketDirFile(std::filesystem::path const& src, std::filesystem::path const& dst) { ZoneScoped; - if (mApp.getConfig().DISABLE_XDR_FSYNC) + if (mConfig.DISABLE_XDR_FSYNC) { return rename(src.string().c_str(), dst.string().c_str()) == 0; } @@ -492,7 +492,7 @@ BucketManagerImpl::adoptFileAsBucket(std::string const& filename, std::unique_ptr index) { ZoneScoped; - releaseAssertOrThrow(mApp.getConfig().MODE_ENABLES_BUCKETLIST); + releaseAssertOrThrow(mConfig.MODE_ENABLES_BUCKETLIST); std::lock_guard lock(mBucketMutex); if (mergeKey) @@ -566,7 +566,7 @@ BucketManagerImpl::adoptFileAsBucket(std::string const& filename, void BucketManagerImpl::noteEmptyMergeOutput(MergeKey const& mergeKey) { - releaseAssertOrThrow(mApp.getConfig().MODE_ENABLES_BUCKETLIST); + releaseAssertOrThrow(mConfig.MODE_ENABLES_BUCKETLIST); // We _do_ want to remove the mergeKey from mLiveFutures, both so that that // map does not grow without bound and more importantly so that we drop the @@ -681,7 +681,7 @@ BucketManagerImpl::putMergeFuture( MergeKey const& key, std::shared_future> wp) { ZoneScoped; - releaseAssertOrThrow(mApp.getConfig().MODE_ENABLES_BUCKETLIST); + releaseAssertOrThrow(mConfig.MODE_ENABLES_BUCKETLIST); std::lock_guard lock(mBucketMutex); CLOG_TRACE( Bucket, @@ -704,7 +704,7 @@ BucketManagerImpl::getBucketListReferencedBuckets() const { ZoneScoped; std::set referenced; - if (!mApp.getConfig().MODE_ENABLES_BUCKETLIST) + if (!mConfig.MODE_ENABLES_BUCKETLIST) { return referenced; } @@ -743,7 +743,7 @@ BucketManagerImpl::getAllReferencedBuckets() const { ZoneScoped; auto referenced = getBucketListReferencedBuckets(); - if (!mApp.getConfig().MODE_ENABLES_BUCKETLIST) + if (!mConfig.MODE_ENABLES_BUCKETLIST) { return referenced; } @@ -788,7 +788,7 @@ void BucketManagerImpl::cleanupStaleFiles() { ZoneScoped; - if (mApp.getConfig().DISABLE_BUCKET_GC) + if (mConfig.DISABLE_BUCKET_GC) { return; } @@ -867,7 +867,7 @@ BucketManagerImpl::forgetUnreferencedBuckets() CLOG_TRACE(Bucket, "BucketManager::forgetUnreferencedBuckets dropping {}", filename); - if (!filename.empty() && !mApp.getConfig().DISABLE_BUCKET_GC) + if (!filename.empty() && !mConfig.DISABLE_BUCKET_GC) { CLOG_TRACE(Bucket, "removing bucket file: {}", filename); std::filesystem::remove(filename); @@ -974,7 +974,7 @@ BucketManagerImpl::snapshotLedger(LedgerHeader& currentHeader) { ZoneScoped; Hash hash; - if (mApp.getConfig().MODE_ENABLES_BUCKETLIST) + if (mConfig.MODE_ENABLES_BUCKETLIST) { hash = mBucketList->getHash(); } @@ -1005,25 +1005,29 @@ BucketManagerImpl::maybeSetIndex(std::shared_ptr b, void BucketManagerImpl::scanForEvictionLegacy(AbstractLedgerTxn& ltx, - uint32_t ledgerSeq) + uint32_t ledgerSeq, + SorobanNetworkConfig& networkConfig) { ZoneScoped; releaseAssert(protocolVersionStartsFrom(ltx.getHeader().ledgerVersion, SOROBAN_PROTOCOL_VERSION)); - mBucketList->scanForEvictionLegacy( - mApp, ltx, ledgerSeq, mBucketListEvictionCounters, mEvictionStatistics); + mBucketList->scanForEvictionLegacy(mApp, ltx, ledgerSeq, + mBucketListEvictionCounters, + mEvictionStatistics, networkConfig); } void -BucketManagerImpl::startBackgroundEvictionScan(uint32_t ledgerSeq) +BucketManagerImpl::startBackgroundEvictionScan(uint32_t ledgerSeq, + bool callFromLedgerClose) { - releaseAssert(mApp.getConfig().isUsingBucketListDB()); + releaseAssert(!threadIsMain() || !mConfig.parallelLedgerClose()); + releaseAssert(mConfig.isUsingBucketListDB()); releaseAssert(mSnapshotManager); releaseAssert(!mEvictionFuture.valid()); releaseAssert(mEvictionStatistics); auto searchableBL = mSnapshotManager->copySearchableBucketListSnapshot(); - auto const& cfg = mApp.getLedgerManager().getSorobanNetworkConfig(); + auto cfg = mApp.getLedgerManager().getSorobanNetworkConfig(); auto const& sas = cfg.stateArchivalSettings(); using task_t = std::packaged_task; @@ -1045,28 +1049,25 @@ BucketManagerImpl::startBackgroundEvictionScan(uint32_t ledgerSeq) void BucketManagerImpl::resolveBackgroundEvictionScan( AbstractLedgerTxn& ltx, uint32_t ledgerSeq, - LedgerKeySet const& modifiedKeys) + LedgerKeySet const& modifiedKeys, SorobanNetworkConfig& networkConfig) { ZoneScoped; - releaseAssert(threadIsMain()); + releaseAssert(!threadIsMain() || !mConfig.parallelLedgerClose()); releaseAssert(mEvictionStatistics); if (!mEvictionFuture.valid()) { - startBackgroundEvictionScan(ledgerSeq); + startBackgroundEvictionScan(ledgerSeq, false); } auto evictionCandidates = mEvictionFuture.get(); - auto const& networkConfig = - mApp.getLedgerManager().getSorobanNetworkConfig(); - // If eviction related settings changed during the ledger, we have to // restart the scan if (!evictionCandidates.isValid(ledgerSeq, networkConfig.stateArchivalSettings())) { - startBackgroundEvictionScan(ledgerSeq); + startBackgroundEvictionScan(ledgerSeq, false); evictionCandidates = mEvictionFuture.get(); } @@ -1176,7 +1177,7 @@ BucketManagerImpl::assumeState(HistoryArchiveState const& has, uint32_t maxProtocolVersion, bool restartMerges) { ZoneScoped; - releaseAssertOrThrow(mApp.getConfig().MODE_ENABLES_BUCKETLIST); + releaseAssertOrThrow(mConfig.MODE_ENABLES_BUCKETLIST); for (uint32_t i = 0; i < BucketList::kNumLevels; ++i) { @@ -1203,7 +1204,7 @@ BucketManagerImpl::assumeState(HistoryArchiveState const& has, // Buckets on the BucketList should always be indexed when // BucketListDB enabled - if (mApp.getConfig().isUsingBucketListDB()) + if (mConfig.isUsingBucketListDB()) { releaseAssert(curr->isEmpty() || curr->isIndexed()); releaseAssert(snap->isEmpty() || snap->isIndexed()); @@ -1328,7 +1329,7 @@ BucketManagerImpl::mergeBuckets(HistoryArchiveState const& has) BucketMetadata meta; MergeCounters mc; auto& ctx = mApp.getClock().getIOContext(); - meta.ledgerVersion = mApp.getConfig().LEDGER_PROTOCOL_VERSION; + meta.ledgerVersion = mConfig.LEDGER_PROTOCOL_VERSION; BucketOutputIterator out(getTmpDir(), /*keepDeadEntries=*/false, meta, mc, ctx, /*doFsync=*/true); for (auto const& pair : ledgerMap) @@ -1539,13 +1540,13 @@ BucketManagerImpl::scheduleVerifyReferencedBucketsWork() Config const& BucketManagerImpl::getConfig() const { - return mApp.getConfig(); + return mConfig; } std::shared_ptr BucketManagerImpl::getSearchableBucketListSnapshot() { - releaseAssert(mApp.getConfig().isUsingBucketListDB()); + releaseAssert(mConfig.isUsingBucketListDB()); // Any other threads must maintain their own snapshot releaseAssert(threadIsMain()); if (!mSearchableBucketListSnapshot) @@ -1560,7 +1561,7 @@ BucketManagerImpl::getSearchableBucketListSnapshot() void BucketManagerImpl::reportBucketEntryCountMetrics() { - if (!mApp.getConfig().isUsingBucketListDB()) + if (!mConfig.isUsingBucketListDB()) { return; } diff --git a/src/bucket/BucketManagerImpl.h b/src/bucket/BucketManagerImpl.h index 50b6479ede..a26a46cb4e 100644 --- a/src/bucket/BucketManagerImpl.h +++ b/src/bucket/BucketManagerImpl.h @@ -3,6 +3,7 @@ #include "bucket/BucketList.h" #include "bucket/BucketManager.h" #include "bucket/BucketMergeMap.h" +#include "ledger/NetworkConfig.h" #include "xdr/Stellar-ledger.h" #include @@ -41,6 +42,7 @@ class BucketManagerImpl : public BucketManager static std::string const kLockFilename; Application& mApp; + Config const mConfig; std::unique_ptr mBucketList; std::unique_ptr mSnapshotManager; std::unique_ptr mTmpDirManager; @@ -149,12 +151,14 @@ class BucketManagerImpl : public BucketManager void snapshotLedger(LedgerHeader& currentHeader) override; void maybeSetIndex(std::shared_ptr b, std::unique_ptr&& index) override; - void scanForEvictionLegacy(AbstractLedgerTxn& ltx, - uint32_t ledgerSeq) override; - void startBackgroundEvictionScan(uint32_t ledgerSeq) override; + void scanForEvictionLegacy(AbstractLedgerTxn& ltx, uint32_t ledgerSeq, + SorobanNetworkConfig& networkConfig) override; + void startBackgroundEvictionScan(uint32_t ledgerSeq, + bool callFromLedgerClose) override; void resolveBackgroundEvictionScan(AbstractLedgerTxn& ltx, uint32_t ledgerSeq, - LedgerKeySet const& modifiedKeys) override; + LedgerKeySet const& modifiedKeys, + SorobanNetworkConfig& networkConfig) override; medida::Meter& getBloomMissMeter() const override; medida::Meter& getBloomLookupMeter() const override; diff --git a/src/bucket/BucketSnapshotManager.cpp b/src/bucket/BucketSnapshotManager.cpp index 52f907307b..8d253b1386 100644 --- a/src/bucket/BucketSnapshotManager.cpp +++ b/src/bucket/BucketSnapshotManager.cpp @@ -111,7 +111,6 @@ BucketSnapshotManager::updateCurrentSnapshot( std::unique_ptr&& newSnapshot) { releaseAssert(newSnapshot); - releaseAssert(threadIsMain()); // Updating the BucketSnapshotManager canonical snapshot, must lock // exclusively for write access. diff --git a/src/bucket/test/BucketListTests.cpp b/src/bucket/test/BucketListTests.cpp index edf229d439..a3893937c3 100644 --- a/src/bucket/test/BucketListTests.cpp +++ b/src/bucket/test/BucketListTests.cpp @@ -706,7 +706,8 @@ TEST_CASE_VERSIONS("network config snapshots BucketList size", "[bucketlist]") for_versions_from(20, *app, [&] { LedgerManagerForBucketTests& lm = app->getLedgerManager(); - auto& networkConfig = app->getLedgerManager().getSorobanNetworkConfig(); + auto& networkConfig = + app->getLedgerManager().getMutableSorobanNetworkConfig(); uint32_t windowSize = networkConfig.stateArchivalSettings() .bucketListSizeWindowSampleSize; diff --git a/src/bucket/test/BucketTestUtils.cpp b/src/bucket/test/BucketTestUtils.cpp index ee4b959857..c0258da26f 100644 --- a/src/bucket/test/BucketTestUtils.cpp +++ b/src/bucket/test/BucketTestUtils.cpp @@ -170,12 +170,16 @@ LedgerManagerForBucketTests::transferLedgerEntriesToBucketList( if (mApp.getConfig().isUsingBackgroundEviction()) { mApp.getBucketManager().resolveBackgroundEvictionScan( - ltxEvictions, lh.ledgerSeq, keys); + ltxEvictions, lh.ledgerSeq, keys, + mApp.getLedgerManager() + .getMutableSorobanNetworkConfig()); } else { - mApp.getBucketManager().scanForEvictionLegacy(ltxEvictions, - lh.ledgerSeq); + mApp.getBucketManager().scanForEvictionLegacy( + ltxEvictions, lh.ledgerSeq, + mApp.getLedgerManager() + .getMutableSorobanNetworkConfig()); } if (ledgerCloseMeta) diff --git a/src/catchup/ApplyCheckpointWork.cpp b/src/catchup/ApplyCheckpointWork.cpp index 39180e9cdb..28ab85d1c3 100644 --- a/src/catchup/ApplyCheckpointWork.cpp +++ b/src/catchup/ApplyCheckpointWork.cpp @@ -154,7 +154,7 @@ ApplyCheckpointWork::getNextLedgerCloseData() auto& lm = mApp.getLedgerManager(); - auto const& lclHeader = lm.getLastClosedLedgerHeader(); + auto lclHeader = lm.getLastClosedLedgerHeader(); // If we are >1 before LCL, skip if (header.ledgerSeq + 1 < lclHeader.header.ledgerSeq) diff --git a/src/catchup/CatchupManagerImpl.cpp b/src/catchup/CatchupManagerImpl.cpp index 700a7bd889..8963368dee 100644 --- a/src/catchup/CatchupManagerImpl.cpp +++ b/src/catchup/CatchupManagerImpl.cpp @@ -476,24 +476,20 @@ CatchupManagerImpl::tryApplySyncingLedgers() break; } - if (mApp.getConfig() - .ARTIFICIALLY_DELAY_LEDGER_CLOSE_FOR_TESTING.count() > 0) + if (mApp.getConfig().parallelLedgerClose()) { // Close a ledger asynchronously, with an added delay // Usefult to test async extrnalize flow - mApp.postOnMainThread( + mApp.postOnLedgerCloseThread( [&app = mApp, lcd]() { if (app.isStopping()) { return; } - std::this_thread::sleep_for( - app.getConfig() - .ARTIFICIALLY_DELAY_LEDGER_CLOSE_FOR_TESTING); app.getLedgerManager().closeLedger(lcd, /* externalize */ true); }, - "closeLedger"); + "closeLedger queue"); } else { diff --git a/src/catchup/CatchupWork.cpp b/src/catchup/CatchupWork.cpp index 2d06425863..e88bb24dc8 100644 --- a/src/catchup/CatchupWork.cpp +++ b/src/catchup/CatchupWork.cpp @@ -138,7 +138,7 @@ CatchupWork::doReset() mTransactionsVerifyApplySeq.reset(); mGetHistoryArchiveStateWork.reset(); mApplyBufferedLedgersWork.reset(); - auto const& lcl = mApp.getLedgerManager().getLastClosedLedgerHeader(); + auto lcl = mApp.getLedgerManager().getLastClosedLedgerHeader(); mLastClosedLedgerHashPair = LedgerNumHashPair( lcl.header.ledgerSeq, std::make_optional(lcl.hash)); mCatchupSeq.reset(); @@ -531,6 +531,7 @@ CatchupWork::runCatchupStep() // persistently available locally so it will return us to the // correct state. auto& ps = mApp.getPersistentState(); + mApp.getDatabase().clearPreparedStatementCache(); for (auto let : xdr::xdr_traits::enum_values()) { ps.clearRebuildForType(static_cast(let)); @@ -548,7 +549,7 @@ CatchupWork::runCatchupStep() // In this case we should actually have been caught-up during // the replay process and, if judged successful, our LCL should // be the one provided as well. - auto& lastClosed = + auto lastClosed = mApp.getLedgerManager().getLastClosedLedgerHeader(); releaseAssert(mLastApplied.hash == lastClosed.hash); releaseAssert(mLastApplied.header == lastClosed.header); diff --git a/src/catchup/DownloadApplyTxsWork.h b/src/catchup/DownloadApplyTxsWork.h index 3259fdacfe..d44637fe6d 100644 --- a/src/catchup/DownloadApplyTxsWork.h +++ b/src/catchup/DownloadApplyTxsWork.h @@ -25,7 +25,7 @@ class DownloadApplyTxsWork : public BatchWork { LedgerRange const mRange; TmpDir const& mDownloadDir; - LedgerHeaderHistoryEntry& mLastQueuedToApply; + LedgerHeaderHistoryEntry mLastQueuedToApply; uint32_t mCheckpointToQueue; std::shared_ptr mLastYieldedWork; bool const mWaitForPublish; diff --git a/src/database/Database.cpp b/src/database/Database.cpp index da5b363b2d..14042bc4b9 100644 --- a/src/database/Database.cpp +++ b/src/database/Database.cpp @@ -304,7 +304,7 @@ Database::populateMiscDatabase() void Database::applyMiscSchemaUpgrade(unsigned long vers) { - clearPreparedStatementCache(); + clearPreparedStatementCache(mMiscSession); soci::transaction tx(mMiscSession.session()); switch (vers) { @@ -345,7 +345,7 @@ dropMiscTablesFromMain(Application& app) void Database::applySchemaUpgrade(unsigned long vers) { - clearPreparedStatementCache(); + clearPreparedStatementCache(mSession); soci::transaction tx(mSession.session()); switch (vers) @@ -568,15 +568,29 @@ Database::canUseMiscDB() const void Database::clearPreparedStatementCache() +{ + for (auto& c : mCaches) + { + for (auto& st : c.second) + { + st.second->clean_up(true); + } + } + mCaches.clear(); + mStatementsSize.set_count(0); +} + +void +Database::clearPreparedStatementCache(SessionWrapper& session) { // Flush all prepared statements; in sqlite they represent open cursors // and will conflict with any DROP TABLE commands issued below - for (auto st : mStatements) + for (auto st : mCaches[session.getSessionName()]) { st.second->clean_up(true); + mStatementsSize.dec(); } - mStatements.clear(); - mStatementsSize.set_count(mStatements.size()); + mCaches.erase(session.getSessionName()); } void @@ -757,16 +771,22 @@ StatementContext Database::getPreparedStatement(std::string const& query, SessionWrapper& session) { - auto cacheKey = PrepStatementCacheKey(session.getSessionName(), query); - auto i = mStatements.find(cacheKey); + auto findCache = mCaches.find(session.getSessionName()); + if (findCache == mCaches.end()) + { + mCaches[session.getSessionName()] = PreparedStatementCache(); + } + + auto& cache = mCaches[session.getSessionName()]; + auto i = cache.find(query); std::shared_ptr p; - if (i == mStatements.end()) + if (i == findCache->second.end()) { p = std::make_shared(session.session()); p->alloc(); p->prepare(query); - mStatements.insert(std::make_pair(cacheKey, p)); - mStatementsSize.set_count(mStatements.size()); + cache.insert(std::make_pair(query, p)); + mStatementsSize.inc(); } else { diff --git a/src/database/Database.h b/src/database/Database.h index 907af57778..31b98c4e41 100644 --- a/src/database/Database.h +++ b/src/database/Database.h @@ -82,6 +82,11 @@ class SessionWrapper : NonCopyable : mSessionName(std::move(sessionName)) { } + SessionWrapper(std::string sessionName, soci::connection_pool& pool) + : mSession(pool), mSessionName(std::move(sessionName)) + { + } + soci::session& session() { @@ -140,10 +145,10 @@ class Database : NonMovableOrCopyable std::unique_ptr mMiscPool; // Cache key -> session name <> query - using PrepStatementCacheKey = std::pair; - std::unordered_map, - PairHash> - mStatements; + using PreparedStatementCache = + std::unordered_map>; + std::unordered_map mCaches; + medida::Counter& mStatementsSize; static bool gDriversRegistered; @@ -182,6 +187,7 @@ class Database : NonMovableOrCopyable // Purge all cached prepared statements, closing their handles with the // database. + void clearPreparedStatementCache(SessionWrapper& session); void clearPreparedStatementCache(); // Return metric-gathering timers for various families of SQL operation. diff --git a/src/herder/HerderImpl.cpp b/src/herder/HerderImpl.cpp index fa29e42358..6656f4e4e7 100644 --- a/src/herder/HerderImpl.cpp +++ b/src/herder/HerderImpl.cpp @@ -2104,7 +2104,7 @@ HerderImpl::maybeHandleUpgrade() // no-op on any earlier protocol return; } - auto const& conf = mApp.getLedgerManager().getSorobanNetworkConfig(); + auto conf = mApp.getLedgerManager().getSorobanNetworkConfig(); auto maybeNewMaxTxSize = conf.txMaxSizeBytes() + getFlowControlExtraBuffer(); @@ -2155,8 +2155,7 @@ HerderImpl::start() .header.ledgerVersion; if (protocolVersionStartsFrom(version, SOROBAN_PROTOCOL_VERSION)) { - auto const& conf = - mApp.getLedgerManager().getSorobanNetworkConfig(); + auto conf = mApp.getLedgerManager().getSorobanNetworkConfig(); mMaxTxSize = std::max(mMaxTxSize, conf.txMaxSizeBytes() + getFlowControlExtraBuffer()); } diff --git a/src/herder/LedgerCloseData.h b/src/herder/LedgerCloseData.h index 31f9192277..575e79233d 100644 --- a/src/herder/LedgerCloseData.h +++ b/src/herder/LedgerCloseData.h @@ -79,6 +79,8 @@ class LedgerCloseData private: uint32_t mLedgerSeq; + // TODO: confirm TxSet pointers don't get purged if ledger close is holding + // a strong reference to it TxSetXDRFrameConstPtr mTxSet; StellarValue mValue; std::optional mExpectedLedgerHash; diff --git a/src/herder/TxSetFrame.cpp b/src/herder/TxSetFrame.cpp index e4f951f458..9c30c10842 100644 --- a/src/herder/TxSetFrame.cpp +++ b/src/herder/TxSetFrame.cpp @@ -875,7 +875,7 @@ ApplicableTxSetFrame::checkValid(Application& app, uint64_t upperBoundCloseTimeOffset) const { ZoneScoped; - auto& lcl = app.getLedgerManager().getLastClosedLedgerHeader(); + auto lcl = app.getLedgerManager().getLastClosedLedgerHeader(); // Start by checking previousLedgerHash if (lcl.hash != mPreviousLedgerHash) diff --git a/src/herder/test/HerderTests.cpp b/src/herder/test/HerderTests.cpp index 1ec955200a..d8ff4d87ce 100644 --- a/src/herder/test/HerderTests.cpp +++ b/src/herder/test/HerderTests.cpp @@ -3126,10 +3126,10 @@ TEST_CASE("soroban txs each parameter surge priced", "[soroban][herder]") bool hadSorobanSurgePricing = false; simulation->crankUntil( [&]() { - auto& lclHeader = nodes[0] - ->getLedgerManager() - .getLastClosedLedgerHeader() - .header; + auto lclHeader = nodes[0] + ->getLedgerManager() + .getLastClosedLedgerHeader() + .header; auto txSet = nodes[0]->getHerder().getTxSet( lclHeader.scpValue.txSetHash); GeneralizedTransactionSet xdrTxSet; @@ -3343,14 +3343,34 @@ TEST_CASE("overlay parallel processing") { auto networkID = sha256(getTestConfig().NETWORK_PASSPHRASE); - // Set threshold to 1 so all have to vote - auto simulation = - Topologies::core(4, 1, Simulation::OVER_TCP, networkID, [](int i) { - auto cfg = getTestConfig(i); - cfg.TESTING_UPGRADE_MAX_TX_SET_SIZE = 100; - cfg.EXPERIMENTAL_BACKGROUND_OVERLAY_PROCESSING = true; - return cfg; - }); + std::shared_ptr simulation; + + SECTION("background traffic processing") + { + // Set threshold to 1 so all have to vote + simulation = + Topologies::core(4, 1, Simulation::OVER_TCP, networkID, [](int i) { + auto cfg = getTestConfig(i); + cfg.TESTING_UPGRADE_MAX_TX_SET_SIZE = 100; + cfg.EXPERIMENTAL_BACKGROUND_OVERLAY_PROCESSING = true; + return cfg; + }); + } + SECTION("background ledger close") + { + // Set threshold to 1 so all have to vote + simulation = + Topologies::core(4, 1, Simulation::OVER_TCP, networkID, [](int i) { + auto cfg = + getTestConfig(i, Config::TESTDB_BUCKET_DB_PERSISTENT); + cfg.TESTING_UPGRADE_MAX_TX_SET_SIZE = 100; + cfg.EXPERIMENTAL_PARALLEL_LEDGER_CLOSE = true; + cfg.ARTIFICIALLY_DELAY_LEDGER_CLOSE_FOR_TESTING = + std::chrono::seconds(3); + return cfg; + }); + } + simulation->startAllNodes(); auto nodes = simulation->getNodes(); uint32_t desiredTxRate = 1; @@ -3851,6 +3871,7 @@ herderExternalizesValuesWithProtocol(uint32_t version) Herder::ENVELOPE_STATUS_READY); REQUIRE(herder.recvSCPEnvelope(newMsgB.first, qset, newMsgB.second) == Herder::ENVELOPE_STATUS_READY); + simulation->crankForAtLeast(std::chrono::seconds(1), false); }; auto testOutOfOrder = [&](bool partial) { @@ -4664,8 +4685,6 @@ TEST_CASE("do not flood too many transactions", "[herder][transactionqueue]") cfg.FORCE_SCP = false; cfg.FLOOD_TX_PERIOD_MS = 100; cfg.FLOOD_OP_RATE_PER_LEDGER = 2.0; - cfg.ARTIFICIALLY_DELAY_LEDGER_CLOSE_FOR_TESTING = - std::chrono::seconds(0); return cfg; }); diff --git a/src/herder/test/UpgradesTests.cpp b/src/herder/test/UpgradesTests.cpp index c1f86fb7ce..cd9a8f3d2e 100644 --- a/src/herder/test/UpgradesTests.cpp +++ b/src/herder/test/UpgradesTests.cpp @@ -253,7 +253,7 @@ makeBucketListSizeWindowSampleSizeTestUpgrade(Application& app, { // Modify window size auto sas = app.getLedgerManager() - .getSorobanNetworkConfig() + .getMutableSorobanNetworkConfig() .stateArchivalSettings(); sas.bucketListSizeWindowSampleSize = newWindowSize; @@ -839,7 +839,7 @@ TEST_CASE("config upgrades applied to ledger", "[soroban][upgrades]") executeUpgrade(*app, makeProtocolVersionUpgrade( static_cast(SOROBAN_PROTOCOL_VERSION))); auto const& sorobanConfig = - app->getLedgerManager().getSorobanNetworkConfig(); + app->getLedgerManager().getMutableSorobanNetworkConfig(); SECTION("unknown config upgrade set is ignored") { auto contractID = autocheck::generator()(5); @@ -907,7 +907,7 @@ TEST_CASE("config upgrades applied to ledger", "[soroban][upgrades]") auto const newSize = 20; populateValuesAndUpgradeSize(newSize); auto const& cfg2 = - app->getLedgerManager().getSorobanNetworkConfig(); + app->getLedgerManager().getMutableSorobanNetworkConfig(); // Verify that we popped the 10 oldest values auto sum = 0; @@ -929,7 +929,7 @@ TEST_CASE("config upgrades applied to ledger", "[soroban][upgrades]") auto const newSize = 40; populateValuesAndUpgradeSize(newSize); auto const& cfg2 = - app->getLedgerManager().getSorobanNetworkConfig(); + app->getLedgerManager().getMutableSorobanNetworkConfig(); // Verify that we backfill 10 copies of the oldest value auto sum = 0; @@ -959,7 +959,7 @@ TEST_CASE("config upgrades applied to ledger", "[soroban][upgrades]") LedgerTxn ltx2(app->getLedgerTxnRoot()); auto const& cfg = - app->getLedgerManager().getSorobanNetworkConfig(); + app->getLedgerManager().getMutableSorobanNetworkConfig(); initialSize = cfg.mStateArchivalSettings.bucketListSizeWindowSampleSize; initialWindow = cfg.mBucketListSizeSnapshots; @@ -974,7 +974,8 @@ TEST_CASE("config upgrades applied to ledger", "[soroban][upgrades]") REQUIRE(configUpgradeSet); executeUpgrade(*app, makeConfigUpgrade(*configUpgradeSet)); - auto const& cfg = app->getLedgerManager().getSorobanNetworkConfig(); + auto const& cfg = + app->getLedgerManager().getMutableSorobanNetworkConfig(); REQUIRE(cfg.mStateArchivalSettings.bucketListSizeWindowSampleSize == initialSize); REQUIRE(cfg.mBucketListSizeSnapshots == initialWindow); @@ -1088,7 +1089,7 @@ TEST_CASE("Soroban max tx set size upgrade applied to ledger", static_cast(SOROBAN_PROTOCOL_VERSION))); auto const& sorobanConfig = - app->getLedgerManager().getSorobanNetworkConfig(); + app->getLedgerManager().getMutableSorobanNetworkConfig(); executeUpgrade(*app, makeMaxSorobanTxSizeUpgrade(123)); REQUIRE(sorobanConfig.ledgerMaxTxCount() == 123); @@ -2246,7 +2247,8 @@ TEST_CASE("configuration initialized in version upgrade", "[upgrades]") InitialSorobanNetworkConfig::MAX_CONTRACT_SIZE); // Check that BucketList size window initialized with current BL size - auto& networkConfig = app->getLedgerManager().getSorobanNetworkConfig(); + auto networkConfig = + app->getLedgerManager().getMutableSorobanNetworkConfig(); REQUIRE(networkConfig.getAverageBucketListSize() == blSize); // Check in memory window diff --git a/src/history/HistoryManagerImpl.cpp b/src/history/HistoryManagerImpl.cpp index 8f4851e357..24a3879729 100644 --- a/src/history/HistoryManagerImpl.cpp +++ b/src/history/HistoryManagerImpl.cpp @@ -195,7 +195,7 @@ HistoryManagerImpl::dropSQLBasedPublish() firstLedgerInCheckpointContaining(lcl), freq, mCheckpointBuilder); } - db.clearPreparedStatementCache(); + db.clearPreparedStatementCache(sess); // Now it's safe to drop obsolete SQL tables sess.session() << "DROP TABLE IF EXISTS publishqueue;"; diff --git a/src/history/test/HistoryTestsUtils.cpp b/src/history/test/HistoryTestsUtils.cpp index 4cd221efd5..e2ead4426b 100644 --- a/src/history/test/HistoryTestsUtils.cpp +++ b/src/history/test/HistoryTestsUtils.cpp @@ -532,6 +532,7 @@ CatchupSimulation::generateRandomLedger(uint32_t version) auto lastSucceeded = txsSucceeded.count(); lm.closeLedger(mLedgerCloseDatas.back()); + testutil::crankFor(getApp().getClock(), std::chrono::milliseconds(10)); if (check) { diff --git a/src/ledger/LedgerHeaderUtils.cpp b/src/ledger/LedgerHeaderUtils.cpp index 41eca7352e..952b0c34d2 100644 --- a/src/ledger/LedgerHeaderUtils.cpp +++ b/src/ledger/LedgerHeaderUtils.cpp @@ -190,10 +190,10 @@ loadBySequence(Database& db, soci::session& sess, uint32_t seq) } void -deleteOldEntries(Database& db, uint32_t ledgerSeq, uint32_t count) +deleteOldEntries(soci::session& sess, uint32_t ledgerSeq, uint32_t count) { ZoneScoped; - DatabaseUtils::deleteOldEntriesHelper(db.getRawSession(), ledgerSeq, count, + DatabaseUtils::deleteOldEntriesHelper(sess, ledgerSeq, count, "ledgerheaders", "ledgerseq"); } diff --git a/src/ledger/LedgerHeaderUtils.h b/src/ledger/LedgerHeaderUtils.h index d67bed277d..ae50c082d7 100644 --- a/src/ledger/LedgerHeaderUtils.h +++ b/src/ledger/LedgerHeaderUtils.h @@ -30,7 +30,7 @@ std::shared_ptr loadBySequence(Database& db, soci::session& sess, uint32_t loadMaxLedgerSeq(Database& db); -void deleteOldEntries(Database& db, uint32_t ledgerSeq, uint32_t count); +void deleteOldEntries(soci::session& sess, uint32_t ledgerSeq, uint32_t count); size_t copyToStream(Database& db, soci::session& sess, uint32_t ledgerSeq, uint32_t ledgerCount, CheckpointBuilder& checkpointBuilder); diff --git a/src/ledger/LedgerManager.h b/src/ledger/LedgerManager.h index 3a9d4b7cc1..bb8691265c 100644 --- a/src/ledger/LedgerManager.h +++ b/src/ledger/LedgerManager.h @@ -94,8 +94,7 @@ class LedgerManager bool isLatestSlot) = 0; // Return the LCL header and (complete, immutable) hash. - virtual LedgerHeaderHistoryEntry const& - getLastClosedLedgerHeader() const = 0; + virtual LedgerHeaderHistoryEntry getLastClosedLedgerHeader() const = 0; // return the HAS that corresponds to the last closed ledger as persisted in // the database @@ -129,7 +128,8 @@ class LedgerManager // The config is automatically refreshed on protocol upgrades. // Ledger txn here is needed for the sake of lazy load; it won't be // used most of the time. - virtual SorobanNetworkConfig const& getSorobanNetworkConfig() = 0; + // Return a copy for thread-safety + virtual SorobanNetworkConfig getSorobanNetworkConfig() = 0; virtual bool hasSorobanNetworkConfig() const = 0; #ifdef BUILD_TESTS @@ -203,5 +203,7 @@ class LedgerManager virtual ~LedgerManager() { } + + virtual void shutdown() = 0; }; } diff --git a/src/ledger/LedgerManagerImpl.cpp b/src/ledger/LedgerManagerImpl.cpp index eece56d3a8..d8add571d8 100644 --- a/src/ledger/LedgerManagerImpl.cpp +++ b/src/ledger/LedgerManagerImpl.cpp @@ -342,6 +342,7 @@ LedgerManagerImpl::loadLastKnownLedger(bool restoreBucketlist, { // In no-history mode, this method should only be called when // the LCL is genesis. + std::lock_guard lock(mLedgerStateMutex); releaseAssertOrThrow(mLastClosedLedger.hash == lastLedgerHash); releaseAssertOrThrow(mLastClosedLedger.header.ledgerSeq == GENESIS_LEDGER_SEQ); @@ -458,12 +459,15 @@ LedgerManagerImpl::getDatabase() uint32_t LedgerManagerImpl::getLastMaxTxSetSize() const { + std::lock_guard guard(mLedgerStateMutex); return mLastClosedLedger.header.maxTxSetSize; } uint32_t LedgerManagerImpl::getLastMaxTxSetSizeOps() const { + std::lock_guard guard(mLedgerStateMutex); + auto n = mLastClosedLedger.header.maxTxSetSize; return protocolVersionStartsFrom(mLastClosedLedger.header.ledgerVersion, ProtocolVersion::V_11) @@ -500,7 +504,7 @@ LedgerManagerImpl::maxSorobanTransactionResources() { ZoneScoped; - auto const& conf = mApp.getLedgerManager().getSorobanNetworkConfig(); + auto conf = mApp.getLedgerManager().getSorobanNetworkConfig(); int64_t const opCount = 1; std::vector limits = {opCount, conf.txMaxInstructions(), @@ -515,6 +519,8 @@ LedgerManagerImpl::maxSorobanTransactionResources() int64_t LedgerManagerImpl::getLastMinBalance(uint32_t ownerCount) const { + std::lock_guard guard(mLedgerStateMutex); + auto const& lh = mLastClosedLedger.header; if (protocolVersionIsBefore(lh.ledgerVersion, ProtocolVersion::V_9)) return (2 + ownerCount) * lh.baseReserve; @@ -525,18 +531,23 @@ LedgerManagerImpl::getLastMinBalance(uint32_t ownerCount) const uint32_t LedgerManagerImpl::getLastReserve() const { + std::lock_guard guard(mLedgerStateMutex); + return mLastClosedLedger.header.baseReserve; } uint32_t LedgerManagerImpl::getLastTxFee() const { + std::lock_guard guard(mLedgerStateMutex); + return mLastClosedLedger.header.baseFee; } -LedgerHeaderHistoryEntry const& +LedgerHeaderHistoryEntry LedgerManagerImpl::getLastClosedLedgerHeader() const { + std::lock_guard guard(mLedgerStateMutex); return mLastClosedLedger; } @@ -555,18 +566,22 @@ LedgerManagerImpl::getLastClosedLedgerHAS() uint32_t LedgerManagerImpl::getLastClosedLedgerNum() const { + std::lock_guard guard(mLedgerStateMutex); + return mLastClosedLedger.header.ledgerSeq; } -SorobanNetworkConfig const& +SorobanNetworkConfig LedgerManagerImpl::getSorobanNetworkConfig() { + std::lock_guard guard(mLedgerStateMutex); return *mSorobanNetworkConfig; } bool LedgerManagerImpl::hasSorobanNetworkConfig() const { + std::lock_guard guard(mLedgerStateMutex); return mSorobanNetworkConfig.has_value(); } @@ -592,6 +607,8 @@ LedgerManagerImpl::getSorobanMetrics() void LedgerManagerImpl::publishSorobanMetrics() { + std::lock_guard guard(mLedgerStateMutex); + releaseAssert(mSorobanNetworkConfig); // first publish the network config limits mSorobanMetrics.mConfigContractDataKeySizeBytes.set_count( @@ -674,7 +691,7 @@ LedgerManagerImpl::valueExternalized(LedgerCloseData const& ledgerData, { CLOG_INFO(Ledger, "Can't close ledger: {} in LM because catchup is running", - ledgerAbbrev(mLastClosedLedger)); + ledgerAbbrev(getLastClosedLedgerHeader())); return; } @@ -684,9 +701,10 @@ LedgerManagerImpl::valueExternalized(LedgerCloseData const& ledgerData, if (mState != LM_CATCHING_UP_STATE) { // Out of sync, buffer what we just heard and start catchup. - CLOG_INFO( - Ledger, "Lost sync, local LCL is {}, network closed ledger {}", - mLastClosedLedger.header.ledgerSeq, ledgerData.getLedgerSeq()); + CLOG_INFO(Ledger, + "Lost sync, local LCL is {}, network closed ledger {}", + getLastClosedLedgerHeader().header.ledgerSeq, + ledgerData.getLedgerSeq()); } setState(LM_CATCHING_UP_STATE); @@ -781,6 +799,14 @@ maybeSimulateSleep(Config const& cfg, size_t opSize, } } +asio::io_context& +getMetaIOContext(Application& app) +{ + return app.getConfig().parallelLedgerClose() + ? app.getLedgerCloseIOContext() + : app.getClock().getIOContext(); +} + void ledgerCloseComplete(Application& app, uint32_t lcl, bool externalize, LedgerCloseData const& ledgerData) @@ -815,6 +841,14 @@ void LedgerManagerImpl::closeLedger(LedgerCloseData const& ledgerData, bool externalize) { + // Don't close new ledgers if app is shutting down + if (mApp.isStopping()) + { + return; + } + + mClosing.store(true); + #ifdef BUILD_TESTS mLastLedgerTxMeta.clear(); #endif @@ -828,7 +862,7 @@ LedgerManagerImpl::closeLedger(LedgerCloseData const& ledgerData, auto header = ltx.loadHeader(); auto initialLedgerVers = header.current().ledgerVersion; ++header.current().ledgerSeq; - header.current().previousLedgerHash = mLastClosedLedger.hash; + header.current().previousLedgerHash = getLastClosedLedgerHeader().hash; CLOG_DEBUG(Ledger, "starting closeLedger() on ledgerSeq={}", header.current().ledgerSeq); @@ -836,6 +870,7 @@ LedgerManagerImpl::closeLedger(LedgerCloseData const& ledgerData, auto now = mApp.getClock().now(); mLedgerAgeClosed.Update(now - mLastClose); + // mLastClose is only accessed by a single thread mLastClose = now; mLedgerAge.set_count(0); @@ -1003,9 +1038,10 @@ LedgerManagerImpl::closeLedger(LedgerCloseData const& ledgerData, } ledgerClosed(ltx, ledgerCloseMeta, initialLedgerVers); + auto lcl = getLastClosedLedgerHeader(); if (ledgerData.getExpectedHash() && - *ledgerData.getExpectedHash() != mLastClosedLedger.hash) + *ledgerData.getExpectedHash() != lcl.hash) { throw std::runtime_error("Local node's ledger corrupted during close"); } @@ -1013,7 +1049,7 @@ LedgerManagerImpl::closeLedger(LedgerCloseData const& ledgerData, if (mMetaStream || mMetaDebugStream) { releaseAssert(ledgerCloseMeta); - ledgerCloseMeta->ledgerHeader() = mLastClosedLedger; + ledgerCloseMeta->ledgerHeader() = lcl; // At this point we've got a complete meta and we can store it to the // member variable: if we throw while committing below, we will at worst @@ -1029,82 +1065,84 @@ LedgerManagerImpl::closeLedger(LedgerCloseData const& ledgerData, } } - // The next 5 steps happen in a relatively non-obvious, subtle order. - // This is unfortunate and it would be nice if we could make it not - // be so subtle, but for the time being this is where we are. - // - // 1. Queue any history-checkpoint to the database, _within_ the current - // transaction. This way if there's a crash after commit and before - // we've published successfully, we'll re-publish on restart. - // - // 2. Commit the current transaction. - // - // 3. Finalize any new checkpoint files _after_ the commit. If a crash - // occurs - // between commit and this step, core will attempt finalizing files again - // on restart. - // - // 4. Start any queued checkpoint publishing, _after_ the commit so that - // it takes its snapshot of history-rows from the committed state, but - // _before_ we GC any buckets (because this is the step where the - // bucket refcounts are incremented for the duration of the publish). - // - // 5. Start background eviction scan for the next ledger, _after_ the commit - // so that it takes its snapshot of network setting from the - // committed state. - // - // 6. GC unreferenced buckets. Only do this once publishes are in progress. + // TODO: add documentation of the new flow - // step 1 auto& hm = mApp.getHistoryManager(); hm.maybeQueueHistoryCheckpoint(); - // step 2 ltx.commit(); + std::chrono::duration ledgerTimeSeconds = ledgerTime.Stop(); + CLOG_INFO(Perf, "Applied ledger {} in {} seconds", ledgerSeq, + ledgerTimeSeconds.count()); + #ifdef BUILD_TESTS mLatestTxResultSet = txResultSet; #endif - // step 3 - hm.maybeCheckpointComplete(); - - // step 4 - hm.publishQueuedHistory(); - hm.logAndUpdatePublishStatus(); - - // step 5 if (protocolVersionStartsFrom(initialLedgerVers, SOROBAN_PROTOCOL_VERSION) && mApp.getConfig().isUsingBackgroundEviction()) { - mApp.getBucketManager().startBackgroundEvictionScan(ledgerSeq + 1); + mApp.getBucketManager().startBackgroundEvictionScan(ledgerSeq + 1, + true); } - // step 6 - mApp.getBucketManager().forgetUnreferencedBuckets(); + auto completionHandler = [this, initialLedgerVers, txs, ledgerSeq, + externalize, ledgerData, + clt = std::move(closeLedgerTime)]() mutable { + auto& hm = mApp.getHistoryManager(); + hm.maybeCheckpointComplete(); - maybeSimulateSleep(mApp.getConfig(), txs.size(), closeLedgerTime); + hm.publishQueuedHistory(); + hm.logAndUpdatePublishStatus(); - std::chrono::duration ledgerTimeSeconds = ledgerTime.Stop(); - CLOG_DEBUG(Perf, "Applied ledger in {} seconds", ledgerTimeSeconds.count()); + mApp.getBucketManager().forgetUnreferencedBuckets(); + + maybeSimulateSleep(mApp.getConfig(), txs.size(), clt); - ledgerCloseComplete(mApp, ledgerSeq, externalize, ledgerData); + ledgerCloseComplete(mApp, ledgerSeq, externalize, ledgerData); + CLOG_INFO(Ledger, "Closed ledger: {}", ledgerSeq); + }; + if (threadIsMain()) + { + completionHandler(); + } + else + { + mApp.postOnMainThread(completionHandler, "ledgerCloseComplete"); + } + mClosing.store(false); FrameMark; } +void +LedgerManagerImpl::shutdown() +{ + // Wait until current ledger is done closing + while (mClosing.load()) + { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } +} + void LedgerManagerImpl::deleteOldEntries(Database& db, uint32_t ledgerSeq, uint32_t count) { ZoneScoped; - soci::transaction txscope(db.getRawSession()); - db.clearPreparedStatementCache(); - LedgerHeaderUtils::deleteOldEntries(db, ledgerSeq, count); - HerderPersistence::deleteOldEntries(db, ledgerSeq, count); - db.clearPreparedStatementCache(); - txscope.commit(); + if (mApp.getConfig().parallelLedgerClose()) + { + auto session = + std::make_unique(mApp.getDatabase().getPool()); + LedgerHeaderUtils::deleteOldEntries(*session, ledgerSeq, count); + } + else + { + LedgerHeaderUtils::deleteOldEntries(db.getRawSession(), ledgerSeq, + count); + } } void @@ -1155,9 +1193,9 @@ LedgerManagerImpl::setupLedgerCloseMetaStream() { // We can't be sure we're writing to a stream that supports fsync; // pipes typically error when you try. So we don't do it. - mMetaStream = std::make_unique( - mApp.getClock().getIOContext(), - /*fsyncOnClose=*/false); + mMetaStream = + std::make_unique(getMetaIOContext(mApp), + /*fsyncOnClose=*/false); std::regex fdrx("^fd:([0-9]+)$"); std::smatch sm; if (std::regex_match(cfg.METADATA_OUTPUT_STREAM, sm, fdrx)) @@ -1222,9 +1260,9 @@ LedgerManagerImpl::maybeResetLedgerCloseMetaDebugStream(uint32_t ledgerSeq) // such stream or a replacement for the one we just handed off to // flush-and-rotate. Either way, we should not have an existing one! releaseAssert(!mMetaDebugStream); - auto tmpStream = std::make_unique( - mApp.getClock().getIOContext(), - /*fsyncOnClose=*/true); + auto tmpStream = + std::make_unique(getMetaIOContext(mApp), + /*fsyncOnClose=*/true); auto metaDebugPath = metautils::getMetaDebugFilePath( mApp.getBucketManager().getBucketDir(), ledgerSeq); @@ -1275,6 +1313,7 @@ void LedgerManagerImpl::advanceLedgerPointers(LedgerHeader const& header, bool debugLog) { + std::lock_guard guard(mLedgerStateMutex); auto ledgerHash = xdrSha256(header); if (debugLog) @@ -1302,6 +1341,7 @@ void LedgerManagerImpl::updateNetworkConfig(AbstractLedgerTxn& rootLtx) { ZoneScoped; + std::lock_guard guard(mLedgerStateMutex); uint32_t ledgerVersion = rootLtx.loadHeader().current().ledgerVersion; @@ -1662,6 +1702,7 @@ LedgerManagerImpl::transferLedgerEntriesToBucketList( if (blEnabled && protocolVersionStartsFrom(initialLedgerVers, SOROBAN_PROTOCOL_VERSION)) { + std::lock_guard guard(mLedgerStateMutex); { auto keys = ltx.getAllTTLKeysWithoutSealing(); LedgerTxn ltxEvictions(ltx); @@ -1669,12 +1710,12 @@ LedgerManagerImpl::transferLedgerEntriesToBucketList( if (mApp.getConfig().isUsingBackgroundEviction()) { mApp.getBucketManager().resolveBackgroundEvictionScan( - ltxEvictions, lh.ledgerSeq, keys); + ltxEvictions, lh.ledgerSeq, keys, *mSorobanNetworkConfig); } else { - mApp.getBucketManager().scanForEvictionLegacy(ltxEvictions, - lh.ledgerSeq); + mApp.getBucketManager().scanForEvictionLegacy( + ltxEvictions, lh.ledgerSeq, *mSorobanNetworkConfig); } if (ledgerCloseMeta) diff --git a/src/ledger/LedgerManagerImpl.h b/src/ledger/LedgerManagerImpl.h index 1905a4741c..cddbcbb929 100644 --- a/src/ledger/LedgerManagerImpl.h +++ b/src/ledger/LedgerManagerImpl.h @@ -69,6 +69,10 @@ class LedgerManagerImpl : public LedgerManager VirtualClock::time_point mLastClose; bool mRebuildInMemoryState{false}; + // Use mutex to guard read access to LCL and Soroban network config + mutable std::recursive_mutex mLedgerStateMutex; + std::atomic mClosing{false}; + std::unique_ptr mStartCatchup; medida::Timer& mCatchupDuration; @@ -160,7 +164,7 @@ class LedgerManagerImpl : public LedgerManager uint32_t getLastReserve() const override; uint32_t getLastTxFee() const override; uint32_t getLastClosedLedgerNum() const override; - SorobanNetworkConfig const& getSorobanNetworkConfig() override; + SorobanNetworkConfig getSorobanNetworkConfig() override; bool hasSorobanNetworkConfig() const override; #ifdef BUILD_TESTS @@ -180,7 +184,8 @@ class LedgerManagerImpl : public LedgerManager virtual bool rebuildingInMemoryState() override; virtual void setupInMemoryStateRebuild() override; - LedgerHeaderHistoryEntry const& getLastClosedLedgerHeader() const override; + // TODO: fix if this shows up in profilers (switch to thread-safe copy) + LedgerHeaderHistoryEntry getLastClosedLedgerHeader() const override; HistoryArchiveState getLastClosedLedgerHAS() override; @@ -205,5 +210,7 @@ class LedgerManagerImpl : public LedgerManager void maybeResetLedgerCloseMetaDebugStream(uint32_t ledgerSeq); SorobanMetrics& getSorobanMetrics() override; + + void shutdown() override; }; } diff --git a/src/ledger/LedgerTxn.cpp b/src/ledger/LedgerTxn.cpp index 90d51a42cb..5bb025191a 100644 --- a/src/ledger/LedgerTxn.cpp +++ b/src/ledger/LedgerTxn.cpp @@ -2554,9 +2554,10 @@ LedgerTxnRoot::Impl::~Impl() SessionWrapper& LedgerTxnRoot::Impl::getSession() const { - // For now, return main app-wide session; - // When application is done in parallel, mSession will be set to a session - // established from the connection pool. + if (mSession) + { + return *mSession; + } return mApp.getDatabase().getSession(); } @@ -2597,10 +2598,10 @@ LedgerTxnRoot::Impl::addChild(AbstractLedgerTxn& child, TransactionMode mode) if (mode == TransactionMode::READ_WRITE_WITH_SQL_TXN) { - if (mApp.getConfig().EXPERIMENTAL_PARALLEL_LEDGER_CLOSE) + if (mApp.getConfig().parallelLedgerClose()) { - mSession = - std::make_unique(mApp.getDatabase().getPool()); + mSession = std::make_unique( + "ledgerClose", mApp.getDatabase().getPool()); } mTransaction = std::make_unique(getSession().session()); @@ -2872,7 +2873,7 @@ LedgerTxnRoot::Impl::commitChild(EntryIterator iter, // committing; on postgres this doesn't matter but on SQLite the passive // WAL-auto-checkpointing-at-commit behaviour will starve if there are // still prepared statements open at commit time. - mApp.getDatabase().clearPreparedStatementCache(); + mApp.getDatabase().clearPreparedStatementCache(getSession()); ZoneNamedN(commitZone, "SOCI commit", true); mTransaction->commit(); } diff --git a/src/ledger/LedgerTxnImpl.h b/src/ledger/LedgerTxnImpl.h index 72ee43f24a..6106f6a716 100644 --- a/src/ledger/LedgerTxnImpl.h +++ b/src/ledger/LedgerTxnImpl.h @@ -732,7 +732,7 @@ class LedgerTxnRoot::Impl size_t const mMaxBestOffersBatchSize; Application& mApp; - std::shared_ptr mSession; + std::unique_ptr mSession; std::unique_ptr mHeader; mutable EntryCache mEntryCache; diff --git a/src/main/AppConnector.cpp b/src/main/AppConnector.cpp index a5953fc6ea..5284d7ad5c 100644 --- a/src/main/AppConnector.cpp +++ b/src/main/AppConnector.cpp @@ -45,24 +45,21 @@ AppConnector::getBanManager() return mApp.getBanManager(); } -SorobanNetworkConfig const& +SorobanNetworkConfig AppConnector::getSorobanNetworkConfig() const { - releaseAssert(threadIsMain()); return mApp.getLedgerManager().getSorobanNetworkConfig(); } medida::MetricsRegistry& AppConnector::getMetrics() const { - releaseAssert(threadIsMain()); return mApp.getMetrics(); } SorobanMetrics& AppConnector::getSorobanMetrics() const { - releaseAssert(threadIsMain()); return mApp.getLedgerManager().getSorobanMetrics(); } @@ -71,7 +68,6 @@ AppConnector::checkOnOperationApply(Operation const& operation, OperationResult const& opres, LedgerTxnDelta const& ltxDelta) { - releaseAssert(threadIsMain()); mApp.getInvariantManager().checkOnOperationApply(operation, opres, ltxDelta); } @@ -79,7 +75,6 @@ AppConnector::checkOnOperationApply(Operation const& operation, Hash const& AppConnector::getNetworkID() const { - releaseAssert(threadIsMain()); return mApp.getNetworkID(); } @@ -129,4 +124,10 @@ AppConnector::getOverlayMetrics() return mApp.getOverlayManager().getOverlayMetrics(); } +LedgerHeaderHistoryEntry +AppConnector::getLastClosedLedgerHeader() const +{ + return mApp.getLedgerManager().getLastClosedLedgerHeader(); +} + } \ No newline at end of file diff --git a/src/main/AppConnector.h b/src/main/AppConnector.h index 6e6d19ce52..130603005e 100644 --- a/src/main/AppConnector.h +++ b/src/main/AppConnector.h @@ -33,8 +33,6 @@ class AppConnector OverlayManager& getOverlayManager(); BanManager& getBanManager(); bool shouldYield() const; - SorobanNetworkConfig const& getSorobanNetworkConfig() const; - medida::MetricsRegistry& getMetrics() const; SorobanMetrics& getSorobanMetrics() const; void checkOnOperationApply(Operation const& operation, OperationResult const& opres, @@ -51,5 +49,8 @@ class AppConnector Config const& getConfig() const; bool overlayShuttingDown() const; OverlayMetrics& getOverlayMetrics(); + SorobanNetworkConfig getSorobanNetworkConfig() const; + medida::MetricsRegistry& getMetrics() const; + LedgerHeaderHistoryEntry getLastClosedLedgerHeader() const; }; } \ No newline at end of file diff --git a/src/main/Application.h b/src/main/Application.h index 0e5bac078f..467b07a6d2 100644 --- a/src/main/Application.h +++ b/src/main/Application.h @@ -229,6 +229,7 @@ class Application virtual asio::io_context& getWorkerIOContext() = 0; virtual asio::io_context& getEvictionIOContext() = 0; virtual asio::io_context& getOverlayIOContext() = 0; + virtual asio::io_context& getLedgerCloseIOContext() = 0; virtual void postOnMainThread( std::function&& f, std::string&& name, @@ -242,6 +243,8 @@ class Application std::string jobName) = 0; virtual void postOnOverlayThread(std::function&& f, std::string jobName) = 0; + virtual void postOnLedgerCloseThread(std::function&& f, + std::string jobName) = 0; // Perform actions necessary to transition from BOOTING_STATE to other // states. In particular: either reload or reinitialize the database, and diff --git a/src/main/ApplicationImpl.cpp b/src/main/ApplicationImpl.cpp index 18c750b182..2d2ae3cf45 100644 --- a/src/main/ApplicationImpl.cpp +++ b/src/main/ApplicationImpl.cpp @@ -102,6 +102,13 @@ ApplicationImpl::ApplicationImpl(VirtualClock& clock, Config const& cfg) , mOverlayWork(mOverlayIOContext ? std::make_unique( *mOverlayIOContext) : nullptr) + , mLedgerCloseIOContext(mConfig.parallelLedgerClose() + ? std::make_unique(1) + : nullptr) + , mLedgerCloseWork( + mLedgerCloseIOContext + ? std::make_unique(*mLedgerCloseIOContext) + : nullptr) , mWorkerThreads() , mEvictionThread() , mStopSignals(clock.getIOContext(), SIGINT) @@ -117,6 +124,8 @@ ApplicationImpl::ApplicationImpl(VirtualClock& clock, Config const& cfg) mMetrics->NewTimer({"app", "post-on-background-thread", "delay"})) , mPostOnOverlayThreadDelay( mMetrics->NewTimer({"app", "post-on-overlay-thread", "delay"})) + , mPostOnLedgerCloseThreadDelay( + mMetrics->NewTimer({"app", "post-on-ledger-close-thread", "delay"})) , mStartedOn(clock.system_now()) { #ifdef SIGQUIT @@ -186,6 +195,12 @@ ApplicationImpl::ApplicationImpl(VirtualClock& clock, Config const& cfg) // Keep priority unchanged as overlay processes time-sensitive tasks mOverlayThread = std::thread{[this]() { mOverlayIOContext->run(); }}; } + + if (mConfig.parallelLedgerClose()) + { + mLedgerCloseThread = + std::thread{[this]() { mLedgerCloseIOContext->run(); }}; + } } static void @@ -296,6 +311,7 @@ maybeRebuildLedger(Application& app, bool applyBuckets) } } + app.getDatabase().clearPreparedStatementCache(); for (auto let : toRebuild) { ps.clearRebuildForType(let); @@ -507,7 +523,7 @@ ApplicationImpl::getJsonInfo(bool verbose) info["protocol_version"] = getConfig().LEDGER_PROTOCOL_VERSION; info["state"] = getStateHuman(); info["startedOn"] = VirtualClock::systemPointToISOString(mStartedOn); - auto const& lcl = lm.getLastClosedLedgerHeader(); + auto lcl = lm.getLastClosedLedgerHeader(); info["ledger"]["num"] = (int)lcl.header.ledgerSeq; info["ledger"]["hash"] = binToHex(lcl.hash); info["ledger"]["closeTime"] = (Json::UInt64)lcl.header.scpValue.closeTime; @@ -669,8 +685,16 @@ ApplicationImpl::getNetworkID() const ApplicationImpl::~ApplicationImpl() { LOG_INFO(DEFAULT_LOG, "Application destructing"); + mStopping = true; try { + // Drain the ledger close queue + // TODO: add a comment about shutdown logic - we're not tearing down + // work, but rather gracefully letting it finish + if (mLedgerManager) + { + mLedgerManager->shutdown(); + } shutdownWorkScheduler(); if (mProcessManager) { @@ -969,6 +993,10 @@ ApplicationImpl::gracefulStop() return; } mStopping = true; + if (mLedgerManager) + { + mLedgerManager->shutdown(); + } if (mOverlayManager) { mOverlayManager->shutdown(); @@ -1029,22 +1057,19 @@ ApplicationImpl::joinAllThreads() { mOverlayWork.reset(); } - - LOG_INFO(DEFAULT_LOG, "Joining {} worker threads", mWorkerThreads.size()); - for (auto& w : mWorkerThreads) + if (mLedgerCloseWork) { - w.join(); + mLedgerCloseWork.reset(); } - if (mEvictionWork) { mEvictionWork.reset(); } - if (mEvictionThread) + LOG_INFO(DEFAULT_LOG, "Joining {} worker threads", mWorkerThreads.size()); + for (auto& w : mWorkerThreads) { - LOG_INFO(DEFAULT_LOG, "Joining eviction thread"); - mEvictionThread->join(); + w.join(); } if (mOverlayThread) @@ -1053,6 +1078,18 @@ ApplicationImpl::joinAllThreads() mOverlayThread->join(); } + if (mLedgerCloseThread) + { + LOG_INFO(DEFAULT_LOG, "Joining the ledger close thread"); + mLedgerCloseThread->join(); + } + + if (mEvictionThread) + { + LOG_INFO(DEFAULT_LOG, "Joining eviction thread"); + mEvictionThread->join(); + } + LOG_INFO(DEFAULT_LOG, "Joined all {} threads", (mWorkerThreads.size() + 1)); } @@ -1541,6 +1578,13 @@ ApplicationImpl::getOverlayIOContext() return *mOverlayIOContext; } +asio::io_context& +ApplicationImpl::getLedgerCloseIOContext() +{ + releaseAssert(mLedgerCloseIOContext); + return *mLedgerCloseIOContext; +} + void ApplicationImpl::postOnMainThread(std::function&& f, std::string&& name, Scheduler::ActionType type) @@ -1598,6 +1642,19 @@ ApplicationImpl::postOnOverlayThread(std::function&& f, }); } +void +ApplicationImpl::postOnLedgerCloseThread(std::function&& f, + std::string jobName) +{ + releaseAssert(mLedgerCloseIOContext); + LogSlowExecution isSlow{std::move(jobName), LogSlowExecution::Mode::MANUAL, + "executed after"}; + asio::post(*mLedgerCloseIOContext, [this, f = std::move(f), isSlow]() { + mPostOnLedgerCloseThreadDelay.Update(isSlow.checkElapsedTime()); + f(); + }); +} + void ApplicationImpl::enableInvariantsFromConfig() { @@ -1640,7 +1697,6 @@ ApplicationImpl::createDatabase() AbstractLedgerTxnParent& ApplicationImpl::getLedgerTxnRoot() { - releaseAssert(threadIsMain()); return mConfig.MODE_USES_IN_MEMORY_LEDGER ? *mNeverCommittingLedgerTxn : *mLedgerTxnRoot; } diff --git a/src/main/ApplicationImpl.h b/src/main/ApplicationImpl.h index a7553214f9..827bdcaa42 100644 --- a/src/main/ApplicationImpl.h +++ b/src/main/ApplicationImpl.h @@ -82,6 +82,7 @@ class ApplicationImpl : public Application virtual asio::io_context& getWorkerIOContext() override; virtual asio::io_context& getEvictionIOContext() override; virtual asio::io_context& getOverlayIOContext() override; + virtual asio::io_context& getLedgerCloseIOContext() override; virtual void postOnMainThread(std::function&& f, std::string&& name, Scheduler::ActionType type) override; @@ -92,6 +93,8 @@ class ApplicationImpl : public Application virtual void postOnOverlayThread(std::function&& f, std::string jobName) override; + virtual void postOnLedgerCloseThread(std::function&& f, + std::string jobName) override; virtual void start() override; void startServices(); @@ -160,6 +163,9 @@ class ApplicationImpl : public Application std::unique_ptr mOverlayIOContext; std::unique_ptr mOverlayWork; + std::unique_ptr mLedgerCloseIOContext; + std::unique_ptr mLedgerCloseWork; + std::unique_ptr mBucketManager; std::unique_ptr mDatabase; std::unique_ptr mOverlayManager; @@ -205,6 +211,7 @@ class ApplicationImpl : public Application std::vector mWorkerThreads; std::optional mOverlayThread; + std::optional mLedgerCloseThread; // Unlike mWorkerThreads (which are low priority), eviction scans require a // medium priority thread. In the future, this may become a more general @@ -215,7 +222,7 @@ class ApplicationImpl : public Application asio::signal_set mStopSignals; bool mStarted; - bool mStopping; + std::atomic mStopping; VirtualTimer mStoppingTimer; VirtualTimer mSelfCheckTimer; @@ -224,6 +231,7 @@ class ApplicationImpl : public Application medida::Timer& mPostOnMainThreadDelay; medida::Timer& mPostOnBackgroundThreadDelay; medida::Timer& mPostOnOverlayThreadDelay; + medida::Timer& mPostOnLedgerCloseThreadDelay; VirtualClock::system_time_point mStartedOn; diff --git a/src/main/ApplicationUtils.cpp b/src/main/ApplicationUtils.cpp index 376012804f..a222dc89e4 100644 --- a/src/main/ApplicationUtils.cpp +++ b/src/main/ApplicationUtils.cpp @@ -430,7 +430,7 @@ setAuthenticatedLedgerHashPair(Application::pointer app, if (lm.isSynced()) { - auto const& lhe = lm.getLastClosedLedgerHeader(); + auto lhe = lm.getLastClosedLedgerHeader(); tryCheckpoint(lhe.header.ledgerSeq, lhe.hash); } else diff --git a/src/main/Config.cpp b/src/main/Config.cpp index fb72c1215b..190d39c4d3 100644 --- a/src/main/Config.cpp +++ b/src/main/Config.cpp @@ -1638,6 +1638,12 @@ Config::processConfig(std::shared_ptr t) } } + if (EXPERIMENTAL_PARALLEL_LEDGER_CLOSE && DEPRECATED_SQL_LEDGER_STATE) + { + throw std::invalid_argument( + "EXPERIMENTAL_PARALLEL_LEDGER_CLOSE requires BucketListDB"); + } + if (!OP_APPLY_SLEEP_TIME_DURATION_FOR_TESTING.empty() || !OP_APPLY_SLEEP_TIME_WEIGHT_FOR_TESTING.empty()) { @@ -2334,6 +2340,13 @@ Config::modeStoresAnyHistory() const return MODE_STORES_HISTORY_LEDGERHEADERS || MODE_STORES_HISTORY_MISC; } +bool +Config::parallelLedgerClose() const +{ + return isUsingBucketListDB() && EXPERIMENTAL_PARALLEL_LEDGER_CLOSE && + !(DATABASE.value == ("sqlite3://:memory:")); +} + void Config::setNoListen() { diff --git a/src/main/Config.h b/src/main/Config.h index c19d145036..a3f3c3ef54 100644 --- a/src/main/Config.h +++ b/src/main/Config.h @@ -262,6 +262,9 @@ class Config : public std::enable_shared_from_this // This config should only be enabled when testing. std::chrono::microseconds ARTIFICIALLY_SLEEP_MAIN_THREAD_FOR_TESTING; + // A config parameter that forces stellar-core to sleep every time it closes + // a ledger if order to simulate slow application. This config should only + // be enabled when testing. std::chrono::milliseconds ARTIFICIALLY_DELAY_LEDGER_CLOSE_FOR_TESTING; // Timeout before publishing externalized values to archive @@ -744,6 +747,7 @@ class Config : public std::enable_shared_from_this bool isPersistingBucketListDBIndexes() const; bool modeStoresAllHistory() const; bool modeStoresAnyHistory() const; + bool parallelLedgerClose() const; void logBasicInfo(); void setNoListen(); void setNoPublish(); diff --git a/src/main/ExternalQueue.cpp b/src/main/ExternalQueue.cpp index 511b38f95a..78429ff030 100644 --- a/src/main/ExternalQueue.cpp +++ b/src/main/ExternalQueue.cpp @@ -6,6 +6,7 @@ #include "Application.h" #include "database/Database.h" +#include "herder/HerderPersistence.h" #include "ledger/LedgerManager.h" #include "util/GlobalChecks.h" #include "util/Logging.h" @@ -208,7 +209,24 @@ ExternalQueue::deleteOldEntries(uint32 count) "Trimming history <= ledger {} (rmin={}, qmin={}, lmin={})", cmin, rmin, qmin, lmin); - mApp.getLedgerManager().deleteOldEntries(mApp.getDatabase(), cmin, count); + // Run on main + mApp.getDatabase().clearPreparedStatementCache( + mApp.getDatabase().getMiscSession()); + HerderPersistence::deleteOldEntries(mApp.getDatabase(), cmin, count); + + // Run in the background + if (mApp.getConfig().parallelLedgerClose()) + { + mApp.postOnBackgroundThread( + [&db = mApp.getDatabase(), &lm = mApp.getLedgerManager(), cmin, + count]() { lm.deleteOldEntries(db, cmin, count); }, + "deleteOldEntries"); + } + else + { + mApp.getLedgerManager().deleteOldEntries(mApp.getDatabase(), cmin, + count); + } } void diff --git a/src/main/test/ApplicationUtilsTests.cpp b/src/main/test/ApplicationUtilsTests.cpp index bdbf6942ea..47c676ecc0 100644 --- a/src/main/test/ApplicationUtilsTests.cpp +++ b/src/main/test/ApplicationUtilsTests.cpp @@ -285,7 +285,7 @@ class SimulationHelper return std::make_pair(selectedLedger, selectedHash); } - LedgerHeaderHistoryEntry const& + LedgerHeaderHistoryEntry getMainNodeLCL() { return mSimulation->getNode(mMainNodeID) @@ -293,7 +293,7 @@ class SimulationHelper .getLastClosedLedgerHeader(); } - LedgerHeaderHistoryEntry const& + LedgerHeaderHistoryEntry getTestNodeLCL() { return mSimulation->getNode(mTestNodeID) diff --git a/src/main/test/ConfigTests.cpp b/src/main/test/ConfigTests.cpp index 2b1dcc559f..ed857a1b4e 100644 --- a/src/main/test/ConfigTests.cpp +++ b/src/main/test/ConfigTests.cpp @@ -286,7 +286,7 @@ TEST_CASE("bad validators configs", "[config]") NODE_SEED="SA7FGJMMUIHNE3ZPI2UO5I632A7O5FBAZTXFAIEVFA4DSSGLHXACLAIT a3" {NODE_HOME_DOMAIN} NODE_IS_VALIDATOR=true -DEPRECATED_SQL_LEDGER_STATE=true +DEPRECATED_SQL_LEDGER_STATE=false ############################ # list of HOME_DOMAINS @@ -474,7 +474,7 @@ TEST_CASE("nesting level", "[config]") return secretKey.getStrKeyPublic(); }; std::string configNesting = - "DEPRECATED_SQL_LEDGER_STATE=true\n" // Required for all configs + "DEPRECATED_SQL_LEDGER_STATE=false\n" // Required for all configs "UNSAFE_QUORUM=true"; std::string quorumSetNumber = ""; std::string quorumSetTemplate = R"( @@ -536,7 +536,7 @@ TEST_CASE("operation filter configuration", "[config]") }; std::stringstream ss; - ss << "DEPRECATED_SQL_LEDGER_STATE=true\n"; // required for all configs + ss << "DEPRECATED_SQL_LEDGER_STATE=false\n"; // required for all configs ss << "UNSAFE_QUORUM=true\n"; toConfigStr(vals, ss); ss << "\n[QUORUM_SET]\n"; diff --git a/src/overlay/OverlayManagerImpl.cpp b/src/overlay/OverlayManagerImpl.cpp index 5bbc181901..a777cee998 100644 --- a/src/overlay/OverlayManagerImpl.cpp +++ b/src/overlay/OverlayManagerImpl.cpp @@ -497,17 +497,17 @@ OverlayManagerImpl::triggerPeerResolution() // Trigger DNS resolution on the background thread using task_t = std::packaged_task; - std::shared_ptr task = std::make_shared([this]() { - if (!this->mShuttingDown) - { - auto known = resolvePeers(this->mApp.getConfig().KNOWN_PEERS); - auto preferred = - resolvePeers(this->mApp.getConfig().PREFERRED_PEERS); - return ResolvedPeers{known.first, preferred.first, - known.second || preferred.second}; - } - return ResolvedPeers{{}, {}, false}; - }); + std::shared_ptr task = + std::make_shared([this, cfg = mApp.getConfig()]() { + if (!this->mShuttingDown) + { + auto known = resolvePeers(cfg.KNOWN_PEERS); + auto preferred = resolvePeers(cfg.PREFERRED_PEERS); + return ResolvedPeers{known.first, preferred.first, + known.second || preferred.second}; + } + return ResolvedPeers{{}, {}, false}; + }); mResolvedPeers = task->get_future(); mApp.postOnBackgroundThread(bind(&task_t::operator(), task), diff --git a/src/simulation/LoadGenerator.cpp b/src/simulation/LoadGenerator.cpp index fd124f321d..dd4e60809d 100644 --- a/src/simulation/LoadGenerator.cpp +++ b/src/simulation/LoadGenerator.cpp @@ -1188,6 +1188,13 @@ LoadGenerator::checkAccountSynced(Application& app, bool isCreate) account->getAccountId()); result.push_back(account); } + else if (app.getHerder().sourceAccountPending( + account->getPublicKey())) + { + CLOG_TRACE(LoadGen, "Account {} is pending!", + account->getAccountId()); + result.push_back(account); + } } else if (!reloadRes) { diff --git a/src/simulation/test/LoadGeneratorTests.cpp b/src/simulation/test/LoadGeneratorTests.cpp index b488fbaf8e..24953438c6 100644 --- a/src/simulation/test/LoadGeneratorTests.cpp +++ b/src/simulation/test/LoadGeneratorTests.cpp @@ -843,7 +843,6 @@ TEST_CASE("apply load", "[loadgen][applyload]") VirtualClock clock(VirtualClock::REAL_TIME); auto app = createTestApplication(clock, cfg); - auto const& lm = app->getLedgerManager(); uint64_t ledgerMaxInstructions = 500'000'000; uint64_t ledgerMaxReadLedgerEntries = 2000; diff --git a/src/test/FuzzerImpl.cpp b/src/test/FuzzerImpl.cpp index 23e4c400df..eb3d543de1 100644 --- a/src/test/FuzzerImpl.cpp +++ b/src/test/FuzzerImpl.cpp @@ -888,7 +888,10 @@ resetTxInternalState(Application& app) app.getLedgerTxnRoot().resetForFuzzer(); app.getInvariantManager().resetForFuzzer(); #endif // BUILD_TESTS - app.getDatabase().clearPreparedStatementCache(); + app.getDatabase().clearPreparedStatementCache( + app.getDatabase().getSession()); + app.getDatabase().clearPreparedStatementCache( + app.getDatabase().getMiscSession()); } // FuzzTransactionFrame is a specialized TransactionFrame that includes diff --git a/src/test/TestUtils.cpp b/src/test/TestUtils.cpp index 873bf61e3d..908a90a6d5 100644 --- a/src/test/TestUtils.cpp +++ b/src/test/TestUtils.cpp @@ -244,7 +244,7 @@ upgradeSorobanNetworkConfig(std::function modifyFn, lg.generateLoad(createAccountsLoadConfig); simulation->crankUntil( [&]() { return complete.count() == completeCount + 1; }, - 300 * Herder::EXP_LEDGER_TIMESPAN_SECONDS, false); + 3 * Herder::EXP_LEDGER_TIMESPAN_SECONDS, false); } // Create upload wasm transaction. @@ -254,7 +254,7 @@ upgradeSorobanNetworkConfig(std::function modifyFn, completeCount = complete.count(); simulation->crankUntil( [&]() { return complete.count() == completeCount + 1; }, - 300 * Herder::EXP_LEDGER_TIMESPAN_SECONDS, false); + 3 * Herder::EXP_LEDGER_TIMESPAN_SECONDS, false); // Create upgrade transaction. auto createUpgradeLoadGenConfig = GeneratedLoadConfig::txLoad( @@ -270,7 +270,7 @@ upgradeSorobanNetworkConfig(std::function modifyFn, completeCount = complete.count(); simulation->crankUntil( [&]() { return complete.count() == completeCount + 1; }, - 4 * Herder::EXP_LEDGER_TIMESPAN_SECONDS, false); + 3 * Herder::EXP_LEDGER_TIMESPAN_SECONDS, false); // Arm for upgrade. for (auto app : nodes) @@ -289,7 +289,7 @@ upgradeSorobanNetworkConfig(std::function modifyFn, auto netCfg = app.getLedgerManager().getSorobanNetworkConfig(); return netCfg == cfg; }, - 2 * Herder::EXP_LEDGER_TIMESPAN_SECONDS, false); + 3 * Herder::EXP_LEDGER_TIMESPAN_SECONDS, false); } void diff --git a/src/test/TxTests.cpp b/src/test/TxTests.cpp index 68be2f5e59..7385abd373 100644 --- a/src/test/TxTests.cpp +++ b/src/test/TxTests.cpp @@ -639,15 +639,15 @@ loadAccount(AbstractLedgerTxn& ltx, PublicKey const& k, bool mustExist) bool doesAccountExist(Application& app, PublicKey const& k) { - LedgerTxn ltx(app.getLedgerTxnRoot()); - return (bool)stellar::loadAccountWithoutRecord(ltx, k); + LedgerSnapshot lss(app); + return (bool)lss.getAccount(k); } xdr::xvector getAccountSigners(PublicKey const& k, Application& app) { - LedgerTxn ltx(app.getLedgerTxnRoot()); - auto account = stellar::loadAccount(ltx, k); + LedgerSnapshot lss(app); + auto account = lss.getAccount(k); return account.current().data.account().signers; } @@ -709,8 +709,9 @@ transactionFromOperations(Application& app, SecretKey const& from, { uint32_t ledgerVersion; { - LedgerTxn ltx(app.getLedgerTxnRoot()); - ledgerVersion = ltx.loadHeader().current().ledgerVersion; + ledgerVersion = app.getLedgerManager() + .getLastClosedLedgerHeader() + .header.ledgerVersion; } if (protocolVersionIsBefore(ledgerVersion, ProtocolVersion::V_13)) { diff --git a/src/transactions/ExtendFootprintTTLOpFrame.cpp b/src/transactions/ExtendFootprintTTLOpFrame.cpp index fcf23d1c8a..7786b3e553 100644 --- a/src/transactions/ExtendFootprintTTLOpFrame.cpp +++ b/src/transactions/ExtendFootprintTTLOpFrame.cpp @@ -57,14 +57,12 @@ ExtendFootprintTTLOpFrame::doApply( releaseAssertOrThrow(sorobanData); ZoneNamedN(applyZone, "ExtendFootprintTTLOpFrame apply", true); - ExtendFootprintTTLMetrics metrics( - app.getLedgerManager().getSorobanMetrics()); + ExtendFootprintTTLMetrics metrics(app.getSorobanMetrics()); auto timeScope = metrics.getExecTimer(); auto const& resources = mParentTx.sorobanResources(); auto const& footprint = resources.footprint; - auto const& sorobanConfig = - app.getLedgerManager().getSorobanNetworkConfig(); + auto sorobanConfig = app.getSorobanNetworkConfig(); rust::Vec rustEntryRentChanges; rustEntryRentChanges.reserve(footprint.readOnly.size()); diff --git a/src/transactions/InvokeHostFunctionOpFrame.cpp b/src/transactions/InvokeHostFunctionOpFrame.cpp index 28a6ec1f83..7c89bb3097 100644 --- a/src/transactions/InvokeHostFunctionOpFrame.cpp +++ b/src/transactions/InvokeHostFunctionOpFrame.cpp @@ -327,10 +327,9 @@ InvokeHostFunctionOpFrame::doApply( ZoneNamedN(applyZone, "InvokeHostFunctionOpFrame apply", true); Config const& appConfig = app.getConfig(); - HostFunctionMetrics metrics(app.getLedgerManager().getSorobanMetrics()); + HostFunctionMetrics metrics(app.getSorobanMetrics()); auto timeScope = metrics.getExecTimer(); - auto const& sorobanConfig = - app.getLedgerManager().getSorobanNetworkConfig(); + auto sorobanConfig = app.getSorobanNetworkConfig(); // Get the entries for the footprint rust::Vec ledgerEntryCxxBufs; diff --git a/src/transactions/OperationFrame.cpp b/src/transactions/OperationFrame.cpp index 69c2660c60..04a7190b85 100644 --- a/src/transactions/OperationFrame.cpp +++ b/src/transactions/OperationFrame.cpp @@ -262,8 +262,7 @@ OperationFrame::checkValid(AppConnector& app, isSoroban()) { releaseAssertOrThrow(sorobanData); - auto const& sorobanConfig = - app.getLedgerManager().getSorobanNetworkConfig(); + auto sorobanConfig = app.getSorobanNetworkConfig(); validationResult = doCheckValidForSoroban(sorobanConfig, app.getConfig(), diff --git a/src/transactions/RestoreFootprintOpFrame.cpp b/src/transactions/RestoreFootprintOpFrame.cpp index 36d5a20953..47dd5fd1e0 100644 --- a/src/transactions/RestoreFootprintOpFrame.cpp +++ b/src/transactions/RestoreFootprintOpFrame.cpp @@ -57,14 +57,13 @@ RestoreFootprintOpFrame::doApply( { ZoneNamedN(applyZone, "RestoreFootprintOpFrame apply", true); - RestoreFootprintMetrics metrics(app.getLedgerManager().getSorobanMetrics()); + RestoreFootprintMetrics metrics(app.getSorobanMetrics()); auto timeScope = metrics.getExecTimer(); auto const& resources = mParentTx.sorobanResources(); auto const& footprint = resources.footprint; auto ledgerSeq = ltx.loadHeader().current().ledgerSeq; - auto const& sorobanConfig = - app.getLedgerManager().getSorobanNetworkConfig(); + auto sorobanConfig = app.getSorobanNetworkConfig(); auto const& appConfig = app.getConfig(); auto const& archivalSettings = sorobanConfig.stateArchivalSettings(); @@ -148,14 +147,11 @@ RestoreFootprintOpFrame::doApply( int64_t rentFee = rust_bridge::compute_rent_fee( app.getConfig().CURRENT_LEDGER_PROTOCOL_VERSION, ledgerVersion, rustEntryRentChanges, - app.getLedgerManager() - .getSorobanNetworkConfig() - .rustBridgeRentFeeConfiguration(), + app.getSorobanNetworkConfig().rustBridgeRentFeeConfiguration(), ledgerSeq); if (!sorobanData->consumeRefundableSorobanResources( 0, rentFee, ltx.loadHeader().current().ledgerVersion, - app.getLedgerManager().getSorobanNetworkConfig(), app.getConfig(), - mParentTx)) + app.getSorobanNetworkConfig(), app.getConfig(), mParentTx)) { innerResult(res).code(RESTORE_FOOTPRINT_INSUFFICIENT_REFUNDABLE_FEE); return false; diff --git a/src/transactions/TransactionFrame.cpp b/src/transactions/TransactionFrame.cpp index 1eb017d768..79e6e61d95 100644 --- a/src/transactions/TransactionFrame.cpp +++ b/src/transactions/TransactionFrame.cpp @@ -1445,7 +1445,7 @@ TransactionFrame::checkValidWithOptionallyChargedFee( { sorobanResourceFee = computePreApplySorobanResourceFee( ls.getLedgerHeader().current().ledgerVersion, - app.getLedgerManager().getSorobanNetworkConfig(), app.getConfig()); + app.getSorobanNetworkConfig(), app.getConfig()); } bool res = commonValid(app, signatureChecker, ls, current, false, chargeFee, lowerBoundCloseTimeOffset, upperBoundCloseTimeOffset, @@ -1504,8 +1504,7 @@ TransactionFrame::checkSorobanResourceAndSetError( AppConnector& app, uint32_t ledgerVersion, MutableTxResultPtr txResult) const { - auto const& sorobanConfig = - app.getLedgerManager().getSorobanNetworkConfig(); + auto const& sorobanConfig = app.getSorobanNetworkConfig(); if (!validateSorobanResources(sorobanConfig, app.getConfig(), ledgerVersion, *txResult->getSorobanData())) { @@ -1694,8 +1693,7 @@ TransactionFrame::applyOperations(SignatureChecker& signatureChecker, // If transaction fails, we don't charge for any // refundable resources. auto preApplyFee = computePreApplySorobanResourceFee( - ledgerVersion, - app.getLedgerManager().getSorobanNetworkConfig(), + ledgerVersion, app.getSorobanNetworkConfig(), app.getConfig()); txResult.getSorobanData()->setSorobanFeeRefund( @@ -1797,8 +1795,7 @@ TransactionFrame::apply(AppConnector& app, AbstractLedgerTxn& ltx, isSoroban()) { sorobanResourceFee = computePreApplySorobanResourceFee( - ledgerVersion, app.getLedgerManager().getSorobanNetworkConfig(), - app.getConfig()); + ledgerVersion, app.getSorobanNetworkConfig(), app.getConfig()); auto& sorobanData = *txResult->getSorobanData(); sorobanData.setSorobanConsumedNonRefundableFee( diff --git a/src/transactions/TransactionUtils.cpp b/src/transactions/TransactionUtils.cpp index bcdb02c257..d1c9d8b69b 100644 --- a/src/transactions/TransactionUtils.cpp +++ b/src/transactions/TransactionUtils.cpp @@ -1988,7 +1988,7 @@ isTransactionXDRValidForCurrentProtocol(AppConnector& app, { uint32_t maxProtocol = app.getConfig().CURRENT_LEDGER_PROTOCOL_VERSION; uint32_t currProtocol = - app.getLedgerManager().getLastClosedLedgerHeader().header.ledgerVersion; + app.getLastClosedLedgerHeader().header.ledgerVersion; // If we could parse the XDR when ledger is using the maximum supported // protocol version, then XDR has to be valid. // This check also is pointless before protocol 21 as Soroban environment diff --git a/src/transactions/test/SorobanTxTestUtils.cpp b/src/transactions/test/SorobanTxTestUtils.cpp index 0ae49359f6..f583a09b09 100644 --- a/src/transactions/test/SorobanTxTestUtils.cpp +++ b/src/transactions/test/SorobanTxTestUtils.cpp @@ -1055,7 +1055,7 @@ SorobanTest::getDummyAccount() SorobanNetworkConfig const& SorobanTest::getNetworkCfg() { - return getApp().getLedgerManager().getSorobanNetworkConfig(); + return getApp().getLedgerManager().getMutableSorobanNetworkConfig(); } uint32_t