Skip to content

Commit

Permalink
feat: Add idempotency module and table
Browse files Browse the repository at this point in the history
  • Loading branch information
josemoura212 committed Jul 9, 2024
1 parent be13f41 commit df40619
Show file tree
Hide file tree
Showing 9 changed files with 190 additions and 11 deletions.
15 changes: 15 additions & 0 deletions migrations/20240709145933_create_idempotency_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-- Add migration script here
CREATE TYPE header_pair AS (
name TEXT,
value BYTEA
);

CREATE TABLE idempotency (
user_id uuid NOT NULL REFERENCES users(user_id),
idempotency_key TEXT NOT NULL,
response_status_code SMALLINT NOT NULL,
response_headers header_pair[] NOT NULL,
response_body BYTEA NOT NULL,
created_at timestamptz NOT NULL,
PRIMARY KEY(user_id, idempotency_key)
);
32 changes: 32 additions & 0 deletions src/idempotency/key.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#[derive(Debug)]
pub struct IdempotencyKey(String);

impl TryFrom<String> for IdempotencyKey {
type Error = anyhow::Error;

fn try_from(s: String) -> Result<Self, Self::Error> {
if s.is_empty() {
anyhow::bail!("The idempotency key cannot be empty");
}
let max_length = 50;
if s.len() >= max_length {
anyhow::bail!(
"The idempotency key must be shorter
than {max_length} characters"
);
}
Ok(Self(s))
}
}

impl From<IdempotencyKey> for String {
fn from(k: IdempotencyKey) -> Self {
k.0
}
}

impl AsRef<str> for IdempotencyKey {
fn as_ref(&self) -> &str {
&self.0
}
}
5 changes: 5 additions & 0 deletions src/idempotency/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod key;
mod persistence;

pub use key::IdempotencyKey;
pub use persistence::{get_saved_response, save_response};
97 changes: 97 additions & 0 deletions src/idempotency/persistence.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
use super::IdempotencyKey;
use actix_web::body::to_bytes;
use actix_web::http::StatusCode;
use actix_web::HttpResponse;
use sqlx::postgres::PgHasArrayType;
use sqlx::PgPool;
use uuid::Uuid;

#[derive(Debug, sqlx::Type)]
#[sqlx(type_name = "header_pair")]
struct HeaderPairRecord {
name: String,
value: Vec<u8>,
}

pub async fn get_saved_response(
pool: &PgPool,
idempotency_key: &IdempotencyKey,
user_id: Uuid,
) -> Result<Option<HttpResponse>, anyhow::Error> {
let saved_response = sqlx::query!(
r#"
SELECT
response_status_code,
response_headers as "response_headers: Vec<HeaderPairRecord>",
response_body
FROM idempotency
WHERE
user_id = $1 AND
idempotency_key = $2
"#,
user_id,
idempotency_key.as_ref()
)
.fetch_optional(pool)
.await?;

if let Some(r) = saved_response {
let status_code = StatusCode::from_u16(r.response_status_code.try_into()?)?;
let mut response = HttpResponse::build(status_code);
for HeaderPairRecord { name, value } in r.response_headers {
response.append_header((name, value));
}
Ok(Some(response.body(r.response_body)))
} else {
Ok(None)
}
}

impl PgHasArrayType for HeaderPairRecord {
fn array_type_info() -> sqlx::postgres::PgTypeInfo {
sqlx::postgres::PgTypeInfo::with_name("_header_pair")
}
}

