Compare commits

...

27 Commits

Author SHA1 Message Date
Alphonse Paix
0a2c4a32c1 Update README
Some checks failed
Rust / Test (push) Has been cancelled
Rust / Rustfmt (push) Has been cancelled
Rust / Clippy (push) Has been cancelled
Rust / Code coverage (push) Has been cancelled
2025-09-16 16:53:45 +02:00
Alphonse Paix
a9c6cb36a5 Update config to use self-hosted email server 2025-09-14 19:29:02 +00:00
Alphonse Paix
ba6b2dbd93 Better datastores security
Some checks failed
Rust / Test (push) Has been cancelled
Rust / Rustfmt (push) Has been cancelled
Rust / Clippy (push) Has been cancelled
Rust / Code coverage (push) Has been cancelled
Localhost only exposed ports and stronger password for Postgres local
environment instance
2025-09-07 12:46:27 +02:00
Alphonse Paix
1ec51f0caf Update workflow to include redis 2025-09-05 19:23:11 +02:00
Alphonse Paix
54b0512f3f Use env vars for TLS files 2025-09-05 18:27:46 +02:00
Alphonse Paix
8d6cab41d0 Support for TLS encryption 2025-09-05 18:13:35 +02:00
Alphonse Paix
a4104ca1b2 Register form and confirmation messages 2025-09-04 23:39:53 +02:00
Alphonse Paix
f8dee295cd Fault-tolerant delivery system 2025-09-04 02:54:49 +02:00
Alphonse Paix
9a184b93ac Authentication and form for newsletter publishing 2025-09-01 15:47:27 +02:00
Alphonse Paix
d96a401d99 Admin dashboard and sessions 2025-09-01 03:08:43 +02:00
Alphonse Paix
3dce578ba0 Flash messages using axum-messages 2025-08-30 01:39:12 +02:00
Alphonse Paix
8447d050d6 Handler to send emails to confirmed subscribers 2025-08-27 12:14:11 +02:00
Alphonse Paix
9193f2020d Error handling with thiserror and anyhow 2025-08-26 12:47:22 +02:00
Alphonse Paix
4ce25a8136 Fix send email request body 2025-08-25 18:51:37 +02:00
Alphonse Paix
dfd3300371 Query metadata 2025-08-25 17:52:44 +02:00
Alphonse Paix
d1cf1f6c4f Confirm subscription endpoint 2025-08-25 17:46:03 +02:00
Alphonse Paix
73ff7c04fe Query metadata, migrations and formatting 2025-08-24 12:45:32 +02:00
Alphonse Paix
954772e9db Set status to 'confirmed' for new subscribers 2025-08-24 11:43:08 +02:00
Alphonse Paix
4389873bf4 Email client, application startup logic and tests 2025-08-24 11:31:03 +02:00
Alphonse Paix
85ab04f254 Parse data from incoming request 2025-08-23 11:13:57 +02:00
Alphonse Paix
4d049a744a Fix bug when reading environment variables 2025-08-22 16:29:11 +02:00
Alphonse Paix
a7473bb7f5 Environment variables at runtime to connect to database 2025-08-22 16:01:20 +02:00
Alphonse Paix
1567f94b1f Docker for deployment 2025-08-22 14:25:34 +02:00
Alphonse Paix
59817083eb Update GitHub workflow 2025-08-22 08:44:02 +02:00
Alphonse Paix
b280f10c40 Fix incorrect database query in test suite 2025-08-22 08:28:22 +02:00
Alphonse Paix
5cc5758097 Telemetry 2025-08-22 08:14:59 +02:00
Alphonse Paix
ded2a611e2 Database connection and user registration 2025-08-21 15:38:12 +02:00
83 changed files with 7192 additions and 21 deletions

3
.cargo/config.toml Normal file
View File

@@ -0,0 +1,3 @@
[target.x86_64-unknown-linux-gnu]
linker = "clang"
rustflags = ["-C", "link-arg=-fuse-ld=/usr/bin/mold"]

6
.dockerignore Normal file
View File

@@ -0,0 +1,6 @@
/target
.env
/tests
Dockerfile
/scripts
/migrations

1
.env Normal file
View File

@@ -0,0 +1 @@
DATABASE_URL="postgres://postgres:Jq09NF6Y8ZXJS4jd9c8U@localhost:5432/newsletter"

173
.github/workflows/general.yml vendored Normal file
View File

@@ -0,0 +1,173 @@
# The name of your workflow. GitHub displays the names of your workflows on your repository's "Actions" tab
name: Rust
# To automatically trigger the workflow
on:
# NB: this differs from the book's project!
# These settings allow us to run this specific CI pipeline for PRs against
# this specific branch (a.k.a. book chapter).
push:
branches:
- main
pull_request:
types: [opened, synchronize, reopened]
branches:
- main
env:
CARGO_TERM_COLOR: always
SQLX_VERSION: 0.8.6
SQLX_FEATURES: "rustls,postgres"
APP_USER: app
APP_USER_PWD: secret
APP_DB_NAME: newsletter
# A workflow run is made up of one or more jobs, which run in parallel by default
# Each job runs in a runner environment specified by runs-on
jobs:
# Unique identifier of our job (`job_id`)
test:
# Sets the name `Test` for the job, which is displayed in the GitHub UI
name: Test
# Containers must run in Linux based operating systems
runs-on: ubuntu-latest
# Service containers to run alongside the `test` container job
services:
# Label used to access the service container
postgres:
# Docker Hub image
image: postgres
# Environment variables scoped only for the `postgres` element
env:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: password
POSTGRES_DB: postgres
# When you map ports using the ports keyword, GitHub uses the --publish command to publish the containers ports to the Docker host
# Opens tcp port 5432 on the host and service container
ports:
- 5432:5432
redis:
image: redis:7
ports:
- 6379:6379
steps:
# Downloads a copy of the code in your repository before running CI tests
- name: Check out repository code
# The uses keyword specifies that this step will run v4 of the actions/checkout action.
# This is an action that checks out your repository onto the runner, allowing you to run scripts or other actions against your code (such as build and test tools).
# You should use the checkout action any time your workflow will run against the repository's code.
uses: actions/checkout@v4
# This GitHub Action installs a Rust toolchain using rustup. It is designed for one-line concise usage and good defaults.
# It also takes care of caching intermediate build artifacts.
- name: Install the Rust toolchain
uses: actions-rust-lang/setup-rust-toolchain@v1
- name: Install sqlx-cli
run: cargo install sqlx-cli
--version=${{ env.SQLX_VERSION }}
--features ${{ env.SQLX_FEATURES }}
--no-default-features
--locked
- name: Create app user in Postgres
run: |
sudo apt-get install postgresql-client
# Create the application user
CREATE_QUERY="CREATE USER ${APP_USER} WITH PASSWORD '${APP_USER_PWD}';"
PGPASSWORD="password" psql -U "postgres" -h "localhost" -c "${CREATE_QUERY}"
# Grant create db privileges to the app user
GRANT_QUERY="ALTER USER ${APP_USER} CREATEDB;"
PGPASSWORD="password" psql -U "postgres" -h "localhost" -c "${GRANT_QUERY}"
- name: Migrate database
run: SKIP_DOCKER=true ./scripts/init_db.sh
- name: Run tests
run: cargo test
- name: Check that queries are fresh
run: cargo sqlx prepare --check --workspace
# `fmt` container job
fmt:
name: Rustfmt
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install the Rust toolchain
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
components: rustfmt
- name: Enforce formatting
run: cargo fmt --check
# `clippy` container job
clippy:
name: Clippy
runs-on: ubuntu-latest
env:
# This environment variable forces sqlx to use its offline mode,
# which means that it will not attempt to connect to a database
# when running the tests. It'll instead use the cached query results.
# We check that the cached query results are up-to-date in another job,
# to speed up the overall CI pipeline.
# This will all be covered in detail in chapter 5.
SQLX_OFFLINE: true
steps:
- uses: actions/checkout@v4
- name: Install the Rust toolchain
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
components: clippy
- name: Linting
run: cargo clippy -- -D warnings
# `coverage` container job
coverage:
name: Code coverage
runs-on: ubuntu-latest
services:
postgres:
image: postgres
env:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: password
POSTGRES_DB: postgres
ports:
- 5432:5432
redis:
image: redis:7
ports:
- 6379:6379
steps:
- uses: actions/checkout@v4
- name: Install the Rust toolchain
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
components: llvm-tools-preview
- name: Install sqlx-cli
run: cargo install sqlx-cli
--version=${{ env.SQLX_VERSION }}
--features ${{ env.SQLX_FEATURES }}
--no-default-features
--locked
- name: Create app user in Postgres
run: |
sudo apt-get install postgresql-client
# Create the application user
CREATE_QUERY="CREATE USER ${APP_USER} WITH PASSWORD '${APP_USER_PWD}';"
PGPASSWORD="password" psql -U "postgres" -h "localhost" -c "${CREATE_QUERY}"
# Grant create db privileges to the app user
GRANT_QUERY="ALTER USER ${APP_USER} CREATEDB;"
PGPASSWORD="password" psql -U "postgres" -h "localhost" -c "${GRANT_QUERY}"
- name: Migrate database
run: SKIP_DOCKER=true ./scripts/init_db.sh
- name: Install cargo-llvm-cov
uses: taiki-e/install-action@cargo-llvm-cov
- name: Generate code coverage
run: cargo llvm-cov --all-features --workspace --lcov --output-path lcov.info
- name: Generate report
run: cargo llvm-cov report --html --output-dir coverage
- uses: actions/upload-artifact@v4
with:
name: "Coverage report"
path: coverage/

View File

@@ -0,0 +1,26 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT newsletter_issue_id, subscriber_email\n FROM issue_delivery_queue\n FOR UPDATE\n SKIP LOCKED\n LIMIT 1\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "newsletter_issue_id",
"type_info": "Uuid"
},
{
"ordinal": 1,
"name": "subscriber_email",
"type_info": "Text"
}
],
"parameters": {
"Left": []
},
"nullable": [
false,
false
]
},
"hash": "06f83a51e9d2ca842dc0d6947ad39d9be966636700de58d404d8e1471a260c9a"
}

View File

@@ -0,0 +1,41 @@
{
"db_name": "PostgreSQL",
"query": "\n UPDATE idempotency\n SET\n response_status_code = $3,\n response_headers = $4,\n response_body = $5\n WHERE\n user_id = $1\n AND idempotency_key = $2\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Text",
"Int2",
{
"Custom": {
"name": "header_pair[]",
"kind": {
"Array": {
"Custom": {
"name": "header_pair",
"kind": {
"Composite": [
[
"name",
"Text"
],
[
"value",
"Bytea"
]
]
}
}
}
}
}
},
"Bytea"
]
},
"nullable": []
},
"hash": "0851bf5e8d147f0ace037c6f434bcc4e04d330e3c4259ef8c8097e61f77b64e2"
}

View File

@@ -0,0 +1,34 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT title, text_content, html_content\n FROM newsletter_issues\n WHERE newsletter_issue_id = $1\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "title",
"type_info": "Text"
},
{
"ordinal": 1,
"name": "text_content",
"type_info": "Text"
},
{
"ordinal": 2,
"name": "html_content",
"type_info": "Text"
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
false,
false,
false
]
},
"hash": "43116d4e670155129aa69a7563ddc3f7d01ef3689bb8de9ee1757b401ad95b46"
}

View File

@@ -0,0 +1,17 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO newsletter_issues (\n newsletter_issue_id, title, text_content, html_content, published_at\n )\n VALUES ($1, $2, $3, $4, now())\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Text",
"Text",
"Text"
]
},
"nullable": []
},
"hash": "605c5893a2a89a84c201a6a2ae52a3c00cb4db064a52ea9f198c24de4b877ba2"
}

View File

@@ -0,0 +1,14 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO issue_delivery_queue (\n newsletter_issue_id,\n subscriber_email\n )\n SELECT $1, email\n FROM subscriptions\n WHERE status = 'confirmed'\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": []
},
"hash": "9bfa261067713ca31b191c9f9bcf19ae0dd2d12a570ce06e8e2abd72c5d7b42d"
}

View File

@@ -0,0 +1,14 @@
{
"db_name": "PostgreSQL",
"query": "UPDATE subscriptions SET status = 'confirmed' WHERE id = $1",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": []
},
"hash": "a71a1932b894572106460ca2e34a63dc0cb8c1ba7a70547add1cddbb68133c2b"
}

View File

@@ -0,0 +1,28 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT user_id, password_hash\n FROM users\n WHERE username = $1\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "user_id",
"type_info": "Uuid"
},
{
"ordinal": 1,
"name": "password_hash",
"type_info": "Text"
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
false,
false
]
},
"hash": "acf1b96c82ddf18db02e71a0e297c822b46f10add52c54649cf599b883165e58"
}

View File

@@ -0,0 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "SELECT subscriber_id FROM subscription_tokens WHERE subscription_token = $1",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "subscriber_id",
"type_info": "Uuid"
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
false
]
},
"hash": "ad120337ee606be7b8d87238e2bb765d0da8ee61b1a3bc142414c4305ec5e17f"
}

View File

@@ -0,0 +1,15 @@
{
"db_name": "PostgreSQL",
"query": "\n DELETE FROM issue_delivery_queue\n WHERE\n newsletter_issue_id = $1\n AND subscriber_email = $2\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Text"
]
},
"nullable": []
},
"hash": "b399033752641396cfe752e930e073765335a6c6e84935f60f4918576b47c249"
}

View File

@@ -0,0 +1,17 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO subscriptions (id, email, name, subscribed_at, status)\n VALUES ($1, $2, $3, $4, 'pending_confirmation')\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Text",
"Text",
"Timestamptz"
]
},
"nullable": []
},
"hash": "e6822c9e162eabc20338cc27d51a8e80578803ec1589c234d93c3919d14a96a6"
}

