diff --git a/migrations/20240709145933_create_idempotency_table.sql b/migrations/20240709145933_create_idempotency_table.sql new file mode 100644 index 0000000..9d5de22 --- /dev/null +++ b/migrations/20240709145933_create_idempotency_table.sql @@ -0,0 +1,15 @@ +-- Add migration script here +CREATE TYPE header_pair AS ( + name TEXT, + value BYTEA +); + +CREATE TABLE idempotency ( + user_id uuid NOT NULL REFERENCES users(user_id), + idempotency_key TEXT NOT NULL, + response_status_code SMALLINT NOT NULL, + response_headers header_pair[] NOT NULL, + response_body BYTEA NOT NULL, + created_at timestamptz NOT NULL, + PRIMARY KEY(user_id, idempotency_key) +); \ No newline at end of file diff --git a/src/idempotency/key.rs b/src/idempotency/key.rs new file mode 100644 index 0000000..8bf10ff --- /dev/null +++ b/src/idempotency/key.rs @@ -0,0 +1,32 @@ +#[derive(Debug)] +pub struct IdempotencyKey(String); + +impl TryFrom for IdempotencyKey { + type Error = anyhow::Error; + + fn try_from(s: String) -> Result { + if s.is_empty() { + anyhow::bail!("The idempotency key cannot be empty"); + } + let max_length = 50; + if s.len() >= max_length { + anyhow::bail!( + "The idempotency key must be shorter + than {max_length} characters" + ); + } + Ok(Self(s)) + } +} + +impl From for String { + fn from(k: IdempotencyKey) -> Self { + k.0 + } +} + +impl AsRef for IdempotencyKey { + fn as_ref(&self) -> &str { + &self.0 + } +} diff --git a/src/idempotency/mod.rs b/src/idempotency/mod.rs new file mode 100644 index 0000000..bda225f --- /dev/null +++ b/src/idempotency/mod.rs @@ -0,0 +1,5 @@ +mod key; +mod persistence; + +pub use key::IdempotencyKey; +pub use persistence::{get_saved_response, save_response}; diff --git a/src/idempotency/persistence.rs b/src/idempotency/persistence.rs new file mode 100644 index 0000000..945dbba --- /dev/null +++ b/src/idempotency/persistence.rs @@ -0,0 +1,97 @@ +use super::IdempotencyKey; +use actix_web::body::to_bytes; +use actix_web::http::StatusCode; +use actix_web::HttpResponse; +use sqlx::postgres::PgHasArrayType; +use sqlx::PgPool; +use uuid::Uuid; + +#[derive(Debug, sqlx::Type)] +#[sqlx(type_name = "header_pair")] +struct HeaderPairRecord { + name: String, + value: Vec, +} + +pub async fn get_saved_response( + pool: &PgPool, + idempotency_key: &IdempotencyKey, + user_id: Uuid, +) -> Result, anyhow::Error> { + let saved_response = sqlx::query!( + r#" + SELECT + response_status_code, + response_headers as "response_headers: Vec", + response_body + FROM idempotency + WHERE + user_id = $1 AND + idempotency_key = $2 + "#, + user_id, + idempotency_key.as_ref() + ) + .fetch_optional(pool) + .await?; + + if let Some(r) = saved_response { + let status_code = StatusCode::from_u16(r.response_status_code.try_into()?)?; + let mut response = HttpResponse::build(status_code); + for HeaderPairRecord { name, value } in r.response_headers { + response.append_header((name, value)); + } + Ok(Some(response.body(r.response_body))) + } else { + Ok(None) + } +} + +impl PgHasArrayType for HeaderPairRecord { + fn array_type_info() -> sqlx::postgres::PgTypeInfo { + sqlx::postgres::PgTypeInfo::with_name("_header_pair") + } +} + +pub async fn save_response( + pool: &PgPool, + idempotency_key: &IdempotencyKey, + user_id: Uuid, + http_response: HttpResponse, +) -> Result { + let (response_head, body) = http_response.into_parts(); + let body = to_bytes(body).await.map_err(|e| anyhow::anyhow!("{}", e))?; + let status_code = response_head.status().as_u16() as i16; + let headers = { + let mut h = Vec::with_capacity(response_head.headers().len()); + for (name, value) in response_head.headers().iter() { + let name = name.as_str().to_owned(); + let value = value.as_bytes().to_owned(); + h.push(HeaderPairRecord { name, value }); + } + h + }; + + sqlx::query_unchecked!( + r#" + INSERT INTO idempotency ( + user_id, + idempotency_key, + response_status_code, + response_headers, + response_body, + created_at) + VALUES ($1, $2, $3, $4, $5, now()) + "#, + user_id, + idempotency_key.as_ref(), + status_code, + headers, + body.as_ref() + ) + .execute(pool) + .await?; + + let http_response = response_head.set_body(body).map_into_boxed_body(); + Ok(http_response) +} diff --git a/src/lib.rs b/src/lib.rs index f469562..67bf1d0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ pub mod authentication; pub mod configuration; pub mod domain; pub mod email_client; +pub mod idempotency; pub mod routes; pub mod session_state; pub mod startup; diff --git a/src/routes/admin/newsletters/get.rs b/src/routes/admin/newsletters/get.rs index 6731b0b..148bfe5 100644 --- a/src/routes/admin/newsletters/get.rs +++ b/src/routes/admin/newsletters/get.rs @@ -11,6 +11,7 @@ pub async fn publish_newsletter_form( writeln!(msg_html, "