pub async fn save_response(
pool: &PgPool,
idempotency_key: &IdempotencyKey,
user_id: Uuid,
http_response: HttpResponse,
) -> Result<HttpResponse, anyhow::Error> {
let (response_head, body) = http_response.into_parts();
let body = to_bytes(body).await.map_err(|e| anyhow::anyhow!("{}", e))?;
let status_code = response_head.status().as_u16() as i16;
let headers = {
let mut h = Vec::with_capacity(response_head.headers().len());
for (name, value) in response_head.headers().iter() {
let name = name.as_str().to_owned();
let value = value.as_bytes().to_owned();
h.push(HeaderPairRecord { name, value });
}
h
};

sqlx::query_unchecked!(
r#"
INSERT INTO idempotency (
user_id,
idempotency_key,
response_status_code,
response_headers,
response_body,
created_at)
VALUES ($1, $2, $3, $4, $5, now())
"#,
user_id,
idempotency_key.as_ref(),
status_code,
headers,
body.as_ref()
)
.execute(pool)
.await?;

let http_response = response_head.set_body(body).map_into_boxed_body();
Ok(http_response)
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod authentication;
pub mod configuration;
pub mod domain;
pub mod email_client;
pub mod idempotency;
pub mod routes;
pub mod session_state;
pub mod startup;
Expand Down
2 changes: 2 additions & 0 deletions src/routes/admin/newsletters/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub async fn publish_newsletter_form(
writeln!(msg_html, "<p><i>{}</i></p>", m.content()).unwrap();
}

let idempotency_key = uuid::Uuid::new_v4();
Ok(HttpResponse::Ok()
.content_type(ContentType::html())
.body(format!(
Expand Down Expand Up @@ -49,6 +50,7 @@ pub async fn publish_newsletter_form(
></textarea>
</label>
<br>
<input hidden type="text" name="idempotency_key" value="{idempotency_key}">
<button type="submit">Publish</button>
</form>
<p><a href="/admin/dashboard">&lt;- Back</a></p>
Expand Down
35 changes: 27 additions & 8 deletions src/routes/admin/newsletters/post.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::authentication::UserId;
use crate::domain::SubscriberEmail;
use crate::email_client::EmailClient;
use crate::idempotency::{get_saved_response, save_response, IdempotencyKey};
use crate::utils::e400;
use crate::utils::{e500, see_other};
use actix_web::web::ReqData;
use actix_web::{web, HttpResponse};
Expand All @@ -13,6 +15,7 @@ pub struct FormData {
title: String,
text_content: String,
html_content: String,
idempotency_key: String,
}

#[tracing::instrument(
Expand All @@ -22,21 +25,32 @@ pub struct FormData {
)]
pub async fn publish_newsletter(
form: web::Form<FormData>,
user_id: ReqData<UserId>,
pool: web::Data<PgPool>,
email_client: web::Data<EmailClient>,
user_id: ReqData<UserId>,
) -> Result<HttpResponse, actix_web::Error> {
let user_id = user_id.into_inner();
let FormData {
title,
text_content,
html_content,
idempotency_key,
} = form.0;
let idempotency_key: IdempotencyKey = idempotency_key.try_into().map_err(e400)?;

if let Some(saved_response) = get_saved_response(&pool, &idempotency_key, *user_id)
.await
.map_err(e500)?
{
return Ok(saved_response);
}

let subscribers = get_confirmed_subscribers(&pool).await.map_err(e500)?;
for subscriber in subscribers {
match subscriber {
Ok(subscriber) => {
email_client
.send_email(
&subscriber.email,
&form.title,
&form.html_content,
&form.text_content,
)
.send_email(&subscriber.email, &title, &html_content, &text_content)
.await
.with_context(|| {
format!("Failed to send newsletter issue to {}", subscriber.email)
Expand All @@ -53,7 +67,12 @@ pub async fn publish_newsletter(
}
}
FlashMessage::info("The newsletter issue has been published!").send();
Ok(see_other("/admin/newsletters"))
let response = see_other("/admin/newsletters");
let response = save_response(&pool, &idempotency_key, *user_id, response)
.await
.map_err(e500)?;

Ok(response)
}

struct ConfirmedSubscriber {
Expand Down
8 changes: 7 additions & 1 deletion src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
use actix_web::http::header::LOCATION;
use actix_web::HttpResponse;

// Return an opaque 500 while preserving the error root's cause for logging.
pub fn e500<T>(e: T) -> actix_web::Error
where
T: std::fmt::Debug + std::fmt::Display + 'static,
{
actix_web::error::ErrorInternalServerError(e)
}

pub fn e400<T: std::fmt::Debug + std::fmt::Display>(e: T) -> actix_web::Error
where
T: std::fmt::Debug + std::fmt::Display + 'static,
{
actix_web::error::ErrorBadRequest(e)
}

pub fn see_other(location: &str) -> HttpResponse {
HttpResponse::SeeOther()
.insert_header((LOCATION, location))
Expand Down
6 changes: 4 additions & 2 deletions tests/api/newsletter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ async fn newsletters_are_not_delivered_to_unconfirmed_subscribers() {
"title": "Newsletter title",
"text_content": "Newsletter body as plain text",
"html_content": "<p>Newsletter body as HTML</p>",
"idempotency_key": uuid::Uuid::new_v4().to_string()
});
let response = app.post_publish_newsletter(&newsletter_request_body).await;
assert_is_redirect_to(&response, "/admin/newsletters");
Expand Down Expand Up @@ -83,6 +84,7 @@ async fn newsletters_are_delivered_to_confirmed_subscribers() {
"title": "Newsletter title",
"text_content": "Newsletter body as plain text",
"html_content": "<p>Newsletter body as HTML</p>",
"idempotency_key": uuid::Uuid::new_v4().to_string()
});
let response = app.post_publish_newsletter(&newsletter_request_body).await;
assert_is_redirect_to(&response, "/admin/newsletters");
Expand Down Expand Up @@ -115,6 +117,7 @@ async fn you_must_be_logged_in_to_publish_a_newsletter() {
"title": "Newsletter title",
"text_content": "Newsletter body as plain text",
"html_content": "<p>Newsletter body as HTML</p>",
"idempotency_key": uuid::Uuid::new_v4().to_string()
});
let response = app.post_publish_newsletter(&newsletter_request_body).await;

Expand Down Expand Up @@ -142,8 +145,6 @@ async fn newsletter_creation_is_idempotent() {
"title": "Newsletter title",
"text_content": "Newsletter body as plain text",
"html_content": "<p>Newsletter body as HTML</p>",
// We expect the idempotency key as part of the
// form data, not as an header
"idempotency_key": uuid::Uuid::new_v4().to_string()
});
let response = app.post_publish_newsletter(&newsletter_request_body).await;
Expand All @@ -159,6 +160,7 @@ async fn newsletter_creation_is_idempotent() {

// Act - Part 4 - Follow the redirect
let html_page = app.get_publish_newsletter_html().await;
dbg!(&html_page);
assert!(html_page.contains("<p><i>The newsletter issue has been published!</i></p>"));

// Mock verifies on Drop that we have sent the newsletter email **once**
Expand Down

0 comments on commit df40619

Please sign in to comment.