210 lines
6.0 KiB
Rust
210 lines
6.0 KiB
Rust
use crate::{
|
|
configuration::Settings, domain::SubscriberEmail, email_client::EmailClient, routes::EmailType,
|
|
};
|
|
use anyhow::Context;
|
|
use sqlx::{Executor, PgPool, Postgres, Row, Transaction, postgres::PgPoolOptions};
|
|
use std::time::Duration;
|
|
use tracing::{Span, field::display};
|
|
use uuid::Uuid;
|
|
|
|
pub async fn run_worker_until_stopped(configuration: Settings) -> Result<(), anyhow::Error> {
|
|
let connection_pool = PgPoolOptions::new().connect_lazy_with(configuration.database.with_db());
|
|
let email_client = EmailClient::build(configuration.email_client).unwrap();
|
|
worker_loop(connection_pool, email_client).await
|
|
}
|
|
|
|
async fn worker_loop(
|
|
connection_pool: PgPool,
|
|
email_client: EmailClient,
|
|
) -> Result<(), anyhow::Error> {
|
|
loop {
|
|
match try_execute_task(&connection_pool, &email_client).await {
|
|
Ok(ExecutionOutcome::EmptyQueue) => tokio::time::sleep(Duration::from_secs(10)).await,
|
|
Ok(ExecutionOutcome::TaskCompleted) => (),
|
|
Err(_) => tokio::time::sleep(Duration::from_secs(1)).await,
|
|
}
|
|
}
|
|
}
|
|
|
|
pub enum ExecutionOutcome {
|
|
TaskCompleted,
|
|
EmptyQueue,
|
|
}
|
|
|
|
pub async fn try_execute_task(
|
|
connection_pool: &PgPool,
|
|
email_client: &EmailClient,
|
|
) -> Result<ExecutionOutcome, anyhow::Error> {
|
|
let task = dequeue_task(connection_pool).await?;
|
|
let (mut transaction, task) = match task {
|
|
Some((transaction, task)) => (transaction, task),
|
|
None => return Ok(ExecutionOutcome::EmptyQueue),
|
|
};
|
|
Span::current()
|
|
.record("newsletter_issue_id", display(task.newsletter_issue_id))
|
|
.record("subscriber_email", display(&task.subscriber_email));
|
|
match SubscriberEmail::parse(task.subscriber_email.clone()) {
|
|
Ok(email) => {
|
|
execute_task(
|
|
connection_pool,
|
|
&mut transaction,
|
|
&task,
|
|
email,
|
|
email_client,
|
|
)
|
|
.await?;
|
|
}
|
|
Err(e) => {
|
|
tracing::error!(
|
|
error = %e,
|
|
"Skipping a subscriber. Their stored contact details are invalid."
|
|
);
|
|
}
|
|
}
|
|
delete_task(
|
|
transaction,
|
|
task.newsletter_issue_id,
|
|
&task.subscriber_email,
|
|
)
|
|
.await?;
|
|
|
|
Ok(ExecutionOutcome::TaskCompleted)
|
|
}
|
|
|
|
struct NewsletterIssue {
|
|
newsletter_issue_id: Uuid,
|
|
title: String,
|
|
text_content: String,
|
|
html_content: String,
|
|
}
|
|
|
|
impl NewsletterIssue {
|
|
fn inject_unsubscribe_token(&mut self, token: &str) {
|
|
self.text_content = self.text_content.replace("UNSUBSCRIBE_TOKEN", token);
|
|
self.html_content = self.html_content.replace("UNSUBSCRIBE_TOKEN", token);
|
|
}
|
|
|
|
async fn inject_tracking_info(
|
|
&mut self,
|
|
transaction: &mut Transaction<'static, Postgres>,
|
|
) -> Result<(), anyhow::Error> {
|
|
let email_id = Uuid::new_v4();
|
|
let query = sqlx::query!(
|
|
r#"
|
|
INSERT INTO notifications_delivered (email_id, newsletter_issue_id)
|
|
VALUES ($1, $2)
|
|
"#,
|
|
email_id,
|
|
self.newsletter_issue_id
|
|
);
|
|
transaction
|
|
.execute(query)
|
|
.await
|
|
.context("Failed to store email tracking info.")?;
|
|
self.text_content = self.text_content.replace("EMAIL_ID", &email_id.to_string());
|
|
self.html_content = self.html_content.replace("EMAIL_ID", &email_id.to_string());
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
async fn get_issue(
|
|
connection_pool: &PgPool,
|
|
issue_id: Uuid,
|
|
) -> Result<NewsletterIssue, anyhow::Error> {
|
|
let issue = sqlx::query_as!(
|
|
NewsletterIssue,
|
|
r#"
|
|
SELECT newsletter_issue_id, title, text_content, html_content
|
|
FROM newsletter_issues
|
|
WHERE newsletter_issue_id = $1
|
|
"#,
|
|
issue_id
|
|
)
|
|
.fetch_one(connection_pool)
|
|
.await?;
|
|
Ok(issue)
|
|
}
|
|
|
|
pub struct Task {
|
|
pub newsletter_issue_id: Uuid,
|
|
pub subscriber_email: String,
|
|
pub unsubscribe_token: String,
|
|
pub kind: String,
|
|
}
|
|
|
|
async fn dequeue_task(
|
|
connection_pool: &PgPool,
|
|
) -> Result<Option<(Transaction<'static, Postgres>, Task)>, anyhow::Error> {
|
|
let mut transaction = connection_pool.begin().await?;
|
|
let query = sqlx::query!(
|
|
r#"
|
|
SELECT newsletter_issue_id, subscriber_email, unsubscribe_token, kind
|
|
FROM issue_delivery_queue
|
|
FOR UPDATE
|
|
SKIP LOCKED
|
|
LIMIT 1
|
|
"#
|
|
);
|
|
let r = transaction.fetch_optional(query).await?;
|
|
if let Some(row) = r {
|
|
let task = Task {
|
|
newsletter_issue_id: row.get("newsletter_issue_id"),
|
|
subscriber_email: row.get("subscriber_email"),
|
|
unsubscribe_token: row.get("unsubscribe_token"),
|
|
kind: row.get("kind"),
|
|
};
|
|
Ok(Some((transaction, task)))
|
|
} else {
|
|
Ok(None)
|
|
}
|
|
}
|
|
|
|
#[tracing::instrument(
|
|
name = "Executing task",
|
|
skip_all,
|
|
fields(email = %email),
|
|
)]
|
|
async fn execute_task(
|
|
connection_pool: &PgPool,
|
|
transaction: &mut Transaction<'static, Postgres>,
|
|
task: &Task,
|
|
email: SubscriberEmail,
|
|
email_client: &EmailClient,
|
|
) -> Result<(), anyhow::Error> {
|
|
let mut issue = get_issue(connection_pool, task.newsletter_issue_id).await?;
|
|
issue.inject_unsubscribe_token(&task.unsubscribe_token);
|
|
if task.kind == EmailType::NewPost.to_string() {
|
|
issue.inject_tracking_info(transaction).await?;
|
|
}
|
|
email_client
|
|
.send_email(
|
|
&email,
|
|
&issue.title,
|
|
&issue.html_content,
|
|
&issue.text_content,
|
|
)
|
|
.await
|
|
.context("Failed to deliver newsletter issue to subscriber..")?;
|
|
Ok(())
|
|
}
|
|
|
|
async fn delete_task(
|
|
mut transaction: Transaction<'static, Postgres>,
|
|
issue_id: Uuid,
|
|
email: &str,
|
|
) -> Result<(), anyhow::Error> {
|
|
let query = sqlx::query!(
|
|
r#"
|
|
DELETE FROM issue_delivery_queue
|
|
WHERE
|
|
newsletter_issue_id = $1
|
|
AND subscriber_email = $2
|
|
"#,
|
|
issue_id,
|
|
email
|
|
);
|
|
transaction.execute(query).await?;
|
|
transaction.commit().await?;
|
|
Ok(())
|
|
}
|