View File

@@ -0,0 +1,15 @@
{
"db_name": "PostgreSQL",
"query": "UPDATE users SET password_hash = $1 WHERE user_id = $2",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Uuid"
]
},
"nullable": []
},
"hash": "eae27786a7c81ee2199fe3d5c10ac52c8067c61d6992f8f5045b908eb73bab8b"
}

View File

@@ -0,0 +1,58 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT\n response_status_code as \"response_status_code!\",\n response_headers as \"response_headers!: Vec<HeaderPairRecord>\",\n response_body as \"response_body!\"\n FROM idempotency\n WHERE\n user_id = $1\n AND idempotency_key = $2\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "response_status_code!",
"type_info": "Int2"
},
{
"ordinal": 1,
"name": "response_headers!: Vec<HeaderPairRecord>",
"type_info": {
"Custom": {
"name": "header_pair[]",
"kind": {
"Array": {
"Custom": {
"name": "header_pair",
"kind": {
"Composite": [
[
"name",
"Text"
],
[
"value",
"Bytea"
]
]
}
}
}
}
}
}
},
{
"ordinal": 2,
"name": "response_body!",
"type_info": "Bytea"
}
],
"parameters": {
"Left": [
"Uuid",
"Text"
]
},
"nullable": [
true,
true,
true
]
},
"hash": "ed9f14ed1476ef5a9dc8b7aabf38fd31e127e2a6246d5a14f4ef624f0302eac8"
}

View File

@@ -0,0 +1,15 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO idempotency (user_id, idempotency_key, created_at)\n VALUES ($1, $2, now())\n ON CONFLICT DO NOTHING\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Text"
]
},
"nullable": []
},
"hash": "f007c2d5d9ae67a2412c6a70a2228390c5bd4835fcf71fd17a00fe521b43415d"
}

View File

@@ -0,0 +1,15 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO subscription_tokens (subscription_token, subscriber_id)\n VALUES ($1, $2)\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Uuid"
]
},
"nullable": []
},
"hash": "fa625c0844ec26b7f59ce885d6fe0b9a4f4676946706cb926c21da6ab1b89d90"
}

3341
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -2,7 +2,64 @@
name = "zero2prod"
version = "0.1.0"
edition = "2024"
resolver = "2"
[lib]
path = "src/lib.rs"
[[bin]]
path = "src/main.rs"
name = "zero2prod"
[dependencies]
axum = "0.8.4"
tokio = { version = "1.47.1", features = ["rt-multi-thread"] }
anyhow = "1.0.99"
argon2 = { version = "0.5.3", features = ["std"] }
axum = { version = "0.8.4", features = ["macros"] }
axum-extra = { version = "0.10.1", features = ["query", "cookie"] }
axum-messages = "0.8.0"
axum-server = { version = "0.7.2", features = ["tls-rustls-no-provider"] }
base64 = "0.22.1"
chrono = { version = "0.4.41", default-features = false, features = ["clock"] }
config = "0.15.14"
htmlescape = "0.3.1"
rand = { version = "0.9.2", features = ["std_rng"] }
reqwest = { version = "0.12.23", default-features = false, features = [
"rustls-tls",
"json",
"cookies",
] }
secrecy = { version = "0.10.3", features = ["serde"] }
serde = { version = "1.0.219", features = ["derive"] }
serde-aux = "4.7.0"
sha3 = "0.10.8"
sqlx = { version = "0.8.6", features = [
"runtime-tokio-rustls",
"macros",
"postgres",
"uuid",
"chrono",
"migrate",
] }
thiserror = "2.0.16"
tokio = { version = "1.47.1", features = ["macros", "rt-multi-thread"] }
tower-http = { version = "0.6.6", features = ["trace"] }
tower-sessions = "0.14.0"
tower-sessions-redis-store = "0.16.0"
tracing = "0.1.41"
tracing-bunyan-formatter = "0.3.10"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
unicode-segmentation = "1.12.0"
urlencoding = "2.1.3"
uuid = { version = "1.18.0", features = ["v4", "serde"] }
validator = { version = "0.20.0", features = ["derive"] }
[dev-dependencies]
claims = "0.8.0"
fake = "4.4.0"
linkify = "0.10.0"
once_cell = "1.21.3"
quickcheck = "1.0.3"
quickcheck_macros = "1.1.0"
serde_json = "1.0.143"
serde_urlencoded = "0.7.1"
wiremock = "0.6.4"

27
Dockerfile Normal file
View File

