diff --git a/src/database_worker.rs b/src/database_worker.rs new file mode 100644 index 0000000..290bc71 --- /dev/null +++ b/src/database_worker.rs @@ -0,0 +1,58 @@ +use anyhow::Context; +use sqlx::{ + PgPool, + postgres::{PgConnectOptions, PgPoolOptions}, +}; +use std::time::Duration; + +pub async fn run_until_stopped(configuration: PgConnectOptions) -> Result<(), anyhow::Error> { + let connection_pool = PgPoolOptions::new().connect_lazy_with(configuration); + worker_loop(connection_pool).await +} + +async fn worker_loop(connection_pool: PgPool) -> Result<(), anyhow::Error> { + loop { + if let Err(e) = clean_pending_subscriptions(&connection_pool).await { + tracing::error!("{:?}", e); + } + if let Err(e) = clean_idempotency_keys(&connection_pool).await { + tracing::error!("{:?}", e); + } + tokio::time::sleep(Duration::from_secs(60)).await; + } +} + +async fn clean_pending_subscriptions(connection_pool: &PgPool) -> Result<(), anyhow::Error> { + let result = sqlx::query!( + " + DELETE FROM subscriptions + WHERE status = 'pending_confirmation' + AND subscribed_at < NOW() - INTERVAL '24 hours' + " + ) + .execute(connection_pool) + .await + .context("Failed to clean up subscriptions table.")?; + match result.rows_affected() { + n if n > 0 => tracing::info!("Cleaned up {} expired subscriptions.", n), + _ => (), + } + Ok(()) +} + +async fn clean_idempotency_keys(connection_pool: &PgPool) -> Result<(), anyhow::Error> { + let result = sqlx::query!( + " + DELETE FROM idempotency + WHERE created_at < NOW() - INTERVAL '1 hour' + " + ) + .execute(connection_pool) + .await + .context("Failed to clean up idempontency table.")?; + match result.rows_affected() { + n if n > 0 => tracing::info!("Cleaned up {} old idempotency records.", n), + _ => (), + } + Ok(()) +} diff --git a/src/issue_delivery_worker.rs b/src/issue_delivery_worker.rs index ffe1168..e62c0f6 100644 --- a/src/issue_delivery_worker.rs +++ b/src/issue_delivery_worker.rs @@ -7,7 +7,7 @@ use std::time::Duration; use tracing::{Span, field::display}; use uuid::Uuid; -pub async fn run_worker_until_stopped(configuration: Settings) -> Result<(), anyhow::Error> { +pub async fn run_until_stopped(configuration: Settings) -> Result<(), anyhow::Error> { let connection_pool = PgPoolOptions::new().connect_lazy_with(configuration.database.with_db()); let email_client = EmailClient::build(configuration.email_client).unwrap(); worker_loop(connection_pool, email_client).await diff --git a/src/lib.rs b/src/lib.rs index 3a942e2..89290b7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,6 @@ pub mod authentication; pub mod configuration; +pub mod database_worker; pub mod domain; pub mod email_client; pub mod idempotency; diff --git a/src/main.rs b/src/main.rs index f273c1e..51feefe 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,6 @@ use zero2prod::{ - configuration::get_configuration, issue_delivery_worker::run_worker_until_stopped, - startup::Application, telemetry::init_subscriber, + configuration::get_configuration, database_worker, issue_delivery_worker, startup::Application, + telemetry::init_subscriber, }; #[tokio::main] @@ -11,11 +11,16 @@ async fn main() -> Result<(), anyhow::Error> { let application = Application::build(configuration.clone()).await?; let application_task = tokio::spawn(application.run_until_stopped()); - let worker_task = tokio::spawn(run_worker_until_stopped(configuration)); + let database_worker_task = tokio::spawn(database_worker::run_until_stopped( + configuration.database.with_db(), + )); + let delivery_worker_task = + tokio::spawn(issue_delivery_worker::run_until_stopped(configuration)); tokio::select! { _ = application_task => {}, - _ = worker_task => {}, + _ = database_worker_task => {}, + _ = delivery_worker_task => {}, }; Ok(()) diff --git a/src/telemetry.rs b/src/telemetry.rs index c5b497b..4fe0894 100644 --- a/src/telemetry.rs +++ b/src/telemetry.rs @@ -21,7 +21,7 @@ where ) .with( tracing_subscriber::fmt::layer() - .compact() + .pretty() .with_writer(sink) .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE), )