Skip to content

Commit

Permalink
feat(core): notify user by email after Transform Job finished or fail…
Browse files Browse the repository at this point in the history
…ed (#340)

* feat(core): notify user by email after Transform Job finished or failed

* fix(test): adjust test for new feat

* feat(core): show exception when failed

详见文档:
https://oxlh5mrwi0.feishu.cn/docx/WNmXdqQWNoG6ZFxqOmbcXZwpnYX?from=from_copylink
  • Loading branch information
aqni authored May 26, 2024
1 parent 641ec82 commit a26fd02
Show file tree
Hide file tree
Showing 11 changed files with 360 additions and 1 deletion.
19 changes: 19 additions & 0 deletions conf/config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -254,3 +254,22 @@ enableEnvParameter=false

# 读取csv文件时,每批数据的行数
batchSizeImportCsv=10000

#################
### configure email smtp to send notification
#################

enableEmailNotification=false

mailSmtpHost=smtp.qq.com

# make sure enable ssl on connection
mailSmtpPort=465

mailSmtpUser=[email protected]

mailSmtpPassword=12345678

mailSender=[email protected]

mailRecipient=[email protected]
10 changes: 10 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@
<artifactId>jaxb-api</artifactId>
<version>2.2.3</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-email</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
Expand All @@ -183,6 +187,12 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.icegreen</groupId>
<artifactId>greenmail-junit4</artifactId>
<version>1.6.15</version>
<scope>test</scope>
</dependency>
</dependencies>

<repositories>
Expand Down
72 changes: 72 additions & 0 deletions core/src/main/java/cn/edu/tsinghua/iginx/conf/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,22 @@ public class Config {

/////////////

private boolean enableEmailNotification = false;

private String mailSmtpHost = "";

private int mailSmtpPort = 465;

private String mailSmtpUser = "";

private String mailSmtpPassword = "";

private String mailSender = "";

private String mailRecipient = "";

/////////////

private int batchSizeImportCsv = 10000;

private boolean isUTTestEnv = false; // 是否是单元测试环境
Expand Down Expand Up @@ -896,4 +912,60 @@ public String getRuleBasedOptimizer() {
public void setRuleBasedOptimizer(String ruleBasedOptimizer) {
this.ruleBasedOptimizer = ruleBasedOptimizer;
}

public boolean isEnableEmailNotification() {
return enableEmailNotification;
}

public void setEnableEmailNotification(boolean enableEmailNotification) {
this.enableEmailNotification = enableEmailNotification;
}

public String getMailSmtpHost() {
return mailSmtpHost;
}

public void setMailSmtpHost(String mailSmtpHost) {
this.mailSmtpHost = mailSmtpHost;
}

public int getMailSmtpPort() {
return mailSmtpPort;
}

public void setMailSmtpPort(int mailSmtpPort) {
this.mailSmtpPort = mailSmtpPort;
}

public String getMailSmtpUser() {
return mailSmtpUser;
}

public void setMailSmtpUser(String mailSmtpUser) {
this.mailSmtpUser = mailSmtpUser;
}

public String getMailSmtpPassword() {
return mailSmtpPassword;
}

public void setMailSmtpPassword(String mailSmtpPassword) {
this.mailSmtpPassword = mailSmtpPassword;
}

public String getMailSender() {
return mailSender;
}

public void setMailSender(String mailSender) {
this.mailSender = mailSender;
}

public String getMailRecipient() {
return mailRecipient;
}

public void setMailRecipient(String mailRecipients) {
this.mailRecipient = mailRecipients;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,14 @@ private void loadPropsFromFile() {
properties.getProperty(
"ruleBasedOptimizer",
"NotFilterRemoveRule=on,FragmentPruningByFilterRule=on,ColumnPruningRule=on,FragmentPruningByPatternRule=on"));
config.setEnableEmailNotification(
Boolean.parseBoolean(properties.getProperty("enableEmailNotification", "false")));
config.setMailSmtpHost(properties.getProperty("mailSmtpHost", ""));
config.setMailSmtpPort(Integer.parseInt(properties.getProperty("mailSmtpPort", "465")));
config.setMailSmtpUser(properties.getProperty("mailSmtpUser", ""));
config.setMailSmtpPassword(properties.getProperty("mailSmtpPassword", ""));
config.setMailSender(properties.getProperty("mailSender", ""));
config.setMailRecipient(properties.getProperty("mailRecipient", ""));
} catch (IOException e) {
config.setUTTestEnv(true);
config.setNeedInitBasicUDFFunctions(false);
Expand Down Expand Up @@ -360,6 +368,14 @@ private void loadPropsFromEnv() {
config.setUTTestEnv(EnvUtils.loadEnv("utTestEnv", config.isUTTestEnv()));
config.setRuleBasedOptimizer(
EnvUtils.loadEnv("ruleBasedOptimizer", config.getRuleBasedOptimizer()));
config.setEnableEmailNotification(
Boolean.parseBoolean(EnvUtils.loadEnv("enableEmailNotification", "false")));
config.setMailSmtpHost(EnvUtils.loadEnv("mailSmtpHost", ""));
config.setMailSmtpPort(Integer.parseInt(EnvUtils.loadEnv("mailSmtpPort", "465")));
config.setMailSmtpUser(EnvUtils.loadEnv("mailSmtpUser", ""));
config.setMailSmtpPassword(EnvUtils.loadEnv("mailSmtpPassword", ""));
config.setMailSender(EnvUtils.loadEnv("mailSender", ""));
config.setMailRecipient(EnvUtils.loadEnv("mailRecipient", ""));
}

private void loadUDFListFromFile() {
Expand Down
122 changes: 122 additions & 0 deletions core/src/main/java/cn/edu/tsinghua/iginx/notice/EmailNotifier.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package cn.edu.tsinghua.iginx.notice;

import cn.edu.tsinghua.iginx.conf.Config;
import cn.edu.tsinghua.iginx.conf.ConfigDescriptor;
import cn.edu.tsinghua.iginx.thrift.JobState;
import cn.edu.tsinghua.iginx.transform.pojo.Job;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Date;
import org.apache.commons.mail.Email;
import org.apache.commons.mail.EmailException;
import org.apache.commons.mail.SimpleEmail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EmailNotifier {
private static final Logger LOGGER = LoggerFactory.getLogger(EmailNotifier.class);
private static final EmailNotifier INSTANCE = new EmailNotifier();

boolean mailEnable;
private final String mailHost;
private final int mailPort;
private final String mailUsername;
private final String mailPassword;
private final String mailSender;
private final String recipients;
private final String iginxHost;
private final int iginxPort;

public static EmailNotifier getInstance() {
return INSTANCE;
}

public EmailNotifier() {
this(ConfigDescriptor.getInstance().getConfig());
}

public EmailNotifier(Config config) {
this(
config.isEnableEmailNotification(),
config.getMailSmtpHost(),
config.getMailSmtpPort(),
config.getMailSmtpUser(),
config.getMailSmtpPassword(),
config.getMailSender(),
config.getMailRecipient(),
config.getIp(),
config.getPort());
}

public EmailNotifier(
boolean mailEnable,
String mailHost,
int mailPort,
String mailUsername,
String mailPassword,
String mailSender,
String mailRecipient,
String iginxHost,
int iginxPort) {
this.mailEnable = mailEnable;
this.mailHost = mailHost;
this.mailPort = mailPort;
this.mailUsername = mailUsername;
this.mailPassword = mailPassword;
this.mailSender = mailSender;
this.recipients = mailRecipient;
this.iginxHost = iginxHost;
this.iginxPort = iginxPort;
}

void sendEmail(String subject, String content) {
if (!mailEnable) {
LOGGER.debug("Email notification is disabled. Subject: {}, Content: {}", subject, content);
return;
}

try {
Email email = new SimpleEmail();
email.setHostName(mailHost);
email.setSslSmtpPort(String.valueOf(mailPort));
email.setSSLOnConnect(true);
email.setAuthentication(mailUsername, mailPassword);
email.setFrom(mailSender);
email.setSubject(subject);
email.setMsg(content);
email.addTo(recipients.split(","));
email.send();
LOGGER.info("Email notification sent. Subject: {}", subject);
} catch (EmailException e) {
LOGGER.error("Failed to send email notification. Subject: {}", subject, e);
}
}

public void send(Job job) {
JobState jobState = job.getState();
String jobStateStr = jobState.name().split("_")[1].toLowerCase();
String subject = String.format("Job %d is %s", job.getJobId(), jobStateStr);

StringBuilder content = new StringBuilder();
content.append("Job ID: ").append(job.getJobId()).append("\n");
content.append("Job State: ").append(jobStateStr).append("\n");
content.append("Job Start Time: ").append(new Date(job.getStartTime())).append("\n");
long endTime = job.getEndTime();
if (endTime != 0) {
content.append("Job End Time: ").append(new Date(endTime)).append("\n");
}
content.append("IGinX Host: ").append(iginxHost).append("\n");
content.append("IGinX Port: ").append(iginxPort).append("\n");

Exception e = job.getException();
if (e != null) {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
content.append("Exception: ").append("\n");
content.append(sw);
}

sendEmail(subject, content.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,13 @@ public void run() {
// we don't need this.close() because all children runners are closed.
if (job.getActive().compareAndSet(true, false)) {
job.setState(JobState.JOB_FINISHED);
job.setException(null);
}
} catch (TransformException e) {
LOGGER.error("Fail to run transform job id={}, because", job.getJobId(), e);
if (job.getActive().compareAndSet(true, false)) {
job.setState(JobState.JOB_FAILING);
job.setException(e);
close();
job.setState(JobState.JOB_FAILED);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import cn.edu.tsinghua.iginx.conf.Config;
import cn.edu.tsinghua.iginx.conf.ConfigDescriptor;
import cn.edu.tsinghua.iginx.notice.EmailNotifier;
import cn.edu.tsinghua.iginx.thrift.CommitTransformJobReq;
import cn.edu.tsinghua.iginx.thrift.JobState;
import cn.edu.tsinghua.iginx.thrift.TaskType;
Expand Down Expand Up @@ -78,6 +79,7 @@ private void processWithRetry(Job job, int retryTimes) {
LOGGER.error("retry process, executed times: {}", (processCnt + 1));
}
}
EmailNotifier.getInstance().send(job);
}

private void process(Job job) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class Job {
private final AtomicBoolean active;
private long startTime;
private long endTime;
private Exception exception;

private boolean needExport;
private ExportType exportType;
Expand Down
Loading

0 comments on commit a26fd02

Please sign in to comment.