use crate::{ configuration::Settings, domain::SubscriberEmail, email_client::EmailClient, routes::EmailType, }; use anyhow::Context; use sqlx::{Executor, PgPool, Postgres, Row, Transaction, postgres::PgPoolOptions}; use std::time::Duration; use tracing::{Span, field::display}; use uuid::Uuid; pub async fn run_worker_until_stopped(configuration: Settings) -> Result<(), anyhow::Error> { let connection_pool = PgPoolOptions::new().connect_lazy_with(configuration.database.with_db()); let email_client = EmailClient::build(configuration.email_client).unwrap(); worker_loop(connection_pool, email_client).await } async fn worker_loop( connection_pool: PgPool, email_client: EmailClient, ) -> Result<(), anyhow::Error> { loop { match try_execute_task(&connection_pool, &email_client).await { Ok(ExecutionOutcome::EmptyQueue) => tokio::time::sleep(Duration::from_secs(10)).await, Ok(ExecutionOutcome::TaskCompleted) => (), Err(_) => tokio::time::sleep(Duration::from_secs(1)).await, } } } pub enum ExecutionOutcome { TaskCompleted, EmptyQueue, } #[tracing::instrument( skip_all, fields( newsletter_issue_id=tracing::field::Empty, subscriber_email=tracing::field::Empty ), err )] pub async fn try_execute_task( connection_pool: &PgPool, email_client: &EmailClient, ) -> Result { let task = dequeue_task(connection_pool).await?; let (mut transaction, task) = match task { Some((transaction, task)) => (transaction, task), None => return Ok(ExecutionOutcome::EmptyQueue), }; Span::current() .record("newsletter_issue_id", display(task.newsletter_issue_id)) .record("subscriber_email", display(&task.subscriber_email)); match SubscriberEmail::parse(task.subscriber_email.clone()) { Ok(email) => { let mut issue = get_issue(connection_pool, task.newsletter_issue_id).await?; issue.inject_unsubscribe_token(&task.unsubscribe_token); if task.kind == EmailType::NewPost.to_string() { issue.inject_tracking_info(&mut transaction).await?; } if let Err(e) = email_client .send_email( &email, &issue.title, &issue.html_content, &issue.text_content, ) .await { tracing::error!( error = %e, "Failed to deliver issue to confirmed subscriber. Skipping." ); } } Err(e) => { tracing::error!( error = %e, "Skipping a subscriber. Their stored contact details are invalid." ); } } delete_task( transaction, task.newsletter_issue_id, &task.subscriber_email, ) .await?; Ok(ExecutionOutcome::TaskCompleted) } struct NewsletterIssue { newsletter_issue_id: Uuid, title: String, text_content: String, html_content: String, } impl NewsletterIssue { fn inject_unsubscribe_token(&mut self, token: &str) { self.text_content = self.text_content.replace("UNSUBSCRIBE_TOKEN", token); self.html_content = self.html_content.replace("UNSUBSCRIBE_TOKEN", token); } async fn inject_tracking_info( &mut self, transaction: &mut Transaction<'static, Postgres>, ) -> Result<(), anyhow::Error> { let email_id = Uuid::new_v4(); let query = sqlx::query!( r#" INSERT INTO notifications_delivered (email_id, newsletter_issue_id) VALUES ($1, $2) "#, email_id, self.newsletter_issue_id ); transaction .execute(query) .await .context("Failed to store email tracking info.")?; self.text_content = self.text_content.replace("EMAIL_ID", &email_id.to_string()); self.html_content = self.html_content.replace("EMAIL_ID", &email_id.to_string()); Ok(()) } } async fn get_issue( connection_pool: &PgPool, issue_id: Uuid, ) -> Result { let issue = sqlx::query_as!( NewsletterIssue, r#" SELECT newsletter_issue_id, title, text_content, html_content FROM newsletter_issues WHERE newsletter_issue_id = $1 "#, issue_id ) .fetch_one(connection_pool) .await?; Ok(issue) } pub struct Task { pub newsletter_issue_id: Uuid, pub subscriber_email: String, pub unsubscribe_token: String, pub kind: String, } async fn dequeue_task( connection_pool: &PgPool, ) -> Result, Task)>, anyhow::Error> { let mut transaction = connection_pool.begin().await?; let query = sqlx::query!( r#" SELECT newsletter_issue_id, subscriber_email, unsubscribe_token, kind FROM issue_delivery_queue FOR UPDATE SKIP LOCKED LIMIT 1 "# ); let r = transaction.fetch_optional(query).await?; if let Some(row) = r { let task = Task { newsletter_issue_id: row.get("newsletter_issue_id"), subscriber_email: row.get("subscriber_email"), unsubscribe_token: row.get("unsubscribe_token"), kind: row.get("kind"), }; Ok(Some((transaction, task))) } else { Ok(None) } } async fn delete_task( mut transaction: Transaction<'static, Postgres>, issue_id: Uuid, email: &str, ) -> Result<(), anyhow::Error> { let query = sqlx::query!( r#" DELETE FROM issue_delivery_queue WHERE newsletter_issue_id = $1 AND subscriber_email = $2 "#, issue_id, email ); transaction.execute(query).await?; transaction.commit().await?; Ok(()) }