diff --git a/.sqlx/query-06f83a51e9d2ca842dc0d6947ad39d9be966636700de58d404d8e1471a260c9a.json b/.sqlx/query-06f83a51e9d2ca842dc0d6947ad39d9be966636700de58d404d8e1471a260c9a.json new file mode 100644 index 0000000..c63606e --- /dev/null +++ b/.sqlx/query-06f83a51e9d2ca842dc0d6947ad39d9be966636700de58d404d8e1471a260c9a.json @@ -0,0 +1,26 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT newsletter_issue_id, subscriber_email\n FROM issue_delivery_queue\n FOR UPDATE\n SKIP LOCKED\n LIMIT 1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "newsletter_issue_id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "subscriber_email", + "type_info": "Text" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + false + ] + }, + "hash": "06f83a51e9d2ca842dc0d6947ad39d9be966636700de58d404d8e1471a260c9a" +} diff --git a/.sqlx/query-0851bf5e8d147f0ace037c6f434bcc4e04d330e3c4259ef8c8097e61f77b64e2.json b/.sqlx/query-0851bf5e8d147f0ace037c6f434bcc4e04d330e3c4259ef8c8097e61f77b64e2.json new file mode 100644 index 0000000..7136bb0 --- /dev/null +++ b/.sqlx/query-0851bf5e8d147f0ace037c6f434bcc4e04d330e3c4259ef8c8097e61f77b64e2.json @@ -0,0 +1,41 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE idempotency\n SET\n response_status_code = $3,\n response_headers = $4,\n response_body = $5\n WHERE\n user_id = $1\n AND idempotency_key = $2\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Text", + "Int2", + { + "Custom": { + "name": "header_pair[]", + "kind": { + "Array": { + "Custom": { + "name": "header_pair", + "kind": { + "Composite": [ + [ + "name", + "Text" + ], + [ + "value", + "Bytea" + ] + ] + } + } + } + } + } + }, + "Bytea" + ] + }, + "nullable": [] + }, + "hash": "0851bf5e8d147f0ace037c6f434bcc4e04d330e3c4259ef8c8097e61f77b64e2" +} diff --git a/.sqlx/query-43116d4e670155129aa69a7563ddc3f7d01ef3689bb8de9ee1757b401ad95b46.json b/.sqlx/query-43116d4e670155129aa69a7563ddc3f7d01ef3689bb8de9ee1757b401ad95b46.json new file mode 100644 index 0000000..193d3c4 --- /dev/null +++ b/.sqlx/query-43116d4e670155129aa69a7563ddc3f7d01ef3689bb8de9ee1757b401ad95b46.json @@ -0,0 +1,34 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT title, text_content, html_content\n FROM newsletter_issues\n WHERE newsletter_issue_id = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "title", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "text_content", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "html_content", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "43116d4e670155129aa69a7563ddc3f7d01ef3689bb8de9ee1757b401ad95b46" +} diff --git a/.sqlx/query-605c5893a2a89a84c201a6a2ae52a3c00cb4db064a52ea9f198c24de4b877ba2.json b/.sqlx/query-605c5893a2a89a84c201a6a2ae52a3c00cb4db064a52ea9f198c24de4b877ba2.json new file mode 100644 index 0000000..586c0c7 --- /dev/null +++ b/.sqlx/query-605c5893a2a89a84c201a6a2ae52a3c00cb4db064a52ea9f198c24de4b877ba2.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO newsletter_issues (\n newsletter_issue_id, title, text_content, html_content, published_at\n )\n VALUES ($1, $2, $3, $4, now())\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Text", + "Text", + "Text" + ] + }, + "nullable": [] + }, + "hash": "605c5893a2a89a84c201a6a2ae52a3c00cb4db064a52ea9f198c24de4b877ba2" +} diff --git a/.sqlx/query-9bfa261067713ca31b191c9f9bcf19ae0dd2d12a570ce06e8e2abd72c5d7b42d.json b/.sqlx/query-9bfa261067713ca31b191c9f9bcf19ae0dd2d12a570ce06e8e2abd72c5d7b42d.json new file mode 100644 index 0000000..93e684b --- /dev/null +++ b/.sqlx/query-9bfa261067713ca31b191c9f9bcf19ae0dd2d12a570ce06e8e2abd72c5d7b42d.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO issue_delivery_queue (\n newsletter_issue_id,\n subscriber_email\n )\n SELECT $1, email\n FROM subscriptions\n WHERE status = 'confirmed'\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "9bfa261067713ca31b191c9f9bcf19ae0dd2d12a570ce06e8e2abd72c5d7b42d" +} diff --git a/.sqlx/query-acf1b96c82ddf18db02e71a0e297c822b46f10add52c54649cf599b883165e58.json b/.sqlx/query-acf1b96c82ddf18db02e71a0e297c822b46f10add52c54649cf599b883165e58.json new file mode 100644 index 0000000..1ac5932 --- /dev/null +++ b/.sqlx/query-acf1b96c82ddf18db02e71a0e297c822b46f10add52c54649cf599b883165e58.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT user_id, password_hash\n FROM users\n WHERE username = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "user_id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "password_hash", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + false, + false + ] + }, + "hash": "acf1b96c82ddf18db02e71a0e297c822b46f10add52c54649cf599b883165e58" +} diff --git a/.sqlx/query-b399033752641396cfe752e930e073765335a6c6e84935f60f4918576b47c249.json b/.sqlx/query-b399033752641396cfe752e930e073765335a6c6e84935f60f4918576b47c249.json new file mode 100644 index 0000000..e58daf4 --- /dev/null +++ b/.sqlx/query-b399033752641396cfe752e930e073765335a6c6e84935f60f4918576b47c249.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM issue_delivery_queue\n WHERE\n newsletter_issue_id = $1\n AND subscriber_email = $2\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [] + }, + "hash": "b399033752641396cfe752e930e073765335a6c6e84935f60f4918576b47c249" +} diff --git a/.sqlx/query-eae27786a7c81ee2199fe3d5c10ac52c8067c61d6992f8f5045b908eb73bab8b.json b/.sqlx/query-eae27786a7c81ee2199fe3d5c10ac52c8067c61d6992f8f5045b908eb73bab8b.json new file mode 100644 index 0000000..d69d932 --- /dev/null +++ b/.sqlx/query-eae27786a7c81ee2199fe3d5c10ac52c8067c61d6992f8f5045b908eb73bab8b.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE users SET password_hash = $1 WHERE user_id = $2", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "eae27786a7c81ee2199fe3d5c10ac52c8067c61d6992f8f5045b908eb73bab8b" +} diff --git a/.sqlx/query-ed9f14ed1476ef5a9dc8b7aabf38fd31e127e2a6246d5a14f4ef624f0302eac8.json b/.sqlx/query-ed9f14ed1476ef5a9dc8b7aabf38fd31e127e2a6246d5a14f4ef624f0302eac8.json new file mode 100644 index 0000000..97b87b0 --- /dev/null +++ b/.sqlx/query-ed9f14ed1476ef5a9dc8b7aabf38fd31e127e2a6246d5a14f4ef624f0302eac8.json @@ -0,0 +1,58 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n response_status_code as \"response_status_code!\",\n response_headers as \"response_headers!: Vec\",\n response_body as \"response_body!\"\n FROM idempotency\n WHERE\n user_id = $1\n AND idempotency_key = $2\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "response_status_code!", + "type_info": "Int2" + }, + { + "ordinal": 1, + "name": "response_headers!: Vec", + "type_info": { + "Custom": { + "name": "header_pair[]", + "kind": { + "Array": { + "Custom": { + "name": "header_pair", + "kind": { + "Composite": [ + [ + "name", + "Text" + ], + [ + "value", + "Bytea" + ] + ] + } + } + } + } + } + } + }, + { + "ordinal": 2, + "name": "response_body!", + "type_info": "Bytea" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [ + true, + true, + true + ] + }, + "hash": "ed9f14ed1476ef5a9dc8b7aabf38fd31e127e2a6246d5a14f4ef624f0302eac8" +} diff --git a/.sqlx/query-f007c2d5d9ae67a2412c6a70a2228390c5bd4835fcf71fd17a00fe521b43415d.json b/.sqlx/query-f007c2d5d9ae67a2412c6a70a2228390c5bd4835fcf71fd17a00fe521b43415d.json new file mode 100644 index 0000000..ced2af2 --- /dev/null +++ b/.sqlx/query-f007c2d5d9ae67a2412c6a70a2228390c5bd4835fcf71fd17a00fe521b43415d.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO idempotency (user_id, idempotency_key, created_at)\n VALUES ($1, $2, now())\n ON CONFLICT DO NOTHING\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [] + }, + "hash": "f007c2d5d9ae67a2412c6a70a2228390c5bd4835fcf71fd17a00fe521b43415d" +} diff --git a/Cargo.lock b/Cargo.lock index f9ec8cd..d0eebd4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3818,6 +3818,7 @@ dependencies = [ "serde", "serde-aux", "serde_json", + "serde_urlencoded", "sha3", "sqlx", "thiserror", diff --git a/Cargo.toml b/Cargo.toml index 5cded43..ed740e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,4 +60,5 @@ once_cell = "1.21.3" quickcheck = "1.0.3" quickcheck_macros = "1.1.0" serde_json = "1.0.143" +serde_urlencoded = "0.7.1" wiremock = "0.6.4" diff --git a/configuration/local.yaml b/configuration/local.yaml index 9ba41cb..df4bde1 100644 --- a/configuration/local.yaml +++ b/configuration/local.yaml @@ -3,7 +3,3 @@ application: base_url: "http://127.0.0.1:8000" database: require_ssl: false -email_client: - base_url: "https://api.mailersend.com" - sender_email: "MS_PTrumQ@test-r6ke4n1mmzvgon12.mlsender.net" - authorization_token: "mlsn.9ea7aaeaa328b4d2eac74dc823deecf25e6bae1933bfe5c3d0304b1b3f2bc36c" diff --git a/migrations/20250901135528_create_idempotency_table.sql b/migrations/20250901135528_create_idempotency_table.sql new file mode 100644 index 0000000..169a2e1 --- /dev/null +++ b/migrations/20250901135528_create_idempotency_table.sql @@ -0,0 +1,14 @@ +CREATE TYPE header_pair AS ( + name TEXT, + value BYTEA +); + +CREATE TABLE idempotency ( + user_id UUID NOT NULL REFERENCES users (user_id), + idempotency_key TEXT NOT NULL, + response_status_code SMALLINT NOT NULL, + response_headers header_pair[] NOT NULL, + response_body BYTEA NOT NULL, + created_at TIMESTAMPTZ NOT NULL, + PRIMARY KEY (user_id, idempotency_key) +); diff --git a/migrations/20250903130815_relax_null_checks_on_idempotency.sql b/migrations/20250903130815_relax_null_checks_on_idempotency.sql new file mode 100644 index 0000000..2d6958d --- /dev/null +++ b/migrations/20250903130815_relax_null_checks_on_idempotency.sql @@ -0,0 +1,3 @@ +ALTER TABLE idempotency ALTER COLUMN response_status_code DROP NOT NULL; +ALTER TABLE idempotency ALTER COLUMN response_body DROP NOT NULL; +ALTER TABLE idempotency ALTER COLUMN response_headers DROP NOT NULL; diff --git a/migrations/20250903151350_create_newsletter_issues_table.sql b/migrations/20250903151350_create_newsletter_issues_table.sql new file mode 100644 index 0000000..ef58a62 --- /dev/null +++ b/migrations/20250903151350_create_newsletter_issues_table.sql @@ -0,0 +1,8 @@ +CREATE TABLE newsletter_issues ( + newsletter_issue_id UUID NOT NULL, + title TEXT NOT NULL, + text_content TEXT NOT NULL, + html_content TEXT NOT NULL, + published_at TIMESTAMPTZ NOT NULL, + PRIMARY KEY (newsletter_issue_id) +); diff --git a/migrations/20250903153225_create_issue_delivery_queue_table.sql b/migrations/20250903153225_create_issue_delivery_queue_table.sql new file mode 100644 index 0000000..dc9ee86 --- /dev/null +++ b/migrations/20250903153225_create_issue_delivery_queue_table.sql @@ -0,0 +1,6 @@ +CREATE TABLE issue_delivery_queue ( + newsletter_issue_id UUID NOT NULL + REFERENCES newsletter_issues (newsletter_issue_id), + subscriber_email TEXT NOT NULL, + PRIMARY KEY (newsletter_issue_id, subscriber_email) +); diff --git a/src/authentication.rs b/src/authentication.rs index bb60701..d1582ca 100644 --- a/src/authentication.rs +++ b/src/authentication.rs @@ -121,7 +121,7 @@ async fn get_stored_credentials( SELECT user_id, password_hash FROM users WHERE username = $1 - "#, + "#, username, ) .fetch_optional(connection_pool) diff --git a/src/configuration.rs b/src/configuration.rs index 34e0bb7..4912ebb 100644 --- a/src/configuration.rs +++ b/src/configuration.rs @@ -55,7 +55,7 @@ impl TryFrom for Environment { } } -#[derive(Deserialize)] +#[derive(Clone, Deserialize)] pub struct Settings { pub application: ApplicationSettings, pub database: DatabaseSettings, @@ -63,7 +63,7 @@ pub struct Settings { pub redis_uri: SecretString, } -#[derive(Deserialize)] +#[derive(Clone, Deserialize)] pub struct ApplicationSettings { #[serde(deserialize_with = "deserialize_number_from_string")] pub port: u16, @@ -71,7 +71,7 @@ pub struct ApplicationSettings { pub base_url: String, } -#[derive(Deserialize)] +#[derive(Clone, Deserialize)] pub struct EmailClientSettings { pub base_url: String, sender_email: String, @@ -100,7 +100,7 @@ impl EmailClientSettings { } } -#[derive(Deserialize)] +#[derive(Clone, Deserialize)] pub struct DatabaseSettings { pub username: String, pub password: SecretString, diff --git a/src/email_client.rs b/src/email_client.rs index a4f181f..96dc095 100644 --- a/src/email_client.rs +++ b/src/email_client.rs @@ -11,16 +11,17 @@ pub struct EmailClient { } impl EmailClient { - pub fn new(config: EmailClientSettings) -> Self { - Self { + pub fn build(config: EmailClientSettings) -> Result { + let client = Self { http_client: Client::builder() .timeout(Duration::from_millis(config.timeout_milliseconds)) .build() .unwrap(), - base_url: reqwest::Url::parse(&config.base_url).unwrap(), - sender: config.sender().unwrap(), + base_url: reqwest::Url::parse(&config.base_url)?, + sender: config.sender().map_err(|e| anyhow::anyhow!(e))?, authorization_token: config.authorization_token, - } + }; + Ok(client) } pub async fn send_email( @@ -125,7 +126,7 @@ mod tests { let sender_email = SafeEmail().fake(); let token: String = Faker.fake(); let settings = EmailClientSettings::new(base_url, sender_email, token, 200); - EmailClient::new(settings) + EmailClient::build(settings).unwrap() } #[tokio::test] diff --git a/src/idempotency.rs b/src/idempotency.rs new file mode 100644 index 0000000..5c7ac84 --- /dev/null +++ b/src/idempotency.rs @@ -0,0 +1,5 @@ +mod key; +mod persistance; + +pub use key::*; +pub use persistance::*; diff --git a/src/idempotency/key.rs b/src/idempotency/key.rs new file mode 100644 index 0000000..4c89397 --- /dev/null +++ b/src/idempotency/key.rs @@ -0,0 +1,28 @@ +pub struct IdempotencyKey(String); + +impl TryFrom for IdempotencyKey { + type Error = String; + + fn try_from(value: String) -> Result { + if value.is_empty() { + return Err("The idempotency key cannot be empty.".into()); + } + let max_length = 50; + if value.len() >= max_length { + return Err("The idempotency key must be shorter than {max_length} characters.".into()); + } + Ok(Self(value)) + } +} + +impl From for String { + fn from(value: IdempotencyKey) -> Self { + value.0 + } +} + +impl AsRef for IdempotencyKey { + fn as_ref(&self) -> &str { + &self.0 + } +} diff --git a/src/idempotency/persistance.rs b/src/idempotency/persistance.rs new file mode 100644 index 0000000..c9701bf --- /dev/null +++ b/src/idempotency/persistance.rs @@ -0,0 +1,128 @@ +use crate::idempotency::IdempotencyKey; +use axum::{ + body::{self, Body}, + http::{HeaderName, HeaderValue}, + response::{IntoResponse, Response}, +}; +use reqwest::StatusCode; +use sqlx::{Executor, PgPool, Postgres, Transaction}; +use std::str::FromStr; +use uuid::Uuid; + +#[derive(Debug, sqlx::Type)] +#[sqlx(type_name = "header_pair")] +struct HeaderPairRecord { + name: String, + value: Vec, +} + +pub async fn get_saved_response( + connection_pool: &PgPool, + idempotency_key: &IdempotencyKey, + user_id: Uuid, +) -> Result, anyhow::Error> { + let saved_response = sqlx::query!( + r#" + SELECT + response_status_code as "response_status_code!", + response_headers as "response_headers!: Vec", + response_body as "response_body!" + FROM idempotency + WHERE + user_id = $1 + AND idempotency_key = $2 + "#, + user_id, + idempotency_key.as_ref() + ) + .fetch_optional(connection_pool) + .await?; + if let Some(r) = saved_response { + let status_code = StatusCode::from_u16(r.response_status_code.try_into()?)?; + let mut response = status_code.into_response(); + for HeaderPairRecord { name, value } in r.response_headers { + response.headers_mut().insert( + HeaderName::from_str(&name).unwrap(), + HeaderValue::from_bytes(&value).unwrap(), + ); + } + *response.body_mut() = r.response_body.into(); + Ok(Some(response)) + } else { + Ok(None) + } +} + +pub async fn save_response( + mut transaction: Transaction<'static, Postgres>, + idempotency_key: &IdempotencyKey, + user_id: Uuid, + response: Response, +) -> Result, anyhow::Error> { + let status_code = response.status().as_u16() as i16; + let headers = response + .headers() + .into_iter() + .map(|(name, value)| HeaderPairRecord { + name: name.to_string(), + value: value.as_bytes().to_vec(), + }) + .collect::>(); + let (response_head, body) = response.into_parts(); + let body = body::to_bytes(body, usize::MAX).await?.to_vec(); + + let query = sqlx::query_unchecked!( + r#" + UPDATE idempotency + SET + response_status_code = $3, + response_headers = $4, + response_body = $5 + WHERE + user_id = $1 + AND idempotency_key = $2 + "#, + user_id, + idempotency_key.as_ref(), + status_code, + headers, + &body, + ); + transaction.execute(query).await?; + transaction.commit().await?; + + let mut r = response_head.into_response(); + *r.body_mut() = body.into(); + Ok(r) +} + +pub enum NextAction { + StartProcessing(Transaction<'static, Postgres>), + ReturnSavedResponse(Response), +} + +pub async fn try_processing( + connection_pool: &PgPool, + idempotency_key: &IdempotencyKey, + user_id: Uuid, +) -> Result { + let mut transaction = connection_pool.begin().await?; + let query = sqlx::query!( + r#" + INSERT INTO idempotency (user_id, idempotency_key, created_at) + VALUES ($1, $2, now()) + ON CONFLICT DO NOTHING + "#, + user_id, + idempotency_key.as_ref() + ); + let n_inserted_rows = transaction.execute(query).await?.rows_affected(); + if n_inserted_rows > 0 { + Ok(NextAction::StartProcessing(transaction)) + } else { + let saved_response = get_saved_response(connection_pool, idempotency_key, user_id) + .await? + .ok_or_else(|| anyhow::anyhow!("Could not find saved response."))?; + Ok(NextAction::ReturnSavedResponse(saved_response)) + } +} diff --git a/src/issue_delivery_worker.rs b/src/issue_delivery_worker.rs new file mode 100644 index 0000000..540c4ee --- /dev/null +++ b/src/issue_delivery_worker.rs @@ -0,0 +1,151 @@ +use crate::{configuration::Settings, domain::SubscriberEmail, email_client::EmailClient}; +use sqlx::{Executor, PgPool, Postgres, Row, Transaction, postgres::PgPoolOptions}; +use std::time::Duration; +use tracing::{Span, field::display}; +use uuid::Uuid; + +pub async fn run_worker_until_stopped(configuration: Settings) -> Result<(), anyhow::Error> { + let connection_pool = PgPoolOptions::new().connect_lazy_with(configuration.database.with_db()); + let email_client = EmailClient::build(configuration.email_client).unwrap(); + worker_loop(connection_pool, email_client).await +} + +async fn worker_loop( + connection_pool: PgPool, + email_client: EmailClient, +) -> Result<(), anyhow::Error> { + loop { + match try_execute_task(&connection_pool, &email_client).await { + Ok(ExecutionOutcome::EmptyQueue) => tokio::time::sleep(Duration::from_secs(10)).await, + Ok(ExecutionOutcome::TaskCompleted) => (), + Err(_) => tokio::time::sleep(Duration::from_secs(1)).await, + } + } +} + +pub enum ExecutionOutcome { + TaskCompleted, + EmptyQueue, +} + +#[tracing::instrument( + skip_all, + fields( + newsletter_issue_id=tracing::field::Empty, + subscriber_email=tracing::field::Empty + ), + err +)] +pub async fn try_execute_task( + connection_pool: &PgPool, + email_client: &EmailClient, +) -> Result { + let task = dequeue_task(connection_pool).await?; + if task.is_none() { + return Ok(ExecutionOutcome::EmptyQueue); + } + let (transaction, issue_id, email) = task.unwrap(); + Span::current() + .record("newsletter_issue_id", display(issue_id)) + .record("subscriber_email", display(&email)); + match SubscriberEmail::parse(email.clone()) { + Ok(email) => { + let issue = get_issue(connection_pool, issue_id).await?; + if let Err(e) = email_client + .send_email( + &email, + &issue.title, + &issue.html_content, + &issue.text_content, + ) + .await + { + tracing::error!( + error.message = %e, + "Failed to deliver issue to confirmed subscriber. Skipping." + ); + } + } + Err(e) => { + tracing::error!( + error.message = %e, + "Skipping a subscriber. Their stored contact details are invalid." + ); + } + } + delete_task(transaction, issue_id, &email).await?; + + Ok(ExecutionOutcome::TaskCompleted) +} + +struct NewsletterIssue { + title: String, + text_content: String, + html_content: String, +} + +#[tracing::instrument(skip_all)] +async fn get_issue( + connection_pool: &PgPool, + issue_id: Uuid, +) -> Result { + let issue = sqlx::query_as!( + NewsletterIssue, + r#" + SELECT title, text_content, html_content + FROM newsletter_issues + WHERE newsletter_issue_id = $1 + "#, + issue_id + ) + .fetch_one(connection_pool) + .await?; + Ok(issue) +} + +#[tracing::instrument(skip_all)] +async fn dequeue_task( + connection_pool: &PgPool, +) -> Result, Uuid, String)>, anyhow::Error> { + let mut transaction = connection_pool.begin().await?; + let query = sqlx::query!( + r#" + SELECT newsletter_issue_id, subscriber_email + FROM issue_delivery_queue + FOR UPDATE + SKIP LOCKED + LIMIT 1 + "# + ); + let r = transaction.fetch_optional(query).await?; + if let Some(row) = r { + Ok(Some(( + transaction, + row.get("newsletter_issue_id"), + row.get("subscriber_email"), + ))) + } else { + Ok(None) + } +} + +#[tracing::instrument(skip_all)] +async fn delete_task( + mut transaction: Transaction<'static, Postgres>, + issue_id: Uuid, + email: &str, +) -> Result<(), anyhow::Error> { + let query = sqlx::query!( + r#" + DELETE FROM issue_delivery_queue + WHERE + newsletter_issue_id = $1 + AND subscriber_email = $2 + "#, + issue_id, + email + ); + transaction.execute(query).await?; + transaction.commit().await?; + Ok(()) +} diff --git a/src/lib.rs b/src/lib.rs index f0ffaa1..ef0b064 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,8 @@ pub mod authentication; pub mod configuration; pub mod domain; pub mod email_client; +pub mod idempotency; +pub mod issue_delivery_worker; pub mod routes; pub mod session_state; pub mod startup; diff --git a/src/main.rs b/src/main.rs index 0bb26e2..89e0fa4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ use zero2prod::{ - configuration::get_configuration, startup::Application, telemetry::init_subscriber, + configuration::get_configuration, issue_delivery_worker::run_worker_until_stopped, + startup::Application, telemetry::init_subscriber, }; #[tokio::main] @@ -7,7 +8,15 @@ async fn main() -> Result<(), std::io::Error> { init_subscriber(std::io::stdout); let configuration = get_configuration().expect("Failed to read configuration"); - let application = Application::build(configuration).await?; - application.run_until_stopped().await?; + let application = Application::build(configuration.clone()).await?; + + let application_task = tokio::spawn(application.run_until_stopped()); + let worker_task = tokio::spawn(run_worker_until_stopped(configuration)); + + tokio::select! { + _ = application_task => {}, + _ = worker_task => {}, + }; + Ok(()) } diff --git a/src/routes/admin.rs b/src/routes/admin.rs index 45afd10..da9ad45 100644 --- a/src/routes/admin.rs +++ b/src/routes/admin.rs @@ -22,7 +22,9 @@ pub enum AdminError { #[error("Updating password failed.")] ChangePassword, #[error("Could not publish newsletter.")] - Publish, + Publish(#[source] anyhow::Error), + #[error("The idempotency key was invalid.")] + Idempotency(String), } impl std::fmt::Debug for AdminError { @@ -50,7 +52,10 @@ impl IntoResponse for AdminError { .into_response(), AdminError::NotAuthenticated => Redirect::to("/login").into_response(), AdminError::ChangePassword => Redirect::to("/admin/password").into_response(), - AdminError::Publish => Redirect::to("/admin/newsletters").into_response(), + AdminError::Publish(_) => Redirect::to("/admin/newsletters").into_response(), + AdminError::Idempotency(e) => { + (StatusCode::BAD_REQUEST, Json(ErrorResponse { message: e })).into_response() + } } } } diff --git a/src/routes/admin/html/send_newsletter_form.html b/src/routes/admin/html/send_newsletter_form.html index e421421..0371dbf 100644 --- a/src/routes/admin/html/send_newsletter_form.html +++ b/src/routes/admin/html/send_newsletter_form.html @@ -10,6 +10,7 @@ + {} diff --git a/src/routes/admin/newsletters.rs b/src/routes/admin/newsletters.rs index 924d0a5..898361c 100644 --- a/src/routes/admin/newsletters.rs +++ b/src/routes/admin/newsletters.rs @@ -1,75 +1,136 @@ -use crate::{domain::SubscriberEmail, routes::AdminError, startup::AppState}; +use crate::{ + authentication::AuthenticatedUser, + idempotency::{IdempotencyKey, save_response, try_processing}, + routes::AdminError, + startup::AppState, +}; use anyhow::Context; use axum::{ - Form, + Extension, Form, extract::State, response::{Html, IntoResponse, Redirect, Response}, }; use axum_messages::Messages; -use sqlx::PgPool; +use sqlx::{Executor, Postgres, Transaction}; use std::fmt::Write; +use uuid::Uuid; #[derive(serde::Deserialize)] pub struct BodyData { title: String, html: String, text: String, + idempotency_key: String, } -pub async fn publish_form(messages: Messages) -> Response { +pub async fn publish_newsletter_form(messages: Messages) -> Response { let mut error_html = String::new(); for message in messages { writeln!(error_html, "

