Handler to send emails to confirmed subscribers
This commit is contained in:
@@ -1,7 +1,9 @@
|
|||||||
mod health_check;
|
mod health_check;
|
||||||
|
mod newsletters;
|
||||||
mod subscriptions;
|
mod subscriptions;
|
||||||
mod subscriptions_confirm;
|
mod subscriptions_confirm;
|
||||||
|
|
||||||
pub use health_check::*;
|
pub use health_check::*;
|
||||||
|
pub use newsletters::*;
|
||||||
pub use subscriptions::*;
|
pub use subscriptions::*;
|
||||||
pub use subscriptions_confirm::*;
|
pub use subscriptions_confirm::*;
|
||||||
|
|||||||
110
src/routes/newsletters.rs
Normal file
110
src/routes/newsletters.rs
Normal file
@@ -0,0 +1,110 @@
|
|||||||
|
use crate::{domain::SubscriberEmail, routes::error_chain_fmt, startup::AppState};
|
||||||
|
use anyhow::Context;
|
||||||
|
use axum::{
|
||||||
|
Json,
|
||||||
|
extract::State,
|
||||||
|
response::{IntoResponse, Response},
|
||||||
|
};
|
||||||
|
use reqwest::StatusCode;
|
||||||
|
use sqlx::PgPool;
|
||||||
|
|
||||||
|
#[derive(thiserror::Error)]
|
||||||
|
pub enum PublishError {
|
||||||
|
#[error(transparent)]
|
||||||
|
UnexpectedError(#[from] anyhow::Error),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Debug for PublishError {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
error_chain_fmt(self, f)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IntoResponse for PublishError {
|
||||||
|
fn into_response(self) -> Response {
|
||||||
|
#[derive(serde::Serialize)]
|
||||||
|
struct ErrorResponse<'a> {
|
||||||
|
message: &'a str,
|
||||||
|
}
|
||||||
|
|
||||||
|
tracing::error!("{:?}", self);
|
||||||
|
|
||||||
|
let status = match self {
|
||||||
|
PublishError::UnexpectedError(_) => StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
};
|
||||||
|
let message = "An internal server error occured.";
|
||||||
|
(status, Json(ErrorResponse { message })).into_response()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(serde::Deserialize)]
|
||||||
|
pub struct BodyData {
|
||||||
|
title: String,
|
||||||
|
content: Content,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(serde::Deserialize)]
|
||||||
|
pub struct Content {
|
||||||
|
html: String,
|
||||||
|
text: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(
|
||||||
|
name = "Publishing a newsletter",
|
||||||
|
skip(connection_pool, email_client, body)
|
||||||
|
)]
|
||||||
|
pub async fn publish_newsletter(
|
||||||
|
State(AppState {
|
||||||
|
connection_pool,
|
||||||
|
email_client,
|
||||||
|
..
|
||||||
|
}): State<AppState>,
|
||||||
|
body: Json<BodyData>,
|
||||||
|
) -> Result<Response, PublishError> {
|
||||||
|
let subscribers = get_confirmed_subscribers(&connection_pool).await?;
|
||||||
|
for subscriber in subscribers {
|
||||||
|
match subscriber {
|
||||||
|
Ok(ConfirmedSubscriber { email, .. }) => {
|
||||||
|
email_client
|
||||||
|
.send_email(&email, &body.title, &body.content.html, &body.content.text)
|
||||||
|
.await
|
||||||
|
.with_context(|| {
|
||||||
|
format!("Failed to send newsletter issue to {}", email.as_ref())
|
||||||
|
})?;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(
|
||||||
|
"Skipping a confirmed subscriber. Their stored contact details are invalid: {}",
|
||||||
|
e
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(StatusCode::OK.into_response())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
|
struct ConfirmedSubscriber {
|
||||||
|
name: String,
|
||||||
|
email: SubscriberEmail,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(name = "Get confirmed subscribers", skip(connection_pool))]
|
||||||
|
async fn get_confirmed_subscribers(
|
||||||
|
connection_pool: &PgPool,
|
||||||
|
) -> Result<Vec<Result<ConfirmedSubscriber, anyhow::Error>>, anyhow::Error> {
|
||||||
|
let rows = sqlx::query!("SELECT name, email FROM subscriptions WHERE status = 'confirmed'")
|
||||||
|
.fetch_all(connection_pool)
|
||||||
|
.await?;
|
||||||
|
let confirmed_subscribers = rows
|
||||||
|
.into_iter()
|
||||||
|
.map(|r| match SubscriberEmail::parse(r.email) {
|
||||||
|
Ok(email) => Ok(ConfirmedSubscriber {
|
||||||
|
name: r.name,
|
||||||
|
email,
|
||||||
|
}),
|
||||||
|
Err(e) => Err(anyhow::anyhow!(e)),
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
Ok(confirmed_subscribers)
|
||||||
|
}
|
||||||
@@ -24,7 +24,10 @@ fn generate_subscription_token() -> String {
|
|||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn error_chain_fmt(e: &impl std::error::Error, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
pub fn error_chain_fmt(
|
||||||
|
e: &impl std::error::Error,
|
||||||
|
f: &mut std::fmt::Formatter,
|
||||||
|
) -> std::fmt::Result {
|
||||||
writeln!(f, "{}", e)?;
|
writeln!(f, "{}", e)?;
|
||||||
let mut current = e.source();
|
let mut current = e.source();
|
||||||
while let Some(cause) = current {
|
while let Some(cause) = current {
|
||||||
@@ -37,24 +40,12 @@ fn error_chain_fmt(e: &impl std::error::Error, f: &mut std::fmt::Formatter) -> s
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
// #[derive(thiserror::Error)]
|
|
||||||
// pub enum SubscribeError {
|
|
||||||
// #[error("Failed to store the confirmation token for a new subscriber.")]
|
|
||||||
// StoreToken(#[from] StoreTokenError),
|
|
||||||
// #[error("A database error occured.")]
|
|
||||||
// Database(#[from] sqlx::Error),
|
|
||||||
// #[error("Failed to send a confirmation email.")]
|
|
||||||
// SendEmail(#[from] reqwest::Error),
|
|
||||||
// #[error("{0}")]
|
|
||||||
// Validation(String),
|
|
||||||
// }
|
|
||||||
|
|
||||||
#[derive(thiserror::Error)]
|
#[derive(thiserror::Error)]
|
||||||
pub enum SubscribeError {
|
pub enum SubscribeError {
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
UnexpectedError(#[from] anyhow::Error),
|
UnexpectedError(#[from] anyhow::Error),
|
||||||
#[error("{0}")]
|
#[error("{0}")]
|
||||||
Validation(String),
|
ValidationError(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Debug for SubscribeError {
|
impl std::fmt::Debug for SubscribeError {
|
||||||
@@ -72,15 +63,9 @@ impl IntoResponse for SubscribeError {
|
|||||||
|
|
||||||
tracing::error!("{:?}", self);
|
tracing::error!("{:?}", self);
|
||||||
|
|
||||||
// let status = match self {
|
|
||||||
// SubscribeError::StoreToken(_)
|
|
||||||
// | SubscribeError::Database(_)
|
|
||||||
// | SubscribeError::SendEmail(_) => StatusCode::INTERNAL_SERVER_ERROR,
|
|
||||||
// SubscribeError::Validation(_) => StatusCode::BAD_REQUEST,
|
|
||||||
// };
|
|
||||||
let status = match self {
|
let status = match self {
|
||||||
SubscribeError::UnexpectedError(_) => StatusCode::INTERNAL_SERVER_ERROR,
|
SubscribeError::UnexpectedError(_) => StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
SubscribeError::Validation(_) => StatusCode::BAD_REQUEST,
|
SubscribeError::ValidationError(_) => StatusCode::BAD_REQUEST,
|
||||||
};
|
};
|
||||||
let message = "An internal server error occured.";
|
let message = "An internal server error occured.";
|
||||||
(status, Json(ErrorResponse { message })).into_response()
|
(status, Json(ErrorResponse { message })).into_response()
|
||||||
@@ -107,7 +92,7 @@ pub async fn subscribe(
|
|||||||
.begin()
|
.begin()
|
||||||
.await
|
.await
|
||||||
.context("Failed to acquire a Postgres connection from the pool.")?;
|
.context("Failed to acquire a Postgres connection from the pool.")?;
|
||||||
let new_subscriber = form.try_into().map_err(SubscribeError::Validation)?;
|
let new_subscriber = form.try_into().map_err(SubscribeError::ValidationError)?;
|
||||||
let subscriber_id = insert_subscriber(&mut transaction, &new_subscriber)
|
let subscriber_id = insert_subscriber(&mut transaction, &new_subscriber)
|
||||||
.await
|
.await
|
||||||
.context("Failed to insert new subscriber in the database.")?;
|
.context("Failed to insert new subscriber in the database.")?;
|
||||||
|
|||||||
@@ -61,6 +61,7 @@ pub fn app(connection_pool: PgPool, email_client: EmailClient, base_url: String)
|
|||||||
.route("/health_check", get(health_check))
|
.route("/health_check", get(health_check))
|
||||||
.route("/subscriptions", post(subscribe))
|
.route("/subscriptions", post(subscribe))
|
||||||
.route("/subscriptions/confirm", get(confirm))
|
.route("/subscriptions/confirm", get(confirm))
|
||||||
|
.route("/newsletters", post(publish_newsletter))
|
||||||
.layer(
|
.layer(
|
||||||
TraceLayer::new_for_http()
|
TraceLayer::new_for_http()
|
||||||
.make_span_with(|request: &Request<_>| {
|
.make_span_with(|request: &Request<_>| {
|
||||||
|
|||||||
@@ -94,6 +94,15 @@ impl TestApp {
|
|||||||
.await
|
.await
|
||||||
.expect("Failed to execute request")
|
.expect("Failed to execute request")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn post_newsletters(&self, body: serde_json::Value) -> reqwest::Response {
|
||||||
|
reqwest::Client::new()
|
||||||
|
.post(format!("{}/newsletters", self.address))
|
||||||
|
.json(&body)
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.expect("Failed to execute request")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn configure_database(config: &DatabaseSettings) -> PgPool {
|
async fn configure_database(config: &DatabaseSettings) -> PgPool {
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
mod health_check;
|
mod health_check;
|
||||||
mod helpers;
|
mod helpers;
|
||||||
|
mod newsletters;
|
||||||
mod subscriptions;
|
mod subscriptions;
|
||||||
mod subscriptions_confirm;
|
mod subscriptions_confirm;
|
||||||
|
|||||||
111
tests/api/newsletters.rs
Normal file
111
tests/api/newsletters.rs
Normal file
@@ -0,0 +1,111 @@
|
|||||||
|
use crate::helpers::{ConfirmationLinks, TestApp};
|
||||||
|
use wiremock::{
|
||||||
|
Mock, ResponseTemplate,
|
||||||
|
matchers::{any, method, path},
|
||||||
|
};
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn newsletters_are_not_delivered_to_unconfirmed_subscribers() {
|
||||||
|
let app = TestApp::spawn().await;
|
||||||
|
create_unconfirmed_subscriber(&app).await;
|
||||||
|
|
||||||
|
Mock::given(any())
|
||||||
|
.respond_with(ResponseTemplate::new(200))
|
||||||
|
.expect(0)
|
||||||
|
.mount(&app.email_server)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let newsletter_request_body = serde_json::json!({"title": "Newsletter title", "content": { "text": "Newsletter body as plain text", "html": "<p>Newsletter body as HTML</p>"}});
|
||||||
|
let response = app.post_newsletters(newsletter_request_body).await;
|
||||||
|
|
||||||
|
assert_eq!(response.status().as_u16(), 200);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn newsletters_are_delivered_to_confirmed_subscribers() {
|
||||||
|
let app = TestApp::spawn().await;
|
||||||
|
create_confirmed_subscriber(&app).await;
|
||||||
|
|
||||||
|
Mock::given(any())
|
||||||
|
.respond_with(ResponseTemplate::new(200))
|
||||||
|
.expect(1)
|
||||||
|
.mount(&app.email_server)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let newsletter_request_body = serde_json::json!({
|
||||||
|
"title": "Newsletter title",
|
||||||
|
"content": {
|
||||||
|
"text": "Newsletter body as plain text",
|
||||||
|
"html": "<p>Newsletter body as HTML</p>"
|
||||||
|
}
|
||||||
|
});
|
||||||
|
let response = app.post_newsletters(newsletter_request_body).await;
|
||||||
|
|
||||||
|
assert_eq!(response.status().as_u16(), 200);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn newsletters_returns_422_for_invalid_data() {
|
||||||
|
let app = TestApp::spawn().await;
|
||||||
|
|
||||||
|
let test_cases = [
|
||||||
|
(
|
||||||
|
serde_json::json!({
|
||||||
|
"content": {
|
||||||
|
"text": "Newsletter body as plain text",
|
||||||
|
"html": "<p>Newsletter body as HTML</p>"
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
"missing the title",
|
||||||
|
),
|
||||||
|
(
|
||||||
|
serde_json::json!({ "title": "Newsletter" }),
|
||||||
|
"missing the title",
|
||||||
|
),
|
||||||
|
];
|
||||||
|
|
||||||
|
for (invalid_body, error_message) in test_cases {
|
||||||
|
let response = app.post_newsletters(invalid_body).await;
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
response.status().as_u16(),
|
||||||
|
422,
|
||||||
|
"The API did not fail with 422 Unprocessable Entity when the payload was {}.",
|
||||||
|
error_message
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn create_unconfirmed_subscriber(app: &TestApp) -> ConfirmationLinks {
|
||||||
|
let body = "name=Alphonse&email=alphonse.paix%40outlook.com";
|
||||||
|
|
||||||
|
let _mock_guard = Mock::given(path("/v1/email"))
|
||||||
|
.and(method("POST"))
|
||||||
|
.respond_with(ResponseTemplate::new(200))
|
||||||
|
.named("Create unconfirmed subscriber")
|
||||||
|
.expect(1)
|
||||||
|
.mount_as_scoped(&app.email_server)
|
||||||
|
.await;
|
||||||
|
app.post_subscriptions(body.into())
|
||||||
|
.await
|
||||||
|
.error_for_status()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let email_request = &app
|
||||||
|
.email_server
|
||||||
|
.received_requests()
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.pop()
|
||||||
|
.unwrap();
|
||||||
|
app.get_confirmation_links(email_request)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn create_confirmed_subscriber(app: &TestApp) {
|
||||||
|
let confirmation_links = create_unconfirmed_subscriber(app).await;
|
||||||
|
reqwest::get(confirmation_links.html)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.error_for_status()
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
@@ -81,7 +81,7 @@ async fn subscribe_returns_a_400_when_fields_are_present_but_invalid() {
|
|||||||
assert_eq!(
|
assert_eq!(
|
||||||
400,
|
400,
|
||||||
response.status().as_u16(),
|
response.status().as_u16(),
|
||||||
"the API did not return a 400 Bad Request when the payload had an {}.",
|
"the API did not fail with 400 Bad Request when the payload had an {}.",
|
||||||
description
|
description
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user