Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/candidate-9.4.x'
Browse files Browse the repository at this point in the history
Signed-off-by: Gordon Smith <[email protected]>

# Conflicts:
#	helm/hpcc/Chart.yaml
#	helm/hpcc/templates/_helpers.tpl
#	helm/hpcc/templates/dafilesrv.yaml
#	helm/hpcc/templates/dali.yaml
#	helm/hpcc/templates/dfuserver.yaml
#	helm/hpcc/templates/eclagent.yaml
#	helm/hpcc/templates/eclccserver.yaml
#	helm/hpcc/templates/eclscheduler.yaml
#	helm/hpcc/templates/esp.yaml
#	helm/hpcc/templates/localroxie.yaml
#	helm/hpcc/templates/roxie.yaml
#	helm/hpcc/templates/sasha.yaml
#	helm/hpcc/templates/thor.yaml
#	version.cmake
  • Loading branch information
GordonSmith committed Nov 9, 2023
2 parents 2014cbb + 44c7b57 commit ec7635e
Show file tree
Hide file tree
Showing 85 changed files with 1,434 additions and 614 deletions.
178 changes: 122 additions & 56 deletions common/remote/rmtsmtp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -476,14 +476,15 @@ class CMailInfo
char inbuff[200];
unsigned inlen;
bool highPriority;
bool termJobOnFail;

static char const * toHeader;
static char const * ccHeader;
static char const * subjectHeader;
static char const * senderHeader;
public:
CMailInfo(char const * _to, char const * _cc, char const * _bcc, char const * _subject, char const * _mailServer, unsigned _port, char const * _sender, StringArray *_warnings, bool _highPriority)
: subject(_subject), mailServer(_mailServer), port(_port), sender(_sender), lastAction("process initialization"), inlen(0), highPriority(_highPriority)
CMailInfo(char const * _to, char const * _cc, char const * _bcc, char const * _subject, char const * _mailServer, unsigned _port, char const * _sender, StringArray *_warnings, bool _highPriority, bool _termJobOnFail)
: subject(_subject), mailServer(_mailServer), port(_port), sender(_sender), lastAction("process initialization"), inlen(0), highPriority(_highPriority), termJobOnFail(_termJobOnFail)
{
warnings = _warnings;
CSMTPValidator validator;
Expand Down Expand Up @@ -528,6 +529,7 @@ class CMailInfo
lastAction.clear().append(action);
else
lastAction.clear().append(len, out).clip();

try
{
socket->write(out, len);
Expand All @@ -545,30 +547,34 @@ class CMailInfo
}
}

void read()
bool read(int numRetriesRemaining)
{
try
{
socket->read(inbuff,1,sizeof(inbuff),inlen);
socket->readtms(inbuff, 1, sizeof(inbuff), inlen, 30000);

//MORE: the following is somewhat primitive and not RFC compliant (see bug 25951) - but it is a lot better than nothing
if((*inbuff == '4') || (*inbuff == '5'))
if ( (*inbuff == '5') || ((*inbuff == '4') && (!numRetriesRemaining)) )
{
StringBuffer b;
b.append("Negative reply from mail server at ").append(mailServer.get()).append(":").append(port).append(" after writing ").append(lastAction.str()).append(" in SendEmail*: ").append(inlen, inbuff).clip();
WARNLOG("%s", b.str());
if (warnings)
warnings->append(b.str());
b.append("negative reply after writing ").append(inlen, inbuff).clip();
// don't continue on after these responses ...
throw makeStringException(MSGAUD_operator, 0, b.str());
}
#ifdef SMTP_TRACE
else
{
#ifdef SMTP_TRACE
StringBuffer b(inlen, inbuff);
b.clip();
DBGLOG("SMTP read: [%s]", b.str());
}
#endif
if (*inbuff == '4')
return false;
else
return true;
}
}
catch(IException * e)
catch (IException * e)
{
int code = e->errorCode();
StringBuffer buff;
Expand All @@ -578,6 +584,34 @@ class CMailInfo
}
}

void readRemainingData()
{
// read any remaining bytes ...
try
{
socket->readtms(inbuff, 0, sizeof(inbuff), inlen, 0);
}
catch(IException * e)
{
e->Release();
}
}

void writeAndAck(char const * out, size32_t len, char const * action = NULL)
{
bool ok;
int retries = 2;
while (retries >= 0)
{
readRemainingData();
write(out, len, action);
ok = read(retries);
if (ok)
break;
retries--;
}
}

void getHeader(StringBuffer & header) const
{
header.append(senderHeader).append(sender.get()).append("\r\n");
Expand Down Expand Up @@ -615,6 +649,17 @@ class CMailInfo
out.append("RCPT TO:<").append(rcpt).append(">\r\n");
}