{}

", message).unwrap(); } + let idempotency_key = Uuid::new_v4(); Html(format!( include_str!("html/send_newsletter_form.html"), - error_html + idempotency_key, error_html )) .into_response() } +#[tracing::instrument(skip_all)] +pub async fn insert_newsletter_issue( + transaction: &mut Transaction<'static, Postgres>, + title: &str, + text_content: &str, + html_content: &str, +) -> Result { + let newsletter_issue_id = Uuid::new_v4(); + let query = sqlx::query!( + r#" + INSERT INTO newsletter_issues ( + newsletter_issue_id, title, text_content, html_content, published_at + ) + VALUES ($1, $2, $3, $4, now()) + "#, + newsletter_issue_id, + title, + text_content, + html_content + ); + transaction.execute(query).await?; + Ok(newsletter_issue_id) +} + +#[tracing::instrument(skip_all)] +async fn enqueue_delivery_tasks( + transaction: &mut Transaction<'static, Postgres>, + newsletter_issue_id: Uuid, +) -> Result<(), sqlx::Error> { + let query = sqlx::query!( + r#" + INSERT INTO issue_delivery_queue ( + newsletter_issue_id, + subscriber_email + ) + SELECT $1, email + FROM subscriptions + WHERE status = 'confirmed' + "#, + newsletter_issue_id, + ); + transaction.execute(query).await?; + Ok(()) +} + #[tracing::instrument( name = "Publishing a newsletter", - skip(connection_pool, email_client, form) + skip(connection_pool, form, messages) )] -pub async fn publish( +pub async fn publish_newsletter( State(AppState { - connection_pool, - email_client, - .. + connection_pool, .. }): State, + Extension(AuthenticatedUser { user_id, .. }): Extension, messages: Messages, Form(form): Form, ) -> Result { if let Err(e) = validate_form(&form) { messages.error(e); - return Err(AdminError::Publish); + return Err(AdminError::Publish(anyhow::anyhow!(e))); } - let subscribers = get_confirmed_subscribers(&connection_pool).await?; - for subscriber in subscribers { - match subscriber { - Ok(ConfirmedSubscriber { name, email }) => { - let title = format!("{}, we have news for you! {}", name, form.title); - email_client - .send_email(&email, &title, &form.html, &form.text) - .await - .with_context(|| { - format!("Failed to send newsletter issue to {}", email.as_ref()) - })?; - } - Err(e) => { - tracing::warn!( - "Skipping a confirmed subscriber. Their stored contact details are invalid: {}", - e - ) - } + + let idempotency_key: IdempotencyKey = form + .idempotency_key + .try_into() + .map_err(AdminError::Idempotency)?; + + let success_message = || { + messages.success(format!( + "The newsletter issue '{}' has been published!", + form.title + )) + }; + + let mut transaction = match try_processing(&connection_pool, &idempotency_key, user_id).await? { + crate::idempotency::NextAction::StartProcessing(t) => t, + crate::idempotency::NextAction::ReturnSavedResponse(response) => { + success_message(); + return Ok(response); } - } - messages.success(format!( - "The newsletter issue '{}' has been published!", - form.title, - )); - Ok(Redirect::to("/admin/newsletters").into_response()) + }; + + let issue_id = insert_newsletter_issue(&mut transaction, &form.title, &form.text, &form.html) + .await + .context("Failed to store newsletter issue details")?; + + enqueue_delivery_tasks(&mut transaction, issue_id) + .await + .context("Failed to enqueue delivery tasks")?; + + let response = Redirect::to("/admin/newsletters").into_response(); + success_message(); + save_response(transaction, &idempotency_key, user_id, response) + .await + .map_err(AdminError::UnexpectedError) } fn validate_form(form: &BodyData) -> Result<(), &'static str> { @@ -82,27 +143,27 @@ fn validate_form(form: &BodyData) -> Result<(), &'static str> { Ok(()) } -struct ConfirmedSubscriber { - name: String, - email: SubscriberEmail, -} +// struct ConfirmedSubscriber { +// name: String, +// email: SubscriberEmail, +// } -#[tracing::instrument(name = "Get confirmed subscribers", skip(connection_pool))] -async fn get_confirmed_subscribers( - connection_pool: &PgPool, -) -> Result>, anyhow::Error> { - let rows = sqlx::query!("SELECT name, email FROM subscriptions WHERE status = 'confirmed'") - .fetch_all(connection_pool) - .await?; - let confirmed_subscribers = rows - .into_iter() - .map(|r| match SubscriberEmail::parse(r.email) { - Ok(email) => Ok(ConfirmedSubscriber { - name: r.name, - email, - }), - Err(e) => Err(anyhow::anyhow!(e)), - }) - .collect(); - Ok(confirmed_subscribers) -} +// #[tracing::instrument(name = "Get confirmed subscribers", skip(connection_pool))] +// async fn get_confirmed_subscribers( +// connection_pool: &PgPool, +// ) -> Result>, anyhow::Error> { +// let rows = sqlx::query!("SELECT name, email FROM subscriptions WHERE status = 'confirmed'") +// .fetch_all(connection_pool) +// .await?; +// let confirmed_subscribers = rows +// .into_iter() +// .map(|r| match SubscriberEmail::parse(r.email) { +// Ok(email) => Ok(ConfirmedSubscriber { +// name: r.name, +// email, +// }), +// Err(e) => Err(anyhow::anyhow!(e)), +// }) +// .collect(); +// Ok(confirmed_subscribers) +// } diff --git a/src/startup.rs b/src/startup.rs index 0b6cbe0..1ff574d 100644 --- a/src/startup.rs +++ b/src/startup.rs @@ -42,7 +42,7 @@ impl Application { let listener = TcpListener::bind(address).await?; let connection_pool = PgPoolOptions::new().connect_lazy_with(configuration.database.with_db()); - let email_client = EmailClient::new(configuration.email_client); + let email_client = EmailClient::build(configuration.email_client).unwrap(); let pool = Pool::new( Config::from_url(configuration.redis_uri.expose_secret()) .expect("Failed to parse Redis URL string"), @@ -88,7 +88,10 @@ pub fn app( let admin_routes = Router::new() .route("/dashboard", get(admin_dashboard)) .route("/password", get(change_password_form).post(change_password)) - .route("/newsletters", get(publish_form).post(publish)) + .route( + "/newsletters", + get(publish_newsletter_form).post(publish_newsletter), + ) .route("/logout", post(logout)) .layer(middleware::from_fn(require_auth)); Router::new() diff --git a/tests/api/helpers.rs b/tests/api/helpers.rs index 6a37f06..1dffd3d 100644 --- a/tests/api/helpers.rs +++ b/tests/api/helpers.rs @@ -9,6 +9,8 @@ use uuid::Uuid; use wiremock::MockServer; use zero2prod::{ configuration::{DatabaseSettings, get_configuration}, + email_client::EmailClient, + issue_delivery_worker::{ExecutionOutcome, try_execute_task}, startup::Application, telemetry::init_subscriber, }; @@ -70,6 +72,7 @@ pub struct TestApp { pub port: u16, pub test_user: TestUser, pub api_client: reqwest::Client, + pub email_client: EmailClient, } impl TestApp { @@ -85,6 +88,7 @@ impl TestApp { c }; let connection_pool = configure_database(&configuration.database).await; + let email_client = EmailClient::build(configuration.email_client.clone()).unwrap(); let application = Application::build(configuration) .await .expect("Failed to build application"); @@ -110,6 +114,7 @@ impl TestApp { port, test_user, api_client, + email_client, }; tokio::spawn(application.run_until_stopped()); @@ -117,6 +122,18 @@ impl TestApp { app } + pub async fn dispatch_all_pending_emails(&self) { + loop { + if let ExecutionOutcome::EmptyQueue = + try_execute_task(&self.connection_pool, &self.email_client) + .await + .unwrap() + { + break; + } + } + } + pub fn get_confirmation_links(&self, request: &wiremock::Request) -> ConfirmationLinks { let body: serde_json::Value = serde_json::from_slice(&request.body).unwrap(); let get_link = |s: &str| { @@ -218,6 +235,14 @@ impl TestApp { .expect("failed to execute request") } + pub async fn admin_login(&self) { + let login_body = serde_json::json!({ + "username": self.test_user.username, + "password": self.test_user.password + }); + self.post_login(&login_body).await; + } + pub async fn post_logout(&self) -> reqwest::Response { self.api_client .post(format!("{}/admin/logout", self.address)) diff --git a/tests/api/newsletters.rs b/tests/api/newsletters.rs index 3787cfc..89afee7 100644 --- a/tests/api/newsletters.rs +++ b/tests/api/newsletters.rs @@ -1,21 +1,22 @@ use crate::helpers::{ConfirmationLinks, TestApp, assert_is_redirect_to}; +use fake::{ + Fake, + faker::{internet::en::SafeEmail, name::fr_fr::Name}, +}; +use std::time::Duration; +use uuid::Uuid; use wiremock::{ - Mock, ResponseTemplate, - matchers::{any, method, path}, + Mock, MockBuilder, ResponseTemplate, + matchers::{method, path}, }; #[tokio::test] async fn newsletters_are_not_delivered_to_unconfirmed_subscribers() { let app = TestApp::spawn().await; create_unconfirmed_subscriber(&app).await; + app.admin_login().await; - let login_body = serde_json::json!({ - "username": app.test_user.username, - "password": app.test_user.password - }); - app.post_login(&login_body).await; - - Mock::given(any()) + when_sending_an_email() .respond_with(ResponseTemplate::new(200)) .expect(0) .mount(&app.email_server) @@ -27,13 +28,15 @@ async fn newsletters_are_not_delivered_to_unconfirmed_subscribers() { "html": "

