use crate::{configuration::Settings, domain::SubscriberEmail, email_client::EmailClient}; use sqlx::{Executor, PgPool, Postgres, Row, Transaction, postgres::PgPoolOptions}; use std::time::Duration; use tracing::{Span, field::display}; use uuid::Uuid; pub async fn run_worker_until_stopped(configuration: Settings) -> Result<(), anyhow::Error> { let connection_pool = PgPoolOptions::new().connect_lazy_with(configuration.database.with_db()); let email_client = EmailClient::build(configuration.email_client).unwrap(); worker_loop(connection_pool, email_client).await } async fn worker_loop( connection_pool: PgPool, email_client: EmailClient, ) -> Result<(), anyhow::Error> { loop { match try_execute_task(&connection_pool, &email_client).await { Ok(ExecutionOutcome::EmptyQueue) => tokio::time::sleep(Duration::from_secs(10)).await, Ok(ExecutionOutcome::TaskCompleted) => (), Err(_) => tokio::time::sleep(Duration::from_secs(1)).await, } } } pub enum ExecutionOutcome { TaskCompleted, EmptyQueue, } #[tracing::instrument( skip_all, fields( newsletter_issue_id=tracing::field::Empty, subscriber_email=tracing::field::Empty ), err )] pub async fn try_execute_task( connection_pool: &PgPool, email_client: &EmailClient, ) -> Result { let task = dequeue_task(connection_pool).await?; if task.is_none() { return Ok(ExecutionOutcome::EmptyQueue); } let (transaction, issue_id, email) = task.unwrap(); Span::current() .record("newsletter_issue_id", display(issue_id)) .record("subscriber_email", display(&email)); match SubscriberEmail::parse(email.clone()) { Ok(email) => { let issue = get_issue(connection_pool, issue_id).await?; if let Err(e) = email_client .send_email( &email, &issue.title, &issue.html_content, &issue.text_content, ) .await { tracing::error!( error.message = %e, "Failed to deliver issue to confirmed subscriber. Skipping." ); } } Err(e) => { tracing::error!( error.message = %e, "Skipping a subscriber. Their stored contact details are invalid." ); } } delete_task(transaction, issue_id, &email).await?; Ok(ExecutionOutcome::TaskCompleted) } struct NewsletterIssue { title: String, text_content: String, html_content: String, } #[tracing::instrument(skip_all)] async fn get_issue( connection_pool: &PgPool, issue_id: Uuid, ) -> Result { let issue = sqlx::query_as!( NewsletterIssue, r#" SELECT title, text_content, html_content FROM newsletter_issues WHERE newsletter_issue_id = $1 "#, issue_id ) .fetch_one(connection_pool) .await?; Ok(issue) } #[tracing::instrument(skip_all)] async fn dequeue_task( connection_pool: &PgPool, ) -> Result, Uuid, String)>, anyhow::Error> { let mut transaction = connection_pool.begin().await?; let query = sqlx::query!( r#" SELECT newsletter_issue_id, subscriber_email FROM issue_delivery_queue FOR UPDATE SKIP LOCKED LIMIT 1 "# ); let r = transaction.fetch_optional(query).await?; if let Some(row) = r { Ok(Some(( transaction, row.get("newsletter_issue_id"), row.get("subscriber_email"), ))) } else { Ok(None) } } #[tracing::instrument(skip_all)] async fn delete_task( mut transaction: Transaction<'static, Postgres>, issue_id: Uuid, email: &str, ) -> Result<(), anyhow::Error> { let query = sqlx::query!( r#" DELETE FROM issue_delivery_queue WHERE newsletter_issue_id = $1 AND subscriber_email = $2 "#, issue_id, email ); transaction.execute(query).await?; transaction.commit().await?; Ok(()) }