use anyhow::Context; use sqlx::{ PgPool, postgres::{PgConnectOptions, PgPoolOptions}, }; use std::time::Duration; pub async fn run_until_stopped(configuration: PgConnectOptions) -> Result<(), anyhow::Error> { let connection_pool = PgPoolOptions::new().connect_lazy_with(configuration); worker_loop(connection_pool).await } async fn worker_loop(connection_pool: PgPool) -> Result<(), anyhow::Error> { loop { if let Err(e) = clean_pending_subscriptions(&connection_pool).await { tracing::error!("{:?}", e); } if let Err(e) = clean_idempotency_keys(&connection_pool).await { tracing::error!("{:?}", e); } tokio::time::sleep(Duration::from_secs(60)).await; } } async fn clean_pending_subscriptions(connection_pool: &PgPool) -> Result<(), anyhow::Error> { let result = sqlx::query!( " DELETE FROM subscriptions WHERE status = 'pending_confirmation' AND subscribed_at < NOW() - INTERVAL '24 hours' " ) .execute(connection_pool) .await .context("Failed to clean up subscriptions table.")?; match result.rows_affected() { n if n > 0 => tracing::info!("Cleaned up {} expired subscriptions.", n), _ => (), } Ok(()) } async fn clean_idempotency_keys(connection_pool: &PgPool) -> Result<(), anyhow::Error> { let result = sqlx::query!( " DELETE FROM idempotency WHERE created_at < NOW() - INTERVAL '1 hour' " ) .execute(connection_pool) .await .context("Failed to clean up idempontency table.")?; match result.rows_affected() { n if n > 0 => tracing::info!("Cleaned up {} old idempotency records.", n), _ => (), } Ok(()) }