Skip to content

Commit

Permalink
Merge pull request #18346 from ghalliday/topActivities
Browse files Browse the repository at this point in the history
HPCC-31353 Report the slowest 5 activies in the roxie complete line 

Reviewed-by: Mark Kelly [email protected]
Reviewed-By: Richard Chapman <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored May 31, 2024
2 parents 06076ce + 47133eb commit c9ca733
Show file tree
Hide file tree
Showing 11 changed files with 259 additions and 75 deletions.
2 changes: 1 addition & 1 deletion common/thorhelper/thorcommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ class CStatsContextLogger : public CSimpleInterfaceOf<IContextLogger>
{
stats.setStatistic(kind, value);
}
virtual void mergeStats(const CRuntimeStatisticCollection &from) const override
virtual void mergeStats(unsigned activityId, const CRuntimeStatisticCollection &from) const override
{
stats.merge(from);
}
Expand Down
25 changes: 8 additions & 17 deletions roxie/ccd/ccd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,8 @@ extern unsigned agentQueryReleaseDelaySeconds;
extern unsigned coresPerQuery;

extern unsigned cacheReportPeriodSeconds;
extern stat_type minimumInterestingActivityCycles;


extern StringBuffer logDirectory;
extern StringBuffer pluginDirectory;
Expand Down Expand Up @@ -592,6 +594,9 @@ class ContextLogger : implements IRoxieContextLogger, public CInterface
bool blind;
mutable bool aborted;
mutable CIArrayOf<LogItem> log;
static constexpr const unsigned MaxSlowActivities = 5;
mutable unsigned slowestActivityIds[MaxSlowActivities] = {};
mutable stat_type slowestActivityTimes[MaxSlowActivities] = {};
private:
Owned<ISpan> activeSpan = getNullSpan();
ContextLogger(const ContextLogger &); // Disable copy constructor
Expand Down Expand Up @@ -693,11 +698,7 @@ class ContextLogger : implements IRoxieContextLogger, public CInterface
ctxTraceLevel = _level;
}

StringBuffer &getStats(StringBuffer &s) const
{
CriticalBlock block(statsCrit);
return stats.toStr(s);
}
StringBuffer &getStats(StringBuffer &s) const;

virtual bool isIntercepted() const
{
Expand All @@ -721,18 +722,8 @@ class ContextLogger : implements IRoxieContextLogger, public CInterface
stats.setStatistic(kind, value);
}

virtual void mergeStats(const CRuntimeStatisticCollection &from) const
{
if (from.isThreadSafeMergeSource())
{
stats.merge(from);
}
else
{
CriticalBlock block(statsCrit);
stats.merge(from);
}
}
virtual void mergeStats(unsigned activityId, const CRuntimeStatisticCollection &from) const;

virtual void gatherStats(CRuntimeStatisticCollection & merged) const override
{
merged.merge(stats);
Expand Down
8 changes: 4 additions & 4 deletions roxie/ccd/ccdcontext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1313,9 +1313,9 @@ class CRoxieContextBase : implements IRoxieAgentContext, implements ICodeContext
logctx.setStatistic(kind, value);
}

virtual void mergeStats(const CRuntimeStatisticCollection &from) const override
virtual void mergeStats(unsigned activityId, const CRuntimeStatisticCollection &from) const override
{
logctx.mergeStats(from);
logctx.mergeStats(activityId, from);
}

