From 8466b935cca666dfa1f13957827e3098fc763810 Mon Sep 17 00:00:00 2001 From: Vaibhav Date: Sun, 18 Feb 2024 23:11:20 +0530 Subject: [PATCH] chore(notifications): complete the logic for sending email --- core/notifications/notifications.yml | 1 + core/notifications/src/app/mod.rs | 9 ++-- core/notifications/src/email_executor/mod.rs | 16 ++++-- .../src/email_executor/smtp/config.rs | 1 + .../src/email_executor/smtp/mod.rs | 12 +++-- core/notifications/src/job/error.rs | 6 ++- core/notifications/src/job/mod.rs | 51 ++++++++++++++++-- .../src/job/send_email_notification.rs | 32 +++++++++++ core/notifications/src/notification_event.rs | 53 ++++++++++++++++--- core/notifications/src/primitives.rs | 4 +- 10 files changed, 159 insertions(+), 26 deletions(-) create mode 100644 core/notifications/src/job/send_email_notification.rs diff --git a/core/notifications/notifications.yml b/core/notifications/notifications.yml index 669b2dc7c04..c7b90bc1341 100644 --- a/core/notifications/notifications.yml +++ b/core/notifications/notifications.yml @@ -15,3 +15,4 @@ app: smtp: username: "" from_email: "" + relay: "" diff --git a/core/notifications/src/app/mod.rs b/core/notifications/src/app/mod.rs index d1a7f244cc6..92fc238af38 100644 --- a/core/notifications/src/app/mod.rs +++ b/core/notifications/src/app/mod.rs @@ -26,9 +26,9 @@ pub struct NotificationsApp { impl NotificationsApp { pub async fn init(pool: Pool, config: AppConfig) -> Result { let settings = UserNotificationSettingsRepo::new(&pool); - let executor = PushExecutor::init(config.executor.clone(), settings.clone()).await?; - let _email_executor = EmailExecutor::init(config.email_executor.clone(), settings.clone())?; - let runner = job::start_job_runner(&pool, executor).await?; + let push_executor = PushExecutor::init(config.executor.clone(), settings.clone()).await?; + let email_executor = EmailExecutor::init(config.email_executor.clone(), settings.clone())?; + let runner = job::start_job_runner(&pool, push_executor, email_executor).await?; Ok(Self { _config: config, pool, @@ -175,6 +175,9 @@ impl NotificationsApp { event: T, ) -> Result<(), ApplicationError> { let mut tx = self.pool.begin().await?; + if event.should_send_email() { + job::spawn_send_email_notification(&mut tx, event.clone().into()).await?; + } job::spawn_send_push_notification(&mut tx, event.into()).await?; tx.commit().await?; Ok(()) diff --git a/core/notifications/src/email_executor/mod.rs b/core/notifications/src/email_executor/mod.rs index f5f6ef6a7af..50ff3aefab7 100644 --- a/core/notifications/src/email_executor/mod.rs +++ b/core/notifications/src/email_executor/mod.rs @@ -2,6 +2,8 @@ mod config; pub mod error; mod smtp; +use tracing::instrument; + use crate::{notification_event::*, user_notification_settings::*}; pub use config::*; @@ -28,10 +30,18 @@ impl EmailExecutor { }) } + #[instrument(name = "email_executor.notify", skip(self))] pub async fn notify(&self, event: &T) -> Result<(), EmailExecutorError> { - let settings = self.settings.find_for_user_id(event.user_id()).await?; - let msg = event.to_localized_msg(settings.locale().unwrap_or_default()); - self.smtp.send_email(msg).await?; + if let Some((settings, addr)) = self + .settings + .find_for_user_id(event.user_id()) + .await + .ok() + .and_then(|s| s.email_address().map(|addr| (s, addr))) + { + let msg = event.to_localized_msg(settings.locale().unwrap_or_default()); + self.smtp.send_email(msg, addr).await?; + } Ok(()) } } diff --git a/core/notifications/src/email_executor/smtp/config.rs b/core/notifications/src/email_executor/smtp/config.rs index 017d37f7f57..787241a29cc 100644 --- a/core/notifications/src/email_executor/smtp/config.rs +++ b/core/notifications/src/email_executor/smtp/config.rs @@ -6,4 +6,5 @@ pub struct SmtpConfig { #[serde(default)] pub password: String, pub from_email: String, + pub relay: String, } diff --git a/core/notifications/src/email_executor/smtp/mod.rs b/core/notifications/src/email_executor/smtp/mod.rs index c037be1509d..b3b3c04594a 100644 --- a/core/notifications/src/email_executor/smtp/mod.rs +++ b/core/notifications/src/email_executor/smtp/mod.rs @@ -7,7 +7,7 @@ use lettre::{ AsyncSmtpTransport, AsyncTransport, Tokio1Executor, }; -use crate::messages::LocalizedMessage; +use crate::{messages::LocalizedMessage, primitives::GaloyEmailAddress}; pub use config::*; use error::*; @@ -22,7 +22,7 @@ impl SmtpClient { pub fn init(config: SmtpConfig) -> Result { let creds = Credentials::new(config.username, config.password); let client: AsyncSmtpTransport = - AsyncSmtpTransport::::starttls_relay("smtp.gmail.com")? + AsyncSmtpTransport::::starttls_relay(&config.relay)? .credentials(creds) .build(); Ok(Self { @@ -31,10 +31,14 @@ impl SmtpClient { }) } - pub async fn send_email(&self, msg: LocalizedMessage) -> Result<(), SmtpError> { + pub async fn send_email( + &self, + msg: LocalizedMessage, + recipient_addr: GaloyEmailAddress, + ) -> Result<(), SmtpError> { let email = Message::builder() .from(Mailbox::new(None, self.from_email.parse()?)) - .to(Mailbox::new(None, "some-email".parse()?)) + .to(Mailbox::new(None, recipient_addr.into_inner().parse()?)) .subject(msg.title) .body(msg.body)?; self.client.send(email).await?; diff --git a/core/notifications/src/job/error.rs b/core/notifications/src/job/error.rs index c52a01a0f11..86db86603da 100644 --- a/core/notifications/src/job/error.rs +++ b/core/notifications/src/job/error.rs @@ -1,13 +1,15 @@ use thiserror::Error; -use crate::push_executor::error::PushExecutorError; +use crate::{email_executor::error::EmailExecutorError, push_executor::error::PushExecutorError}; #[derive(Error, Debug)] pub enum JobError { #[error("JobError - Sqlx: {0}")] Sqlx(#[from] sqlx::Error), - #[error("JobError - ExecutorError: {0}")] + #[error("JobError - PushExecutorError: {0}")] PushExecutor(#[from] PushExecutorError), + #[error("JobError - EmailExecutorError: {0}")] + EmailExecutor(#[from] EmailExecutorError), } impl job_executor::JobExecutionError for JobError {} diff --git a/core/notifications/src/job/mod.rs b/core/notifications/src/job/mod.rs index fa662d29634..9f0a9ccb162 100644 --- a/core/notifications/src/job/mod.rs +++ b/core/notifications/src/job/mod.rs @@ -1,3 +1,4 @@ +mod send_email_notification; mod send_push_notification; pub mod error; @@ -7,18 +8,21 @@ use tracing::instrument; use job_executor::JobExecutor; -use crate::push_executor::PushExecutor; +use crate::{email_executor::EmailExecutor, push_executor::PushExecutor}; use error::JobError; +use send_email_notification::SendEmailNotificationData; use send_push_notification::SendPushNotificationData; pub async fn start_job_runner( pool: &sqlx::PgPool, - executor: PushExecutor, + push_executor: PushExecutor, + email_executor: EmailExecutor, ) -> Result { - let mut registry = JobRegistry::new(&[send_push_notification]); - registry.set_context(executor); + let mut registry = JobRegistry::new(&[send_push_notification, send_email_notification]); + registry.set_context(push_executor); + registry.set_context(email_executor); Ok(registry.runner(pool).set_keep_alive(false).run().await?) } @@ -61,3 +65,42 @@ pub async fn spawn_send_push_notification( } Ok(()) } + +#[job( + name = "send_email_notification", + channel_name = "send_email_notification" +)] +async fn send_email_notification( + mut current_job: CurrentJob, + executor: EmailExecutor, +) -> Result<(), JobError> { + JobExecutor::builder(&mut current_job) + .build() + .expect("couldn't build JobExecutor") + .execute(|data| async move { + let data: SendEmailNotificationData = + data.expect("no SendEmailNotificationData available"); + send_email_notification::execute(data, executor).await + }) + .await?; + Ok(()) +} + +#[instrument(name = "job.spawn_send_email_notification", skip_all, fields(error, error.level, error.message), err)] +pub async fn spawn_send_email_notification( + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + data: impl Into, +) -> Result<(), JobError> { + let data = data.into(); + if let Err(e) = send_email_notification + .builder() + .set_json(&data) + .expect("Couldn't set json") + .spawn(&mut **tx) + .await + { + tracing::insert_error_fields(tracing::Level::WARN, &e); + return Err(e.into()); + } + Ok(()) +} diff --git a/core/notifications/src/job/send_email_notification.rs b/core/notifications/src/job/send_email_notification.rs new file mode 100644 index 00000000000..1a9ae9e5778 --- /dev/null +++ b/core/notifications/src/job/send_email_notification.rs @@ -0,0 +1,32 @@ +use serde::{Deserialize, Serialize}; +use tracing::instrument; + +use std::collections::HashMap; + +use super::error::JobError; +use crate::{email_executor::EmailExecutor, notification_event::NotificationEventPayload}; + +#[derive(Debug, Serialize, Deserialize)] +pub(super) struct SendEmailNotificationData { + payload: NotificationEventPayload, + #[serde(flatten)] + pub(super) tracing_data: HashMap, +} + +impl From for SendEmailNotificationData { + fn from(payload: NotificationEventPayload) -> Self { + Self { + payload, + tracing_data: tracing::extract_tracing_data(), + } + } +} + +#[instrument(name = "job.send_email_notification", skip(executor), err)] +pub async fn execute( + data: SendEmailNotificationData, + executor: EmailExecutor, +) -> Result { + executor.notify(&data.payload).await?; + Ok(data) +} diff --git a/core/notifications/src/notification_event.rs b/core/notifications/src/notification_event.rs index cf55cf5073a..7d988024db3 100644 --- a/core/notifications/src/notification_event.rs +++ b/core/notifications/src/notification_event.rs @@ -7,14 +7,15 @@ pub enum DeepLink { Circles, } -pub trait NotificationEvent: std::fmt::Debug + Into { +pub trait NotificationEvent: std::fmt::Debug + Into + Clone { fn category(&self) -> UserNotificationCategory; fn user_id(&self) -> &GaloyUserId; fn deep_link(&self) -> DeepLink; fn to_localized_msg(&self, locale: GaloyLocale) -> LocalizedMessage; + fn should_send_email(&self) -> bool; } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] #[serde(tag = "type", rename_all = "snake_case")] pub enum NotificationEventPayload { CircleGrew(CircleGrew), @@ -72,9 +73,25 @@ impl NotificationEvent for NotificationEventPayload { } } } + + fn should_send_email(&self) -> bool { + match self { + NotificationEventPayload::CircleGrew(event) => event.should_send_email(), + NotificationEventPayload::CircleThresholdReached(event) => event.should_send_email(), + NotificationEventPayload::IdentityVerificationApproved(event) => { + event.should_send_email() + } + NotificationEventPayload::IdentityVerificationDeclined(event) => { + event.should_send_email() + } + NotificationEventPayload::IdentityVerificationReviewPending(event) => { + event.should_send_email() + } + } + } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct CircleGrew { pub user_id: GaloyUserId, pub circle_type: CircleType, @@ -98,6 +115,10 @@ impl NotificationEvent for CircleGrew { fn to_localized_msg(&self, locale: GaloyLocale) -> LocalizedMessage { Messages::circle_grew(locale.as_ref(), self) } + + fn should_send_email(&self) -> bool { + false + } } impl From for NotificationEventPayload { @@ -106,7 +127,7 @@ impl From for NotificationEventPayload { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct CircleThresholdReached { pub user_id: GaloyUserId, pub circle_type: CircleType, @@ -130,6 +151,10 @@ impl NotificationEvent for CircleThresholdReached { fn to_localized_msg(&self, locale: GaloyLocale) -> LocalizedMessage { Messages::circle_threshold_reached(locale.as_ref(), self) } + + fn should_send_email(&self) -> bool { + false + } } impl From for NotificationEventPayload { @@ -138,7 +163,7 @@ impl From for NotificationEventPayload { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct IdentityVerificationApproved { pub user_id: GaloyUserId, } @@ -159,6 +184,10 @@ impl NotificationEvent for IdentityVerificationApproved { fn to_localized_msg(&self, locale: GaloyLocale) -> LocalizedMessage { Messages::identity_verification_approved(locale.as_ref(), self) } + + fn should_send_email(&self) -> bool { + true + } } impl From for NotificationEventPayload { @@ -167,13 +196,13 @@ impl From for NotificationEventPayload { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub enum IdentityVerificationDeclinedReason { DocumentsNotClear, VerificationPhotoNotClear, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct IdentityVerificationDeclined { pub user_id: GaloyUserId, pub declined_reason: IdentityVerificationDeclinedReason, @@ -195,6 +224,10 @@ impl NotificationEvent for IdentityVerificationDeclined { fn to_localized_msg(&self, locale: GaloyLocale) -> LocalizedMessage { Messages::identity_verification_declined(locale.as_ref(), self) } + + fn should_send_email(&self) -> bool { + true + } } impl From for NotificationEventPayload { @@ -203,7 +236,7 @@ impl From for NotificationEventPayload { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct IdentityVerificationReviewPending { pub user_id: GaloyUserId, } @@ -224,6 +257,10 @@ impl NotificationEvent for IdentityVerificationReviewPending { fn to_localized_msg(&self, locale: GaloyLocale) -> LocalizedMessage { Messages::identity_verification_review_pending(locale.as_ref(), self) } + + fn should_send_email(&self) -> bool { + true + } } impl From for NotificationEventPayload { diff --git a/core/notifications/src/primitives.rs b/core/notifications/src/primitives.rs index 1fec7bece24..aa9ddd35213 100644 --- a/core/notifications/src/primitives.rs +++ b/core/notifications/src/primitives.rs @@ -120,7 +120,7 @@ pub enum UserNotificationCategory { AdminNotification, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub enum CircleType { Inner, Outer, @@ -135,7 +135,7 @@ impl std::fmt::Display for CircleType { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub enum CircleTimeFrame { Month, AllTime,