Newsletter body as HTML

" }); app.post_newsletters(&newsletter_request_body).await; + + app.dispatch_all_pending_emails().await; } #[tokio::test] async fn requests_without_authentication_are_redirected() { let app = TestApp::spawn().await; - Mock::given(any()) + when_sending_an_email() .respond_with(ResponseTemplate::new(200)) .expect(0) .mount(&app.email_server) @@ -52,14 +55,9 @@ async fn requests_without_authentication_are_redirected() { async fn newsletters_are_delivered_to_confirmed_subscribers() { let app = TestApp::spawn().await; create_confirmed_subscriber(&app).await; + app.admin_login().await; - let login_body = serde_json::json!({ - "username": app.test_user.username, - "password": app.test_user.password - }); - app.post_login(&login_body).await; - - Mock::given(any()) + when_sending_an_email() .respond_with(ResponseTemplate::new(200)) .expect(1) .mount(&app.email_server) @@ -69,7 +67,8 @@ async fn newsletters_are_delivered_to_confirmed_subscribers() { let newsletter_request_body = serde_json::json!({ "title": newsletter_title, "text": "Newsletter body as plain text", - "html": "

Newsletter body as HTML

" + "html": "

Newsletter body as HTML

", + "idempotency_key": Uuid::new_v4().to_string(), }); let response = app.post_newsletters(&newsletter_request_body).await; @@ -80,19 +79,16 @@ async fn newsletters_are_delivered_to_confirmed_subscribers() { "The newsletter issue '{}' has been published", newsletter_title ))); + + app.dispatch_all_pending_emails().await; } #[tokio::test] async fn form_shows_error_for_invalid_data() { let app = TestApp::spawn().await; + app.admin_login().await; - let login_body = serde_json::json!({ - "username": app.test_user.username, - "password": app.test_user.password - }); - app.post_login(&login_body).await; - - Mock::given(any()) + when_sending_an_email() .respond_with(ResponseTemplate::new(200)) .expect(0) .mount(&app.email_server) @@ -101,14 +97,20 @@ async fn form_shows_error_for_invalid_data() { let test_cases = [ ( serde_json::json!({ - "title": "", - "text": "Newsletter body as plain text", - "html": "

