diff --git a/common/pkgfiles/CMakeLists.txt b/common/pkgfiles/CMakeLists.txt index 23540c6451a..3797d1cbe91 100644 --- a/common/pkgfiles/CMakeLists.txt +++ b/common/pkgfiles/CMakeLists.txt @@ -39,6 +39,7 @@ include_directories ( ${HPCC_SOURCE_DIR}/rtl/include ${HPCC_SOURCE_DIR}/rtl/eclrtl ${HPCC_SOURCE_DIR}/common/workunit + ${HPCC_SOURCE_DIR}/esp/clients/ws_dfsclient ) if (NOT CONTAINERIZED) @@ -58,6 +59,7 @@ if(NOT PLUGIN) target_link_libraries( pkgfiles dfuwu + ws_dfsclient ) if (NOT CONTAINERIZED) target_link_libraries(pkgfiles environment) diff --git a/common/pkgfiles/pkgimpl.hpp b/common/pkgfiles/pkgimpl.hpp index 17c53308cc4..bda48aa24ca 100644 --- a/common/pkgfiles/pkgimpl.hpp +++ b/common/pkgfiles/pkgimpl.hpp @@ -85,7 +85,7 @@ class PKGFILES_API CPackageNode : implements IHpccPackage, public CInterface inline StringBuffer &makeSuperFileXPath(StringBuffer &xpath, const char *superFileName) const { - superFileName = skipForeign(superFileName); + superFileName = skipForeignOrRemote(superFileName); return xpath.append("SuperFile[@id='").appendLower(strlen(superFileName), superFileName).append("']"); } diff --git a/common/pkgfiles/referencedfilelist.cpp b/common/pkgfiles/referencedfilelist.cpp index 6a2d4b035fc..7350eb7688c 100644 --- a/common/pkgfiles/referencedfilelist.cpp +++ b/common/pkgfiles/referencedfilelist.cpp @@ -30,6 +30,7 @@ #include "environment.hpp" #endif +#include "ws_dfsclient.hpp" #define WF_LOOKUP_TIMEOUT (1000*15) // 15 seconds @@ -56,32 +57,33 @@ bool checkForeign(const char *lfn) } return false; } -const char *skipForeign(const char *name, StringBuffer *ip) + +static const char *skipNextLfnScope(const char *lfn, StringBuffer *s) +{ + const char *sep = strstr(lfn, "::"); + if (sep) + { + if (s) + s->append(sep-lfn, lfn).trim(); + sep += 2; + while (*sep == ' ') + sep++; + return sep; + } + return lfn; +} +const char *skipForeignOrRemote(const char *name, StringBuffer *ip, StringBuffer *remote) { unsigned maxTildas = 2; while (maxTildas-- && 
*name=='~') name++; - const char *d1 = strstr(name, "::"); - if (d1) + const char *sep = strstr(name, "::"); + if (sep) { - StringBuffer cmp; - if (strieq("foreign", cmp.append(d1-name, name).trim().str())) - { - // foreign scope - need to strip off the ip and port - d1 += 2; // skip :: - - const char *d2 = strstr(d1,"::"); - if (d2) - { - if (ip) - ip->append(d2-d1, d1).trim(); - d2 += 2; - while (*d2 == ' ') - d2++; - - name = d2; - } - } + if (strnicmp("remote", name, sep-name)==0) + return skipNextLfnScope(sep+2, remote); + else if (strnicmp("foreign", name, sep-name)==0) + return skipNextLfnScope(sep+2, ip); } return name; } @@ -135,13 +137,15 @@ class ReferencedFile : implements IReferencedFile, public CInterface { { //Scope ensures strings are assigned - StringAttrBuilder logicalNameText(logicalName), daliipText(daliip); - logicalNameText.set(skipForeign(lfn, &daliipText)).toLowerCase(); + StringAttrBuilder logicalNameText(logicalName), daliipText(daliip), remoteStorageText(remoteStorage); + logicalNameText.set(skipForeignOrRemote(lfn, &daliipText, &remoteStorageText)).toLowerCase(); } - if (daliip.length()) - flags |= RefFileForeign; + if (remoteStorage.length()) + flags |= RefFileLFNRemote; + else if (daliip.length()) + flags |= RefFileLFNForeign; else - daliip.set(sourceIP); + daliip.set(sourceIP); // can be declared in packagemap at different scopes fileSrcCluster.set(srcCluster); filePrefix.set(prefix); if (isSubFile) @@ -150,24 +154,27 @@ class ReferencedFile : implements IReferencedFile, public CInterface void reset() { - flags &= ~(RefFileNotOnCluster | RefFileNotFound | RefFileRemote | RefFileCopyInfoFailed | RefFileCloned | RefFileNotOnSource); //these flags are calculated during resolve + flags &= ~(RefFileNotOnCluster | RefFileNotFound | RefFileResolvedForeign | RefFileResolvedRemote | RefFileCopyInfoFailed | RefFileCloned | RefFileNotOnSource); //these flags are calculated during resolve } - IPropertyTree *getRemoteFileTree(IUserDescriptor 
*user, INode *remote, const char *remotePrefix); - IPropertyTree *getSpecifiedOrRemoteFileTree(IUserDescriptor *user, INode *remote, const char *remotePrefix); + IPropertyTree *getRemoteStorageFileTree(IUserDescriptor *user, const char *remoteStorageName, const char *remotePrefix); + IPropertyTree *getForeignFileTree(IUserDescriptor *user, INode *remote, const char *remotePrefix); + IPropertyTree *getFileOrProvidedForeignFileTree(IUserDescriptor *user, INode *remote, const char *remotePrefix); void processLocalFileInfo(IDistributedFile *df, const StringArray &locations, const char *srcCluster, StringArray *subfiles); void processLocalFileInfo(IDistributedFile *df, const char *dstCluster, const char *srcCluster, StringArray *subfiles); + void processForeignFileTree(IPropertyTree *tree, const char *srcCluster, StringArray *subfiles); void processRemoteFileTree(IPropertyTree *tree, const char *srcCluster, StringArray *subfiles); void resolveLocal(const StringArray &locations, const char *srcCluster, IUserDescriptor *user, StringArray *subfiles); void resolveLocal(const char *dstCluster, const char *srcCluster, IUserDescriptor *user, StringArray *subfiles); - void resolveRemote(IUserDescriptor *user, INode *remote, const char *remotePrefix, const StringArray &locations, const char *srcCluster, bool checkLocalFirst, StringArray *subfiles, bool resolveForeign=false); - void resolveRemote(IUserDescriptor *user, INode *remote, const char *remotePrefix, const char *dstCluster, const char *srcCluster, bool checkLocalFirst, StringArray *subfiles, bool resolveForeign=false); + void resolveRemote(IUserDescriptor *user, const char *remoteStorage, const char *remotePrefix, const StringArray &locations, const char *srcCluster, bool checkLocalFirst, StringArray *subfiles, bool resolveLFNForeign=false); + void resolveForeign(IUserDescriptor *user, INode *remote, const char *remotePrefix, const StringArray &locations, const char *srcCluster, bool checkLocalFirst, StringArray 
*subfiles, bool resolveLFNForeign=false); + void resolveForeign(IUserDescriptor *user, INode *remote, const char *remotePrefix, const char *dstCluster, const char *srcCluster, bool checkLocalFirst, StringArray *subfiles, bool resolveLFNForeign=false); - void resolve(const StringArray &locations, const char *srcCluster, IUserDescriptor *user, INode *remote, const char *remotePrefix, bool checkLocalFirst, StringArray *subfiles, bool trackSubFiles, bool resolveForeign=false); - void resolve(const char *dstCluster, const char *srcCluster, IUserDescriptor *user, INode *remote, const char *remotePrefix, bool checkLocalFirst, StringArray *subfiles, bool trackSubFiles, bool resolveForeign=false); + void resolveLocalOrRemote(const StringArray &locations, const char *srcCluster, IUserDescriptor *user, const char *remoteStorage, const char *remotePrefix, bool checkLocalFirst, StringArray *subfiles, bool trackSubFiles, bool resolveLFNForeign=false); + void resolveLocalOrForeign(const StringArray &locations, const char *srcCluster, IUserDescriptor *user, INode *remote, const char *remotePrefix, bool checkLocalFirst, StringArray *subfiles, bool trackSubFiles, bool resolveLFNForeign=false); virtual bool needsCopying(bool cloneForeign) const override; @@ -175,7 +182,7 @@ class ReferencedFile : implements IReferencedFile, public CInterface virtual unsigned getFlags() const {return flags;} virtual const SocketEndpoint &getForeignIP(SocketEndpoint &ep) const { - if ((flags & RefFileForeign) && daliip.length()) + if ((flags & RefFileLFNForeign) && daliip.length()) ep.set(daliip.str()); else ep.set(NULL); @@ -203,6 +210,7 @@ class ReferencedFile : implements IReferencedFile, public CInterface StringAttr logicalName; StringAttr pkgid; StringAttr daliip; + StringAttr remoteStorage; StringAttr filePrefix; StringAttr fileSrcCluster; __int64 fileSize; @@ -256,9 +264,9 @@ class ReferencedFileList : implements IReferencedFileList, public CInterface cloneFileInfo(publisherWuid, dstCluster, 
updateFlags, helper, cloneSuperInfo, cloneForeign, redundancy, channelsPerNode, replicateOffset, defReplicateFolder); cloneRelationships(); } - virtual void resolveFiles(const StringArray &locations, const char *remoteIP, const char *_remotePrefix, const char *srcCluster, bool checkLocalFirst, bool addSubFiles, bool trackSubFiles, bool resolveForeign=false) override; + virtual void resolveFiles(const StringArray &locations, const char *remoteIP, const char *_remotePrefix, const char *srcCluster, bool checkLocalFirst, bool addSubFiles, bool trackSubFiles, bool resolveLFNForeign, bool useRemoteStorage) override; - void resolveSubFiles(StringArray &subfiles, const StringArray &locations, bool checkLocalFirst, bool trackSubFiles, bool resolveForeign); + void resolveSubFiles(StringArray &subfiles, const StringArray &locations, bool checkLocalFirst, bool trackSubFiles, bool resolveLFNForeign); virtual bool filesNeedCopying(bool cloneForeign); virtual void setDfuQueue(const char *queue) override { @@ -270,6 +278,7 @@ class ReferencedFileList : implements IReferencedFileList, public CInterface Owned user; Owned remote; MapStringToMyClass map; + StringAttr remoteStorage; StringAttr srcCluster; StringAttr remotePrefix; StringAttr jobName; //used to populate DFU job name, but could be used elsewhere @@ -335,9 +344,9 @@ void ReferencedFile::processLocalFileInfo(IDistributedFile *df, const char *dstC processLocalFileInfo(df, locations, srcCluster, subfiles); } -void ReferencedFile::processRemoteFileTree(IPropertyTree *tree, const char *srcCluster, StringArray *subfiles) +void ReferencedFile::processForeignFileTree(IPropertyTree *tree, const char *srcCluster, StringArray *subfiles) { - flags |= RefFileRemote; + flags |= RefFileResolvedForeign; if (fileSrcCluster.length()) srcCluster = fileSrcCluster; if (streq(tree->queryName(), queryDfsXmlBranchName(DXB_SuperFile))) @@ -350,7 +359,7 @@ void ReferencedFile::processRemoteFileTree(IPropertyTree *tree, const char *srcC { const char 
*lfn = it->query().queryProp("@name"); StringBuffer foreignLfn; - if (flags & RefFileForeign) + if (flags & RefFileLFNForeign) lfn = foreignLfn.append("foreign::").append(this->daliip).append("::").append(lfn).str(); subfiles->append(lfn); if (trackSubFiles) @@ -367,7 +376,40 @@ void ReferencedFile::processRemoteFileTree(IPropertyTree *tree, const char *srcC numParts = tree->getPropInt("@numparts", 0); fileSize = tree->getPropInt64("Attr/@size", 0); } +} + +void ReferencedFile::processRemoteFileTree(IPropertyTree *tree, const char *srcCluster, StringArray *subfiles) +{ + flags |= RefFileResolvedRemote; + if (fileSrcCluster.length()) + srcCluster = fileSrcCluster; + if (streq(tree->queryName(), queryDfsXmlBranchName(DXB_SuperFile))) + { + flags |= RefFileSuper; + if (subfiles) + { + Owned it = tree->getElements("SubFile"); + ForEach(*it) + { + const char *lfn = it->query().queryProp("@name"); + StringBuffer remoteLfn; + if (flags & RefFileLFNForeign) + lfn = remoteLfn.append("remote::").append(this->remoteStorage).append("::").append(lfn).str(); + subfiles->append(lfn); + if (trackSubFiles) + subFileNames.append(lfn); + } + } + } + else if (srcCluster && *srcCluster) + { + VStringBuffer xpath("Cluster[@name='%s']", srcCluster); + if (!tree->hasProp(xpath)) + flags |= RefFileNotOnSource; + numParts = tree->getPropInt("@numparts", 0); + fileSize = tree->getPropInt64("Attr/@size", 0); + } } void ReferencedFile::resolveLocal(const StringArray &locations, const char *srcCluster, IUserDescriptor *user, StringArray *subfiles) @@ -398,46 +440,46 @@ void ReferencedFile::resolveLocal(const char *dstCluster, const char *srcCluster resolveLocal(locations, srcCluster, user, subfiles); } -IPropertyTree *ReferencedFile::getRemoteFileTree(IUserDescriptor *user, INode *remote, const char *remotePrefix) +IPropertyTree *ReferencedFile::getForeignFileTree(IUserDescriptor *user, INode *daliNode, const char *remotePrefix) { - if (!remote) + if (!daliNode) return NULL; StringBuffer 
remoteLFN; if (remotePrefix && *remotePrefix) remoteLFN.append(remotePrefix).append("::").append(logicalName); - return queryDistributedFileDirectory().getFileTree(remoteLFN.length() ? remoteLFN.str() : logicalName.str(), user, remote, WF_LOOKUP_TIMEOUT, GetFileTreeOpts::none); + return queryDistributedFileDirectory().getFileTree(remoteLFN.length() ? remoteLFN.str() : logicalName.str(), user, daliNode, WF_LOOKUP_TIMEOUT, GetFileTreeOpts::none); } -IPropertyTree *ReferencedFile::getSpecifiedOrRemoteFileTree(IUserDescriptor *user, INode *remote, const char *remotePrefix) +IPropertyTree *ReferencedFile::getFileOrProvidedForeignFileTree(IUserDescriptor *user, INode *providedDaliNode, const char *remotePrefix) { if (daliip.length()) { - Owned daliNode; - daliNode.setown(createINode(daliip)); - return getRemoteFileTree(user, daliNode, filePrefix); + Owned fileDaliNode; + fileDaliNode.setown(createINode(daliip)); + return getForeignFileTree(user, fileDaliNode, filePrefix); } - if (!remote) + if (!providedDaliNode) return NULL; StringBuffer remoteLFN; - Owned fileTree = getRemoteFileTree(user, remote, remotePrefix); + Owned fileTree = getForeignFileTree(user, providedDaliNode, remotePrefix); if (!fileTree) return NULL; StringAttrBuilder daliipText(daliip); - remote->endpoint().getEndpointHostText(daliipText); + providedDaliNode->endpoint().getEndpointHostText(daliipText); filePrefix.set(remotePrefix); return fileTree.getClear(); } -void ReferencedFile::resolveRemote(IUserDescriptor *user, INode *remote, const char *remotePrefix, const char *dstCluster, const char *srcCluster, bool checkLocalFirst, StringArray *subfiles, bool resolveForeign) +void ReferencedFile::resolveForeign(IUserDescriptor *user, INode *remote, const char *remotePrefix, const char *dstCluster, const char *srcCluster, bool checkLocalFirst, StringArray *subfiles, bool resolveLFNForeign) { StringArray locations; if (!isEmptyString(dstCluster)) locations.append(dstCluster); - resolveRemote(user, remote, 
remotePrefix, locations, srcCluster, checkLocalFirst, subfiles, resolveForeign); + resolveForeign(user, remote, remotePrefix, locations, srcCluster, checkLocalFirst, subfiles, resolveLFNForeign); } -void ReferencedFile::resolveRemote(IUserDescriptor *user, INode *remote, const char *remotePrefix, const StringArray &locations, const char *srcCluster, bool checkLocalFirst, StringArray *subfiles, bool resolveForeign) +void ReferencedFile::resolveForeign(IUserDescriptor *user, INode *remote, const char *remotePrefix, const StringArray &locations, const char *srcCluster, bool checkLocalFirst, StringArray *subfiles, bool resolveLFNForeign) { - if ((flags & RefFileForeign) && !resolveForeign && !trackSubFiles) + if ((flags & RefFileLFNForeign) && !resolveLFNForeign && !trackSubFiles) return; if (flags & RefFileInPackage) return; @@ -456,10 +498,12 @@ void ReferencedFile::resolveRemote(IUserDescriptor *user, INode *remote, const c return; } } - Owned tree = getSpecifiedOrRemoteFileTree(user, remote, remotePrefix); + Owned tree = getFileOrProvidedForeignFileTree(user, remote, remotePrefix); if (tree) { - processRemoteFileTree(tree, srcCluster, subfiles); + DBGLOG("RemoteDALI xml"); + dbglogXML(tree, 1); + processForeignFileTree(tree, srcCluster, subfiles); return; } else if (!checkLocalFirst && (!srcCluster || !*srcCluster)) //haven't already checked and not told to use a specific copy @@ -474,21 +518,89 @@ void ReferencedFile::resolveRemote(IUserDescriptor *user, INode *remote, const c DBGLOG("Remote ReferencedFile not found %s [dali=%s, remote=%s, prefix=%s]", logicalName.str(), daliip.get(), remote ? 
remote->endpoint().getEndpointHostText(dest).str() : nullptr, remotePrefix); } -void ReferencedFile::resolve(const StringArray &locations, const char *srcCluster, IUserDescriptor *user, INode *remote, const char *remotePrefix, bool checkLocalFirst, StringArray *subfiles, bool _trackSubFiles, bool resolveForeign) + +IPropertyTree *ReferencedFile::getRemoteStorageFileTree(IUserDescriptor *user, const char *remoteStorageName, const char *remotePrefix) +{ + VStringBuffer remoteLFN("remote::%s", remoteStorageName); + if (remotePrefix && *remotePrefix) + remoteLFN.append("::").append(remotePrefix); + remoteLFN.append("::").append(logicalName); + + Owned dfsFile = wsdfs::lookupDFSFile(remoteLFN.str(), AccessMode::readSequential, INFINITE, wsdfs::keepAliveExpiryFrequency, user); + IPropertyTree *tree = (dfsFile) ? dfsFile->queryFileMeta() : nullptr; + return tree ? tree->getPropTree("File") : nullptr; +} + +void ReferencedFile::resolveRemote(IUserDescriptor *user, const char *remoteStorageName, const char *remotePrefix, const StringArray &locations, const char *srcCluster, bool checkLocalFirst, StringArray *subfiles, bool resolveLFNForeign) +{ + if (isEmptyString(remoteStorageName)) + return; + if ((flags & RefFileLFNForeign) && !resolveLFNForeign && !trackSubFiles) + return; + if (flags & RefFileInPackage) + return; + if (noDfsResolution) + { + flags |= RefFileNotFound; + return; + } + reset(); + if (checkLocalFirst) //usually means we don't want to overwrite existing file info + { + Owned df = queryDistributedFileDirectory().lookup(logicalName.str(), user, AccessMode::tbdRead, false, false, nullptr, defaultPrivilegedUser); + if(df) + { + processLocalFileInfo(df, locations, NULL, subfiles); + return; + } + } + Owned tree = getRemoteStorageFileTree(user, remoteStorageName, remotePrefix); + if (tree) + { + remoteStorage.set(remoteStorageName); + DBGLOG("RemoteStorage xml"); + dbglogXML(tree, 1); + processRemoteFileTree(tree, srcCluster, subfiles); + return; + } + else if 
(!checkLocalFirst && (!srcCluster || !*srcCluster)) //haven't already checked and not told to use a specific copy + { + resolveLocal(locations, srcCluster, user, subfiles); + return; + } + + flags |= RefFileNotFound; + + StringBuffer dest; + DBGLOG("RemoteStorage ReferencedFile not found %s [remoteStorage=%s, prefix=%s]", logicalName.str(), remoteStorageName, remotePrefix); +} + +void ReferencedFile::resolveLocalOrRemote(const StringArray &locations, const char *srcCluster, IUserDescriptor *user, const char *remoteStorageName, const char *remotePrefix, bool checkLocalFirst, StringArray *subfiles, bool _trackSubFiles, bool resolveLFNForeign) +{ + trackSubFiles = _trackSubFiles; + if (!isEmptyString(remoteStorageName)) + resolveRemote(user, remoteStorageName, remotePrefix, locations, srcCluster, checkLocalFirst, subfiles, resolveLFNForeign); + else + resolveLocal(locations, srcCluster, user, subfiles); +} + +void ReferencedFile::resolveLocalOrForeign(const StringArray &locations, const char *srcCluster, IUserDescriptor *user, INode *remote, const char *remotePrefix, bool checkLocalFirst, StringArray *subfiles, bool _trackSubFiles, bool resolveLFNForeign) { trackSubFiles = _trackSubFiles; if (daliip.length() || remote) - resolveRemote(user, remote, remotePrefix, locations, srcCluster, checkLocalFirst, subfiles, resolveForeign); + resolveForeign(user, remote, remotePrefix, locations, srcCluster, checkLocalFirst, subfiles, resolveLFNForeign); else resolveLocal(locations, srcCluster, user, subfiles); } -void ReferencedFile::resolve(const char *dstCluster, const char *srcCluster, IUserDescriptor *user, INode *remote, const char *remotePrefix, bool checkLocalFirst, StringArray *subfiles, bool _trackSubFiles, bool resolveForeign) +/* +void ReferencedFile::resolve(const char *dstCluster, const char *srcCluster, IUserDescriptor *user, INode *remote, const char *remotePrefix, bool checkLocalFirst, StringArray *subfiles, bool _trackSubFiles, bool resolveLFNForeign) { 
StringArray locations; if (!isEmptyString(dstCluster)) locations.append(dstCluster); } +*/ static void setRoxieClusterPartDiskMapping(const char *clusterName, const char *defaultFolder, const char *defaultReplicateFolder, bool supercopy, IDFUfileSpec *wuFSpecDest, IDFUoptions *wuOptions) { @@ -530,7 +642,7 @@ static void getDefaultDFUName(StringBuffer &dfuQueueName) #endif } -static void dfuCopy(const IPropertyTree *directories, IDFUWorkUnit *publisherWu, IUserDescriptor *user, const char *sourceLogicalName, const char *destLogicalName, const char *destPlane, const char *srcDali, bool supercopy, bool overwrite, bool preserveCompression, bool nosplit) +static void dfuCopy(const IPropertyTree *directories, IDFUWorkUnit *publisherWu, IUserDescriptor *user, const char *sourceLogicalName, const char *destLogicalName, const char *destPlane, const char *srcLocation, bool supercopy, bool overwrite, bool preserveCompression, bool nosplit, bool useRemoteStorage) { if(!publisherWu) throw makeStringException(-1, "Failed to create Publisher DFU Workunit."); @@ -541,23 +653,40 @@ static void dfuCopy(const IPropertyTree *directories, IDFUWorkUnit *publisherWu, if(isEmptyString(destPlane)) throw makeStringException(-1, "Destination node group not specified."); - PROGLOG("Copy from [%s] %s to %s", isEmptyString(srcDali) ? "local" : srcDali, sourceLogicalName, destLogicalName); + PROGLOG("Copy from %s[%s] %s to %s", (!isEmptyString(srcLocation) && useRemoteStorage) ? "remote" : "", isEmptyString(srcLocation) ? 
"local" : srcLocation, sourceLogicalName, destLogicalName); StringBuffer destFolder, destTitle, defaultFolder, defaultReplicateFolder; DfuParseLogicalPath(directories, destLogicalName, destPlane, destFolder, destTitle, defaultFolder, defaultReplicateFolder); CDfsLogicalFileName logicalName; logicalName.set(sourceLogicalName); - if (!isEmptyString(srcDali)) + if (!isEmptyString(srcLocation)) { - SocketEndpoint ep(srcDali); - if (ep.isNull()) - throw MakeStringException(-1, "ReferencedFile Copy %s: cannot resolve SourceDali network IP from %s.", sourceLogicalName, srcDali); + if (useRemoteStorage) + { + StringBuffer remoteSpec; + VStringBuffer lfn("remote::%s::", srcLocation); + if (logicalName.isRemote()) + logicalName.getRemoteSpec(remoteSpec, lfn); + else + { + logicalName.clearForeign(); + logicalName.get(lfn); + } + logicalName.set(lfn); + sourceLogicalName = logicalName.get(); + } + else + { + SocketEndpoint ep(srcLocation); + if (ep.isNull()) + throw MakeStringException(-1, "ReferencedFile Copy %s: cannot resolve SourceDali network IP from %s.", sourceLogicalName, srcLocation); - logicalName.setForeign(ep,false); + logicalName.setForeign(ep,false); + } } - - Owned file = queryDistributedFileDirectory().lookup(logicalName, user, AccessMode::tbdRead, false, false, nullptr, defaultPrivilegedUser); + //Pass forceAllowForeign=true because: 1. we need backward compatibility of configurations, 2. 
this may be the only place we want to allow foreign (to copy files in) + Owned file = wsdfs::lookup(logicalName, user, AccessMode::tbdRead, false, false, nullptr, defaultPrivilegedUser, INFINITE, true); if (!file) throw MakeStringException(-1, "ReferencedFile failed to find file: %s", logicalName.get()); @@ -589,10 +718,13 @@ static void dfuCopy(const IPropertyTree *directories, IDFUWorkUnit *publisherWu, IDFUfileSpec *wuFSpecDest = wu->queryUpdateDestination(); IDFUoptions *wuOptions = wu->queryUpdateOptions(); wuFSpecSource->setLogicalName(sourceLogicalName); - if (!isEmptyString(srcDali)) + if (!isEmptyString(srcLocation)) { - SocketEndpoint ep(srcDali); - wuFSpecSource->setForeignDali(ep); + if (!useRemoteStorage) + { + SocketEndpoint ep(srcLocation); + wuFSpecSource->setForeignDali(ep); + } if (user) { @@ -619,9 +751,9 @@ bool ReferencedFile::needsCopying(bool cloneForeign) const { if ((flags & RefFileCloned) || (flags & RefFileSuper) || (flags & RefFileInPackage)) return false; - if ((flags & RefFileForeign) && !cloneForeign) + if ((flags & RefFileLFNForeign) && !cloneForeign) return false; - if (!(flags & (RefFileRemote | RefFileForeign | RefFileNotOnCluster))) + if (!(flags & (RefFileResolvedForeign | RefFileResolvedRemote | RefFileLFNForeign | RefFileLFNRemote | RefFileNotOnCluster))) return false; return true; } @@ -643,9 +775,9 @@ void ReferencedFile::cloneInfo(const IPropertyTree *directories, IDFUWorkUnit *p bool dfucopy = (updateFlags & DFU_UPDATEF_COPY)!=0; if (!dfucopy) //Whether remote or on a local plane if we get here the file is not on a plane that roxie considers an direct access plane, so if we're in copy data mode the the file will be copied - helper->cloneRoxieSubFile(srcLFN, srcCluster, logicalName, dstCluster, filePrefix, redundancy, channelsPerNode, redundancy, channelsPerNode, 
replicateOffset, defReplicateFolder, user, daliip, remoteStorage, updateFlags, false); else - dfuCopy(directories, publisherWu, user, srcLFN, logicalName, dstCluster, daliip, false, (updateFlags & DFU_UPDATEF_OVERWRITE)!=0, true, false); + dfuCopy(directories, publisherWu, user, srcLFN, logicalName, dstCluster, remoteStorage.isEmpty() ? daliip : remoteStorage, false, (updateFlags & DFU_UPDATEF_OVERWRITE)!=0, true, false, !remoteStorage.isEmpty()); flags |= RefFileCloned; } @@ -664,12 +796,12 @@ void ReferencedFile::cloneInfo(const IPropertyTree *directories, IDFUWorkUnit *p void ReferencedFile::cloneSuperInfo(IDFUWorkUnit *publisherWu, unsigned updateFlags, ReferencedFileList *list, IUserDescriptor *user, INode *remote) { - if ((flags & RefFileCloned) || (flags & RefFileInPackage) || !(flags & RefFileSuper) || !(flags & RefFileRemote)) + if ((flags & RefFileCloned) || (flags & RefFileInPackage) || !(flags & RefFileSuper) || !(flags & (RefFileResolvedForeign | RefFileResolvedRemote))) return; try { - Owned tree = getSpecifiedOrRemoteFileTree(user, remote, NULL); + Owned tree = getFileOrProvidedForeignFileTree(user, remote, NULL); if (!tree) return; @@ -904,7 +1036,7 @@ void ReferencedFileList::addFilesFromWorkUnit(IConstWorkUnit *cw) addFilesFromQuery(cw, NULL, NULL); } -void ReferencedFileList::resolveSubFiles(StringArray &subfiles, const StringArray &locations, bool checkLocalFirst, bool trackSubFiles, bool resolveForeign) +void ReferencedFileList::resolveSubFiles(StringArray &subfiles, const StringArray &locations, bool checkLocalFirst, bool trackSubFiles, bool resolveLFNForeign) { StringArray childSubFiles; ForEachItemIn(i, subfiles) @@ -916,30 +1048,45 @@ void ReferencedFileList::resolveSubFiles(StringArray &subfiles, const StringArra Owned file = new ReferencedFile(lfn, NULL, NULL, NULL, true, 0, NULL, false, allowSizeCalc); if (file->logicalName.length() && !map.getValue(file->getLogicalName())) { - file->resolve(locations, srcCluster, user, remote, 
remotePrefix, checkLocalFirst, &childSubFiles, trackSubFiles, resolveForeign); + if (remoteStorage.isEmpty()) + file->resolveLocalOrForeign(locations, srcCluster, user, remote, remotePrefix, checkLocalFirst, &childSubFiles, trackSubFiles, resolveLFNForeign); + else + file->resolveLocalOrRemote(locations, srcCluster, user, remoteStorage, remotePrefix, checkLocalFirst, &childSubFiles, trackSubFiles, resolveLFNForeign); const char *ln = file->getLogicalName(); // NOTE: setValue links its parameter map.setValue(ln, file); } } if (childSubFiles.length()) - resolveSubFiles(childSubFiles, locations, checkLocalFirst, trackSubFiles, resolveForeign); + resolveSubFiles(childSubFiles, locations, checkLocalFirst, trackSubFiles, resolveLFNForeign); } -void ReferencedFileList::resolveFiles(const StringArray &locations, const char *remoteIP, const char *_remotePrefix, const char *_srcCluster, bool checkLocalFirst, bool expandSuperFiles, bool trackSubFiles, bool resolveForeign) +void ReferencedFileList::resolveFiles(const StringArray &locations, const char *remoteLocation, const char *_remotePrefix, const char *_srcCluster, bool checkLocalFirst, bool expandSuperFiles, bool trackSubFiles, bool resolveLFNForeign, bool useRemoteStorage) { - remote.setown((remoteIP && *remoteIP) ? createINode(remoteIP, 7070) : NULL); + StringArray subfiles; srcCluster.set(_srcCluster); remotePrefix.set(_remotePrefix); - StringArray subfiles; + if (useRemoteStorage) + { + if (!user) + user.setown(createUserDescriptor()); + remoteStorage.set(remoteLocation); + ReferencedFileIterator files(this); + ForEach(files) + files.queryObject().resolveLocalOrRemote(locations, srcCluster, user, remoteStorage, remotePrefix, checkLocalFirst, expandSuperFiles ? &subfiles : NULL, trackSubFiles, resolveLFNForeign); + } + else { + remote.setown(!isEmptyString(remoteLocation) ? 
createINode(remoteLocation, 7070) : nullptr); + ReferencedFileIterator files(this); ForEach(files) - files.queryObject().resolve(locations, srcCluster, user, remote, remotePrefix, checkLocalFirst, expandSuperFiles ? &subfiles : NULL, trackSubFiles, resolveForeign); + files.queryObject().resolveLocalOrForeign(locations, srcCluster, user, remote, remotePrefix, checkLocalFirst, expandSuperFiles ? &subfiles : NULL, trackSubFiles, resolveLFNForeign); } + if (expandSuperFiles) - resolveSubFiles(subfiles, locations, checkLocalFirst, trackSubFiles, resolveForeign); + resolveSubFiles(subfiles, locations, checkLocalFirst, trackSubFiles, resolveLFNForeign); } bool ReferencedFileList::filesNeedCopying(bool cloneForeign) @@ -1031,7 +1178,7 @@ void ReferencedFileList::cloneRelationships() ForEach(files) { ReferencedFile &file = files.queryObject(); - if (!(file.getFlags() & RefFileRemote)) + if (!(file.getFlags() & (RefFileResolvedForeign | RefFileResolvedRemote))) continue; Owned iter = dir.lookupFileRelationships(file.getLogicalName(), NULL, NULL, NULL, NULL, NULL, NULL, addr.str(), WF_LOOKUP_TIMEOUT); diff --git a/common/pkgfiles/referencedfilelist.hpp b/common/pkgfiles/referencedfilelist.hpp index 41c999f0435..3d20b40f7c3 100644 --- a/common/pkgfiles/referencedfilelist.hpp +++ b/common/pkgfiles/referencedfilelist.hpp @@ -33,16 +33,18 @@ #define RefFileIndex 0x0001 #define RefFileNotOnCluster 0x0002 #define RefFileNotFound 0x0004 -#define RefFileRemote 0x0008 -#define RefFileForeign 0x0010 -#define RefFileSuper 0x0020 -#define RefSubFile 0x0040 -#define RefFileCopyInfoFailed 0x0080 -#define RefFileCloned 0x0100 -#define RefFileInPackage 0x0200 -#define RefFileNotOnSource 0x0400 -#define RefFileOptional 0x0800 //File referenced in more than one place can be both optional and not optional -#define RefFileNotOptional 0x1000 +#define RefFileResolvedForeign 0x0008 +#define RefFileResolvedRemote 0x0010 +#define RefFileLFNForeign 0x0020 //LFN was Foreign +#define RefFileLFNRemote 
0x0040 //LFN was remote +#define RefFileSuper 0x0080 +#define RefSubFile 0x0100 +#define RefFileCopyInfoFailed 0x0200 +#define RefFileCloned 0x0400 +#define RefFileInPackage 0x0800 +#define RefFileNotOnSource 0x1000 +#define RefFileOptional 0x2000 //File referenced in more than one place can be both optional and not optional +#define RefFileNotOptional 0x4000 interface IReferencedFile : extends IInterface @@ -70,14 +72,14 @@ interface IReferencedFileList : extends IInterface virtual void addFiles(StringArray &files)=0; virtual IReferencedFileIterator *getFiles()=0; - virtual void resolveFiles(const StringArray &locations, const char *remoteIP, const char * remotePrefix, const char *srcCluster, bool checkLocalFirst, bool addSubFiles, bool trackSubFiles, bool resolveForeign=false)=0; + virtual void resolveFiles(const StringArray &locations, const char *remoteIP, const char * remotePrefix, const char *srcCluster, bool checkLocalFirst, bool addSubFiles, bool trackSubFiles, bool resolveLFNForeign, bool useRemoteStorage)=0; virtual void cloneAllInfo(StringBuffer &publisherWuid, const char *dstCluster, unsigned updateFlags, IDFUhelper *helper, bool cloneSuperInfo, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defRepFolder)=0; virtual void cloneFileInfo(StringBuffer &publisherWuid, const char *dstCluster, unsigned updateFlags, IDFUhelper *helper, bool cloneSuperInfo, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defRepFolder)=0; virtual void cloneRelationships()=0; virtual void setDfuQueue(const char *dfu_queue) = 0; }; -extern REFFILES_API const char *skipForeign(const char *name, StringBuffer *ip=NULL); +extern REFFILES_API const char *skipForeignOrRemote(const char *name, StringBuffer *ip=nullptr, StringBuffer *remote=nullptr); extern REFFILES_API IReferencedFileList *createReferencedFileList(const char *user, const char *pw, bool allowForeignFiles, bool 
allowFileSizeCalc, const char *jobname = nullptr); extern REFFILES_API IReferencedFileList *createReferencedFileList(IUserDescriptor *userDesc, bool allowForeignFiles, bool allowFileSizeCalc, const char *jobname = nullptr); diff --git a/dali/datest/datest.cmake b/dali/datest/datest.cmake index 1f431e66740..99e3a5e1ab4 100644 --- a/dali/datest/datest.cmake +++ b/dali/datest/datest.cmake @@ -34,6 +34,7 @@ include_directories ( ${HPCC_SOURCE_DIR}/system/jlib ${HPCC_SOURCE_DIR}/system/security/shared ${HPCC_SOURCE_DIR}/esp/clients/wsdfuaccess + ${HPCC_SOURCE_DIR}/esp/clients/ws_dfsclient ${HPCC_SOURCE_DIR}/fs/dafsstream ${HPCC_SOURCE_DIR}/rtl/include ${HPCC_SOURCE_DIR}/rtl/eclrtl diff --git a/dali/datest/dfuwutest.cmake b/dali/datest/dfuwutest.cmake index 2896494666d..5bb1d685322 100644 --- a/dali/datest/dfuwutest.cmake +++ b/dali/datest/dfuwutest.cmake @@ -38,6 +38,7 @@ include_directories ( ./../../system/jlib ./../../common/workunit ../../common/environment + ${HPCC_SOURCE_DIR}/esp/clients/ws_dfsclient ) HPCC_ADD_EXECUTABLE ( dfuwutest ${SRCS} ) @@ -50,6 +51,7 @@ target_link_libraries ( dfuwutest dafsclient dalibase dfuwu + ws_dfsclient ) if (NOT CONTAINERIZED) diff --git a/dali/dfu/dfurun.cpp b/dali/dfu/dfurun.cpp index 46828fdb4c8..c59289d7409 100644 --- a/dali/dfu/dfurun.cpp +++ b/dali/dfu/dfurun.cpp @@ -1064,7 +1064,7 @@ class CDFUengine: public CInterface, implements IDFUengine } // first see if target exists (and remove if does and overwrite specified) - Owned dfile = wsdfs::lookup(dlfn,ctx.user,AccessMode::tbdWrite,false,false,nullptr,defaultPrivilegedUser,INFINITE); + Owned dfile = wsdfs::lookup(dlfn,ctx.user,AccessMode::tbdWrite,false,false,nullptr,defaultPrivilegedUser,INFINITE,false); if (dfile) { if (!ctx.superoptions->getOverwrite()) throw MakeStringException(-1,"Destination file %s already exists",dlfn.get()); @@ -1297,6 +1297,7 @@ class CDFUengine: public CInterface, implements IDFUengine Owned foreigndalinode; StringAttr oldRoxiePrefix; bool 
foreigncopy = false; + bool remotecopy = false; // first check for 'specials' (e.g. multi-cluster keydiff etc) switch (cmd) { case DFUcmd_copy: @@ -1311,6 +1312,7 @@ class CDFUengine: public CInterface, implements IDFUengine CDfsLogicalFileName srclfn; if (tmp.length()) srclfn.set(tmp.str()); + remotecopy = srclfn.isRemote(); destination->getLogicalName(tmp.clear()); CDfsLogicalFileName dstlfn; if (tmp.length()) @@ -1363,7 +1365,7 @@ class CDFUengine: public CInterface, implements IDFUengine } srcFile.setown(wsdfs::lookup(tmp.str(),userdesc, (cmd==DFUcmd_move)||(cmd==DFUcmd_rename)||((cmd==DFUcmd_copy)&&multiclusterinsert) ? AccessMode::tbdWrite : AccessMode::tbdRead, - false,false,nullptr,true, INFINITE)); + false,false,nullptr,true, INFINITE,false)); if (!srcFile) throw MakeStringException(-1,"Source file %s could not be found",tmp.str()); @@ -1526,7 +1528,7 @@ class CDFUengine: public CInterface, implements IDFUengine } else if (multiclustermerge) { - dstFile.setown(wsdfs::lookup(tmp.str(),userdesc,AccessMode::tbdWrite,false,false,nullptr,defaultPrivilegedUser,INFINITE)); + dstFile.setown(wsdfs::lookup(tmp.str(),userdesc,AccessMode::tbdWrite,false,false,nullptr,defaultPrivilegedUser,INFINITE,false)); if (!dstFile) throw MakeStringException(-1,"Destination for merge %s does not exist",tmp.str()); StringBuffer err; @@ -1535,7 +1537,7 @@ class CDFUengine: public CInterface, implements IDFUengine } else { - Owned oldfile = wsdfs::lookup(tmp.str(),userdesc,AccessMode::tbdWrite,false,false,nullptr,defaultPrivilegedUser,INFINITE); + Owned oldfile = wsdfs::lookup(tmp.str(),userdesc,AccessMode::tbdWrite,false,false,nullptr,defaultPrivilegedUser,INFINITE,false); if (oldfile) { StringBuffer reason; @@ -1666,7 +1668,7 @@ class CDFUengine: public CInterface, implements IDFUengine Audit("COPYDIFF",userdesc,srcName.get(),dstName.get()); } } - else if (foreigncopy||auxfdesc) + else if (remotecopy||foreigncopy||auxfdesc) { IFileDescriptor * srcDesc = (auxfdesc.get() ? 
auxfdesc.get() : srcFdesc.get()); fsys.import(srcDesc, dstFile, recovery, recoveryconn, filter, opttree, &feedback, &abortnotify, dfuwuid); @@ -1741,7 +1743,7 @@ class CDFUengine: public CInterface, implements IDFUengine destination->getLogicalName(toname); if (toname.length()) { unsigned start = msTick(); - Owned newfile = wsdfs::lookup(toname.str(),userdesc,AccessMode::tbdWrite,false,false,nullptr,defaultPrivilegedUser,INFINITE); + Owned newfile = wsdfs::lookup(toname.str(),userdesc,AccessMode::tbdWrite,false,false,nullptr,defaultPrivilegedUser,INFINITE,false); if (newfile) { // check for rename into multicluster CDfsLogicalFileName dstlfn; diff --git a/dali/dfu/dfuutil.cpp b/dali/dfu/dfuutil.cpp index 85bfc3964c1..877ae05e5ea 100644 --- a/dali/dfu/dfuutil.cpp +++ b/dali/dfu/dfuutil.cpp @@ -33,6 +33,8 @@ #include "rmtfile.hpp" #include "dfuutil.hpp" +#include "ws_dfsclient.hpp" + // savemap // superkey functions // (logical) directory functions @@ -129,6 +131,7 @@ class CFileCloner { public: StringAttr nameprefix; + StringAttr remoteStorage; Owned foreigndalinode; Linked userdesc; Linked foreignuserdesc; @@ -409,33 +412,41 @@ class CFileCloner void updateCloneFrom(const char *lfn, IPropertyTree &attrs, IFileDescriptor *srcfdesc, INode *srcdali, const char *srcCluster) { DBGLOG("updateCloneFrom %s", lfn); - if (!srcdali || srcdali->endpoint().isNull()) + if (remoteStorage.isEmpty() && (!srcdali || srcdali->endpoint().isNull())) attrs.setProp("@cloneFromPeerCluster", srcCluster); else { + while(attrs.removeProp("cloneFromGroup")); + StringBuffer s; - attrs.setProp("@cloneFrom", srcdali->endpoint().getEndpointHostText(s).str()); + if (!remoteStorage.isEmpty()) + { + attrs.setProp("@cloneRemote", remoteStorage.str()); + if (!isEmptyString(srcCluster)) + attrs.setProp("@cloneRemoteCluster", srcCluster); + } + else + { + attrs.setProp("@cloneFrom", srcdali->endpoint().getEndpointHostText(s).str()); + unsigned numClusters = srcfdesc->numClusters(); + for (unsigned 
clusterNum = 0; clusterNum < numClusters; clusterNum++) + { + StringBuffer sourceGroup; + srcfdesc->getClusterGroupName(clusterNum, sourceGroup, NULL); + if (srcCluster && *srcCluster && !streq(sourceGroup, srcCluster)) + continue; + Owned groupInfo = createPTree("cloneFromGroup"); + groupInfo->setProp("@groupName", sourceGroup); + ClusterPartDiskMapSpec &spec = srcfdesc->queryPartDiskMapping(clusterNum); + spec.toProp(groupInfo); + attrs.addPropTree("cloneFromGroup", groupInfo.getClear()); + } + } attrs.setProp("@cloneFromDir", srcfdesc->queryDefaultDir()); if (srcCluster && *srcCluster) //where to copy from has been explicity set to a remote location, don't copy from local sources attrs.setProp("@cloneFromPeerCluster", "-"); if (prefix.length()) attrs.setProp("@cloneFromPrefix", prefix.get()); - - while(attrs.removeProp("cloneFromGroup")); - - unsigned numClusters = srcfdesc->numClusters(); - for (unsigned clusterNum = 0; clusterNum < numClusters; clusterNum++) - { - StringBuffer sourceGroup; - srcfdesc->getClusterGroupName(clusterNum, sourceGroup, NULL); - if (srcCluster && *srcCluster && !streq(sourceGroup, srcCluster)) - continue; - Owned groupInfo = createPTree("cloneFromGroup"); - groupInfo->setProp("@groupName", sourceGroup); - ClusterPartDiskMapSpec &spec = srcfdesc->queryPartDiskMapping(clusterNum); - spec.toProp(groupInfo); - attrs.addPropTree("cloneFromGroup", groupInfo.getClear()); - } } } void updateCloneFrom(IDistributedFile *dfile, IFileDescriptor *srcfdesc, INode *srcdali, const char *srcCluster) @@ -570,6 +581,7 @@ class CFileCloner const char *_cluster2, IUserDescriptor *_userdesc, const char *_foreigndali, + const char *_remoteStorage, IUserDescriptor *_foreignuserdesc, const char *_nameprefix, bool _overwrite, @@ -590,6 +602,8 @@ class CFileCloner level = 0; if (_foreigndali&&*_foreigndali) foreigndalinode.setown(createINode(_foreigndali,DALI_SERVER_PORT)); + if (_remoteStorage && *_remoteStorage) + remoteStorage.set(_remoteStorage); fdir = 
&queryDistributedFileDirectory(); switch(_clustmap) { case DFUcpdm_c_replicated_by_d: @@ -833,6 +847,8 @@ class CFileCloner else { StringBuffer s; + if (checkValueChanged(dfile->queryAttributes().queryProp("@cloneRemote"), remoteStorage.str())) + return true; if (checkValueChanged(dfile->queryAttributes().queryProp("@cloneFrom"), srcdali->endpoint().getEndpointHostText(s).str())) return true; if (checkValueChanged(dfile->queryAttributes().queryProp("@cloneFromDir"), srcfdesc->queryDefaultDir())) @@ -876,20 +892,44 @@ class CFileCloner srcLFN.clearForeign(); srcdali.setown(createINode(ep)); } + StringBuffer s; - Owned ftree = fdir->getFileTree(srcLFN.get(), foreignuserdesc, srcdali, FOREIGN_DALI_TIMEOUT, GetFileTreeOpts::appendForeign); - if (!ftree.get()) - throw MakeStringException(-1,"Source file %s could not be found in Dali %s",srcLFN.get(), getDaliEndPointStr(srcdali, s)); - IPropertyTree *attsrc = ftree->queryPropTree("Attr"); - if (!attsrc) - throw MakeStringException(-1,"Attributes for source file %s could not be found in Dali %s",srcLFN.get(), getDaliEndPointStr(srcdali, s)); + Owned ftree; + Owned filePlane; + IPropertyTree *attsrc = nullptr; + if (remoteStorage || srcLFN.isRemote()) + { + StringBuffer remoteLFN; + if (!srcLFN.isRemote()) + remoteLFN.append("remote::").append(remoteStorage).append("::"); + srcLFN.get(remoteLFN); + + Owned dfsFile = wsdfs::lookupDFSFile(remoteLFN.str(), AccessMode::readSequential, INFINITE, wsdfs::keepAliveExpiryFrequency, foreignuserdesc); + IPropertyTree *tree = (dfsFile) ? 
dfsFile->queryFileMeta() : nullptr; + if (tree) + ftree.setown(tree->getPropTree("File")); + if (!ftree.get()) + throw MakeStringException(-1,"Source file %s could not be found in Remote Storage", remoteLFN.str()); //remote scope already included in remoteLFN + const char *remotePlaneName = ftree->queryProp("@group"); + VStringBuffer planeXPath("planes[@name=\"%s\"]", remotePlaneName); + filePlane.set(dfsFile->queryCommonMeta()->queryPropTree(planeXPath)); + } + else + { + ftree.setown(fdir->getFileTree(srcLFN.get(), foreignuserdesc, srcdali, FOREIGN_DALI_TIMEOUT, GetFileTreeOpts::appendForeign)); + if (!ftree.get()) + throw MakeStringException(-1,"Source file %s could not be found in Dali %s",srcLFN.get(), getDaliEndPointStr(srcdali, s)); + attsrc = ftree->queryPropTree("Attr"); + if (!attsrc) + throw MakeStringException(-1,"Attributes for source file %s could not be found in Dali %s",srcLFN.get(), getDaliEndPointStr(srcdali, s)); + } CDfsLogicalFileName dlfn; dlfn.set(destfilename); if (!streq(ftree->queryName(),queryDfsXmlBranchName(DXB_File))) throw MakeStringException(-1,"Source file %s in Dali %s is not a simple file",filename, getDaliEndPointStr(srcdali, s)); - if (!srcdali.get()||queryCoven().inCoven(srcdali)) + if (!remoteStorage.length() && (!srcdali.get() || queryCoven().inCoven(srcdali))) { // if dali is local and filenames same if (streq(srcLFN.get(), dlfn.get())) @@ -1241,7 +1281,7 @@ class CDFUhelper: implements IDFUhelper, public CInterface ) { CFileCloner cloner; - cloner.init(cluster1,clustmap,repeattlk,cluster2,userdesc,foreigndali,foreignuserdesc,nameprefix,overwrite,dophysicalcopy); + cloner.init(cluster1,clustmap,repeattlk,cluster2,userdesc,foreigndali,nullptr,foreignuserdesc,nameprefix,overwrite,dophysicalcopy); CDfsLogicalFileName dlfn; cloner.cloneSuperFile(srcname,dlfn); } @@ -1263,7 +1303,7 @@ class CDFUhelper: implements IDFUhelper, public CInterface { DBGLOG("createSingleFileClone src=%s@%s, dst=%s@%s, prefix=%s, ow=%d, docopy=%d", 
srcname, srcCluster, dstname, cluster1, prefix, overwrite, dophysicalcopy); CFileCloner cloner; - cloner.init(cluster1,clustmap,repeattlk,cluster2,userdesc,foreigndali,foreignuserdesc,NULL,overwrite,dophysicalcopy); + cloner.init(cluster1,clustmap,repeattlk,cluster2,userdesc,foreigndali,nullptr,foreignuserdesc,NULL,overwrite,dophysicalcopy); cloner.srcCluster.set(srcCluster); cloner.prefix.set(prefix); cloner.cloneFile(srcname,dstname); @@ -1280,6 +1320,7 @@ class CDFUhelper: implements IDFUhelper, public CInterface const char *defReplicateFolder, IUserDescriptor *userdesc, // user desc for local dali const char *foreigndali, // can be omitted if srcname foreign or local + const char *remoteStorage, // can be omitted if srcname remote or local unsigned overwriteFlags, // overwrite destination if exists bool dophysicalcopy ) @@ -1288,7 +1329,7 @@ class CDFUhelper: implements IDFUhelper, public CInterface CFileCloner cloner; // MORE: Would the following be better to ensure files are copied when queries are deployed? 
// bool copyPhysical = isContainerized() && (foreigndali != nullptr); - cloner.init(dstCluster, DFUcpdm_c_replicated_by_d, true, NULL, userdesc, foreigndali, NULL, NULL, false, dophysicalcopy); + cloner.init(dstCluster, DFUcpdm_c_replicated_by_d, true, NULL, userdesc, foreigndali, remoteStorage, NULL, NULL, false, dophysicalcopy); cloner.overwriteFlags = overwriteFlags; #ifndef _CONTAINERIZED //In containerized mode there is no need to replicate files to the local disks of the roxie cluster - so don't set the special flag diff --git a/dali/dfu/dfuutil.hpp b/dali/dfu/dfuutil.hpp index 87321e40bca..b2de7f75d53 100644 --- a/dali/dfu/dfuutil.hpp +++ b/dali/dfu/dfuutil.hpp @@ -36,6 +36,7 @@ interface IDfuFileCopier: extends IInterface #define DALI_UPDATEF_PACKAGEMAP 0x0100 #define DFU_UPDATEF_COPY 0x1000 #define DFU_UPDATEF_OVERWRITE 0x2000 +#define DFU_UPDATEF_REMOTESTORAGE 0x4000 #define DALI_UPDATEF_SUBFILE_MASK (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM | DALI_UPDATEF_APPEND_CLUSTER) @@ -93,6 +94,7 @@ interface IDFUhelper: extends IInterface const char *defReplicateFolder, IUserDescriptor *userdesc, // user desc for local dali const char *foreigndali, // can be omitted if srcname foreign or local + const char *remoteStorage, // can be omitted if srcname remote or local unsigned overwriteFlags, // overwrite destination options bool dophysicalcopy ) = 0; diff --git a/dali/dfu/dfuwu.cmake b/dali/dfu/dfuwu.cmake index 6f021cf42b0..8f9179de40d 100644 --- a/dali/dfu/dfuwu.cmake +++ b/dali/dfu/dfuwu.cmake @@ -53,5 +53,6 @@ target_link_libraries ( dfuwu hrpc remote dalibase + ws_dfsclient ) diff --git a/dali/dfu/dfuwu.cpp b/dali/dfu/dfuwu.cpp index 205361fe4d8..3e9d0396e12 100644 --- a/dali/dfu/dfuwu.cpp +++ b/dali/dfu/dfuwu.cpp @@ -33,6 +33,8 @@ #include "wujobq.hpp" #include "dfuutil.hpp" +#include "ws_dfsclient.hpp" + #include "dfuwu.hpp" #define COPY_WAIT_SECONDS 30 @@ -981,7 +983,7 @@ class CDFUfileSpec: public CLinkedDFUWUchild, implements IDFUfileSpec 
parent->getPassword(password); } userdesc->set(username.str(),password.str()); - Owned file = queryDistributedFileDirectory().lookup(lfn,userdesc,AccessMode::tbdRead,false,false,nullptr,defaultPrivilegedUser); + Owned file = wsdfs::lookup(lfn,userdesc,AccessMode::tbdRead,false,false,nullptr,defaultPrivilegedUser,INFINITE,false); if (file) return file->getFileDescriptor(); } diff --git a/dali/ft/filecopy.cpp b/dali/ft/filecopy.cpp index ed0f7d2cfc4..7af8139deda 100644 --- a/dali/ft/filecopy.cpp +++ b/dali/ft/filecopy.cpp @@ -1488,6 +1488,9 @@ void FileSprayer::calibrateProgress() void FileSprayer::checkForOverlap() { + if (distributedSource && distributedSource->isExternal()) + return; + unsigned num = std::min(sources.ordinality(), targets.ordinality()); for (unsigned idx = 0; idx < num; idx++) diff --git a/ecl/eclagent/eclagent.cpp b/ecl/eclagent/eclagent.cpp index 1ca18ca42c4..8ebad9bf6ea 100644 --- a/ecl/eclagent/eclagent.cpp +++ b/ecl/eclagent/eclagent.cpp @@ -1441,7 +1441,7 @@ bool EclAgent::fileExists(const char *name) StringBuffer lfn; expandLogicalName(lfn, name); - Owned f = wsdfs::lookup(lfn.str(), queryUserDescriptor(), AccessMode::tbdRead, false, false, nullptr, defaultPrivilegedUser, INFINITE); + Owned f = wsdfs::lookup(lfn.str(), queryUserDescriptor(), AccessMode::tbdRead, false, false, nullptr, defaultPrivilegedUser, INFINITE, false); if (f) return true; return false; @@ -2774,7 +2774,7 @@ unsigned __int64 EclAgent::getDatasetHash(const char * logicalName, unsigned __i return crc; } - Owned file = wsdfs::lookup(fullname.str(),queryUserDescriptor(), AccessMode::tbdRead, false, false, nullptr, defaultPrivilegedUser, INFINITE); + Owned file = wsdfs::lookup(fullname.str(),queryUserDescriptor(), AccessMode::tbdRead, false, false, nullptr, defaultPrivilegedUser, INFINITE, false); if (file) { WorkunitUpdate wu = updateWorkUnit(); @@ -3084,7 +3084,7 @@ void EclAgent::deleteLRUPersists(const char * logicalName, unsigned keep) MilliSleep(PERSIST_LOCK_SLEEP + 
(getRandom()%PERSIST_LOCK_SLEEP)); persistLock.setown(getPersistReadLock(goer)); } - Owned f = wsdfs::lookup(goer, queryUserDescriptor(), AccessMode::tbdWrite, false, false, nullptr, defaultPrivilegedUser, INFINITE); + Owned f = wsdfs::lookup(goer, queryUserDescriptor(), AccessMode::tbdWrite, false, false, nullptr, defaultPrivilegedUser, INFINITE, false); if (!f) goto restart; // Persist has been deleted since last checked - repeat the whole process const char *newAccessTime = f->queryAttributes().queryProp("@accessed"); diff --git a/ecl/eclcc/eclcc.cpp b/ecl/eclcc/eclcc.cpp index 878b574c332..04bb4042e7e 100644 --- a/ecl/eclcc/eclcc.cpp +++ b/ecl/eclcc/eclcc.cpp @@ -2553,7 +2553,7 @@ IHqlExpression *EclCC::lookupDFSlayout(const char *filename, IErrorReceiver &err // Look up the file in Dali try { - Owned dfsFile = wsdfs::lookup(filename, udesc, AccessMode::tbdRead, false, false, nullptr, defaultPrivilegedUser, INFINITE); + Owned dfsFile = wsdfs::lookup(filename, udesc, AccessMode::tbdRead, false, false, nullptr, defaultPrivilegedUser, INFINITE,false); if (dfsFile) { const char *recordECL = dfsFile->queryAttributes().queryProp("ECL"); diff --git a/ecl/eclcmd/eclcmd_common.hpp b/ecl/eclcmd/eclcmd_common.hpp index 9be689fa98f..efb6c7cdceb 100644 --- a/ecl/eclcmd/eclcmd_common.hpp +++ b/ecl/eclcmd/eclcmd_common.hpp @@ -106,6 +106,7 @@ typedef IEclCommand *(*EclCommandFactory)(const char *cmdname); #define ECLOPT_OVERWRITE_ENV NULL #define ECLOPT_DONT_COPY_FILES "--no-files" +#define ECLOPT_REMOTE_STORAGE "--remote-storage" #define ECLOPT_DFU_COPY_FILES "--dfu-copy" #define ECLOPT_ONLY_COPY_FILES "--only-copy-files" #define ECLOPT_ALLOW_FOREIGN "--allow-foreign" @@ -457,6 +458,7 @@ class EclCmdOptionsDFU template void updateRequest(TRequest *req) { + req->setRemoteStorage(optRemoteStorage); req->setDfuCopyFiles(optDfuCopyFiles); req->setDfuQueue(optDfuQueue); req->setDfuWait(optDfuWaitSec); @@ -486,6 +488,8 @@ class EclCmdOptionsDFU bool match(ArgvIterator &iter) { + 
if (iter.matchOption(optRemoteStorage, ECLOPT_REMOTE_STORAGE)) + return true; if (iter.matchFlag(optDfuCopyFiles, ECLOPT_DFU_COPY_FILES)) return true; if (iter.matchOption(optDfuQueue, ECLOPT_DFU_QUEUE)) @@ -505,6 +509,8 @@ class EclCmdOptionsDFU void usage() { fputs( + " DFS Options:\n" + " --remote-storage Use the given remote storage configuration to locate remote files\n" " DFU Options:\n" " --dfu-copy Use DFU to copy files during deployment, not on roxie in the background\n" " --dfu-queue DFU Queue to use when doing a DFU copy\n" @@ -520,6 +526,7 @@ class EclCmdOptionsDFU } public: StringAttr optDfuPublisherWuid; + StringAttr optRemoteStorage; StringAttr optDfuQueue; unsigned optDfuWaitSec = 1800; //30 minutes bool optDfuCopyFiles = false; diff --git a/ecl/hthor/hthor.cpp b/ecl/hthor/hthor.cpp index bcf85f94b94..49013d51712 100644 --- a/ecl/hthor/hthor.cpp +++ b/ecl/hthor/hthor.cpp @@ -1105,7 +1105,7 @@ CHThorIndexWriteActivity::CHThorIndexWriteActivity(IAgentContext &_agent, unsign expandLogicalFilename(lfn, fname, agent.queryWorkUnit(), agent.queryResolveFilesLocally(), false); if (!agent.queryResolveFilesLocally()) { - Owned f = wsdfs::lookup(lfn, agent.queryCodeContext()->queryUserDescriptor(), AccessMode::tbdWrite, false, false, nullptr, defaultNonPrivilegedUser, INFINITE); + Owned f = wsdfs::lookup(lfn, agent.queryCodeContext()->queryUserDescriptor(), AccessMode::tbdWrite, false, false, nullptr, defaultNonPrivilegedUser, INFINITE, false); if (f) { diff --git a/esp/clients/ws_dfsclient/ws_dfsclient.cpp b/esp/clients/ws_dfsclient/ws_dfsclient.cpp index d622a790210..fd452416ea9 100644 --- a/esp/clients/ws_dfsclient/ws_dfsclient.cpp +++ b/esp/clients/ws_dfsclient/ws_dfsclient.cpp @@ -782,14 +782,14 @@ IDistributedFile *lookupLegacyDFSFile(const char *logicalName, AccessMode access return createLegacyDFSFile(dfsFile); } -IDistributedFile *lookup(CDfsLogicalFileName &lfn, IUserDescriptor *user, AccessMode accessMode, bool hold, bool lockSuperOwner, 
IDistributedFileTransaction *transaction, bool priviledged, unsigned timeout) +IDistributedFile *lookup(CDfsLogicalFileName &lfn, IUserDescriptor *user, AccessMode accessMode, bool hold, bool lockSuperOwner, IDistributedFileTransaction *transaction, bool priviledged, unsigned timeout, bool forceAllowForeign) { bool viaDali = false; bool isForeign = false; try { isForeign = lfn.isForeign(); } catch(IException *e) { e->Release(); } // catch and ignore multi lfn case, will be checked later - if (isForeign) + if (isForeign && !forceAllowForeign) { // default denied in cloud, allowed in bare-metal bool allow = isContainerized() ? false : true; @@ -825,11 +825,11 @@ IDistributedFile *lookup(CDfsLogicalFileName &lfn, IUserDescriptor *user, Access return wsdfs::lookupLegacyDFSFile(lfn.get(), accessMode, timeout, wsdfs::keepAliveExpiryFrequency, user); } -IDistributedFile *lookup(const char *logicalFilename, IUserDescriptor *user, AccessMode accessMode, bool hold, bool lockSuperOwner, IDistributedFileTransaction *transaction, bool priviledged, unsigned timeout) +IDistributedFile *lookup(const char *logicalFilename, IUserDescriptor *user, AccessMode accessMode, bool hold, bool lockSuperOwner, IDistributedFileTransaction *transaction, bool priviledged, unsigned timeout, bool forceAllowForeign) { CDfsLogicalFileName lfn; lfn.set(logicalFilename); - return lookup(lfn, user, accessMode, hold, lockSuperOwner, transaction, priviledged, timeout); + return lookup(lfn, user, accessMode, hold, lockSuperOwner, transaction, priviledged, timeout, forceAllowForeign); } @@ -884,7 +884,7 @@ class CLocalOrDistributedFile: implements ILocalOrDistributedFile, public CInter if (gotlocal) { if (!write && !onlylocal) // MORE - this means the dali access checks not happening... maybe that's ok? 
- dfile.setown(wsdfs::lookup(lfn, user, accessMode, false, false, nullptr, isPrivilegedUser, INFINITE)); + dfile.setown(wsdfs::lookup(lfn, user, accessMode, false, false, nullptr, isPrivilegedUser, INFINITE, false)); Owned file = getPartFile(0,0); if (file.get()) { @@ -908,7 +908,7 @@ class CLocalOrDistributedFile: implements ILocalOrDistributedFile, public CInter } else { - dfile.setown(wsdfs::lookup(lfn, user, accessMode, false, false, nullptr, isPrivilegedUser, INFINITE)); + dfile.setown(wsdfs::lookup(lfn, user, accessMode, false, false, nullptr, isPrivilegedUser, INFINITE, false)); if (dfile.get()) return true; } diff --git a/esp/clients/ws_dfsclient/ws_dfsclient.hpp b/esp/clients/ws_dfsclient/ws_dfsclient.hpp index 0769d0e0634..502d26131d6 100644 --- a/esp/clients/ws_dfsclient/ws_dfsclient.hpp +++ b/esp/clients/ws_dfsclient/ws_dfsclient.hpp @@ -51,8 +51,8 @@ WS_DFSCLIENT_API IDFSFile *lookupDFSFile(const char *logicalName, AccessMode acc WS_DFSCLIENT_API IDistributedFile *createLegacyDFSFile(IDFSFile *dfsFile); WS_DFSCLIENT_API IDistributedFile *lookupLegacyDFSFile(const char *logicalName, AccessMode accessMode, unsigned timeoutSecs, unsigned keepAliveExpiryFrequency, IUserDescriptor *userDesc); -WS_DFSCLIENT_API IDistributedFile *lookup(CDfsLogicalFileName &lfn, IUserDescriptor *user, AccessMode accessMode, bool hold, bool lockSuperOwner, IDistributedFileTransaction *transaction, bool priviledged, unsigned timeout); -WS_DFSCLIENT_API IDistributedFile *lookup(const char *logicalFilename, IUserDescriptor *user, AccessMode accessMode, bool hold, bool lockSuperOwner, IDistributedFileTransaction *transaction, bool priviledged, unsigned timeout); +WS_DFSCLIENT_API IDistributedFile *lookup(CDfsLogicalFileName &lfn, IUserDescriptor *user, AccessMode accessMode, bool hold, bool lockSuperOwner, IDistributedFileTransaction *transaction, bool priviledged, unsigned timeout, bool forceAllowForeign); +WS_DFSCLIENT_API IDistributedFile *lookup(const char *logicalFilename, 
IUserDescriptor *user, AccessMode accessMode, bool hold, bool lockSuperOwner, IDistributedFileTransaction *transaction, bool priviledged, unsigned timeout, bool forceAllowForeign); } // end of namespace wsdfs diff --git a/esp/scm/ws_packageprocess.ecm b/esp/scm/ws_packageprocess.ecm index 91b5f651e84..5e83b6b5178 100644 --- a/esp/scm/ws_packageprocess.ecm +++ b/esp/scm/ws_packageprocess.ecm @@ -49,6 +49,7 @@ ESPrequest AddPackageRequest [min_ver("1.05")] bool OnlyCopyFiles(false); //Copies the files needed for the command but doesn't actually complete the command [min_ver("1.05")] bool StopIfFilesCopied(false); //Command only completes if no files need copying. User can run again after DFU Publisher Workunit completes. [min_ver("1.06")] string DfuPublisherWuid; //Wuid can be preallocated and then passed in here to use. Will be created if empty + [min_ver("1.06")] string RemoteStorage; }; @@ -84,6 +85,7 @@ ESPrequest CopyPackageMapRequest [min_ver("1.05")] bool OnlyCopyFiles(false); //Copies the files needed for the command but doesn't actually complete the command [min_ver("1.05")] bool StopIfFilesCopied(false); //Command only completes if no files need copying. User can run again after DFU Publisher Workunit completes. [min_ver("1.06")] string DfuPublisherWuid; //Wuid can be preallocated and then passed in here to use. Will be created if empty + [min_ver("1.06")] string RemoteStorage; }; ESPresponse [exceptions_inline] CopyPackageMapResponse @@ -318,6 +320,7 @@ ESPrequest AddPartToPackageMapRequest [min_ver("1.05")] bool OnlyCopyFiles(false); //Copies the files needed for the command but doesn't actually complete the command [min_ver("1.05")] bool StopIfFilesCopied(false); //Command only completes if no files need copying. User can run again after DFU Publisher Workunit completes. [min_ver("1.06")] string DfuPublisherWuid; //Wuid can be preallocated and then passed in here to use. 
Will be created if empty + [min_ver("1.06")] string RemoteStorage; }; ESPresponse [exceptions_inline] AddPartToPackageMapResponse diff --git a/esp/scm/ws_workunits_queryset_req_resp.ecm b/esp/scm/ws_workunits_queryset_req_resp.ecm index 55d190ac2bf..4e2aa6c5b86 100644 --- a/esp/scm/ws_workunits_queryset_req_resp.ecm +++ b/esp/scm/ws_workunits_queryset_req_resp.ecm @@ -55,6 +55,7 @@ ESPrequest [nil_remove] WURecreateQueryRequest [min_ver("1.89")] bool OnlyCopyFiles(false); //Copies the files needed for the command but doesn't actually complete the command [min_ver("1.89")] bool StopIfFilesCopied(false); //Command only completes if no files need copying. User can run again after DFU Publisher Workunit completes. [min_ver("1.95")] string DfuPublisherWuid; //Wuid can be preallocated and then passed in here to use. Will be created if empty + [min_ver("1.97")] string RemoteStorage; }; ESPresponse [exceptions_inline, nil_remove] WURecreateQueryResponse @@ -141,6 +142,7 @@ ESPrequest [nil_remove] WUPublishWorkunitRequest [min_ver("1.89")] bool OnlyCopyFiles(false); //Copies the files needed for the command but doesn't actually complete the command [min_ver("1.89")] bool StopIfFilesCopied(false); //Command only completes if no files need copying. User can run again after DFU Publisher Workunit completes. [min_ver("1.95")] string DfuPublisherWuid; //Wuid can be preallocated and then passed in here to use. Will be created if empty + [min_ver("1.97")] string RemoteStorage; }; ESPresponse [exceptions_inline] WUPublishWorkunitResponse @@ -391,6 +393,7 @@ ESPrequest WUQuerysetImportRequest [min_ver("1.89")] bool OnlyCopyFiles(false); //Copies the files needed for the command but doesn't actually complete the command [min_ver("1.89")] bool StopIfFilesCopied(false); //Command only completes if no files need copying. User can run again after DFU Publisher Workunit completes. [min_ver("1.95")] string DfuPublisherWuid; //Wuid can be preallocated and then passed in here to use. 
Will be created if empty + [min_ver("1.97")] string RemoteStorage; }; ESPresponse [exceptions_inline] WUQuerysetImportResponse @@ -478,6 +481,7 @@ ESPrequest [nil_remove] WUQuerySetCopyQueryRequest [min_ver("1.89")] bool OnlyCopyFiles(false); //Copies the files needed for the command but doesn't actually complete the command [min_ver("1.89")] bool StopIfFilesCopied(false); //Command only completes if no files need copying. User can run again after DFU Publisher Workunit completes. [min_ver("1.95")] string DfuPublisherWuid; //Wuid can be preallocated and then passed in here to use. Will be created if empty + [min_ver("1.97")] string RemoteStorage; }; ESPresponse [exceptions_inline] WUQuerySetCopyQueryResponse @@ -513,6 +517,7 @@ ESPrequest [nil_remove] WUCopyQuerySetRequest [min_ver("1.89")] bool OnlyCopyFiles(false); //Copies the files needed for the command but doesn't actually complete the command [min_ver("1.89")] bool StopIfFilesCopied(false); //Command only completes if no files need copying. User can run again after DFU Publisher Workunit completes. [min_ver("1.95")] string DfuPublisherWuid; //Wuid can be preallocated and then passed in here to use. Will be created if empty + [min_ver("1.97")] string RemoteStorage; }; ESPresponse [exceptions_inline] WUCopyQuerySetResponse diff --git a/esp/scm/ws_workunits_req_resp.ecm b/esp/scm/ws_workunits_req_resp.ecm index 145c49376a5..22673090203 100644 --- a/esp/scm/ws_workunits_req_resp.ecm +++ b/esp/scm/ws_workunits_req_resp.ecm @@ -1085,6 +1085,7 @@ ESPrequest [nil_remove] WUEclDefinitionActionRequest bool OnlyCopyFiles(false); //Copies the files needed for the command but doesn't actually complete the command bool StopIfFilesCopied(false); //Command only completes if no files need copying. User can run again after DFU Publisher Workunit completes. [min_ver("1.95")] string DfuPublisherWuid; //Wuid can be preallocated and then passed in here to use. 
Will be created if empty + [min_ver("1.97")] string RemoteStorage; }; ESPresponse [exceptions_inline] WUEclDefinitionActionResponse diff --git a/esp/services/ws_dfu/CMakeLists.txt b/esp/services/ws_dfu/CMakeLists.txt index 28bc5ca6dac..ddcfbdd2241 100644 --- a/esp/services/ws_dfu/CMakeLists.txt +++ b/esp/services/ws_dfu/CMakeLists.txt @@ -79,6 +79,7 @@ include_directories ( ./../../../common/fileview2 ./../../bindings/SOAP/xpp ${HPCC_SOURCE_DIR}/esp/clients + ${HPCC_SOURCE_DIR}/esp/clients/ws_dfsclient ) ADD_DEFINITIONS( -D_USRDLL -DWS_DFU_EXPORTS -DWSDFU_API_LOCAL -DWSDFUXREF_API_LOCAL -DESP_SERVICE_WsDfu) diff --git a/esp/services/ws_fs/ws_fsService.cpp b/esp/services/ws_fs/ws_fsService.cpp index c46857a3f77..042a955c903 100644 --- a/esp/services/ws_fs/ws_fsService.cpp +++ b/esp/services/ws_fs/ws_fsService.cpp @@ -2752,7 +2752,7 @@ bool CFileSprayEx::onCopy(IEspContext &context, IEspCopy &req, IEspCopyResponse logicalName.setForeign(ep,false); } - Owned file = wsdfs::lookup(logicalName, udesc, AccessMode::tbdRead, false, false, nullptr, defaultPrivilegedUser, INFINITE); + Owned file = wsdfs::lookup(logicalName, udesc, AccessMode::tbdRead, false, false, nullptr, defaultPrivilegedUser, INFINITE, false); if (!file) throw MakeStringException(ECLWATCH_FILE_NOT_EXIST, "Failed to find file: %s", logicalName.get()); diff --git a/esp/services/ws_packageprocess/CMakeLists.txt b/esp/services/ws_packageprocess/CMakeLists.txt index 059d2d0fa0d..2ae4916d1c8 100644 --- a/esp/services/ws_packageprocess/CMakeLists.txt +++ b/esp/services/ws_packageprocess/CMakeLists.txt @@ -48,6 +48,7 @@ include_directories ( ${HPCC_SOURCE_DIR}/common/workunit ${HPCC_SOURCE_DIR}/rtl/include ${HPCC_SOURCE_DIR}/esp/smc/SMCLib + ${HPCC_SOURCE_DIR}/esp/clients/ws_dfsclient ) ADD_DEFINITIONS( -D_USRDLL -DWsPackageProcess_API_LOCAL -DFileSpray_API_LOCAL -DWSDFU_API_LOCAL -DESP_SERVICE_WsPackageProcess) diff --git a/esp/services/ws_packageprocess/ws_packageprocessService.cpp 
b/esp/services/ws_packageprocess/ws_packageprocessService.cpp index d8bd6902938..442896cd7a0 100644 --- a/esp/services/ws_packageprocess/ws_packageprocessService.cpp +++ b/esp/services/ws_packageprocess/ws_packageprocessService.cpp @@ -124,7 +124,7 @@ bool isFileKnownOnCluster(const char *logicalname, const char *target, IUserDesc return isFileKnownOnCluster(logicalname, clusterInfo, userdesc); } -void cloneFileInfoToDali(StringBuffer &publisherWuid, unsigned updateFlags, StringArray ¬Found, IPropertyTree *packageMap, const char *lookupDaliIp, IConstWUClusterInfo *dstInfo, const char *srcCluster, const char *remotePrefix, IUserDescriptor* userdesc, bool allowForeignFiles, const char *jobname=nullptr) +void cloneFileInfoToDali(StringBuffer &publisherWuid, unsigned updateFlags, StringArray ¬Found, IPropertyTree *packageMap, const char *remoteLocation, IConstWUClusterInfo *dstInfo, const char *srcCluster, const char *remotePrefix, IUserDescriptor* userdesc, bool allowForeignFiles, const char *jobname=nullptr) { StringBuffer user; StringBuffer password; @@ -146,14 +146,14 @@ void cloneFileInfoToDali(StringBuffer &publisherWuid, unsigned updateFlags, Stri StringBuffer targetPlane; //roxies default plane, where files will be copied if not found in locations getRoxieDirectAccessPlanes(locations, targetPlane, clusterName.str(), true); - wufiles->resolveFiles(locations, lookupDaliIp, remotePrefix, srcCluster, !(updateFlags & (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM)), false, false); + wufiles->resolveFiles(locations, remoteLocation, remotePrefix, srcCluster, !(updateFlags & (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM)), false, false, false, (updateFlags & DFU_UPDATEF_REMOTESTORAGE)!=0); wufiles->cloneAllInfo(publisherWuid, targetPlane, updateFlags, helper, true, false, 0, 1, 0, nullptr); #else StringArray locations; SCMStringBuffer processName; dstInfo->getRoxieProcess(processName); locations.append(processName.str()); - wufiles->resolveFiles(locations, 
lookupDaliIp, remotePrefix, srcCluster, !(updateFlags & (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM)), false, false); + wufiles->resolveFiles(locations, remoteLocation, remotePrefix, srcCluster, !(updateFlags & (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM)), false, false, false, (updateFlags & DFU_UPDATEF_REMOTESTORAGE)!=0); StringBuffer defReplicateFolder; getConfigurationDirectory(NULL, "data2", "roxie", processName.str(), defReplicateFolder); @@ -262,7 +262,7 @@ class PackageMapUpdater IPropertyTree *packageMaps; IPropertyTree *pmExisting; - StringBuffer daliIP; + StringBuffer remoteLocation; StringBuffer srcCluster; StringBuffer prefix; StringBuffer pmid; @@ -331,11 +331,11 @@ class PackageMapUpdater } void setDerivedDfsLocation(const char *dfsLocation, const char *srcProcess) { - splitDerivedDfsLocation(dfsLocation, srcCluster, daliIP, prefix, srcProcess, srcProcess, NULL, NULL); + splitDerivedDfsLocation(dfsLocation, srcCluster, remoteLocation, prefix, srcProcess, srcProcess, NULL, NULL); if (srcCluster.length()) { - if (!validateDataPlaneName(daliIP, srcCluster)) - throw MakeStringException(PKG_INVALID_CLUSTER_TYPE, "Process cluster %s not found on %s DALI", srcCluster.str(), daliIP.length() ? daliIP.str() : "local"); + if (!validateDataPlaneName(remoteLocation, srcCluster)) + throw MakeStringException(PKG_INVALID_CLUSTER_TYPE, "Process cluster %s not found on %s remote", srcCluster.str(), remoteLocation.length() ? 
remoteLocation.str() : "local"); } } void convertExisting() @@ -384,7 +384,7 @@ class PackageMapUpdater if (isEmptyString(jobname)) jobname = pmid.str(); if (!streq(target.get(), "*")) - cloneFileInfoToDali(publisherWuid, updateFlags, filesNotFound, pt, daliIP, ensureClusterInfo(), srcCluster, prefix, userdesc, checkFlag(PKGADD_ALLOW_FOREIGN), jobname); + cloneFileInfoToDali(publisherWuid, updateFlags, filesNotFound, pt, remoteLocation, ensureClusterInfo(), srcCluster, prefix, userdesc, checkFlag(PKGADD_ALLOW_FOREIGN), jobname); else { CConstWUClusterInfoArray clusters; @@ -393,7 +393,7 @@ class PackageMapUpdater { IConstWUClusterInfo &cluster = clusters.item(i); if (cluster.getPlatform() == RoxieCluster) - cloneFileInfoToDali(publisherWuid, updateFlags, filesNotFound, pt, daliIP, &cluster, srcCluster, prefix, userdesc, checkFlag(PKGADD_ALLOW_FOREIGN), jobname); + cloneFileInfoToDali(publisherWuid, updateFlags, filesNotFound, pt, remoteLocation, &cluster, srcCluster, prefix, userdesc, checkFlag(PKGADD_ALLOW_FOREIGN), jobname); } } } @@ -876,9 +876,15 @@ bool CWsPackageProcessEx::onAddPackage(IEspContext &context, IEspAddPackageReque updater.setPMID(req.getTarget(), req.getPackageMap(), req.getGlobalScope()); updater.setProcess(req.getProcess()); updater.setUser(context.queryUserId(), context.queryPassword(), nullptr); - updater.setDerivedDfsLocation(req.getDaliIp(), req.getSourceProcess()); unsigned updateFlags = 0; + const char *remoteStr = req.getRemoteStorage(); + if (!isEmptyString(remoteStr)) + updateFlags = DFU_UPDATEF_REMOTESTORAGE; + else + remoteStr = req.getDaliIp(); + updater.setDerivedDfsLocation(remoteStr, req.getSourceProcess()); + if (req.getOverWrite()) updateFlags |= (DALI_UPDATEF_PACKAGEMAP | DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM | DALI_UPDATEF_SUPERFILES); if (req.getReplacePackageMap()) @@ -947,9 +953,15 @@ bool CWsPackageProcessEx::onCopyPackageMap(IEspContext &context, IEspCopyPackage updater.setProcess(req.getProcess()); 
updater.setUser(context.queryUserId(), context.queryPassword(), &context); - updater.setDerivedDfsLocation(req.getDaliIp(), req.getSourceProcess()); unsigned updateFlags = 0; + const char *remoteStr = req.getRemoteStorage(); + if (!isEmptyString(remoteStr)) + updateFlags = DFU_UPDATEF_REMOTESTORAGE; + else + remoteStr = req.getDaliIp(); + updater.setDerivedDfsLocation(remoteStr, req.getSourceProcess()); + if (req.getReplacePackageMap()) updateFlags |= DALI_UPDATEF_PACKAGEMAP; if (req.getUpdateCloneFrom()) @@ -1284,7 +1296,7 @@ void CWsPackageProcessEx::validatePackage(IEspContext &context, IEspValidatePack pmfiles->addFilesFromPackageMap(mapTree); StringArray locations; locations.append(process.str()); - pmfiles->resolveFiles(locations, nullptr, nullptr, nullptr, true, false, false); + pmfiles->resolveFiles(locations, nullptr, nullptr, nullptr, true, false, false, false, false); Owned files = pmfiles->getFiles(); ForEach(*files) { @@ -1475,9 +1487,16 @@ bool CWsPackageProcessEx::onAddPartToPackageMap(IEspContext &context, IEspAddPar updater.setPMID(req.getTarget(), req.getPackageMap(), req.getGlobalScope()); updater.setProcess(req.getProcess()); updater.setUser(context.queryUserId(), context.queryPassword(), nullptr); - updater.setDerivedDfsLocation(req.getDaliIp(), req.getSourceProcess()); unsigned updateFlags = 0; + const char *remoteStr = req.getRemoteStorage(); + if (!isEmptyString(remoteStr)) + updateFlags = DFU_UPDATEF_REMOTESTORAGE; + else + remoteStr = req.getDaliIp(); + updater.setDerivedDfsLocation(remoteStr, req.getSourceProcess()); + + if (req.getDeletePrevious()) updateFlags |= DALI_UPDATEF_PACKAGEMAP; if (req.getUpdateCloneFrom()) diff --git a/esp/services/ws_workunits/CMakeLists.txt b/esp/services/ws_workunits/CMakeLists.txt index 72ebd39a8fa..9356d5d1a41 100644 --- a/esp/services/ws_workunits/CMakeLists.txt +++ b/esp/services/ws_workunits/CMakeLists.txt @@ -43,6 +43,7 @@ set ( SRCS ${HPCC_SOURCE_DIR}/esp/scm/ws_workunits_queryset_req_resp.ecm 
${HPCC_SOURCE_DIR}/esp/scm/ws_workunits.ecm ${HPCC_SOURCE_DIR}/esp/clients/roxiecontrol.cpp + ${HPCC_SOURCE_DIR}/esp/clients/ws_dfsclient ws_workunitsPlugin.cpp ws_workunitsService.cpp ws_workunitsService.hpp @@ -96,6 +97,7 @@ include_directories ( ./../../smc/SMCLib ./../../bindings/SOAP/xpp ${HPCC_SOURCE_DIR}/esp/bindings/http/platform + ${HPCC_SOURCE_DIR}/esp/clients/ws_dfsclient ${HPCC_SOURCE_DIR}/dali/dfu ${HPCC_SOURCE_DIR}/dali/ft ${HPCC_SOURCE_DIR}/common/wuanalysis @@ -132,6 +134,7 @@ target_link_libraries ( ws_workunits thorhelper pkgfiles wuanalysis + ws_dfsclient ${CPPUNIT_LIBRARIES} ${COMMON_ESP_SERVICE_LIBS} ) diff --git a/esp/services/ws_workunits/ws_workunitsQuerySets.cpp b/esp/services/ws_workunits/ws_workunitsQuerySets.cpp index 6dd70d3700c..72eb3b87d31 100644 --- a/esp/services/ws_workunits/ws_workunitsQuerySets.cpp +++ b/esp/services/ws_workunits/ws_workunitsQuerySets.cpp @@ -366,7 +366,7 @@ void QueryFilesInUse::loadTarget(IPropertyTree *t, const char *target, unsigned return; StringArray locations; locations.append(process.str()); - wufiles->resolveFiles(locations, NULL, NULL, NULL, true, true, true, false); + wufiles->resolveFiles(locations, NULL, NULL, NULL, true, true, true, false, false); Owned files = wufiles->getFiles(); ForEach(*files) @@ -793,7 +793,7 @@ class QueryFileCopier const char * targetPlaneOrGroup = process; locations.append(targetPlaneOrGroup); #endif - files->resolveFiles(locations, remoteIP, remotePrefix, srcCluster, !(updateFlags & (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM | DALI_UPDATEF_SUPERFILES)), true, false, true); + files->resolveFiles(locations, remoteLocation, remotePrefix, srcCluster, !(updateFlags & (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM | DALI_UPDATEF_SUPERFILES)), true, false, true, (updateFlags & DFU_UPDATEF_REMOTESTORAGE)!=0); Owned helper = createIDFUhelper(); files->setDfuQueue(dfu_queue); #ifdef _CONTAINERIZED @@ -822,7 +822,7 @@ class QueryFileCopier Owned files; StringBuffer
process; - StringAttr remoteIP; + StringAttr remoteLocation; StringAttr remotePrefix; StringAttr srcCluster; StringAttr queryname; @@ -978,9 +978,40 @@ bool CWsWorkunitsEx::onWUPublishWorkunit(IEspContext &context, IEspWUPublishWork StringBuffer publisherWuid(req.getDfuPublisherWuid()); if (!req.getDontCopyFiles()) { + StringBuffer remoteLocation; + StringBuffer srcCluster; + StringBuffer srcPrefix; + + unsigned updateFlags = 0; + const char *remoteStr = req.getRemoteStorage(); + if (!isEmptyString(remoteStr)) + updateFlags = DFU_UPDATEF_REMOTESTORAGE; + else + remoteStr = req.getRemoteDali(); + + splitDerivedDfsLocation(remoteStr, srcCluster, remoteLocation, srcPrefix, req.getSourceProcess(),req.getSourceProcess(), NULL, NULL); + + if (srcCluster.length()) + { + if (!validateDataPlaneName(remoteLocation, srcCluster)) + throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Process cluster %s not found on %s remote", srcCluster.str(), remoteLocation.length() ? remoteLocation.str() : "local"); + } + if (req.getUpdateDfs()) + updateFlags |= (DALI_UPDATEF_SUPERFILES | DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM); + if (req.getUpdateCloneFrom()) + updateFlags |= DALI_UPDATEF_CLONE_FROM; + if (req.getUpdateSuperFiles()) + updateFlags |= DALI_UPDATEF_SUPERFILES; + if (req.getAppendCluster()) + updateFlags |= DALI_UPDATEF_APPEND_CLUSTER; + if (req.getDfuCopyFiles()) + updateFlags |= DFU_UPDATEF_COPY; + if (req.getDfuOverwrite()) + updateFlags |= DFU_UPDATEF_OVERWRITE; + QueryFileCopier cpr(target); cpr.init(context, req.getAllowForeignFiles(), queryName); - cpr.remoteIP.set(daliIP); + cpr.remoteLocation.set(remoteLocation); cpr.remotePrefix.set(srcPrefix); cpr.srcCluster.set(srcCluster); cpr.queryname.set(queryName); @@ -2130,17 +2161,25 @@ bool CWsWorkunitsEx::onWURecreateQuery(IEspContext &context, IEspWURecreateQuery StringBuffer publisherWuid(req.getDfuPublisherWuid()); if (!req.getDontCopyFiles()) { - StringBuffer daliIP; + StringBuffer remoteLocation; 
StringBuffer srcCluster; StringBuffer srcPrefix; - splitDerivedDfsLocation(req.getRemoteDali(), srcCluster, daliIP, srcPrefix, req.getSourceProcess(),req.getSourceProcess(), NULL, NULL); + + unsigned updateFlags = 0; + const char *remoteStr = req.getRemoteStorage(); + if (!isEmptyString(remoteStr)) + updateFlags = DFU_UPDATEF_REMOTESTORAGE; + else + remoteStr = req.getRemoteDali(); + + splitDerivedDfsLocation(remoteStr, srcCluster, remoteLocation, srcPrefix, req.getSourceProcess(),req.getSourceProcess(), NULL, NULL); if (srcCluster.length()) { - if (!validateDataPlaneName(daliIP, srcCluster)) - throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Process cluster %s not found on %s DALI", srcCluster.str(), daliIP.length() ? daliIP.str() : "local"); + if (!validateDataPlaneName(remoteLocation, srcCluster)) + throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Process cluster %s not found on %s remote", srcCluster.str(), remoteLocation.length() ? remoteLocation.str() : "local"); } - unsigned updateFlags = 0; + if (req.getUpdateDfs()) updateFlags |= (DALI_UPDATEF_SUPERFILES | DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM); if (req.getUpdateCloneFrom()) @@ -2156,7 +2195,7 @@ bool CWsWorkunitsEx::onWURecreateQuery(IEspContext &context, IEspWURecreateQuery QueryFileCopier cpr(target); cpr.init(context, req.getAllowForeignFiles(), srcQueryName); - cpr.remoteIP.set(daliIP); + cpr.remoteLocation.set(remoteLocation); cpr.remotePrefix.set(srcPrefix); cpr.srcCluster.set(srcCluster); cpr.queryname.set(srcQueryName); @@ -2568,7 +2607,7 @@ bool CWsWorkunitsEx::getQueryFiles(IEspContext &context, const char* wuid, const wufiles->addFilesFromQuery(cw, (ps) ? 
ps->queryActiveMap(target) : NULL, query); StringArray locations; locations.append(process.str()); - wufiles->resolveFiles(locations, NULL, NULL, NULL, true, true, true, true); + wufiles->resolveFiles(locations, NULL, NULL, NULL, true, true, true, true, false); Owned refFileItr = wufiles->getFiles(); ForEach(*refFileItr) { @@ -3241,7 +3280,7 @@ class QueryCloner { if (cloneFilesEnabled) { - wufiles->resolveFiles(locations, dfsIP, srcPrefix, srcCluster, !(updateFlags & (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM)), true, false, true); + wufiles->resolveFiles(locations, dfsIP, srcPrefix, srcCluster, !(updateFlags & (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM)), true, false, true, false); Owned helper = createIDFUhelper(); Owned cl = getWUClusterInfoByName(target); if (cl) @@ -3437,11 +3476,25 @@ bool CWsWorkunitsEx::onWUQuerysetCopyQuery(IEspContext &context, IEspWUQuerySetC StringBuffer publisherWuid(req.getDfuPublisherWuid()); if (!req.getDontCopyFiles()) { - StringBuffer daliIP; + StringBuffer remoteLocation; StringBuffer srcCluster; StringBuffer srcPrefix; - splitDerivedDfsLocation(req.getDaliServer(), srcCluster, daliIP, srcPrefix, req.getSourceProcess(), req.getSourceProcess(), remoteIP.str(), NULL); + unsigned updateFlags = 0; + const char *remoteStr = req.getRemoteStorage(); + if (!isEmptyString(remoteStr)) + updateFlags = DFU_UPDATEF_REMOTESTORAGE; + else + remoteStr = req.getDaliServer(); + + splitDerivedDfsLocation(remoteStr, srcCluster, remoteLocation, srcPrefix, req.getSourceProcess(),req.getSourceProcess(), remoteIP.str(), NULL); + + if (srcCluster.length()) + { + if (!validateDataPlaneName(remoteLocation, srcCluster)) + throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Process cluster %s not found on %s remote", srcCluster.str(), remoteLocation.length() ? 
remoteLocation.str() : "local"); + } + if (req.getOverwrite()) updateFlags |= (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM | DALI_UPDATEF_SUPERFILES); if (req.getUpdateCloneFrom()) @@ -3457,7 +3510,7 @@ bool CWsWorkunitsEx::onWUQuerysetCopyQuery(IEspContext &context, IEspWUQuerySetC QueryFileCopier cpr(target); cpr.init(context, req.getAllowForeignFiles(), targetQueryName); - cpr.remoteIP.set(daliIP); + cpr.remoteLocation.set(remoteLocation); cpr.remotePrefix.set(srcPrefix); cpr.srcCluster.set(srcCluster); cpr.queryname.set(targetQueryName); diff --git a/roxie/ccd/ccdcontext.cpp b/roxie/ccd/ccdcontext.cpp index 0b0ea57de7b..a7de26121a7 100644 --- a/roxie/ccd/ccdcontext.cpp +++ b/roxie/ccd/ccdcontext.cpp @@ -448,7 +448,7 @@ class CRoxieWorkflowMachine : public WorkflowMachine inline bool fileExists(const char *lfn) { - Owned f = wsdfs::lookup(lfn, queryUserDescriptor(), AccessMode::readMeta, false, false, nullptr, defaultPrivilegedUser, INFINITE); + Owned f = wsdfs::lookup(lfn, queryUserDescriptor(), AccessMode::readMeta, false, false, nullptr, defaultPrivilegedUser, INFINITE, false); if (f) return true; return false; @@ -767,7 +767,7 @@ class CRoxieWorkflowMachine : public WorkflowMachine MilliSleep(PERSIST_LOCK_SLEEP + (getRandom()%PERSIST_LOCK_SLEEP)); persistLock.setown(getPersistReadLock(goer)); } - Owned f = wsdfs::lookup(goer, queryUserDescriptor(), AccessMode::writeSequential, false, false, nullptr, defaultPrivilegedUser, INFINITE); + Owned f = wsdfs::lookup(goer, queryUserDescriptor(), AccessMode::writeSequential, false, false, nullptr, defaultPrivilegedUser, INFINITE, false); if (!f) goto restart; // Persist has been deleted since last checked - repeat the whole process const char *newAccessTime = f->queryAttributes().queryProp("@accessed"); @@ -3856,7 +3856,7 @@ class CRoxieServerContext : public CRoxieContextBase, implements IRoxieServerCon { StringBuffer fullname; expandLogicalFilename(fullname, logicalName, workUnit, false, false); - Owned file 
= wsdfs::lookup(fullname.str(),queryUserDescriptor(),AccessMode::readMeta,false,false,nullptr,defaultPrivilegedUser,INFINITE); + Owned file = wsdfs::lookup(fullname.str(),queryUserDescriptor(),AccessMode::readMeta,false,false,nullptr,defaultPrivilegedUser,INFINITE,false); if (file) { WorkunitUpdate wu = updateWorkUnit(); diff --git a/roxie/ccd/ccddali.cpp b/roxie/ccd/ccddali.cpp index 6465556dd74..770111cb6ae 100644 --- a/roxie/ccd/ccddali.cpp +++ b/roxie/ccd/ccddali.cpp @@ -354,7 +354,7 @@ class CRoxieDaliHelper : implements IRoxieDaliHelper, public CInterface return localTree.getClear(); } - IFileDescriptor *recreateCloneSource(IFileDescriptor *srcfdesc, const char *destfilename) + IFileDescriptor *recreateCloneForeignSource(const char *cloneFrom, IFileDescriptor *srcfdesc, const char *destfilename) { Owned dstfdesc = createFileDescriptor(srcfdesc->getProperties()); // calculate dest dir @@ -379,7 +379,6 @@ class CRoxieDaliHelper : implements IRoxieDaliHelper, public CInterface dstfdesc->queryPart(pn)->queryProperties().setProp("@modified",dates.str()); } - const char *cloneFrom = srcfdesc->queryProperties().queryProp("@cloneFrom"); Owned groups = srcfdesc->queryProperties().getElements("cloneFromGroup"); ForEach(*groups) { @@ -576,29 +575,61 @@ class CRoxieDaliHelper : implements IRoxieDaliHelper, public CInterface return ret.getClear(); } - IFileDescriptor *checkClonedFromRemote(const char *_lfn, IFileDescriptor *fdesc, bool cacheIt, bool isPrivilegedUser) + IFileDescriptor *checkClonedFromRemoteStorage(const char *cloneRemote, const char *_lfn, IFileDescriptor *fdesc, bool cacheIt, bool isPrivilegedUser) { - // NOTE - we rely on the fact that queryNamedGroupStore().lookup caches results,to avoid excessive load on remote dali - if (_lfn && !strnicmp(_lfn, "foreign", 7)) //if need to support dali hopping should add each remote location - return NULL; - if (!fdesc) - return NULL; - const char *cloneFrom = fdesc->queryProperties().queryProp("@cloneFrom"); - if 
(!cloneFrom) - return NULL; - StringBuffer foreignLfn("foreign::"); - foreignLfn.append(cloneFrom); + StringBuffer remoteLfn; + remoteLfn.append("remote::").append(cloneRemote); + const char *cloneFromPrefix = fdesc->queryProperties().queryProp("@cloneFromPrefix"); + if (cloneFromPrefix && *cloneFromPrefix) + remoteLfn.append("::").append(cloneFromPrefix); + remoteLfn.append("::").append(_lfn); + const char *cloneRemoteCluster = fdesc->queryProperties().queryProp("@cloneRemoteCluster"); + if (!isEmptyString(cloneRemoteCluster)) + remoteLfn.append("@").append(cloneRemoteCluster); + + try + { + //Treat remote storage files a bit differently than foreign DALI files for now. For foreign DALI, we try to recreate the DistributedFile without contacting DALI. + // This is to prevent DALI traffic. We may want to optimize remote storage access differently over time. For example improved caching in the DFS service which may + // scale better than DALI. Anyway, just go through DFS for now, and we can optimize once we understand the usage pattern better + + if (traceLevel > 1) + DBGLOG("checkClonedFromRemoteStorage: Resolving %s from remote storage", _lfn); + Owned cloneFile = resolveLFN(remoteLfn, cacheIt, AccessMode::readRandom, isPrivilegedUser); + if (cloneFile) + { + Owned cloneFDesc = cloneFile->getFileDescriptor(); + if (cloneFDesc->numParts()==fdesc->numParts()) + return cloneFDesc.getClear(); + + DBGLOG(ROXIE_MISMATCH, "File local metadata (%s numParts %d) vs cloneRemote(%s numParts %d) mismatch", _lfn, fdesc->numParts(), cloneRemote, cloneFDesc->numParts()); + } + } + catch (IException *E) + { + if (traceLevel > 3) + EXCLOG(E); + E->Release(); // Any failure means act as if no remote info + } + return NULL; + } + + IFileDescriptor *checkClonedFromForeignDali(const char *cloneFrom, const char *_lfn, IFileDescriptor *fdesc, bool cacheIt, bool isPrivilegedUser) + { + StringBuffer foreignLfn; + foreignLfn.append("foreign::").append(cloneFrom); const char *cloneFromPrefix
= fdesc->queryProperties().queryProp("@cloneFromPrefix"); if (cloneFromPrefix && *cloneFromPrefix) foreignLfn.append("::").append(cloneFromPrefix); foreignLfn.append("::").append(_lfn); + if (!connected()) return resolveCachedLFN(foreignLfn); // Note - cache only used when no dali connection available try { if (fdesc->queryProperties().hasProp("cloneFromGroup") && fdesc->queryProperties().hasProp("@cloneFromDir")) { - Owned ret = recreateCloneSource(fdesc, _lfn); + Owned ret = recreateCloneForeignSource(cloneFrom, fdesc, _lfn); if (cacheIt) cacheFileDescriptor(foreignLfn, ret); return ret.getClear(); @@ -606,7 +637,7 @@ class CRoxieDaliHelper : implements IRoxieDaliHelper, public CInterface else // Legacy mode - recently cloned files should have the extra info { if (doTrace(traceRoxieFiles)) - DBGLOG("checkClonedFromRemote: Resolving %s in legacy mode", _lfn); + DBGLOG("checkClonedFromForeignDali: Resolving %s in legacy mode", _lfn); Owned cloneFile = resolveLFN(foreignLfn, cacheIt, AccessMode::readRandom, isPrivilegedUser); if (cloneFile) { @@ -627,6 +658,22 @@ class CRoxieDaliHelper : implements IRoxieDaliHelper, public CInterface return NULL; } + IFileDescriptor *checkClonedFromRemote(const char *_lfn, IFileDescriptor *fdesc, bool cacheIt, bool isPrivilegedUser) + { + if (!fdesc) + return NULL; + if (_lfn && (strnicmp(_lfn, "foreign", 7)==0 || strnicmp(_lfn, "remote", 6)==0)) //if need to support dali hopping should add each remote location + return NULL; + + const char *cloneRemote = fdesc->queryProperties().queryProp("@cloneRemote"); + if (!isEmptyString(cloneRemote)) + return checkClonedFromRemoteStorage(cloneRemote, _lfn, fdesc, cacheIt, isPrivilegedUser); + const char *cloneFrom = fdesc->queryProperties().queryProp("@cloneFrom"); + if (!isEmptyString(cloneFrom)) + return checkClonedFromForeignDali(cloneFrom, _lfn, fdesc, cacheIt, isPrivilegedUser); + return nullptr; + } + virtual IDistributedFile *resolveLFN(const char *logicalName, bool cacheIt, AccessMode 
accessMode, bool isPrivilegedUser) { if (isConnected) @@ -634,7 +681,7 @@ class CRoxieDaliHelper : implements IRoxieDaliHelper, public CInterface unsigned start = msTick(); CDfsLogicalFileName lfn; lfn.set(logicalName); - Owned dfsFile = wsdfs::lookup(lfn, userdesc.get(), accessMode, cacheIt,false,nullptr,isPrivilegedUser,INFINITE); + Owned dfsFile = wsdfs::lookup(lfn, userdesc.get(), accessMode, cacheIt,false,nullptr,isPrivilegedUser,INFINITE,false); if (dfsFile) { IDistributedSuperFile *super = dfsFile->querySuperFile(); diff --git a/thorlcr/mfilemanager/thmfilemanager.cpp b/thorlcr/mfilemanager/thmfilemanager.cpp index 14c3968691c..3c11d8c9038 100644 --- a/thorlcr/mfilemanager/thmfilemanager.cpp +++ b/thorlcr/mfilemanager/thmfilemanager.cpp @@ -315,7 +315,7 @@ class CFileManager : public CSimpleInterface, implements IThorFileManager { auto func = [&job, &lfn, accessMode, privilegedUser](unsigned timeout) { - return wsdfs::lookup(lfn, job.queryUserDescriptor(), accessMode, false, false, nullptr, privilegedUser, timeout); + return wsdfs::lookup(lfn, job.queryUserDescriptor(), accessMode, false, false, nullptr, privilegedUser, timeout, false); }; VStringBuffer blockedMsg("lock file '%s' for %s access", lfn.get(), isWrite(accessMode) ? "WRITE" : "READ");