virtual void gatherStats(CRuntimeStatisticCollection & merged) const override
Expand Down Expand Up @@ -2379,7 +2379,7 @@ class CAgentContext : public CRoxieContextBase
{
// NOTE: This is needed to ensure that owned activities are destroyed BEFORE I am,
// to avoid pure virtual calls when they come to call noteProcessed()
logctx.mergeStats(globalStats);
logctx.mergeStats(0, globalStats);
if (factory)
factory->mergeStats(logctx);
childGraphs.releaseAll();
Expand Down Expand Up @@ -2611,7 +2611,7 @@ class CRoxieServerContext : public CRoxieContextBase, implements IRoxieServerCon

void doPostProcess()
{
logctx.mergeStats(globalStats);
logctx.mergeStats(0, globalStats);
logctx.setStatistic(StTimeTotalExecute, elapsedTimer.elapsedNs());
if (factory)
{
Expand Down
81 changes: 81 additions & 0 deletions roxie/ccd/ccdlistener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,64 @@ extern void updateAffinity(unsigned __int64 affinity)

//--------------------------------------------------------------------------------------------------------------------

StringBuffer & ContextLogger::getStats(StringBuffer &s) const
{
CriticalBlock block(statsCrit);
stats.toStr(s);

if (slowestActivityIds[0])
{
StringBuffer ids;
StringBuffer times;
for (unsigned i=0; i < MaxSlowActivities; i++)
{
if (!slowestActivityIds[i])
break;

if (i)
{
ids.append(",");
times.append(",");
}
ids.append(slowestActivityIds[i]);
formatStatistic(times, cycle_to_nanosec(slowestActivityTimes[i]), SMeasureTimeNs);
}
s.appendf(", slowestActivities={ ids=[%s] times=[%s] }", ids.str(), times.str());
}
return s;
}


void ContextLogger::mergeStats(unsigned activityId, const CRuntimeStatisticCollection &from) const
{
CLeavableCriticalBlock block(statsCrit, !from.isThreadSafeMergeSource());

stats.merge(from);

//Record the times of the slowest N activities
if (activityId)
{
stat_type localTime = from.getStatisticValue(StCycleLocalExecuteCycles);
if (localTime >= minimumInterestingActivityCycles)
{
if (localTime > slowestActivityTimes[MaxSlowActivities-1])
{
unsigned pos = MaxSlowActivities-1;
while (pos > 0)
{
if (localTime <= slowestActivityTimes[pos-1])
break;
slowestActivityIds[pos] = slowestActivityIds[pos-1];
slowestActivityTimes[pos] = slowestActivityTimes[pos-1];
pos--;
}
slowestActivityIds[pos] = activityId;
slowestActivityTimes[pos] = localTime;
}
}
}
}

void ContextLogger::exportStatsToSpan(bool failed, stat_type elapsedNs, unsigned memused, unsigned agentsDuplicates, unsigned agentsResends)
{
if (activeSpan->isRecording())
Expand All @@ -957,6 +1015,29 @@ void ContextLogger::exportStatsToSpan(bool failed, stat_type elapsedNs, unsigned

StringBuffer prefix("");
stats.exportToSpan(activeSpan, prefix);

if (slowestActivityIds[0])
{
//Even better if these were exported as arrays - needs extensions to our api
//Not commoned up with the code above because it is likely to change to arrays in the future.
StringBuffer ids;
StringBuffer times;
for (unsigned i=0; i < MaxSlowActivities; i++)
{
if (!slowestActivityIds[i])
break;

if (i)
{
ids.append(",");
times.append(",");
}
ids.append(slowestActivityIds[i]);
times.append(cycle_to_nanosec(slowestActivityTimes[i]));
}
setSpanAttribute("slowest_activities.ids", ids);
setSpanAttribute("slowest_activities.times", times);
}
}
}

Expand Down
4 changes: 4 additions & 0 deletions roxie/ccd/ccdmain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ unsigned __int64 minFreeDiskSpace = 1024 * 0x100000; // default to 1 GB
unsigned socketCheckInterval = 5000;

unsigned cacheReportPeriodSeconds = 5*60;
stat_type minimumInterestingActivityCycles;

StringBuffer logDirectory;
StringBuffer pluginDirectory;
Expand Down Expand Up @@ -1320,6 +1321,9 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
unsigned __int64 affinity = topology->getPropInt64("@affinity", 0);
updateAffinity(affinity);

unsigned __int64 minimumInterestingActivityMs = topology->getPropInt64("@minimumInterestingActivityMs", 10);
minimumInterestingActivityCycles = nanosec_to_cycle(minimumInterestingActivityMs * 1'000'000);

minFreeDiskSpace = topology->getPropInt64("@minFreeDiskSpace", (1024 * 0x100000)); // default to 1 GB
mtu_size = topology->getPropInt("@mtuPayload", 0);
if (mtu_size)
Expand Down
Loading

0 comments on commit c9ca733

Please sign in to comment.