From a26fd02f590ad4a6b1c6a425085e84f9027b9443 Mon Sep 17 00:00:00 2001 From: An Qi Date: Sun, 26 May 2024 14:50:02 +0800 Subject: [PATCH] feat(core): notify user by email after Transform Job finished or failed (#340) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(core): notify user by email after Transform Job finished or failed * fix(test): adjust test for new feat * feat(core): show exception when failed 详见文档: https://oxlh5mrwi0.feishu.cn/docx/WNmXdqQWNoG6ZFxqOmbcXZwpnYX?from=from_copylink --- conf/config.properties | 19 +++ core/pom.xml | 10 ++ .../cn/edu/tsinghua/iginx/conf/Config.java | 72 +++++++++++ .../tsinghua/iginx/conf/ConfigDescriptor.java | 16 +++ .../tsinghua/iginx/notice/EmailNotifier.java | 122 ++++++++++++++++++ .../iginx/transform/exec/JobRunner.java | 2 + .../transform/exec/TransformJobManager.java | 2 + .../tsinghua/iginx/transform/pojo/Job.java | 1 + .../iginx/notice/EmailNotifierTest.java | 102 +++++++++++++++ pom.xml | 5 + .../integration/func/sql/SQLSessionIT.java | 10 +- 11 files changed, 360 insertions(+), 1 deletion(-) create mode 100644 core/src/main/java/cn/edu/tsinghua/iginx/notice/EmailNotifier.java create mode 100644 core/src/test/java/cn/edu/tsinghua/iginx/notice/EmailNotifierTest.java diff --git a/conf/config.properties b/conf/config.properties index fc6f2acee0..733df2d9ca 100644 --- a/conf/config.properties +++ b/conf/config.properties @@ -254,3 +254,22 @@ enableEnvParameter=false # 读取csv文件时,每批数据的行数 batchSizeImportCsv=10000 + +################# +### configure email smtp to send notification +################# + +enableEmailNotification=false + +mailSmtpHost=smtp.qq.com + +# make sure enable ssl on connection +mailSmtpPort=465 + +mailSmtpUser=12345678@qq.com + +mailSmtpPassword=12345678 + +mailSender=12345678@qq.com + +mailRecipient=12345678@qq.com diff --git a/core/pom.xml b/core/pom.xml index add5c67d1a..4c3389c57c 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -165,6 +165,10 @@ jaxb-api 2.2.3 + + org.apache.commons + commons-email + org.apache.commons commons-csv @@ -183,6 +187,12 @@ + + com.icegreen + greenmail-junit4 + 1.6.15 + test + diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/conf/Config.java b/core/src/main/java/cn/edu/tsinghua/iginx/conf/Config.java index f46934b7e9..f2711520a0 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/conf/Config.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/conf/Config.java @@ -209,6 +209,22 @@ public class Config { ///////////// + private boolean enableEmailNotification = false; + + private String mailSmtpHost = ""; + + private int mailSmtpPort = 465; + + private String mailSmtpUser = ""; + + private String mailSmtpPassword = ""; + + private String mailSender = ""; + + private String mailRecipient = ""; + + ///////////// + private int batchSizeImportCsv = 10000; private boolean isUTTestEnv = false; // 是否是单元测试环境 @@ -896,4 +912,60 @@ public String getRuleBasedOptimizer() { public void setRuleBasedOptimizer(String ruleBasedOptimizer) { this.ruleBasedOptimizer = ruleBasedOptimizer; } + + public boolean isEnableEmailNotification() { + return enableEmailNotification; + } + + public void setEnableEmailNotification(boolean enableEmailNotification) { + this.enableEmailNotification = enableEmailNotification; + } + + public String getMailSmtpHost() { + return mailSmtpHost; + } + + public void setMailSmtpHost(String mailSmtpHost) { + this.mailSmtpHost = mailSmtpHost; + } + + public int getMailSmtpPort() { + return mailSmtpPort; + } + + public void setMailSmtpPort(int mailSmtpPort) { + this.mailSmtpPort = mailSmtpPort; + } + + public String getMailSmtpUser() { + return mailSmtpUser; + } + + public void setMailSmtpUser(String mailSmtpUser) { + this.mailSmtpUser = mailSmtpUser; + } + + public String getMailSmtpPassword() { + return mailSmtpPassword; + } + + public void setMailSmtpPassword(String mailSmtpPassword) { + this.mailSmtpPassword = mailSmtpPassword; + } + + public String getMailSender() { + return mailSender; + } + + public void setMailSender(String mailSender) { + this.mailSender = mailSender; + } + + public String getMailRecipient() { + return mailRecipient; + } + + public void setMailRecipient(String mailRecipients) { + this.mailRecipient = mailRecipients; + } } diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/conf/ConfigDescriptor.java b/core/src/main/java/cn/edu/tsinghua/iginx/conf/ConfigDescriptor.java index a92ecb2581..03d8707c6e 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/conf/ConfigDescriptor.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/conf/ConfigDescriptor.java @@ -234,6 +234,14 @@ private void loadPropsFromFile() { properties.getProperty( "ruleBasedOptimizer", "NotFilterRemoveRule=on,FragmentPruningByFilterRule=on,ColumnPruningRule=on,FragmentPruningByPatternRule=on")); + config.setEnableEmailNotification( + Boolean.parseBoolean(properties.getProperty("enableEmailNotification", "false"))); + config.setMailSmtpHost(properties.getProperty("mailSmtpHost", "")); + config.setMailSmtpPort(Integer.parseInt(properties.getProperty("mailSmtpPort", "465"))); + config.setMailSmtpUser(properties.getProperty("mailSmtpUser", "")); + config.setMailSmtpPassword(properties.getProperty("mailSmtpPassword", "")); + config.setMailSender(properties.getProperty("mailSender", "")); + config.setMailRecipient(properties.getProperty("mailRecipient", "")); } catch (IOException e) { config.setUTTestEnv(true); config.setNeedInitBasicUDFFunctions(false); @@ -360,6 +368,14 @@ private void loadPropsFromEnv() { config.setUTTestEnv(EnvUtils.loadEnv("utTestEnv", config.isUTTestEnv())); config.setRuleBasedOptimizer( EnvUtils.loadEnv("ruleBasedOptimizer", config.getRuleBasedOptimizer())); + config.setEnableEmailNotification( + Boolean.parseBoolean(EnvUtils.loadEnv("enableEmailNotification", "false"))); + config.setMailSmtpHost(EnvUtils.loadEnv("mailSmtpHost", "")); + config.setMailSmtpPort(Integer.parseInt(EnvUtils.loadEnv("mailSmtpPort", "465"))); + config.setMailSmtpUser(EnvUtils.loadEnv("mailSmtpUser", "")); + config.setMailSmtpPassword(EnvUtils.loadEnv("mailSmtpPassword", "")); + config.setMailSender(EnvUtils.loadEnv("mailSender", "")); + config.setMailRecipient(EnvUtils.loadEnv("mailRecipient", "")); } private void loadUDFListFromFile() { diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/notice/EmailNotifier.java b/core/src/main/java/cn/edu/tsinghua/iginx/notice/EmailNotifier.java new file mode 100644 index 0000000000..72bfa22149 --- /dev/null +++ b/core/src/main/java/cn/edu/tsinghua/iginx/notice/EmailNotifier.java @@ -0,0 +1,122 @@ +package cn.edu.tsinghua.iginx.notice; + +import cn.edu.tsinghua.iginx.conf.Config; +import cn.edu.tsinghua.iginx.conf.ConfigDescriptor; +import cn.edu.tsinghua.iginx.thrift.JobState; +import cn.edu.tsinghua.iginx.transform.pojo.Job; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Date; +import org.apache.commons.mail.Email; +import org.apache.commons.mail.EmailException; +import org.apache.commons.mail.SimpleEmail; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EmailNotifier { + private static final Logger LOGGER = LoggerFactory.getLogger(EmailNotifier.class); + private static final EmailNotifier INSTANCE = new EmailNotifier(); + + boolean mailEnable; + private final String mailHost; + private final int mailPort; + private final String mailUsername; + private final String mailPassword; + private final String mailSender; + private final String recipients; + private final String iginxHost; + private final int iginxPort; + + public static EmailNotifier getInstance() { + return INSTANCE; + } + + public EmailNotifier() { + this(ConfigDescriptor.getInstance().getConfig()); + } + + public EmailNotifier(Config config) { + this( + config.isEnableEmailNotification(), + config.getMailSmtpHost(), + config.getMailSmtpPort(), + config.getMailSmtpUser(), + config.getMailSmtpPassword(), + config.getMailSender(), + config.getMailRecipient(), + config.getIp(), + config.getPort()); + } + + public EmailNotifier( + boolean mailEnable, + String mailHost, + int mailPort, + String mailUsername, + String mailPassword, + String mailSender, + String mailRecipient, + String iginxHost, + int iginxPort) { + this.mailEnable = mailEnable; + this.mailHost = mailHost; + this.mailPort = mailPort; + this.mailUsername = mailUsername; + this.mailPassword = mailPassword; + this.mailSender = mailSender; + this.recipients = mailRecipient; + this.iginxHost = iginxHost; + this.iginxPort = iginxPort; + } + + void sendEmail(String subject, String content) { + if (!mailEnable) { + LOGGER.debug("Email notification is disabled. Subject: {}, Content: {}", subject, content); + return; + } + + try { + Email email = new SimpleEmail(); + email.setHostName(mailHost); + email.setSslSmtpPort(String.valueOf(mailPort)); + email.setSSLOnConnect(true); + email.setAuthentication(mailUsername, mailPassword); + email.setFrom(mailSender); + email.setSubject(subject); + email.setMsg(content); + email.addTo(recipients.split(",")); + email.send(); + LOGGER.info("Email notification sent. Subject: {}", subject); + } catch (EmailException e) { + LOGGER.error("Failed to send email notification. Subject: {}", subject, e); + } + } + + public void send(Job job) { + JobState jobState = job.getState(); + String jobStateStr = jobState.name().split("_")[1].toLowerCase(); + String subject = String.format("Job %d is %s", job.getJobId(), jobStateStr); + + StringBuilder content = new StringBuilder(); + content.append("Job ID: ").append(job.getJobId()).append("\n"); + content.append("Job State: ").append(jobStateStr).append("\n"); + content.append("Job Start Time: ").append(new Date(job.getStartTime())).append("\n"); + long endTime = job.getEndTime(); + if (endTime != 0) { + content.append("Job End Time: ").append(new Date(endTime)).append("\n"); + } + content.append("IGinX Host: ").append(iginxHost).append("\n"); + content.append("IGinX Port: ").append(iginxPort).append("\n"); + + Exception e = job.getException(); + if (e != null) { + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw); + e.printStackTrace(pw); + content.append("Exception: ").append("\n"); + content.append(sw); + } + + sendEmail(subject, content.toString()); + } +} diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/transform/exec/JobRunner.java b/core/src/main/java/cn/edu/tsinghua/iginx/transform/exec/JobRunner.java index cd9ec1bb04..6871c2686f 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/transform/exec/JobRunner.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/transform/exec/JobRunner.java @@ -56,11 +56,13 @@ public void run() { // we don't need this.close() because all children runners are closed. if (job.getActive().compareAndSet(true, false)) { job.setState(JobState.JOB_FINISHED); + job.setException(null); } } catch (TransformException e) { LOGGER.error("Fail to run transform job id={}, because", job.getJobId(), e); if (job.getActive().compareAndSet(true, false)) { job.setState(JobState.JOB_FAILING); + job.setException(e); close(); job.setState(JobState.JOB_FAILED); } diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/transform/exec/TransformJobManager.java b/core/src/main/java/cn/edu/tsinghua/iginx/transform/exec/TransformJobManager.java index 6407a4dec7..63fc1ca2d2 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/transform/exec/TransformJobManager.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/transform/exec/TransformJobManager.java @@ -2,6 +2,7 @@ import cn.edu.tsinghua.iginx.conf.Config; import cn.edu.tsinghua.iginx.conf.ConfigDescriptor; +import cn.edu.tsinghua.iginx.notice.EmailNotifier; import cn.edu.tsinghua.iginx.thrift.CommitTransformJobReq; import cn.edu.tsinghua.iginx.thrift.JobState; import cn.edu.tsinghua.iginx.thrift.TaskType; @@ -78,6 +79,7 @@ private void processWithRetry(Job job, int retryTimes) { LOGGER.error("retry process, executed times: {}", (processCnt + 1)); } } + EmailNotifier.getInstance().send(job); } private void process(Job job) throws Exception { diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/transform/pojo/Job.java b/core/src/main/java/cn/edu/tsinghua/iginx/transform/pojo/Job.java index 8b4129ff9f..d2f71812d9 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/transform/pojo/Job.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/transform/pojo/Job.java @@ -21,6 +21,7 @@ public class Job { private final AtomicBoolean active; private long startTime; private long endTime; + private Exception exception; private boolean needExport; private ExportType exportType; diff --git a/core/src/test/java/cn/edu/tsinghua/iginx/notice/EmailNotifierTest.java b/core/src/test/java/cn/edu/tsinghua/iginx/notice/EmailNotifierTest.java new file mode 100644 index 0000000000..be4e4dbc77 --- /dev/null +++ b/core/src/test/java/cn/edu/tsinghua/iginx/notice/EmailNotifierTest.java @@ -0,0 +1,102 @@ +package cn.edu.tsinghua.iginx.notice; + +import static org.junit.Assert.*; + +import cn.edu.tsinghua.iginx.thrift.JobState; +import cn.edu.tsinghua.iginx.transform.pojo.Job; +import cn.edu.tsinghua.iginx.utils.JobFromYAML; +import com.icegreen.greenmail.junit4.GreenMailRule; +import com.icegreen.greenmail.util.GreenMailUtil; +import com.icegreen.greenmail.util.ServerSetupTest; +import java.util.Arrays; +import java.util.Collections; +import javax.mail.MessagingException; +import javax.mail.internet.MimeMessage; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EmailNotifierTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(EmailNotifierTest.class); + + @Rule public final GreenMailRule greenMail = new GreenMailRule(ServerSetupTest.SMTPS); + + @BeforeClass + public static void setUp() { + System.setProperty("mail.smtp.ssl.trust", "127.0.0.1"); + } + + EmailNotifier emailNotifier; + + @Before + public void before() { + greenMail.setUser("from@localhost", "password"); + emailNotifier = + new EmailNotifier( + true, + "127.0.0.1", + 3465, + "from@localhost", + "password", + "from@localhost", + "to@localhost", + "localhost", + 6888); + } + + @Test + public void testSendEmail() throws MessagingException { + emailNotifier.sendEmail("subject", "body"); + assertEquals(1, greenMail.getReceivedMessages().length); + MimeMessage mimeMessage = greenMail.getReceivedMessages()[0]; + + assertEquals("subject", mimeMessage.getSubject()); + assertEquals("body", GreenMailUtil.getBody(mimeMessage)); + assertEquals("[from@localhost]", Arrays.toString(mimeMessage.getFrom())); + assertEquals("[to@localhost]", Arrays.toString(mimeMessage.getAllRecipients())); + } + + @Test + public void testNotifyJobState() throws MessagingException { + JobFromYAML jobFromYAML = new JobFromYAML(); + jobFromYAML.setExportType("csv"); + jobFromYAML.setTaskList(Collections.emptyList()); + Job job = new Job(53, 102, jobFromYAML); + job.setStartTime(1716384072742L); + job.setState(JobState.JOB_FINISHED); + job.setEndTime(1716384072743L); + emailNotifier.send(job); + assertEquals(1, greenMail.getReceivedMessages().length); + MimeMessage mimeMessage = greenMail.getReceivedMessages()[0]; + + assertEquals("Job 53 is finished", mimeMessage.getSubject()); + } + + @Test + public void testNotifyJobStateException() throws MessagingException { + JobFromYAML jobFromYAML = new JobFromYAML(); + jobFromYAML.setExportType("csv"); + jobFromYAML.setTaskList(Collections.emptyList()); + Job job = new Job(53, 102, jobFromYAML); + try { + throw new Exception("example exception"); + } catch (Exception e) { + job.setException(e); + } + job.setStartTime(1716384072742L); + job.setState(JobState.JOB_FINISHED); + job.setEndTime(1716384072743L); + emailNotifier.send(job); + assertEquals(1, greenMail.getReceivedMessages().length); + MimeMessage mimeMessage = greenMail.getReceivedMessages()[0]; + + LOGGER.info(GreenMailUtil.getBody(mimeMessage)); + + assertEquals("Job 53 is finished", mimeMessage.getSubject()); + assertTrue(GreenMailUtil.getBody(mimeMessage).contains("example exception")); + } +} diff --git a/pom.xml b/pom.xml index bdca6a88c3..034c86e16c 100644 --- a/pom.xml +++ b/pom.xml @@ -166,6 +166,11 @@ commons-pool2 2.12.0 + + org.apache.commons + commons-email + 1.6.0 + org.apache.zookeeper zookeeper diff --git a/test/src/test/java/cn/edu/tsinghua/iginx/integration/func/sql/SQLSessionIT.java b/test/src/test/java/cn/edu/tsinghua/iginx/integration/func/sql/SQLSessionIT.java index 24de334ebb..f63c5008fe 100644 --- a/test/src/test/java/cn/edu/tsinghua/iginx/integration/func/sql/SQLSessionIT.java +++ b/test/src/test/java/cn/edu/tsinghua/iginx/integration/func/sql/SQLSessionIT.java @@ -6223,7 +6223,15 @@ public void testShowConfig() { "maxTimeseriesLength", "batchSize", "parallelGroupByPoolSize", - "username")); + "username", + "enableEmailNotification", + "mailSmtpHost", + "mailSmtpPort", + "mailSmtpUser", + "mailSmtpPassword", + "mailSender", + "mailRecipient")); + assertEquals(expectedConfigNames, configs.keySet()); }