Skip to content

Commit

Permalink
MB-54729: Enable history scan for CDC backfill
Browse files Browse the repository at this point in the history
Replace the todo markers with code that now utilises the magma history
API - this now means scanAllVersions for example is hooked into the
magma history scanning API.

Add new tests that validate multiple versions can be stored and
returned.

Also required are changes to unit tests to respect new expectation
checks that occur in magma - primarily that flushing writes ordered
batches - this is only a problem for tests which bypass the flusher
and call KVStore directly.

**** ISSUES ****

ep-engine_ep_unit_tests does not pass:

1) Exception from magma

MagmaKVStoreRollbackTest.Rollback hits the following exception

GSL: Precondition failure: 'levelSize >= compactionState[level].history.Size' at /Users/jimwalker/Code/couchbase/neo/magma/lsm/lsm_tree.cc:895

2) Seg-fault in magma

Seen in a number of tests, 1 example:

CollectionsDcpEphemeralOrPersistent/CollectionsDcpParameterizedTest.DefaultCollectionDropped/persistent_magma_value_only

Process 78731 stopped
* thread couchbase#1, queue = 'com.apple.main-thread', stop reason = EXC_BAD_ACCESS (code=EXC_I386_GPFLT)
    frame #0: 0x00000001012eb7b0 ep-engine_ep_unit_tests`magma::DocSequenceBuffer::GetKey(this=0x0000000118131700) at lsd.cc:75:36 [opt]
   72   }
   73
   74   Slice DocSequenceBuffer::GetKey() {
-> 75       seqFmt.Set(sortedList[offset]->seqno);
   76       return seqFmt.Encode();
   77   }
   78

