diff --git a/dockerfiles/platform-build/README.md b/dockerfiles/platform-build/README.md index 8d2c9dd1f67..8b2e9fec6d4 100644 --- a/dockerfiles/platform-build/README.md +++ b/dockerfiles/platform-build/README.md @@ -18,7 +18,7 @@ on the build behavior remains exactly the same. Default of signing_secret is empty. Populate this variable within your Github Secrets for the repository with an exported armored secret key. It is hidden by Github Actions from view and is hidden from the docker image layers by using -BuildKit and [secret mounting](https://docs.docker.com.xy2401.com/develop/develop-images/build_enhancements/#new-docker-build-secret-information). +BuildKit and [secret mounting](https://docs.docker.com/build/building/secrets/). This ensures that the secret key does not leak into the final docker image layers. > signing_keyid: ${{ secrets.SIGNING_KEYID }} diff --git a/docs/EN_US/ECLStandardLibraryReference/SLR-Mods/WorkunitTimeStamps.xml b/docs/EN_US/ECLStandardLibraryReference/SLR-Mods/WorkunitTimeStamps.xml index e6ca5cb1dda..05ed0b9d6fe 100644 --- a/docs/EN_US/ECLStandardLibraryReference/SLR-Mods/WorkunitTimeStamps.xml +++ b/docs/EN_US/ECLStandardLibraryReference/SLR-Mods/WorkunitTimeStamps.xml @@ -4,21 +4,17 @@ WorkunitTimeStamps - STD.System.Workunit.WorkunitTimeStamps + STD.System.Workunit.WorkunitTimeStamps + STD.System.Workunit.WorkunitTimeStamps - - + System.Workunit.WorkunitTimeStamps - - + Workunit.WorkunitTimeStamps - - + WorkunitTimeStamps - -( - wuid ) + ( wuid ) @@ -46,7 +42,7 @@ The WorkunitTimeStamps function returns a DATASET with this format: - EXPORT WsTimeStamp := RECORD + EXPORT TimeStampRecord := RECORD STRING32 application; STRING16 id; STRING20 time; @@ -58,14 +54,15 @@ END; Example: - OUTPUT(STD.System.Workunit.WorkunitTimeStamps('W20070308-164946')); + OUTPUT(STD.System.Workunit.WorkunitTimeStamps('W20240801-122755')); + /* produces output like this: -'workunit ','Created ','2008-02-13T18:28:20Z',' ' -'workunit ','Modified','2008-02-13T18:32:47Z',' ' -'EclServer ','Compiled','2008-02-13T18:28:20Z','10.173.9.2:0 ' -'EclAgent ','Started ','2008-02-13T18:32:35Z','training009003' -'Thor - graph1','Finished','2008-02-13T18:32:47Z','training009004' -'Thor - graph1','Started ','2008-02-13T18:32:13Z','training009004' -'EclAgent ','Finished','2008-02-13T18:33:09Z','training009003' +'workunit ','Created ','2024-08-01T16:28:20Z',' ' +'workunit ','Modified','2024-08-01T16:32:47Z',' ' +'EclServer ','Compiled','2024-08-01T16:28:20Z','172.31.4.17' +'EclAgent ','Started ','2024-08-01T16:32:35Z','172.31.4.17' +'Thor - graph1','Finished','2024-08-01T16:32:47Z','172.31.4.17' +'Thor - graph1','Started ','2024-08-01T16:32:13Z','172.31.4.17' +'EclAgent ','Finished','2024-08-01T16:33:09Z','172.31.4.17' */ diff --git a/ecl/agentexec/agentexec.cpp b/ecl/agentexec/agentexec.cpp index d001bd8eeba..5582af12177 100644 --- a/ecl/agentexec/agentexec.cpp +++ b/ecl/agentexec/agentexec.cpp @@ -261,6 +261,7 @@ class WaitThread : public CInterfaceOf virtual void threadmain() override { Owned exception; + bool sharedK8sJob = false; try { StringAttr jobSpecName(apptype); @@ -276,6 +277,7 @@ class WaitThread : public CInterfaceOf bool useChildProcesses = compConfig->getPropBool("@useChildProcesses"); if (isContainerized() && !useChildProcesses) { + sharedK8sJob = true; constexpr unsigned queueWaitingTimeoutMs = 10000; constexpr unsigned queueWaitingCheckPeriodMs = 1000; if (!owner.lingerQueue || !queueJobIfQueueWaiting(owner.lingerQueue, item, queueWaitingCheckPeriodMs, queueWaitingCheckPeriodMs)) @@ -337,13 +339,26 @@ class WaitThread : public CInterfaceOf { EXCLOG(exception); Owned factory = getWorkUnitFactory(); - Owned workunit = factory->updateWorkUnit(wuid); - if (workunit) + Owned cw = factory->openWorkUnit(wuid); + if (cw) { - workunit->setState(WUStateFailed); - StringBuffer eStr; - addExceptionToWorkunit(workunit, SeverityError, "agentexec", exception->errorCode(), exception->errorMessage(eStr).str(), nullptr, 0, 0, 0); - workunit->commit(); + // if either a) NOT a thoragent with useChildProcesses=false (default in k8s config) or b) is still in an executing state + if (!sharedK8sJob || (cw->getState() == WUStateRunning) || (cw->getState() == WUStateBlocked) || (cw->getState() == WUStateWait)) + { + // For a shared k8s job, i.e. where this agent is thoragent launching shared (multiJobLinger) k8s jobs + // the job agent should handle the job state. + // In that scenario, this is a fallback that should only come into effect if the job workflow instance has failed to handle the exception + // e.g. because it abruptly disappeared. + Owned workunit = &cw->lock(); + // recheck now locked + if ((workunit->getState() == WUStateRunning) || (workunit->getState() == WUStateBlocked) || (workunit->getState() == WUStateWait)) + { + workunit->setState(WUStateFailed); + StringBuffer eStr; + addExceptionToWorkunit(workunit, SeverityError, "agentexec", exception->errorCode(), exception->errorMessage(eStr).str(), nullptr, 0, 0, 0); + workunit->commit(); + } + } } } } diff --git a/ecl/hqlcpp/hqlcatom.cpp b/ecl/hqlcpp/hqlcatom.cpp index 12c1995114d..aa91c5b96d4 100644 --- a/ecl/hqlcpp/hqlcatom.cpp +++ b/ecl/hqlcpp/hqlcatom.cpp @@ -533,6 +533,7 @@ IIdAtom * regexNewStrFindId; IIdAtom * regexNewStrFoundId; IIdAtom * regexNewStrFoundXId; IIdAtom * regexNewStrReplaceXId; +IIdAtom * regexNewStrReplaceFixedId; IIdAtom * regexNewUStrFindId; IIdAtom * regexNewU8StrFindId; IIdAtom * regexNewUStrFoundId; @@ -540,7 +541,9 @@ IIdAtom * regexNewU8StrFoundId; IIdAtom * regexNewUStrFoundXId; IIdAtom * regexNewU8StrFoundXId; IIdAtom * regexNewUStrReplaceXId; +IIdAtom * regexNewUStrReplaceFixedId; IIdAtom * regexNewU8StrReplaceXId; +IIdAtom * regexNewU8StrReplaceFixedId; IIdAtom * regexMatchSetId; IIdAtom * regexUStrMatchSetId; IIdAtom * regexU8StrMatchSetId; @@ -1218,6 +1221,7 @@ MODULE_INIT(INIT_PRIORITY_HQLATOM-1) MAKEID(regexNewStrFound); MAKEID(regexNewStrFoundX); MAKEID(regexNewStrReplaceX); + MAKEID(regexNewStrReplaceFixed); MAKEID(regexNewUStrFind); MAKEID(regexNewU8StrFind); MAKEID(regexNewUStrFound); @@ -1225,7 +1229,9 @@ MODULE_INIT(INIT_PRIORITY_HQLATOM-1) MAKEID(regexNewUStrFoundX); MAKEID(regexNewU8StrFoundX); MAKEID(regexNewUStrReplaceX); + MAKEID(regexNewUStrReplaceFixed); MAKEID(regexNewU8StrReplaceX); + MAKEID(regexNewU8StrReplaceFixed); MAKEID(regexMatchSet); MAKEID(regexUStrMatchSet); MAKEID(regexU8StrMatchSet); diff --git a/ecl/hqlcpp/hqlcatom.hpp b/ecl/hqlcpp/hqlcatom.hpp index d2f4aef46ba..bf9e5c130c8 100644 --- a/ecl/hqlcpp/hqlcatom.hpp +++ b/ecl/hqlcpp/hqlcatom.hpp @@ -531,6 +531,7 @@ extern IIdAtom * regexNewStrFindId; extern IIdAtom * regexNewStrFoundId; extern IIdAtom * regexNewStrFoundXId; extern IIdAtom * regexNewStrReplaceXId; +extern IIdAtom * regexNewStrReplaceFixedId; extern IIdAtom * regexNewUStrFindId; extern IIdAtom * regexNewU8StrFindId; extern IIdAtom * regexNewUStrFoundId; @@ -538,7 +539,9 @@ extern IIdAtom * regexNewU8StrFoundId; extern IIdAtom * regexNewUStrFoundXId; extern IIdAtom * regexNewU8StrFoundXId; extern IIdAtom * regexNewUStrReplaceXId; +extern IIdAtom * regexNewUStrReplaceFixedId; extern IIdAtom * regexNewU8StrReplaceXId; +extern IIdAtom * regexNewU8StrReplaceFixedId; extern IIdAtom * regexMatchSetId; extern IIdAtom * regexUStrMatchSetId; extern IIdAtom * regexU8StrMatchSetId; diff --git a/ecl/hqlcpp/hqlcpp.cpp b/ecl/hqlcpp/hqlcpp.cpp index 5009be858f4..3189ca0955c 100644 --- a/ecl/hqlcpp/hqlcpp.cpp +++ b/ecl/hqlcpp/hqlcpp.cpp @@ -2226,8 +2226,28 @@ void HqlCppTranslator::buildFunctionCall(BuildCtx & ctx, IIdAtom * name, HqlExpr void HqlCppTranslator::callProcedure(BuildCtx & ctx, IIdAtom * name, HqlExprArray & args) { OwnedHqlExpr call = bindTranslatedFunctionCall(name, args); - assertex(call->queryExternalDefinition()); + IHqlExpression * funcdef = call->queryExternalDefinition(); + + assertex(funcdef); + + CHqlBoundExpr boundTimer, boundStart; + IHqlExpression * external = funcdef->queryChild(0); + bool isTimed = external->hasAttribute(timeAtom); + if (isTimed) + { + StringBuffer nameTemp; + const char * name = str(external->queryId()); + if (getStringValue(nameTemp, queryAttributeChild(external, timeAtom, 0)).length()) + name = nameTemp; + buildStartTimer(ctx, boundTimer, boundStart, name); + } + ctx.addExpr(call); + + if (isTimed) + { + buildStopTimer(ctx, boundTimer, boundStart); + } } bool HqlCppTranslator::getDebugFlag(const char * name, bool defValue) @@ -6697,6 +6717,10 @@ void HqlCppTranslator::doBuildAssignCast(BuildCtx & ctx, const CHqlBoundTarget & //don't do this if the target type is unicode at the moment ignoreStretched = isStringType(targetType); break; + case no_regex_replace: + // replacing into a fixed-sized target should not require a temp + useTemp = false; + break; } if (ignoreStretched) diff --git a/ecl/hqlcpp/hqlcppsys.ecl b/ecl/hqlcpp/hqlcppsys.ecl index 050b35591c3..4919983331a 100644 --- a/ecl/hqlcpp/hqlcppsys.ecl +++ b/ecl/hqlcpp/hqlcppsys.ecl @@ -558,6 +558,7 @@ const char * cppSystemText[] = { " boolean regexNewStrFound() : method,pure,entrypoint='found';" " string regexNewStrFoundX(unsigned4 idx) : method,pure,entrypoint='getMatchX';" " string regexNewStrReplaceX(const string _search, const string _replace) : method,pure,entrypoint='replace',time('REGEXREPLACE');" + " regexNewStrReplaceFixed(noconst string _tgt, const string _search, const string _replace) : method,pure,entrypoint='replaceFixed',time('REGEXREPLACE');" " set of string regexMatchSet(const string _search) : method,pure,entrypoint='getMatchSet',time('REGEXFINDSET');" " regexNewSetUStrPattern(const varunicode _pattern, boolean isCaseSensitive) : omethod,entrypoint='setPattern',time('CompileUnicodeRegex');" @@ -565,6 +566,7 @@ const char * cppSystemText[] = { " boolean regexNewUStrFound() : method,pure,entrypoint='found';" " unicode regexNewUStrFoundX(unsigned4 idx) : method,pure,entrypoint='getMatchX';" " unicode regexNewUStrReplaceX(const unicode _search, const unicode _replace) : method,pure,entrypoint='replace',time('REGEXREPLACE');" + " regexNewUStrReplaceFixed(noconst unicode _tgt, const unicode _search, const unicode _replace) : method,pure,entrypoint='replaceFixed',time('REGEXREPLACE');" " set of unicode regexUStrMatchSet(const unicode _search) : method,pure,entrypoint='getMatchSet',time('REGEXFINDSET');" " regexNewSetU8StrPattern(const utf8 _pattern, boolean isCaseSensitive) : omethod,entrypoint='setPattern',time('CompileUTF8Regex');" @@ -572,6 +574,7 @@ const char * cppSystemText[] = { " boolean regexNewU8StrFound() : method,pure,entrypoint='found';" " utf8 regexNewU8StrFoundX(unsigned4 idx) : method,pure,entrypoint='getMatchX';" " utf8 regexNewU8StrReplaceX(const utf8 _search, const utf8 _replace) : method,pure,entrypoint='replace',time('REGEXREPLACE');" + " regexNewU8StrReplaceFixed(noconst utf8 _tgt, const utf8 _search, const utf8 _replace) : method,pure,entrypoint='replaceFixed',time('REGEXREPLACE');" " set of utf8 regexU8StrMatchSet(const utf8 _search) : method,pure,entrypoint='getMatchSet',time('REGEXFINDSET');" //clibrary functions that are called from the code generation diff --git a/ecl/hqlcpp/hqlhtcpp.cpp b/ecl/hqlcpp/hqlhtcpp.cpp index b31e9667a7e..36abc0f758e 100644 --- a/ecl/hqlcpp/hqlhtcpp.cpp +++ b/ecl/hqlcpp/hqlhtcpp.cpp @@ -18659,20 +18659,54 @@ void HqlCppTranslator::doBuildNewRegexFindReplace(BuildCtx & ctx, const CHqlBoun // as long as the find instance. Only exception could be if call created a temporary class instance. if (expr->getOperator() == no_regex_replace) { - HqlExprArray args; - args.append(*LINK(compiled)); - args.append(*LINK(search)); - args.append(*LINK(expr->queryChild(2))); - IIdAtom * func = nullptr; - if (isUTF8Type(searchStringType)) - func = regexNewU8StrReplaceXId; - else if (isUnicodeType(searchStringType)) - func = regexNewUStrReplaceXId; + // If the target is a preallocated fixed-length buffer and the + // datatype matches the result expression datatype, we can call an optimized replace function + if (target && target->isFixedSize() && target->queryType()->getTypeCode() == expr->queryType()->getTypeCode()) + { + // We need to build our arguments manually because we need to + // pass the size of the output buffer (the target) as an argument + IHqlExpression * targetVar = target->expr; + unsigned targetSize = target->queryType()->getStringLen(); + + CHqlBoundExpr searchExpr, replaceExpr; + buildCachedExpr(ctx, search, searchExpr); + buildCachedExpr(ctx, expr->queryChild(2), replaceExpr); + + HqlExprArray args; + args.append(*LINK(compiled)); // instance on which method is called + args.append(*getSizetConstant(targetSize)); // size of the output buffer in code units + args.append(*getElementPointer(targetVar)); // pointer to the output buffer + args.append(*getBoundLength(searchExpr)); // length of regex expression, in characters + args.append(*LINK(searchExpr.expr)); // pointer to regex expression + args.append(*getBoundLength(replaceExpr)); // length of replacement expression, in characters + args.append(*LINK(replaceExpr.expr)); // pointer to replacement expression + + IIdAtom * func = nullptr; + if (isUTF8Type(searchStringType)) + func = regexNewU8StrReplaceFixedId; + else if (isUnicodeType(searchStringType)) + func = regexNewUStrReplaceFixedId; + else + func = regexNewStrReplaceFixedId; + callProcedure(ctx, func, args); + } else - func = regexNewStrReplaceXId; - OwnedHqlExpr call = bindFunctionCall(func, args); - //Need to associate??? - buildExprOrAssign(ctx, target, call, bound); + { + HqlExprArray args; + args.append(*LINK(compiled)); + args.append(*LINK(search)); + args.append(*LINK(expr->queryChild(2))); + IIdAtom * func = nullptr; + if (isUTF8Type(searchStringType)) + func = regexNewU8StrReplaceXId; + else if (isUnicodeType(searchStringType)) + func = regexNewUStrReplaceXId; + else + func = regexNewStrReplaceXId; + OwnedHqlExpr call = bindFunctionCall(func, args); + //Need to associate??? + buildExprOrAssign(ctx, target, call, bound); + } } else { diff --git a/esp/src/eclwatch/CurrentUserDetailsWidget.js b/esp/src/eclwatch/CurrentUserDetailsWidget.js index b96e014a669..eebf772193a 100644 --- a/esp/src/eclwatch/CurrentUserDetailsWidget.js +++ b/esp/src/eclwatch/CurrentUserDetailsWidget.js @@ -26,89 +26,96 @@ define([ ], function (declare, lang, nlsHPCCMod, dom, domForm, arrayUtil, - registry, - _Widget, WsAccount, - template) { - - var nlsHPCC = nlsHPCCMod.default; - return declare("CurrentUserDetailsWidget", [_Widget], { - templateString: template, - baseClass: "CurrentUserDetailsWidget", - i18n: nlsHPCC, - - user: null, - - getTitle: function () { - return this.i18n.UserDetails; - }, - - postCreate: function (args) { - this.inherited(arguments); - this.userForm = registry.byId(this.id + "UserForm"); - }, - - resize: function (args) { - this.inherited(arguments); - this.widget.BorderContainer.resize(); - }, - - // Hitched actions --- - _onSave: function (event) { - var context = this; - var dialog = this.params.Widget; - - if (this.userForm.validate()) { - var formInfo = domForm.toObject(this.id + "UserForm"); - WsAccount.UpdateUser({ - showOkMsg: true, - request: { - username: this.user, - oldpass: formInfo.oldPassword, - newpass1: formInfo.newPassword, - newpass2: formInfo.newPassword - } - }).then(function (response) { - if (lang.exists("UpdateUserResponse", response)) { - arrayUtil.forEach(context.userForm.getDescendants(), function (item, idx) { - item.set("value", ""); - }); - } - }); - dialog.hide(); - } - }, - - // Implementation --- - init: function (params) { - if (this.inherited(arguments)) - return; - - this.user = params.Username; - this.refresh(); - }, - - refresh: function () { - if (this.user) { - this.updateInput("User", null, this.user); - this.updateInput("Username", null, this.user); - + registry, + _Widget, WsAccount, + template) { + + var nlsHPCC = nlsHPCCMod.default; + return declare("CurrentUserDetailsWidget", [_Widget], { + templateString: template, + baseClass: "CurrentUserDetailsWidget", + i18n: nlsHPCC, + + user: null, + canUpdatePassword: false, + + getTitle: function () { + return this.i18n.UserDetails; + }, + + postCreate: function (args) { + this.inherited(arguments); + this.userForm = registry.byId(this.id + "UserForm"); + }, + + resize: function (args) { + this.inherited(arguments); + this.widget.BorderContainer.resize(); + }, + + // Hitched actions --- + _onSave: function (event) { var context = this; - WsAccount.MyAccount({ - }).then(function (response) { - if (lang.exists("MyAccountResponse.firstName", response)) { - context.updateInput("FirstName", null, response.MyAccountResponse.firstName); - } - if (lang.exists("MyAccountResponse.employeeID", response)) { - context.updateInput("EmployeeID", null, response.MyAccountResponse.employeeID); - } - if (lang.exists("MyAccountResponse.lastName", response)) { - context.updateInput("LastName", null, response.MyAccountResponse.lastName); - } - if (lang.exists("MyAccountResponse.passwordExpiration", response)) { - context.updateInput("PasswordExpiration", null, response.MyAccountResponse.passwordExpiration); - } - }); + var dialog = this.params.Widget; + + if (this.canUpdatePassword && this.userForm.validate()) { + var formInfo = domForm.toObject(this.id + "UserForm"); + WsAccount.UpdateUser({ + showOkMsg: true, + request: { + username: this.user, + oldpass: formInfo.oldPassword, + newpass1: formInfo.newPassword, + newpass2: formInfo.newPassword + } + }).then(function (response) { + if (lang.exists("UpdateUserResponse", response)) { + arrayUtil.forEach(context.userForm.getDescendants(), function (item, idx) { + item.set("value", ""); + }); + } + }); + dialog.hide(); + } + }, + + // Implementation --- + init: function (params) { + if (this.inherited(arguments)) + return; + + this.user = params.Username; + this.refresh(); + }, + + refresh: function () { + if (this.user) { + this.updateInput("User", null, this.user); + this.updateInput("Username", null, this.user); + + var context = this; + WsAccount.MyAccount({ + }).then(function (response) { + if (lang.exists("MyAccountResponse.firstName", response)) { + context.updateInput("FirstName", null, response.MyAccountResponse.firstName); + } + if (lang.exists("MyAccountResponse.employeeID", response)) { + context.updateInput("EmployeeID", null, response.MyAccountResponse.employeeID); + } + if (lang.exists("MyAccountResponse.lastName", response)) { + context.updateInput("LastName", null, response.MyAccountResponse.lastName); + } + if (lang.exists("MyAccountResponse.passwordExpiration", response)) { + context.updateInput("PasswordExpiration", null, response.MyAccountResponse.passwordExpiration); + } + if (!response?.MyAccountResponse?.CanUpdatePassword) { + context.setDisabled("dijit_form_ValidationTextBox_0", true); + context.setDisabled("dojox_form__NewPWBox_0", true); + context.setDisabled("dojox_form__VerifyPWBox_0", true); + } + context.canUpdatePassword = response?.MyAccountResponse?.CanUpdatePassword; + }); + } } - } + }); }); -}); diff --git a/esp/src/eclwatch/WUQueryWidget.js b/esp/src/eclwatch/WUQueryWidget.js index 8936500ec04..6cffa29a6f3 100644 --- a/esp/src/eclwatch/WUQueryWidget.js +++ b/esp/src/eclwatch/WUQueryWidget.js @@ -158,19 +158,19 @@ define([ }, _onSetToFailed: function (event) { - WsWorkunits.WUAction(this.workunitsGrid.getSelected(), "SetToFailed"); + WsWorkunits.WUAction(this.workunitsGrid.getSelected(), "SetToFailed").then(() => this.refreshGrid()); }, _onAbort: function (event) { - WsWorkunits.WUAction(this.workunitsGrid.getSelected(), "Abort"); + WsWorkunits.WUAction(this.workunitsGrid.getSelected(), "Abort").then(() => this.refreshGrid()); }, _onProtect: function (event) { - WsWorkunits.WUAction(this.workunitsGrid.getSelected(), "Protect"); + WsWorkunits.WUAction(this.workunitsGrid.getSelected(), "Protect").then(() => this.refreshGrid()); }, _onUnprotect: function (event) { - WsWorkunits.WUAction(this.workunitsGrid.getSelected(), "Unprotect"); + WsWorkunits.WUAction(this.workunitsGrid.getSelected(), "Unprotect").then(() => this.refreshGrid()); }, _onReschedule: function (event) { diff --git a/esp/src/package-lock.json b/esp/src/package-lock.json index bb858aa4279..28aef7baf94 100644 --- a/esp/src/package-lock.json +++ b/esp/src/package-lock.json @@ -18,7 +18,7 @@ "@hpcc-js/chart": "2.83.4", "@hpcc-js/codemirror": "2.62.1", "@hpcc-js/common": "2.71.18", - "@hpcc-js/comms": "2.94.0", + "@hpcc-js/comms": "2.94.1", "@hpcc-js/dataflow": "8.1.7", "@hpcc-js/eclwatch": "2.74.8", "@hpcc-js/graph": "2.85.16", @@ -2079,9 +2079,9 @@ } }, "node_modules/@hpcc-js/comms": { - "version": "2.94.0", - "resolved": "https://registry.npmjs.org/@hpcc-js/comms/-/comms-2.94.0.tgz", - "integrity": "sha512-+AfJsqj648638hTUeLYd0Thvu1QMHX9zLflrep2xVtz7Wo1OmOiI/mrjClMqK8A8drMa3AduKuQS1R2rL15wZw==", + "version": "2.94.1", + "resolved": "https://registry.npmjs.org/@hpcc-js/comms/-/comms-2.94.1.tgz", + "integrity": "sha512-ROCHmHogsZ5/G9LsRPHRIx25rpVoR9d5TakbSkXeSugRkhPV+16+3C/Lfd4d8ZzNLR99Jmnvjd0NXZAMgpAkGQ==", "dependencies": { "@hpcc-js/ddl-shim": "^2.21.0", "@hpcc-js/util": "^2.52.0", diff --git a/esp/src/package.json b/esp/src/package.json index a30728c2cf2..7a4508f1cf3 100644 --- a/esp/src/package.json +++ b/esp/src/package.json @@ -44,7 +44,7 @@ "@hpcc-js/chart": "2.83.4", "@hpcc-js/codemirror": "2.62.1", "@hpcc-js/common": "2.71.18", - "@hpcc-js/comms": "2.94.0", + "@hpcc-js/comms": "2.94.1", "@hpcc-js/dataflow": "8.1.7", "@hpcc-js/eclwatch": "2.74.8", "@hpcc-js/graph": "2.85.16", diff --git a/esp/src/src-react/components/MyAccount.tsx b/esp/src/src-react/components/MyAccount.tsx index 51130178fca..09fb51bc9d2 100644 --- a/esp/src/src-react/components/MyAccount.tsx +++ b/esp/src/src-react/components/MyAccount.tsx @@ -1,10 +1,11 @@ import * as React from "react"; import { DefaultButton, Dialog, DialogFooter, DialogType, MessageBar, MessageBarType, PrimaryButton } from "@fluentui/react"; +import { useConst } from "@fluentui/react-hooks"; import { AccountService, WsAccount } from "@hpcc-js/comms"; import { scopedLogger } from "@hpcc-js/util"; +import { PasswordStatus } from "../hooks/user"; import nlsHPCC from "src/nlsHPCC"; import { TableGroup } from "./forms/Groups"; -import { useConst } from "@fluentui/react-hooks"; const logger = scopedLogger("src-react/components/MyAccount.tsx"); @@ -36,8 +37,14 @@ export const MyAccount: React.FunctionComponent = ({ }; }, [currentUser]); + const resetForm = React.useCallback(() => { + setOldPassword(""); + setNewPassword1(""); + setNewPassword2(""); + }, []); + const saveUser = React.useCallback(() => { - if (oldPassword !== "" && newPassword1 !== "") { + if (currentUser?.CanUpdatePassword && oldPassword !== "" && newPassword1 !== "") { service.UpdateUser({ username: currentUser.username, oldpass: oldPassword, @@ -51,13 +58,14 @@ export const MyAccount: React.FunctionComponent = ({ } else { setShowError(false); setErrorMessage(""); + resetForm(); onClose(); } }) .catch(err => logger.error(err)) ; } - }, [currentUser, newPassword1, newPassword2, oldPassword, onClose, service]); + }, [currentUser, newPassword1, newPassword2, oldPassword, onClose, resetForm, service]); return ; }; diff --git a/esp/src/src-react/components/Result.tsx b/esp/src/src-react/components/Result.tsx index a9d47294417..3490153b800 100644 --- a/esp/src/src-react/components/Result.tsx +++ b/esp/src/src-react/components/Result.tsx @@ -1,6 +1,6 @@ import * as React from "react"; import * as ReactDOM from "react-dom"; -import { Checkbox, CommandBar, ContextualMenuItemType, DefaultButton, Dialog, DialogFooter, DialogType, ICommandBarItemProps, PrimaryButton, SpinButton, Stack } from "@fluentui/react"; +import { Checkbox, CommandBar, ContextualMenuItemType, DefaultButton, Dialog, DialogFooter, DialogType, ICommandBarItemProps, PrimaryButton, SpinButton, Spinner, Stack } from "@fluentui/react"; import { useConst } from "@fluentui/react-hooks"; import { Result as CommsResult, XSDXMLNode } from "@hpcc-js/comms"; import { scopedLogger } from "@hpcc-js/util"; @@ -247,6 +247,7 @@ export const Result: React.FunctionComponent = ({ const [wu] = useWorkunit(wuid); const [result, setResult] = React.useState(resultTable.calcResult()); const [FilterFields, setFilterFields] = React.useState({}); + const [loading, setLoading] = React.useState(true); const [showFilter, setShowFilter] = React.useState(false); React.useEffect(() => { @@ -276,6 +277,7 @@ export const Result: React.FunctionComponent = ({ }; }); setFilterFields(filterFields); + setLoading(false); }).catch(err => { logger.error(err); if (err.message.indexOf("Cannot open the workunit result") > -1) { @@ -370,7 +372,10 @@ export const Result: React.FunctionComponent = ({ header={} main={ <> - + {loading ? + : + + } diff --git a/esp/src/src-react/components/Title.tsx b/esp/src/src-react/components/Title.tsx index f42b2e6ad77..5257ba34747 100644 --- a/esp/src/src-react/components/Title.tsx +++ b/esp/src/src-react/components/Title.tsx @@ -15,7 +15,7 @@ import { replaceUrl } from "../util/history"; import { useECLWatchLogger } from "../hooks/logging"; import { useBuildInfo, useModernMode, useCheckFeatures } from "../hooks/platform"; import { useGlobalStore } from "../hooks/store"; -import { useMyAccount, useUserSession } from "../hooks/user"; +import { PasswordStatus, useMyAccount, useUserSession } from "../hooks/user"; import { TitlebarConfig } from "./forms/TitlebarConfig"; import { switchTechPreview } from "./controls/ComingSoon"; @@ -246,10 +246,10 @@ export const DevTitle: React.FunctionComponent = ({ // cookie expires option expects whole number of days, use a decimal < 1 for hours cookie("PasswordExpiredCheck", "true", { expires: 0.5, path: "/" }); switch (currentUser.passwordDaysRemaining) { - case -1: // password has expired + case PasswordStatus.Expired: setPasswordExpiredConfirm(true); break; - case -2: // password never expires + case PasswordStatus.NeverExpires: case null: break; default: diff --git a/esp/src/src-react/components/Workunits.tsx b/esp/src/src-react/components/Workunits.tsx index b2047850ee7..b4ff6a3a291 100644 --- a/esp/src/src-react/components/Workunits.tsx +++ b/esp/src/src-react/components/Workunits.tsx @@ -213,7 +213,7 @@ export const Workunits: React.FunctionComponent = ({ { key: "divider_2", itemType: ContextualMenuItemType.Divider, onRender: () => }, { key: "setFailed", text: nlsHPCC.SetToFailed, disabled: !uiState.hasNotProtected, - onClick: () => { WsWorkunits.WUAction(selection, "SetToFailed"); } + onClick: () => { WsWorkunits.WUAction(selection, "SetToFailed").then(() => refreshTable.call()); } }, { key: "abort", text: nlsHPCC.Abort, disabled: !uiState.hasNotCompleted, diff --git a/esp/src/src-react/components/forms/Fields.tsx b/esp/src/src-react/components/forms/Fields.tsx index 6ec76f804a4..5ae9c59e80e 100644 --- a/esp/src/src-react/components/forms/Fields.tsx +++ b/esp/src/src-react/components/forms/Fields.tsx @@ -890,6 +890,7 @@ export function createInputs(fields: Fields, onChange?: (id: string, newValue: a onChange={(evt, newValue) => onChange(fieldID, newValue)} borderless={field.readonly && !field.multiline} readOnly={field.readonly} + disabled={field.disabled(field) ? true : false} required={field.required} multiline={field.multiline} errorMessage={field.errorMessage ?? ""} diff --git a/esp/src/src-react/hooks/user.ts b/esp/src/src-react/hooks/user.ts index 005ffc4ce5e..a1b49dab49e 100644 --- a/esp/src/src-react/hooks/user.ts +++ b/esp/src/src-react/hooks/user.ts @@ -12,6 +12,12 @@ const defaults = { const userSession = { ...defaults }; +export enum PasswordStatus { + NeverExpires = -2, + Expired = -1, + Unexpired = 0, +} + export interface UserSession { ESPSessionTimeout: number; ESPAuthenticated: boolean; diff --git a/esp/src/src/ESPWorkunit.ts b/esp/src/src/ESPWorkunit.ts index 77b406e53fd..0b7713df67f 100644 --- a/esp/src/src/ESPWorkunit.ts +++ b/esp/src/src/ESPWorkunit.ts @@ -1096,7 +1096,7 @@ export function CreateWUQueryStore(): BaseStore section for an optional "regex" subsection with a "cacheSize" attribute * By default, the maximum cache size is set to 500 patterns. Override with 0 to disable caching. */ @@ -248,7 +292,7 @@ class CStrRegExprFindInstance : implements IStrRegExprFindInstance private: bool matched = false; std::shared_ptr compiledRegex = nullptr; - pcre2_match_data_8 * matchData = nullptr; + PCRE2MatchData8 matchData; const char * subject = nullptr; // points to current subject of regex; do not free char * sample = nullptr; //only required if findstr/findvstr will be called @@ -267,7 +311,7 @@ class CStrRegExprFindInstance : implements IStrRegExprFindInstance if (_keep) { sample = (char *)rtlMalloc(subjectSize + 1); //required for findstr - memcpy(sample, _subject + subjectOffset, subjectSize); + memcpy_iflen(sample, _subject + subjectOffset, subjectSize); sample[subjectSize] = '\0'; subject = sample; } @@ -295,7 +339,6 @@ class CStrRegExprFindInstance : implements IStrRegExprFindInstance { if (sample) rtlFree(sample); - pcre2_match_data_free_8(matchData); } //IStrRegExprFindInstance @@ -310,7 +353,7 @@ class CStrRegExprFindInstance : implements IStrRegExprFindInstance const char * matchStart = subject + ovector[2 * n]; outlen = ovector[2 * n + 1] - ovector[2 * n]; out = (char *)rtlMalloc(outlen); - memcpy(out, matchStart, outlen); + memcpy_iflen(out, matchStart, outlen); } else { @@ -328,7 +371,7 @@ class CStrRegExprFindInstance : implements IStrRegExprFindInstance unsigned substrLen = ovector[2 * n + 1] - ovector[2 * n]; if (substrLen >= outlen) substrLen = outlen - 1; - memcpy(out, matchStart, substrLen); + memcpy_iflen(out, matchStart, substrLen); out[substrLen] = 0; } else @@ -376,12 +419,11 @@ class CCompiledStrRegExpr : implements ICompiledStrRegExpr void replace(size32_t & outlen, char * & out, size32_t slen, char const * str, size32_t rlen, char const * replace) const { - PCRE2_SIZE pcreLen = 0; outlen = 0; - pcre2_match_data_8 * matchData = pcre2_match_data_create_from_pattern_8(compiledRegex.get(), pcre2GeneralContext8); + PCRE2MatchData8 matchData = pcre2_match_data_create_from_pattern_8(compiledRegex.get(), pcre2GeneralContext8); // This method is often called through an ECL interface and the provided lengths - // (slen and rlen) are in characters, not bytes; we need to convert these to a + // (slen and rlen) are in code points (characters), not bytes; we need to convert these to a // byte count for PCRE2 size32_t sourceSize = (isUTF8Enabled ? rtlUtf8Size(slen, str) : slen); size32_t replaceSize = (isUTF8Enabled ? rtlUtf8Size(rlen, replace) : rlen); @@ -393,47 +435,50 @@ class CCompiledStrRegExpr : implements ICompiledStrRegExpr if (numMatches < 0 && numMatches != PCRE2_ERROR_NOMATCH) { // Treat everything other than PCRE2_ERROR_NOMATCH as an error - pcre2_match_data_free_8(matchData); failWithPCRE2Error(numMatches, "Error in regex replace: "); } if (numMatches > 0) { uint32_t replaceOptions = PCRE2_SUBSTITUTE_MATCHED|PCRE2_SUBSTITUTE_GLOBAL|PCRE2_SUBSTITUTE_EXTENDED; + PCRE2_SIZE pcreSize = 0; - // Call substitute once to get the size of the output, then allocate memory for it; - // Note that pcreLen will include space for a terminating null character; - // we have to allocate memory for that byte to avoid a buffer overrun, - // but we won't count that terminating byte - int replaceResult = pcre2_substitute_8(compiledRegex.get(), (PCRE2_SPTR8)str, sourceSize, 0, replaceOptions|PCRE2_SUBSTITUTE_OVERFLOW_LENGTH, matchData, pcre2MatchContext8, (PCRE2_SPTR8)replace, replaceSize, nullptr, &pcreLen); + // Call substitute once to get the size of the output (pushed into pcreSize); + // note that pcreSize will include space for a terminating null character even though we don't want it + int replaceResult = pcre2_substitute_8(compiledRegex.get(), (PCRE2_SPTR8)str, sourceSize, 0, replaceOptions|PCRE2_SUBSTITUTE_OVERFLOW_LENGTH, matchData, pcre2MatchContext8, (PCRE2_SPTR8)replace, replaceSize, nullptr, &pcreSize); if (replaceResult < 0 && replaceResult != PCRE2_ERROR_NOMEMORY) { - // PCRE2_ERROR_NOMEMORY is a normal result when we're just asking for the size of the output - pcre2_match_data_free_8(matchData); + // PCRE2_ERROR_NOMEMORY is a normal result when we're just asking for the size of the output; + // everything else is an error failWithPCRE2Error(replaceResult, "Error in regex replace: "); } - if (pcreLen > 0) + if (pcreSize > 1) { - out = (char *)rtlMalloc(pcreLen); + out = (char *)rtlMalloc(pcreSize); - replaceResult = pcre2_substitute_8(compiledRegex.get(), (PCRE2_SPTR8)str, sourceSize, 0, replaceOptions, matchData, pcre2MatchContext8, (PCRE2_SPTR8)replace, replaceSize, (PCRE2_UCHAR8 *)out, &pcreLen); + replaceResult = pcre2_substitute_8(compiledRegex.get(), (PCRE2_SPTR8)str, sourceSize, 0, replaceOptions, matchData, pcre2MatchContext8, (PCRE2_SPTR8)replace, replaceSize, (PCRE2_UCHAR8 *)out, &pcreSize); - // Note that, weirdly, pcreLen will now contain the number of code points - // in the result *excluding* the null terminator, so pcreLen will + // Note that, weirdly, pcreSize will now contain the number of code points + // in the result *excluding* the null terminator, so pcreSize will // become our final result length if (replaceResult < 0) { - pcre2_match_data_free_8(matchData); failWithPCRE2Error(replaceResult, "Error in regex replace: "); } } + else + { + // The replacement results in an empty string + outlen = 0; + out = nullptr; + return; + } - pcre2_match_data_free_8(matchData); // We need to return the number of characters here, not the byte count - outlen = (isUTF8Enabled ? rtlUtf8Length(pcreLen, out) : pcreLen); + outlen = (isUTF8Enabled ? rtlUtf8Length(pcreSize, out) : pcreSize); } else { @@ -441,7 +486,137 @@ class CCompiledStrRegExpr : implements ICompiledStrRegExpr out = (char *)rtlMalloc(sourceSize); memcpy_iflen(out, str, sourceSize); outlen = slen; - pcre2_match_data_free_8(matchData); + } + } + + // This method supports "fixed length UTF-8" even though that isn't really a thing; + // it's here more for completeness, in case we ever implement some version of it + void replaceFixed(size32_t tlen, char * tgt, size32_t slen, char const * str, size32_t rlen, char const * replace) const + { + if (tlen == 0) + return; + + PCRE2MatchData8 matchData = pcre2_match_data_create_from_pattern_8(compiledRegex.get(), pcre2GeneralContext8); + + // This method is often called through an ECL interface and the provided lengths + // (slen and rlen) are in code points (characters), not bytes; we need to convert these to a + // byte count for PCRE2 + size32_t sourceSize = (isUTF8Enabled ? rtlUtf8Size(slen, str) : slen); + size32_t replaceSize = (isUTF8Enabled ? rtlUtf8Size(rlen, replace) : rlen); + + // Execute an explicit match first to see if we match at all; if we do, matchData will be populated + // with data that can be used by pcre2_substitute to bypass some work + int numMatches = pcre2_match_8(compiledRegex.get(), (PCRE2_SPTR8)str, sourceSize, 0, 0, matchData, pcre2MatchContext8); + + if (numMatches < 0 && numMatches != PCRE2_ERROR_NOMATCH) + { + // Treat everything other than PCRE2_ERROR_NOMATCH as an error + failWithPCRE2Error(numMatches, "Error in regex replace: "); + } + + if (numMatches > 0) + { + uint32_t replaceOptions = PCRE2_SUBSTITUTE_MATCHED|PCRE2_SUBSTITUTE_GLOBAL|PCRE2_SUBSTITUTE_EXTENDED; + PCRE2_SIZE pcreSize = 0; + + // Call substitute once to get the size of the output and see if it will fit within fixedOutLen; + // if it does then we can substitute within the given buffer and then pad with spaces, if not then + // we have to allocate memory, substitute into that memory, then copy into the given buffer; + // note that pcreSize will include space for a terminating null character even though we don't want it + int replaceResult = pcre2_substitute_8(compiledRegex.get(), (PCRE2_SPTR8)str, sourceSize, 0, replaceOptions|PCRE2_SUBSTITUTE_OVERFLOW_LENGTH, matchData, pcre2MatchContext8, (PCRE2_SPTR8)replace, replaceSize, nullptr, &pcreSize); + + if (replaceResult < 0 && replaceResult != PCRE2_ERROR_NOMEMORY) + { + // PCRE2_ERROR_NOMEMORY is a normal result when we're just asking for the size of the output; + // everything else is an error + failWithPCRE2Error(replaceResult, "Error in regex replace: "); + } + + if (pcreSize > 1) + { + std::string tempBuffer; + bool useFixedBuffer = (pcreSize <= tlen); + char * replaceBuffer = nullptr; + + if (useFixedBuffer) + { + replaceBuffer = tgt; + } + else + { + tempBuffer.reserve(pcreSize); + replaceBuffer = (char *)tempBuffer.data(); + } + + replaceResult = pcre2_substitute_8(compiledRegex.get(), (PCRE2_SPTR8)str, sourceSize, 0, replaceOptions, matchData, pcre2MatchContext8, (PCRE2_SPTR8)replace, replaceSize, (PCRE2_UCHAR8 *)replaceBuffer, &pcreSize); + + if (replaceResult < 0) + { + failWithPCRE2Error(replaceResult, "Error in regex replace: "); + } + + // Note that after a successful replace, pcreSize will contain the number of code points in + // the result *excluding* the null terminator + + if (useFixedBuffer) + { + // We used the fixed buffer so we only need to pad the result with spaces + if (isUTF8Enabled) + { + memset_iflen(tgt + pcreSize, ' ', tlen - rtlUtf8Length(pcreSize, tgt)); + } + else + { + memset_iflen(tgt + pcreSize, ' ', tlen - pcreSize); + } + } + else + { + // We used a separate buffer, so we need to copy the result into the fixed buffer; + // temp buffer was larger so we don't have to worry about padding + if (isUTF8Enabled) + { + rtlUtf8ToUtf8(tlen, tgt, pcreSize, replaceBuffer); + } + else + { + memcpy_iflen(tgt, replaceBuffer, tlen); + } + + } + } + else + { + // The replacement results in an empty string + memset_iflen(tgt, ' ', tlen); + } + } + else + { + // No match found; return the original string + if (isUTF8Enabled) + { + if (tlen == slen) + { + memcpy_iflen(tgt, str, sourceSize); + } + else + { + rtlUtf8ToUtf8(tlen, tgt, slen, str); + } + } + else + { + if (slen <= tlen) + { + memcpy_iflen(tgt, str, sourceSize); + memset_iflen(tgt + sourceSize, ' ', tlen - sourceSize); + } + else + { + memcpy_iflen(tgt, str, tlen); + } + } } } @@ -458,7 +633,7 @@ class CCompiledStrRegExpr : implements ICompiledStrRegExpr PCRE2_SIZE offset = 0; uint32_t matchOptions = 0; PCRE2_SIZE subjectSize = (isUTF8Enabled ? rtlUtf8Size(_subjectLen, _subject) : _subjectLen); - pcre2_match_data_8 * matchData = pcre2_match_data_create_from_pattern_8(compiledRegex.get(), pcre2GeneralContext8); + PCRE2MatchData8 matchData = pcre2_match_data_create_from_pattern_8(compiledRegex.get(), pcre2GeneralContext8); // Capture groups are ignored when gathering match results into a set, // so we will focus on only the first match (the entire matched string); @@ -479,7 +654,6 @@ class CCompiledStrRegExpr : implements ICompiledStrRegExpr else { // Treat everything else as an error - pcre2_match_data_free_8(matchData); failWithPCRE2Error(numMatches, "Error in regex getMatchSet: "); } } @@ -495,7 +669,7 @@ class CCompiledStrRegExpr : implements ICompiledStrRegExpr // Append the number of characters in the match * (size32_t *) outData = (isUTF8Enabled ? rtlUtf8Length(matchSize, matchStart) : matchSize); // Copy the bytes - memcpy(outData + sizeof(size32_t), matchStart, matchSize); + memcpy_iflen(outData + sizeof(size32_t), matchStart, matchSize); outBytes += matchSize + sizeof(size32_t); // Update search offset (which is in code units) @@ -511,8 +685,6 @@ class CCompiledStrRegExpr : implements ICompiledStrRegExpr } } - pcre2_match_data_free_8(matchData); - __isAllResult = false; __resultBytes = outBytes; __result = out.detachdata(); @@ -680,7 +852,7 @@ class CUStrRegExprFindInstance : implements IUStrRegExprFindInstance private: bool matched = false; std::shared_ptr compiledRegex = nullptr; - pcre2_match_data_16 * matchData = nullptr; + PCRE2MatchData16 matchData; const UChar * subject = nullptr; // points to current subject of regex; do not free public: @@ -702,10 +874,7 @@ class CUStrRegExprFindInstance : implements IUStrRegExprFindInstance } - ~CUStrRegExprFindInstance() //CAVEAT non-virtual destructor ! - { - pcre2_match_data_free_16(matchData); - } + ~CUStrRegExprFindInstance() = default; //IUStrRegExprFindInstance @@ -720,7 +889,7 @@ class CUStrRegExprFindInstance : implements IUStrRegExprFindInstance outlen = ovector[2 * n + 1] - ovector[2 * n]; PCRE2_SIZE outSize = outlen * sizeof(UChar); out = (UChar *)rtlMalloc(outSize); - memcpy(out, matchStart, outSize); + memcpy_iflen(out, matchStart, outSize); } else { @@ -738,7 +907,7 @@ class CUStrRegExprFindInstance : implements IUStrRegExprFindInstance unsigned substrLen = ovector[2 * n + 1] - ovector[2 * n]; if (substrLen >= outlen) substrLen = outlen - 1; - memcpy(out, matchStart, substrLen * sizeof(UChar)); + memcpy_iflen(out, matchStart, substrLen * sizeof(UChar)); out[substrLen] = 0; } else @@ -781,9 +950,8 @@ class CCompiledUStrRegExpr : implements ICompiledUStrRegExpr void replace(size32_t & outlen, UChar * & out, size32_t slen, const UChar * str, size32_t rlen, UChar const * replace) const { - PCRE2_SIZE pcreLen = 0; outlen = 0; - pcre2_match_data_16 * matchData = pcre2_match_data_create_from_pattern_16(compiledRegex.get(), pcre2GeneralContext16); + PCRE2MatchData16 matchData = pcre2_match_data_create_from_pattern_16(compiledRegex.get(), pcre2GeneralContext16); // Execute an explicit match first to see if we match at all; if we do, matchData will be populated // with data that can be used by pcre2_substitute to bypass some work @@ -792,47 +960,50 @@ class CCompiledUStrRegExpr : implements ICompiledUStrRegExpr if (numMatches < 0 && numMatches != PCRE2_ERROR_NOMATCH) { // Treat everything other than PCRE2_ERROR_NOMATCH as an error - pcre2_match_data_free_16(matchData); failWithPCRE2Error(numMatches, "Error in regex replace: "); } if (numMatches > 0) { uint32_t replaceOptions = PCRE2_SUBSTITUTE_MATCHED|PCRE2_SUBSTITUTE_GLOBAL|PCRE2_SUBSTITUTE_EXTENDED; + PCRE2_SIZE pcreSize = 0; // Call substitute once to get the size of the output, then allocate memory for it; - // Note that pcreLen will include space for a terminating null character; + // Note that pcreSize will include space for a terminating null character; // we have to allocate memory for that byte to avoid a buffer overrun, // but we won't count that terminating byte - int replaceResult = pcre2_substitute_16(compiledRegex.get(), (PCRE2_SPTR16)str, slen, 0, replaceOptions|PCRE2_SUBSTITUTE_OVERFLOW_LENGTH, matchData, pcre2MatchContext16, (PCRE2_SPTR16)replace, rlen, nullptr, &pcreLen); + int replaceResult = pcre2_substitute_16(compiledRegex.get(), (PCRE2_SPTR16)str, slen, 0, replaceOptions|PCRE2_SUBSTITUTE_OVERFLOW_LENGTH, matchData, pcre2MatchContext16, (PCRE2_SPTR16)replace, rlen, nullptr, &pcreSize); if (replaceResult < 0 && replaceResult != PCRE2_ERROR_NOMEMORY) { // PCRE2_ERROR_NOMEMORY is a normal result when we're just asking for the size of the output - pcre2_match_data_free_16(matchData); failWithPCRE2Error(replaceResult, "Error in regex replace: "); } - if (pcreLen > 0) + if (pcreSize > 1) { - out = (UChar *)rtlMalloc(pcreLen * sizeof(UChar)); + out = (UChar *)rtlMalloc(pcreSize * sizeof(UChar)); - replaceResult = pcre2_substitute_16(compiledRegex.get(), (PCRE2_SPTR16)str, slen, 0, replaceOptions, matchData, pcre2MatchContext16, (PCRE2_SPTR16)replace, rlen, (PCRE2_UCHAR16 *)out, &pcreLen); + replaceResult = pcre2_substitute_16(compiledRegex.get(), (PCRE2_SPTR16)str, slen, 0, replaceOptions, matchData, pcre2MatchContext16, (PCRE2_SPTR16)replace, rlen, (PCRE2_UCHAR16 *)out, &pcreSize); - // Note that, weirdly, pcreLen will now contain the number of code points - // in the result *excluding* the null terminator, so pcreLen will + // Note that, weirdly, pcreSize will now contain the number of code points + // in the result *excluding* the null terminator, so pcreSize will // become our final result length if (replaceResult < 0) { - pcre2_match_data_free_16(matchData); failWithPCRE2Error(replaceResult, "Error in regex replace: "); } } + else + { + // The replacement results in an empty string + outlen = 0; + out = nullptr; + return; + } - pcre2_match_data_free_16(matchData); - // We need to return the number of characters here, not the byte count - outlen = pcreLen; + outlen = pcreSize; } else { @@ -840,7 +1011,104 @@ class CCompiledUStrRegExpr : implements ICompiledUStrRegExpr out = (UChar *)rtlMalloc(slen * sizeof(UChar)); memcpy_iflen(out, str, slen * sizeof(UChar)); outlen = slen; - pcre2_match_data_free_16(matchData); + } + } + + void replaceFixed(size32_t tlen, UChar * tgt, size32_t slen, UChar const * str, size32_t rlen, UChar const * replace) const + { + if (tlen == 0) + return; + + PCRE2MatchData16 matchData = pcre2_match_data_create_from_pattern_16(compiledRegex.get(), pcre2GeneralContext16); + + // Execute an explicit match first to see if we match at all; if we do, matchData will be populated + // with data that can be used by pcre2_substitute to bypass some work + int numMatches = pcre2_match_16(compiledRegex.get(), (PCRE2_SPTR16)str, slen, 0, 0, matchData, pcre2MatchContext16); + + if (numMatches < 0 && numMatches != PCRE2_ERROR_NOMATCH) + { + // Treat everything other than PCRE2_ERROR_NOMATCH as an error + failWithPCRE2Error(numMatches, "Error in regex replace: "); + } + + if (numMatches > 0) + { + uint32_t replaceOptions = PCRE2_SUBSTITUTE_MATCHED|PCRE2_SUBSTITUTE_GLOBAL|PCRE2_SUBSTITUTE_EXTENDED; + PCRE2_SIZE pcreSize = 0; + + // Call substitute once to get the size of the output and see if it will fit within fixedOutLen; + // if it does then we can substitute within the given buffer and then pad with spaces, if not then + // we have to allocate memory, substitute into that memory, then copy into the given buffer; + // note that pcreSize will include space for a terminating null character even though we don't want it + int replaceResult = pcre2_substitute_16(compiledRegex.get(), (PCRE2_SPTR16)str, slen, 0, replaceOptions|PCRE2_SUBSTITUTE_OVERFLOW_LENGTH, matchData, pcre2MatchContext16, (PCRE2_SPTR16)replace, rlen, nullptr, &pcreSize); + + if (replaceResult < 0 && replaceResult != PCRE2_ERROR_NOMEMORY) + { + // PCRE2_ERROR_NOMEMORY is a normal result when we're just asking for the size of the output; + // everything else is an error + failWithPCRE2Error(replaceResult, "Error in regex replace: "); + } + + if (pcreSize > 1) + { + std::string tempBuffer; + bool useFixedBuffer = (pcreSize <= tlen); + UChar * replaceBuffer = nullptr; + + if (useFixedBuffer) + { + replaceBuffer = tgt; + } + else + { + tempBuffer.reserve(pcreSize * sizeof(UChar)); + replaceBuffer = (UChar *)tempBuffer.data(); + } + + replaceResult = pcre2_substitute_16(compiledRegex.get(), (PCRE2_SPTR16)str, slen, 0, replaceOptions, matchData, pcre2MatchContext16, (PCRE2_SPTR16)replace, rlen, (PCRE2_UCHAR16 *)replaceBuffer, &pcreSize); + + if (replaceResult < 0) + { + failWithPCRE2Error(replaceResult, "Error in regex replace: "); + } + + // Note that after a successful replace, pcreSize will contain the number of code points in + // the result *excluding* the null terminator + + if (useFixedBuffer) + { + // We used the fixed buffer so we only need to pad the result with spaces + while (pcreSize < tlen) + tgt[pcreSize++] = ' '; + } + else + { + // We used a separate buffer, so we need to copy the result into the fixed buffer; + // temp buffer was larger so we don't have to worry about padding + memcpy_iflen(tgt, replaceBuffer, (tlen * sizeof(UChar))); + } + } + else + { + // The replacement results in an empty string + size32_t pos = 0; + while (pos < tlen) + tgt[pos++] = ' '; + } + } + else + { + // No match found; return the original string + if (slen <= tlen) + { + memcpy_iflen(tgt, str, (slen * sizeof(UChar))); + while (slen < tlen) + tgt[slen++] = ' '; + } + else + { + memcpy_iflen(tgt, str, (tlen * sizeof(UChar))); + } } } @@ -856,7 +1124,7 @@ class CCompiledUStrRegExpr : implements ICompiledUStrRegExpr size32_t outBytes = 0; PCRE2_SIZE offset = 0; uint32_t matchOptions = 0; - pcre2_match_data_16 * matchData = pcre2_match_data_create_from_pattern_16(compiledRegex.get(), pcre2GeneralContext16); + PCRE2MatchData16 matchData = pcre2_match_data_create_from_pattern_16(compiledRegex.get(), pcre2GeneralContext16); // Capture groups are ignored when gathering match results into a set, // so we will focus on only the first match (the entire matched string); @@ -877,7 +1145,6 @@ class CCompiledUStrRegExpr : implements ICompiledUStrRegExpr else { // Treat everything else as an error - pcre2_match_data_free_16(matchData); failWithPCRE2Error(numMatches, "Error in regex getMatchSet: "); } } @@ -893,7 +1160,7 @@ class CCompiledUStrRegExpr : implements ICompiledUStrRegExpr out.ensureAvailable(outBytes + matchSize + sizeof(size32_t)); byte * outData = out.getbytes() + outBytes; * (size32_t *) outData = matchLen; - memcpy(outData + sizeof(size32_t), matchStart, matchSize); + memcpy_iflen(outData + sizeof(size32_t), matchStart, matchSize); outBytes += matchSize + sizeof(size32_t); // Update offset @@ -909,8 +1176,6 @@ class CCompiledUStrRegExpr : implements ICompiledUStrRegExpr } } - pcre2_match_data_free_16(matchData); - __isAllResult = false; __resultBytes = outBytes; __result = out.detachdata(); diff --git a/rtl/eclrtl/eclrtl.hpp b/rtl/eclrtl/eclrtl.hpp index 6cb401b69b0..97beb5ff748 100644 --- a/rtl/eclrtl/eclrtl.hpp +++ b/rtl/eclrtl/eclrtl.hpp @@ -87,6 +87,7 @@ interface IStrRegExprFindInstance interface ICompiledStrRegExpr { virtual void replace(size32_t & outlen, char * & out, size32_t slen, char const * str, size32_t rlen, char const * replace) const = 0; + virtual void replaceFixed(size32_t tlen, char * tgt, size32_t slen, char const * str, size32_t rlen, char const * replace) const = 0; virtual IStrRegExprFindInstance * find(const char * str, size32_t from, size32_t len, bool needToKeepSearchString) const = 0; virtual void getMatchSet(bool & __isAllResult, size32_t & __resultBytes, void * & __result, size32_t _srcLen, const char * _search) = 0; }; @@ -101,6 +102,7 @@ interface IUStrRegExprFindInstance interface ICompiledUStrRegExpr { virtual void replace(size32_t & outlen, UChar * & out, size32_t slen, UChar const * str, size32_t rlen, UChar const * replace) const = 0; + virtual void replaceFixed(size32_t tlen, UChar * tgt, size32_t slen, UChar const * str, size32_t rlen, UChar const * replace) const = 0; virtual IUStrRegExprFindInstance * find(const UChar * str, size32_t from, size32_t len) const = 0; virtual void getMatchSet(bool & __isAllResult, size32_t & __resultBytes, void * & __result, size32_t _srcLen, const UChar * _search) = 0; }; diff --git a/system/jlib/jsocket.cpp b/system/jlib/jsocket.cpp index d7ea754f161..53c47476dd3 100644 --- a/system/jlib/jsocket.cpp +++ b/system/jlib/jsocket.cpp @@ -2768,7 +2768,9 @@ void CSocket::shutdown(unsigned mode) void CSocket::shutdownNoThrow(unsigned mode) { - if (state == ss_open) { + if (state == ss_open) + { + state = ss_shutdown; #ifdef SOCKTRACE PROGLOG("SOCKTRACE: shutdown(%d) socket %x %d (%p)", mode, sock, sock, this); #endif diff --git a/system/mp/mpcomm.cpp b/system/mp/mpcomm.cpp index 898e1ceed3e..e68a7a13562 100644 --- a/system/mp/mpcomm.cpp +++ b/system/mp/mpcomm.cpp @@ -1846,6 +1846,7 @@ class CMPPacketReader: public ISocketSelectNotify, public CInterface { if (!parent) return false; + bool gc = false; // if a gc is hit, then will fall through to close socket try { while (true) // NB: breaks out if blocked (if (remaining) ..) @@ -1870,11 +1871,11 @@ class CMPPacketReader: public ISocketSelectNotify, public CInterface if (!gotPacketHdr) { CCycleTimer timer; - sock->readtms(activeptr, 0, remaining, szRead, timer.remainingMs(60000)); + gc = readtmsAllowClose(sock, activeptr, 0, remaining, szRead, timer.remainingMs(60000)); remaining -= szRead; activeptr += szRead; if (remaining) // only possible if blocked. - return false; // wait for next notification + break; // wait for next notification PacketHeader &hdr = *(PacketHeader *)activemsg->bufferBase(); if (hdr.version/0x100 != MP_PROTOCOL_VERSION/0x100) @@ -1896,14 +1897,14 @@ class CMPPacketReader: public ISocketSelectNotify, public CInterface gotPacketHdr = true; } - if (remaining) + if (!gc && remaining) { - sock->readtms(activeptr, 0, remaining, szRead, WAIT_FOREVER); + gc = readtmsAllowClose(sock, activeptr, 0, remaining, szRead, WAIT_FOREVER); remaining -= szRead; activeptr += szRead; } if (remaining) // only possible if blocked. - return false; // wait for next notification + break; // wait for next notification #ifdef _FULLTRACE LOG(MCdebugInfo, "MP: ReadPacket(timetaken = %d,select iterations=%d)",msTick()-parent->startxfer,parent->numiter); @@ -1937,6 +1938,8 @@ class CMPPacketReader: public ISocketSelectNotify, public CInterface } } while (activemsg); + if (gc) + break; } } catch (IException *e) @@ -1947,24 +1950,27 @@ class CMPPacketReader: public ISocketSelectNotify, public CInterface gotPacketHdr = false; } - // here due to error or graceful close, so close socket (ignore error as may be closed already) - try + if (gc) { - Linked pc; + // here due to error or graceful close, so close socket (ignore error as may be closed already) + try { - CriticalBlock block(sect); - if (parent) + Linked pc; { - pc.set(parent); // don't want channel to disappear during call - parent = NULL; + CriticalBlock block(sect); + if (parent) + { + pc.set(parent); // don't want channel to disappear during call + parent = NULL; + } } + if (pc) + pc->closeSocket(false, true); + } + catch (IException *e) + { + e->Release(); } - if (pc) - pc->closeSocket(false, true); - } - catch (IException *e) - { - e->Release(); } return false; } diff --git a/system/security/LdapSecurity/ldapconnection.cpp b/system/security/LdapSecurity/ldapconnection.cpp index 14fe3b7d9d5..35e74cd1450 100644 --- a/system/security/LdapSecurity/ldapconnection.cpp +++ b/system/security/LdapSecurity/ldapconnection.cpp @@ -1925,28 +1925,33 @@ class CLdapClient : implements ILdapClient, public CInterface StringBuffer hostbuf; int rc = LDAP_SERVER_DOWN; char *ldap_errstring=NULL; - for(int retries = 0; retries <= LDAPSEC_MAX_RETRIES; retries++) + for (int numHosts=0; numHosts < m_ldapconfig->getHostCount(); numHosts++) { - m_ldapconfig->getLdapHost(hostbuf);//get next available AD, as it may have changed - DBGLOG("LdapBind for user %s (retries=%d) on host %s.", username, retries, hostbuf.str()); + for(int retries = 0; retries <= LDAPSEC_MAX_RETRIES; retries++) { - LDAP* user_ld = LdapUtils::LdapInit(m_ldapconfig->getProtocol(), hostbuf.str(), m_ldapconfig->getLdapPort(), m_ldapconfig->getLdapSecurePort(), m_ldapconfig->getCipherSuite()); - rc = LdapUtils::LdapBind(user_ld, m_ldapconfig->getLdapTimeout(), m_ldapconfig->getDomain(), username, password, userdnbuf.str(), m_ldapconfig->getServerType(), m_ldapconfig->getAuthMethod()); - if(rc != LDAP_SUCCESS) - ldap_get_option(user_ld, LDAP_OPT_ERROR_STRING, &ldap_errstring); - LDAP_UNBIND(user_ld); - } - DBGLOG("finished LdapBind for user %s, rc=%d", username, rc); + m_ldapconfig->getLdapHost(hostbuf);//get next available AD, as it may have changed + DBGLOG("LdapBind for user %s (retries=%d) on host %s.", username, retries, hostbuf.str()); + { + LDAP* user_ld = LdapUtils::LdapInit(m_ldapconfig->getProtocol(), hostbuf.str(), m_ldapconfig->getLdapPort(), m_ldapconfig->getLdapSecurePort(), m_ldapconfig->getCipherSuite()); + rc = LdapUtils::LdapBind(user_ld, m_ldapconfig->getLdapTimeout(), m_ldapconfig->getDomain(), username, password, userdnbuf.str(), m_ldapconfig->getServerType(), m_ldapconfig->getAuthMethod()); + if(rc != LDAP_SUCCESS) + ldap_get_option(user_ld, LDAP_OPT_ERROR_STRING, &ldap_errstring); + LDAP_UNBIND(user_ld); + } + DBGLOG("finished LdapBind for user %s, rc=%d", username, rc); - if(rc==LDAP_SERVER_DOWN || rc==LDAP_UNAVAILABLE) - { - m_ldapconfig->rejectHost(hostbuf); - continue;//try again with next configured LDAP host + if(rc==LDAP_TIMEOUT && retries < LDAPSEC_MAX_RETRIES) + { + sleep(LDAPSEC_RETRY_WAIT); + DBGLOG("Server %s temporarily unreachable, retrying ...", hostbuf.str()); + } + else + break; } - else if(rc==LDAP_TIMEOUT && retries < LDAPSEC_MAX_RETRIES) + + if(LdapServerDown(rc)) { - sleep(LDAPSEC_RETRY_WAIT); - DBGLOG("Server %s temporarily unreachable, retrying ...", hostbuf.str()); + m_ldapconfig->rejectHost(hostbuf); // move to next host } else break; diff --git a/system/security/securesocket/securesocket.cpp b/system/security/securesocket/securesocket.cpp index c9b2f675855..56ac25da05c 100644 --- a/system/security/securesocket/securesocket.cpp +++ b/system/security/securesocket/securesocket.cpp @@ -774,6 +774,7 @@ void CSecureSocket::handleError(int ssl_err, bool writing, bool wait, unsigned t { case SSL_ERROR_ZERO_RETURN: { + m_socket->shutdownNoThrow(); THROWJSOCKEXCEPTION(JSOCKERR_graceful_close); } case SSL_ERROR_WANT_READ: // NB: SSL_write can provoke SSL_ERROR_WANT_READ @@ -928,6 +929,7 @@ void CSecureSocket::readtms(void* buf, size32_t min_size, size32_t max_size, siz } else if (0 == rc) { + m_socket->shutdownNoThrow(); if (suppresGCIfMinSize && (sizeRead >= min_size)) break; THROWJSOCKEXCEPTION(JSOCKERR_graceful_close); diff --git a/testing/helm/tests/furthercoverage.yaml b/testing/helm/tests/furthercoverage.yaml new file mode 100644 index 00000000000..7396612ba53 --- /dev/null +++ b/testing/helm/tests/furthercoverage.yaml @@ -0,0 +1,31 @@ +# This values file is here to exrcise all of the different paths in the helm chart. +# It is a copy of values.yaml, all changes are marked with #CHANGE. +# +# - roxie: Enabling server replicas in roxie +# - thor: eclAgentUseChildProcesses=false + +roxie: +- name: roxie + disabled: false + prefix: roxie + services: + - name: roxie + servicePort: 9876 + listenQueue: 200 + numThreads: 30 + visibility: local + replicas: 2 + numChannels: 2 + singleNode: false + traceLevel: 1 + serverReplicas: 4 #CHANGE + topoServer: + replicas: 1 + +thor: +- name: thor + prefix: thor + numWorkers: 2 + maxJobs: 4 + maxGraphs: 2 + eclAgentUseChildProcesses: false #CHANGE diff --git a/testing/regress/ecl/key/regex_replace_fixed.xml b/testing/regress/ecl/key/regex_replace_fixed.xml new file mode 100644 index 00000000000..6cbe7600df0 --- /dev/null +++ b/testing/regress/ecl/key/regex_replace_fixed.xml @@ -0,0 +1,12 @@ + + Dani DanDani Dan + + + Daniel DanDaniel Dan + + + + + + DDaanniieeDDaDDaanniieeDDa + diff --git a/testing/regress/ecl/regex_replace_fixed.ecl b/testing/regress/ecl/regex_replace_fixed.ecl new file mode 100644 index 00000000000..49a7f1c53bb --- /dev/null +++ b/testing/regress/ecl/regex_replace_fixed.ecl @@ -0,0 +1,108 @@ +/*############################################################################## + + HPCC SYSTEMS software Copyright (C) 2024 HPCC Systems®. + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +############################################################################## */ + +#OPTION('globalFold', FALSE); + +//------------------------------------------ + +inDS := DATASET([{'Daniel', 'Daniel'}], {STRING s1, STRING s2}); + +// UTF-8 not included because the concept of a fixed-length +// UTF-8 string does not make sense + +// buffer_x: replacement occurs entirely within target buffer +// alloc_x: replacement requires extra temp buffer + +ResLayout := RECORD + STRING10 buffer_s; + STRING3 alloc_s; + UNICODE10 buffer_u; + UNICODE3 alloc_u; +END; + +//------------------------------------------ + +STRING del_few_chars_ps := '[le]' : STORED('del_few_chars_ps'); +UNICODE del_few_chars_pu := u'[le]' : STORED('del_few_chars_pu'); + +remove_some_chars := PROJECT + ( + NOFOLD(inDS), + TRANSFORM + ( + ResLayout, + SELF.buffer_s := NOFOLD(REGEXREPLACE(del_few_chars_ps, (STRING)LEFT.s1, '')), + SELF.alloc_s := NOFOLD(REGEXREPLACE(del_few_chars_ps, (STRING)LEFT.s2, '')), + SELF.buffer_u := NOFOLD(REGEXREPLACE(del_few_chars_pu, (UNICODE)LEFT.s1, u'')), + SELF.alloc_u := NOFOLD(REGEXREPLACE(del_few_chars_pu, (UNICODE)LEFT.s2, u'')) + ) + ); +OUTPUT(remove_some_chars, NAMED('remove_some_chars')); + +//------------------------------------------ + +STRING del_no_chars_ps := '[[:punct:]]' : STORED('del_no_chars_ps'); +UNICODE del_no_chars_pu := u'[[:punct:]]' : STORED('del_no_chars_pu'); + +remove_zero_chars := PROJECT + ( + NOFOLD(inDS), + TRANSFORM + ( + ResLayout, + SELF.buffer_s := NOFOLD(REGEXREPLACE(del_no_chars_ps, (STRING)LEFT.s1, '')), + SELF.alloc_s := NOFOLD(REGEXREPLACE(del_no_chars_ps, (STRING)LEFT.s2, '')), + SELF.buffer_u := NOFOLD(REGEXREPLACE(del_no_chars_pu, (UNICODE)LEFT.s1, u'')), + SELF.alloc_u := NOFOLD(REGEXREPLACE(del_no_chars_pu, (UNICODE)LEFT.s2, u'')) + ) + ); +OUTPUT(remove_zero_chars, NAMED('remove_zero_chars')); + +//------------------------------------------ + +STRING del_all_chars_ps := '\\w' : STORED('del_all_chars_ps'); +UNICODE del_all_chars_pu := u'\\w' : STORED('del_all_chars_pu'); + +remove_all_chars := PROJECT + ( + NOFOLD(inDS), + TRANSFORM + ( + ResLayout, + SELF.buffer_s := NOFOLD(REGEXREPLACE(del_all_chars_ps, (STRING)LEFT.s1, '')), + SELF.alloc_s := NOFOLD(REGEXREPLACE(del_all_chars_ps, (STRING)LEFT.s2, '')), + SELF.buffer_u := NOFOLD(REGEXREPLACE(del_all_chars_pu, (UNICODE)LEFT.s1, u'')), + SELF.alloc_u := NOFOLD(REGEXREPLACE(del_all_chars_pu, (UNICODE)LEFT.s2, u'')) + ) + ); +OUTPUT(remove_all_chars, NAMED('remove_all_chars')); + +//------------------------------------------ + +STRING double_all_chars_ps := '(\\w)' : STORED('double_all_chars_ps'); +UNICODE double_all_chars_pu := u'(\\w)' : STORED('double_all_chars_pu'); + +double_all_chars := PROJECT + ( + NOFOLD(inDS), + TRANSFORM + ( + ResLayout, + SELF.buffer_s := NOFOLD(REGEXREPLACE(double_all_chars_ps, (STRING)LEFT.s1, '$1$1')), + SELF.alloc_s := NOFOLD(REGEXREPLACE(double_all_chars_ps, (STRING)LEFT.s2, '$1$1')), + SELF.buffer_u := NOFOLD(REGEXREPLACE(double_all_chars_pu, (UNICODE)LEFT.s1, u'$1$1')), + SELF.alloc_u := NOFOLD(REGEXREPLACE(double_all_chars_pu, (UNICODE)LEFT.s2, u'$1$1')) + ) + ); +OUTPUT(double_all_chars, NAMED('double_all_chars')); diff --git a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp index 3a0da40d7f3..0441534af72 100644 --- a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp +++ b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp @@ -15,6 +15,7 @@ limitations under the License. ############################################################################## */ +#include #include "thactivityutil.ipp" #include "thcompressutil.hpp" #include "thexception.hpp" @@ -155,6 +156,8 @@ class CBroadcaster : public CSimpleInterface CThreadedPersistent threaded; SimpleInterThreadQueueOf broadcastQueue; Owned exception; + unsigned myNode; + unsigned nodes; bool aborted; void clearQueue() { @@ -169,6 +172,12 @@ class CBroadcaster : public CSimpleInterface CSend(CBroadcaster &_broadcaster) : threaded("CBroadcaster::CSend", this), broadcaster(_broadcaster) { aborted = false; + myNode = broadcaster.activity.queryJob().queryMyNodeRank()-1; // 0 based + nodes = broadcaster.activity.queryJob().queryNodes(); + + // in theory each worker could be sending log(n) packets, with the broadcaster on each blocking waiting for acks + unsigned limit = nodes * std::ceil(std::log2(nodes)); + broadcastQueue.setLimit(limit); } ~CSend() { @@ -182,7 +191,22 @@ class CBroadcaster : public CSimpleInterface sendItem->Release(); throw exception.getClear(); } - broadcastQueue.enqueue(sendItem); // will block if queue full + // check if anywhere else to send to + if (sendItem) + { + unsigned origin = sendItem->queryNode(); + unsigned pseudoNode = (myNode=nodes) + { + sendItem->Release(); + return; + } + } + while (!broadcastQueue.enqueue(sendItem, 5000)) // will block if queue full + { + DBGLOG("CSend::addBlock() - broadcastQueue full, waiting for space"); + } } void start() { @@ -272,6 +296,8 @@ class CBroadcaster : public CSimpleInterface mptag_t rt = ::createReplyTag(); unsigned origin = sendItem->queryNode(); unsigned pseudoNode = (myNodequerySlave()+1, sendLen, (unsigned)sendItem->queryCode()); #endif @@ -330,6 +360,11 @@ class CBroadcaster : public CSimpleInterface } return false; } + // recieve loop, receives CSendItem packets, adds them to broadcaster thread ('sender'), and processes the packet via 'bCastReceive'. + // bcast_sendStopping are regular row packets that inform us that the sender is stopping (something upstream has asked it to stop()) + // - If all workers have signalled stopping, 'allRequestStop' will be set and will curtail the broadcast of more packets. + // - Or, if the broadcaster has explicitly been stopped (occurs via failover to local lookup), this will also curtail the broadcast of more packets. + // bcast_stop contains no row data, it signals that the sender has finished sending data. void recvLoop() { // my sender is implicitly stopped (never sends to self) @@ -349,16 +384,20 @@ class CBroadcaster : public CSimpleInterface break; } mptag_t replyTag = msg.getReplyTag(); - CMessageBuffer ackMsg; Owned sendItem = new CSendItem(msg); #ifdef _TRACEBROADCAST ActPrintLog(&activity, "Broadcast node %d received from node %d, origin node %d, origin slave %d, size %d, code=%d", myNode+1, (unsigned)sendRank, sendItem->queryNode()+1, sendItem->querySlave()+1, sendItem->length(), (unsigned)sendItem->queryCode()); #endif + CMessageBuffer ackMsg; + bool stopping = isStopping(); // this is effectively a shortcut to inform sender asap. bcast_sendStopping/bcast_stop will be queued soon + ackMsg.append(stopping); comm.send(ackMsg, sendRank, replyTag); // send ack #ifdef _TRACEBROADCAST ActPrintLog(&activity, "Broadcast node %d, sent ack to node %d, replyTag=%d", myNode+1, (unsigned)sendRank, (unsigned)replyTag); #endif - sender.addBlock(sendItem.getLink()); + // if all stopping, then suppress broadcasting (except stop packets) + if (!allRequestStop || (bcast_stop == sendItem->queryCode())) + sender.addBlock(sendItem.getLink()); assertex(myNode != sendItem->queryNode()); switch (sendItem->queryCode()) { diff --git a/thorlcr/thorutil/thormisc.hpp b/thorlcr/thorutil/thormisc.hpp index 6c667ebd8e9..3159d58ad48 100644 --- a/thorlcr/thorutil/thormisc.hpp +++ b/thorlcr/thorutil/thormisc.hpp @@ -43,17 +43,17 @@ #endif /// Thor options, that can be hints, workunit options, or global settings -#define THOROPT_COMPRESS_SPILLS "compressInternalSpills" // Compress internal spills, e.g. spills created by lookahead or sort gathering (default = true) -#define THOROPT_COMPRESS_SPILL_TYPE "spillCompressorType" // Compress spill type, e.g. FLZ, LZ4 (or other to get previous) (default = LZ4) -#define THOROPT_HDIST_SPILL "hdistSpill" // Allow distribute receiver to spill to disk, rather than blocking (default = true) -#define THOROPT_HDIST_WRITE_POOL_SIZE "hdistSendPoolSize" // Distribute send thread pool size (default = 16) -#define THOROPT_HDIST_BUCKET_SIZE "hdOutBufferSize" // Distribute target bucket send size (default = 1MB) -#define THOROPT_HDIST_BUFFER_SIZE "hdInBufferSize" // Distribute send buffer size (for all targets) (default = 32MB) -#define THOROPT_HDIST_PULLBUFFER_SIZE "hdPullBufferSize" // Distribute pull buffer size (receiver side limit, before spilling) -#define THOROPT_HDIST_CANDIDATELIMIT "hdCandidateLimit" // Limits # of buckets to push to the writers when send buffer is full (default = is 50% largest) -#define THOROPT_HDIST_TARGETWRITELIMIT "hdTargetLimit" // Limit # of writer threads working on a single target (default = unbound, but picks round-robin) -#define THOROPT_HDIST_COMP "hdCompressorType" // Distribute compressor to use (default = "LZ4") -#define THOROPT_HDIST_COMPOPTIONS "hdCompressorOptions" // Distribute compressor options, e.g. AES key (default = "") +#define THOROPT_COMPRESS_SPILLS "v9_4_compressInternalSpills" // Compress internal spills, e.g. spills created by lookahead or sort gathering (default = true) +#define THOROPT_COMPRESS_SPILL_TYPE "v9_4_spillCompressorType" // Compress spill type, e.g. FLZ, LZ4 (or other to get previous) (default = LZ4) +#define THOROPT_HDIST_SPILL "hdistSpill" // Allow distribute receiver to spill to disk, rather than blocking (default = true) +#define THOROPT_HDIST_WRITE_POOL_SIZE "hdistSendPoolSize" // Distribute send thread pool size (default = 16) +#define THOROPT_HDIST_BUCKET_SIZE "hdOutBufferSize" // Distribute target bucket send size (default = 1MB) +#define THOROPT_HDIST_BUFFER_SIZE "hdInBufferSize" // Distribute send buffer size (for all targets) (default = 32MB) +#define THOROPT_HDIST_PULLBUFFER_SIZE "hdPullBufferSize" // Distribute pull buffer size (receiver side limit, before spilling) +#define THOROPT_HDIST_CANDIDATELIMIT "hdCandidateLimit" // Limits # of buckets to push to the writers when send buffer is full (default = is 50% largest) +#define THOROPT_HDIST_TARGETWRITELIMIT "hdTargetLimit" // Limit # of writer threads working on a single target (default = unbound, but picks round-robin) +#define THOROPT_HDIST_COMP "v9_4_hdCompressorType" // Distribute compressor to use (default = "LZ4") +#define THOROPT_HDIST_COMPOPTIONS "v9_4_hdCompressorOptions" // Distribute compressor options, e.g. AES key (default = "") #define THOROPT_SPLITTER_SPILL "splitterSpill" // Force splitters to spill or not, default is to adhere to helper setting (default = -1) #define THOROPT_SPLITTER_MAXROWMEMK "splitterRowMemK" // Splitter max memory (K) to use before spilling (default = 2MB) #define THOROPT_SPLITTER_READAHEADGRANULARITYK "inMemReadAheadGranularityK" // Splitter in memory read ahead granularity (K) (default = 128K)