The Outbox Pattern
The Outbox Pattern solves the "dual-write" problem in distributed systems: the risk that a database transaction commits but the subsequent message publication to a broker (like Kafka) fails, or vice versa. By writing the event to a local `outbox` table within the same ACID transaction as the business logic, we guarantee that the event is captured if and only if the state change is persisted.
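As a minimal sketch of the single-transaction write, the example below inserts a business row and its outbox event together. It uses the standard-library `sqlite3` module only so that it runs without a server; the `orders` table, the `place_order` and `make_db` helpers, and the `OrderPlaced` event type are hypothetical names chosen for illustration. The same shape should carry over to PostgreSQL with a driver such as `psycopg2`.

```python
import json
import sqlite3
import uuid


def make_db() -> sqlite3.Connection:
    """Create an in-memory database with a hypothetical orders table and an outbox."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE orders (id TEXT PRIMARY KEY, total_cents INTEGER NOT NULL)"
    )
    conn.execute(
        "CREATE TABLE outbox (id TEXT PRIMARY KEY, aggregate_type TEXT NOT NULL, "
        "aggregate_id TEXT NOT NULL, type TEXT NOT NULL, payload TEXT NOT NULL)"
    )
    return conn


def place_order(conn, order_id, total_cents, event_type="OrderPlaced"):
    """Insert an order and its outbox event in one transaction; return the event id."""
    event_id = str(uuid.uuid4())
    payload = json.dumps({"order_id": order_id, "total_cents": total_cents})
    # `with conn` commits if the block succeeds and rolls back if any
    # statement raises, so both INSERTs are persisted or neither is.
    with conn:
        conn.execute(
            "INSERT INTO orders (id, total_cents) VALUES (?, ?)",
            (order_id, total_cents),
        )
        conn.execute(
            "INSERT INTO outbox (id, aggregate_type, aggregate_id, type, payload) "
            "VALUES (?, ?, ?, ?, ?)",
            (event_id, "Order", order_id, event_type, payload),
        )
    return event_id
```

If the outbox insert fails (for example, a constraint violation), the order insert is rolled back with it, which is the property the pattern relies on.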
PostgreSQL CDC Implementation Logic
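For high-throughput systems, polling the outbox table is inefficient: every poll is an extra query against the primary, and delivery latency is bounded by the polling interval. A common alternative is **Change Data Capture (CDC)** via PostgreSQL's logical decoding, which streams committed changes from the Write-Ahead Log instead of querying the table.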
1. Database Schema Setup
Create a dedicated outbox table. Using a UUID for the `id` helps with idempotency on the consumer side.
```sql
CREATE TABLE outbox (
    id             UUID PRIMARY KEY,
    aggregate_type TEXT NOT NULL,
    aggregate_id   TEXT NOT NULL,
    type           TEXT NOT NULL,
    payload        JSONB NOT NULL,
    created_at     TIMESTAMPTZ DEFAULT NOW()
);
-- Index for manual cleanup/audit if needed
CREATE INDEX idx_outbox_created_at ON outbox (created_at);
```
2. Configuring Logical Replication
PostgreSQL must be configured to support logical decoding. Set the following in `postgresql.conf`; changing `wal_level` requires a server restart:
```ini
wal_level = logical
max_replication_slots = 5
max_wal_senders = 5
```
Create a **Publication** for the outbox table:
```sql
CREATE PUBLICATION outbox_pub FOR TABLE outbox;
```
3. Consuming the WAL (Logical Decoding)
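You can consume the Write-Ahead Log (WAL) directly using `pgoutput`, the logical decoding plugin that ships with PostgreSQL. Debezium can be configured to use the same plugin. Note that `pgoutput` emits a binary protocol, so the client must read message payloads as bytes rather than text.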
**Low-level consumption example (Python with `psycopg2`):**
```python
import psycopg2
from psycopg2 import errors
from psycopg2.extras import LogicalReplicationConnection

conn = psycopg2.connect(
    "dbname=mydb user=postgres",
    connection_factory=LogicalReplicationConnection,
)
cur = conn.cursor()

# Create a logical replication slot using the pgoutput plugin.
try:
    cur.create_replication_slot("outbox_slot", output_plugin="pgoutput")
except errors.DuplicateObject:
    pass  # The slot already exists from a previous run.

# Start the replication stream. pgoutput emits a binary protocol,
# so the payload must not be decoded as text (decode=False).
cur.start_replication(
    slot_name="outbox_slot",
    decode=False,
    options={"proto_version": "1", "publication_names": "outbox_pub"},
)


def handle_message(msg):
    # msg.payload holds one raw pgoutput message (bytes).
    # Logic here to parse 'INSERT' messages for the outbox table and
    # publish them to Kafka/RabbitMQ.
    print(f"Captured WAL entry: {msg.payload!r}")
    # Acknowledge only after the event has been published, so the slot
    # does not advance past unpublished changes.
    msg.cursor.send_feedback(flush_lsn=msg.data_start)


cur.consume_stream(handle_message)
```
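The handler above leaves the parsing step open. As a rough sketch, the function below decodes a `pgoutput` Insert message under these assumptions: protocol version 1, payloads received as `bytes`, and columns sent in text format. The name `parse_pgoutput_insert` is hypothetical. Column names are not part of the Insert message; they arrive earlier in a Relation (`R`) message, so the values here are positional and follow the table's column order.

```python
import struct
from typing import List, Optional, Tuple


def parse_pgoutput_insert(payload: bytes) -> Optional[Tuple[int, List[Optional[str]]]]:
    """Return (relation_oid, column_values) for an Insert message, else None."""
    if payload[0:1] != b"I":
        return None  # Begin, Commit, Relation, Update, Delete, ...
    # Byte 0: 'I'; bytes 1-4: relation OID; byte 5: 'N' (new tuple follows).
    (relation_oid,) = struct.unpack_from("!I", payload, 1)
    if payload[5:6] != b"N":
        raise ValueError("expected new-tuple marker 'N'")
    # TupleData: 2-byte column count, then one entry per column.
    (column_count,) = struct.unpack_from("!H", payload, 6)
    offset = 8
    values: List[Optional[str]] = []
    for _ in range(column_count):
        kind = payload[offset:offset + 1]
        offset += 1
        if kind in (b"n", b"u"):
            # 'n' is SQL NULL; 'u' is an unchanged TOASTed value that is not sent.
            values.append(None)
        elif kind == b"t":
            # Text-formatted value: 4-byte length, then that many bytes.
            (length,) = struct.unpack_from("!I", payload, offset)
            offset += 4
            values.append(payload[offset:offset + length].decode("utf-8"))
            offset += length
        else:
            raise ValueError(f"unsupported column kind {kind!r}")
    return relation_oid, values
```

For the `outbox` table defined earlier, the returned list would hold `id`, `aggregate_type`, `aggregate_id`, `type`, `payload`, and `created_at` in that order. A production consumer would also track Relation messages to map OIDs to tables and would group changes by transaction using Begin and Commit messages.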
Debezium Outbox Routing
Debezium is a widely used CDC connector for this pattern. Its outbox support is a **Single Message Transform (SMT)**, the `EventRouter`, which routes events from a single outbox table to multiple Kafka topics based on the value of `aggregate_type`.
Debezium Connector Configuration (JSON)
```json
{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "postgres",
    "database.dbname": "inventory",
    "table.include.list": "public.outbox",
    "tombstones.on.delete": "false",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.route.topic.replacement": "events.${routedByValue}",
    "transforms.outbox.route.by.field": "aggregate_type"
  }
}
```
How the SMT Works:
1. **Detection:** Debezium detects an `INSERT` in the `outbox` table.
2. **Transformation:** The `EventRouter` SMT uses `payload` as the message value, `aggregate_id` as the message key, and `id` as the event identifier.
3. **Routing:** It sets the Kafka topic name to `events.Order` (if `aggregate_type` was 'Order').
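4. **Cleanup:** After the event is safely in Kafka, you can optionally delete the row from the `outbox` table. The event is produced from the captured *insert*, so deleting the row later doesn't affect the message already in the broker. The delete is itself written to the WAL, but the `EventRouter` ignores delete events rather than routing them.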
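The routing step can be illustrated with a small Python sketch. This is not Debezium code; it only mimics the mapping configured above so the data flow is easier to follow. `RoutedEvent` and `route_outbox_row` are hypothetical names, and placing the event id in an `id` header is an assumption about how the identifier travels with the message.

```python
import json
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class RoutedEvent:
    topic: str
    key: str
    value: str
    headers: Dict[str, str] = field(default_factory=dict)


def route_outbox_row(row: dict, topic_template: str = "events.${routedByValue}") -> RoutedEvent:
    """Map one inserted outbox row to a topic, key, value, and headers."""
    # route.by.field = aggregate_type: its value replaces the placeholder.
    topic = topic_template.replace("${routedByValue}", row["aggregate_type"])
    payload = row["payload"]
    # JSONB may arrive as a string or as an already-parsed object.
    value = payload if isinstance(payload, str) else json.dumps(payload)
    return RoutedEvent(
        topic=topic,
        key=row["aggregate_id"],          # table.field.event.key
        value=value,                      # table.field.event.payload
        headers={"id": str(row["id"])},   # table.field.event.id
    )
```

Keying by `aggregate_id` means all events for one aggregate land in the same Kafka partition, which preserves their relative order for consumers.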
Resilience and Idempotency
The "At-Least-Once" Guarantee
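Logical decoding ensures you don't miss an event: the replication slot only advances once the consumer confirms an LSN. However, if the CDC connector crashes or loses its broker connection after publishing an event but before recording its progress, it re-reads and re-publishes that event on restart, so consumers can receive duplicate messages.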
Consumer Idempotency Strategies
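1. **Database Deduplication:** Store the outbox row's `id` in a `processed_events` table on the consumer side, in the same transaction that applies the event's effect, and skip any event whose `id` is already recorded.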
2. **Natural Idempotency:** Design business logic such that repeating the operation is safe (e.g., "Set Status to Shipped" vs "Increment Shipped Count").
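A minimal sketch of the database deduplication strategy follows. It again uses `sqlite3` so it runs standalone; `handle_once`, `make_consumer_db`, and the `shipments` table are hypothetical. With PostgreSQL, the equivalent of `INSERT OR IGNORE` would be `INSERT ... ON CONFLICT DO NOTHING`.

```python
import sqlite3
from typing import Callable


def make_consumer_db() -> sqlite3.Connection:
    """In-memory stand-in for the consumer's own database."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE processed_events (event_id TEXT PRIMARY KEY)")
    conn.execute(
        "CREATE TABLE shipments (order_id TEXT PRIMARY KEY, shipped_count INTEGER NOT NULL)"
    )
    return conn


def handle_once(
    conn: sqlite3.Connection,
    event_id: str,
    apply: Callable[[sqlite3.Connection], None],
) -> bool:
    """Apply an event's effect at most once; return False for a duplicate."""
    # The marker row and the side effect share one transaction: if `apply`
    # raises, the marker is rolled back and a redelivery can retry.
    with conn:
        cur = conn.execute(
            "INSERT OR IGNORE INTO processed_events (event_id) VALUES (?)",
            (event_id,),
        )
        if cur.rowcount == 0:
            return False  # already processed, skip the side effect
        apply(conn)
    return True
```

This makes even a non-idempotent operation such as "Increment Shipped Count" safe under redelivery, at the cost of one extra row per event on the consumer side.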
Operational Checklist
- **WAL Bloat:** If the CDC consumer stops, PostgreSQL will retain WAL files until they are consumed, potentially filling the disk. Monitor replication slot lag.
- **Schema Evolution:** Ensure the `payload` JSON structure is versioned or managed via a Schema Registry if using Avro.
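- **Cleanup:** Implement a background job that deletes `outbox` rows older than $N$ days (for example, by `created_at`) to prevent the table from becoming a performance bottleneck. A plain `TRUNCATE` removes every row, including events the CDC consumer may not have read yet.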
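For the WAL bloat item, slot lag can be computed from the LSN values PostgreSQL reports. The sketch below assumes you have already fetched `pg_current_wal_lsn()` and each slot's `confirmed_flush_lsn` from the `pg_replication_slots` view as text. The helper names and the threshold are hypothetical, and the server-side function `pg_wal_lsn_diff` performs the same subtraction if you prefer to do it in SQL.

```python
from typing import Iterable, List, Tuple


def lsn_to_int(lsn: str) -> int:
    """Convert a pg_lsn string such as '16/B374D848' to a byte position."""
    # The text form is two hexadecimal numbers: the high and low 32 bits.
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def slot_lag_bytes(current_wal_lsn: str, confirmed_flush_lsn: str) -> int:
    """Bytes of WAL written since the slot's consumer last confirmed progress."""
    return lsn_to_int(current_wal_lsn) - lsn_to_int(confirmed_flush_lsn)


def lagging_slots(
    current_wal_lsn: str,
    slots: Iterable[Tuple[str, str]],
    max_lag_bytes: int,
) -> List[str]:
    """Return names of slots whose lag exceeds the given threshold."""
    return [
        name
        for name, flushed in slots
        if slot_lag_bytes(current_wal_lsn, flushed) > max_lag_bytes
    ]
```

Feeding the result into an alerting system gives early warning before retained WAL fills the disk.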
Further Reading
- [ChangeDataCapture](ChangeDataCapture)
- [EventSourcing](EventSourcing)
- [IdempotencyPatterns](IdempotencyPatterns)