* thread couchbase#1, queue = 'com.apple.main-thread', stop reason = EXC_BAD_ACCESS (code=EXC_I386_GPFLT)
  * frame #0: 0x00000001012eb7b0 ep-engine_ep_unit_tests`magma::DocSequenceBuffer::GetKey(this=0x0000000118131700) at lsd.cc:75:36 [opt]
    frame couchbase#1: 0x0000000101361e2e ep-engine_ep_unit_tests`magma::mvccIteratorAdaptor::GetKey(this=0x0000000118536c00) at mvcc.h:249:25 [opt]
    frame couchbase#2: 0x000000010132b688 ep-engine_ep_unit_tests`magma::IteratorWithFilter::filterKeys(this=0x0000000118128350) at iterator.cc:214:32 [opt]
    frame couchbase#3: 0x000000010132de5b ep-engine_ep_unit_tests`magma::KVReader::ReadKVs(this=0x00007ff7bfefd550) at common.cc:70:19 [opt]
    frame couchbase#4: 0x0000000101378f63 ep-engine_ep_unit_tests`magma::LSMTree::writeSSTable(this=0x000000011855a820, w=0x00007ff7bfefd890, itr=0x0000000118128350, maxSn=10, stopFn=function<bool (const magma::Slice &)> @ 0x00007ff7bfefd860)>) at lsm_tree.cc:719:15 [opt]
    frame couchbase#5: 0x0000000101376ee8 ep-engine_ep_unit_tests`magma::LSMTree::writeSSTable(this=0x000000011855a820, appendMode=<unavailable>, itr=0x0000000118128350, sizeEstimate=<unavailable>, maxSn=10, stopFn=function<bool (const magma::Slice &)> @ 0x00007ff7bfefdb60)>) at lsm_tree.cc:682:17 [opt]
    frame couchbase#6: 0x00000001013761b2 ep-engine_ep_unit_tests`magma::LSMTree::writeMemtable(this=0x000000011855a820, memtable=0x000000011854c7a0) at lsm_tree.cc:449:21 [opt]
    frame #7: 0x000000010137753f ep-engine_ep_unit_tests`magma::LSMTree::doMemtableFlushWork(this=0x000000011855a820) at lsm_tree.cc:531:18 [opt]
    frame #8: 0x000000010139fe62 ep-engine_ep_unit_tests`std::__1::__function::__func<magma::LSMTree::newFlush()::$_16, std::__1::allocator<magma::LSMTree::newFlush()::$_16>, std::__1::tuple<magma::Status, magma::CheckpointTransaction> ()>::operator()() [inlined] magma::LSMTree::newFlush(this=<unavailable>)::$_16::operator()() const at lsm_tree.cc:993:34 [opt]
    frame #9: 0x000000010139fe5d ep-engine_ep_unit_tests`std::__1::__function::__func<magma::LSMTree::newFlush()::$_16, std::__1::allocator<magma::LSMTree::newFlush()::$_16>, std::__1::tuple<magma::Status, magma::CheckpointTransaction> ()>::operator()() [inlined] decltype(__f=<unavailable>)::$_16&>(fp)()) std::__1::__invoke<magma::LSMTree::newFlush()::$_16&>(magma::LSMTree::newFlush()::$_16&) at type_traits:3918:1 [opt]
    frame #10: 0x000000010139fe5d ep-engine_ep_unit_tests`std::__1::__function::__func<magma::LSMTree::newFlush()::$_16, std::__1::allocator<magma::LSMTree::newFlush()::$_16>, std::__1::tuple<magma::Status, magma::CheckpointTransaction> ()>::operator()() [inlined] std::__1::tuple<magma::Status, magma::CheckpointTransaction> std::__1::__invoke_void_return_wrapper<std::__1::tuple<magma::Status, magma::CheckpointTransaction>, false>::__call<magma::LSMTree::newFlush(__args=<unavailable>)::$_16&>(magma::LSMTree::newFlush()::$_16&) at invoke.h:30:16 [opt]
    frame #11: 0x000000010139fe5d ep-engine_ep_unit_tests`std::__1::__function::__func<magma::LSMTree::newFlush()::$_16, std::__1::allocator<magma::LSMTree::newFlush()::$_16>, std::__1::tuple<magma::Status, magma::CheckpointTransaction> ()>::operator()() [inlined] std::__1::__function::__alloc_func<magma::LSMTree::newFlush()::$_16, std::__1::allocator<magma::LSMTree::newFlush()::$_16>, std::__1::tuple<magma::Status, magma::CheckpointTransaction> ()>::operator(this=<unavailable>)() at function.h:178:16 [opt]
    frame #12: 0x000000010139fe59 ep-engine_ep_unit_tests`std::__1::__function::__func<magma::LSMTree::newFlush()::$_16, std::__1::allocator<magma::LSMTree::newFlush()::$_16>, std::__1::tuple<magma::Status, magma::CheckpointTransaction> ()>::operator(this=<unavailable>)() at function.h:352:12 [opt]
    frame #13: 0x00000001012f72af ep-engine_ep_unit_tests`magma::FlushWork::Execute() [inlined] std::__1::__function::__value_func<std::__1::tuple<magma::Status, magma::CheckpointTransaction> ()>::operator(this=<unavailable>)() const at function.h:505:16 [opt]
    frame #14: 0x00000001012f7296 ep-engine_ep_unit_tests`magma::FlushWork::Execute() [inlined] std::__1::function<std::__1::tuple<magma::Status, magma::CheckpointTransaction> ()>::operator(this=0x0000000118131560)() const at function.h:1182:12 [opt]
    frame #15: 0x00000001012f7292 ep-engine_ep_unit_tests`magma::FlushWork::Execute(this=0x0000000118131560) at flush_work.cc:61:29 [opt]
    frame #16: 0x0000000101389d5e ep-engine_ep_unit_tests`magma::KVStore::flushMemTables(this=0x00007ff7bfefe1c0)::$_38::operator()() at kvstore.cc:515:27 [opt]
    frame #17: 0x0000000101388fac ep-engine_ep_unit_tests`magma::KVStore::flushMemTables(this=0x000000010442a420, wal=<unavailable>, offset=(SegID = 1, SegOffset = 4096), flushMode=<unavailable>, blockMode=Blocking) at kvstore.cc:582:16 [opt]
    frame #18: 0x0000000101389a5a ep-engine_ep_unit_tests`magma::KVStore::FlushMemTables(this=<unavailable>, wal=<unavailable>, flushMode=<unavailable>, blockMode=<unavailable>) at kvstore.cc:387:12 [opt]
    frame #19: 0x00000001012fd9ba ep-engine_ep_unit_tests`magma::Magma::Impl::syncKVStore(this=0x000000011814f000, kvID=<unavailable>, checkpoint=true) at db.cc:1352:21 [opt]
    frame #20: 0x000000010132678a ep-engine_ep_unit_tests`std::__1::__function::__func<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8, std::__1::allocator<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8>, void ()>::operator()() [inlined] magma::Magma::Impl::CompactKVStore(this=0x00007ff7bfefe400)>)::$_7::operator()() const at db.cc:880:23 [opt]
    frame #21: 0x0000000101326772 ep-engine_ep_unit_tests`std::__1::__function::__func<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8, std::__1::allocator<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8>, void ()>::operator()() [inlined] magma::Magma::Impl::CompactKVStore(this=<unavailable>)>)::$_8::operator()() const at db.cc:891:21 [opt]
    frame #22: 0x0000000101326772 ep-engine_ep_unit_tests`std::__1::__function::__func<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8, std::__1::allocator<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8>, void ()>::operator()() [inlined] decltype(__f=<unavailable>)>)::$_8&>(fp)()) std::__1::__invoke<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8&>(magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8&) at type_traits:3918:1 [opt]
    frame #23: 0x0000000101326772 ep-engine_ep_unit_tests`std::__1::__function::__func<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8, std::__1::allocator<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8>, void ()>::operator()() [inlined] void std::__1::__invoke_void_return_wrapper<void, true>::__call<magma::Magma::Impl::CompactKVStore(__args=<unavailable>)>)::$_8&>(magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8&) at invoke.h:61:9 [opt]
    frame #24: 0x0000000101326772 ep-engine_ep_unit_tests`std::__1::__function::__func<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8, std::__1::allocator<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8>, void ()>::operator()() [inlined] std::__1::__function::__alloc_func<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8, std::__1::allocator<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8>, void ()>::operator(this=<unavailable>)() at function.h:178:16 [opt]
    frame #25: 0x0000000101326764 ep-engine_ep_unit_tests`std::__1::__function::__func<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8, std::__1::allocator<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8>, void ()>::operator(this=<unavailable>)() at function.h:352:12 [opt]
    frame #26: 0x0000000101303138 ep-engine_ep_unit_tests`magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>) [inlined] std::__1::__function::__value_func<void ()>::operator(this=<unavailable>)() const at function.h:505:16 [opt]
    frame #27: 0x000000010130312d ep-engine_ep_unit_tests`magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>) [inlined] std::__1::function<void ()>::operator(this=0x00007ff7bfefe4b0)() const at function.h:1182:12 [opt]
    frame #28: 0x0000000101303129 ep-engine_ep_unit_tests`magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>) [inlined] magma::defer::~defer(this=0x00007ff7bfefe4b0) at common.h:92:9 [opt]
    frame #29: 0x0000000101303129 ep-engine_ep_unit_tests`magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>) [inlined] magma::defer::~defer(this=0x00007ff7bfefe4b0) at common.h:91:14 [opt]
    frame #30: 0x0000000101303129 ep-engine_ep_unit_tests`magma::Magma::Impl::CompactKVStore(this=<unavailable>, kvID=<unavailable>, lowKey=0x00007ff7bfefe780, highKey=0x00007ff7bfefe780, makeCallback=magma::Magma::CompactionCallbackBuilder @ 0x00007ff7bfefe550)>) at db.cc:895:1 [opt]
    frame #31: 0x000000010130336c ep-engine_ep_unit_tests`magma::Magma::CompactKVStore(this=<unavailable>, kvID=0, lowKey=0x00007ff7bfefe780, highKey=<unavailable>, makeCallback=<unavailable>)>) at db.cc:901:18 [opt]
    frame #32: 0x000000010004fd3d ep-engine_ep_unit_tests`MagmaMemoryTrackingProxy::CompactKVStore(this=<unavailable>, kvID=0, lowKey=0x00007ff7bfefe780, highKey=0x00007ff7bfefe780, makeCallback=magma::Magma::CompactionCallbackBuilder @ 0x00007ff7bfefea00)>) at magma-memory-tracking-proxy.cc:190:19 [opt]
    frame #33: 0x00000001000a9eeb ep-engine_ep_unit_tests`MagmaKVStore::compactDBInternal(this=<unavailable>, vbLock=0x00007ff7bfefeda0, ctx=std::__1::shared_ptr<CompactionContext>::element_type @ 0x00000001184acc20 strong=3 weak=1) at magma-kvstore.cc:2590:29 [opt]
    frame #34: 0x00000001000a93ad ep-engine_ep_unit_tests`MagmaKVStore::compactDB(this=0x00000001067e6500, vbLock=0x00007ff7bfefeda0, ctx=nullptr) at magma-kvstore.cc:2445:12 [opt]
    frame #35: 0x00000001001d7eb0 ep-engine_ep_unit_tests`EPBucket::compactInternal(this=0x00000001067e6000, vb=0x00007ff7bfefed90, config=<unavailable>) at ep_bucket.cc:1398:25 [opt]
    frame #36: 0x00000001001d83f6 ep-engine_ep_unit_tests`EPBucket::doCompact(this=0x00000001067e6000, vbid=(vbid = 0), config=0x00007ff7bfefedf0, cookies=size=0) at ep_bucket.cc:1476:14 [opt]