@@ -0,0 +1,27 @@
FROM lukemathwalker/cargo-chef:latest-rust-1.89.0 AS chef
WORKDIR /app
FROM chef AS planner
COPY . .
RUN cargo chef prepare --recipe-path recipe.json
FROM chef AS builder
COPY --from=planner /app/recipe.json recipe.json
RUN apt update -y \
&& apt install -y --no-install-recommends clang mold
RUN cargo chef cook --release --recipe-path recipe.json
COPY . .
ENV SQLX_OFFLINE=true
RUN cargo build --release --bin zero2prod
FROM debian:bookworm-slim AS runtime
WORKDIR /app
RUN apt update -y \
&& apt install -y --no-install-recommends openssl ca-certificates \
&& apt autoremove -y \
&& apt clean -y \
&& rm -rf /var/lib/apt/lists/*
COPY --from=builder /app/target/release/zero2prod zero2prod
COPY configuration configuration
ENV APP_ENVIRONMENT=production
ENTRYPOINT [ "./zero2prod" ]

23
README.md Normal file
View File

@@ -0,0 +1,23 @@
# zero2prod
## Packages
```
sudo apt install postgresql-client
sudo apt install pkg-config
sudo apt install libssl-dev
cargo install sqlx-cli --no-default-features --features rustls,postgres
```
## Documentation
- [axum](https://docs.rs/axum/latest/axum/) + [examples](https://github.com/tokio-rs/axum/tree/main/examples)
- [Tailwind CSS](tailwindcss.com)
- [htmx](htmx.org)
- [Rust](rust-lang.org)
## Repositories
- [Book repository](https://github.com/LukeMathWalker/zero-to-production)
- [Gitea](https://gitea.alphonsepaix.xyz/alphonse/zero2prod.git)
- [GitHub](https://github.com/alphonsepaix/zero2prod.git)

4
configuration/base.yaml Normal file
View File

@@ -0,0 +1,4 @@
email_client:
timeout_milliseconds: 10000
base_url: "https://api.alphonsepaix.xyz"
sender_email: "newsletter@alphonsepaix.xyz"

15
configuration/local.yaml Normal file
View File

@@ -0,0 +1,15 @@
application:
port: 8000
host: "127.0.0.1"
base_url: "http://127.0.0.1:8000"
database:
host: "127.0.0.1"
port: 5432
database_name: "newsletter"
username: "postgres"
password: "Jq09NF6Y8ZXJS4jd9c8U"
require_ssl: false
email_client:
authorization_token: "secret-token"
redis_uri: "redis://127.0.0.1:6379"
require_tls: false

View File

@@ -0,0 +1,2 @@
application:
host: "0.0.0.0"

View File

@@ -0,0 +1,7 @@
CREATE TABLE subscriptions (
id UUID NOT NULL,
email TEXT NOT NULL UNIQUE,
name TEXT NOT NULL,
subscribed_at TIMESTAMPTZ NOT NULL,
PRIMARY KEY (id)
);

View File

@@ -0,0 +1 @@
ALTER TABLE subscriptions ADD COLUMN status TEXT NULL;

View File

@@ -0,0 +1,4 @@
BEGIN;
UPDATE subscriptions SET status = 'confirmed' WHERE status IS NULL;
ALTER TABLE subscriptions ALTER COLUMN status SET NOT NULL;
COMMIT;

View File

@@ -0,0 +1,5 @@
CREATE TABLE subscription_tokens (
subscription_token TEXT NOT NULL,
subscriber_id UUID NOT NULL REFERENCES subscriptions (id),
PRIMARY KEY (subscription_token)
);

View File

@@ -0,0 +1,5 @@
CREATE TABLE users (
user_id UUID PRIMARY KEY,
username TEXT NOT NULL UNIQUE,
password TEXT NOT NULL
);

View File

@@ -0,0 +1 @@
ALTER TABLE users RENAME password TO password_hash;

View File

@@ -0,0 +1 @@
ALTER TABLE users ADD COLUMN salt TEXT NOT NULL;

View File

@@ -0,0 +1 @@
ALTER TABLE users DROP COLUMN salt;

View File

@@ -0,0 +1,6 @@
INSERT INTO users (user_id, username, password_hash)
VALUES (
'd2492680-6e45-4179-b369-1439b8f22051',
'admin',
'$argon2id$v=19$m=19456,t=2,p=1$oWy180x7KxJYiTHzoN3sVw$vTgzvEqACiXjGalYUJHgb329Eb+s6wu5r+Cw8dHR5YE'
);

View File

@@ -0,0 +1,14 @@
CREATE TYPE header_pair AS (
name TEXT,
value BYTEA
);
CREATE TABLE idempotency (
user_id UUID NOT NULL REFERENCES users (user_id),
idempotency_key TEXT NOT NULL,
response_status_code SMALLINT NOT NULL,
response_headers header_pair[] NOT NULL,
response_body BYTEA NOT NULL,
created_at TIMESTAMPTZ NOT NULL,
PRIMARY KEY (user_id, idempotency_key)
);

View File

@@ -0,0 +1,3 @@
ALTER TABLE idempotency ALTER COLUMN response_status_code DROP NOT NULL;
ALTER TABLE idempotency ALTER COLUMN response_body DROP NOT NULL;
ALTER TABLE idempotency ALTER COLUMN response_headers DROP NOT NULL;

View File

@@ -0,0 +1,8 @@
CREATE TABLE newsletter_issues (
newsletter_issue_id UUID NOT NULL,
title TEXT NOT NULL,
text_content TEXT NOT NULL,
html_content TEXT NOT NULL,
published_at TIMESTAMPTZ NOT NULL,
PRIMARY KEY (newsletter_issue_id)
);

View File

@@ -0,0 +1,6 @@
CREATE TABLE issue_delivery_queue (
newsletter_issue_id UUID NOT NULL
REFERENCES newsletter_issues (newsletter_issue_id),
subscriber_email TEXT NOT NULL,
PRIMARY KEY (newsletter_issue_id, subscriber_email)
);

45
scripts/init_db.sh Executable file
View File

@@ -0,0 +1,45 @@
#!/usr/bin/env bash
set -x
set -eo pipefail
if ! [ -x "$(command -v psql)" ]; then
echo >&2 "Error: psql is not installed."
exit 1
fi
if ! [ -x "$(command -v sqlx)" ]; then
echo >&2 "Error: sqlx is not installed."
exit 1
fi
DB_USER="${POSTGRES_USER:=postgres}"
DB_PASSWORD="${POSTGRES_PASSWORD:=Jq09NF6Y8ZXJS4jd9c8U}"
DB_NAME="${POSTGRES_DB:=newsletter}"
DB_PORT="${POSTGRES_PORT:=5432}"
DB_HOST="${POSTGRES_HOST:=localhost}"
if [[ -z "${SKIP_DOCKER}" ]]; then
docker run \
-e POSTGRES_USER=${DB_USER} \
-e POSTGRES_PASSWORD=${DB_PASSWORD} \
-e POSTGRES_DB=${DB_NAME} \
-p "127.0.0.1:${DB_PORT}":5432 \
-d postgres \
postgres -N 1000
fi
export PGPASSWORD="${DB_PASSWORD}"
until psql -h "${DB_HOST}" -U "${DB_USER}" -p "${DB_PORT}" -d "postgres" -c '\q'; do
>&2 echo "Postgres is still unavailable - sleeping"
sleep 1
done
>&2 echo "Postgres is up and running on port ${DB_PORT} - running migrations now!"
DATABASE_URL=postgres://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:${DB_PORT}/${DB_NAME}
export DATABASE_URL
sqlx database create
sqlx migrate run
>&2 echo "Postgres has been migrated, ready to go!"

18
scripts/init_redis.sh Executable file
View File

@@ -0,0 +1,18 @@
#!/usr/bin/env bash
set -x
set -eo pipefail
RUNNING_CONTAINER=$(docker ps --filter 'name=redis' --format '{{.ID}}')
if [[ -n $RUNNING_CONTAINER ]]; then
echo >&2 "A redis container is already running (${RUNNING_CONTAINER})."
exit 1
fi
docker run \
-p "127.0.0.1:6379:6379" \
-d \
--name "redis_$(date '+%s')" \
redis
>&2 echo "Redis is ready to go!"

163
src/authentication.rs Normal file
View File

@@ -0,0 +1,163 @@
use crate::{
routes::AdminError, session_state::TypedSession, telemetry::spawn_blocking_with_tracing,
};
use anyhow::Context;
use argon2::{
Algorithm, Argon2, Params, PasswordHash, PasswordHasher, PasswordVerifier, Version,
password_hash::{SaltString, rand_core::OsRng},
};
use axum::{extract::Request, middleware::Next, response::Response};
use secrecy::{ExposeSecret, SecretString};
use sqlx::PgPool;
use uuid::Uuid;
pub struct Credentials {
pub username: String,
pub password: SecretString,
}
#[derive(Debug, thiserror::Error)]
pub enum AuthError {
#[error(transparent)]
UnexpectedError(#[from] anyhow::Error),
#[error("Invalid credentials.")]
InvalidCredentials(#[source] anyhow::Error),
#[error("Not authenticated.")]
NotAuthenticated,
}
#[tracing::instrument(name = "Change password", skip(password, connection_pool))]
pub async fn change_password(
user_id: Uuid,
password: SecretString,
connection_pool: &PgPool,
) -> Result<(), anyhow::Error> {
let password_hash = spawn_blocking_with_tracing(move || compute_pasword_hash(password))
.await?
.context("Failed to hash password")?;
sqlx::query!(
"UPDATE users SET password_hash = $1 WHERE user_id = $2",
password_hash.expose_secret(),
user_id
)
.execute(connection_pool)
.await
.context("Failed to update user password in the database.")?;
Ok(())
}
fn compute_pasword_hash(password: SecretString) -> Result<SecretString, anyhow::Error> {
let salt = SaltString::generate(&mut OsRng);
let password_hash = Argon2::new(
Algorithm::Argon2id,
Version::V0x13,
Params::new(1500, 2, 1, None).unwrap(),
)
.hash_password(password.expose_secret().as_bytes(), &salt)?
.to_string();
Ok(SecretString::from(password_hash))
}
#[tracing::instrument(
name = "Validate credentials",
skip(username, password, connection_pool)
)]
pub async fn validate_credentials(
Credentials { username, password }: Credentials,
connection_pool: &PgPool,
) -> Result<Uuid, AuthError> {
let mut user_id = None;
let mut expected_password_hash = SecretString::from(
"$argon2id$v=19$m=15000,t=2,p=1$\
gZiV/M1gPc22ElAH/Jh1Hw$\
CWOrkoo7oJBQ/iyh7uJ0LO2aLEfrHwTWllSAxT0zRno"
.to_string(),
);
if let Some((stored_user_id, stored_expected_password_hash)) =
get_stored_credentials(&username, connection_pool)
.await
.map_err(AuthError::UnexpectedError)?
{
user_id = Some(stored_user_id);
expected_password_hash = stored_expected_password_hash;
}
spawn_blocking_with_tracing(|| verify_password_hash(expected_password_hash, password))
.await
.context("Failed to spawn blocking task.")
.map_err(AuthError::UnexpectedError)??;
user_id
.ok_or_else(|| anyhow::anyhow!("Unknown username."))
.map_err(AuthError::InvalidCredentials)
}
#[tracing::instrument(
name = "Verify password",
skip(expected_password_hash, password_candidate)
)]
fn verify_password_hash(
expected_password_hash: SecretString,
password_candidate: SecretString,
) -> Result<(), AuthError> {
let expected_password_hash = PasswordHash::new(expected_password_hash.expose_secret())
.context("Failed to parse hash in PHC string format.")?;
Argon2::default()
.verify_password(
password_candidate.expose_secret().as_bytes(),
&expected_password_hash,
)
.context("Password verification failed.")
.map_err(AuthError::InvalidCredentials)
}
#[tracing::instrument(name = "Get stored credentials", skip(username, connection_pool))]
async fn get_stored_credentials(
username: &str,
connection_pool: &PgPool,
) -> Result<Option<(Uuid, SecretString)>, anyhow::Error> {
let row = sqlx::query!(
r#"
SELECT user_id, password_hash
FROM users
WHERE username = $1
"#,
username,
)
.fetch_optional(connection_pool)
.await
.context("Failed to perform a query to retrieve stored credentials.")?
.map(|row| (row.user_id, SecretString::from(row.password_hash)));
Ok(row)
}
pub async fn require_auth(
session: TypedSession,
mut request: Request,
next: Next,
) -> Result<Response, AdminError> {
let user_id = session
.get_user_id()
.await
.map_err(|e| AdminError::UnexpectedError(e.into()))?
.ok_or(AdminError::NotAuthenticated)?;
let username = session
.get_username()
.await
.map_err(|e| AdminError::UnexpectedError(e.into()))?
.ok_or(AdminError::UnexpectedError(anyhow::anyhow!(
"Could not find username in session."
)))?;
request
.extensions_mut()
.insert(AuthenticatedUser { user_id, username });
Ok(next.run(request).await)
}
#[derive(Clone)]
pub struct AuthenticatedUser {
pub user_id: Uuid,
pub username: String,
}

133
src/configuration.rs Normal file
View File

@@ -0,0 +1,133 @@
use crate::domain::SubscriberEmail;
use secrecy::{ExposeSecret, SecretString};
use serde::Deserialize;
use serde_aux::field_attributes::deserialize_number_from_string;
use sqlx::postgres::{PgConnectOptions, PgSslMode};
pub fn get_configuration() -> Result<Settings, config::ConfigError> {
let base_path = std::env::current_dir().expect("Failed to determine the current directory");
let config_dir = base_path.join("configuration");
let environment: Environment = std::env::var("APP_ENVIRONMENT")
.unwrap_or_else(|_| "local".into())
.try_into()
.expect("Failed to parse APP_ENVIRONMENT");
let environment_filename = format!("{}.yaml", environment.as_str());
let settings = config::Config::builder()
.add_source(config::File::from(config_dir.join("base.yaml")))
.add_source(config::File::from(config_dir.join(environment_filename)))
.add_source(
config::Environment::with_prefix("APP")
.prefix_separator("_")
.separator("__"),
)
.build()?;
settings.try_deserialize::<Settings>()
}
pub enum Environment {
Local,
Production,
}
impl Environment {
pub fn as_str(&self) -> &str {
match self {
Environment::Local => "local",
Environment::Production => "production",
}
}
}
impl TryFrom<String> for Environment {
type Error = String;
fn try_from(value: String) -> Result<Self, Self::Error> {
match value.to_lowercase().as_str() {
"local" => Ok(Environment::Local),
"production" => Ok(Environment::Production),
other => Err(format!(
"{} is not a supported environment. Use either `local` or `production`.",
other
)),
}
}
}
#[derive(Clone, Deserialize)]
pub struct Settings {
pub application: ApplicationSettings,
pub database: DatabaseSettings,
pub email_client: EmailClientSettings,
pub redis_uri: SecretString,
pub require_tls: bool,
}
#[derive(Clone, Deserialize)]
pub struct ApplicationSettings {
#[serde(deserialize_with = "deserialize_number_from_string")]
pub port: u16,
pub host: String,
pub base_url: String,
}
#[derive(Clone, Deserialize)]
pub struct EmailClientSettings {
pub base_url: String,
sender_email: String,
pub authorization_token: SecretString,
pub timeout_milliseconds: u64,
}
impl EmailClientSettings {
pub fn sender(&self) -> Result<SubscriberEmail, String> {
SubscriberEmail::parse(self.sender_email.clone())
}
pub fn new(
base_url: String,
sender_email: String,
authorization_token: String,
timeout_milliseconds: u64,
) -> Self {
let authorization_token = SecretString::from(authorization_token);
Self {
base_url,
sender_email,
authorization_token,
timeout_milliseconds,
}
}
}
#[derive(Clone, Deserialize)]
pub struct DatabaseSettings {
pub username: String,
pub password: SecretString,
#[serde(deserialize_with = "deserialize_number_from_string")]
pub port: u16,
pub host: String,
pub database_name: String,
pub require_ssl: bool,
}
impl DatabaseSettings {
pub fn with_db(&self) -> PgConnectOptions {
self.without_db().database(&self.database_name)
}
pub fn without_db(&self) -> PgConnectOptions {
let ssl_mode = if self.require_ssl {
PgSslMode::Require
} else {
PgSslMode::Prefer
};
PgConnectOptions::new()
.host(&self.host)
.username(&self.username)
.password(self.password.expose_secret())
.port(self.port)
.ssl_mode(ssl_mode)
}
}

7
src/domain.rs Normal file
View File

@@ -0,0 +1,7 @@
mod new_subscriber;
mod subscriber_email;
mod subscriber_name;
pub use new_subscriber::NewSubscriber;
pub use subscriber_email::SubscriberEmail;
pub use subscriber_name::SubscriberName;

View File

@@ -0,0 +1,6 @@
use crate::domain::{SubscriberName, subscriber_email::SubscriberEmail};
pub struct NewSubscriber {
pub email: SubscriberEmail,
pub name: SubscriberName,
}

View File

@@ -0,0 +1,67 @@
use validator::Validate;
#[derive(Debug, Validate)]
pub struct SubscriberEmail {
#[validate(email)]
email: String,
}
impl SubscriberEmail {
pub fn parse(email: String) -> Result<Self, String> {
let subscriber_email = SubscriberEmail { email };
subscriber_email
.validate()
.map_err(|_| format!("{} is not a valid email.", subscriber_email.email))?;
Ok(subscriber_email)
}
}
impl AsRef<str> for SubscriberEmail {
fn as_ref(&self) -> &str {
self.email.as_str()
}
}
#[cfg(test)]
mod tests {
use super::SubscriberEmail;
use claims::assert_err;
use fake::Fake;
use fake::faker::internet::en::SafeEmail;
use fake::rand::SeedableRng;
use fake::rand::rngs::StdRng;
#[derive(Clone, Debug)]
struct ValidEmailFixture(pub String);
impl quickcheck::Arbitrary for ValidEmailFixture {
fn arbitrary(g: &mut quickcheck::Gen) -> Self {
let mut rng = StdRng::seed_from_u64(u64::arbitrary(g));
let email = SafeEmail().fake_with_rng(&mut rng);
Self(email)
}
}
#[test]
fn empty_string_is_rejected() {
let email = "".to_string();
assert_err!(SubscriberEmail::parse(email));
}
#[test]
fn email_missing_at_symbol_is_rejected() {
let email = "alphonse.paixoutlook.com".to_string();
assert_err!(SubscriberEmail::parse(email));
}
#[test]
fn email_missing_subject_is_rejected() {
let email = "@outlook.com".to_string();
assert_err!(SubscriberEmail::parse(email));
}
#[quickcheck_macros::quickcheck]
fn valid_emails_are_parsed_successfully(valid_email: ValidEmailFixture) -> bool {
SubscriberEmail::parse(valid_email.0).is_ok()
}
}

View File

@@ -0,0 +1,69 @@
use unicode_segmentation::UnicodeSegmentation;
#[derive(Debug)]
pub struct SubscriberName(String);
impl SubscriberName {
pub fn parse(s: String) -> Result<Self, String> {
let is_empty_or_whitespace = s.trim().is_empty();
let is_too_long = s.graphemes(true).count() > 256;
let forbidden_characters = ['/', '(', ')', '"', '<', '>', '\\', '{', '}'];
let contains_forbidden_characters = s.chars().any(|g| forbidden_characters.contains(&g));
if is_empty_or_whitespace || is_too_long || contains_forbidden_characters {
Err(format!("{} is not a valid subscriber name.", s))
} else {
Ok(Self(s))
}
}
}
impl AsRef<str> for SubscriberName {
fn as_ref(&self) -> &str {
self.0.as_str()
}
}
#[cfg(test)]
mod tests {
use crate::domain::SubscriberName;
use claims::{assert_err, assert_ok};
#[test]
fn a_256_grapheme_long_name_is_valid() {
let name = "ê".repeat(256);
assert_ok!(SubscriberName::parse(name));
}
#[test]
fn a_name_longer_than_256_graphemes_is_rejected() {
let name = "ê".repeat(257);
assert_err!(SubscriberName::parse(name));
}
#[test]
fn a_whitespace_only_name_is_rejected() {
let name = "\n \t ".to_string();
assert_err!(SubscriberName::parse(name));
}
#[test]
fn empty_string_is_rejected() {
let name = "".to_string();
assert_err!(SubscriberName::parse(name));
}
#[test]
fn a_name_containing_invalid_character_is_rejected() {
for name in ['/', '(', ')', '"', '<', '>', '\\', '{', '}'] {
let name = name.to_string();
assert_err!(SubscriberName::parse(name));
}
}
#[test]
fn a_valid_name_is_parsed_successfully() {
let name = "Alphonse".to_string();
assert_ok!(SubscriberName::parse(name));
}
}

207
src/email_client.rs Normal file
View File

@@ -0,0 +1,207 @@
use crate::{configuration::EmailClientSettings, domain::SubscriberEmail};
use reqwest::Client;
use secrecy::{ExposeSecret, SecretString};
use std::time::Duration;
pub struct EmailClient {
http_client: Client,
base_url: reqwest::Url,
sender: SubscriberEmail,
authorization_token: SecretString,
}
impl EmailClient {
pub fn build(config: EmailClientSettings) -> Result<Self, anyhow::Error> {
let client = Self {
http_client: Client::builder()
.timeout(Duration::from_millis(config.timeout_milliseconds))
.build()
.unwrap(),
base_url: reqwest::Url::parse(&config.base_url)?,
sender: config.sender().map_err(|e| anyhow::anyhow!(e))?,
authorization_token: config.authorization_token,
};
Ok(client)
}
pub async fn send_email(
&self,
recipient: &SubscriberEmail,
subject: &str,
html_content: &str,
text_content: &str,
) -> Result<(), reqwest::Error> {
let url = self.base_url.join("email").unwrap();
let request_body = SendEmailRequest {
from: EmailField {
email: self.sender.as_ref(),
},
to: vec![EmailField {
email: recipient.as_ref(),
}],
subject,
text: text_content,
html: html_content,
};
self.http_client
.post(url)
.header("X-Requested-With", "XMLHttpRequest")
.header(
"Authorization",
format!("Bearer {}", self.authorization_token.expose_secret()),
)
.json(&request_body)
.send()
.await?
.error_for_status()?;
Ok(())
}
}
#[derive(serde::Serialize)]
struct SendEmailRequest<'a> {
from: EmailField<'a>,
to: Vec<EmailField<'a>>,
subject: &'a str,
text: &'a str,
html: &'a str,
}
#[derive(serde::Serialize)]
struct EmailField<'a> {
email: &'a str,
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use crate::{
configuration::EmailClientSettings, domain::SubscriberEmail, email_client::EmailClient,
};
use claims::{assert_err, assert_ok};
use fake::{
Fake, Faker,
faker::{
internet::en::SafeEmail,
lorem::en::{Paragraph, Sentence},
},
};
use wiremock::{
Mock, MockServer, ResponseTemplate,
matchers::{any, header, header_exists, method, path},
};
struct SendEmailBodyMatcher;
impl wiremock::Match for SendEmailBodyMatcher {
fn matches(&self, request: &wiremock::Request) -> bool {
let result: Result<serde_json::Value, _> = serde_json::from_slice(&request.body);
if let Ok(body) = result {
body.get("from").is_some()
&& body.get("to").is_some()
&& body.get("subject").is_some()
&& body.get("html").is_some()
&& body.get("text").is_some()
} else {
false
}
}
}
fn subject() -> String {
Sentence(1..2).fake()
}
fn content() -> String {
Paragraph(1..10).fake()
}
fn email() -> SubscriberEmail {
SubscriberEmail::parse(SafeEmail().fake()).unwrap()
}
fn email_client(base_url: String) -> EmailClient {
let sender_email = SafeEmail().fake();
let token: String = Faker.fake();
let settings = EmailClientSettings::new(base_url, sender_email, token, 200);
EmailClient::build(settings).unwrap()
}
#[tokio::test]
async fn send_email_sends_the_expected_request() {
let mock_server = MockServer::start().await;
let email_client = email_client(mock_server.uri());
Mock::given(header_exists("Authorization"))
.and(header("Content-Type", "application/json"))
.and(header("X-Requested-With", "XMLHttpRequest"))
.and(path("v1/email"))
.and(method("POST"))
.and(SendEmailBodyMatcher)
.respond_with(ResponseTemplate::new(200))
.expect(1)
.mount(&mock_server)
.await;
email_client
.send_email(&email(), &subject(), &content(), &content())
.await
.unwrap();
}
#[tokio::test]
async fn send_email_succeeds_if_the_server_returns_200() {
let mock_server = MockServer::start().await;
let email_client = email_client(mock_server.uri());
Mock::given(any())
.respond_with(ResponseTemplate::new(200))
.expect(1)
.mount(&mock_server)
.await;
let response = email_client
.send_email(&email(), &subject(), &content(), &content())
.await;
assert_ok!(response);
}
#[tokio::test]
async fn send_email_fails_if_the_server_retuns_500() {
let mock_server = MockServer::start().await;
let email_client = email_client(mock_server.uri());
Mock::given(any())
.respond_with(ResponseTemplate::new(500))
.expect(1)
.mount(&mock_server)
.await;
let response = email_client
.send_email(&email(), &subject(), &content(), &content())
.await;
assert_err!(response);
}
#[tokio::test]
async fn send_email_times_out_if_the_server_takes_too_long() {
let mock_server = MockServer::start().await;
let email_client = email_client(mock_server.uri());
Mock::given(any())
.respond_with(ResponseTemplate::new(200).set_delay(Duration::from_secs(180)))
.expect(1)
.mount(&mock_server)
.await;
let response = email_client
.send_email(&email(), &subject(), &content(), &content())
.await;
assert_err!(response);
}
}

5
src/idempotency.rs Normal file
View File

@@ -0,0 +1,5 @@
mod key;
mod persistance;
pub use key::*;
pub use persistance::*;

28
src/idempotency/key.rs Normal file
View File

@@ -0,0 +1,28 @@
pub struct IdempotencyKey(String);
impl TryFrom<String> for IdempotencyKey {
type Error = String;
fn try_from(value: String) -> Result<Self, Self::Error> {
if value.is_empty() {
return Err("The idempotency key cannot be empty.".into());
}
let max_length = 50;
if value.len() >= max_length {
return Err("The idempotency key must be shorter than {max_length} characters.".into());
}
Ok(Self(value))
}
}
impl From<IdempotencyKey> for String {
fn from(value: IdempotencyKey) -> Self {
value.0
}
}
impl AsRef<str> for IdempotencyKey {
fn as_ref(&self) -> &str {
&self.0
}
}

View File

@@ -0,0 +1,128 @@
use crate::idempotency::IdempotencyKey;
use axum::{
body::{self, Body},
http::{HeaderName, HeaderValue},
response::{IntoResponse, Response},
};
use reqwest::StatusCode;
use sqlx::{Executor, PgPool, Postgres, Transaction};
use std::str::FromStr;
use uuid::Uuid;
#[derive(Debug, sqlx::Type)]
#[sqlx(type_name = "header_pair")]
struct HeaderPairRecord {
name: String,
value: Vec<u8>,
}
pub async fn get_saved_response(
connection_pool: &PgPool,
idempotency_key: &IdempotencyKey,
user_id: Uuid,
) -> Result<Option<Response>, anyhow::Error> {
let saved_response = sqlx::query!(
r#"
SELECT
response_status_code as "response_status_code!",
response_headers as "response_headers!: Vec<HeaderPairRecord>",
response_body as "response_body!"
FROM idempotency
WHERE
user_id = $1
AND idempotency_key = $2
"#,
user_id,
idempotency_key.as_ref()
)
.fetch_optional(connection_pool)
.await?;
if let Some(r) = saved_response {
let status_code = StatusCode::from_u16(r.response_status_code.try_into()?)?;
let mut response = status_code.into_response();
for HeaderPairRecord { name, value } in r.response_headers {
response.headers_mut().insert(
HeaderName::from_str(&name).unwrap(),
HeaderValue::from_bytes(&value).unwrap(),
);
}
*response.body_mut() = r.response_body.into();
Ok(Some(response))
} else {
Ok(None)
}
}
pub async fn save_response(
mut transaction: Transaction<'static, Postgres>,
idempotency_key: &IdempotencyKey,
user_id: Uuid,
response: Response<Body>,
) -> Result<Response<Body>, anyhow::Error> {
let status_code = response.status().as_u16() as i16;
let headers = response
.headers()
.into_iter()
.map(|(name, value)| HeaderPairRecord {
name: name.to_string(),
value: value.as_bytes().to_vec(),
})
.collect::<Vec<_>>();
let (response_head, body) = response.into_parts();
let body = body::to_bytes(body, usize::MAX).await?.to_vec();
let query = sqlx::query_unchecked!(
r#"
UPDATE idempotency
SET
response_status_code = $3,
response_headers = $4,
response_body = $5
WHERE
user_id = $1
AND idempotency_key = $2
"#,
user_id,
idempotency_key.as_ref(),
status_code,
headers,
&body,
);
transaction.execute(query).await?;
transaction.commit().await?;
let mut r = response_head.into_response();
*r.body_mut() = body.into();
Ok(r)
}
pub enum NextAction {
StartProcessing(Transaction<'static, Postgres>),
ReturnSavedResponse(Response),
}
pub async fn try_processing(
connection_pool: &PgPool,
idempotency_key: &IdempotencyKey,
user_id: Uuid,
) -> Result<NextAction, anyhow::Error> {
let mut transaction = connection_pool.begin().await?;
let query = sqlx::query!(
r#"
INSERT INTO idempotency (user_id, idempotency_key, created_at)
VALUES ($1, $2, now())
ON CONFLICT DO NOTHING
"#,
user_id,
idempotency_key.as_ref()
);
let n_inserted_rows = transaction.execute(query).await?.rows_affected();
if n_inserted_rows > 0 {
Ok(NextAction::StartProcessing(transaction))
} else {
let saved_response = get_saved_response(connection_pool, idempotency_key, user_id)
.await?
.ok_or_else(|| anyhow::anyhow!("Could not find saved response."))?;
Ok(NextAction::ReturnSavedResponse(saved_response))
}
}

View File

@@ -0,0 +1,151 @@
use crate::{configuration::Settings, domain::SubscriberEmail, email_client::EmailClient};
use sqlx::{Executor, PgPool, Postgres, Row, Transaction, postgres::PgPoolOptions};
use std::time::Duration;
use tracing::{Span, field::display};
use uuid::Uuid;
pub async fn run_worker_until_stopped(configuration: Settings) -> Result<(), anyhow::Error> {
let connection_pool = PgPoolOptions::new().connect_lazy_with(configuration.database.with_db());
let email_client = EmailClient::build(configuration.email_client).unwrap();
worker_loop(connection_pool, email_client).await
}
async fn worker_loop(
connection_pool: PgPool,
email_client: EmailClient,
) -> Result<(), anyhow::Error> {
loop {
match try_execute_task(&connection_pool, &email_client).await {
Ok(ExecutionOutcome::EmptyQueue) => tokio::time::sleep(Duration::from_secs(10)).await,
Ok(ExecutionOutcome::TaskCompleted) => (),
Err(_) => tokio::time::sleep(Duration::from_secs(1)).await,
}
}
}
pub enum ExecutionOutcome {
TaskCompleted,
EmptyQueue,
}
#[tracing::instrument(
skip_all,
fields(
newsletter_issue_id=tracing::field::Empty,
subscriber_email=tracing::field::Empty
),
err
)]
pub async fn try_execute_task(
connection_pool: &PgPool,
email_client: &EmailClient,
) -> Result<ExecutionOutcome, anyhow::Error> {
let task = dequeue_task(connection_pool).await?;
if task.is_none() {
return Ok(ExecutionOutcome::EmptyQueue);
}
let (transaction, issue_id, email) = task.unwrap();
Span::current()
.record("newsletter_issue_id", display(issue_id))
.record("subscriber_email", display(&email));
match SubscriberEmail::parse(email.clone()) {
Ok(email) => {
let issue = get_issue(connection_pool, issue_id).await?;
if let Err(e) = email_client
.send_email(
&email,
&issue.title,
&issue.html_content,
&issue.text_content,
)
.await
{
tracing::error!(
error.message = %e,
"Failed to deliver issue to confirmed subscriber. Skipping."
);
}
}
Err(e) => {
tracing::error!(
error.message = %e,
"Skipping a subscriber. Their stored contact details are invalid."
);
}
}
delete_task(transaction, issue_id, &email).await?;
Ok(ExecutionOutcome::TaskCompleted)
}
struct NewsletterIssue {
title: String,
text_content: String,
html_content: String,
}
#[tracing::instrument(skip_all)]
async fn get_issue(
connection_pool: &PgPool,
issue_id: Uuid,
) -> Result<NewsletterIssue, anyhow::Error> {
let issue = sqlx::query_as!(
NewsletterIssue,
r#"
SELECT title, text_content, html_content
FROM newsletter_issues
WHERE newsletter_issue_id = $1
"#,
issue_id
)
.fetch_one(connection_pool)
.await?;
Ok(issue)
}
#[tracing::instrument(skip_all)]
async fn dequeue_task(
connection_pool: &PgPool,
) -> Result<Option<(Transaction<'static, Postgres>, Uuid, String)>, anyhow::Error> {
let mut transaction = connection_pool.begin().await?;
let query = sqlx::query!(
r#"
SELECT newsletter_issue_id, subscriber_email
FROM issue_delivery_queue
FOR UPDATE
SKIP LOCKED
LIMIT 1
"#
);
let r = transaction.fetch_optional(query).await?;
if let Some(row) = r {
Ok(Some((
transaction,
row.get("newsletter_issue_id"),
row.get("subscriber_email"),
)))
} else {
Ok(None)
}
}
#[tracing::instrument(skip_all)]
async fn delete_task(
mut transaction: Transaction<'static, Postgres>,
issue_id: Uuid,
email: &str,
) -> Result<(), anyhow::Error> {
let query = sqlx::query!(
r#"
DELETE FROM issue_delivery_queue
WHERE
newsletter_issue_id = $1
AND subscriber_email = $2
"#,
issue_id,
email
);
transaction.execute(query).await?;
transaction.commit().await?;
Ok(())
}

10
src/lib.rs Normal file
View File

@@ -0,0 +1,10 @@
pub mod authentication;
pub mod configuration;
pub mod domain;
pub mod email_client;
pub mod idempotency;
pub mod issue_delivery_worker;
pub mod routes;
pub mod session_state;
pub mod startup;
pub mod telemetry;

View File

@@ -1,9 +1,22 @@
use axum::{Router, routing::get};
use zero2prod::{
configuration::get_configuration, issue_delivery_worker::run_worker_until_stopped,
startup::Application, telemetry::init_subscriber,
};
#[tokio::main]
async fn main() {
let app = Router::new().route("/", get(|| async { "Hello, World!" }));
async fn main() -> Result<(), anyhow::Error> {
init_subscriber(std::io::stdout);
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
let configuration = get_configuration().expect("Failed to read configuration");
let application = Application::build(configuration.clone()).await?;
let application_task = tokio::spawn(application.run_until_stopped());
let worker_task = tokio::spawn(run_worker_until_stopped(configuration));
tokio::select! {
_ = application_task => {},
_ = worker_task => {},
};
Ok(())
}

15
src/routes.rs Normal file
View File

@@ -0,0 +1,15 @@
mod admin;
mod health_check;
mod home;
mod login;
mod register;
mod subscriptions;
mod subscriptions_confirm;
pub use admin::*;
pub use health_check::*;
pub use home::*;
pub use login::*;
pub use register::*;
pub use subscriptions::*;
pub use subscriptions_confirm::*;

68
src/routes/admin.rs Normal file
View File

@@ -0,0 +1,68 @@
pub mod change_password;
pub mod dashboard;
pub mod newsletters;
use crate::{routes::error_chain_fmt, session_state::TypedSession};
use axum::{
Json,
response::{IntoResponse, Redirect, Response},
};
use axum_messages::Messages;
pub use change_password::*;
pub use dashboard::*;
pub use newsletters::*;
use reqwest::StatusCode;
#[derive(thiserror::Error)]
pub enum AdminError {
#[error("Something went wrong.")]
UnexpectedError(#[from] anyhow::Error),
#[error("Trying to access admin dashboard without authentication.")]
NotAuthenticated,
#[error("Updating password failed.")]
ChangePassword,
#[error("Could not publish newsletter.")]
Publish(#[source] anyhow::Error),
#[error("The idempotency key was invalid.")]
Idempotency(String),
}
impl std::fmt::Debug for AdminError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
error_chain_fmt(self, f)
}
}
impl IntoResponse for AdminError {
fn into_response(self) -> Response {
#[derive(serde::Serialize)]
struct ErrorResponse<'a> {
message: &'a str,
}
tracing::error!("{:?}", self);
match &self {
AdminError::UnexpectedError(_) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
message: "An internal server error occured.",
}),
)
.into_response(),
AdminError::NotAuthenticated => Redirect::to("/login").into_response(),
AdminError::ChangePassword => Redirect::to("/admin/password").into_response(),
AdminError::Publish(_) => Redirect::to("/admin/newsletters").into_response(),
AdminError::Idempotency(e) => {
(StatusCode::BAD_REQUEST, Json(ErrorResponse { message: e })).into_response()
}
}
}
}
#[tracing::instrument(name = "Logging out", skip(messages, session))]
pub async fn logout(messages: Messages, session: TypedSession) -> Result<Response, AdminError> {
session.clear().await;
messages.success("You have successfully logged out.");
Ok(Redirect::to("/login").into_response())
}

View File

@@ -0,0 +1,72 @@
use crate::{
authentication::{self, AuthenticatedUser, Credentials, validate_credentials},
routes::AdminError,
startup::AppState,
};
use axum::{
Extension, Form,
extract::State,
response::{Html, IntoResponse, Redirect, Response},
};
use axum_messages::Messages;
use secrecy::{ExposeSecret, SecretString};
use std::fmt::Write;
#[derive(serde::Deserialize)]
pub struct PasswordFormData {
pub current_password: SecretString,
pub new_password: SecretString,
pub new_password_check: SecretString,
}
pub async fn change_password_form(messages: Messages) -> Result<Response, AdminError> {
let mut error_html = String::new();
for message in messages {
writeln!(error_html, "<p><i>{}</i></p>", message).unwrap();
}
Ok(Html(format!(
include_str!("html/change_password_form.html"),
error_html
))
.into_response())
}
pub async fn change_password(
Extension(AuthenticatedUser { user_id, username }): Extension<AuthenticatedUser>,
State(AppState {
connection_pool, ..
}): State<AppState>,
messages: Messages,
Form(form): Form<PasswordFormData>,
) -> Result<Response, AdminError> {
let credentials = Credentials {
username,
password: form.current_password,
};
if form.new_password.expose_secret() != form.new_password_check.expose_secret() {
messages.error("You entered two different passwords - the field values must match.");
Err(AdminError::ChangePassword)
} else if validate_credentials(credentials, &connection_pool)
.await
.is_err()
{
messages.error("The current password is incorrect.");
Err(AdminError::ChangePassword)
} else if let Err(e) = verify_password(form.new_password.expose_secret()) {
messages.error(e);
Err(AdminError::ChangePassword)
} else {
authentication::change_password(user_id, form.new_password, &connection_pool)
.await
.map_err(|_| AdminError::ChangePassword)?;
messages.success("Your password has been changed.");
Ok(Redirect::to("/admin/password").into_response())
}
}
fn verify_password(password: &str) -> Result<(), String> {
if password.len() < 12 || password.len() > 128 {
return Err("The password must contain between 12 and 128 characters.".into());
}
Ok(())
}

View File

@@ -0,0 +1,11 @@
use crate::authentication::AuthenticatedUser;
use axum::{
Extension,
response::{Html, IntoResponse, Response},
};
pub async fn admin_dashboard(
Extension(AuthenticatedUser { username, .. }): Extension<AuthenticatedUser>,
) -> Response {
Html(format!(include_str!("html/dashboard.html"), username)).into_response()
}

View File

@@ -0,0 +1,26 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width" />
<title>Change password</title>
</head>
<body>
<form action="/admin/password" method="post">
<input
type="password"
name="current_password"
placeholder="Current password"
/>
<input type="password" name="new_password" placeholder="New password" />
<input
type="password"
name="new_password_check"
placeholder="Confirm new password"
/>
<button type="submit">Change password</button>
</form>
{}
<p><a href="/admin/dashboard">Back</a></p>
</body>
</html>

View File

@@ -0,0 +1,21 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width" />
<title>Admin dashboard</title>
</head>
<body>
<p>Welcome {}!</p>
<p>Available actions:</p>
<ol>
<li><a href="/admin/password">Change password</a></li>
<li><a href="/admin/newsletters">Send a newsletter</a></li>
<li>
<form name="logoutForm" action="/admin/logout" method="post">
<input type="submit" value="Logout" />
</form>
</li>
</ol>
</body>
</html>

View File

@@ -0,0 +1,19 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width" />
<title>Send a newsletter</title>
</head>
<body>
<form action="/admin/newsletters" method="post">
<input type="text" name="title" placeholder="Subject" />
<input type="text" name="html" placeholder="Content (HTML)" />
<input type="text" name="text" placeholder="Content (text)" />
<input hidden type="text" name="idempotency_key" value="{}" />
<button type="submit">Send</button>
</form>
{}
<p><a href="/admin/dashboard">Back</a></p>
</body>
</html>

View File

@@ -0,0 +1,144 @@
use crate::{
authentication::AuthenticatedUser,
idempotency::{IdempotencyKey, save_response, try_processing},
routes::AdminError,
startup::AppState,
};
use anyhow::Context;
use axum::{
Extension, Form,
extract::State,
response::{Html, IntoResponse, Redirect, Response},
};
use axum_messages::Messages;
use sqlx::{Executor, Postgres, Transaction};
use std::fmt::Write;
use uuid::Uuid;
#[derive(serde::Deserialize)]
pub struct BodyData {
title: String,
html: String,
text: String,
idempotency_key: String,
}
pub async fn publish_newsletter_form(messages: Messages) -> Response {
let mut error_html = String::new();
for message in messages {
writeln!(error_html, "<p><i>{}</i></p>", message).unwrap();
}
let idempotency_key = Uuid::new_v4();
Html(format!(
include_str!("html/send_newsletter_form.html"),
idempotency_key, error_html
))
.into_response()
}
#[tracing::instrument(skip_all)]
pub async fn insert_newsletter_issue(
transaction: &mut Transaction<'static, Postgres>,
title: &str,
text_content: &str,
html_content: &str,
) -> Result<Uuid, sqlx::Error> {
let newsletter_issue_id = Uuid::new_v4();
let query = sqlx::query!(
r#"
INSERT INTO newsletter_issues (
newsletter_issue_id, title, text_content, html_content, published_at
)
VALUES ($1, $2, $3, $4, now())
"#,
newsletter_issue_id,
title,
text_content,
html_content
);
transaction.execute(query).await?;
Ok(newsletter_issue_id)
}
#[tracing::instrument(skip_all)]
async fn enqueue_delivery_tasks(
transaction: &mut Transaction<'static, Postgres>,
newsletter_issue_id: Uuid,
) -> Result<(), sqlx::Error> {
let query = sqlx::query!(
r#"
INSERT INTO issue_delivery_queue (
newsletter_issue_id,
subscriber_email
)
SELECT $1, email
FROM subscriptions
WHERE status = 'confirmed'
"#,
newsletter_issue_id,
);
transaction.execute(query).await?;
Ok(())
}
#[tracing::instrument(
name = "Publishing a newsletter",
skip(connection_pool, form, messages)
)]
pub async fn publish_newsletter(
State(AppState {
connection_pool, ..
}): State<AppState>,
Extension(AuthenticatedUser { user_id, .. }): Extension<AuthenticatedUser>,
messages: Messages,
Form(form): Form<BodyData>,
) -> Result<Response, AdminError> {
if let Err(e) = validate_form(&form) {
messages.error(e);
return Err(AdminError::Publish(anyhow::anyhow!(e)));
}
let idempotency_key: IdempotencyKey = form
.idempotency_key
.try_into()
.map_err(AdminError::Idempotency)?;
let success_message = || {
messages.success(format!(
"The newsletter issue '{}' has been published!",
form.title
))
};
let mut transaction = match try_processing(&connection_pool, &idempotency_key, user_id).await? {
crate::idempotency::NextAction::StartProcessing(t) => t,
crate::idempotency::NextAction::ReturnSavedResponse(response) => {
success_message();
return Ok(response);
}
};
let issue_id = insert_newsletter_issue(&mut transaction, &form.title, &form.text, &form.html)
.await
.context("Failed to store newsletter issue details")?;
enqueue_delivery_tasks(&mut transaction, issue_id)
.await
.context("Failed to enqueue delivery tasks")?;
let response = Redirect::to("/admin/newsletters").into_response();
success_message();
save_response(transaction, &idempotency_key, user_id, response)
.await
.map_err(AdminError::UnexpectedError)
}
fn validate_form(form: &BodyData) -> Result<(), &'static str> {
if form.title.is_empty() {
return Err("The title was empty");
}
if form.html.is_empty() || form.text.is_empty() {
return Err("The content was empty.");
}
Ok(())
}

View File

@@ -0,0 +1,5 @@
use axum::{http::StatusCode, response::IntoResponse};
pub async fn health_check() -> impl IntoResponse {
StatusCode::OK
}

5
src/routes/home.rs Normal file
View File

@@ -0,0 +1,5 @@
use axum::response::{Html, IntoResponse};
pub async fn home() -> impl IntoResponse {
Html(include_str!("home/home.html"))
}

15
src/routes/home/home.html Normal file
View File

@@ -0,0 +1,15 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width" />
<title>Home</title>
</head>
<body>
<p>Welcome to our newsletter!</p>
<ol>
<li><a href="/login">Admin login</a></li>
<li><a href="/register">Register</a></li>
</ol>
</body>
</html>

110
src/routes/login.rs Normal file
View File

@@ -0,0 +1,110 @@
use crate::{
authentication::{AuthError, Credentials, validate_credentials},
routes::error_chain_fmt,
session_state::TypedSession,
startup::AppState,
};
use axum::{
Form, Json,
extract::State,
response::{Html, IntoResponse, Redirect, Response},
};
use axum_messages::Messages;
use reqwest::StatusCode;
use secrecy::SecretString;
use std::fmt::Write;
#[derive(thiserror::Error)]
pub enum LoginError {
#[error("Something went wrong.")]
UnexpectedError(#[from] anyhow::Error),
#[error("Authentication failed.")]
AuthError(#[source] anyhow::Error),
}
impl std::fmt::Debug for LoginError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
error_chain_fmt(self, f)
}
}
impl IntoResponse for LoginError {
fn into_response(self) -> Response {
#[derive(serde::Serialize)]
struct ErrorResponse<'a> {
message: &'a str,
}
tracing::error!("{:?}", self);
match &self {
LoginError::UnexpectedError(_) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
message: "An internal server error occured.",
}),
)
.into_response(),
LoginError::AuthError(_) => Redirect::to("/login").into_response(),
}
}
}
#[derive(serde::Deserialize)]
pub struct LoginFormData {
username: String,
password: SecretString,
}
pub async fn get_login(messages: Messages) -> impl IntoResponse {
let mut error_html = String::new();
for message in messages {
writeln!(error_html, "<p><i>{}</i></p>", message).unwrap();
}
Html(format!(include_str!("login/login.html"), error_html))
}
pub async fn post_login(
session: TypedSession,
messages: Messages,
State(AppState {
connection_pool, ..
}): State<AppState>,
Form(form): Form<LoginFormData>,
) -> Result<Redirect, LoginError> {
let credentials = Credentials {
username: form.username.clone(),
password: form.password,
};
tracing::Span::current().record("username", tracing::field::display(&credentials.username));
match validate_credentials(credentials, &connection_pool).await {
Err(e) => {
let e = match e {
AuthError::UnexpectedError(_) => LoginError::UnexpectedError(e.into()),
AuthError::InvalidCredentials(_) => {
let e = LoginError::AuthError(e.into());
messages.error(e.to_string());
e
}
AuthError::NotAuthenticated => unreachable!(),
};
Err(e)
}
Ok(user_id) => {
tracing::Span::current().record("user_id", tracing::field::display(&user_id));
session
.renew()
.await
.map_err(|e| LoginError::UnexpectedError(e.into()))?;
session
.insert_user_id(user_id)
.await
.map_err(|e| LoginError::UnexpectedError(e.into()))?;
session
.insert_username(form.username)
.await
.map_err(|e| LoginError::UnexpectedError(e.into()))?;
Ok(Redirect::to("/admin/dashboard"))
}
}
}

View File

@@ -0,0 +1,16 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width" />
<title>Login</title>
</head>
<body>
<form action="/login" method="post">
<input type="text" name="username" placeholder="Username" />
<input type="password" name="password" placeholder="Password" />
<button type="submit">Login</button>
</form>
{}
</body>
</html>

11
src/routes/register.rs Normal file
View File

@@ -0,0 +1,11 @@
use axum::response::{Html, IntoResponse, Response};
use axum_messages::Messages;
use std::fmt::Write;
pub async fn register(messages: Messages) -> Response {
let mut error_html = String::new();
for message in messages {
writeln!(error_html, "<p><i>{}</i></p>", message).unwrap();
}
Html(format!(include_str!("register/register.html"), error_html)).into_response()
}

View File

@@ -0,0 +1,11 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width" />
<title>Account confirmed</title>
</head>
<body>
<p>Your account has been confirmed. Welcome!</p>
</body>
</html>

View File

@@ -0,0 +1,22 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width" />
<title>Register</title>
</head>
<body>
<form action="/subscriptions" method="post">
<input type="text" name="name" placeholder="Name" />
<input type="text" name="email" placeholder="Email address" />
<input
type="text"
name="email_check"
placeholder="Confirm email address"
/>
<button type="Register">Register</button>
</form>
{}
<p><a href="/">Back</a></p>
</body>
</html>

228
src/routes/subscriptions.rs Normal file
View File

@@ -0,0 +1,228 @@
use crate::{
domain::{NewSubscriber, SubscriberEmail, SubscriberName},
email_client::EmailClient,
startup::AppState,
};
use anyhow::Context;
use axum::{
Form, Json,
extract::State,
http::StatusCode,
response::{IntoResponse, Redirect, Response},
};
use axum_messages::Messages;
use chrono::Utc;
use rand::{Rng, distr::Alphanumeric};
use serde::Deserialize;
use sqlx::{Executor, Postgres, Transaction};
use uuid::Uuid;
fn generate_subscription_token() -> String {
let mut rng = rand::rng();
std::iter::repeat_with(|| rng.sample(Alphanumeric))
.map(char::from)
.take(25)
.collect()
}
pub fn error_chain_fmt(
e: &impl std::error::Error,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
writeln!(f, "{}", e)?;
let mut current = e.source();
while let Some(cause) = current {
write!(f, "Caused by:\n\t{}", cause)?;
current = cause.source();
if current.is_some() {
writeln!(f)?;
}
}
Ok(())
}
#[derive(thiserror::Error)]
pub enum SubscribeError {
#[error(transparent)]
UnexpectedError(#[from] anyhow::Error),
#[error("{0}")]
ValidationError(String),
}
impl std::fmt::Debug for SubscribeError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
error_chain_fmt(self, f)
}
}
impl IntoResponse for SubscribeError {
fn into_response(self) -> Response {
#[derive(serde::Serialize)]
struct ErrorResponse<'a> {
message: &'a str,
}
tracing::error!("{:?}", self);
match self {
SubscribeError::UnexpectedError(_) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
message: "An internal server error occured.",
}),
)
.into_response(),
SubscribeError::ValidationError(_) => Redirect::to("/register").into_response(),
}
}
}
#[tracing::instrument(
name = "Adding a new subscriber",
skip(messages, connection_pool, email_client, base_url, form),
fields(
subscriber_email = %form.email,
subscriber_name = %form.name
)
)]
pub async fn subscribe(
messages: Messages,
State(AppState {
connection_pool,
email_client,
base_url,
..
}): State<AppState>,
Form(form): Form<SubscriptionFormData>,
) -> Result<Response, SubscribeError> {
let new_subscriber = match form.try_into() {
Ok(new_sub) => new_sub,
Err(e) => {
messages.error(&e);
return Err(SubscribeError::ValidationError(e));
}
};
let mut transaction = connection_pool
.begin()
.await
.context("Failed to acquire a Postgres connection from the pool.")?;
let subscriber_id = insert_subscriber(&mut transaction, &new_subscriber)
.await
.context("Failed to insert new subscriber in the database.")?;
let subscription_token = generate_subscription_token();
store_token(&mut transaction, &subscription_token, &subscriber_id)
.await
.context("Failed to store the confirmation token for a new subscriber.")?;
send_confirmation_email(
&email_client,
&new_subscriber,
&base_url,
&subscription_token,
)
.await
.context("Failed to send a confirmation email.")?;
transaction
.commit()
.await
.context("Failed to commit the database transaction to store a new subscriber.")?;
messages.success("A confirmation email has been sent.");
Ok(Redirect::to("/register").into_response())
}
#[tracing::instrument(
name = "Saving new subscriber details in the database",
skip(transaction, new_subscriber)
)]
pub async fn insert_subscriber(
transaction: &mut Transaction<'_, Postgres>,
new_subscriber: &NewSubscriber,
) -> Result<Uuid, sqlx::Error> {
let subscriber_id = Uuid::new_v4();
let query = sqlx::query!(
r#"
INSERT INTO subscriptions (id, email, name, subscribed_at, status)
VALUES ($1, $2, $3, $4, 'pending_confirmation')
"#,
subscriber_id,
new_subscriber.email.as_ref(),
new_subscriber.name.as_ref(),
Utc::now()
);
transaction.execute(query).await?;
Ok(subscriber_id)
}
#[tracing::instrument(
name = "Store subscription token in the database",
skip(transaction, subscription_token)
)]
async fn store_token(
transaction: &mut Transaction<'_, Postgres>,
subscription_token: &str,
subscriber_id: &Uuid,
) -> Result<(), sqlx::Error> {
let query = sqlx::query!(
r#"
INSERT INTO subscription_tokens (subscription_token, subscriber_id)
VALUES ($1, $2)
"#,
subscription_token,
subscriber_id,
);
transaction.execute(query).await?;
Ok(())
}
#[tracing::instrument(
name = "Send a confirmation email to a new subscriber",
skip(email_client, new_subscriber, base_url, subscription_token)
)]
pub async fn send_confirmation_email(
email_client: &EmailClient,
new_subscriber: &NewSubscriber,
base_url: &str,
subscription_token: &str,
) -> Result<(), reqwest::Error> {
let confirmation_link = format!(
"{}/subscriptions/confirm?subscription_token={}",
base_url, subscription_token
);
let html_content = format!(
"Welcome to our newsletter!<br />\
Click <a href=\"{}\">here</a> to confirm your subscription.",
confirmation_link
);
let text_content = format!(
"Welcome to our newsletter!\nVisit {} to confirm your subscription.",
confirmation_link
);
email_client
.send_email(
&new_subscriber.email,
"Welcome!",
&html_content,
&text_content,
)
.await
}
#[derive(Debug, Deserialize)]
#[allow(dead_code)]
pub struct SubscriptionFormData {
name: String,
email: String,
email_check: String,
}
impl TryFrom<SubscriptionFormData> for NewSubscriber {
type Error = String;
fn try_from(value: SubscriptionFormData) -> Result<Self, Self::Error> {
let name = SubscriberName::parse(value.name)?;
if value.email != value.email_check {
return Err("Email addresses don't match.".into());
}
let email = SubscriberEmail::parse(value.email)?;
Ok(Self { name, email })
}
}

View File

@@ -0,0 +1,82 @@
use crate::startup::AppState;
use axum::{
extract::{Query, State},
http::StatusCode,
response::{Html, IntoResponse, Response},
};
use serde::Deserialize;
use sqlx::PgPool;
use uuid::Uuid;
#[tracing::instrument(name = "Confirming new subscriber", skip(params))]
pub async fn confirm(
State(AppState {
connection_pool, ..
}): State<AppState>,
Query(params): Query<Params>,
) -> Response {
let Ok(subscriber_id) =
get_subscriber_id_from_token(&connection_pool, &params.subscription_token).await
else {
return StatusCode::INTERNAL_SERVER_ERROR.into_response();
};
if let Some(subscriber_id) = subscriber_id {
if confirm_subscriber(&connection_pool, &subscriber_id)
.await
.is_err()
{
StatusCode::INTERNAL_SERVER_ERROR.into_response()
} else {
Html(include_str!("register/confirm.html")).into_response()
}
} else {
StatusCode::UNAUTHORIZED.into_response()
}
}
#[tracing::instrument(
name = "Mark subscriber as confirmed",
skip(connection_pool, subscriber_id)
)]
async fn confirm_subscriber(
connection_pool: &PgPool,
subscriber_id: &Uuid,
) -> Result<(), sqlx::Error> {
sqlx::query!(
"UPDATE subscriptions SET status = 'confirmed' WHERE id = $1",
subscriber_id
)
.execute(connection_pool)
.await
.map_err(|e| {
tracing::error!("Failed to execute query: {:?}", e);
e
})?;
Ok(())
}
#[tracing::instrument(
name = "Get subscriber_id from token",
skip(connection, subscription_token)
)]
async fn get_subscriber_id_from_token(
connection: &PgPool,
subscription_token: &str,
) -> Result<Option<Uuid>, sqlx::Error> {
let saved = sqlx::query!(
"SELECT subscriber_id FROM subscription_tokens WHERE subscription_token = $1",
subscription_token
)
.fetch_optional(connection)
.await
.map_err(|e| {
tracing::error!("Failed to execute query: {:?}", e);
e
})?;
Ok(saved.map(|r| r.subscriber_id))
}
#[derive(Debug, Deserialize)]
pub struct Params {
subscription_token: String,
}

53
src/session_state.rs Normal file
View File

@@ -0,0 +1,53 @@
use axum::{extract::FromRequestParts, http::request::Parts};
use std::result;
use tower_sessions::{Session, session::Error};
use uuid::Uuid;
pub struct TypedSession(Session);
type Result<T> = result::Result<T, Error>;
impl TypedSession {
const USER_ID_KEY: &'static str = "user_id";
const USERNAME_KEY: &'static str = "username";
pub async fn renew(&self) -> Result<()> {
self.0.cycle_id().await
}
pub async fn insert_user_id(&self, user_id: Uuid) -> Result<()> {
self.0.insert(Self::USER_ID_KEY, user_id).await
}
pub async fn get_user_id(&self) -> Result<Option<Uuid>> {
self.0.get(Self::USER_ID_KEY).await
}
pub async fn insert_username(&self, username: String) -> Result<()> {
self.0.insert(Self::USERNAME_KEY, username).await
}
pub async fn get_username(&self) -> Result<Option<String>> {
self.0.get(Self::USERNAME_KEY).await
}
pub async fn clear(&self) {
self.0.clear().await;
}
}
impl<S> FromRequestParts<S> for TypedSession
where
S: Sync + Send,
{
type Rejection = <Session as FromRequestParts<S>>::Rejection;
async fn from_request_parts(
parts: &mut Parts,
state: &S,
) -> result::Result<Self, Self::Rejection> {
Session::from_request_parts(parts, state)
.await
.map(TypedSession)
}
}

156
src/startup.rs Normal file
View File

@@ -0,0 +1,156 @@
use crate::{
authentication::require_auth, configuration::Settings, email_client::EmailClient, routes::*,
};
use axum::{
Router,
extract::MatchedPath,
http::Request,
middleware,
routing::{get, post},
};
use axum_messages::MessagesManagerLayer;
use axum_server::tls_rustls::RustlsConfig;
use secrecy::ExposeSecret;
use sqlx::{PgPool, postgres::PgPoolOptions};
use std::{net::TcpListener, sync::Arc};
use tower_http::trace::TraceLayer;
use tower_sessions::SessionManagerLayer;
use tower_sessions_redis_store::{
RedisStore,
fred::prelude::{ClientLike, Config, Pool},
};
use uuid::Uuid;
#[derive(Clone)]
pub struct AppState {
pub connection_pool: PgPool,
pub email_client: Arc<EmailClient>,
pub base_url: String,
}
pub struct Application {
listener: TcpListener,
router: Router,
tls_config: Option<RustlsConfig>,
}
impl Application {
pub async fn build(configuration: Settings) -> Result<Self, anyhow::Error> {
let address = format!(
"{}:{}",
configuration.application.host, configuration.application.port
);
let connection_pool =
PgPoolOptions::new().connect_lazy_with(configuration.database.with_db());
let email_client = EmailClient::build(configuration.email_client).unwrap();
let pool = Pool::new(
Config::from_url(configuration.redis_uri.expose_secret())
.expect("Failed to parse Redis URL string"),
None,
None,
None,
6,
)
.unwrap();
pool.connect();
pool.wait_for_connect().await.unwrap();
let redis_store = RedisStore::new(pool);
let router = app(
connection_pool,
email_client,
configuration.application.base_url,
redis_store,
);
let tls_config = if configuration.require_tls {
Some(
RustlsConfig::from_pem_file(
std::env::var("APP_TLS_CERT")
.expect("Failed to read TLS certificate environment variable"),
std::env::var("APP_TLS_KEY")
.expect("Feiled to read TLS private key environment variable"),
)
.await
.expect("Could not create TLS configuration"),
)
} else {
None
};
let listener = TcpListener::bind(address).unwrap();
Ok(Self {
listener,
router,
tls_config,
})
}
pub async fn run_until_stopped(self) -> Result<(), std::io::Error> {
tracing::debug!("listening on {}", self.local_addr());
if let Some(tls_config) = self.tls_config {
axum_server::from_tcp_rustls(self.listener, tls_config)
.serve(self.router.into_make_service())
.await
} else {
axum_server::from_tcp(self.listener)
.serve(self.router.into_make_service())
.await
}
}
pub fn local_addr(&self) -> String {
self.listener.local_addr().unwrap().to_string()
}
pub fn port(&self) -> u16 {
self.listener.local_addr().unwrap().port()
}
}
pub fn app(
connection_pool: PgPool,
email_client: EmailClient,
base_url: String,
redis_store: RedisStore<Pool>,
) -> Router {
let app_state = AppState {
connection_pool,
email_client: Arc::new(email_client),
base_url,
};
let admin_routes = Router::new()
.route("/dashboard", get(admin_dashboard))
.route("/password", get(change_password_form).post(change_password))
.route(
"/newsletters",
get(publish_newsletter_form).post(publish_newsletter),
)
.route("/logout", post(logout))
.layer(middleware::from_fn(require_auth));
Router::new()
.route("/", get(home))
.route("/register", get(register))
.route("/login", get(get_login).post(post_login))
.route("/health_check", get(health_check))
.route("/subscriptions", post(subscribe))
.route("/subscriptions/confirm", get(confirm))
.nest("/admin", admin_routes)
.layer(
TraceLayer::new_for_http().make_span_with(|request: &Request<_>| {
let matched_path = request
.extensions()
.get::<MatchedPath>()
.map(MatchedPath::as_str);
let request_id = Uuid::new_v4().to_string();
tracing::info_span!(
"http_request",
method = ?request.method(),
matched_path,
request_id,
some_other_field = tracing::field::Empty,
)
}),
)
.layer(MessagesManagerLayer)
.layer(SessionManagerLayer::new(redis_store).with_secure(false))
.with_state(app_state)
}

32
src/telemetry.rs Normal file
View File

@@ -0,0 +1,32 @@
use tokio::task::JoinHandle;
use tracing_bunyan_formatter::{BunyanFormattingLayer, JsonStorageLayer};
use tracing_subscriber::{fmt::MakeWriter, layer::SubscriberExt, util::SubscriberInitExt};
pub fn init_subscriber<Sink>(sink: Sink)
where
Sink: for<'a> MakeWriter<'a> + Send + Sync + 'static,
{
let formatting_layer = BunyanFormattingLayer::new(env!("CARGO_CRATE_NAME").into(), sink);
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| {
format!(
"{}=debug,tower_http=debug,axum::rejection=trace",
env!("CARGO_CRATE_NAME")
)
.into()
}),
)
.with(JsonStorageLayer)
.with(formatting_layer)
.init();
}
pub fn spawn_blocking_with_tracing<F, R>(f: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let current_span = tracing::Span::current();
tokio::task::spawn_blocking(move || current_span.in_scope(f))
}

View File

@@ -0,0 +1,34 @@
use crate::helpers::{TestApp, assert_is_redirect_to};
#[tokio::test]
async fn you_must_be_logged_in_to_access_the_admin_dashboard() {
let app = TestApp::spawn().await;
let response = app.get_admin_dashboard().await;
assert_is_redirect_to(&response, "/login");
}
#[tokio::test]
async fn logout_clears_session_state() {
let app = TestApp::spawn().await;
let login_body = serde_json::json!({
"username": &app.test_user.username,
"password": &app.test_user.password,
});
let response = app.post_login(&login_body).await;
assert_is_redirect_to(&response, "/admin/dashboard");
let html_page = app.get_admin_dashboard_html().await;
assert!(html_page.contains(&format!("Welcome {}", app.test_user.username)));
let response = app.post_logout().await;
assert_is_redirect_to(&response, "/login");
let html_page = app.get_login_html().await;
assert!(html_page.contains("You have successfully logged out"));
let response = app.get_admin_dashboard().await;
assert_is_redirect_to(&response, "/login");
}

View File

@@ -0,0 +1,115 @@
use uuid::Uuid;
use crate::helpers::{TestApp, assert_is_redirect_to};
#[tokio::test]
async fn you_must_be_logged_in_to_see_the_change_password_form() {
let app = TestApp::spawn().await;
let response = app.get_change_password().await;
assert_is_redirect_to(&response, "/login");
}
#[tokio::test]
async fn you_must_be_logged_in_to_change_your_password() {
let app = TestApp::spawn().await;
let new_password = Uuid::new_v4().to_string();
let response = app
.post_change_password(&serde_json::json!({
"current_password": Uuid::new_v4().to_string(),
"new_password": new_password,
"new_password_check": new_password,
}))
.await;
assert_is_redirect_to(&response, "/login");
}
#[tokio::test]
async fn new_password_fields_must_match() {
let app = TestApp::spawn().await;
app.post_login(&serde_json::json!({
"username": app.test_user.username,
"password": app.test_user.password,
}))
.await;
let new_password = Uuid::new_v4().to_string();
let another_new_password = Uuid::new_v4().to_string();
let response = app
.post_change_password(&serde_json::json!({
"current_password": app.test_user.password,
"new_password": new_password,
"new_password_check": another_new_password,
}))
.await;
assert_is_redirect_to(&response, "/admin/password");
let html_page = app.get_change_password_html().await;
assert!(html_page.contains("You entered two different passwords"));
}
#[tokio::test]
async fn current_password_is_invalid() {
let app = TestApp::spawn().await;
app.post_login(&serde_json::json!({
"username": app.test_user.username,
"password": app.test_user.password,
}))
.await;
let new_password = Uuid::new_v4().to_string();
let response = app
.post_change_password(&serde_json::json!({
"current_password": Uuid::new_v4().to_string(),
"new_password": new_password,
"new_password_check": new_password,
}))
.await;
assert_is_redirect_to(&response, "/admin/password");
let html_page = app.get_change_password_html().await;
assert!(html_page.contains("The current password is incorrect"));
}
#[tokio::test]
async fn changing_password_works() {
let app = TestApp::spawn().await;
let login_body = &serde_json::json!({
"username": app.test_user.username,
"password": app.test_user.password,
});
let response = app.post_login(login_body).await;
assert_is_redirect_to(&response, "/admin/dashboard");
let new_password = Uuid::new_v4().to_string();
let response = app
.post_change_password(&serde_json::json!({
"current_password": app.test_user.password,
"new_password": new_password,
"new_password_check": new_password,
}))
.await;
assert_is_redirect_to(&response, "/admin/password");
let html_page = app.get_change_password_html().await;
assert!(html_page.contains("Your password has been changed"));
let response = app.post_logout().await;
assert_is_redirect_to(&response, "/login");
let html_page = app.get_login_html().await;
assert!(html_page.contains("You have successfully logged out"));
let login_body = &serde_json::json!({
"username": app.test_user.username,
"password": new_password,
});
let response = app.post_login(login_body).await;
assert_is_redirect_to(&response, "/admin/dashboard");
}

16
tests/api/health_check.rs Normal file
View File

@@ -0,0 +1,16 @@
use crate::helpers::TestApp;
#[tokio::test]
async fn health_check_works() {
let app = TestApp::spawn().await;
let client = reqwest::Client::new();
let response = client
.get(format!("{}/health_check", app.address))
.send()
.await
.unwrap();
assert!(response.status().is_success());
assert_eq!(Some(0), response.content_length());
}

296
tests/api/helpers.rs Normal file
View File

@@ -0,0 +1,296 @@
use argon2::{
Algorithm, Argon2, Params, PasswordHasher, Version,
password_hash::{SaltString, rand_core::OsRng},
};
use linkify::LinkFinder;
use once_cell::sync::Lazy;
use sqlx::{Connection, Executor, PgConnection, PgPool};
use uuid::Uuid;
use wiremock::MockServer;
use zero2prod::{
configuration::{DatabaseSettings, get_configuration},
email_client::EmailClient,
issue_delivery_worker::{ExecutionOutcome, try_execute_task},
startup::Application,
telemetry::init_subscriber,
};
static TRACING: Lazy<()> = Lazy::new(|| {
if std::env::var("TEST_LOG").is_ok() {
init_subscriber(std::io::stdout);
} else {
init_subscriber(std::io::sink);
}
});
pub struct ConfirmationLinks {
pub html: reqwest::Url,
pub text: reqwest::Url,
}
pub struct TestUser {
pub user_id: Uuid,
pub username: String,
pub password: String,
}
impl TestUser {
pub fn generate() -> Self {
Self {
user_id: Uuid::new_v4(),
username: Uuid::new_v4().to_string(),
password: Uuid::new_v4().to_string(),
}
}
pub async fn store(&self, connection_pool: &PgPool) {
let salt = SaltString::generate(&mut OsRng);
let password_hash = Argon2::new(
Algorithm::Argon2id,
Version::V0x13,
Params::new(1500, 2, 1, None).unwrap(),
)
.hash_password(self.password.as_bytes(), &salt)
.unwrap()
.to_string();
sqlx::query!(
"INSERT INTO users (user_id, username, password_hash) VALUES ($1, $2, $3)",
self.user_id,
self.username,
password_hash
)
.execute(connection_pool)
.await
.expect("Failed to create test user");
}
}
pub struct TestApp {
pub address: String,
pub connection_pool: PgPool,
pub email_server: wiremock::MockServer,
pub port: u16,
pub test_user: TestUser,
pub api_client: reqwest::Client,
pub email_client: EmailClient,
}
impl TestApp {
pub async fn spawn() -> Self {
Lazy::force(&TRACING);
let email_server = MockServer::start().await;
let configuration = {
let mut c = get_configuration().expect("Failed to read configuration");
c.database.database_name = Uuid::new_v4().to_string();
c.application.port = 0;
c.email_client.base_url = email_server.uri();
c
};
let local_addr = configuration.application.host.clone();
let connection_pool = configure_database(&configuration.database).await;
let email_client = EmailClient::build(configuration.email_client.clone()).unwrap();
let application = Application::build(configuration)
.await
.expect("Failed to build application");
let port = application.port();
let address = format!("http://{}:{}", local_addr, port);
let test_user = TestUser::generate();
test_user.store(&connection_pool).await;
let api_client = reqwest::Client::builder()
.redirect(reqwest::redirect::Policy::none())
.cookie_store(true)
.build()
.unwrap();
let app = TestApp {
address,
connection_pool,
email_server,
port,
test_user,
api_client,
email_client,
};
tokio::spawn(application.run_until_stopped());
app
}
pub async fn dispatch_all_pending_emails(&self) {
loop {
if let ExecutionOutcome::EmptyQueue =
try_execute_task(&self.connection_pool, &self.email_client)
.await
.unwrap()
{
break;
}
}
}
pub fn get_confirmation_links(&self, request: &wiremock::Request) -> ConfirmationLinks {
let body: serde_json::Value = serde_json::from_slice(&request.body).unwrap();
let get_link = |s: &str| {
let links: Vec<_> = LinkFinder::new()
.links(s)
.filter(|l| *l.kind() == linkify::LinkKind::Url)
.collect();
assert_eq!(links.len(), 1);
let raw_link = links[0].as_str();
let mut confirmation_link = reqwest::Url::parse(raw_link).unwrap();
assert_eq!(confirmation_link.host_str().unwrap(), "127.0.0.1");
confirmation_link.set_port(Some(self.port)).unwrap();
confirmation_link
};
let html = get_link(body["html"].as_str().unwrap());
let text = get_link(body["text"].as_str().unwrap());
ConfirmationLinks { html, text }
}
pub async fn get_login_html(&self) -> String {
self.api_client
.get(format!("{}/login", &self.address))
.send()
.await
.expect("Failed to execute request")
.text()
.await
.unwrap()
}
pub async fn get_admin_dashboard(&self) -> reqwest::Response {
self.api_client
.get(format!("{}/admin/dashboard", &self.address))
.send()
.await
.expect("Failed to execute request")
}
pub async fn get_admin_dashboard_html(&self) -> String {
self.get_admin_dashboard().await.text().await.unwrap()
}
pub async fn get_register_html(&self) -> String {
self.api_client
.get(format!("{}/register", &self.address))
.send()
.await
.expect("Failed to execute request")
.text()
.await
.unwrap()
}
pub async fn get_change_password(&self) -> reqwest::Response {
self.api_client
.get(format!("{}/admin/password", &self.address))
.send()
.await
.expect("Failed to execute request")
}
pub async fn get_change_password_html(&self) -> String {
self.get_change_password().await.text().await.unwrap()
}
pub async fn post_subscriptions(&self, body: String) -> reqwest::Response {
self.api_client
.post(format!("{}/subscriptions", self.address))
.header("Content-Type", "application/x-www-form-urlencoded")
.body(body)
.send()
.await
.expect("Failed to execute request")
}
pub async fn get_newsletter_form(&self) -> reqwest::Response {
self.api_client
.get(format!("{}/admin/password", &self.address))
.send()
.await
.expect("Failed to execute request")
}
pub async fn get_newsletter_form_html(&self) -> String {
self.get_newsletter_form().await.text().await.unwrap()
}
pub async fn post_newsletters<Body>(&self, body: &Body) -> reqwest::Response
where
Body: serde::Serialize,
{
self.api_client
.post(format!("{}/admin/newsletters", self.address))
.form(body)
.send()
.await
.expect("Failed to execute request")
}
pub async fn post_login<Body>(&self, body: &Body) -> reqwest::Response
where
Body: serde::Serialize,
{
self.api_client
.post(format!("{}/login", self.address))
.form(body)
.send()
.await
.expect("failed to execute request")
}
pub async fn admin_login(&self) {
let login_body = serde_json::json!({
"username": self.test_user.username,
"password": self.test_user.password
});
self.post_login(&login_body).await;
}
pub async fn post_logout(&self) -> reqwest::Response {
self.api_client
.post(format!("{}/admin/logout", self.address))
.send()
.await
.expect("failed to execute request")
}
pub async fn post_change_password<Body>(&self, body: &Body) -> reqwest::Response
where
Body: serde::Serialize,
{
self.api_client
.post(format!("{}/admin/password", self.address))
.form(body)
.send()
.await
.expect("failed to execute request")
}
}
async fn configure_database(config: &DatabaseSettings) -> PgPool {
let mut connection = PgConnection::connect_with(&config.without_db())
.await
.expect("Failed to connect to Postgres");
connection
.execute(format!(r#"CREATE DATABASE "{}";"#, config.database_name).as_ref())
.await
.expect("Failed to create the database");
let connection_pool = PgPool::connect_with(config.with_db())
.await
.expect("Failed to connect to Postgres");
sqlx::migrate!("./migrations")
.run(&connection_pool)
.await
.expect("Failed to migrate the database");
connection_pool
}
pub fn assert_is_redirect_to(response: &reqwest::Response, location: &str) {
assert_eq!(response.status().as_u16(), 303);
assert_eq!(response.headers().get("Location").unwrap(), location);
}

35
tests/api/login.rs Normal file
View File

@@ -0,0 +1,35 @@
use crate::helpers::{TestApp, assert_is_redirect_to};
#[tokio::test]
async fn an_error_flash_message_is_set_on_failure() {
let app = TestApp::spawn().await;
let login_body = serde_json::json!({
"username": "user",
"password": "password"
});
let response = app.post_login(&login_body).await;
assert_eq!(response.status().as_u16(), 303);
assert_is_redirect_to(&response, "/login");
let login_page_html = app.get_login_html().await;
assert!(login_page_html.contains("Authentication failed"));
}
#[tokio::test]
async fn login_redirects_to_admin_dashboard_after_login_success() {
let app = TestApp::spawn().await;
let login_body = serde_json::json!({
"username": &app.test_user.username,
"password": &app.test_user.password
});
let response = app.post_login(&login_body).await;
assert_is_redirect_to(&response, "/admin/dashboard");
let html_page = app.get_admin_dashboard_html().await;
assert!(html_page.contains(&format!("Welcome {}", app.test_user.username)));
}

8
tests/api/main.rs Normal file
View File

@@ -0,0 +1,8 @@
mod admin_dashboard;
mod change_password;
mod health_check;
mod helpers;
mod login;
mod newsletters;
mod subscriptions;
mod subscriptions_confirm;

240
tests/api/newsletters.rs Normal file
View File

@@ -0,0 +1,240 @@
use crate::helpers::{ConfirmationLinks, TestApp, assert_is_redirect_to};
use fake::{
Fake,
faker::{internet::en::SafeEmail, name::fr_fr::Name},
};
use std::time::Duration;
use uuid::Uuid;
use wiremock::{
Mock, MockBuilder, ResponseTemplate,
matchers::{method, path},
};
#[tokio::test]
async fn newsletters_are_not_delivered_to_unconfirmed_subscribers() {
let app = TestApp::spawn().await;
create_unconfirmed_subscriber(&app).await;
app.admin_login().await;
when_sending_an_email()
.respond_with(ResponseTemplate::new(200))
.expect(0)
.mount(&app.email_server)
.await;
let newsletter_request_body = serde_json::json!({
"title": "Newsletter title",
"text": "Newsletter body as plain text",
"html": "<p>Newsletter body as HTML</p>"
});
app.post_newsletters(&newsletter_request_body).await;
app.dispatch_all_pending_emails().await;
}
#[tokio::test]
async fn requests_without_authentication_are_redirected() {
let app = TestApp::spawn().await;
when_sending_an_email()
.respond_with(ResponseTemplate::new(200))
.expect(0)
.mount(&app.email_server)
.await;
let newsletter_request_body = serde_json::json!({
"title": "Newsletter title",
"text": "Newsletter body as plain text",
"html": "<p>Newsletter body as HTML</p>"
});
let response = app.post_newsletters(&newsletter_request_body).await;
assert_is_redirect_to(&response, "/login");
}
#[tokio::test]
async fn newsletters_are_delivered_to_confirmed_subscribers() {
let app = TestApp::spawn().await;
create_confirmed_subscriber(&app).await;
app.admin_login().await;
when_sending_an_email()
.respond_with(ResponseTemplate::new(200))
.expect(1)
.mount(&app.email_server)
.await;
let newsletter_title = "Newsletter title";
let newsletter_request_body = serde_json::json!({
"title": newsletter_title,
"text": "Newsletter body as plain text",
"html": "<p>Newsletter body as HTML</p>",
"idempotency_key": Uuid::new_v4().to_string(),
});
let response = app.post_newsletters(&newsletter_request_body).await;
assert_is_redirect_to(&response, "/admin/newsletters");
let html_page = app.get_newsletter_form_html().await;
assert!(html_page.contains(&format!(
"The newsletter issue '{}' has been published",
newsletter_title
)));
app.dispatch_all_pending_emails().await;
}
#[tokio::test]
async fn form_shows_error_for_invalid_data() {
let app = TestApp::spawn().await;
app.admin_login().await;
when_sending_an_email()
.respond_with(ResponseTemplate::new(200))
.expect(0)
.mount(&app.email_server)
.await;
let test_cases = [
(
serde_json::json!({
"title": "",
"text": "Newsletter body as plain text",
"html": "<p>Newsletter body as HTML</p>",
"idempotency_key": Uuid::new_v4().to_string(),
}),
"The title was empty",
),
(
serde_json::json!({
"title": "Newsletter",
"text": "",
"html": "",
"idempotency_key": Uuid::new_v4().to_string(),
}),
"The content was empty",
),
];
for (invalid_body, error_message) in test_cases {
app.post_newsletters(&invalid_body).await;
let html_page = app.get_newsletter_form_html().await;
assert!(html_page.contains(error_message));
}
}
#[tokio::test]
async fn newsletter_creation_is_idempotent() {
let app = TestApp::spawn().await;
create_confirmed_subscriber(&app).await;
app.admin_login().await;
when_sending_an_email()
.respond_with(ResponseTemplate::new(200))
.expect(1)
.mount(&app.email_server)
.await;
let newsletter_title = "Newsletter title";
let newsletter_request_body = serde_json::json!({
"title": newsletter_title,
"text": "Newsletter body as plain text",
"html": "<p>Newsletter body as HTML</p>",
"idempotency_key": Uuid::new_v4().to_string(),
});
let response = app.post_newsletters(&newsletter_request_body).await;
assert_is_redirect_to(&response, "/admin/newsletters");
let html_page = app.get_newsletter_form_html().await;
assert!(html_page.contains(&format!(
"The newsletter issue '{}' has been published",
newsletter_title
)));
let response = app.post_newsletters(&newsletter_request_body).await;
assert_is_redirect_to(&response, "/admin/newsletters");
let html_page = app.get_newsletter_form_html().await;
assert!(html_page.contains(&format!(
"The newsletter issue '{}' has been published",
newsletter_title
)));
app.dispatch_all_pending_emails().await;
}
#[tokio::test]
async fn concurrent_form_submission_is_handled_gracefully() {
let app = TestApp::spawn().await;
create_confirmed_subscriber(&app).await;
app.admin_login().await;
when_sending_an_email()
.respond_with(ResponseTemplate::new(200).set_delay(Duration::from_secs(2)))
.expect(1)
.mount(&app.email_server)
.await;
let newsletter_request_body = serde_json::json!({
"title": "Newsletter title",
"text": "Newsletter body as plain text",
"html": "<p>Newsletter body as HTML</p>",
"idempotency_key": Uuid::new_v4().to_string(),
});
let response1 = app.post_newsletters(&newsletter_request_body);
let response2 = app.post_newsletters(&newsletter_request_body);
let (response1, response2) = tokio::join!(response1, response2);
assert_eq!(response1.status(), response2.status());
assert_eq!(
response1.text().await.unwrap(),
response2.text().await.unwrap(),
);
app.dispatch_all_pending_emails().await;
}
async fn create_unconfirmed_subscriber(app: &TestApp) -> ConfirmationLinks {
let name: String = Name().fake();
let email: String = SafeEmail().fake();
let body = serde_urlencoded::to_string(serde_json::json!({
"name": name,
"email": email,
"email_check": email
}))
.unwrap();
let _mock_guard = Mock::given(path("/v1/email"))
.and(method("POST"))
.respond_with(ResponseTemplate::new(200))
.named("Create unconfirmed subscriber")
.expect(1)
.mount_as_scoped(&app.email_server)
.await;
app.post_subscriptions(body)
.await
.error_for_status()
.unwrap();
let email_request = &app
.email_server
.received_requests()
.await
.unwrap()
.pop()
.unwrap();
app.get_confirmation_links(email_request)
}
async fn create_confirmed_subscriber(app: &TestApp) {
let confirmation_links = create_unconfirmed_subscriber(app).await;
reqwest::get(confirmation_links.html)
.await
.unwrap()
.error_for_status()
.unwrap();
}
fn when_sending_an_email() -> MockBuilder {
Mock::given(path("/v1/email")).and(method("POST"))
}

160
tests/api/subscriptions.rs Normal file
View File

@@ -0,0 +1,160 @@
use crate::helpers::{TestApp, assert_is_redirect_to};
use wiremock::{
Mock, ResponseTemplate,
matchers::{method, path},
};
#[tokio::test]
async fn subscribe_displays_a_confirmation_message_for_valid_form_data() {
let app = TestApp::spawn().await;
Mock::given(path("/v1/email"))
.and(method("POST"))
.respond_with(ResponseTemplate::new(200))
.mount(&app.email_server)
.await;
let email = "alphonse.paix@outlook.com";
let body = format!("name=Alphonse&email={0}&email_check={0}", email);
let response = app.post_subscriptions(body).await;
assert_is_redirect_to(&response, "/register");
let page_html = app.get_register_html().await;
assert!(page_html.contains("A confirmation email has been sent"));
}
#[tokio::test]
async fn subscribe_persists_the_new_subscriber() {
let app = TestApp::spawn().await;
Mock::given(path("/v1/email"))
.and(method("POST"))
.respond_with(ResponseTemplate::new(200))
.mount(&app.email_server)
.await;
let email = "alphonse.paix@outlook.com";
let body = format!("name=Alphonse&email={0}&email_check={0}", email);
let response = app.post_subscriptions(body).await;
assert_is_redirect_to(&response, "/register");
let page_html = app.get_register_html().await;
assert!(page_html.contains("A confirmation email has been sent"));
let saved = sqlx::query!("SELECT email, name, status FROM subscriptions")
.fetch_one(&app.connection_pool)
.await
.expect("Failed to fetch saved subscription");
assert_eq!(saved.email, "alphonse.paix@outlook.com");
assert_eq!(saved.name, "Alphonse");
assert_eq!(saved.status, "pending_confirmation");
}
#[tokio::test]
async fn subscribe_returns_a_422_when_data_is_missing() {
let app = TestApp::spawn().await;
let test_cases = [
("name=Alphonse", "missing the email"),
("email=alphonse.paix%40outlook.com", "missing the name"),
("", "missing both name and email"),
];
for (invalid_body, error_message) in test_cases {
let response = app.post_subscriptions(invalid_body.into()).await;
assert_eq!(
422,
response.status().as_u16(),
"the API did not fail with 422 Unprocessable Entity when the payload was {}.",
error_message
);
}
}
#[tokio::test]
async fn subscribe_shows_an_error_message_when_fields_are_present_but_invalid() {
let app = TestApp::spawn().await;
let test_cases = [
("name=&email=alphonse.paix%40outlook.com", "an empty name"),
("name=Alphonse&email=&email_check=", "an empty email"),
(
"name=Alphonse&email=not-an-email&email_check=not-an_email",
"an invalid email",
),
(
"name=Alphonse&email=alphonse.paix@outlook.com&email_check=alphonse.paix@outlook.fr",
"two different email addresses",
),
];
for (body, description) in test_cases {
let response_text = app
.post_subscriptions(body.into())
.await
.text()
.await
.unwrap();
assert!(
!response_text.contains("Your account has been confirmed"),
"the API did not displayed an error message when the payload had an {}.",
description
);
}
}
#[tokio::test]
async fn subscribe_sends_a_confirmation_email_for_valid_data() {
let app = TestApp::spawn().await;
let email = "alphonse.paix@outlook.com";
let body = format!("name=Alphonse&email={0}&email_check={0}", email);
Mock::given(path("v1/email"))
.and(method("POST"))
.respond_with(ResponseTemplate::new(200))
.expect(1)
.mount(&app.email_server)
.await;
app.post_subscriptions(body).await;
}
#[tokio::test]
async fn subscribe_sends_a_confirmation_email_with_a_link() {
let app = TestApp::spawn().await;
let email = "alphonse.paix@outlook.com";
let body = format!("name=Alphonse&email={0}&email_check={0}", email);
Mock::given(path("v1/email"))
.and(method("POST"))
.respond_with(ResponseTemplate::new(200))
.expect(1)
.mount(&app.email_server)
.await;
app.post_subscriptions(body).await;
let email_request = &app.email_server.received_requests().await.unwrap()[0];
let confirmation_links = app.get_confirmation_links(email_request);
assert_eq!(confirmation_links.html, confirmation_links.text);
}
#[tokio::test]
async fn subscribe_fails_if_there_is_a_fatal_database_error() {
let app = TestApp::spawn().await;
let email = "alphonse.paix@outlook.com";
let body = format!("name=Alphonse&email={0}&email_check={0}", email);
sqlx::query!("ALTER TABLE subscriptions DROP COLUMN email")
.execute(&app.connection_pool)
.await
.unwrap();
let response = app.post_subscriptions(body).await;
assert_eq!(response.status().as_u16(), 500);
}

View File

@@ -0,0 +1,78 @@
use crate::helpers::TestApp;
use wiremock::{
Mock, ResponseTemplate,
matchers::{method, path},
};
#[tokio::test]
async fn confirmation_links_without_token_are_rejected_with_a_400() {
let app = TestApp::spawn().await;
let response = reqwest::get(&format!("{}/subscriptions/confirm", &app.address))
.await
.unwrap();
assert_eq!(400, response.status().as_u16());
}
#[tokio::test]
async fn clicking_on_the_link_shows_a_confiramtion_message() {
let app = TestApp::spawn().await;
let email = "alphonse.paix@outlook.com";
let body = format!("name=Alphonse&email={email}&email_check={email}");
Mock::given(path("v1/email"))
.and(method("POST"))
.respond_with(ResponseTemplate::new(200))
.expect(1)
.mount(&app.email_server)
.await;
app.post_subscriptions(body).await;
let email_request = &app.email_server.received_requests().await.unwrap()[0];
let confirmation_links = app.get_confirmation_links(email_request);
let response = reqwest::get(confirmation_links.html).await.unwrap();
assert_eq!(response.status().as_u16(), 200);
assert!(
response
.text()
.await
.unwrap()
.contains("Your account has been confirmed")
);
}
#[tokio::test]
async fn clicking_on_the_confirmation_link_confirms_a_subscriber() {
let app = TestApp::spawn().await;
let email = "alphonse.paix@outlook.com";
let body = format!("name=Alphonse&email={email}&email_check={email}");
Mock::given(path("v1/email"))
.and(method("POST"))
.respond_with(ResponseTemplate::new(200))
.expect(1)
.mount(&app.email_server)
.await;
app.post_subscriptions(body).await;
let email_request = &app.email_server.received_requests().await.unwrap()[0];
let confirmation_links = app.get_confirmation_links(email_request);
reqwest::get(confirmation_links.html)
.await
.unwrap()
.error_for_status()
.unwrap();
let saved = sqlx::query!("SELECT email, name, status FROM subscriptions")
.fetch_one(&app.connection_pool)
.await
.expect("Failed to fetch saved subscription");
assert_eq!(saved.email, "alphonse.paix@outlook.com");
assert_eq!(saved.name, "Alphonse");
assert_eq!(saved.status, "confirmed");
}