From 3a3d94d3966c2a850fb2e5c59f3fc941c23fa24c Mon Sep 17 00:00:00 2001 From: Jake Smith Date: Wed, 25 Oct 2023 05:21:13 +0100 Subject: [PATCH] HPCC-30564 Prevent pending write externals being flushed from cache. Flushing externals from the external cache (when full) whilst their transactions are still pending to be written could cause subsequent ext. cache lookups to attempt to look to disk for the external, and fail with "Missing external file ...". Also fix a bug with cachedSz. Externals removed by a transaction were being removed from the cache, but their size was not deducted from cachedSz. Consequently, over time, the extCache would falsely appear full, meaning it was purged prematurely. Signed-off-by: Jake Smith --- dali/base/dasds.cpp | 63 ++++++++++++++++++++++++++++++--------------- 1 file changed, 42 insertions(+), 21 deletions(-) diff --git a/dali/base/dasds.cpp b/dali/base/dasds.cpp index 6a3055903fb..14f8c317c92 100644 --- a/dali/base/dasds.cpp +++ b/dali/base/dasds.cpp @@ -1070,7 +1070,8 @@ class CTransactionItem : public CSimpleInterface { void *data; unsigned dataLength; - }; + std::atomic<bool> pendingWrite; + } ext; }; CTransactionItem(char *path, IPropertyTree *_deltaTree) : deltaTree(_deltaTree) @@ -1078,9 +1079,13 @@ class CTransactionItem : public CSimpleInterface type = f_delta; name = path; } - CTransactionItem(char *_name, size32_t _dataLength, void *_data) : name(_name), dataLength(_dataLength), data(_data) + CTransactionItem(char *_name) : name(_name) + { + type = f_delext; + } + CTransactionItem(char *_name, size32_t _dataLength, void *_data, bool _pendingWrite) : name(_name), ext{ _data, _dataLength, _pendingWrite } { - type = data ? 
f_addext : f_delext; + type = f_addext; } ~CTransactionItem() { @@ -1091,7 +1096,7 @@ class CTransactionItem : public CSimpleInterface ::Release(deltaTree); break; case f_addext: - free(data); + free(ext.data); break; case f_delext: default: @@ -1335,9 +1340,10 @@ class CDeltaWriter : implements IThreaded // a cold restart between the two. // NB2: It is also important to ensure that these externals are committed to disk, in the event of a Dali saveStore // because the saved in-memory store has been altered at this point, i.e. it depends on the external file (see flush()) - writeExt(dataPath, item->name, item->dataLength, item->data); + writeExt(dataPath, item->name, item->ext.dataLength, item->ext.data); if (backupPath.length()) - writeExt(backupPath, item->name, item->dataLength, item->data, 60, 30); + writeExt(backupPath, item->name, item->ext.dataLength, item->ext.data, 60, 30); + item->ext.pendingWrite = false; } else { @@ -1416,14 +1422,14 @@ class CDeltaWriter : implements IThreaded } CTransactionItem *addExt(char *name, size32_t length, void *content) { - CTransactionItem *item = new CTransactionItem(name, length, content); + CTransactionItem *item = new CTransactionItem(name, length, content, true); CriticalBlock b(pendingCrit); addToQueue(LINK(item)); return item; } void removeExt(char *name) { - CTransactionItem *item = new CTransactionItem(name, 0, nullptr); + CTransactionItem *item = new CTransactionItem(name); CriticalBlock b(pendingCrit); addToQueue(item); } @@ -1902,11 +1908,16 @@ class CExtCache for (auto it = order.begin(); it != order.end();) { CTransactionItem *item = *it; - cachedSz -= item->dataLength; - extTable.erase(item->name); - it = order.erase(it); - if (cachedSz <= cacheSzLimit) - break; + if (item->ext.pendingWrite) + it++; + else + { + cachedSz -= item->ext.dataLength; + extTable.erase(item->name); + it = order.erase(it); + if (cachedSz <= cacheSzLimit) + break; + } } } void doAdd(CTransactionItem *item) @@ -1916,7 +1927,7 @@ class 
CExtCache if (it != extTable.end()) { Linked<CTransactionItem> &existingItem = *(it->second); - cachedSz -= existingItem->dataLength; + cachedSz -= existingItem->ext.dataLength; existingItem.set(item); order.splice(order.end(), order, it->second); // move to front of FIFO list } @@ -1925,7 +1936,7 @@ class CExtCache auto listIt = order.insert(order.end(), item); extTable[item->name] = listIt; } - cachedSz += item->dataLength; + cachedSz += item->ext.dataLength; if (cachedSz > cacheSzLimit) purge(); } @@ -1940,12 +1951,12 @@ class CExtCache return; MemoryAttr ma; ma.set(sz, data); - CTransactionItem *item = new CTransactionItem(strdup(name), sz, ma.detach()); + CTransactionItem *item = new CTransactionItem(strdup(name), sz, ma.detach(), false); doAdd(item); } void add(CTransactionItem *item) { - if (item->dataLength > cacheSzLimit) + if (item->ext.dataLength > cacheSzLimit) return; doAdd(item); } @@ -1956,6 +1967,8 @@ class CExtCache if (mapIt != extTable.end()) { auto listIter = mapIt->second; + Linked<CTransactionItem> &existingItem = *listIter; + cachedSz -= existingItem->ext.dataLength; assertex(listIter != order.erase(listIter)); assertex(mapIt != extTable.erase(mapIt)); } @@ -1968,7 +1981,7 @@ class CExtCache return false; Linked<CTransactionItem> item = *(it->second); b.leave(); - mb.append(item->dataLength, item->data); + mb.append(item->ext.dataLength, item->ext.data); return true; } }; @@ -2258,10 +2271,10 @@ void CBinaryFileExternal::write(const char *name, IPropertyTree &tree) void CDeltaWriter::addToQueue(CTransactionItem *item) { pending.push(item); - // add actual size for externals, and nonimal '100 byte' value for delta transactions + // add actual size for externals, and nominal '100 byte' value for delta transactions // it will act. as a rough guide to appoaching size threshold. It is not worth // synchronously preparing and serializing here (which will be done asynchronously later) - pendingSz += (CTransactionItem::f_addext == item->type) ? 
item->dataLength : 100; + pendingSz += (CTransactionItem::f_addext == item->type) ? item->ext.dataLength : 100; size_t items = pending.size(); if ((pendingSz < transactionMaxMem) && (items < transactionQueueLimit)) { @@ -6073,7 +6086,15 @@ CCovenSDSManager::CCovenSDSManager(ICoven &_coven, IPropertyTree &_config, const doTimeComparison = false; if (config.getPropBool("@lightweightCoalesce", true)) coalesce.setown(new CLightCoalesceThread(config, iStoreHelper)); - size_t extCacheSize = config.getPropInt("@extCacheSizeMB", defaultExtCacheSizeMB) * 0x100000; + + // if enabled (default), ensure the ext cache size is not smaller than the pending delta writer cache, + // so that when the ext cache is full, there are transaction items to flush that aren't still + // pending on write. + unsigned extCacheSizeMB = config.getPropInt("@extCacheSizeMB", defaultExtCacheSizeMB); + unsigned deltaTransactionMaxMemMB = config.getPropInt("@deltaTransactionMaxMemMB", defaultDeltaMemMaxMB); + if (extCacheSizeMB && deltaTransactionMaxMemMB && (extCacheSizeMB < deltaTransactionMaxMemMB)) + extCacheSizeMB = deltaTransactionMaxMemMB + 1; + size32_t extCacheSize = extCacheSizeMB * 0x100000; extCache.init(extCacheSize); }