Fault-tolerant delivery system
This commit is contained in:
151
src/issue_delivery_worker.rs
Normal file
151
src/issue_delivery_worker.rs
Normal file
@@ -0,0 +1,151 @@
|
||||
use crate::{configuration::Settings, domain::SubscriberEmail, email_client::EmailClient};
|
||||
use sqlx::{Executor, PgPool, Postgres, Row, Transaction, postgres::PgPoolOptions};
|
||||
use std::time::Duration;
|
||||
use tracing::{Span, field::display};
|
||||
use uuid::Uuid;
|
||||
|
||||
pub async fn run_worker_until_stopped(configuration: Settings) -> Result<(), anyhow::Error> {
|
||||
let connection_pool = PgPoolOptions::new().connect_lazy_with(configuration.database.with_db());
|
||||
let email_client = EmailClient::build(configuration.email_client).unwrap();
|
||||
worker_loop(connection_pool, email_client).await
|
||||
}
|
||||
|
||||
async fn worker_loop(
|
||||
connection_pool: PgPool,
|
||||
email_client: EmailClient,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
loop {
|
||||
match try_execute_task(&connection_pool, &email_client).await {
|
||||
Ok(ExecutionOutcome::EmptyQueue) => tokio::time::sleep(Duration::from_secs(10)).await,
|
||||
Ok(ExecutionOutcome::TaskCompleted) => (),
|
||||
Err(_) => tokio::time::sleep(Duration::from_secs(1)).await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub enum ExecutionOutcome {
|
||||
TaskCompleted,
|
||||
EmptyQueue,
|
||||
}
|
||||
|
||||
#[tracing::instrument(
|
||||
skip_all,
|
||||
fields(
|
||||
newsletter_issue_id=tracing::field::Empty,
|
||||
subscriber_email=tracing::field::Empty
|
||||
),
|
||||
err
|
||||
)]
|
||||
pub async fn try_execute_task(
|
||||
connection_pool: &PgPool,
|
||||
email_client: &EmailClient,
|
||||
) -> Result<ExecutionOutcome, anyhow::Error> {
|
||||
let task = dequeue_task(connection_pool).await?;
|
||||
if task.is_none() {
|
||||
return Ok(ExecutionOutcome::EmptyQueue);
|
||||
}
|
||||
let (transaction, issue_id, email) = task.unwrap();
|
||||
Span::current()
|
||||
.record("newsletter_issue_id", display(issue_id))
|
||||
.record("subscriber_email", display(&email));
|
||||
match SubscriberEmail::parse(email.clone()) {
|
||||
Ok(email) => {
|
||||
let issue = get_issue(connection_pool, issue_id).await?;
|
||||
if let Err(e) = email_client
|
||||
.send_email(
|
||||
&email,
|
||||
&issue.title,
|
||||
&issue.html_content,
|
||||
&issue.text_content,
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::error!(
|
||||
error.message = %e,
|
||||
"Failed to deliver issue to confirmed subscriber. Skipping."
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!(
|
||||
error.message = %e,
|
||||
"Skipping a subscriber. Their stored contact details are invalid."
|
||||
);
|
||||
}
|
||||
}
|
||||
delete_task(transaction, issue_id, &email).await?;
|
||||
|
||||
Ok(ExecutionOutcome::TaskCompleted)
|
||||
}
|
||||
|
||||
struct NewsletterIssue {
|
||||
title: String,
|
||||
text_content: String,
|
||||
html_content: String,
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn get_issue(
|
||||
connection_pool: &PgPool,
|
||||
issue_id: Uuid,
|
||||
) -> Result<NewsletterIssue, anyhow::Error> {
|
||||
let issue = sqlx::query_as!(
|
||||
NewsletterIssue,
|
||||
r#"
|
||||
SELECT title, text_content, html_content
|
||||
FROM newsletter_issues
|
||||
WHERE newsletter_issue_id = $1
|
||||
"#,
|
||||
issue_id
|
||||
)
|
||||
.fetch_one(connection_pool)
|
||||
.await?;
|
||||
Ok(issue)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn dequeue_task(
|
||||
connection_pool: &PgPool,
|
||||
) -> Result<Option<(Transaction<'static, Postgres>, Uuid, String)>, anyhow::Error> {
|
||||
let mut transaction = connection_pool.begin().await?;
|
||||
let query = sqlx::query!(
|
||||
r#"
|
||||
SELECT newsletter_issue_id, subscriber_email
|
||||
FROM issue_delivery_queue
|
||||
FOR UPDATE
|
||||
SKIP LOCKED
|
||||
LIMIT 1
|
||||
"#
|
||||
);
|
||||
let r = transaction.fetch_optional(query).await?;
|
||||
if let Some(row) = r {
|
||||
Ok(Some((
|
||||
transaction,
|
||||
row.get("newsletter_issue_id"),
|
||||
row.get("subscriber_email"),
|
||||
)))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn delete_task(
|
||||
mut transaction: Transaction<'static, Postgres>,
|
||||
issue_id: Uuid,
|
||||
email: &str,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
let query = sqlx::query!(
|
||||
r#"
|
||||
DELETE FROM issue_delivery_queue
|
||||
WHERE
|
||||
newsletter_issue_id = $1
|
||||
AND subscriber_email = $2
|
||||
"#,
|
||||
issue_id,
|
||||
email
|
||||
);
|
||||
transaction.execute(query).await?;
|
||||
transaction.commit().await?;
|
||||
Ok(())
|
||||
}
|
||||
Reference in New Issue
Block a user