bool getTermJobOnFail() const
{
return termJobOnFail;
}

void addToWarnings(const char *warnStr) const
{
if (warnings && (warnStr && *warnStr))
warnings->append(warnStr);
}

private:
void getRecipients(CSMTPValidator & validator, char const * _to, StringBuffer &destBuffer)
{
Expand Down Expand Up @@ -773,77 +818,98 @@ static const char *quit="QUIT\r\n";

static void doSendEmail(CMailInfo & info, CMailPart const & part)
{
info.open();
StringBuffer outbuff;

info.read();
info.getHelo(outbuff);
info.write(outbuff.str(), outbuff.length());
info.read();

info.getMailFrom(outbuff.clear());
info.write(outbuff.str(), outbuff.length());
info.read();

unsigned numRcpt = info.numRecipients();
for(unsigned i=0; i<numRcpt; ++i)
{
info.getRecipient(i, outbuff.clear());
info.write(outbuff.str(), outbuff.length());
info.read();
}

info.write(data, strlen(data));
info.read();
info.getHeader(outbuff.clear());
part.getHeader(outbuff);
outbuff.append("\r\n");
info.write(outbuff.str(), outbuff.length(), "mail header");
part.write(info);
info.write(endMail, strlen(endMail), "end of mail body");
info.read();

info.write(quit, strlen(quit));
info.read();
try
{
info.open();
StringBuffer outbuff;

info.read(0);
info.getHelo(outbuff);

info.writeAndAck(outbuff.str(), outbuff.length());

info.getMailFrom(outbuff.clear());

info.writeAndAck(outbuff.str(), outbuff.length());

unsigned numRcpt = info.numRecipients();
for(unsigned i=0; i<numRcpt; ++i)
{
info.getRecipient(i, outbuff.clear());
info.writeAndAck(outbuff.str(), outbuff.length());
}

info.writeAndAck(data, strlen(data));

info.getHeader(outbuff.clear());
part.getHeader(outbuff);
outbuff.append("\r\n");

bool ok;
int retries = 2;
while (retries >= 0)
{
info.readRemainingData();
info.write(outbuff.str(), outbuff.length(), "mail header");
part.write(info);
info.write(endMail, strlen(endMail), "end of mail body");
ok = info.read(retries);
if (ok)
break;
retries--;
}

info.writeAndAck(quit, strlen(quit));
}
catch(IException * e)
{
if (info.getTermJobOnFail())
throw e;

StringBuffer msg;
info.addToWarnings(e->errorMessage(msg).str());
EXCLOG(MCoperatorError, e, "WARNING");
e->Release();
}
}

void sendEmail(const char * to, const char * cc, const char * bcc, const char * subject, const char * body, const char * mailServer, unsigned port, const char * sender, StringArray *warnings, bool highPriority)
void sendEmail(const char * to, const char * cc, const char * bcc, const char * subject, const char * body, const char * mailServer, unsigned port, const char * sender, StringArray *warnings, bool highPriority, bool termJobOnFail)
{
CMailInfo info(to, cc, bcc, subject, mailServer, port, sender, warnings, highPriority);
CMailInfo info(to, cc, bcc, subject, mailServer, port, sender, warnings, highPriority, termJobOnFail);
CTextMailPart bodyPart(body, "text/plain; charset=ISO-8859-1", NULL);
doSendEmail(info, bodyPart);
}

void sendEmail(const char * to, const char * subject, const char * body, const char * mailServer, unsigned port, const char * sender, StringArray *warnings, bool highPriority)
void sendEmail(const char * to, const char * subject, const char * body, const char * mailServer, unsigned port, const char * sender, StringArray *warnings, bool highPriority, bool termJobOnFail)
{
sendEmail(to, nullptr, nullptr, subject, body, mailServer, port, sender, warnings, highPriority);
sendEmail(to, nullptr, nullptr, subject, body, mailServer, port, sender, warnings, highPriority, termJobOnFail);
}

void sendEmailAttachText(const char * to, const char * cc, const char * bcc, const char * subject, const char * body, const char * attachment, const char * mimeType, const char * attachmentName, const char * mailServer, unsigned int port, const char * sender, StringArray *warnings, bool highPriority)
void sendEmailAttachText(const char * to, const char * cc, const char * bcc, const char * subject, const char * body, const char * attachment, const char * mimeType, const char * attachmentName, const char * mailServer, unsigned int port, const char * sender, StringArray *warnings, bool highPriority, bool termJobOnFail)
{
CMailInfo info(to, cc, bcc, subject, mailServer, port, sender, warnings, highPriority);
CMailInfo info(to, cc, bcc, subject, mailServer, port, sender, warnings, highPriority, termJobOnFail);
CTextMailPart inlinedPart(body, "text/plain; charset=ISO-8859-1", NULL);
CTextMailPart attachmentPart(attachment, mimeType, attachmentName);
CMultiMailPart multiPart(inlinedPart, attachmentPart);
doSendEmail(info, multiPart);
}

void sendEmailAttachText(const char * to, const char * subject, const char * body, const char * attachment, const char * mimeType, const char * attachmentName, const char * mailServer, unsigned int port, const char * sender, StringArray *warnings, bool highPriority)
void sendEmailAttachText(const char * to, const char * subject, const char * body, const char * attachment, const char * mimeType, const char * attachmentName, const char * mailServer, unsigned int port, const char * sender, StringArray *warnings, bool highPriority, bool termJobOnFail)
{
sendEmailAttachText(to, nullptr, nullptr, subject, body, attachment, mimeType, attachmentName, mailServer, port, sender, warnings, highPriority);
sendEmailAttachText(to, nullptr, nullptr, subject, body, attachment, mimeType, attachmentName, mailServer, port, sender, warnings, highPriority, termJobOnFail);
}

void sendEmailAttachData(const char * to, const char * cc, const char * bcc, const char * subject, const char * body, size32_t lenAttachment, const void * attachment, const char * mimeType, const char * attachmentName, const char * mailServer, unsigned int port, const char * sender, StringArray *warnings, bool highPriority)
void sendEmailAttachData(const char * to, const char * cc, const char * bcc, const char * subject, const char * body, size32_t lenAttachment, const void * attachment, const char * mimeType, const char * attachmentName, const char * mailServer, unsigned int port, const char * sender, StringArray *warnings, bool highPriority, bool termJobOnFail)
{
CMailInfo info(to, cc, bcc, subject, mailServer, port, sender, warnings, highPriority);
CMailInfo info(to, cc, bcc, subject, mailServer, port, sender, warnings, highPriority, termJobOnFail);
CTextMailPart inlinedPart(body, "text/plain; charset=ISO-8859-1", NULL);
CDataMailPart attachmentPart(lenAttachment, attachment, mimeType, attachmentName);
CMultiMailPart multiPart(inlinedPart, attachmentPart);
doSendEmail(info, multiPart);
}

void sendEmailAttachData(const char * to, const char * subject, const char * body, size32_t lenAttachment, const void * attachment, const char * mimeType, const char * attachmentName, const char * mailServer, unsigned int port, const char * sender, StringArray *warnings, bool highPriority)
void sendEmailAttachData(const char * to, const char * subject, const char * body, size32_t lenAttachment, const void * attachment, const char * mimeType, const char * attachmentName, const char * mailServer, unsigned int port, const char * sender, StringArray *warnings, bool highPriority, bool termJobOnFail)
{
sendEmailAttachData(to, nullptr, nullptr, subject, body, lenAttachment, attachment, mimeType, attachmentName, mailServer, port, sender, warnings, highPriority);
sendEmailAttachData(to, nullptr, nullptr, subject, body, lenAttachment, attachment, mimeType, attachmentName, mailServer, port, sender, warnings, highPriority, termJobOnFail);
}

12 changes: 6 additions & 6 deletions common/remote/rmtsmtp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@
#define REMOTE_API DECL_IMPORT
#endif

extern REMOTE_API void sendEmail( const char * to, const char * cc, const char * bcc, const char * subject, const char * body, const char * mailServer, unsigned port, const char * sender, StringArray *warnings=NULL, bool highPriority=false);
extern REMOTE_API void sendEmail( const char * to, const char * subject, const char * body, const char * mailServer, unsigned port, const char * sender, StringArray *warnings=NULL, bool highPriority=false);
extern REMOTE_API void sendEmail( const char * to, const char * cc, const char * bcc, const char * subject, const char * body, const char * mailServer, unsigned port, const char * sender, StringArray *warnings=NULL, bool highPriority=false, bool termJobOnFail=true);
extern REMOTE_API void sendEmail( const char * to, const char * subject, const char * body, const char * mailServer, unsigned port, const char * sender, StringArray *warnings=NULL, bool highPriority=false, bool termJobOnFail=true);

extern REMOTE_API void sendEmailAttachText(const char * to, const char * cc, const char * bcc, const char * subject, const char * body, const char * attachment, const char * mimeType, const char * attachmentName, const char * mailServer, unsigned int port, const char * sender, StringArray *warnings=NULL, bool highPriority=false);
extern REMOTE_API void sendEmailAttachText(const char * to, const char * subject, const char * body, const char * attachment, const char * mimeType, const char * attachmentName, const char * mailServer, unsigned int port, const char * sender, StringArray *warnings=NULL, bool highPriority=false);
extern REMOTE_API void sendEmailAttachText(const char * to, const char * cc, const char * bcc, const char * subject, const char * body, const char * attachment, const char * mimeType, const char * attachmentName, const char * mailServer, unsigned int port, const char * sender, StringArray *warnings=NULL, bool highPriority=false, bool termJobOnFail=true);
extern REMOTE_API void sendEmailAttachText(const char * to, const char * subject, const char * body, const char * attachment, const char * mimeType, const char * attachmentName, const char * mailServer, unsigned int port, const char * sender, StringArray *warnings=NULL, bool highPriority=false, bool termJobOnFail=true);

extern REMOTE_API void sendEmailAttachData(const char * to, const char * cc, const char * bcc, const char * subject, const char * body, size32_t lenAttachment, const void * attachment, const char * mimeType, const char * attachmentName, const char * mailServer, unsigned int port, const char * sender, StringArray *warnings=NULL, bool highPriority=false);
extern REMOTE_API void sendEmailAttachData(const char * to, const char * subject, const char * body, size32_t lenAttachment, const void * attachment, const char * mimeType, const char * attachmentName, const char * mailServer, unsigned int port, const char * sender, StringArray *warnings=NULL, bool highPriority=false);
extern REMOTE_API void sendEmailAttachData(const char * to, const char * cc, const char * bcc, const char * subject, const char * body, size32_t lenAttachment, const void * attachment, const char * mimeType, const char * attachmentName, const char * mailServer, unsigned int port, const char * sender, StringArray *warnings=NULL, bool highPriority=false, bool termJobOnFail=true);
extern REMOTE_API void sendEmailAttachData(const char * to, const char * subject, const char * body, size32_t lenAttachment, const void * attachment, const char * mimeType, const char * attachmentName, const char * mailServer, unsigned int port, const char * sender, StringArray *warnings=NULL, bool highPriority=false, bool termJobOnFail=true);


#endif
39 changes: 34 additions & 5 deletions common/wuanalysis/anarule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,20 +129,49 @@ class IoSkewRule : public AActivityRule
{
stat_type ioAvg = activity.getStatRaw(stat, StAvgX);
stat_type ioMaxSkew = activity.getStatRaw(stat, StSkewMax);
unsigned actkind = activity.getAttr(WaKind);
if (ioMaxSkew > options.queryOption(watOptSkewThreshold))
{
stat_type timeMaxLocalExecute = activity.getStatRaw(StTimeLocalExecute, StMaxX);
stat_type timeAvgLocalExecute = activity.getStatRaw(StTimeLocalExecute, StAvgX);

stat_type cost;
//If one node didn't spill then it is possible the skew caused all the lost time
unsigned actkind = activity.getAttr(WaKind);
if ((actkind==TAKspillread||actkind==TAKspillwrite) && activity.getStatRaw(stat, StMinX) == 0)
if ((actkind==TAKspillread||actkind==TAKspillwrite) && (activity.getStatRaw(stat, StMinX) == 0))
{
//If one node didn't spill then it is possible the skew caused all the lost time
cost = timeMaxLocalExecute;
result.set(ANA_IOSKEW_RECORDS_ID, cost, "Uneven worker spilling is causing uneven %s time", category);
}
else
{
bool sizeSkew = false;
bool numRowsSkew = false;
IWuEdge *wuEdge = nullptr;
if ((stat==StTimeDiskWriteIO) || (actkind==TAKspillwrite))
{
if (activity.getStatRaw(StSizeDiskWrite, StSkewMax)>options.queryOption(watOptSkewThreshold))
sizeSkew = true;
IWuEdge *wuEdge = activity.queryInput(0);
}
else if ((stat == StTimeDiskReadIO) || (actkind==TAKspillread))
{
if (activity.getStatRaw(StSizeDiskRead, StSkewMax)>options.queryOption(watOptSkewThreshold))
sizeSkew = true;
IWuEdge *wuEdge = activity.queryOutput(0);
}
if (wuEdge && wuEdge->getStatRaw(StNumRowsProcessed, StSkewMax)>options.queryOption(watOptSkewThreshold))
numRowsSkew = true;
cost = (timeMaxLocalExecute - timeAvgLocalExecute);

result.set(ANA_IOSKEW_RECORDS_ID, cost, "Significant skew in records causes uneven %s time", category);
if (sizeSkew)
{
if (numRowsSkew)
result.set(ANA_IOSKEW_RECORDS_ID, cost, "Significant skew in number of records is causing uneven %s time", category);
else
result.set(ANA_IOSKEW_RECORDS_ID, cost, "Significant skew in record sizes is causing uneven %s time", category);
}
else
result.set(ANA_IOSKEW_RECORDS_ID, cost, "Significant skew in IO performance is causing uneven %s time", category);
}
updateInformation(result, activity);
return true;
}
Expand Down
Loading

0 comments on commit ec7635e

Please sign in to comment.