Newsletter body as HTML

" - }), + "title": "", + "text": "Newsletter body as plain text", + "html": "

Newsletter body as HTML

", + "idempotency_key": Uuid::new_v4().to_string(), + }), "The title was empty", ), ( - serde_json::json!({ "title": "Newsletter", "text": "", "html": "" }), + serde_json::json!({ + "title": "Newsletter", + "text": "", + "html": "", + "idempotency_key": Uuid::new_v4().to_string(), + }), "The content was empty", ), ]; @@ -124,14 +126,9 @@ async fn form_shows_error_for_invalid_data() { async fn newsletter_creation_is_idempotent() { let app = TestApp::spawn().await; create_confirmed_subscriber(&app).await; + app.admin_login().await; - let login_body = serde_json::json!({ - "username": app.test_user.username, - "password": app.test_user.password - }); - app.post_login(&login_body).await; - - Mock::given(any()) + when_sending_an_email() .respond_with(ResponseTemplate::new(200)) .expect(1) .mount(&app.email_server) @@ -141,7 +138,8 @@ async fn newsletter_creation_is_idempotent() { let newsletter_request_body = serde_json::json!({ "title": newsletter_title, "text": "Newsletter body as plain text", - "html": "

Newsletter body as HTML

" + "html": "

Newsletter body as HTML

