Database worker

Worker used to clean up pending subscriptions and old idempotency
records
This commit is contained in:
Alphonse Paix
2025-10-10 15:10:01 +02:00
parent 7affe88d50
commit 5d5f9ec765
5 changed files with 70 additions and 6 deletions

58
src/database_worker.rs Normal file
View File

@@ -0,0 +1,58 @@
use anyhow::Context;
use sqlx::{
PgPool,
postgres::{PgConnectOptions, PgPoolOptions},
};
use std::time::Duration;
pub async fn run_until_stopped(configuration: PgConnectOptions) -> Result<(), anyhow::Error> {
let connection_pool = PgPoolOptions::new().connect_lazy_with(configuration);
worker_loop(connection_pool).await
}
async fn worker_loop(connection_pool: PgPool) -> Result<(), anyhow::Error> {
loop {
if let Err(e) = clean_pending_subscriptions(&connection_pool).await {
tracing::error!("{:?}", e);
}
if let Err(e) = clean_idempotency_keys(&connection_pool).await {
tracing::error!("{:?}", e);
}
tokio::time::sleep(Duration::from_secs(60)).await;
}
}
async fn clean_pending_subscriptions(connection_pool: &PgPool) -> Result<(), anyhow::Error> {
let result = sqlx::query!(
"
DELETE FROM subscriptions
WHERE status = 'pending_confirmation'
AND subscribed_at < NOW() - INTERVAL '24 hours'
"
)
.execute(connection_pool)
.await
.context("Failed to clean up subscriptions table.")?;
match result.rows_affected() {
n if n > 0 => tracing::info!("Cleaned up {} expired subscriptions.", n),
_ => (),
}
Ok(())
}
async fn clean_idempotency_keys(connection_pool: &PgPool) -> Result<(), anyhow::Error> {
let result = sqlx::query!(
"
DELETE FROM idempotency
WHERE created_at < NOW() - INTERVAL '1 hour'
"
)
.execute(connection_pool)
.await
.context("Failed to clean up idempontency table.")?;
match result.rows_affected() {
n if n > 0 => tracing::info!("Cleaned up {} old idempotency records.", n),
_ => (),
}
Ok(())
}

View File

@@ -7,7 +7,7 @@ use std::time::Duration;
use tracing::{Span, field::display};
use uuid::Uuid;
pub async fn run_worker_until_stopped(configuration: Settings) -> Result<(), anyhow::Error> {
pub async fn run_until_stopped(configuration: Settings) -> Result<(), anyhow::Error> {
let connection_pool = PgPoolOptions::new().connect_lazy_with(configuration.database.with_db());
let email_client = EmailClient::build(configuration.email_client).unwrap();
worker_loop(connection_pool, email_client).await

View File

@@ -1,5 +1,6 @@
pub mod authentication;
pub mod configuration;
pub mod database_worker;
pub mod domain;
pub mod email_client;
pub mod idempotency;

View File

@@ -1,6 +1,6 @@
use zero2prod::{
configuration::get_configuration, issue_delivery_worker::run_worker_until_stopped,
startup::Application, telemetry::init_subscriber,
configuration::get_configuration, database_worker, issue_delivery_worker, startup::Application,
telemetry::init_subscriber,
};
#[tokio::main]
@@ -11,11 +11,16 @@ async fn main() -> Result<(), anyhow::Error> {
let application = Application::build(configuration.clone()).await?;
let application_task = tokio::spawn(application.run_until_stopped());
let worker_task = tokio::spawn(run_worker_until_stopped(configuration));
let database_worker_task = tokio::spawn(database_worker::run_until_stopped(
configuration.database.with_db(),
));
let delivery_worker_task =
tokio::spawn(issue_delivery_worker::run_until_stopped(configuration));
tokio::select! {
_ = application_task => {},
_ = worker_task => {},
_ = database_worker_task => {},
_ = delivery_worker_task => {},
};
Ok(())

View File

@@ -21,7 +21,7 @@ where
)
.with(
tracing_subscriber::fmt::layer()
.compact()
.pretty()
.with_writer(sink)
.with_span_events(FmtSpan::NEW | FmtSpan::CLOSE),
)