Making a RabbitMQ Producer and Consumer with Pika
RabbitMQ is a popular open source AMQP message broker. Technically, RabbitMQ implements several messaging protocols through plugins and adapters, but AMQP is its primary purpose and the focus of this blog post. Most popular programming languages have an AMQP library. The RabbitMQ team claims Pika to be the de facto standard library for Python, so that’s what we’ll be using today. Many applications will either produce or consume, but when an application does both, it’s recommended to separate the roles of producer and consumer. I’m going to demonstrate how you can easily perform both roles within an application using a single connection.
Creating a Connection Class
We’ll be making a simple app that sends messages to itself and prints them upon receipt. Pika isn’t thread safe and doesn’t reconnect automatically, but there are ways to work around both limitations. First we’ll need to pip install the packages pika and retry. Next we’ll need a class that represents an AMQP connection in a file named amqpconnection.py. This class will be a wrapper around pika’s connection and channel classes, and the result should be easy to adapt for a more complex application. The class outline will look something like this.
import pika
import functools
import os
from retry import retry

class AmqpConnection:
    def __init__(self): ...
    def connect(self): ...
    def setup_queues(self): ...
    def do_async(self): ...
    def publish(self): ...
    def consume(self): ...
The constructor will take RabbitMQ connection information and credentials with sane defaults, and it will set our connection and channel fields to None.
def __init__(self, hostname='localhost', port=5672, username='guest', password='guest'):
    self.hostname = hostname
    self.port = port
    self.username = username
    self.password = password
    self.connection = None
    self.channel = None
The connect method will wrap pika’s constructs and connect to the RabbitMQ server. It will accept a custom connection name as a parameter for human readability, for example when viewing connections in the RabbitMQ management UI.
def connect(self, connection_name='Neat-App'):
    print('Attempting to connect to', self.hostname)
    params = pika.ConnectionParameters(
        host=self.hostname,
        port=self.port,
        credentials=pika.PlainCredentials(self.username, self.password),
        client_properties={'connection_name': connection_name})
    self.connection = pika.BlockingConnection(parameters=params)
    self.channel = self.connection.channel()
    print('Connected Successfully to', self.hostname)
The setup_queues method will declare exchanges and queues and bind queues to exchanges if needed. If you wanted to just use a queue without messing with exchanges, you could simply declare a queue; to publish to the queue, use an empty string as the exchange (interpreted as the default exchange) and use the queue name as the routing key.
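As a minimal sketch of the queue-only approach just described, the helper below declares a queue and publishes to it through the default exchange. The function name publish_to_queue is made up for illustration and is not part of our class; the channel argument is assumed to be an open pika channel.

```python
def publish_to_queue(channel, queue_name, payload):
    """Publish straight to a queue via the default exchange.

    `channel` is assumed to be an open pika channel. This helper is
    illustrative only and is not part of the AmqpConnection class.
    """
    # Make sure the queue exists before publishing to it.
    channel.queue_declare(queue=queue_name)
    # An empty exchange name selects the default exchange, which routes
    # a message to the queue whose name equals the routing key.
    channel.basic_publish(exchange='', routing_key=queue_name, body=payload)
```

With the class from this post, a call might look like publish_to_queue(mq.channel, 'Simple_Queue', 'hello'), made from the thread that owns the connection. The queue name Simple_Queue is hypothetical.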
For our app, we’ll declare a direct exchange called Ping_Exchange, a queue called Ping_Queue, and bind the queue to the exchange with routing key ping.
def setup_queues(self):
    self.channel.exchange_declare('Ping_Exchange', exchange_type='direct')
    self.channel.queue_declare('Ping_Queue')
    self.channel.queue_bind('Ping_Queue', exchange='Ping_Exchange', routing_key='ping')
The do_async method will wrap the connection’s add_callback_threadsafe method. This is necessary for multi-threaded apps since pika isn’t thread safe; we must use add_callback_threadsafe any time we want to perform a network action from a thread other than the one that owns the connection. Since add_callback_threadsafe only accepts a single callable argument, we’ll have to use functools.partial to pass arguments to the callback.
def do_async(self, callback, *args, **kwargs):
    if self.connection.is_open:
        self.connection.add_callback_threadsafe(functools.partial(callback, *args, **kwargs))
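To see why functools.partial is needed here, the following standalone sketch (no RabbitMQ required) freezes arguments into a zero-argument callable, which is the shape add_callback_threadsafe expects. The names fake_publish and scheduled are hypothetical stand-ins, not pika APIs.

```python
import functools


def fake_publish(payload, routing_key='ping'):
    # Stand-in for AmqpConnection.publish; it just reports what it was given.
    return f'{routing_key}: {payload}'


# partial() binds the arguments now and returns a callable that takes none.
callback = functools.partial(fake_publish, payload='Hello, World!')

# Stand-in for a list of pending callbacks: each entry can only be
# invoked with no arguments.
scheduled = [callback]
results = [cb() for cb in scheduled]
print(results)
```

The arguments travel inside the partial object, so whoever runs the callback later does not need to know about them.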
The publish method will wrap the channel’s basic_publish method. In our application, we’ll publish a given payload to the exchange we declared in the setup_queues method.
def publish(self, payload):
    if self.connection.is_open and self.channel.is_open:
        self.channel.basic_publish(
            exchange='Ping_Exchange',
            routing_key='ping',
            body=payload
        )
The consume method will consume messages from the previously declared queue with the given on_message callback function. This method will be run in the main thread, and it will block on the start_consuming method. Adding the retry decorator allows our application to automatically reconnect: when consume raises an AMQPConnectionError, retry calls it again, and the check at the top of the method re-establishes the connection. The first retry waits one second, and the delay doubles after each attempt.
@retry(pika.exceptions.AMQPConnectionError, delay=1, backoff=2)
def consume(self, on_message):
    if self.connection.is_closed or self.channel.is_closed:
        self.connect()
        self.setup_queues()
    try:
        self.channel.basic_consume(queue='Ping_Queue', auto_ack=True, on_message_callback=on_message)
        self.channel.start_consuming()
    except KeyboardInterrupt:
        print('Keyboard interrupt received')
        self.channel.stop_consuming()
        self.connection.close()
        os._exit(1)
    except pika.exceptions.ChannelClosedByBroker:
        print('Channel Closed By Broker Exception')
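For intuition about the delay=1, backoff=2 arguments, here is a rough sketch of a decorator with similar behavior. It is a simplified, hypothetical re-implementation for illustration only, not the retry package’s actual code. The sleep and max_tries parameters are my own additions so the example can run without really waiting.

```python
import functools
import time


def simple_retry(exc_type, delay=1, backoff=2, max_tries=None, sleep=time.sleep):
    """Re-run the wrapped function when exc_type is raised (illustrative only)."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            wait, tries = delay, 0
            while True:
                try:
                    return func(*args, **kwargs)
                except exc_type:
                    tries += 1
                    if max_tries is not None and tries >= max_tries:
                        raise
                    sleep(wait)      # pause before the next attempt
                    wait *= backoff  # 1, 2, 4, 8, ... seconds
        return wrapper
    return decorator


# Demo: fail three times, then succeed, recording the waits instead of sleeping.
waits = []
attempts = {'n': 0}


@simple_retry(ConnectionError, delay=1, backoff=2, sleep=waits.append)
def flaky_connect():
    attempts['n'] += 1
    if attempts['n'] <= 3:
        raise ConnectionError('broker unavailable')
    return 'connected'


result = flaky_connect()
print(result, waits)
```

With this schedule, seven consecutive failures add up to 1 + 2 + 4 + 8 + 16 + 32 + 64 = 127 seconds of waiting, so in a long outage it may be worth capping the delay.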
Using the Connection Class
With our AmqpConnection class we can now interface with a RabbitMQ server and start on our simple application. It’ll send a randomly selected message every few seconds, and, when it is consumed, print it out and track how many times the message has been received. Let’s start by creating a file called app.py and importing our class. The outline of this file will look like the following.
from amqpconnection import AmqpConnection
from threading import Thread
import time
import random

messages = []
msg_counts = {}

def send_message_loop(): ...
def on_message(): ...
def main(): ...

if __name__ == '__main__':
    main()
First we'll create a short list of random messages.
messages = ['Hello, World!', 'Foo Bar Baz', 'Crème Anglaise', 'Maddox Lost', 'Epstein didn\'t kill himself']
We’ll define our main function to create an instance of our AmqpConnection class, connect to the server, start a thread to send random messages in a loop, and consume them with our on_message callback. The AmqpConnection constructor defaults to the standard configuration for a RabbitMQ instance running on your machine, so we’ll leave it that way.
def main():
    mq = AmqpConnection()
    mq.connect()
    mq.setup_queues()
    Thread(target=send_message_loop, args=[mq]).start()
    mq.consume(on_message)
The send_message_loop function will accept mq, the instance of our AmqpConnection class, as an argument. The function will run in a separate thread, so when it publishes messages it must wrap the mq.publish method call within an invocation of mq.do_async to schedule the call on the main connection thread, ensuring thread safety. If you called mq.publish directly from this thread, you would be using the connection from two threads at once, which pika does not support and which can fail in unpredictable ways, including raised exceptions.
def send_message_loop(mq):
    while True:
        random_message = random.choice(messages)
        mq.do_async(mq.publish, payload=random_message)
        time.sleep(3.0)
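The hand-off that do_async relies on can be modelled without a broker. In this toy sketch, a queue.Queue stands in for the connection’s pending callbacks and the main thread plays the role of the connection thread. None of these names are pika APIs, and pika’s real implementation differs.

```python
import functools
import queue
import threading

callbacks = queue.Queue()  # stand-in for the connection's pending callbacks
ran_on = []                # (payload, thread name) for every executed callback


def fake_publish(payload):
    # Record which thread actually performs the "network" action.
    ran_on.append((payload, threading.current_thread().name))


def worker():
    # The worker never calls fake_publish itself; it only schedules it.
    for text in ('one', 'two', 'three'):
        callbacks.put(functools.partial(fake_publish, text))


sender = threading.Thread(target=worker, name='sender')
sender.start()
sender.join()

# The "connection thread" (here, the thread running this script) drains
# the queue and runs each callback itself.
while not callbacks.empty():
    callbacks.get()()

print(ran_on)
```

Every entry in ran_on names the draining thread rather than sender, which is the property that keeps all channel operations on a single thread.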
We finally need to define on_message, our callback function for the mq.consume method. Pika’s channel.basic_consume takes a callback whose signature is on_message_callback(channel, method, properties, body), so that’s what our function will look like, though we only care about body in our app, which is the message payload.
def on_message(channel, method, properties, body):
    msg = body.decode('utf8')
    if msg not in msg_counts:
        msg_counts[msg] = 1
    else:
        msg_counts[msg] = msg_counts[msg] + 1
    print(f'Time: {int(time.time()) % 1000} --- Message: {msg} --- Count: {msg_counts[msg]}')
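The counting logic could also be written with collections.Counter, which treats missing keys as zero. This is an alternative sketch, not the version used in the rest of this post; the None arguments in the demo calls stand in for the channel, method, and properties objects pika would pass.

```python
import time
from collections import Counter

msg_counts = Counter()


def on_message(channel, method, properties, body):
    msg = body.decode('utf8')
    msg_counts[msg] += 1  # a Counter returns 0 for keys it has not seen
    print(f'Time: {int(time.time()) % 1000} --- Message: {msg} --- Count: {msg_counts[msg]}')


# Simulate two deliveries of the same message without a broker.
on_message(None, None, None, 'Foo Bar Baz'.encode('utf8'))
on_message(None, None, None, 'Foo Bar Baz'.encode('utf8'))
```

Calling the callback by hand like this is also a quick way to check it before wiring it up to a real queue.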
Testing the Application
I have a RabbitMQ instance running on ‘192.168.1.111’, so I changed the first line of the main function to mq = AmqpConnection(hostname='192.168.1.111'), then I ran python app.py and saw our app was working as expected. I took down the RabbitMQ server for about a minute, and once it was back up, our application automatically reconnected and resumed operation.
$ python app.py
Attempting to connect to 192.168.1.111
Connected Successfully to 192.168.1.111
Time: 583 --- Message: Crème Anglaise --- Count: 1
Time: 586 --- Message: Hello, World! --- Count: 1
Time: 589 --- Message: Epstein didn't kill himself --- Count: 1
Time: 592 --- Message: Foo Bar Baz --- Count: 1
Time: 595 --- Message: Foo Bar Baz --- Count: 2
Time: 598 --- Message: Hello, World! --- Count: 2
Time: 601 --- Message: Maddox Lost --- Count: 1
Time: 604 --- Message: Foo Bar Baz --- Count: 3
Time: 607 --- Message: Foo Bar Baz --- Count: 4
Time: 610 --- Message: Epstein didn't kill himself --- Count: 2
Time: 613 --- Message: Hello, World! --- Count: 3
Time: 616 --- Message: Hello, World! --- Count: 4
Time: 619 --- Message: Hello, World! --- Count: 5
Time: 622 --- Message: Crème Anglaise --- Count: 2
Time: 625 --- Message: Foo Bar Baz --- Count: 5
Time: 628 --- Message: Hello, World! --- Count: 6
Attempting to connect to 192.168.1.111
Attempting to connect to 192.168.1.111
Attempting to connect to 192.168.1.111
Attempting to connect to 192.168.1.111
Attempting to connect to 192.168.1.111
Attempting to connect to 192.168.1.111
Attempting to connect to 192.168.1.111
Connected Successfully to 192.168.1.111
Time: 763 --- Message: Foo Bar Baz --- Count: 6
Time: 766 --- Message: Crème Anglaise --- Count: 3
Time: 769 --- Message: Crème Anglaise --- Count: 4
Time: 772 --- Message: Maddox Lost --- Count: 2
Time: 775 --- Message: Epstein didn't kill himself --- Count: 3
Time: 778 --- Message: Maddox Lost --- Count: 3
Time: 781 --- Message: Epstein didn't kill himself --- Count: 4
Time: 784 --- Message: Epstein didn't kill himself --- Count: 5
Time: 787 --- Message: Epstein didn't kill himself --- Count: 6
Time: 790 --- Message: Hello, World! --- Count: 7
Time: 793 --- Message: Hello, World! --- Count: 8
Time: 796 --- Message: Foo Bar Baz --- Count: 7
Keyboard interrupt received
I Ctrl-C'd out of there before my provocative application gets its hyoid bone broken or gets sued for $20 million.
Conclusion
I hope you learned a little something about RabbitMQ and how to use Python's Pika library to easily interact with it. This project isn't hosted on my GitHub because it's very small and not a worthwhile application on its own. You can easily copy the code from here and adapt it to a real application. Enjoy.