Skip to content

Commit

Permalink
HPCC-30564 Prevent pending write externals being flushed from cache.
Browse files Browse the repository at this point in the history
Flushing some externals from the external cache (when full), whilst
their transactions are still pending to be written, could cause subsequent
ext. cache lookups to fall back to looking on disk for the external,
and fail with "Missing external file ".

Signed-off-by: Jake Smith <[email protected]>
  • Loading branch information
jakesmith committed Oct 25, 2023
1 parent c0ec1dc commit 75938dd
Showing 1 changed file with 40 additions and 20 deletions.
60 changes: 40 additions & 20 deletions dali/base/dasds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1070,17 +1070,22 @@ class CTransactionItem : public CSimpleInterface
{
void *data;
unsigned dataLength;
};
std::atomic<bool> pendingWrite;
} ext;
};

// Delta transaction item: records the delta property tree and uses 'path' as the item name.
// NOTE(review): the destructor hunk calls ::Release(deltaTree), so this presumably takes
// over a link to _deltaTree - confirm against callers.
CTransactionItem(char *path, IPropertyTree *_deltaTree) : deltaTree(_deltaTree)
{
type = f_delta;
name = path;
}
CTransactionItem(char *_name, size32_t _dataLength, void *_data) : name(_name), dataLength(_dataLength), data(_data)
// Delete-external transaction item: only the name is recorded.
// _dataLength and _data are accepted but not used by this overload.
CTransactionItem(char *_name, size32_t _dataLength, void *_data) : name(_name)
{
type = f_delext;
}
CTransactionItem(char *_name, size32_t _dataLength, void *_data, bool _pendingWrite) : name(_name), ext{ _data, _dataLength, _pendingWrite }
{
type = data ? f_addext : f_delext;
type = f_addext;
}
~CTransactionItem()
{
Expand All @@ -1091,7 +1096,7 @@ class CTransactionItem : public CSimpleInterface
::Release(deltaTree);
break;
case f_addext:
free(data);
free(ext.data);
break;
case f_delext:
default:
Expand Down Expand Up @@ -1335,9 +1340,10 @@ class CDeltaWriter : implements IThreaded
// a cold restart between the two.
// NB2: It is also important to ensure that these externals are committed to disk, in the event of a Dali saveStore
// because the saved in-memory store has been altered at this point, i.e. it depends on the external file (see flush())
writeExt(dataPath, item->name, item->dataLength, item->data);
writeExt(dataPath, item->name, item->ext.dataLength, item->ext.data);
if (backupPath.length())
writeExt(backupPath, item->name, item->dataLength, item->data, 60, 30);
writeExt(backupPath, item->name, item->ext.dataLength, item->ext.data, 60, 30);
item->ext.pendingWrite = false;
}
else
{
Expand Down Expand Up @@ -1416,7 +1422,7 @@ class CDeltaWriter : implements IThreaded
}
CTransactionItem *addExt(char *name, size32_t length, void *content)
{
CTransactionItem *item = new CTransactionItem(name, length, content);
CTransactionItem *item = new CTransactionItem(name, length, content, true);
CriticalBlock b(pendingCrit);
addToQueue(LINK(item));
return item;
Expand Down Expand Up @@ -1902,11 +1908,16 @@ class CExtCache
for (auto it = order.begin(); it != order.end();)
{
CTransactionItem *item = *it;
cachedSz -= item->dataLength;
extTable.erase(item->name);
it = order.erase(it);
if (cachedSz <= cacheSzLimit)
break;
if (item->ext.pendingWrite)
it++;
else
{
cachedSz -= item->ext.dataLength;
extTable.erase(item->name);
it = order.erase(it);
if (cachedSz <= cacheSzLimit)
break;
}
}
}
void doAdd(CTransactionItem *item)
Expand All @@ -1916,7 +1927,7 @@ class CExtCache
if (it != extTable.end())
{
Linked<CTransactionItem> &existingItem = *(it->second);
cachedSz -= existingItem->dataLength;
cachedSz -= existingItem->ext.dataLength;
existingItem.set(item);
order.splice(order.end(), order, it->second); // move to front of FIFO list
}
Expand All @@ -1925,7 +1936,7 @@ class CExtCache
auto listIt = order.insert(order.end(), item);
extTable[item->name] = listIt;
}
cachedSz += item->dataLength;
cachedSz += item->ext.dataLength;
if (cachedSz > cacheSzLimit)
purge();
}
Expand All @@ -1940,12 +1951,12 @@ class CExtCache
return;
MemoryAttr ma;
ma.set(sz, data);
CTransactionItem *item = new CTransactionItem(strdup(name), sz, ma.detach());
CTransactionItem *item = new CTransactionItem(strdup(name), sz, ma.detach(), false);
doAdd(item);
}
void add(CTransactionItem *item)
{
if (item->dataLength > cacheSzLimit)
if (item->ext.dataLength > cacheSzLimit)
return;
doAdd(item);
}
Expand All @@ -1968,7 +1979,7 @@ class CExtCache
return false;
Linked<CTransactionItem> item = *(it->second);
b.leave();
mb.append(item->dataLength, item->data);
mb.append(item->ext.dataLength, item->ext.data);
return true;
}
};
Expand Down Expand Up @@ -2258,10 +2269,10 @@ void CBinaryFileExternal::write(const char *name, IPropertyTree &tree)
void CDeltaWriter::addToQueue(CTransactionItem *item)
{
pending.push(item);
// add actual size for externals, and nonimal '100 byte' value for delta transactions
// add actual size for externals, and nominal '100 byte' value for delta transactions
// it will act as a rough guide to approaching the size threshold. It is not worth
// synchronously preparing and serializing here (which will be done asynchronously later)
pendingSz += (CTransactionItem::f_addext == item->type) ? item->dataLength : 100;
pendingSz += (CTransactionItem::f_addext == item->type) ? item->ext.dataLength : 100;
size_t items = pending.size();
if ((pendingSz < transactionMaxMem) && (items < transactionQueueLimit))
{
Expand Down Expand Up @@ -6073,7 +6084,16 @@ CCovenSDSManager::CCovenSDSManager(ICoven &_coven, IPropertyTree &_config, const
doTimeComparison = false;
if (config.getPropBool("@lightweightCoalesce", true))
coalesce.setown(new CLightCoalesceThread(config, iStoreHelper));
size_t extCacheSize = config.getPropInt("@extCacheSizeMB", defaultExtCacheSizeMB) * 0x100000;

// if enabled (default), ensure the ext cache size is bigger than the pending delta writer cache
// to ensure that when the ext cache is full, there are transaction items to flush that aren't still
// pending on write.
unsigned extCacheSizeMB = config.getPropInt("@extCacheSizeMB", defaultExtCacheSizeMB);
unsigned deltaTransactionMaxMemMB = config.getPropInt("@deltaTransactionMaxMemMB", defaultDeltaMemMaxMB);
if (extCacheSizeMB && (extCacheSizeMB < deltaTransactionMaxMemMB))
extCacheSizeMB++;
size32_t extCacheSize = extCacheSizeMB * 0x100000;

extCache.init(extCacheSize);
}

Expand Down

0 comments on commit 75938dd

Please sign in to comment.