Skip to content

Commit

Permalink
Merge pull request #18232 from shamser/issue31143
Browse files Browse the repository at this point in the history
HPCC-31143 Extend ICodeContext with getWorkflowId

Reviewed-by: Jake Smith <[email protected]>
Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Feb 6, 2024
2 parents 297807f + fef8247 commit 5f9b805
Show file tree
Hide file tree
Showing 15 changed files with 624 additions and 312 deletions.
295 changes: 6 additions & 289 deletions common/thorhelper/thorcommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "rtldynfield.hpp"
#include "thorhelper.hpp"
#include "thorxmlwrite.hpp"
#include "wfcontext.hpp"

static unsigned const defaultDaliResultLimit = 10; // MB
static unsigned const defaultMaxCsvRowSize = 10; // MB
Expand Down Expand Up @@ -374,305 +375,21 @@ struct BlockedActivityTimer
};
#endif

class THORHELPER_API IndirectCodeContext : implements ICodeContext
class THORHELPER_API IndirectCodeContextEx : public IndirectCodeContext
{
public:
IndirectCodeContext(ICodeContext * _ctx = NULL) : ctx(_ctx) {}
IndirectCodeContextEx(ICodeContext * _ctx = NULL) : IndirectCodeContext(_ctx) {}

void set(ICodeContext * _ctx) { ctx = _ctx; }

virtual const char *loadResource(unsigned id)
{
return ctx->loadResource(id);
}
virtual void setResultBool(const char *name, unsigned sequence, bool value)
{
ctx->setResultBool(name, sequence, value);
}
virtual void setResultData(const char *name, unsigned sequence, int len, const void * data)
{
ctx->setResultData(name, sequence, len, data);
}
virtual void setResultDecimal(const char * stepname, unsigned sequence, int len, int precision, bool isSigned, const void *val)
{
ctx->setResultDecimal(stepname, sequence, len, precision, isSigned, val);
}
virtual void setResultInt(const char *name, unsigned sequence, __int64 value, unsigned size)
{
ctx->setResultInt(name, sequence, value, size);
}
virtual void setResultRaw(const char *name, unsigned sequence, int len, const void * data)
{
ctx->setResultRaw(name, sequence, len, data);
}
virtual void setResultReal(const char * stepname, unsigned sequence, double value)
{
ctx->setResultReal(stepname, sequence, value);
}
virtual void setResultSet(const char *name, unsigned sequence, bool isAll, size32_t len, const void * data, ISetToXmlTransformer * transformer)
{
ctx->setResultSet(name, sequence, isAll, len, data, transformer);
}
virtual void setResultString(const char *name, unsigned sequence, int len, const char * str)
{
ctx->setResultString(name, sequence, len, str);
}
virtual void setResultUInt(const char *name, unsigned sequence, unsigned __int64 value, unsigned size)
{
ctx->setResultUInt(name, sequence, value, size);
}
virtual void setResultUnicode(const char *name, unsigned sequence, int len, UChar const * str)
{
ctx->setResultUnicode(name, sequence, len, str);
}
virtual void setResultVarString(const char * name, unsigned sequence, const char * value)
{
ctx->setResultVarString(name, sequence, value);
}
virtual void setResultVarUnicode(const char * name, unsigned sequence, UChar const * value)
{
ctx->setResultVarUnicode(name, sequence, value);
}
virtual bool getResultBool(const char * name, unsigned sequence)
{
return ctx->getResultBool(name, sequence);
}
virtual void getResultData(unsigned & tlen, void * & tgt, const char * name, unsigned sequence)
{
ctx->getResultData(tlen, tgt, name, sequence);
}
virtual void getResultDecimal(unsigned tlen, int precision, bool isSigned, void * tgt, const char * stepname, unsigned sequence)
{
ctx->getResultDecimal(tlen, precision, isSigned, tgt, stepname, sequence);
}
virtual void getResultRaw(unsigned & tlen, void * & tgt, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
{
ctx->getResultRaw(tlen, tgt, name, sequence, xmlTransformer, csvTransformer);
}
virtual void getResultSet(bool & isAll, size32_t & tlen, void * & tgt, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
{
ctx->getResultSet(isAll, tlen, tgt, name, sequence, xmlTransformer, csvTransformer);
}
virtual __int64 getResultInt(const char * name, unsigned sequence)
{
return ctx->getResultInt(name, sequence);
}
virtual double getResultReal(const char * name, unsigned sequence)
{
return ctx->getResultReal(name, sequence);
}
virtual void getResultString(unsigned & tlen, char * & tgt, const char * name, unsigned sequence)
{
ctx->getResultString(tlen, tgt, name, sequence);
}
virtual void getResultStringF(unsigned tlen, char * tgt, const char * name, unsigned sequence)
{
ctx->getResultStringF(tlen, tgt, name, sequence);
}
virtual void getResultUnicode(unsigned & tlen, UChar * & tgt, const char * name, unsigned sequence)
{
ctx->getResultUnicode(tlen, tgt, name, sequence);
}
virtual char *getResultVarString(const char * name, unsigned sequence)
{
return ctx->getResultVarString(name, sequence);
}
virtual UChar *getResultVarUnicode(const char * name, unsigned sequence)
{
return ctx->getResultVarUnicode(name, sequence);
}
virtual unsigned getResultHash(const char * name, unsigned sequence)
{
return ctx->getResultHash(name, sequence);
}
virtual unsigned getExternalResultHash(const char * wuid, const char * name, unsigned sequence)
{
return ctx->getExternalResultHash(wuid, name, sequence);
}
virtual char *getWuid()
{
return ctx->getWuid();
}
virtual void getExternalResultRaw(unsigned & tlen, void * & tgt, const char * wuid, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
{
ctx->getExternalResultRaw(tlen, tgt, wuid, stepname, sequence, xmlTransformer, csvTransformer);
}
virtual void executeGraph(const char * graphName, bool realThor, size32_t parentExtractSize, const void * parentExtract)
{
ctx->executeGraph(graphName, realThor, parentExtractSize, parentExtract);
}
virtual char * getExpandLogicalName(const char * logicalName)
{
return ctx->getExpandLogicalName(logicalName);
}
virtual void addWuException(const char * text, unsigned code, unsigned severity, const char *source)
{
ctx->addWuException(text, code, severity, source);
}
virtual void addWuAssertFailure(unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column, bool isAbort)
{
ctx->addWuAssertFailure(code, text, filename, lineno, column, isAbort);
}
virtual IUserDescriptor *queryUserDescriptor()
{
return ctx->queryUserDescriptor();
}
virtual IThorChildGraph * resolveChildQuery(__int64 activityId, IHThorArg * colocal)
{
return ctx->resolveChildQuery(activityId, colocal);
}
virtual unsigned __int64 getDatasetHash(const char * name, unsigned __int64 hash)
{
return ctx->getDatasetHash(name, hash);
}
virtual unsigned getNodes()
{
return ctx->getNodes();
}
virtual unsigned getNodeNum()
{
return ctx->getNodeNum();
}
virtual char *getFilePart(const char *logicalPart, bool create)
{
return ctx->getFilePart(logicalPart, create);
}
virtual unsigned __int64 getFileOffset(const char *logicalPart)
{
return ctx->getFileOffset(logicalPart);
}
virtual IDistributedFileTransaction *querySuperFileTransaction()
{
return ctx->querySuperFileTransaction();
}
virtual char *getEnv(const char *name, const char *defaultValue) const
{
return ctx->getEnv(name, defaultValue);
}
virtual char *getJobName()
{
return ctx->getJobName();
}
virtual char *getJobOwner()
{
return ctx->getJobOwner();
}
virtual char *getClusterName()
{
return ctx->getClusterName();
}
virtual char *getGroupName()
{
return ctx->getGroupName();
}
virtual char * queryIndexMetaData(char const * lfn, char const * xpath)
{
return ctx->queryIndexMetaData(lfn, xpath);
}
virtual unsigned getPriority() const
{
return ctx->getPriority();
}
virtual char *getPlatform()
{
return ctx->getPlatform();
}
virtual char *getOS()
{
return ctx->getOS();
}
virtual IEclGraphResults * resolveLocalQuery(__int64 activityId)
{
return ctx->resolveLocalQuery(activityId);
}
virtual char *getEnv(const char *name, const char *defaultValue)
{
return ctx->getEnv(name, defaultValue);
}
virtual unsigned logString(const char *text) const
{
return ctx->logString(text);
}
virtual const IContextLogger &queryContextLogger() const
{
return ctx->queryContextLogger();
}
virtual IDebuggableContext *queryDebugContext() const
{
return ctx->queryDebugContext();
}
virtual IEngineRowAllocator * getRowAllocator(IOutputMetaData * meta, unsigned activityId) const
{
return ctx->getRowAllocator(meta, activityId);
}
virtual IEngineRowAllocator * getRowAllocatorEx(IOutputMetaData * meta, unsigned activityId, unsigned heapFlags) const
{
return ctx->getRowAllocatorEx(meta, activityId, heapFlags);
}
virtual const char *cloneVString(const char *str) const
{
return ctx->cloneVString(str);
}
virtual const char *cloneVString(size32_t len, const char *str) const
{
return ctx->cloneVString(len, str);
}
virtual void getResultRowset(size32_t & tcount, const byte * * & tgt, const char * name, unsigned sequence, IEngineRowAllocator * _rowAllocator, bool isGrouped, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) override
{
ctx->getResultRowset(tcount, tgt, name, sequence, _rowAllocator, isGrouped, xmlTransformer, csvTransformer);
}
virtual void getResultDictionary(size32_t & tcount, const byte * * & tgt, IEngineRowAllocator * _rowAllocator, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer, IHThorHashLookupInfo * hasher) override
{
ctx->getResultDictionary(tcount, tgt, _rowAllocator, name, sequence, xmlTransformer, csvTransformer, hasher);
}
virtual void getRowXML(size32_t & lenResult, char * & result, IOutputMetaData & info, const void * row, unsigned flags)
virtual void getRowXML(size32_t & lenResult, char * & result, IOutputMetaData & info, const void * row, unsigned flags) override
{
convertRowToXML(lenResult, result, info, row, flags);
}
virtual void getRowJSON(size32_t & lenResult, char * & result, IOutputMetaData & info, const void * row, unsigned flags)
virtual void getRowJSON(size32_t & lenResult, char * & result, IOutputMetaData & info, const void * row, unsigned flags) override
{
convertRowToJSON(lenResult, result, info, row, flags);
}
virtual const void * fromXml(IEngineRowAllocator * _rowAllocator, size32_t len, const char * utf8, IXmlToRowTransformer * xmlTransformer, bool stripWhitespace)
{
return ctx->fromXml(_rowAllocator, len, utf8, xmlTransformer, stripWhitespace);
}
virtual const void * fromJson(IEngineRowAllocator * _rowAllocator, size32_t len, const char * utf8, IXmlToRowTransformer * xmlTransformer, bool stripWhitespace)
{
return ctx->fromJson(_rowAllocator, len, utf8, xmlTransformer, stripWhitespace);
}
virtual IEngineContext *queryEngineContext()
{
return ctx->queryEngineContext();
}
virtual char *getDaliServers()
{
return ctx->getDaliServers();
}
virtual IWorkUnit *updateWorkUnit() const
{
return ctx->updateWorkUnit();
}
virtual ISectionTimer * registerTimer(unsigned activityId, const char * name)
{
return ctx->registerTimer(activityId, name);
}
virtual unsigned getGraphLoopCounter() const override
{
return ctx->getGraphLoopCounter();
}
virtual void addWuExceptionEx(const char * text, unsigned code, unsigned severity, unsigned audience, const char *source) override
{
ctx->addWuExceptionEx(text, code, severity, audience, source);
}
virtual unsigned getElapsedMs() const override
{
return ctx->getElapsedMs();
}

protected:
ICodeContext * ctx;
};

class CStatsContextLogger : public CSimpleInterfaceOf<IContextLogger>
{
protected:
Expand Down
4 changes: 3 additions & 1 deletion common/workunit/workflow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "jregexp.hpp"
#include "workflow.hpp"
#include "dasds.hpp"
#include "wfcontext.hpp"

//------------------------------------------------------------------------------------------
/* Parallel Workflow explanation
Expand Down Expand Up @@ -2173,8 +2174,9 @@ void WorkflowMachine::performItem(unsigned wfid, unsigned scheduledWfid)
currentWfid = wfid;
currentScheduledWfid = scheduledWfid;
timestamp_type startTime = getTimeStampNowValue();
GlobalCodeContextExtra gctx(ctx, wfid);
CCycleTimer timer;
process->perform(ctx, wfid);
process->perform(&gctx, wfid);
noteTiming(wfid, startTime, timer.elapsedNs());
scheduledWfid = wfidStack.popGet();
currentWfid = wfidStack.popGet();
Expand Down
8 changes: 6 additions & 2 deletions ecl/eclagent/eclagent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
#include "hpccconfig.hpp"

#include "ws_dfsclient.hpp"
#include "wfcontext.hpp"

using roxiemem::OwnedRoxieString;

Expand Down Expand Up @@ -2234,7 +2235,10 @@ void EclAgent::runProcess(IEclProcess *process)
workflow->perform(this, process);
}
else
process->perform(this, 0);
{
GlobalCodeContextExtra gctx(this, 0);
process->perform(&gctx, 0);
}

ForEachItemIn(i, queryLibraries)
queryLibraries.item(i).updateProgress();
Expand Down Expand Up @@ -2267,7 +2271,7 @@ void EclAgent::runProcess(IEclProcess *process)
LOG(MCrunlock, unknownJob, "Released persist read locks");
}

unsigned EclAgent::getWorkflowId()
unsigned EclAgent::getWorkflowIdDeprecated()
{
throwUnexpected();
}
Expand Down
Loading

0 comments on commit 5f9b805

Please sign in to comment.