Compute dashboard stats
Some checks failed
Rust / Test (push) Has been cancelled
Rust / Rustfmt (push) Has been cancelled
Rust / Clippy (push) Has been cancelled
Rust / Code coverage (push) Has been cancelled

Track open rate for new post notifications (user clicked the button in
the link or not). No data about the user is collected during the
process, it only uses an ID inserted by the issue delivery worker.
This commit is contained in:
Alphonse Paix
2025-09-24 04:30:27 +02:00
parent 33281132c6
commit 4cb1d2b6fd
19 changed files with 303 additions and 60 deletions

View File

@@ -1,4 +1,7 @@
use crate::{configuration::Settings, domain::SubscriberEmail, email_client::EmailClient};
use crate::{
configuration::Settings, domain::SubscriberEmail, email_client::EmailClient, routes::EmailType,
};
use anyhow::Context;
use sqlx::{Executor, PgPool, Postgres, Row, Transaction, postgres::PgPoolOptions};
use std::time::Duration;
use tracing::{Span, field::display};
@@ -41,10 +44,10 @@ pub async fn try_execute_task(
email_client: &EmailClient,
) -> Result<ExecutionOutcome, anyhow::Error> {
let task = dequeue_task(connection_pool).await?;
if task.is_none() {
return Ok(ExecutionOutcome::EmptyQueue);
}
let (transaction, task) = task.unwrap();
let (mut transaction, task) = match task {
Some((transaction, task)) => (transaction, task),
None => return Ok(ExecutionOutcome::EmptyQueue),
};
Span::current()
.record("newsletter_issue_id", display(task.newsletter_issue_id))
.record("subscriber_email", display(&task.subscriber_email));
@@ -52,6 +55,9 @@ pub async fn try_execute_task(
Ok(email) => {
let mut issue = get_issue(connection_pool, task.newsletter_issue_id).await?;
issue.inject_unsubscribe_token(&task.unsubscribe_token);
if task.kind == EmailType::NewPost.to_string() {
issue.create_tracking_info(&mut transaction).await?;
}
if let Err(e) = email_client
.send_email(
&email,
@@ -85,6 +91,7 @@ pub async fn try_execute_task(
}
struct NewsletterIssue {
newsletter_issue_id: Uuid,
title: String,
text_content: String,
html_content: String,
@@ -95,6 +102,28 @@ impl NewsletterIssue {
self.text_content = self.text_content.replace("UNSUBSCRIBE_TOKEN", token);
self.html_content = self.html_content.replace("UNSUBSCRIBE_TOKEN", token);
}
async fn create_tracking_info(
&mut self,
transaction: &mut Transaction<'static, Postgres>,
) -> Result<(), anyhow::Error> {
let email_id = Uuid::new_v4();
let query = sqlx::query!(
r#"
INSERT INTO notifications_delivered (email_id, newsletter_issue_id)
VALUES ($1, $2)
"#,
email_id,
self.newsletter_issue_id
);
transaction
.execute(query)
.await
.context("Failed to store email tracking info.")?;
self.text_content = self.text_content.replace("EMAIL_ID", &email_id.to_string());
self.html_content = self.html_content.replace("EMAIL_ID", &email_id.to_string());
Ok(())
}
}
#[tracing::instrument(skip_all)]
@@ -105,7 +134,7 @@ async fn get_issue(
let issue = sqlx::query_as!(
NewsletterIssue,
r#"
SELECT title, text_content, html_content
SELECT newsletter_issue_id, title, text_content, html_content
FROM newsletter_issues
WHERE newsletter_issue_id = $1
"#,
@@ -120,6 +149,7 @@ pub struct Task {
pub newsletter_issue_id: Uuid,
pub subscriber_email: String,
pub unsubscribe_token: String,
pub kind: String,
}
#[tracing::instrument(skip_all)]
@@ -129,7 +159,7 @@ async fn dequeue_task(
let mut transaction = connection_pool.begin().await?;
let query = sqlx::query!(
r#"
SELECT newsletter_issue_id, subscriber_email, unsubscribe_token
SELECT newsletter_issue_id, subscriber_email, unsubscribe_token, kind
FROM issue_delivery_queue
FOR UPDATE
SKIP LOCKED
@@ -142,6 +172,7 @@ async fn dequeue_task(
newsletter_issue_id: row.get("newsletter_issue_id"),
subscriber_email: row.get("subscriber_email"),
unsubscribe_token: row.get("unsubscribe_token"),
kind: row.get("kind"),
};
Ok(Some((transaction, task)))
} else {