Event Driven System using Database Write Ahead Log and Change Data Capture
What's the issue with the naive approach? Why should event-driven systems use WAL and CDC instead of just polling?

We'll be using Postgres, Debezium Server, RabbitMQ, and Python.
So what's a Write-Ahead Log?
A Write-Ahead Log (WAL) is an append-only transaction log where a database records every change before flushing it to the main data files on disk. Because the log entry is written first, the database can preserve its state even if a transaction fails or the server crashes midway: on restart it replays the log to recover to its last consistent state, so committed data is never lost in a crash. The WAL also enables replication and point-in-time recovery, which makes it the single source of truth for the entire database. By default WAL entries are in a physical format, but setting wal_level = logical enables logical decoding, which produces much cleaner row-level change events (e.g., an INSERT into a given table) that tools like Debezium can consume.
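In Postgres this is a one-line setting change (assuming superuser access; the change only takes effect after a server restart):

```sql
-- Check the current setting
SHOW wal_level;

-- Enable logical decoding (requires a restart to take effect)
ALTER SYSTEM SET wal_level = logical;
```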
What's Debezium, and why use it? Can't we poll the database directly or emit the event from the server itself?
Debezium is a Change Data Capture (CDC) platform that captures database changes into reliable event streams, and it can emit those streams directly into a message broker like RabbitMQ. You could poll the database instead, but polling is resource intensive and forces a bad trade-off: poll every 5 seconds and you introduce latency; drop the interval to 100ms and you fix the latency, but now most queries return nothing while still consuming database resources. Even when no changes are being made, you keep hammering the database. That's not ideal.
Okay, so why not emit the event directly from the application server to the message broker and listen to it? That naive approach works nicely while everything behaves as expected, but a single failure can cascade and leave the system inconsistent. Say you have code like
db.update(new_user);
broker.publish("user.created",new_user)
This code looks fine until one of them fails. If the database operation fails, the message is still published — an event gets emitted for a user that doesn't exist. If the publish fails, the user is created but the event is never emitted. Either way, your system is now inconsistent and you have no reliable source of truth because the database and the broker are operating independently with no coordination between them.
This is the dual-write problem, and it's why emitting events directly from your application code is fragile.
Debezium solves this by removing your application from the equation entirely. Instead of your code deciding when to publish an event, Debezium tails the WAL directly — treating every committed transaction as a stream of changes. With wal_level set to logical, it can decode those entries into clean, readable events and forward them to a message broker. If the database write didn't commit, there's nothing in the WAL, so no event is ever emitted. The database becomes the single authority, and Debezium just reacts to what it sees there.
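The write path that replaces the dual write can be sketched as a single transaction that inserts both the business row and its outbox row together. A minimal illustration using SQLite as a stand-in for Postgres (table and column names here are illustrative, not from a real schema):

```python
import json
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id TEXT PRIMARY KEY, email TEXT)")
conn.execute("""CREATE TABLE outbox (
    idempotency_key TEXT PRIMARY KEY,
    event_type      TEXT,
    payload         TEXT)""")

def create_user(user_id: str, email: str) -> None:
    # Both writes happen in ONE transaction: either the user row and
    # its outbox event commit together, or neither does.
    with conn:
        conn.execute("INSERT INTO users VALUES (?, ?)", (user_id, email))
        conn.execute(
            "INSERT INTO outbox VALUES (?, ?, ?)",
            (str(uuid.uuid4()), "user.created",
             json.dumps({"user_id": user_id, "email": email})),
        )

create_user("123", "a@example.com")
# The CDC pipeline (Debezium) picks the event up from the log later;
# the application never talks to the broker directly.
print(conn.execute("SELECT event_type FROM outbox").fetchone()[0])
```

Because the outbox insert rides on the same transaction, there is no window where the event exists without the data, or vice versa.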
So what's RabbitMQ?
RabbitMQ is a message broker that implements the Advanced Message Queuing Protocol (AMQP). It acts as an intermediary between systems, allowing services to communicate asynchronously by sending messages instead of calling each other directly.
In practice, a producer sends a message to an exchange, the exchange routes it to one or more queues based on rules, and a consumer reads and processes messages from those queues.
Producer → Exchange → Queue → Consumer
This setup helps decouple systems, absorb traffic spikes, and ensure messages are not lost even if downstream services are temporarily unavailable.
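The Producer → Exchange → Queue → Consumer flow can be sketched with a tiny in-memory model of a topic exchange (illustrative only; real RabbitMQ also supports the multi-word '#' wildcard, durability, and acknowledgements):

```python
class TopicExchange:
    """Minimal in-memory sketch of AMQP topic routing."""

    def __init__(self):
        self.bindings = []  # list of (pattern, queue) pairs

    def bind(self, pattern, queue):
        self.bindings.append((pattern, queue))

    def publish(self, routing_key, message):
        for pattern, queue in self.bindings:
            # AMQP '*' matches exactly one dot-separated word.
            pw, kw = pattern.split("."), routing_key.split(".")
            if len(pw) == len(kw) and all(
                p == "*" or p == k for p, k in zip(pw, kw)
            ):
                queue.append(message)

exchange = TopicExchange()
report_queue = []
exchange.bind("report.*", report_queue)

exchange.publish("report.requested", {"user_id": "123"})
exchange.publish("user.created", {"user_id": "456"})  # no binding matches

print(report_queue)  # [{'user_id': '123'}]
```

Only the message whose routing key matches the binding pattern lands in the queue; the producer never needs to know which queues exist.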
Okay, so now that we have everything defined: what's the approach, how is the system connected, and how do the pieces interact with each other?
Let's start with a real use case. You want your system to generate an export of a user's activity and email it to them. The naive approach is to do this inside a single API request — but that falls apart fast. The request can hit a Gateway Timeout, the client might drop the connection halfway through, and your server is tied up the entire time, unable to serve anything else.
So instead of doing the work in the request, the API handler does one thing only: insert a row into the outbox table and immediately return a 202 Accepted to the client. The heavy work happens asynchronously downstream.
The outbox table records these requests in a structure like this:
{
"event_type": "report.requested",
"user_id": "123",
"occurred_at": "2026-05-13T10:00:00Z",
"idempotency_key": "uuid-v4",
"payload": {...}
}
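One possible DDL for that table (a sketch; the column types and the surrogate id column are assumptions, not from the original):

```sql
CREATE TABLE outbox (
    id              BIGSERIAL PRIMARY KEY,
    event_type      TEXT        NOT NULL,
    user_id         TEXT        NOT NULL,
    occurred_at     TIMESTAMPTZ NOT NULL DEFAULT now(),
    idempotency_key UUID        NOT NULL UNIQUE,
    payload         JSONB       NOT NULL
);
```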
We set wal_level to logical in Postgres. The next step is to create a publication for the outbox table:
CREATE PUBLICATION outbox_pub
FOR TABLE outbox;
Creating a publication restricts the logical replication stream to changes from this table only. Without one, every table's changes would be exposed, which risks accidental data leaks and adds decoding overhead for Debezium; scoping the publication keeps the stream small and the load low.
Before moving forward, let's create a user specifically for the Debezium server:
CREATE USER debezium WITH REPLICATION PASSWORD 'password';
GRANT SELECT ON outbox TO debezium;
Now that everything on the Postgres side is configured, the next step is Debezium Server — the piece that turns WAL entries into events.
We need to configure Debezium Server to listen to the Postgres WAL. Let's modify application.properties:
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.database.hostname=localhost
debezium.source.database.port=5432
debezium.source.database.user=debezium
debezium.source.database.password=password
debezium.source.database.dbname=your_db
debezium.source.topic.prefix=outbox
debezium.source.plugin.name=pgoutput
debezium.source.slot.name=outbox_slot
debezium.source.publication.name=outbox_pub
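Once Debezium Server starts with this configuration, it creates (or reuses) the replication slot named outbox_slot. A quick sanity check from psql (optional, not required for the setup):

```sql
SELECT slot_name, plugin, active
FROM pg_replication_slots;
```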
Okay, so now Debezium is configured to listen for changes to our outbox table. Let's configure RabbitMQ and come back to Debezium later.
Go to the RabbitMQ dashboard and create a new exchange:
Name: events
Type: topic
Durability: durable
Next, create a queue:
Queue name: report-service
Durable: true
Then bind it to the exchange:
Exchange: events
Routing key: report.*
Queue: report-service
Now any event whose routing key matches report.* — including report.requested — is delivered to the report-service queue.
Let's also create a Dead Letter Queue to prevent infinite failure loops.
Create a new queue for the DLQ:
Queue name: report-service-dlq
Durable: true
Then add arguments to the report-service queue so it uses report-service-dlq as its dead letter target:
Queue: report-service
Durable: true
Arguments:
x-dead-letter-exchange = events
x-dead-letter-routing-key = report.dlq
Finally, bind report-service-dlq to the events exchange with routing key report.dlq, otherwise dead-lettered messages have nowhere to land. One caveat: the report.* binding on report-service also matches report.dlq, so dead-lettered messages would be redelivered to the main queue as well; using a dedicated dead letter exchange instead of reusing events avoids that loop.
Now our RabbitMQ is ready to take events, so let's head back to Debezium to configure the RabbitMQ sink.
Let's add this configuration to application.properties:
debezium.sink.type=rabbitmq
debezium.sink.rabbitmq.connection.host=localhost
debezium.sink.rabbitmq.connection.port=5672
debezium.sink.rabbitmq.connection.username=guest
debezium.sink.rabbitmq.connection.password=guest
debezium.sink.rabbitmq.exchange=events
debezium.transforms=outbox
debezium.transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
debezium.transforms.outbox.table.field.event.key=idempotency_key
debezium.transforms.outbox.table.field.event.payload=payload
debezium.transforms.outbox.table.fields.additional.placement=event_type:envelope:event_type,idempotency_key:envelope:idempotency_key
debezium.transforms.outbox.table.expand.json.payload=true
debezium.transforms.outbox.route.by.field=event_type
debezium.transforms.outbox.route.topic.replacement=${routedByValue}
With this in place, Debezium sends each event to the RabbitMQ events exchange using event_type as the routing key, and RabbitMQ handles the routing to the right queue from there.
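With the EventRouter applied, the message body delivered to the queue might look roughly like this (an illustrative sketch, not captured from a real run; note that with table.expand.json.payload=true the payload fields may be expanded to the top level rather than nested, so verify the shape against an actual message):

```json
{
  "event_type": "report.requested",
  "idempotency_key": "0b4f1c2e-uuid",
  "payload": {
    "user_id": "123"
  }
}
```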
Okay, so now that's all set. Let's create a Python application that listens to the queue and performs the action. We connect to the report-service queue in RabbitMQ and continuously listen for incoming events published through the events exchange.
Each message represents a committed outbox event that originated from the database via Debezium, ensuring that only successfully persisted transactions are processed.
Once a message is received:
It is parsed from JSON format
The event_type determines the action to execute (e.g., report.requested)
The payload contains all required business context (such as user_id and filters)
After successful processing, the worker acknowledges the message to remove it from the queue. If processing fails, the message is rejected and routed to the Dead Letter Queue (report-service-dlq) for later inspection or recovery. Here is a code example:
import pika
import json
import time

# In-memory idempotency tracking — fine for a demo; use a durable store
# (Redis, a database table) in production so restarts don't reprocess events.
processed_keys = set()

def has_been_processed(idempotency_key):
    return idempotency_key in processed_keys

def mark_as_processed(idempotency_key):
    processed_keys.add(idempotency_key)

def handle_event(event):
    event_type = event.get("event_type")
    payload = event.get("payload", {})
    idempotency_key = event.get("idempotency_key")

    if has_been_processed(idempotency_key):
        print(f"Duplicate event {idempotency_key} detected. Skipping.")
        return

    print(f"Received event: {event_type}")
    if event_type == "report.requested":
        user_id = payload.get("user_id")
        print(f"Generating report for user: {user_id}")
        time.sleep(5)  # simulate the heavy export/email work
        mark_as_processed(idempotency_key)
        print(f"Report generated and email sent for user: {user_id}")
    else:
        print("Unknown event type")

def callback(ch, method, properties, body):
    try:
        handle_event(json.loads(body.decode()))
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print("Processing failed:", e)
        # requeue=False lets RabbitMQ dead-letter the message
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    channel = connection.channel()
    # The declaration must match the dashboard setup exactly (including the
    # dead letter arguments), or RabbitMQ rejects it with PRECONDITION_FAILED.
    channel.queue_declare(
        queue="report-service",
        durable=True,
        arguments={
            "x-dead-letter-exchange": "events",
            "x-dead-letter-routing-key": "report.dlq",
        },
    )
    channel.basic_qos(prefetch_count=1)  # one unacked message at a time
    channel.basic_consume(queue="report-service", on_message_callback=callback)
    print("Worker started. Waiting for messages...")
    channel.start_consuming()

if __name__ == "__main__":
    main()
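One caveat about the worker: has_been_processed and mark_as_processed must be backed by durable storage, or a restart forgets what was already handled and duplicates slip through. A possible sketch using SQLite (illustrative; Redis or a Postgres table would work just as well):

```python
import sqlite3

class IdempotencyStore:
    """Durable idempotency tracking: a key is 'processed' once its row exists."""

    def __init__(self, path=":memory:"):
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS processed (key TEXT PRIMARY KEY)"
        )

    def has_been_processed(self, key):
        cur = self.conn.execute(
            "SELECT 1 FROM processed WHERE key = ?", (key,))
        return cur.fetchone() is not None

    def mark_as_processed(self, key):
        # INSERT OR IGNORE makes marking itself idempotent
        with self.conn:
            self.conn.execute(
                "INSERT OR IGNORE INTO processed VALUES (?)", (key,))

store = IdempotencyStore()
store.mark_as_processed("uuid-1")
print(store.has_been_processed("uuid-1"))  # True
print(store.has_been_processed("uuid-2"))  # False
```

With a file path instead of ":memory:", the processed set survives worker restarts.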
Conclusion
So at the end the system stops relying on the service layer to coordinate everything (and hoping both the DB write and event publish succeed together). Instead, the database becomes the single source of truth, and everything else just reacts to what has already been committed.
A few things to keep in mind.
The outbox table grows over time, so we also need a pruning strategy. For simplicity we'll use the pg_cron extension and prune events older than 7 days:
SELECT cron.schedule('0 2 * * *', $$
DELETE FROM outbox WHERE occurred_at < NOW() - INTERVAL '7 days';
$$);