Skip to content

Commit

Permalink
feat: add asynchronous newsletter delivery
Browse files Browse the repository at this point in the history
  • Loading branch information
migueloller committed Feb 11, 2024
1 parent ec9bfd0 commit acaeaaf
Show file tree
Hide file tree
Showing 18 changed files with 450 additions and 98 deletions.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ secrecy = { version = "0.8", features = ["serde"] }
serde = { version = "1", features = ["derive"] }
serde-aux = "4"
serde_json = "1"
serde_urlencoded = "0.7.1"
thiserror = "1"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
tracing = { version = "0.1", features = ["log"] }
Expand Down
8 changes: 8 additions & 0 deletions migrations/20240211190953_create_newsletter_issues_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
create table newsletter_issues (
newsletter_issue_id uuid not null,
title text not null,
text_content text not null,
html_content text not null,
published_at timestamptz not null,
primary key (newsletter_issue_id)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
create table issue_delivery_queue (
newsletter_issue_id uuid not null
references newsletter_issues (newsletter_issue_id),
subscriber_email text not null,
primary key (newsletter_issue_id, subscriber_email)
);
13 changes: 13 additions & 0 deletions src/configuration.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::domain::SubscriberEmail;
use crate::email_client::EmailClient;
use secrecy::{ExposeSecret, Secret};
use serde_aux::field_attributes::deserialize_number_from_string;
use sqlx::postgres::PgConnectOptions;
Expand Down Expand Up @@ -62,6 +63,18 @@ pub struct EmailClientSettings {
}

impl EmailClientSettings {
pub fn client(self) -> EmailClient {
let sender_email = self.sender().expect("Invalid sender email address.");
let timeout = self.timeout();

EmailClient::new(
self.base_url,
sender_email,
self.authorization_token,
timeout,
)
}

pub fn sender(&self) -> Result<SubscriberEmail, String> {
SubscriberEmail::parse(self.sender_email.clone())
}
Expand Down
168 changes: 168 additions & 0 deletions src/issue_delivery_worker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
use std::time::Duration;

use sqlx::{Executor, PgPool, Postgres, Transaction};
use tracing::{field::display, Span};
use uuid::Uuid;

use crate::{
configuration::Settings, domain::SubscriberEmail, email_client::EmailClient,
startup::get_connection_pool,
};

pub enum ExecutionOutcome {
TaskCompleted,
EmptyQueue,
}

#[tracing::instrument(
skip_all,
fields(
newsletter_issue_id=tracing::field::Empty,
subscriber_email=tracing::field::Empty,
),
err
)]
pub async fn try_execute_task(
pool: &PgPool,
email_client: &EmailClient,
) -> Result<ExecutionOutcome, anyhow::Error> {
let task = dequeue_task(pool).await?;

if task.is_none() {
return Ok(ExecutionOutcome::EmptyQueue);
}

let (transaction, issue_id, email) = task.unwrap();

Span::current()
.record("newsletter_issue_id", &display(issue_id))
.record("subscriber_email", &display(&email));

match SubscriberEmail::parse(email.clone()) {
Ok(email) => {
let issue = get_issue(pool, issue_id).await?;
if let Err(e) = email_client
.send_email(
&email,
&issue.title,
&issue.html_content,
&issue.text_content,
)
.await
{
tracing::error!(
error.cause_chain = ?e,
error.message = %e,
"Failed to deliver issue to a confirmed subscriber. \
Skipping."
);
}
}
Err(e) => {
tracing::error!(
error.cause_chain = ?e,
error.message = %e,
"Skipping a confirmed subscriber. \
Their stored contact details are invalid."
);
}
}

delete_task(transaction, issue_id, &email).await?;

Ok(ExecutionOutcome::TaskCompleted)
}

type PgTransaction = Transaction<'static, Postgres>;

#[tracing::instrument(skip_all)]
async fn dequeue_task(
pool: &PgPool,
) -> Result<Option<(PgTransaction, Uuid, String)>, anyhow::Error> {
let mut transaction = pool.begin().await?;
let query = sqlx::query!(
r#"
select newsletter_issue_id, subscriber_email
from issue_delivery_queue
for update
skip locked
limit 1
"#
);
let r = query.fetch_optional(&mut *transaction).await?;

if let Some(r) = r {
Ok(Some((
transaction,
r.newsletter_issue_id,
r.subscriber_email,
)))
} else {
Ok(None)
}
}

#[tracing::instrument(skip_all)]
async fn delete_task(
mut transaction: PgTransaction,
issue_id: Uuid,
email: &str,
) -> Result<(), anyhow::Error> {
let query = sqlx::query!(
r#"
delete from issue_delivery_queue
where
newsletter_issue_id = $1
and
subscriber_email = $2
"#,
issue_id,
email
);
transaction.execute(query).await?;
transaction.commit().await?;
Ok(())
}

struct NewsletterIssue {
title: String,
text_content: String,
html_content: String,
}

#[tracing::instrument(skip_all)]
async fn get_issue(pool: &PgPool, issue_id: Uuid) -> Result<NewsletterIssue, anyhow::Error> {
let issue = sqlx::query_as!(
NewsletterIssue,
r#"
select title, text_content, html_content
from newsletter_issues
where newsletter_issue_id = $1
"#,
issue_id
)
.fetch_one(pool)
.await?;
Ok(issue)
}

async fn worker_loop(pool: PgPool, email_client: EmailClient) -> Result<(), anyhow::Error> {
loop {
match try_execute_task(&pool, &email_client).await {
Ok(ExecutionOutcome::EmptyQueue) => {
tokio::time::sleep(Duration::from_secs(10)).await;
}
Err(_) => {
tokio::time::sleep(Duration::from_secs(1)).await;
}
Ok(ExecutionOutcome::TaskCompleted) => {}
}
}
}

pub async fn run_worker_until_stopped(configuration: Settings) -> Result<(), anyhow::Error> {
let connection_pool = get_connection_pool(&configuration.database);
let email_client = configuration.email_client.client();

worker_loop(connection_pool, email_client).await
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod configuration;
pub mod domain;
pub mod email_client;
pub mod idempotency;
pub mod issue_delivery_worker;
pub mod routes;
pub mod session_state;
pub mod startup;
Expand Down
Loading

0 comments on commit acaeaaf

Please sign in to comment.