use crate::idempotency::IdempotencyKey; use axum::{ body::{self, Body}, http::{HeaderName, HeaderValue}, response::{IntoResponse, Response}, }; use reqwest::StatusCode; use sqlx::{Executor, PgPool, Postgres, Transaction}; use std::str::FromStr; use uuid::Uuid; #[derive(Debug, sqlx::Type)] #[sqlx(type_name = "header_pair")] struct HeaderPairRecord { name: String, value: Vec, } pub async fn get_saved_response( connection_pool: &PgPool, idempotency_key: &IdempotencyKey, user_id: Uuid, ) -> Result, anyhow::Error> { let saved_response = sqlx::query!( r#" SELECT response_status_code as "response_status_code!", response_headers as "response_headers!: Vec", response_body as "response_body!" FROM idempotency WHERE user_id = $1 AND idempotency_key = $2 "#, user_id, idempotency_key.as_ref() ) .fetch_optional(connection_pool) .await?; if let Some(r) = saved_response { let status_code = StatusCode::from_u16(r.response_status_code.try_into()?)?; let mut response = status_code.into_response(); for HeaderPairRecord { name, value } in r.response_headers { response.headers_mut().insert( HeaderName::from_str(&name).unwrap(), HeaderValue::from_bytes(&value).unwrap(), ); } *response.body_mut() = r.response_body.into(); Ok(Some(response)) } else { Ok(None) } } pub async fn save_response( mut transaction: Transaction<'static, Postgres>, idempotency_key: &IdempotencyKey, user_id: Uuid, response: Response, ) -> Result, anyhow::Error> { let status_code = response.status().as_u16() as i16; let headers = response .headers() .into_iter() .map(|(name, value)| HeaderPairRecord { name: name.to_string(), value: value.as_bytes().to_vec(), }) .collect::>(); let (response_head, body) = response.into_parts(); let body = body::to_bytes(body, usize::MAX).await?.to_vec(); let query = sqlx::query_unchecked!( r#" UPDATE idempotency SET response_status_code = $3, response_headers = $4, response_body = $5 WHERE user_id = $1 AND idempotency_key = $2 "#, user_id, idempotency_key.as_ref(), status_code, headers, &body, ); transaction.execute(query).await?; transaction.commit().await?; let mut r = response_head.into_response(); *r.body_mut() = body.into(); Ok(r) } pub enum NextAction { StartProcessing(Transaction<'static, Postgres>), ReturnSavedResponse(Response), } pub async fn try_processing( connection_pool: &PgPool, idempotency_key: &IdempotencyKey, user_id: Uuid, ) -> Result { let mut transaction = connection_pool.begin().await?; let query = sqlx::query!( r#" INSERT INTO idempotency (user_id, idempotency_key, created_at) VALUES ($1, $2, now()) ON CONFLICT DO NOTHING "#, user_id, idempotency_key.as_ref() ); let n_inserted_rows = transaction.execute(query).await?.rows_affected(); if n_inserted_rows > 0 { Ok(NextAction::StartProcessing(transaction)) } else { let saved_response = get_saved_response(connection_pool, idempotency_key, user_id) .await? .ok_or_else(|| anyhow::anyhow!("Could not find saved response."))?; Ok(NextAction::ReturnSavedResponse(saved_response)) } }