Skip to content

Commit

Permalink
HPCC-30564 Prevent pending write externals being flushed from cache.
Browse files Browse the repository at this point in the history
Flushing some externals from the external cache (when full), whilst
the transactions are still pending to be written could cause
subsequent ext. cache lookups to attempt to look to disk for the
external, and fail with "Missing external file ".

Also fix a bug with cachedSz. Transactions that removed externals,
were being removed from the cache, but their size was not
deducted from cachedSz.
Consequently, over time, the extCache would falsely appear full,
meaning it was purged prematurely.

Signed-off-by: Jake Smith <[email protected]>
  • Loading branch information
jakesmith committed Oct 25, 2023
1 parent c0ec1dc commit 588dccf
Showing 1 changed file with 42 additions and 21 deletions.
63 changes: 42 additions & 21 deletions dali/base/dasds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1070,17 +1070,22 @@ class CTransactionItem : public CSimpleInterface
{
void *data;
unsigned dataLength;
};
std::atomic<bool> pendingWrite;
} ext;
};

CTransactionItem(char *path, IPropertyTree *_deltaTree) : deltaTree(_deltaTree)
{
type = f_delta;
name = path;
}
CTransactionItem(char *_name, size32_t _dataLength, void *_data) : name(_name), dataLength(_dataLength), data(_data)
CTransactionItem(char *_name) : name(_name)
{
type = f_delext;
}
CTransactionItem(char *_name, size32_t _dataLength, void *_data, bool _pendingWrite) : name(_name), ext{ _data, _dataLength, _pendingWrite }
{
type = data ? f_addext : f_delext;
type = f_addext;
}
~CTransactionItem()
{
Expand All @@ -1091,7 +1096,7 @@ class CTransactionItem : public CSimpleInterface
::Release(deltaTree);
break;
case f_addext:
free(data);
free(ext.data);
break;
case f_delext:
default:
Expand Down Expand Up @@ -1335,9 +1340,10 @@ class CDeltaWriter : implements IThreaded
// a cold restart between the two.
// NB2: It is also important to ensure that these externals are committed to disk, in the event of a Dali saveStore
// because the saved in-memory store has been altered at this point, i.e. it depends on the external file (see flush())
writeExt(dataPath, item->name, item->dataLength, item->data);
writeExt(dataPath, item->name, item->ext.dataLength, item->ext.data);
if (backupPath.length())
writeExt(backupPath, item->name, item->dataLength, item->data, 60, 30);
writeExt(backupPath, item->name, item->ext.dataLength, item->ext.data, 60, 30);
item->ext.pendingWrite = false;
}
else
{
Expand Down Expand Up @@ -1416,14 +1422,14 @@ class CDeltaWriter : implements IThreaded
}
CTransactionItem *addExt(char *name, size32_t length, void *content)
{
CTransactionItem *item = new CTransactionItem(name, length, content);
CTransactionItem *item = new CTransactionItem(name, length, content, true);
CriticalBlock b(pendingCrit);
addToQueue(LINK(item));
return item;
}
void removeExt(char *name)
{
CTransactionItem *item = new CTransactionItem(name, 0, nullptr);
CTransactionItem *item = new CTransactionItem(name);
CriticalBlock b(pendingCrit);
addToQueue(item);
}
Expand Down Expand Up @@ -1902,11 +1908,16 @@ class CExtCache
for (auto it = order.begin(); it != order.end();)
{
CTransactionItem *item = *it;
cachedSz -= item->dataLength;
extTable.erase(item->name);
it = order.erase(it);
if (cachedSz <= cacheSzLimit)
break;
if (item->ext.pendingWrite)
it++;
else
{
cachedSz -= item->ext.dataLength;
extTable.erase(item->name);
it = order.erase(it);
if (cachedSz <= cacheSzLimit)
break;
}
}
}
void doAdd(CTransactionItem *item)
Expand All @@ -1916,7 +1927,7 @@ class CExtCache
if (it != extTable.end())
{
Linked<CTransactionItem> &existingItem = *(it->second);
cachedSz -= existingItem->dataLength;
cachedSz -= existingItem->ext.dataLength;
existingItem.set(item);
order.splice(order.end(), order, it->second); // move to front of FIFO list
}
Expand All @@ -1925,7 +1936,7 @@ class CExtCache
auto listIt = order.insert(order.end(), item);
extTable[item->name] = listIt;
}
cachedSz += item->dataLength;
cachedSz += item->ext.dataLength;
if (cachedSz > cacheSzLimit)
purge();
}
Expand All @@ -1940,12 +1951,12 @@ class CExtCache
return;
MemoryAttr ma;
ma.set(sz, data);
CTransactionItem *item = new CTransactionItem(strdup(name), sz, ma.detach());
CTransactionItem *item = new CTransactionItem(strdup(name), sz, ma.detach(), false);
doAdd(item);
}
void add(CTransactionItem *item)
{
if (item->dataLength > cacheSzLimit)
if (item->ext.dataLength > cacheSzLimit)
return;
doAdd(item);
}
Expand All @@ -1956,6 +1967,8 @@ class CExtCache
if (mapIt != extTable.end())
{
auto listIter = mapIt->second;
Linked<CTransactionItem> &existingItem = *listIter;
cachedSz -= existingItem->ext.dataLength;
assertex(listIter != order.erase(listIter));
assertex(mapIt != extTable.erase(mapIt));
}
Expand All @@ -1968,7 +1981,7 @@ class CExtCache
return false;
Linked<CTransactionItem> item = *(it->second);
b.leave();
mb.append(item->dataLength, item->data);
mb.append(item->ext.dataLength, item->ext.data);
return true;
}
};
Expand Down Expand Up @@ -2258,10 +2271,10 @@ void CBinaryFileExternal::write(const char *name, IPropertyTree &tree)
void CDeltaWriter::addToQueue(CTransactionItem *item)
{
pending.push(item);
// add actual size for externals, and nonimal '100 byte' value for delta transactions
// add actual size for externals, and nominal '100 byte' value for delta transactions
// it will act. as a rough guide to appoaching size threshold. It is not worth
// synchronously preparing and serializing here (which will be done asynchronously later)
pendingSz += (CTransactionItem::f_addext == item->type) ? item->dataLength : 100;
pendingSz += (CTransactionItem::f_addext == item->type) ? item->ext.dataLength : 100;
size_t items = pending.size();
if ((pendingSz < transactionMaxMem) && (items < transactionQueueLimit))
{
Expand Down Expand Up @@ -6073,7 +6086,15 @@ CCovenSDSManager::CCovenSDSManager(ICoven &_coven, IPropertyTree &_config, const
doTimeComparison = false;
if (config.getPropBool("@lightweightCoalesce", true))
coalesce.setown(new CLightCoalesceThread(config, iStoreHelper));
size_t extCacheSize = config.getPropInt("@extCacheSizeMB", defaultExtCacheSizeMB) * 0x100000;

// if enabled (default), ensure the ext cache size is bigger than the pending delta writer cache
// to ensure that when the ext cache is full, there are transaction items to flush that aren't still
// pending on write.
unsigned extCacheSizeMB = config.getPropInt("@extCacheSizeMB", defaultExtCacheSizeMB);
unsigned deltaTransactionMaxMemMB = config.getPropInt("@deltaTransactionMaxMemMB", defaultDeltaMemMaxMB);
if (extCacheSizeMB && deltaTransactionMaxMemMB && (extCacheSizeMB <= deltaTransactionMaxMemMB))
extCacheSizeMB = deltaTransactionMaxMemMB + 1;
size32_t extCacheSize = extCacheSizeMB * 0x100000;
extCache.init(extCacheSize);
}

Expand Down

0 comments on commit 588dccf

Please sign in to comment.