Fault-tolerant delivery system

This commit is contained in:
Alphonse Paix
2025-09-04 02:54:49 +02:00
parent 9a184b93ac
commit f8dee295cd
32 changed files with 872 additions and 120 deletions

View File

@@ -121,7 +121,7 @@ async fn get_stored_credentials(
SELECT user_id, password_hash
FROM users
WHERE username = $1
"#,
"#,
username,
)
.fetch_optional(connection_pool)

View File

@@ -55,7 +55,7 @@ impl TryFrom<String> for Environment {
}
}
#[derive(Deserialize)]
#[derive(Clone, Deserialize)]
pub struct Settings {
pub application: ApplicationSettings,
pub database: DatabaseSettings,
@@ -63,7 +63,7 @@ pub struct Settings {
pub redis_uri: SecretString,
}
#[derive(Deserialize)]
#[derive(Clone, Deserialize)]
pub struct ApplicationSettings {
#[serde(deserialize_with = "deserialize_number_from_string")]
pub port: u16,
@@ -71,7 +71,7 @@ pub struct ApplicationSettings {
pub base_url: String,
}
#[derive(Deserialize)]
#[derive(Clone, Deserialize)]
pub struct EmailClientSettings {
pub base_url: String,
sender_email: String,
@@ -100,7 +100,7 @@ impl EmailClientSettings {
}
}
#[derive(Deserialize)]
#[derive(Clone, Deserialize)]
pub struct DatabaseSettings {
pub username: String,
pub password: SecretString,

View File

@@ -11,16 +11,17 @@ pub struct EmailClient {
}
impl EmailClient {
pub fn new(config: EmailClientSettings) -> Self {
Self {
pub fn build(config: EmailClientSettings) -> Result<Self, anyhow::Error> {
let client = Self {
http_client: Client::builder()
.timeout(Duration::from_millis(config.timeout_milliseconds))
.build()
.unwrap(),
base_url: reqwest::Url::parse(&config.base_url).unwrap(),
sender: config.sender().unwrap(),
base_url: reqwest::Url::parse(&config.base_url)?,
sender: config.sender().map_err(|e| anyhow::anyhow!(e))?,
authorization_token: config.authorization_token,
}
};
Ok(client)
}
pub async fn send_email(
@@ -125,7 +126,7 @@ mod tests {
let sender_email = SafeEmail().fake();
let token: String = Faker.fake();
let settings = EmailClientSettings::new(base_url, sender_email, token, 200);
EmailClient::new(settings)
EmailClient::build(settings).unwrap()
}
#[tokio::test]

5
src/idempotency.rs Normal file
View File

@@ -0,0 +1,5 @@
mod key;
mod persistance;
pub use key::*;
pub use persistance::*;

28
src/idempotency/key.rs Normal file
View File

@@ -0,0 +1,28 @@
pub struct IdempotencyKey(String);
impl TryFrom<String> for IdempotencyKey {
type Error = String;
fn try_from(value: String) -> Result<Self, Self::Error> {
if value.is_empty() {
return Err("The idempotency key cannot be empty.".into());
}
let max_length = 50;
if value.len() >= max_length {
return Err("The idempotency key must be shorter than {max_length} characters.".into());
}
Ok(Self(value))
}
}
impl From<IdempotencyKey> for String {
fn from(value: IdempotencyKey) -> Self {
value.0
}
}
impl AsRef<str> for IdempotencyKey {
fn as_ref(&self) -> &str {
&self.0
}
}

View File

@@ -0,0 +1,128 @@
use crate::idempotency::IdempotencyKey;
use axum::{
body::{self, Body},
http::{HeaderName, HeaderValue},
response::{IntoResponse, Response},
};
use reqwest::StatusCode;
use sqlx::{Executor, PgPool, Postgres, Transaction};
use std::str::FromStr;
use uuid::Uuid;
#[derive(Debug, sqlx::Type)]
#[sqlx(type_name = "header_pair")]
struct HeaderPairRecord {
name: String,
value: Vec<u8>,
}
pub async fn get_saved_response(
connection_pool: &PgPool,
idempotency_key: &IdempotencyKey,
user_id: Uuid,
) -> Result<Option<Response>, anyhow::Error> {
let saved_response = sqlx::query!(
r#"
SELECT
response_status_code as "response_status_code!",
response_headers as "response_headers!: Vec<HeaderPairRecord>",
response_body as "response_body!"
FROM idempotency
WHERE
user_id = $1
AND idempotency_key = $2
"#,
user_id,
idempotency_key.as_ref()
)
.fetch_optional(connection_pool)
.await?;
if let Some(r) = saved_response {
let status_code = StatusCode::from_u16(r.response_status_code.try_into()?)?;
let mut response = status_code.into_response();
for HeaderPairRecord { name, value } in r.response_headers {
response.headers_mut().insert(
HeaderName::from_str(&name).unwrap(),
HeaderValue::from_bytes(&value).unwrap(),
);
}
*response.body_mut() = r.response_body.into();
Ok(Some(response))
} else {
Ok(None)
}
}
pub async fn save_response(
mut transaction: Transaction<'static, Postgres>,
idempotency_key: &IdempotencyKey,
user_id: Uuid,
response: Response<Body>,
) -> Result<Response<Body>, anyhow::Error> {
let status_code = response.status().as_u16() as i16;
let headers = response
.headers()
.into_iter()
.map(|(name, value)| HeaderPairRecord {
name: name.to_string(),
value: value.as_bytes().to_vec(),
})
.collect::<Vec<_>>();
let (response_head, body) = response.into_parts();
let body = body::to_bytes(body, usize::MAX).await?.to_vec();
let query = sqlx::query_unchecked!(
r#"
UPDATE idempotency
SET
response_status_code = $3,
response_headers = $4,
response_body = $5
WHERE
user_id = $1
AND idempotency_key = $2
"#,
user_id,
idempotency_key.as_ref(),
status_code,
headers,
&body,
);
transaction.execute(query).await?;
transaction.commit().await?;
let mut r = response_head.into_response();
*r.body_mut() = body.into();
Ok(r)
}
pub enum NextAction {
StartProcessing(Transaction<'static, Postgres>),
ReturnSavedResponse(Response),
}
pub async fn try_processing(
connection_pool: &PgPool,
idempotency_key: &IdempotencyKey,
user_id: Uuid,
) -> Result<NextAction, anyhow::Error> {
let mut transaction = connection_pool.begin().await?;
let query = sqlx::query!(
r#"
INSERT INTO idempotency (user_id, idempotency_key, created_at)
VALUES ($1, $2, now())
ON CONFLICT DO NOTHING
"#,
user_id,
idempotency_key.as_ref()
);
let n_inserted_rows = transaction.execute(query).await?.rows_affected();
if n_inserted_rows > 0 {
Ok(NextAction::StartProcessing(transaction))
} else {
let saved_response = get_saved_response(connection_pool, idempotency_key, user_id)
.await?
.ok_or_else(|| anyhow::anyhow!("Could not find saved response."))?;
Ok(NextAction::ReturnSavedResponse(saved_response))
}
}

View File

@@ -0,0 +1,151 @@
use crate::{configuration::Settings, domain::SubscriberEmail, email_client::EmailClient};
use sqlx::{Executor, PgPool, Postgres, Row, Transaction, postgres::PgPoolOptions};
use std::time::Duration;
use tracing::{Span, field::display};
use uuid::Uuid;
pub async fn run_worker_until_stopped(configuration: Settings) -> Result<(), anyhow::Error> {
let connection_pool = PgPoolOptions::new().connect_lazy_with(configuration.database.with_db());
let email_client = EmailClient::build(configuration.email_client).unwrap();
worker_loop(connection_pool, email_client).await
}
async fn worker_loop(
connection_pool: PgPool,
email_client: EmailClient,
) -> Result<(), anyhow::Error> {
loop {
match try_execute_task(&connection_pool, &email_client).await {
Ok(ExecutionOutcome::EmptyQueue) => tokio::time::sleep(Duration::from_secs(10)).await,
Ok(ExecutionOutcome::TaskCompleted) => (),
Err(_) => tokio::time::sleep(Duration::from_secs(1)).await,
}
}
}
pub enum ExecutionOutcome {
TaskCompleted,
EmptyQueue,
}
#[tracing::instrument(
skip_all,
fields(
newsletter_issue_id=tracing::field::Empty,
subscriber_email=tracing::field::Empty
),
err
)]
pub async fn try_execute_task(
connection_pool: &PgPool,
email_client: &EmailClient,
) -> Result<ExecutionOutcome, anyhow::Error> {
let task = dequeue_task(connection_pool).await?;
if task.is_none() {
return Ok(ExecutionOutcome::EmptyQueue);
}
let (transaction, issue_id, email) = task.unwrap();
Span::current()
.record("newsletter_issue_id", display(issue_id))
.record("subscriber_email", display(&email));
match SubscriberEmail::parse(email.clone()) {
Ok(email) => {
let issue = get_issue(connection_pool, issue_id).await?;
if let Err(e) = email_client
.send_email(
&email,
&issue.title,
&issue.html_content,
&issue.text_content,
)
.await
{
tracing::error!(
error.message = %e,
"Failed to deliver issue to confirmed subscriber. Skipping."
);
}
}
Err(e) => {
tracing::error!(
error.message = %e,
"Skipping a subscriber. Their stored contact details are invalid."
);
}
}
delete_task(transaction, issue_id, &email).await?;
Ok(ExecutionOutcome::TaskCompleted)
}
struct NewsletterIssue {
title: String,
text_content: String,
html_content: String,
}
#[tracing::instrument(skip_all)]
async fn get_issue(
connection_pool: &PgPool,
issue_id: Uuid,
) -> Result<NewsletterIssue, anyhow::Error> {
let issue = sqlx::query_as!(
NewsletterIssue,
r#"
SELECT title, text_content, html_content
FROM newsletter_issues
WHERE newsletter_issue_id = $1
"#,
issue_id
)
.fetch_one(connection_pool)
.await?;
Ok(issue)
}
#[tracing::instrument(skip_all)]
async fn dequeue_task(
connection_pool: &PgPool,
) -> Result<Option<(Transaction<'static, Postgres>, Uuid, String)>, anyhow::Error> {
let mut transaction = connection_pool.begin().await?;
let query = sqlx::query!(
r#"
SELECT newsletter_issue_id, subscriber_email
FROM issue_delivery_queue
FOR UPDATE
SKIP LOCKED
LIMIT 1
"#
);
let r = transaction.fetch_optional(query).await?;
if let Some(row) = r {
Ok(Some((
transaction,
row.get("newsletter_issue_id"),
row.get("subscriber_email"),
)))
} else {
Ok(None)
}
}
#[tracing::instrument(skip_all)]
async fn delete_task(
mut transaction: Transaction<'static, Postgres>,
issue_id: Uuid,
email: &str,
) -> Result<(), anyhow::Error> {
let query = sqlx::query!(
r#"
DELETE FROM issue_delivery_queue
WHERE
newsletter_issue_id = $1
AND subscriber_email = $2
"#,
issue_id,
email
);
transaction.execute(query).await?;
transaction.commit().await?;
Ok(())
}

View File

@@ -2,6 +2,8 @@ pub mod authentication;
pub mod configuration;
pub mod domain;
pub mod email_client;
pub mod idempotency;
pub mod issue_delivery_worker;
pub mod routes;
pub mod session_state;
pub mod startup;

View File

@@ -1,5 +1,6 @@
use zero2prod::{
configuration::get_configuration, startup::Application, telemetry::init_subscriber,
configuration::get_configuration, issue_delivery_worker::run_worker_until_stopped,
startup::Application, telemetry::init_subscriber,
};
#[tokio::main]
@@ -7,7 +8,15 @@ async fn main() -> Result<(), std::io::Error> {
init_subscriber(std::io::stdout);
let configuration = get_configuration().expect("Failed to read configuration");
let application = Application::build(configuration).await?;
application.run_until_stopped().await?;
let application = Application::build(configuration.clone()).await?;
let application_task = tokio::spawn(application.run_until_stopped());
let worker_task = tokio::spawn(run_worker_until_stopped(configuration));
tokio::select! {
_ = application_task => {},
_ = worker_task => {},
};
Ok(())
}

View File

@@ -22,7 +22,9 @@ pub enum AdminError {
#[error("Updating password failed.")]
ChangePassword,
#[error("Could not publish newsletter.")]
Publish,
Publish(#[source] anyhow::Error),
#[error("The idempotency key was invalid.")]
Idempotency(String),
}
impl std::fmt::Debug for AdminError {
@@ -50,7 +52,10 @@ impl IntoResponse for AdminError {
.into_response(),
AdminError::NotAuthenticated => Redirect::to("/login").into_response(),
AdminError::ChangePassword => Redirect::to("/admin/password").into_response(),
AdminError::Publish => Redirect::to("/admin/newsletters").into_response(),
AdminError::Publish(_) => Redirect::to("/admin/newsletters").into_response(),
AdminError::Idempotency(e) => {
(StatusCode::BAD_REQUEST, Json(ErrorResponse { message: e })).into_response()
}
}
}
}

View File

@@ -10,6 +10,7 @@
<input type="text" name="title" placeholder="Subject" />
<input type="text" name="html" placeholder="Content (HTML)" />
<input type="text" name="text" placeholder="Content (text)" />
<input hidden type="text" name="idempotency_key" value="{}" />
<button type="submit">Send</button>
</form>
{}

View File

@@ -1,75 +1,136 @@
use crate::{domain::SubscriberEmail, routes::AdminError, startup::AppState};
use crate::{
authentication::AuthenticatedUser,
idempotency::{IdempotencyKey, save_response, try_processing},
routes::AdminError,
startup::AppState,
};
use anyhow::Context;
use axum::{
Form,
Extension, Form,
extract::State,
response::{Html, IntoResponse, Redirect, Response},
};
use axum_messages::Messages;
use sqlx::PgPool;
use sqlx::{Executor, Postgres, Transaction};
use std::fmt::Write;
use uuid::Uuid;
#[derive(serde::Deserialize)]
pub struct BodyData {
title: String,
html: String,
text: String,
idempotency_key: String,
}
pub async fn publish_form(messages: Messages) -> Response {
pub async fn publish_newsletter_form(messages: Messages) -> Response {
let mut error_html = String::new();
for message in messages {
writeln!(error_html, "<p><i>{}</i></p>", message).unwrap();
}
let idempotency_key = Uuid::new_v4();
Html(format!(
include_str!("html/send_newsletter_form.html"),
error_html
idempotency_key, error_html
))
.into_response()
}
#[tracing::instrument(skip_all)]
pub async fn insert_newsletter_issue(
transaction: &mut Transaction<'static, Postgres>,
title: &str,
text_content: &str,
html_content: &str,
) -> Result<Uuid, sqlx::Error> {
let newsletter_issue_id = Uuid::new_v4();
let query = sqlx::query!(
r#"
INSERT INTO newsletter_issues (
newsletter_issue_id, title, text_content, html_content, published_at
)
VALUES ($1, $2, $3, $4, now())
"#,
newsletter_issue_id,
title,
text_content,
html_content
);
transaction.execute(query).await?;
Ok(newsletter_issue_id)
}
#[tracing::instrument(skip_all)]
async fn enqueue_delivery_tasks(
transaction: &mut Transaction<'static, Postgres>,
newsletter_issue_id: Uuid,
) -> Result<(), sqlx::Error> {
let query = sqlx::query!(
r#"
INSERT INTO issue_delivery_queue (
newsletter_issue_id,
subscriber_email
)
SELECT $1, email
FROM subscriptions
WHERE status = 'confirmed'
"#,
newsletter_issue_id,
);
transaction.execute(query).await?;
Ok(())
}
#[tracing::instrument(
name = "Publishing a newsletter",
skip(connection_pool, email_client, form)
skip(connection_pool, form, messages)
)]
pub async fn publish(
pub async fn publish_newsletter(
State(AppState {
connection_pool,
email_client,
..
connection_pool, ..
}): State<AppState>,
Extension(AuthenticatedUser { user_id, .. }): Extension<AuthenticatedUser>,
messages: Messages,
Form(form): Form<BodyData>,
) -> Result<Response, AdminError> {
if let Err(e) = validate_form(&form) {
messages.error(e);
return Err(AdminError::Publish);
return Err(AdminError::Publish(anyhow::anyhow!(e)));
}
let subscribers = get_confirmed_subscribers(&connection_pool).await?;
for subscriber in subscribers {
match subscriber {
Ok(ConfirmedSubscriber { name, email }) => {
let title = format!("{}, we have news for you! {}", name, form.title);
email_client
.send_email(&email, &title, &form.html, &form.text)
.await
.with_context(|| {
format!("Failed to send newsletter issue to {}", email.as_ref())
})?;
}
Err(e) => {
tracing::warn!(
"Skipping a confirmed subscriber. Their stored contact details are invalid: {}",
e
)
}
let idempotency_key: IdempotencyKey = form
.idempotency_key
.try_into()
.map_err(AdminError::Idempotency)?;
let success_message = || {
messages.success(format!(
"The newsletter issue '{}' has been published!",
form.title
))
};
let mut transaction = match try_processing(&connection_pool, &idempotency_key, user_id).await? {
crate::idempotency::NextAction::StartProcessing(t) => t,
crate::idempotency::NextAction::ReturnSavedResponse(response) => {
success_message();
return Ok(response);
}
}
messages.success(format!(
"The newsletter issue '{}' has been published!",
form.title,
));
Ok(Redirect::to("/admin/newsletters").into_response())
};
let issue_id = insert_newsletter_issue(&mut transaction, &form.title, &form.text, &form.html)
.await
.context("Failed to store newsletter issue details")?;
enqueue_delivery_tasks(&mut transaction, issue_id)
.await
.context("Failed to enqueue delivery tasks")?;
let response = Redirect::to("/admin/newsletters").into_response();
success_message();
save_response(transaction, &idempotency_key, user_id, response)
.await
.map_err(AdminError::UnexpectedError)
}
fn validate_form(form: &BodyData) -> Result<(), &'static str> {
@@ -82,27 +143,27 @@ fn validate_form(form: &BodyData) -> Result<(), &'static str> {
Ok(())
}
struct ConfirmedSubscriber {
name: String,
email: SubscriberEmail,
}
// struct ConfirmedSubscriber {
// name: String,
// email: SubscriberEmail,
// }
#[tracing::instrument(name = "Get confirmed subscribers", skip(connection_pool))]
async fn get_confirmed_subscribers(
connection_pool: &PgPool,
) -> Result<Vec<Result<ConfirmedSubscriber, anyhow::Error>>, anyhow::Error> {
let rows = sqlx::query!("SELECT name, email FROM subscriptions WHERE status = 'confirmed'")
.fetch_all(connection_pool)
.await?;
let confirmed_subscribers = rows
.into_iter()
.map(|r| match SubscriberEmail::parse(r.email) {
Ok(email) => Ok(ConfirmedSubscriber {
name: r.name,
email,
}),
Err(e) => Err(anyhow::anyhow!(e)),
})
.collect();
Ok(confirmed_subscribers)
}
// #[tracing::instrument(name = "Get confirmed subscribers", skip(connection_pool))]
// async fn get_confirmed_subscribers(
// connection_pool: &PgPool,
// ) -> Result<Vec<Result<ConfirmedSubscriber, anyhow::Error>>, anyhow::Error> {
// let rows = sqlx::query!("SELECT name, email FROM subscriptions WHERE status = 'confirmed'")
// .fetch_all(connection_pool)
// .await?;
// let confirmed_subscribers = rows
// .into_iter()
// .map(|r| match SubscriberEmail::parse(r.email) {
// Ok(email) => Ok(ConfirmedSubscriber {
// name: r.name,
// email,
// }),
// Err(e) => Err(anyhow::anyhow!(e)),
// })
// .collect();
// Ok(confirmed_subscribers)
// }

View File

@@ -42,7 +42,7 @@ impl Application {
let listener = TcpListener::bind(address).await?;
let connection_pool =
PgPoolOptions::new().connect_lazy_with(configuration.database.with_db());
let email_client = EmailClient::new(configuration.email_client);
let email_client = EmailClient::build(configuration.email_client).unwrap();
let pool = Pool::new(
Config::from_url(configuration.redis_uri.expose_secret())
.expect("Failed to parse Redis URL string"),
@@ -88,7 +88,10 @@ pub fn app(
let admin_routes = Router::new()
.route("/dashboard", get(admin_dashboard))
.route("/password", get(change_password_form).post(change_password))
.route("/newsletters", get(publish_form).post(publish))
.route(
"/newsletters",
get(publish_newsletter_form).post(publish_newsletter),
)
.route("/logout", post(logout))
.layer(middleware::from_fn(require_auth));
Router::new()