Unsubscribe link in emails sent
This commit is contained in:
@@ -44,13 +44,14 @@ pub async fn try_execute_task(
|
||||
if task.is_none() {
|
||||
return Ok(ExecutionOutcome::EmptyQueue);
|
||||
}
|
||||
let (transaction, issue_id, email) = task.unwrap();
|
||||
let (transaction, task) = task.unwrap();
|
||||
Span::current()
|
||||
.record("newsletter_issue_id", display(issue_id))
|
||||
.record("subscriber_email", display(&email));
|
||||
match SubscriberEmail::parse(email.clone()) {
|
||||
.record("newsletter_issue_id", display(task.newsletter_issue_id))
|
||||
.record("subscriber_email", display(&task.subscriber_email));
|
||||
match SubscriberEmail::parse(task.subscriber_email.clone()) {
|
||||
Ok(email) => {
|
||||
let issue = get_issue(connection_pool, issue_id).await?;
|
||||
let mut issue = get_issue(connection_pool, task.newsletter_issue_id).await?;
|
||||
issue.inject_unsubscribe_token(&task.unsubscribe_token);
|
||||
if let Err(e) = email_client
|
||||
.send_email(
|
||||
&email,
|
||||
@@ -73,7 +74,12 @@ pub async fn try_execute_task(
|
||||
);
|
||||
}
|
||||
}
|
||||
delete_task(transaction, issue_id, &email).await?;
|
||||
delete_task(
|
||||
transaction,
|
||||
task.newsletter_issue_id,
|
||||
&task.subscriber_email,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(ExecutionOutcome::TaskCompleted)
|
||||
}
|
||||
@@ -84,6 +90,13 @@ struct NewsletterIssue {
|
||||
html_content: String,
|
||||
}
|
||||
|
||||
impl NewsletterIssue {
|
||||
fn inject_unsubscribe_token(&mut self, token: &str) {
|
||||
self.text_content = self.text_content.replace("UNSUBSCRIBE_TOKEN", token);
|
||||
self.html_content = self.html_content.replace("UNSUBSCRIBE_TOKEN", token);
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn get_issue(
|
||||
connection_pool: &PgPool,
|
||||
@@ -103,14 +116,20 @@ async fn get_issue(
|
||||
Ok(issue)
|
||||
}
|
||||
|
||||
pub struct Task {
|
||||
pub newsletter_issue_id: Uuid,
|
||||
pub subscriber_email: String,
|
||||
pub unsubscribe_token: String,
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
async fn dequeue_task(
|
||||
connection_pool: &PgPool,
|
||||
) -> Result<Option<(Transaction<'static, Postgres>, Uuid, String)>, anyhow::Error> {
|
||||
) -> Result<Option<(Transaction<'static, Postgres>, Task)>, anyhow::Error> {
|
||||
let mut transaction = connection_pool.begin().await?;
|
||||
let query = sqlx::query!(
|
||||
r#"
|
||||
SELECT newsletter_issue_id, subscriber_email
|
||||
SELECT newsletter_issue_id, subscriber_email, unsubscribe_token
|
||||
FROM issue_delivery_queue
|
||||
FOR UPDATE
|
||||
SKIP LOCKED
|
||||
@@ -119,11 +138,12 @@ async fn dequeue_task(
|
||||
);
|
||||
let r = transaction.fetch_optional(query).await?;
|
||||
if let Some(row) = r {
|
||||
Ok(Some((
|
||||
transaction,
|
||||
row.get("newsletter_issue_id"),
|
||||
row.get("subscriber_email"),
|
||||
)))
|
||||
let task = Task {
|
||||
newsletter_issue_id: row.get("newsletter_issue_id"),
|
||||
subscriber_email: row.get("subscriber_email"),
|
||||
unsubscribe_token: row.get("unsubscribe_token"),
|
||||
};
|
||||
Ok(Some((transaction, task)))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user