", + "idempotency_key": Uuid::new_v4().to_string(), }); let response = app.post_newsletters(&newsletter_request_body).await; @@ -161,10 +159,49 @@ async fn newsletter_creation_is_idempotent() { "The newsletter issue '{}' has been published", newsletter_title ))); + + app.dispatch_all_pending_emails().await; +} + +#[tokio::test] +async fn concurrent_form_submission_is_handled_gracefully() { + let app = TestApp::spawn().await; + create_confirmed_subscriber(&app).await; + app.admin_login().await; + + when_sending_an_email() + .respond_with(ResponseTemplate::new(200).set_delay(Duration::from_secs(2))) + .expect(1) + .mount(&app.email_server) + .await; + + let newsletter_request_body = serde_json::json!({ + "title": "Newsletter title", + "text": "Newsletter body as plain text", + "html": "

Newsletter body as HTML

", + "idempotency_key": Uuid::new_v4().to_string(), + }); + let response1 = app.post_newsletters(&newsletter_request_body); + let response2 = app.post_newsletters(&newsletter_request_body); + let (response1, response2) = tokio::join!(response1, response2); + + assert_eq!(response1.status(), response2.status()); + assert_eq!( + response1.text().await.unwrap(), + response2.text().await.unwrap(), + ); + + app.dispatch_all_pending_emails().await; } async fn create_unconfirmed_subscriber(app: &TestApp) -> ConfirmationLinks { - let body = "name=Alphonse&email=alphonse.paix%40outlook.com"; + let name: String = Name().fake(); + let email: String = SafeEmail().fake(); + let body = serde_urlencoded::to_string(serde_json::json!({ + "name": name, + "email": email + })) + .unwrap(); let _mock_guard = Mock::given(path("/v1/email")) .and(method("POST")) @@ -173,7 +210,7 @@ async fn create_unconfirmed_subscriber(app: &TestApp) -> ConfirmationLinks { .expect(1) .mount_as_scoped(&app.email_server) .await; - app.post_subscriptions(body.into()) + app.post_subscriptions(body) .await .error_for_status() .unwrap(); @@ -196,3 +233,7 @@ async fn create_confirmed_subscriber(app: &TestApp) { .error_for_status() .unwrap(); } + +fn when_sending_an_email() -> MockBuilder { + Mock::given(path("/v1/email")).and(method("POST")) +}