From 8e7d4515edb605c1862d2b825bc94e93a30dfcd4 Mon Sep 17 00:00:00 2001 From: Gavin Halliday Date: Thu, 28 Nov 2024 10:23:32 +0000 Subject: [PATCH] HPCC-33043 Ensure blob storage files are completely written before being read Signed-off-by: Gavin Halliday --- dali/base/dadfs.cpp | 42 ++++++++++++++++++++++++++++++++++++ helm/hpcc/values.schema.json | 5 +++++ system/jlib/jfile.cpp | 9 +++++++- system/jlib/jfile.hpp | 3 +++ 4 files changed, 58 insertions(+), 1 deletion(-) diff --git a/dali/base/dadfs.cpp b/dali/base/dadfs.cpp index 7b008f187b4..005767ac263 100644 --- a/dali/base/dadfs.cpp +++ b/dali/base/dadfs.cpp @@ -3779,6 +3779,47 @@ protected: friend class CDistributedFilePart; clusters.kill(); } + //Ensure that enough time has passed from when the file was last modified for reads to be consistent + //Important for blob storage or remote, geographically synchronized storage + void checkWriteSync() + { + time_t modifiedTime = 0; + time_t now = 0; + + Owned iter = root->getElements("Cluster"); + ForEach(*iter) + { + const char * name = iter->query().queryProp("@name"); + unsigned marginMs = getWriteSyncMarginMs(name); + if (marginMs) + { + if (0 == modifiedTime) + { + CDateTime modified; + if (!getModificationTime(modified)) + return; + modifiedTime = modified.getSimple(); + } + + if (0 == now) + now = time(&now); + + //Round the elapsed time down - so that a change on the last ms of one time period does not count as a whole second of elapsed time + //This could be avoided if the modified time was more granular + unsigned __int64 elapsedMs = (now - modifiedTime) * 1000; + if (elapsedMs >= 1000) + elapsedMs -= 999; + + if (unlikely(elapsedMs < marginMs)) + { + LOG(MCuserProgress, "Delaying access to %s on %s for %ums to ensure write sync", queryLogicalName(), name, (unsigned)(marginMs - elapsedMs)); + MilliSleep(marginMs - elapsedMs); + now = 0; // re-evaluate now - unlikely to actually happen + } + } + } + } + bool hasDirPerPart() const { return FileDescriptorFlags::none != (fileFlags & FileDescriptorFlags::dirperpart); @@ -8299,6 +8340,7 @@ IDistributedFile *CDistributedFileDirectory::dolookup(CDfsLogicalFileName &_logi } CDistributedFile *ret = new CDistributedFile(this,fcl.detach(),*logicalname,accessMode,user); // found ret->setSuperOwnerLock(superOwnerLock.detach()); + ret->checkWriteSync(); return ret; } // now super file diff --git a/helm/hpcc/values.schema.json b/helm/hpcc/values.schema.json index 312019680bc..1d853f13619 100644 --- a/helm/hpcc/values.schema.json +++ b/helm/hpcc/values.schema.json @@ -603,6 +603,11 @@ "eclwatchVisible": { "type": "boolean" }, + "writeSyncMarginMs": { + "description": "Time that is required to elapse between writing a file and all read copies to be consistently updated", + "type": "integer", + "default": 0 + }, "components": {}, "prefix": {}, "subPath": {}, diff --git a/system/jlib/jfile.cpp b/system/jlib/jfile.cpp index b5a330dd775..8afe9594806 100644 --- a/system/jlib/jfile.cpp +++ b/system/jlib/jfile.cpp @@ -7911,7 +7911,8 @@ static const std::array planeAttributeI { PlaneAttrType::integer, 1024, false, "blockedFileIOKB" }, // enum PlaneAttributeType::BlockedSequentialIO {0} { PlaneAttrType::integer, 1024, false, "blockedRandomIOKB" }, // enum PlaneAttributeType::blockedRandomIOKB {1} { PlaneAttrType::boolean, 0, true, "fileSyncWriteClose" }, // enum PlaneAttributeType::fileSyncWriteClose {2} - { PlaneAttrType::boolean, 0, true, "concurrentWriteSupport" } // enum PlaneAttributeType::concurrentWriteSupport {3} + { PlaneAttrType::boolean, 0, true, "concurrentWriteSupport" },// enum PlaneAttributeType::concurrentWriteSupport {3} + { PlaneAttrType::integer, 1, false, "writeSyncMarginMs" }, // enum PlaneAttributeType::WriteSyncMarginMs {4} }}; // {prefix, {key1: value1, key2: value2, ...}} @@ -8089,6 +8090,12 @@ bool getFileSyncWriteCloseEnabled(const char *planeName) return 0 != getPlaneAttributeValue(planeName, FileSyncWriteClose, defaultFileSyncWriteCloseEnabled ? 1 : 0); } +unsigned getWriteSyncMarginMs(const char * planeName) +{ + constexpr unsigned dft = 0; + return (unsigned)getPlaneAttributeValue(planeName, WriteSyncMarginMs, dft); +} + static constexpr bool defaultConcurrentWriteSupport = isContainerized() ? false : true; bool getConcurrentWriteSupported(const char *planeName) { diff --git a/system/jlib/jfile.hpp b/system/jlib/jfile.hpp index 543fb355e79..53080ff5f35 100644 --- a/system/jlib/jfile.hpp +++ b/system/jlib/jfile.hpp @@ -744,12 +744,14 @@ extern jlib_decl IPropertyTreeIterator * getRemoteStoragesIterator(); extern jlib_decl IPropertyTreeIterator * getPlanesIterator(const char * category, const char *name); extern jlib_decl IFileIO *createBlockedIO(IFileIO *base, size32_t blockSize); +//MORE: Should use enum class to avoid potential symbol clashes enum PlaneAttributeType // remember to update planeAttributeInfo in jfile.cpp { BlockedSequentialIO, BlockedRandomIO, FileSyncWriteClose, ConcurrentWriteSupport, + WriteSyncMarginMs, PlaneAttributeCount }; extern jlib_decl const char *getPlaneAttributeString(PlaneAttributeType attr); @@ -761,6 +763,7 @@ extern jlib_decl size32_t getBlockedFileIOSize(const char *planeName, size32_t d extern jlib_decl size32_t getBlockedRandomIOSize(const char *planeName, size32_t defaultSize=0); extern jlib_decl bool getFileSyncWriteCloseEnabled(const char *planeName); extern jlib_decl bool getConcurrentWriteSupported(const char *planeName); +extern jlib_decl unsigned getWriteSyncMarginMs(const char * planeName); //---- Pluggable file type related functions ----------------------------------------------