3) Key sorting issue

Magma now checks for sorted keys - it turns out KV flushing is violating that ordering.
Need to know if KV should fix or is the magma check required??

Example:

CollectionsDcpEphemeralOrPersistent/CollectionsLegacyDcpTest.default_collection_is_not_vbucket_highseqno_with_pending/persistent_nexus_couchstore_magma_value_only

CRITICAL [(SynchronousEPEngine:default) magma_0]Fatal error: Found: preceding key(d2) > current key(    _collection). If history is enabled, all keys in the batch must be sorted lexicographicall

The problem is that the test flushes a prepare(default collection, key=d2) and create-collection(fruit) together. The flusher orders these...

\0d2
\1create_fruit

This is correct.

But \0d2 is marked as a prepare, when flushed to disk it goes into a special namespace. This occurs in KVStore after the sorting.

\0d2 becomes \2\0d2

And magma actually sees

\2\0d2
\1create_fruit

and we have violated the expects

Change-Id: Ica9ea1b52c51f125c9e8839a0fca412834fc25f7
  • Loading branch information
jimwwalker committed Jan 18, 2023
1 parent 65a919c commit 0d1f2c5
Show file tree
Hide file tree
Showing 15 changed files with 214 additions and 69 deletions.
5 changes: 5 additions & 0 deletions engines/ep/src/kv_bucket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3098,6 +3098,11 @@ std::chrono::seconds KVBucket::getHistoryRetentionSeconds() const {

void KVBucket::setHistoryRetentionBytes(size_t bytes) {
historyRetentionBytes = bytes;
for (auto& i : vbMap.shards) {
KVShard* shard = i.get();
shard->getRWUnderlying()->setHistoryRetentionBytes(bytes);
}

}

size_t KVBucket::getHistoryRetentionBytes() const {
Expand Down
7 changes: 7 additions & 0 deletions engines/ep/src/kvstore/couch-kvstore/couch-kvstore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4542,3 +4542,10 @@ std::unique_ptr<TransactionContext> CouchKVStore::begin(
return std::make_unique<CouchKVStoreTransactionContext>(
*this, vbid, std::move(pcb));
}

void CouchKVStore::setHistoryRetentionBytes(size_t size) {
// no-op.
// Note: StorageProperties reports that history scan is not supported, so
// we accept this attempt to set size, but will fail if a scanAllVersions
// is attempted.
}
2 changes: 2 additions & 0 deletions engines/ep/src/kvstore/couch-kvstore/couch-kvstore.h
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,8 @@ class CouchKVStore : public KVStore
std::unique_ptr<TransactionContext> begin(
Vbid vbid, std::unique_ptr<PersistenceCallback> pcb) override;

void setHistoryRetentionBytes(size_t size) override;

protected:
/**
* RAII holder for a couchstore LocalDoc object
Expand Down
5 changes: 5 additions & 0 deletions engines/ep/src/kvstore/kvstore_iface.h
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,11 @@ class KVStoreIface {
* @param vbid ID of the vbucket being created
*/
virtual void prepareToCreateImpl(Vbid vbid) = 0;

/**
* Method to configure the amount of history a vbucket should retain.
*/
virtual void setHistoryRetentionBytes(size_t size) = 0;
};

std::string to_string(KVStoreIface::ReadVBStateStatus status);
26 changes: 16 additions & 10 deletions engines/ep/src/kvstore/magma-kvstore/magma-kvstore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,7 @@ StorageProperties MagmaKVStore::getStorageProperties() const {
StorageProperties::AutomaticDeduplication::No,
StorageProperties::PrepareCounting::No,
StorageProperties::CompactionStaleItemCallbacks::Yes,
StorageProperties::HistoryRetentionAvailable::No);
StorageProperties::HistoryRetentionAvailable::Yes);
return rv;
}

