Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Shamser Ahmed <[email protected]>
  • Loading branch information
shamser committed Jan 12, 2024
1 parent 3bca1fe commit f5f61b3
Show file tree
Hide file tree
Showing 11 changed files with 381 additions and 15 deletions.
4 changes: 4 additions & 0 deletions common/thorhelper/thorcommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,10 @@ class THORHELPER_API IndirectCodeContext : implements ICodeContext
{
return ctx->getWuid();
}
virtual unsigned getWorkflowId() const
{
return ctx->getWorkflowId();
}
virtual void getExternalResultRaw(unsigned & tlen, void * & tgt, const char * wuid, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
{
ctx->getExternalResultRaw(tlen, tgt, wuid, stepname, sequence, xmlTransformer, csvTransformer);
Expand Down
6 changes: 1 addition & 5 deletions ecl/eclagent/eclagent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1925,7 +1925,7 @@ void EclAgent::doProcess()
}
{
MTIME_SECTION(queryActiveTimer(), "Process");
Owned<IEclProcess> process = loadProcess();
Owned<IEclProcess> process = new EclProcessExtra(loadProcess());
QueryTerminationCleanup threadCleanup(false);

if (checkVersion && (process->getActivityVersion() != eclccCodeVersion))
Expand Down Expand Up @@ -2267,10 +2267,6 @@ void EclAgent::runProcess(IEclProcess *process)
LOG(MCrunlock, unknownJob, "Released persist read locks");
}

unsigned EclAgent::getWorkflowId()
{
throwUnexpected();
}

//----------------------------------------------------------------

Expand Down
4 changes: 2 additions & 2 deletions ecl/eclagent/eclagent.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public:
{
return ctx->isResult(name, sequence);
}
virtual unsigned getWorkflowId()
virtual unsigned getWorkflowId() const override
{
return ctx->getWorkflowId();
}
Expand Down Expand Up @@ -631,7 +631,6 @@ public:
virtual const char *loadResource(unsigned id);
virtual ICodeContext *queryCodeContext();
virtual bool isResult(const char * name, unsigned sequence);
virtual unsigned getWorkflowId();
virtual IConstWorkUnit *queryWorkUnit() const override; // no link
virtual IWorkUnit *updateWorkUnit() const; // links
virtual void reloadWorkUnit();
Expand All @@ -645,6 +644,7 @@ public:
virtual unsigned __int64 getFileOffset(const char *logicalPart) { UNIMPLEMENTED; return 0; }
virtual char *getOutputDir() { UNIMPLEMENTED; }
virtual char *getWuid();
virtual unsigned getWorkflowId() const override { throwUnexpected(); }
virtual const char *queryWuid();
virtual IDistributedFileTransaction *querySuperFileTransaction();
virtual unsigned getPriority() const { return 0; }
Expand Down
2 changes: 1 addition & 1 deletion ecl/hql/hqlfold.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1471,7 +1471,7 @@ class DummyContext: implements ICodeContext
virtual char *getPlatform() { throwUnexpected(); } // caller frees return string.
virtual unsigned getPriority() const { throwUnexpected(); }
virtual char *getWuid() { throwUnexpected(); } // caller frees return string.

virtual unsigned getWorkflowId() const { throwUnexpected(); }
// Exception handling

virtual void addWuException(const char*, unsigned int, unsigned int, const char*) { throwUnexpected(); } //n.b. this might be better named: it should only be used for adding user-generated exceptions (via the logging plug-in) --- there's a call in IAgentContext which takes a source argument too
Expand Down
2 changes: 1 addition & 1 deletion plugins/fileservices/fileservices.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ static void blockUntilComplete(const char * label, IClientFileSpray &server, ICo
aborting = wu->aborting();
StringBuffer wuScope, ElapsedLabel, RemainingLabel;
StringBuffer labelbuf(label);
wuScope.appendf("dfu:%s:%s", labelbuf.toLowerCase().str(), dfuwu.getID());
wuScope.appendf("w%u:dfu:%s:%s", ctx->getWorkflowId(), labelbuf.toLowerCase().str(), dfuwu.getID());
ElapsedLabel.append(wuScope).append(" (Elapsed) ");
RemainingLabel.append(wuScope).append(" (Remaining) ");
//MORE: I think this are intended to replace the timing information, but will currently combine
Expand Down
1 change: 1 addition & 0 deletions roxie/ccd/ccdactivities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,7 @@ class CRoxieAgentActivity : implements CInterfaceOf<IRoxieAgentActivity>, implem
// Not yet thought about these....

virtual char *getWuid() { throwUnexpected(); } // caller frees return string.
virtual unsigned getWorkflowId() const override { throwUnexpected(); }
virtual void getExternalResultRaw(unsigned & tlen, void * & tgt, const char * wuid, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { throwUnexpected(); } // shouldn't really be here, but it broke thor.
virtual void executeGraph(const char * graphName, bool realThor, size32_t parentExtractSize, const void * parentExtract) { throwUnexpected(); }
virtual unsigned __int64 getDatasetHash(const char * name, unsigned __int64 hash) { throwUnexpected(); return 0; }
Expand Down
10 changes: 7 additions & 3 deletions roxie/ccd/ccdcontext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1796,6 +1796,7 @@ class CRoxieContextBase : implements IRoxieAgentContext, implements ICodeContext
virtual void printResults(IXmlWriter *output, const char *name, unsigned sequence) { throwUnexpected(); }

virtual char *getWuid() { throwUnexpected(); }
virtual unsigned getWorkflowId() const override { throwUnexpected(); }
virtual void getExternalResultRaw(unsigned & tlen, void * & tgt, const char * wuid, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer) { throwUnexpected(); }

virtual char * getExpandLogicalName(const char * logicalName) { throwUnexpected(); }
Expand Down Expand Up @@ -2876,7 +2877,7 @@ class CRoxieServerContext : public CRoxieContextBase, implements IRoxieServerCon
MTIME_SECTION(myTimer, "Process");
QueryTerminationCleanup threadCleanup(true);
EclProcessFactory pf = (EclProcessFactory) factory->queryDll()->getEntry("createProcess");
Owned<IEclProcess> p = pf();
Owned<IEclProcess> p = new EclProcessExtra(pf());
try
{
if (debugContext)
Expand Down Expand Up @@ -3692,7 +3693,10 @@ class CRoxieServerContext : public CRoxieContextBase, implements IRoxieServerCon
CriticalBlock b(contextCrit);
return useContext(sequence).hasProp(name);
}

virtual unsigned getWorkflowId() const override
{
throwUnexpected();
}
virtual char *getClusterName()
{
if (workUnit)
Expand Down Expand Up @@ -4029,7 +4033,7 @@ class CSoapRoxieServerContext : public CRoxieServerContext
virtual void process()
{
EclProcessFactory pf = (EclProcessFactory) factory->queryDll()->getEntry("createProcess");
Owned<IEclProcess> p = pf();
Owned<IEclProcess> p = new EclProcessExtra(pf());
if (workflow)
workflow->perform(this, p);
else
Expand Down
Loading

0 comments on commit f5f61b3

Please sign in to comment.