Comment posting is idempotent + tests

This commit is contained in:
Alphonse Paix
2025-10-05 15:01:57 +02:00
parent 8f62c2513e
commit d96a29ee73
15 changed files with 289 additions and 95 deletions

View File

@@ -7,7 +7,6 @@ use axum::{
use reqwest::StatusCode;
use sqlx::{Executor, PgPool, Postgres, Transaction};
use std::str::FromStr;
use uuid::Uuid;
#[derive(Debug, sqlx::Type)]
#[sqlx(type_name = "header_pair")]
@@ -23,7 +22,6 @@ struct HeaderPairRecord {
pub async fn get_saved_response(
connection_pool: &PgPool,
idempotency_key: &IdempotencyKey,
user_id: Uuid,
) -> Result<Option<Response>, anyhow::Error> {
let saved_response = sqlx::query!(
r#"
@@ -32,11 +30,8 @@ pub async fn get_saved_response(
response_headers as "response_headers!: Vec<HeaderPairRecord>",
response_body as "response_body!"
FROM idempotency
WHERE
user_id = $1
AND idempotency_key = $2
WHERE idempotency_key = $1
"#,
user_id,
idempotency_key.as_ref()
)
.fetch_optional(connection_pool)
@@ -61,7 +56,6 @@ pub async fn get_saved_response(
pub async fn save_response(
mut transaction: Transaction<'static, Postgres>,
idempotency_key: &IdempotencyKey,
user_id: Uuid,
response: Response<Body>,
) -> Result<Response<Body>, anyhow::Error> {
let status_code = response.status().as_u16() as i16;
@@ -80,14 +74,11 @@ pub async fn save_response(
r#"
UPDATE idempotency
SET
response_status_code = $3,
response_headers = $4,
response_body = $5
WHERE
user_id = $1
AND idempotency_key = $2
response_status_code = $2,
response_headers = $3,
response_body = $4
WHERE idempotency_key = $1
"#,
user_id,
idempotency_key.as_ref(),
status_code,
headers,
@@ -109,23 +100,21 @@ pub enum NextAction {
pub async fn try_processing(
connection_pool: &PgPool,
idempotency_key: &IdempotencyKey,
user_id: Uuid,
) -> Result<NextAction, anyhow::Error> {
let mut transaction = connection_pool.begin().await?;
let query = sqlx::query!(
r#"
INSERT INTO idempotency (user_id, idempotency_key, created_at)
VALUES ($1, $2, now())
INSERT INTO idempotency (idempotency_key, created_at)
VALUES ($1, now())
ON CONFLICT DO NOTHING
"#,
user_id,
idempotency_key.as_ref()
);
let n_inserted_rows = transaction.execute(query).await?.rows_affected();
if n_inserted_rows > 0 {
Ok(NextAction::StartProcessing(transaction))
} else {
let saved_response = get_saved_response(connection_pool, idempotency_key, user_id)
let saved_response = get_saved_response(connection_pool, idempotency_key)
.await?
.ok_or_else(|| anyhow::anyhow!("Could not find saved response."))?;
Ok(NextAction::ReturnSavedResponse(saved_response))

View File

@@ -1,5 +1,4 @@
use crate::{
authentication::AuthenticatedUser,
idempotency::{IdempotencyKey, save_response, try_processing},
routes::{AdminError, AppError},
startup::AppState,
@@ -8,7 +7,7 @@ use crate::{
use anyhow::Context;
use askama::Template;
use axum::{
Extension, Form,
Form,
extract::State,
response::{Html, IntoResponse, Response},
};
@@ -95,7 +94,6 @@ pub async fn publish_newsletter(
base_url,
..
}): State<AppState>,
Extension(AuthenticatedUser { user_id, .. }): Extension<AuthenticatedUser>,
Form(form): Form<BodyData>,
) -> Result<Response, AppError> {
validate_form(&form).map_err(|e| AdminError::Publish(anyhow::anyhow!(e)))?;
@@ -105,7 +103,7 @@ pub async fn publish_newsletter(
.try_into()
.map_err(AdminError::Idempotency)?;
let mut transaction = match try_processing(&connection_pool, &idempotency_key, user_id).await? {
let mut transaction = match try_processing(&connection_pool, &idempotency_key).await? {
crate::idempotency::NextAction::StartProcessing(t) => t,
crate::idempotency::NextAction::ReturnSavedResponse(response) => {
return Ok(response);
@@ -129,7 +127,7 @@ pub async fn publish_newsletter(
let message = String::from("Your email has been queued for delivery.");
let template = MessageTemplate::success(message);
let response = Html(template.render().unwrap()).into_response();
let response = save_response(transaction, &idempotency_key, user_id, response)
let response = save_response(transaction, &idempotency_key, response)
.await
.map_err(AdminError::UnexpectedError)?;
Ok(response)

View File

@@ -54,7 +54,7 @@ pub async fn create_post(
.try_into()
.map_err(AdminError::Idempotency)?;
let mut transaction = match try_processing(&connection_pool, &idempotency_key, user_id).await? {
let mut transaction = match try_processing(&connection_pool, &idempotency_key).await? {
crate::idempotency::NextAction::StartProcessing(t) => t,
crate::idempotency::NextAction::ReturnSavedResponse(response) => {
return Ok(response);
@@ -75,7 +75,7 @@ pub async fn create_post(
let template = MessageTemplate::success("Your new post has been published!".into());
let response = Html(template.render().unwrap()).into_response();
let response = save_response(transaction, &idempotency_key, user_id, response)
let response = save_response(transaction, &idempotency_key, response)
.await
.map_err(AdminError::UnexpectedError)?;
Ok(response)

View File

@@ -1,4 +1,5 @@
use crate::routes::get_max_page;
use crate::idempotency::{IdempotencyKey, save_response, try_processing};
use crate::routes::{AdminError, get_max_page};
use crate::templates::CommentsPageDashboardTemplate;
use crate::{
domain::CommentEntry,
@@ -13,7 +14,7 @@ use axum::{
extract::{Path, Query, State},
response::{IntoResponse, Response},
};
use sqlx::PgPool;
use sqlx::{Executor, PgPool, Postgres, Transaction};
use uuid::Uuid;
#[derive(serde::Deserialize)]
@@ -25,6 +26,7 @@ pub struct CommentPathParam {
pub struct CommentForm {
pub author: Option<String>,
pub content: String,
pub idempotency_key: String,
}
#[tracing::instrument(name = "Posting new comment", skip_all, fields(post_id = %post_id))]
@@ -36,14 +38,29 @@ pub async fn post_comment(
Form(form): Form<CommentForm>,
) -> Result<Response, AppError> {
validate_form(&form)?;
let comment_id = insert_comment(&connection_pool, post_id, form)
let idempotency_key: IdempotencyKey = form
.idempotency_key
.try_into()
.map_err(AdminError::Idempotency)?;
let mut transaction = match try_processing(&connection_pool, &idempotency_key).await? {
crate::idempotency::NextAction::StartProcessing(t) => t,
crate::idempotency::NextAction::ReturnSavedResponse(response) => {
return Ok(response);
}
};
insert_comment(&mut transaction, post_id, form.author, form.content)
.await
.context("Could not insert comment into database.")?;
tracing::info!("new comment with id {} has been inserted", comment_id);
let template = HtmlTemplate(MessageTemplate::success(
"Your comment has been posted.".into(),
));
Ok(template.into_response())
let response = template.into_response();
let response = save_response(transaction, &idempotency_key, response).await?;
Ok(response)
}
fn validate_form(form: &CommentForm) -> Result<(), anyhow::Error> {
@@ -55,17 +72,19 @@ fn validate_form(form: &CommentForm) -> Result<(), anyhow::Error> {
#[tracing::instrument(name = "Inserting new comment in database", skip_all, fields(comment_id = tracing::field::Empty))]
async fn insert_comment(
connection_pool: &PgPool,
transaction: &mut Transaction<'static, Postgres>,
post_id: Uuid,
form: CommentForm,
author: Option<String>,
content: String,
) -> Result<Uuid, sqlx::Error> {
let author = form
.author
let author = author
.filter(|s| !s.trim().is_empty())
.map(|s| s.trim().to_string());
let content = content.trim();
let comment_id = Uuid::new_v4();
tracing::Span::current().record("comment_id", comment_id.to_string());
sqlx::query!(
let query = sqlx::query!(
"
INSERT INTO comments (comment_id, post_id, author, content)
VALUES ($1, $2, $3, $4)
@@ -73,10 +92,9 @@ async fn insert_comment(
comment_id,
post_id,
author,
form.content.trim()
)
.execute(connection_pool)
.await?;
content,
);
transaction.execute(query).await?;
Ok(comment_id)
}

View File

@@ -151,9 +151,11 @@ pub async fn see_post(
let comments = get_comments_page_for_post(&connection_pool, post_id, 1)
.await
.context("Failed to fetch latest comments")?;
let idempotency_key = Uuid::new_v4().to_string();
let template = HtmlTemplate(PostTemplate {
post,
comments,
idempotency_key,
current_page,
max_page,
comments_count,

View File

@@ -114,6 +114,7 @@ pub struct PostListTemplate {
#[template(path = "posts/page.html")]
pub struct PostTemplate {
pub post: PostEntry,
pub idempotency_key: String,
pub comments: Vec<CommentEntry>,
pub current_page: i64,
pub max_page: i64,