From acaeaaf8f04659abfe993f5be9f01489c40641da Mon Sep 17 00:00:00 2001 From: Miguel Oller Date: Sun, 11 Feb 2024 16:05:09 -0500 Subject: [PATCH] feat: add asynchronous newsletter delivery --- ...10890170b8f0871beafd67140df90fc00103d.json | 20 --- ...adde29ccb5c8248c2259360ff88ac71ba7445.json | 26 +++ ...2d8be125627936aa6453847266246a6ec56a6.json | 14 ++ ...024d2899e62a2a33f3f5e4205a117540cdaea.json | 17 ++ ...edfe56e400450fcc770e195dc6cf614dca56c.json | 15 ++ ...279f112233f94e92fbb5fb268f591008b4921.json | 34 ++++ Cargo.lock | 1 + Cargo.toml | 1 + ...1190953_create_newsletter_issues_table.sql | 8 + ...1447_create_issue_delivery_queue_table.sql | 6 + src/configuration.rs | 13 ++ src/issue_delivery_worker.rs | 168 ++++++++++++++++++ src/lib.rs | 1 + src/main.rs | 37 +++- src/routes/admin/newsletters/post.rs | 109 +++++++----- src/startup.rs | 14 +- tests/api/helpers.rs | 16 ++ tests/api/newsletter.rs | 48 +++-- 18 files changed, 450 insertions(+), 98 deletions(-) delete mode 100644 .sqlx/query-23478780b3e38a2e2d874cabc6910890170b8f0871beafd67140df90fc00103d.json create mode 100644 .sqlx/query-3ae61fdaad5eb177486c789f79badde29ccb5c8248c2259360ff88ac71ba7445.json create mode 100644 .sqlx/query-8904842ca2081847a6be863851d2d8be125627936aa6453847266246a6ec56a6.json create mode 100644 .sqlx/query-a6bb6620fa3aef80bf97ba9bd93024d2899e62a2a33f3f5e4205a117540cdaea.json create mode 100644 .sqlx/query-a7fbc151e283d35219033b66b3aedfe56e400450fcc770e195dc6cf614dca56c.json create mode 100644 .sqlx/query-dfa8fd892a3063747feccb109a2279f112233f94e92fbb5fb268f591008b4921.json create mode 100644 migrations/20240211190953_create_newsletter_issues_table.sql create mode 100644 migrations/20240211191447_create_issue_delivery_queue_table.sql create mode 100644 src/issue_delivery_worker.rs diff --git a/.sqlx/query-23478780b3e38a2e2d874cabc6910890170b8f0871beafd67140df90fc00103d.json b/.sqlx/query-23478780b3e38a2e2d874cabc6910890170b8f0871beafd67140df90fc00103d.json deleted file mode 100644 index a24b14a..0000000 --- a/.sqlx/query-23478780b3e38a2e2d874cabc6910890170b8f0871beafd67140df90fc00103d.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n select email\n from subscriptions\n where status = 'confirmed'\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "email", - "type_info": "Text" - } - ], - "parameters": { - "Left": [] - }, - "nullable": [ - false - ] - }, - "hash": "23478780b3e38a2e2d874cabc6910890170b8f0871beafd67140df90fc00103d" -} diff --git a/.sqlx/query-3ae61fdaad5eb177486c789f79badde29ccb5c8248c2259360ff88ac71ba7445.json b/.sqlx/query-3ae61fdaad5eb177486c789f79badde29ccb5c8248c2259360ff88ac71ba7445.json new file mode 100644 index 0000000..1ef995b --- /dev/null +++ b/.sqlx/query-3ae61fdaad5eb177486c789f79badde29ccb5c8248c2259360ff88ac71ba7445.json @@ -0,0 +1,26 @@ +{ + "db_name": "PostgreSQL", + "query": "\n select newsletter_issue_id, subscriber_email\n from issue_delivery_queue\n for update\n skip locked\n limit 1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "newsletter_issue_id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "subscriber_email", + "type_info": "Text" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + false + ] + }, + "hash": "3ae61fdaad5eb177486c789f79badde29ccb5c8248c2259360ff88ac71ba7445" +} diff --git a/.sqlx/query-8904842ca2081847a6be863851d2d8be125627936aa6453847266246a6ec56a6.json b/.sqlx/query-8904842ca2081847a6be863851d2d8be125627936aa6453847266246a6ec56a6.json new file mode 100644 index 0000000..eace54e --- /dev/null +++ b/.sqlx/query-8904842ca2081847a6be863851d2d8be125627936aa6453847266246a6ec56a6.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n insert into issue_delivery_queue (\n newsletter_issue_id,\n subscriber_email\n )\n select $1, email\n from subscriptions\n where status = 'confirmed'\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "8904842ca2081847a6be863851d2d8be125627936aa6453847266246a6ec56a6" +} diff --git a/.sqlx/query-a6bb6620fa3aef80bf97ba9bd93024d2899e62a2a33f3f5e4205a117540cdaea.json b/.sqlx/query-a6bb6620fa3aef80bf97ba9bd93024d2899e62a2a33f3f5e4205a117540cdaea.json new file mode 100644 index 0000000..d35f2c3 --- /dev/null +++ b/.sqlx/query-a6bb6620fa3aef80bf97ba9bd93024d2899e62a2a33f3f5e4205a117540cdaea.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "\n insert into newsletter_issues (\n newsletter_issue_id,\n title,\n text_content,\n html_content,\n published_at\n )\n values ($1, $2, $3, $4, now())\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Text", + "Text", + "Text" + ] + }, + "nullable": [] + }, + "hash": "a6bb6620fa3aef80bf97ba9bd93024d2899e62a2a33f3f5e4205a117540cdaea" +} diff --git a/.sqlx/query-a7fbc151e283d35219033b66b3aedfe56e400450fcc770e195dc6cf614dca56c.json b/.sqlx/query-a7fbc151e283d35219033b66b3aedfe56e400450fcc770e195dc6cf614dca56c.json new file mode 100644 index 0000000..0b486be --- /dev/null +++ b/.sqlx/query-a7fbc151e283d35219033b66b3aedfe56e400450fcc770e195dc6cf614dca56c.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n delete from issue_delivery_queue\n where\n newsletter_issue_id = $1\n and\n subscriber_email = $2\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [] + }, + "hash": "a7fbc151e283d35219033b66b3aedfe56e400450fcc770e195dc6cf614dca56c" +} diff --git a/.sqlx/query-dfa8fd892a3063747feccb109a2279f112233f94e92fbb5fb268f591008b4921.json b/.sqlx/query-dfa8fd892a3063747feccb109a2279f112233f94e92fbb5fb268f591008b4921.json new file mode 100644 index 0000000..33358eb --- /dev/null +++ b/.sqlx/query-dfa8fd892a3063747feccb109a2279f112233f94e92fbb5fb268f591008b4921.json @@ -0,0 +1,34 @@ +{ + "db_name": "PostgreSQL", + "query": "\n select title, text_content, html_content\n from newsletter_issues\n where newsletter_issue_id = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "title", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "text_content", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "html_content", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "dfa8fd892a3063747feccb109a2279f112233f94e92fbb5fb268f591008b4921" +} diff --git a/Cargo.lock b/Cargo.lock index b28f144..a8b413f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3732,6 +3732,7 @@ dependencies = [ "serde", "serde-aux", "serde_json", + "serde_urlencoded", "sqlx", "thiserror", "tokio", diff --git a/Cargo.toml b/Cargo.toml index d33dbad..5924832 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ secrecy = { version = "0.8", features = ["serde"] } serde = { version = "1", features = ["derive"] } serde-aux = "4" serde_json = "1" +serde_urlencoded = "0.7.1" thiserror = "1" tokio = { version = "1", features = ["macros", "rt-multi-thread"] } tracing = { version = "0.1", features = ["log"] } diff --git a/migrations/20240211190953_create_newsletter_issues_table.sql b/migrations/20240211190953_create_newsletter_issues_table.sql new file mode 100644 index 0000000..612034b --- /dev/null +++ b/migrations/20240211190953_create_newsletter_issues_table.sql @@ -0,0 +1,8 @@ +create table newsletter_issues ( + newsletter_issue_id uuid not null, + title text not null, + text_content text not null, + html_content text not null, + published_at timestamptz not null, + primary key (newsletter_issue_id) +); diff --git a/migrations/20240211191447_create_issue_delivery_queue_table.sql b/migrations/20240211191447_create_issue_delivery_queue_table.sql new file mode 100644 index 0000000..313394e --- /dev/null +++ b/migrations/20240211191447_create_issue_delivery_queue_table.sql @@ -0,0 +1,6 @@ +create table issue_delivery_queue ( + newsletter_issue_id uuid not null + references newsletter_issues (newsletter_issue_id), + subscriber_email text not null, + primary key (newsletter_issue_id, subscriber_email) +); diff --git a/src/configuration.rs b/src/configuration.rs index 19fd0e0..f8bbd71 100644 --- a/src/configuration.rs +++ b/src/configuration.rs @@ -1,4 +1,5 @@ use crate::domain::SubscriberEmail; +use crate::email_client::EmailClient; use secrecy::{ExposeSecret, Secret}; use serde_aux::field_attributes::deserialize_number_from_string; use sqlx::postgres::PgConnectOptions; @@ -62,6 +63,18 @@ pub struct EmailClientSettings { } impl EmailClientSettings { + pub fn client(self) -> EmailClient { + let sender_email = self.sender().expect("Invalid sender email address."); + let timeout = self.timeout(); + + EmailClient::new( + self.base_url, + sender_email, + self.authorization_token, + timeout, + ) + } + pub fn sender(&self) -> Result { SubscriberEmail::parse(self.sender_email.clone()) } diff --git a/src/issue_delivery_worker.rs b/src/issue_delivery_worker.rs new file mode 100644 index 0000000..21beff8 --- /dev/null +++ b/src/issue_delivery_worker.rs @@ -0,0 +1,168 @@ +use std::time::Duration; + +use sqlx::{Executor, PgPool, Postgres, Transaction}; +use tracing::{field::display, Span}; +use uuid::Uuid; + +use crate::{ + configuration::Settings, domain::SubscriberEmail, email_client::EmailClient, + startup::get_connection_pool, +}; + +pub enum ExecutionOutcome { + TaskCompleted, + EmptyQueue, +} + +#[tracing::instrument( + skip_all, + fields( + newsletter_issue_id=tracing::field::Empty, + subscriber_email=tracing::field::Empty, + ), + err +)] +pub async fn try_execute_task( + pool: &PgPool, + email_client: &EmailClient, +) -> Result { + let task = dequeue_task(pool).await?; + + if task.is_none() { + return Ok(ExecutionOutcome::EmptyQueue); + } + + let (transaction, issue_id, email) = task.unwrap(); + + Span::current() + .record("newsletter_issue_id", &display(issue_id)) + .record("subscriber_email", &display(&email)); + + match SubscriberEmail::parse(email.clone()) { + Ok(email) => { + let issue = get_issue(pool, issue_id).await?; + if let Err(e) = email_client + .send_email( + &email, + &issue.title, + &issue.html_content, + &issue.text_content, + ) + .await + { + tracing::error!( + error.cause_chain = ?e, + error.message = %e, + "Failed to deliver issue to a confirmed subscriber. \ + Skipping." + ); + } + } + Err(e) => { + tracing::error!( + error.cause_chain = ?e, + error.message = %e, + "Skipping a confirmed subscriber. \ + Their stored contact details are invalid." + ); + } + } + + delete_task(transaction, issue_id, &email).await?; + + Ok(ExecutionOutcome::TaskCompleted) +} + +type PgTransaction = Transaction<'static, Postgres>; + +#[tracing::instrument(skip_all)] +async fn dequeue_task( + pool: &PgPool, +) -> Result, anyhow::Error> { + let mut transaction = pool.begin().await?; + let query = sqlx::query!( + r#" + select newsletter_issue_id, subscriber_email + from issue_delivery_queue + for update + skip locked + limit 1 + "# + ); + let r = query.fetch_optional(&mut *transaction).await?; + + if let Some(r) = r { + Ok(Some(( + transaction, + r.newsletter_issue_id, + r.subscriber_email, + ))) + } else { + Ok(None) + } +} + +#[tracing::instrument(skip_all)] +async fn delete_task( + mut transaction: PgTransaction, + issue_id: Uuid, + email: &str, +) -> Result<(), anyhow::Error> { + let query = sqlx::query!( + r#" + delete from issue_delivery_queue + where + newsletter_issue_id = $1 + and + subscriber_email = $2 + "#, + issue_id, + email + ); + transaction.execute(query).await?; + transaction.commit().await?; + Ok(()) +} + +struct NewsletterIssue { + title: String, + text_content: String, + html_content: String, +} + +#[tracing::instrument(skip_all)] +async fn get_issue(pool: &PgPool, issue_id: Uuid) -> Result { + let issue = sqlx::query_as!( + NewsletterIssue, + r#" + select title, text_content, html_content + from newsletter_issues + where newsletter_issue_id = $1 + "#, + issue_id + ) + .fetch_one(pool) + .await?; + Ok(issue) +} + +async fn worker_loop(pool: PgPool, email_client: EmailClient) -> Result<(), anyhow::Error> { + loop { + match try_execute_task(&pool, &email_client).await { + Ok(ExecutionOutcome::EmptyQueue) => { + tokio::time::sleep(Duration::from_secs(10)).await; + } + Err(_) => { + tokio::time::sleep(Duration::from_secs(1)).await; + } + Ok(ExecutionOutcome::TaskCompleted) => {} + } + } +} + +pub async fn run_worker_until_stopped(configuration: Settings) -> Result<(), anyhow::Error> { + let connection_pool = get_connection_pool(&configuration.database); + let email_client = configuration.email_client.client(); + + worker_loop(connection_pool, email_client).await +} diff --git a/src/lib.rs b/src/lib.rs index 67bf1d0..d5d0ffe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,6 +3,7 @@ pub mod configuration; pub mod domain; pub mod email_client; pub mod idempotency; +pub mod issue_delivery_worker; pub mod routes; pub mod session_state; pub mod startup; diff --git a/src/main.rs b/src/main.rs index d313ef8..e6e4873 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,8 @@ +use std::fmt::{Debug, Display}; +use tokio::task::JoinError; use zero2prod::{ configuration::get_configuration, + issue_delivery_worker::run_worker_until_stopped, startup::Application, telemetry::{get_subscriber, init_subscriber}, }; @@ -10,10 +13,38 @@ async fn main() -> anyhow::Result<()> { init_subscriber(subscriber); let configuration = get_configuration().expect("Failed to read configuration."); + let application = Application::build(configuration.clone()).await?; + let application_task = tokio::spawn(application.run_until_stopped()); + let worker_task = tokio::spawn(run_worker_until_stopped(configuration)); - let application = Application::build(configuration).await?; - - application.run_until_stopped().await?; + tokio::select! { + o = application_task => report_exit("API", o), + o = worker_task => report_exit("Background worker", o), + } Ok(()) } + +fn report_exit(task_name: &str, outcome: Result, JoinError>) { + match outcome { + Ok(Ok(())) => { + tracing::info!("{} has exited", task_name) + } + Ok(Err(e)) => { + tracing::error!( + error.cause_chain = ?e, + error.message = %e, + "{} failed", + task_name + ) + } + Err(e) => { + tracing::error!( + error.cause_chain = ?e, + error.message = %e, + "{} task failed to complete", + task_name + ) + } + } +} diff --git a/src/routes/admin/newsletters/post.rs b/src/routes/admin/newsletters/post.rs index 411b5bc..30837a4 100644 --- a/src/routes/admin/newsletters/post.rs +++ b/src/routes/admin/newsletters/post.rs @@ -1,12 +1,11 @@ use crate::authentication::UserId; -use crate::domain::SubscriberEmail; -use crate::email_client::EmailClient; use crate::idempotency::{save_response, try_processing, IdempotencyKey, NextAction}; use crate::utils::{e400, e500, see_other}; use actix_web::{web, HttpResponse}; use actix_web_flash_messages::FlashMessage; use anyhow::Context; -use sqlx::PgPool; +use sqlx::{Executor, PgPool, Postgres, Transaction}; +use uuid::Uuid; #[derive(serde::Deserialize)] pub struct FormData { @@ -18,13 +17,12 @@ pub struct FormData { #[tracing::instrument( name = "Publish a newsletter issue", - skip(form, pool, email_client), + skip_all, fields(user_id=%*user_id) )] pub async fn publish_newsletter( form: web::Form, pool: web::Data, - email_client: web::Data, user_id: web::ReqData, ) -> Result { let user_id = user_id.into_inner(); @@ -35,7 +33,8 @@ pub async fn publish_newsletter( idempotency_key, } = form.0; let idempotency_key: IdempotencyKey = idempotency_key.try_into().map_err(e400)?; - let transaction = match try_processing(&pool, &idempotency_key, *user_id) + + let mut transaction = match try_processing(&pool, &idempotency_key, *user_id) .await .map_err(e500)? { @@ -46,64 +45,78 @@ pub async fn publish_newsletter( } }; - let subscribers = get_confirmed_subscribers(&pool).await.map_err(e500)?; - for subscriber in subscribers { - match subscriber { - Ok(subscriber) => { - email_client - .send_email(&subscriber.email, &title, &html_content, &text_content) - .await - .with_context(|| { - format!("Failed to send newsletter issue to {}.", subscriber.email) - }) - .map_err(e500)?; - } - Err(error) => { - tracing::warn!( - error.cause_chain = ?error, - "Skipping a confirmed subscriber. \ - Their scored contact details are invalid." - ) - } - } - } + let issue_id = insert_newsletter_issue(&mut transaction, &title, &text_content, &html_content) + .await + .context("Failed to store newsletter issue details.") + .map_err(e500)?; + + enqueue_delivery_tasks(&mut transaction, issue_id) + .await + .context("Failed to enqueue delivery tasks") + .map_err(e500)?; - success_message().send(); let response = see_other("/admin/newsletters"); let response = save_response(transaction, &idempotency_key, *user_id, response) .await .map_err(e500)?; + success_message().send(); + Ok(response) } fn success_message() -> FlashMessage { - FlashMessage::info("The newsletter issue has been published!") + FlashMessage::info( + "The newsletter issue has been accepted - \ + emails will go out shortly.", + ) } -struct ConfirmedSubscriber { - email: SubscriberEmail, +#[tracing::instrument(skip_all)] +async fn insert_newsletter_issue( + transaction: &mut Transaction<'_, Postgres>, + title: &str, + text_content: &str, + html_content: &str, +) -> Result { + let newsletter_issue_id = Uuid::new_v4(); + let query = sqlx::query!( + r#" + insert into newsletter_issues ( + newsletter_issue_id, + title, + text_content, + html_content, + published_at + ) + values ($1, $2, $3, $4, now()) + "#, + newsletter_issue_id, + title, + text_content, + html_content + ); + transaction.execute(query).await?; + Ok(newsletter_issue_id) } -#[tracing::instrument(name = "Get confirmed subscribers", skip(pool))] -async fn get_confirmed_subscribers( - pool: &PgPool, -) -> Result>, anyhow::Error> { - let confirmed_subscribers = sqlx::query!( +#[tracing::instrument(skip_all)] +async fn enqueue_delivery_tasks( + transaction: &mut Transaction<'_, Postgres>, + newsletter_issue_id: Uuid, +) -> Result<(), sqlx::Error> { + let query = sqlx::query!( r#" - select email + insert into issue_delivery_queue ( + newsletter_issue_id, + subscriber_email + ) + select $1, email from subscriptions where status = 'confirmed' "#, - ) - .fetch_all(pool) - .await? - .into_iter() - .map(|r| match SubscriberEmail::parse(r.email) { - Ok(email) => Ok(ConfirmedSubscriber { email }), - Err(error) => Err(anyhow::anyhow!(error)), - }) - .collect(); - - Ok(confirmed_subscribers) + newsletter_issue_id + ); + transaction.execute(query).await?; + Ok(()) } diff --git a/src/startup.rs b/src/startup.rs index 6dd381b..a4423d3 100644 --- a/src/startup.rs +++ b/src/startup.rs @@ -27,19 +27,7 @@ pub struct Application { impl Application { pub async fn build(configuration: Settings) -> Result { let connection_pool = get_connection_pool(&configuration.database); - - let sender_email = configuration - .email_client - .sender() - .expect("Invalid sender email address."); - let timeout = configuration.email_client.timeout(); - let email_client = EmailClient::new( - configuration.email_client.base_url, - sender_email, - configuration.email_client.authorization_token, - timeout, - ); - + let email_client = configuration.email_client.client(); let address = format!( "{}:{}", configuration.application.host, configuration.application.port diff --git a/tests/api/helpers.rs b/tests/api/helpers.rs index 386f6be..66c6693 100644 --- a/tests/api/helpers.rs +++ b/tests/api/helpers.rs @@ -4,6 +4,8 @@ use once_cell::sync::Lazy; use sqlx::{Connection, Executor, PgConnection, PgPool}; use uuid::Uuid; use wiremock::MockServer; +use zero2prod::email_client::EmailClient; +use zero2prod::issue_delivery_worker::{try_execute_task, ExecutionOutcome}; use zero2prod::{ configuration::{get_configuration, DatabaseSettings}, startup::{get_connection_pool, Application}, @@ -78,6 +80,7 @@ pub struct TestApp { pub email_server: MockServer, pub test_user: TestUser, pub api_client: reqwest::Client, + pub email_client: EmailClient, } pub struct ConfirmationLinks { @@ -211,6 +214,18 @@ impl TestApp { ConfirmationLinks { html, plain_text } } + + pub async fn dispatch_all_pending_emails(&self) { + loop { + if let ExecutionOutcome::EmptyQueue = + try_execute_task(&self.db_pool, &self.email_client) + .await + .unwrap() + { + break; + } + } + } } pub async fn spawn_app() -> TestApp { @@ -250,6 +265,7 @@ pub async fn spawn_app() -> TestApp { email_server, test_user: TestUser::generate(), api_client: client, + email_client: configuration.email_client.client(), }; test_app.test_user.store(&test_app.db_pool).await; diff --git a/tests/api/newsletter.rs b/tests/api/newsletter.rs index 83594f0..ad1388d 100644 --- a/tests/api/newsletter.rs +++ b/tests/api/newsletter.rs @@ -1,12 +1,20 @@ use std::time::Duration; +use fake::{ + faker::{internet::en::SafeEmail, name::en::Name}, + Fake, +}; use wiremock::{ matchers::{any, method, path}, - Mock, ResponseTemplate, + Mock, MockBuilder, ResponseTemplate, }; use crate::helpers::{assert_is_redirect_to, spawn_app, ConfirmationLinks, TestApp}; +fn when_sending_an_email() -> MockBuilder { + Mock::given(path("/emails")).and(method("POST")) +} + #[tokio::test] async fn newsletters_are_not_delivered_to_unconfirmed_subscribers() { // Arrange @@ -30,6 +38,7 @@ async fn newsletters_are_not_delivered_to_unconfirmed_subscribers() { app.post_newsletters(&newsletter_request_body).await; // Assert + app.dispatch_all_pending_emails().await; // Mock verifies on drop that we haven't sent the newsletter email. } @@ -40,8 +49,7 @@ async fn newsletters_are_delivered_to_confirmed_subscribers() { create_confirmed_subscriber(&app).await; app.test_user.login(&app).await; - Mock::given(path("/emails")) - .and(method("POST")) + when_sending_an_email() .respond_with(ResponseTemplate::new(200)) .expect(1) .mount(&app.email_server) @@ -57,23 +65,29 @@ async fn newsletters_are_delivered_to_confirmed_subscribers() { app.post_newsletters(&newsletter_request_body).await; // Assert + app.dispatch_all_pending_emails().await; // Mock verifies on Drop that we have sent the newsletter email. } /// Use the public API of the application under test to create /// an unconfirmed subscriber. async fn create_uncorfirmed_subscriber(app: &TestApp) -> ConfirmationLinks { - let body = "name=le%20guin&email=ursula_le_guin%40gmail.com"; - - let _mock_guard = Mock::given(path("/emails")) - .and(method("POST")) + let name: String = Name().fake(); + let email: String = SafeEmail().fake(); + let body = serde_urlencoded::to_string(serde_json::json!({ + "name": name, + "email": email + })) + .unwrap(); + + let _mock_guard = when_sending_an_email() .respond_with(ResponseTemplate::new(200)) .named("Create unconfirmed subscriber") .expect(1) .mount_as_scoped(&app.email_server) .await; - app.post_subscriptions(body.into()) + app.post_subscriptions(body) .await .error_for_status() .unwrap(); @@ -124,8 +138,7 @@ async fn newsletter_creation_is_idempotent() { create_confirmed_subscriber(&app).await; app.test_user.login(&app).await; - Mock::given(path("/emails")) - .and(method("POST")) + when_sending_an_email() .respond_with(ResponseTemplate::new(200)) .expect(1) .mount(&app.email_server) @@ -144,7 +157,10 @@ async fn newsletter_creation_is_idempotent() { // Act/Assert - Part 2 - Follow the redirect let html_page = app.get_publish_newsletter_html().await; - assert!(html_page.contains("

The newsletter issue has been published!

")); + assert!(html_page.contains( + "

The newsletter issue has been accepted - \ + emails will go out shortly.

" + )); // Act/Assert - Part 3 - Submit newsletter form **again** let response = app.post_newsletters(&newsletter_request_body).await; @@ -152,8 +168,12 @@ async fn newsletter_creation_is_idempotent() { // Act - Part 4 - Follow the redirect let html_page = app.get_publish_newsletter_html().await; - assert!(html_page.contains("

The newsletter issue has been published!

")); + assert!(html_page.contains( + "

The newsletter issue has been accepted - \ + emails will go out shortly.

" + )); + app.dispatch_all_pending_emails().await; // Mock verifies on Drop that we have sent the newsletter email **once**. } @@ -164,8 +184,7 @@ async fn concurrent_form_submission_is_handled_gracefully() { create_confirmed_subscriber(&app).await; app.test_user.login(&app).await; - Mock::given(path("/emails")) - .and(method("POST")) + when_sending_an_email() .respond_with(ResponseTemplate::new(200).set_delay(Duration::from_secs(2))) .expect(1) .mount(&app.email_server) @@ -188,5 +207,6 @@ async fn concurrent_form_submission_is_handled_gracefully() { response2.text().await.unwrap() ); + app.dispatch_all_pending_emails().await; // Mock verifies on Drop that we have sent the newsletter email **once**. }