diff --git a/dali/base/dadfs.cpp b/dali/base/dadfs.cpp index 7b008f187b4..25c85376af5 100644 --- a/dali/base/dadfs.cpp +++ b/dali/base/dadfs.cpp @@ -90,6 +90,32 @@ enum MDFSRequestKind MDFS_MAX }; + +//-------------------------------------------------------------------------------------------------------------------- + +static unsigned dadfsHookId = 0; +static bool expertVerifyWriteSyncSizes = false; + +//Add any code for caching global configuration here... +MODULE_INIT(INIT_PRIORITY_STANDARD) +{ + auto updateFunc = [&](const IPropertyTree *oldComponentConfiguration, const IPropertyTree *oldGlobalConfiguration) + { + Owned componentConfig = getComponentConfig(); + Owned globalConfig = getGlobalConfig(); + expertVerifyWriteSyncSizes = componentConfig->getPropBool("expert/@verifyWriteSyncSizes", globalConfig->getPropBool("expert/@verifyWriteSyncSizes", false)); + }; + dadfsHookId = installConfigUpdateHook(updateFunc, true); + return true; +} + +MODULE_EXIT() +{ + removeConfigUpdateHook(dadfsHookId); +} + +//-------------------------------------------------------------------------------------------------------------------- + // Mutex for physical operations (remove/rename) static CriticalSection physicalChange; @@ -3779,6 +3805,67 @@ protected: friend class CDistributedFilePart; clusters.kill(); } + //Ensure that enough time has passed from when the file was last modified for reads to be consistent + //Important for blob storage or remote, geographically synchronized storage + void checkWriteSync() + { + time_t modifiedTime = 0; + time_t now = 0; + + Owned iter = root->getElements("Cluster"); + ForEach(*iter) + { + const char * name = iter->query().queryProp("@name"); + unsigned marginMs = getWriteSyncMarginMs(name); + if (marginMs) + { + if (0 == modifiedTime) + { + CDateTime modified; + if (!getModificationTime(modified)) + return; + modifiedTime = modified.getSimple(); + } + + if (0 == now) + now = time(&now); + + //Round the elapsed time down - so that a change on the last ms of one time period does not count as a whole second of elapsed time + //This could be avoided if the modified time was more granular + unsigned __int64 elapsedMs = (now - modifiedTime) * 1000; + if (elapsedMs >= 1000) + elapsedMs -= 999; + + if (unlikely(elapsedMs < marginMs)) + { + LOG(MCuserProgress, "Delaying access to %s on %s for %ums to ensure write sync", queryLogicalName(), name, (unsigned)(marginMs - elapsedMs)); + if (expertVerifyWriteSyncSizes) + checkSizeConsistency("Before write sync delay"); + MilliSleep(marginMs - elapsedMs); + if (expertVerifyWriteSyncSizes) + checkSizeConsistency("After write sync delay"); + now = 0; // re-evaluate now - unlikely to actually happen + } + } + } + } + + void checkSizeConsistency(const char * when) + { + auto checkPartSize = [this,when](unsigned idx) -> void + { + IDistributedFilePart &part = queryPart(idx); + offset_t size = part.getDiskSize(false, false); + if (size != (offset_t)-1) + { + offset_t physicalSize = part.getDiskSize(true, true); + if (size != physicalSize) + OWARNLOG("%s: Part %d of file %s is inconsistent: logical size = %" I64F "d, physical size = %" I64F "d", when, idx, queryLogicalName(), size, physicalSize); + } + }; + asyncFor(numParts(), checkPartSize); + } + bool hasDirPerPart() const { return FileDescriptorFlags::none != (fileFlags & FileDescriptorFlags::dirperpart); @@ -8299,6 +8386,7 @@ IDistributedFile *CDistributedFileDirectory::dolookup(CDfsLogicalFileName &_logi } CDistributedFile *ret = new CDistributedFile(this,fcl.detach(),*logicalname,accessMode,user); // found ret->setSuperOwnerLock(superOwnerLock.detach()); + ret->checkWriteSync(); return ret; } // now super file diff --git a/helm/hpcc/values.schema.json b/helm/hpcc/values.schema.json index 312019680bc..1d853f13619 100644 --- a/helm/hpcc/values.schema.json +++ b/helm/hpcc/values.schema.json @@ -603,6 +603,11 @@ "eclwatchVisible": { "type": "boolean" }, + "writeSyncMarginMs": { + "description": "Time that is required to elapse between writing a file and all read copies to be consistently updated", + "type": "integer", + "default": 0 + }, "components": {}, "prefix": {}, "subPath": {}, diff --git a/system/jlib/jfile.cpp b/system/jlib/jfile.cpp index b5a330dd775..8afe9594806 100644 --- a/system/jlib/jfile.cpp +++ b/system/jlib/jfile.cpp @@ -7911,7 +7911,8 @@ static const std::array planeAttributeI { PlaneAttrType::integer, 1024, false, "blockedFileIOKB" }, // enum PlaneAttributeType::BlockedSequentialIO {0} { PlaneAttrType::integer, 1024, false, "blockedRandomIOKB" }, // enum PlaneAttributeType::blockedRandomIOKB {1} { PlaneAttrType::boolean, 0, true, "fileSyncWriteClose" }, // enum PlaneAttributeType::fileSyncWriteClose {2} - { PlaneAttrType::boolean, 0, true, "concurrentWriteSupport" } // enum PlaneAttributeType::concurrentWriteSupport {3} + { PlaneAttrType::boolean, 0, true, "concurrentWriteSupport" },// enum PlaneAttributeType::concurrentWriteSupport {3} + { PlaneAttrType::integer, 1, false, "writeSyncMarginMs" }, // enum PlaneAttributeType::WriteSyncMarginMs {4} }}; // {prefix, {key1: value1, key2: value2, ...}} @@ -8089,6 +8090,12 @@ bool getFileSyncWriteCloseEnabled(const char *planeName) return 0 != getPlaneAttributeValue(planeName, FileSyncWriteClose, defaultFileSyncWriteCloseEnabled ? 1 : 0); } +unsigned getWriteSyncMarginMs(const char * planeName) +{ + constexpr unsigned dft = 0; + return (unsigned)getPlaneAttributeValue(planeName, WriteSyncMarginMs, dft); +} + static constexpr bool defaultConcurrentWriteSupport = isContainerized() ? false : true; bool getConcurrentWriteSupported(const char *planeName) { diff --git a/system/jlib/jfile.hpp b/system/jlib/jfile.hpp index 543fb355e79..53080ff5f35 100644 --- a/system/jlib/jfile.hpp +++ b/system/jlib/jfile.hpp @@ -744,12 +744,14 @@ extern jlib_decl IPropertyTreeIterator * getRemoteStoragesIterator(); extern jlib_decl IPropertyTreeIterator * getPlanesIterator(const char * category, const char *name); extern jlib_decl IFileIO *createBlockedIO(IFileIO *base, size32_t blockSize); +//MORE: Should use enum class to avoid potential symbol clashes enum PlaneAttributeType // remember to update planeAttributeInfo in jfile.cpp { BlockedSequentialIO, BlockedRandomIO, FileSyncWriteClose, ConcurrentWriteSupport, + WriteSyncMarginMs, PlaneAttributeCount }; extern jlib_decl const char *getPlaneAttributeString(PlaneAttributeType attr); @@ -761,6 +763,7 @@ extern jlib_decl size32_t getBlockedFileIOSize(const char *planeName, size32_t d extern jlib_decl size32_t getBlockedRandomIOSize(const char *planeName, size32_t defaultSize=0); extern jlib_decl bool getFileSyncWriteCloseEnabled(const char *planeName); extern jlib_decl bool getConcurrentWriteSupported(const char *planeName); +extern jlib_decl unsigned getWriteSyncMarginMs(const char * planeName); //---- Pluggable file type related functions ----------------------------------------------