{}

", m.content()).unwrap(); } + let idempotency_key = uuid::Uuid::new_v4(); Ok(HttpResponse::Ok() .content_type(ContentType::html()) .body(format!( @@ -49,6 +50,7 @@ pub async fn publish_newsletter_form( >
+

<- Back

diff --git a/src/routes/admin/newsletters/post.rs b/src/routes/admin/newsletters/post.rs index eb766eb..9f35811 100644 --- a/src/routes/admin/newsletters/post.rs +++ b/src/routes/admin/newsletters/post.rs @@ -1,6 +1,8 @@ use crate::authentication::UserId; use crate::domain::SubscriberEmail; use crate::email_client::EmailClient; +use crate::idempotency::{get_saved_response, save_response, IdempotencyKey}; +use crate::utils::e400; use crate::utils::{e500, see_other}; use actix_web::web::ReqData; use actix_web::{web, HttpResponse}; @@ -13,6 +15,7 @@ pub struct FormData { title: String, text_content: String, html_content: String, + idempotency_key: String, } #[tracing::instrument( @@ -22,21 +25,32 @@ pub struct FormData { )] pub async fn publish_newsletter( form: web::Form, - user_id: ReqData, pool: web::Data, email_client: web::Data, + user_id: ReqData, ) -> Result { + let user_id = user_id.into_inner(); + let FormData { + title, + text_content, + html_content, + idempotency_key, + } = form.0; + let idempotency_key: IdempotencyKey = idempotency_key.try_into().map_err(e400)?; + + if let Some(saved_response) = get_saved_response(&pool, &idempotency_key, *user_id) + .await + .map_err(e500)? + { + return Ok(saved_response); + } + let subscribers = get_confirmed_subscribers(&pool).await.map_err(e500)?; for subscriber in subscribers { match subscriber { Ok(subscriber) => { email_client - .send_email( - &subscriber.email, - &form.title, - &form.html_content, - &form.text_content, - ) + .send_email(&subscriber.email, &title, &html_content, &text_content) .await .with_context(|| { format!("Failed to send newsletter issue to {}", subscriber.email) @@ -53,7 +67,12 @@ pub async fn publish_newsletter( } } FlashMessage::info("The newsletter issue has been published!").send(); - Ok(see_other("/admin/newsletters")) + let response = see_other("/admin/newsletters"); + let response = save_response(&pool, &idempotency_key, *user_id, response) + .await + .map_err(e500)?; + + Ok(response) } struct ConfirmedSubscriber { diff --git a/src/utils.rs b/src/utils.rs index 08c76ab..819c97e 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,7 +1,6 @@ use actix_web::http::header::LOCATION; use actix_web::HttpResponse; -// Return an opaque 500 while preserving the error root's cause for logging. pub fn e500(e: T) -> actix_web::Error where T: std::fmt::Debug + std::fmt::Display + 'static, @@ -9,6 +8,13 @@ where actix_web::error::ErrorInternalServerError(e) } +pub fn e400(e: T) -> actix_web::Error +where + T: std::fmt::Debug + std::fmt::Display + 'static, +{ + actix_web::error::ErrorBadRequest(e) +} + pub fn see_other(location: &str) -> HttpResponse { HttpResponse::SeeOther() .insert_header((LOCATION, location)) diff --git a/tests/api/newsletter.rs b/tests/api/newsletter.rs index 4a134d3..b6faae8 100644 --- a/tests/api/newsletter.rs +++ b/tests/api/newsletter.rs @@ -54,6 +54,7 @@ async fn newsletters_are_not_delivered_to_unconfirmed_subscribers() { "title": "Newsletter title", "text_content": "Newsletter body as plain text", "html_content": "

Newsletter body as HTML

", + "idempotency_key": uuid::Uuid::new_v4().to_string() }); let response = app.post_publish_newsletter(&newsletter_request_body).await; assert_is_redirect_to(&response, "/admin/newsletters"); @@ -83,6 +84,7 @@ async fn newsletters_are_delivered_to_confirmed_subscribers() { "title": "Newsletter title", "text_content": "Newsletter body as plain text", "html_content": "

Newsletter body as HTML

", + "idempotency_key": uuid::Uuid::new_v4().to_string() }); let response = app.post_publish_newsletter(&newsletter_request_body).await; assert_is_redirect_to(&response, "/admin/newsletters"); @@ -115,6 +117,7 @@ async fn you_must_be_logged_in_to_publish_a_newsletter() { "title": "Newsletter title", "text_content": "Newsletter body as plain text", "html_content": "

Newsletter body as HTML

", + "idempotency_key": uuid::Uuid::new_v4().to_string() }); let response = app.post_publish_newsletter(&newsletter_request_body).await; @@ -142,8 +145,6 @@ async fn newsletter_creation_is_idempotent() { "title": "Newsletter title", "text_content": "Newsletter body as plain text", "html_content": "

Newsletter body as HTML

", - // We expect the idempotency key as part of the - // form data, not as an header "idempotency_key": uuid::Uuid::new_v4().to_string() }); let response = app.post_publish_newsletter(&newsletter_request_body).await; @@ -159,6 +160,7 @@ async fn newsletter_creation_is_idempotent() { // Act - Part 4 - Follow the redirect let html_page = app.get_publish_newsletter_html().await; + dbg!(&html_page); assert!(html_page.contains("

The newsletter issue has been published!

")); // Mock verifies on Drop that we have sent the newsletter email **once**