Expand Down Expand Up @@ -1683,8 +1683,7 @@ std::unique_ptr<BySeqnoScanContext> MagmaKVStore::initBySeqnoScanContext(
getDroppedStatus.String());
}

// @todo:assign this using magma->GetOldestHistorySeqno(snapshot);
auto historyStartSeqno = 0;
auto historyStartSeqno = magma->GetOldestHistorySeqno(snapshot);
if (logger->should_log(spdlog::level::info)) {
logger->info(
"MagmaKVStore::initBySeqnoScanContext {} seqno:{} endSeqno:{}"
Expand Down Expand Up @@ -1797,8 +1796,7 @@ std::unique_ptr<ByIdScanContext> MagmaKVStore::initByIdScanContext(
return nullptr;
}

// @todo:assign this using magma->GetOldestHistorySeqno(snapshot);
auto historyStartSeqno = 0;
auto historyStartSeqno = magma->GetOldestHistorySeqno(snapshot);
logger->info(
"MagmaKVStore::initByIdScanContext {} historyStartSeqno:{} "
"KeyIterator:{}",
Expand All @@ -1818,13 +1816,16 @@ std::unique_ptr<ByIdScanContext> MagmaKVStore::initByIdScanContext(
historyStartSeqno);
}

scan_error_t MagmaKVStore::scan(BySeqnoScanContext& ctx) const {
return scan(ctx, magma::Magma::SeqIterator::Mode::Snapshot);
}

scan_error_t MagmaKVStore::scanAllVersions(BySeqnoScanContext& ctx) const {
// @todo use magma's mode
// return scan(ctx, magma::Magma::SeqIterator::Mode::History);
return scan(ctx);
return scan(ctx, magma::Magma::SeqIterator::Mode::History);
}

scan_error_t MagmaKVStore::scan(BySeqnoScanContext& ctx) const {
scan_error_t MagmaKVStore::scan(BySeqnoScanContext& ctx,
magma::Magma::SeqIterator::Mode mode) const {
if (ctx.lastReadSeqno == ctx.maxSeqno) {
logger->TRACE("MagmaKVStore::scan {} lastReadSeqno:{} == maxSeqno:{}",
ctx.vbid,
Expand All @@ -1837,7 +1838,8 @@ scan_error_t MagmaKVStore::scan(BySeqnoScanContext& ctx) const {
startSeqno = ctx.lastReadSeqno + 1;
}
auto& mctx = dynamic_cast<MagmaScanContext&>(ctx);
for (mctx.itr->Seek(startSeqno, ctx.maxSeqno); mctx.itr->Valid();
for (mctx.itr->Initialize(startSeqno, ctx.maxSeqno, mode);
mctx.itr->Valid();
mctx.itr->Next()) {
Slice keySlice, metaSlice, valSlice;
uint64_t seqno;
Expand Down Expand Up @@ -3710,3 +3712,7 @@ std::pair<Status, uint64_t> MagmaKVStore::getOldestRollbackableHighSeqno(

return {status, seqno};
}

void MagmaKVStore::setHistoryRetentionBytes(size_t size) {
magma->SetHistoryRetentionSize(size);
}
5 changes: 5 additions & 0 deletions engines/ep/src/kvstore/magma-kvstore/magma-kvstore.h
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,8 @@ class MagmaKVStore : public KVStore {
std::unique_ptr<TransactionContext> begin(
Vbid vbid, std::unique_ptr<PersistenceCallback> pcb) override;

void setHistoryRetentionBytes(size_t size) override;

// Magma uses a unique logger with a prefix of magma so that all logging
// calls from the wrapper thru magma will be prefixed with magma.
std::shared_ptr<BucketLogger> logger;
Expand Down Expand Up @@ -768,6 +770,9 @@ class MagmaKVStore : public KVStore {
const magma::Slice& valSlice,
std::function<magma::Status(magma::Slice&)> valueRead) const;

scan_error_t scan(BySeqnoScanContext& ctx,
magma::Magma::SeqIterator::Mode mode) const;

MagmaKVStoreConfig& configuration;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,9 @@ magma::Status MagmaMemoryTrackingProxy::WriteDocs(
docOperations,
kvsRev,
wrappedDocCallback,
wrappedPostCallback);
wrappedPostCallback,
// @todo: don't force this - must be passed down
magma::Magma::HistoryMode::Enabled);
}

magma::Status MagmaMemoryTrackingProxy::NewCheckpoint(
Expand Down
7 changes: 7 additions & 0 deletions engines/ep/src/kvstore/nexus-kvstore/nexus-kvstore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3299,3 +3299,10 @@ Vbid::id_type NexusKVStore::getCacheSlot(Vbid vbid) const {
uint64_t NexusKVStore::getPurgeSeqno(Vbid vbid) const {
return purgeSeqno.at(getCacheSlot(vbid));
}

void NexusKVStore::setHistoryRetentionBytes(size_t size) {
// no-op.
// Note: StorageProperties reports that history scan is not supported, so
// we accept this attempt to set size, but will fail if a scanAllVersions
// is attempted.
}
1 change: 1 addition & 0 deletions engines/ep/src/kvstore/nexus-kvstore/nexus-kvstore.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ class NexusKVStore : public KVStoreIface {
void delSystemEvent(TransactionContext& txnCtx,
const queued_item item) override;
void endTransaction(Vbid vbid) override;
void setHistoryRetentionBytes(size_t size) override;

/**
* Unit test only hook called before we compact the first KVStore. Public as
Expand Down
7 changes: 7 additions & 0 deletions engines/ep/src/kvstore/rocksdb-kvstore/rocksdb-kvstore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1973,3 +1973,10 @@ RocksDBKVStoreTransactionContext::RocksDBKVStoreTransactionContext(
: TransactionContext(kvstore, vbid, std::move(cb)),
pendingReqs(std::make_unique<RocksDBKVStore::PendingRequestQueue>()) {
}

void RocksDBKVStore::setHistoryRetentionBytes(size_t size) {
// no-op.
// Note: StorageProperties reports that history scan is not supported, so
// we accept this attempt to set size, but will fail if a scanAllVersions
// is attempted.
}
2 changes: 2 additions & 0 deletions engines/ep/src/kvstore/rocksdb-kvstore/rocksdb-kvstore.h
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,8 @@ class RocksDBKVStore : public KVStore {
std::unique_ptr<TransactionContext> begin(
Vbid vbid, std::unique_ptr<PersistenceCallback> pcb) override;

void setHistoryRetentionBytes(size_t size) override;

protected:
// Write a batch of updates to the given database; measuring the time
// taken and adding the timer to the commit histogram.
Expand Down
1 change: 1 addition & 0 deletions engines/ep/tests/mock/mock_kvstore.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ class MockKVStore : public KVStore {
prepareToRollback,
(Vbid vbid),
(override));
MOCK_METHOD(void, setHistoryRetentionBytes, (size_t size), (override));

/**
* Helper function to replace the existing read-write KVStore in the given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,25 +88,53 @@ class CollectionsKVStoreTestBase : public KVStoreBackend, public KVStoreTest {

void applyEvents(TransactionContext& txnCtx,
VB::Commit& commitData,
const CollectionsManifest& cm) {
const CollectionsManifest& cm,
bool writeEventNow = true) {
manifest.update(*vbucket, makeManifest(cm));

std::vector<queued_item> events;
getEventsFromCheckpoint(events);

for (auto& ev : events) {
commitData.collections.recordSystemEvent(*ev);
if (writeEventNow) {
if (ev->isDeleted()) {
kvstore->delSystemEvent(txnCtx, ev);
} else {
kvstore->setSystemEvent(txnCtx, ev);
}
}
}
if (!writeEventNow) {
std::move(events.begin(),
events.end(),
std::back_inserter(allEvents));
}
}

void applyEvents(TransactionContext& txnCtx,
const CollectionsManifest& cm,
bool writeEventNow = true) {
return applyEvents(txnCtx, flush, cm, writeEventNow);
}

// This function is to be used in conjunction with applyEvents when
// writeEventNow=false allowing a test to better emulate the flusher and
// write keys in a sorted batch. Tests can applyEvents so that collection
// metadata management does updates, but defer the system event writing
// until ready to commit
void sortAndWriteAllEvents(TransactionContext& txnCtx) {
std::sort(allEvents.begin(),
allEvents.end(),
OrderItemsForDeDuplication{});
for (auto& ev : allEvents) {
if (ev->isDeleted()) {
kvstore->delSystemEvent(txnCtx, ev);
} else {
kvstore->setSystemEvent(txnCtx, ev);
}
}
}

void applyEvents(TransactionContext& txnCtx,
const CollectionsManifest& cm) {
applyEvents(txnCtx, flush, cm);
allEvents.clear();
}

void checkUid(const Collections::KVStore::Manifest& md,
Expand Down Expand Up @@ -219,7 +247,8 @@ class CollectionsKVStoreTestBase : public KVStoreBackend, public KVStoreTest {
VB::Commit commitData(manifest);
auto ctx = kvstore->begin(vbucket->getId(),
std::make_unique<PersistenceCallback>());
applyEvents(*ctx, commitData, cm);
applyEvents(*ctx, commitData, cm, false);
sortAndWriteAllEvents(*ctx);
kvstore->commit(std::move(ctx), commitData);
auto [status, md] = kvstore->getCollectionsManifest(Vbid(0));
EXPECT_TRUE(status);
Expand All @@ -235,6 +264,7 @@ class CollectionsKVStoreTestBase : public KVStoreBackend, public KVStoreTest {
VBucketPtr vbucket;
WriteCallback wc;
DeleteCallback dc;
std::vector<queued_item> allEvents;
};

class CollectionsKVStoreTest
Expand Down Expand Up @@ -578,19 +608,21 @@ class CollectionRessurectionKVStoreTest
auto ctx = kvstore->begin(vbucket->getId(),
std::make_unique<PersistenceCallback>());
cm.add(targetScope);
applyEvents(*ctx, cm);
applyEvents(*ctx, cm, false);
cm.add(target, targetScope);
applyEvents(*ctx, cm);
applyEvents(*ctx, cm, false);
sortAndWriteAllEvents(*ctx);
kvstore->commit(std::move(ctx), flush);
}

// runs a flush batch that will leave the target collection in dropped state
void dropScope() {
openScopeOpenCollection();
cm.remove(targetScope);
auto ctx = kvstore->begin(vbucket->getId(),
std::make_unique<PersistenceCallback>());
applyEvents(*ctx, cm);
cm.remove(targetScope);
applyEvents(*ctx, cm, false);
sortAndWriteAllEvents(*ctx);
kvstore->commit(std::move(ctx), flush);
}

Expand Down Expand Up @@ -704,9 +736,9 @@ void CollectionRessurectionKVStoreTest::resurectionScopesTest() {
std::make_unique<PersistenceCallback>());
if (!cm.exists(targetScope)) {
cm.add(targetScope);
applyEvents(*ctx, cm);
applyEvents(*ctx, cm, false);
cm.add(target, targetScope);
applyEvents(*ctx, cm);
applyEvents(*ctx, cm, false);
}

std::string expectedName = target.name;
Expand All @@ -715,22 +747,23 @@ void CollectionRessurectionKVStoreTest::resurectionScopesTest() {
// iterate cycles of remove/add
for (int ii = 0; ii < getCycles(); ii++) {
cm.remove(scope);
applyEvents(*ctx, cm);

applyEvents(*ctx, cm, false);
if (resurectWithNewName()) {
expectedName = target.name + "_" + std::to_string(ii);
scope.name = targetScope.name + "_" + std::to_string(ii);
}
cm.add(scope);
applyEvents(*ctx, cm);
applyEvents(*ctx, cm, false);
cm.add({expectedName, target.uid}, scope);
applyEvents(*ctx, cm);
applyEvents(*ctx, cm, false);
}

if (dropCollectionAtEnd()) {
cm.remove(scope);
applyEvents(*ctx, cm);
applyEvents(*ctx, cm, false);
}

sortAndWriteAllEvents(*ctx);
kvstore->commit(std::move(ctx), flush);

// Now validate
Expand Down
Loading

0 comments on commit 0d1f2c5

Please sign in to comment.