diff --git a/dali/base/dasds.cpp b/dali/base/dasds.cpp index 6a3055903fb..14f8c317c92 100644 --- a/dali/base/dasds.cpp +++ b/dali/base/dasds.cpp @@ -1070,7 +1070,8 @@ class CTransactionItem : public CSimpleInterface { void *data; unsigned dataLength; - }; + std::atomic pendingWrite; + } ext; }; CTransactionItem(char *path, IPropertyTree *_deltaTree) : deltaTree(_deltaTree) @@ -1078,9 +1079,13 @@ class CTransactionItem : public CSimpleInterface type = f_delta; name = path; } - CTransactionItem(char *_name, size32_t _dataLength, void *_data) : name(_name), dataLength(_dataLength), data(_data) + CTransactionItem(char *_name) : name(_name) + { + type = f_delext; + } + CTransactionItem(char *_name, size32_t _dataLength, void *_data, bool _pendingWrite) : name(_name), ext{ _data, _dataLength, _pendingWrite } { - type = data ? f_addext : f_delext; + type = f_addext; } ~CTransactionItem() { @@ -1091,7 +1096,7 @@ class CTransactionItem : public CSimpleInterface ::Release(deltaTree); break; case f_addext: - free(data); + free(ext.data); break; case f_delext: default: @@ -1335,9 +1340,10 @@ class CDeltaWriter : implements IThreaded // a cold restart between the two. // NB2: It is also important to ensure that these externals are committed to disk, in the event of a Dali saveStore // because the saved in-memory store has been altered at this point, i.e. it depends on the external file (see flush()) - writeExt(dataPath, item->name, item->dataLength, item->data); + writeExt(dataPath, item->name, item->ext.dataLength, item->ext.data); if (backupPath.length()) - writeExt(backupPath, item->name, item->dataLength, item->data, 60, 30); + writeExt(backupPath, item->name, item->ext.dataLength, item->ext.data, 60, 30); + item->ext.pendingWrite = false; } else { @@ -1416,14 +1422,14 @@ class CDeltaWriter : implements IThreaded } CTransactionItem *addExt(char *name, size32_t length, void *content) { - CTransactionItem *item = new CTransactionItem(name, length, content); + CTransactionItem *item = new CTransactionItem(name, length, content, true); CriticalBlock b(pendingCrit); addToQueue(LINK(item)); return item; } void removeExt(char *name) { - CTransactionItem *item = new CTransactionItem(name, 0, nullptr); + CTransactionItem *item = new CTransactionItem(name); CriticalBlock b(pendingCrit); addToQueue(item); } @@ -1902,11 +1908,16 @@ class CExtCache for (auto it = order.begin(); it != order.end();) { CTransactionItem *item = *it; - cachedSz -= item->dataLength; - extTable.erase(item->name); - it = order.erase(it); - if (cachedSz <= cacheSzLimit) - break; + if (item->ext.pendingWrite) + it++; + else + { + cachedSz -= item->ext.dataLength; + extTable.erase(item->name); + it = order.erase(it); + if (cachedSz <= cacheSzLimit) + break; + } } } void doAdd(CTransactionItem *item) @@ -1916,7 +1927,7 @@ class CExtCache if (it != extTable.end()) { Linked &existingItem = *(it->second); - cachedSz -= existingItem->dataLength; + cachedSz -= existingItem->ext.dataLength; existingItem.set(item); order.splice(order.end(), order, it->second); // move to front of FIFO list } @@ -1925,7 +1936,7 @@ class CExtCache auto listIt = order.insert(order.end(), item); extTable[item->name] = listIt; } - cachedSz += item->dataLength; + cachedSz += item->ext.dataLength; if (cachedSz > cacheSzLimit) purge(); } @@ -1940,12 +1951,12 @@ class CExtCache return; MemoryAttr ma; ma.set(sz, data); - CTransactionItem *item = new CTransactionItem(strdup(name), sz, ma.detach()); + CTransactionItem *item = new CTransactionItem(strdup(name), sz, ma.detach(), false); doAdd(item); } void add(CTransactionItem *item) { - if (item->dataLength > cacheSzLimit) + if (item->ext.dataLength > cacheSzLimit) return; doAdd(item); } @@ -1956,6 +1967,8 @@ class CExtCache if (mapIt != extTable.end()) { auto listIter = mapIt->second; + Linked &existingItem = *listIter; + cachedSz -= existingItem->ext.dataLength; assertex(listIter != order.erase(listIter)); assertex(mapIt != extTable.erase(mapIt)); } @@ -1968,7 +1981,7 @@ class CExtCache return false; Linked item = *(it->second); b.leave(); - mb.append(item->dataLength, item->data); + mb.append(item->ext.dataLength, item->ext.data); return true; } }; @@ -2258,10 +2271,10 @@ void CBinaryFileExternal::write(const char *name, IPropertyTree &tree) void CDeltaWriter::addToQueue(CTransactionItem *item) { pending.push(item); - // add actual size for externals, and nonimal '100 byte' value for delta transactions + // add actual size for externals, and nominal '100 byte' value for delta transactions // it will act. as a rough guide to appoaching size threshold. It is not worth // synchronously preparing and serializing here (which will be done asynchronously later) - pendingSz += (CTransactionItem::f_addext == item->type) ? item->dataLength : 100; + pendingSz += (CTransactionItem::f_addext == item->type) ? item->ext.dataLength : 100; size_t items = pending.size(); if ((pendingSz < transactionMaxMem) && (items < transactionQueueLimit)) { @@ -6073,7 +6086,15 @@ CCovenSDSManager::CCovenSDSManager(ICoven &_coven, IPropertyTree &_config, const doTimeComparison = false; if (config.getPropBool("@lightweightCoalesce", true)) coalesce.setown(new CLightCoalesceThread(config, iStoreHelper)); - size_t extCacheSize = config.getPropInt("@extCacheSizeMB", defaultExtCacheSizeMB) * 0x100000; + + // if enabled (default), ensure the ext cache size is bigger than the pending delta writer cache + // to ensure that when the ext cache is full, there are transaction items to flush that aren't still + // pending on write. + unsigned extCacheSizeMB = config.getPropInt("@extCacheSizeMB", defaultExtCacheSizeMB); + unsigned deltaTransactionMaxMemMB = config.getPropInt("@deltaTransactionMaxMemMB", defaultDeltaMemMaxMB); + if (extCacheSizeMB && deltaTransactionMaxMemMB && (extCacheSizeMB < deltaTransactionMaxMemMB)) + extCacheSizeMB = deltaTransactionMaxMemMB + 1; + size32_t extCacheSize = extCacheSizeMB * 0x100000; extCache.init(extCacheSize); }