Blog di Dino Ciuffetti (Bernardino in realtà)

11 Feb 23 How to create persistent Queues, Exchanges, and DLXs on RabbitMQ to avoid losing messages

What happens when you publish a message to a RabbitMQ exchange with the wrong topic or, better, the wrong routing key? What happens if you send a message to a queue that has a TTL policy, or the message itself carries a TTL property, and that TTL expires? What happens when a consumer rejects a message it got from the queue without requeueing it? What if a queue overflows because of a length policy?

It’s simple: the broker will discard your message forever.

If this makes you as mad as it makes me, this blog article is for you. Here I will show you how to create a simple tree of queues, DLXs and policies to overcome this problem.

I think that starting with commands and examples is better than 10000 written words, and since I don’t have any ads on my blog I don’t have to write a long article to get money from the ads, so here we are.

I assume your RabbitMQ installation and your admin account are ready, so let’s start with the commands.

# Create your VirtualHost
rabbitmqctl add_vhost vhtest --description "Your VH" --default-queue-type classic

# Give your admin user permissions to do everything on your virtualhost
rabbitmqctl set_permissions --vhost vhtest admin '.*' '.*' '.*'

# Create the user that will publish messages to the exchange
rabbitmqctl add_user testuserpub yourpassword

# Create the user that will subscribe to your queue to read messages
rabbitmqctl add_user testusersub yourpassword2

Now, we have 3 users (admin, testuserpub and testusersub) and a virtualhost (vhtest). We are ready to create 2 DLX, one to handle overflow, expired TTL and discarded messages, the other to handle messages sent with the wrong routing key. A DLX (or Dead Letter Exchange) is a particular exchange that is designed to handle dead lettered (discarded) messages.

# Create the DLX to handle messages that overflowed, expired or were discarded by consumers
rabbitmqadmin declare exchange --vhost=vhtest name=DLXexQoverfloworttl type=headers internal=true

# Create the DLX to handle messages with wrong routing key
rabbitmqadmin declare exchange --vhost=vhtest name=DLXexQwrongtopic type=fanout internal=true

We’ll now declare three queues and bind them to the first DLX, using a different headers binding for each death reason

rabbitmqadmin declare queue --vhost=vhtest name=DLXquQoverflow
rabbitmqadmin declare queue --vhost=vhtest name=DLXquQttl
rabbitmqadmin declare queue --vhost=vhtest name=DLXquQrejected
rabbitmqadmin declare binding --vhost=vhtest source=DLXexQoverfloworttl destination=DLXquQoverflow arguments='{"x-first-death-reason": "maxlen", "x-match": "all-with-x"}'
rabbitmqadmin declare binding --vhost=vhtest source=DLXexQoverfloworttl destination=DLXquQttl arguments='{"x-first-death-reason": "expired", "x-match": "all-with-x"}'
rabbitmqadmin declare binding --vhost=vhtest source=DLXexQoverfloworttl destination=DLXquQrejected arguments='{"x-first-death-reason": "rejected", "x-match": "all-with-x"}'
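
To make the effect of these three bindings easier to picture, here is a tiny shell sketch that maps a death reason to the queue that should receive the message. It is only a lookup table mirroring the bindings above, not something RabbitMQ runs, and the function name dlx_target is made up for illustration. As far as I know, x-match has to be all-with-x (available since RabbitMQ 3.10) because a plain all ignores headers whose name starts with x-.

```shell
# Hypothetical helper: which queue should a dead lettered message land in,
# given the value of its x-first-death-reason header?
dlx_target() {
  case "$1" in
    maxlen)   echo "DLXquQoverflow" ;;
    expired)  echo "DLXquQttl" ;;
    rejected) echo "DLXquQrejected" ;;
    *)        echo "unrouted" ;;   # no binding matches, so the DLX drops the message
  esac
}

# Print the mapping for a few reasons
for reason in maxlen expired rejected delivery_limit; do
  printf '%-15s -> %s\n' "$reason" "$(dlx_target "$reason")"
done
```

The delivery_limit reason belongs to quorum queues, so with the classic queues used here it should not show up; I included it only to show what happens to a reason that has no binding.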

And now we’ll declare and bind queues to the second DLX to handle messages with wrong topic (routing key)

rabbitmqadmin declare queue --vhost=vhtest name=DLXquQwrongtopic
rabbitmqadmin declare binding --vhost=vhtest source=DLXexQwrongtopic destination=DLXquQwrongtopic

Now we have 1 DLX with 3 queues and another DLX with 1 queue bound. The first will route expired, discarded and overflowed messages to the respective queues (DLXquQttl, DLXquQoverflow, DLXquQrejected), the second will route messages with invalid routing key to the respective queue (DLXquQwrongtopic).

Now we are going to create our main queue and the normal Exchange that will send messages to it

rabbitmqadmin declare queue --vhost=vhtest name=quQ
rabbitmqadmin declare exchange --vhost=vhtest name=exQ type=direct

In this example, we want to route all messages with routing key NBE to the quQ queue

rabbitmqadmin declare binding --vhost=vhtest source=exQ destination=quQ routing_key=NBE

We now want to create the policy that attaches the wrong topic DLX to our main exchange as its alternate exchange, so that messages exQ cannot route end up there

rabbitmqctl set_policy --vhost vhtest wrongtopicQ1 "^exQ$" '{"alternate-exchange":"DLXexQwrongtopic"}' --apply-to exchanges

This is an example policy that limits the quQ queue to 100 messages, 1073741824 bytes (1 GiB) and a 30 second message TTL. Messages that exceed these limits are dead lettered to the first DLX.

rabbitmqctl set_policy --vhost vhtest shorttimedqunbe '^quQ$' '{"max-length":100,"max-length-bytes":1073741824,"message-ttl":30000,"overflow":"reject-publish-dlx","dead-letter-exchange":"DLXexQoverfloworttl"}' --priority 0 --apply-to queues
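
To check the whole chain, a quick smoke test along these lines might help. It is a sketch under a few assumptions: rabbitmqadmin connects with the same admin credentials as the commands above, the alternate-exchange policy points at the DLXexQwrongtopic exchange, and WRONG is just a made up routing key that has no binding. The RUN variable and the smoke_test function are my own convention: by default the commands are only printed, run the script with RUN= (empty) to really execute them.

```shell
# Dry run by default: each command is printed instead of executed.
# Start the script with RUN= (empty value) to run the commands for real.
RUN=${RUN-echo}

smoke_test() {
  # A message with the bound routing key: should be routed to quQ
  $RUN rabbitmqadmin --vhost=vhtest publish exchange=exQ routing_key=NBE payload=good-key
  # A message with an unbound routing key: should go to the alternate exchange
  $RUN rabbitmqadmin --vhost=vhtest publish exchange=exQ routing_key=WRONG payload=bad-key
  # Show how many messages each queue holds
  $RUN rabbitmqadmin --vhost=vhtest list queues name messages
}

smoke_test
```

If everything is wired correctly, I would expect the first message to show up in quQ (and to move to DLXquQttl after about 30 seconds if nobody consumes it) and the second one in DLXquQwrongtopic.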

Now let’s give proper permissions to our publisher and subscriber users. The user testuserpub can only write to its exchange, while testusersub can only read from its queue. No other permissions here.

rabbitmqctl set_permissions --vhost vhtest testuserpub '' '^exQ$' ''
rabbitmqctl set_permissions --vhost vhtest testusersub '' '' '^quQ$'
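
If you want to double check what a permission pattern covers before applying it, you can try the regular expressions locally. This is only an approximation with grep -E (RabbitMQ evaluates the patterns itself), and the allowed function is a hypothetical helper. It treats an empty pattern as '^$', which is how the RabbitMQ documentation describes the empty string.

```shell
# Hypothetical helper: does PATTERN grant access to RESOURCE?
# usage: allowed PATTERN RESOURCE
allowed() {
  pattern=$1
  # An empty pattern is treated like '^$', so it matches no named resource
  [ -n "$pattern" ] || pattern='^$'
  printf '%s\n' "$2" | grep -Eq -- "$pattern"
}

# Check the write pattern given to testuserpub against a few resource names
for name in exQ quQ DLXexQwrongtopic; do
  if allowed '^exQ$' "$name"; then
    echo "testuserpub may write to $name"
  else
    echo "testuserpub may NOT write to $name"
  fi
done
```

The anchors ^ and $ matter here: without them, a pattern like exQ would also match DLXexQwrongtopic.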

Mission complete. Please try this at home and write in the comments below! Happy RabbitMQ hacking!