Making a RabbitMQ Producer and Consumer with Pika

RabbitMQ is a popular open source AMQP message broker. Technically, RabbitMQ supports several messaging protocols through plugins and adapters, but AMQP is its primary protocol and the focus of this blog post. Most popular programming languages have an AMQP library; the RabbitMQ team calls Pika the de facto standard library for Python, so that’s what we’ll be using today. Many applications will either produce or consume, but when an application does both, it’s recommended to keep the producer and consumer roles separate. I’m going to demonstrate how you can easily perform both roles within one application using a single connection.

Creating a Connection Class

We’ll be making a simple app that sends messages to itself and prints them upon receipt. Pika isn’t thread safe and doesn’t reconnect automatically, but we can work around both limitations. First we’ll need to pip install the packages pika and retry. Next we’ll create a file named amqpconnection.py containing a class that represents an AMQP connection. This class will be a wrapper around pika’s connection and channel classes, and the result should be easy to adapt for a more complex application. The class outline will look something like this.

import pika
import functools
import os
from retry import retry

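class AmqpConnection:
    def __init__(self, hostname='localhost', port=5672, username='guest', password='guest'): ...
    def connect(self, connection_name='Neat-App'): ...
    def setup_queues(self): ...
    def do_async(self, callback, *args, **kwargs): ...
    def publish(self, payload): ...
    def consume(self, on_message): ...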
    

The constructor will take RabbitMQ connection information and credentials with sane defaults, and it will set our connection and channel fields to None.

def __init__(self, hostname='localhost', port=5672, username='guest', password='guest'):
    self.hostname = hostname
    self.port = port
    self.username = username
    self.password = password
    self.connection = None
    self.channel = None      
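
As a usage sketch, the constructor arguments could come from environment variables instead of being hard-coded. The helper settings_from_env and the AMQP_* variable names below are hypothetical; neither pika nor RabbitMQ defines them, and the fallbacks simply mirror the constructor defaults above.

```python
import os

def settings_from_env(env=None):
    """Build keyword arguments for AmqpConnection from environment variables.

    The AMQP_* names are made up for this sketch.
    """
    env = os.environ if env is None else env
    return {
        'hostname': env.get('AMQP_HOST', 'localhost'),
        # Environment values are strings, so the port must be converted.
        'port': int(env.get('AMQP_PORT', '5672')),
        'username': env.get('AMQP_USER', 'guest'),
        'password': env.get('AMQP_PASSWORD', 'guest'),
    }
```

The result could then be passed along as AmqpConnection(**settings_from_env()).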
    

The connect method will wrap pika’s constructs and connect to the RabbitMQ server. It accepts a custom connection name as a parameter, so the connection is easy for a human to identify on the broker.

def connect(self, connection_name='Neat-App'):
    print('Attempting to connect to', self.hostname)
    params = pika.ConnectionParameters(
        host=self.hostname,
        port=self.port,
        credentials=pika.PlainCredentials(self.username, self.password),
        client_properties={'connection_name': connection_name})
    self.connection = pika.BlockingConnection(parameters=params)
    self.channel = self.connection.channel()
    print('Connected Successfully to', self.hostname)
    

The setup_queues method will declare exchanges and queues and bind queues to exchanges if needed. If you wanted to just use a queue without messing with exchanges, you could simply declare a queue; to publish to the queue, use an empty string as the exchange (interpreted as the default exchange) and use the queue name as the routing key.
For our app, we’ll declare a direct exchange called Ping_Exchange, a queue called Ping_Queue, and bind the queue to the exchange with routing key ping.

def setup_queues(self):
    self.channel.exchange_declare('Ping_Exchange', exchange_type='direct')
    self.channel.queue_declare('Ping_Queue')
    self.channel.queue_bind('Ping_Queue', exchange='Ping_Exchange', routing_key='ping')
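
For comparison, here is a minimal sketch of the queue-only approach mentioned above, which publishes through the default exchange. The helper name publish_to_queue is hypothetical, and channel is assumed to be an open pika channel such as self.channel.

```python
def publish_to_queue(channel, queue_name, payload):
    """Publish straight to a queue through the default exchange.

    `channel` is assumed to be an open pika channel; this helper is
    illustrative and not part of the AmqpConnection class.
    """
    # Declaring again is harmless as long as the arguments match.
    channel.queue_declare(queue=queue_name)
    # exchange='' selects the default exchange, which routes by queue name.
    channel.basic_publish(exchange='', routing_key=queue_name, body=payload)
```

No exchange_declare or queue_bind call is needed in this variant, which is why it suits small apps with a single queue.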
    

The do_async method will wrap the connection’s add_callback_threadsafe method. This is necessary for multi-threaded apps since pika isn’t thread safe; we must use add_callback_threadsafe anytime we want to perform a network action on a different thread than the main connection thread. Since add_callback_threadsafe only accepts a single callable argument, we’ll have to use functools.partial to pass arguments to the callback.

def do_async(self, callback, *args, **kwargs):
    if self.connection.is_open:
        self.connection.add_callback_threadsafe(functools.partial(callback, *args, **kwargs))
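
To see why functools.partial is needed, here is a toy model of the hand-off using only the standard library. It is not how pika implements add_callback_threadsafe internally; the queue and the stand-in publish function are assumptions made for illustration.

```python
import functools
import queue

def publish(payload, prefix='sent'):
    # Stand-in for a method that must run on the connection's thread.
    return f'{prefix}: {payload}'

# partial freezes the arguments now and yields a zero-argument callable.
callback = functools.partial(publish, 'Hello, World!', prefix='queued')

# Simplified model: other threads enqueue callables, and the thread
# that owns the connection runs them later.
pending = queue.Queue()
pending.put(callback)

results = []
while not pending.empty():
    results.append(pending.get()())

print(results)  # ['queued: Hello, World!']
```

The key point is that the scheduled object carries its own arguments, so whoever runs it can call it with none.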
    

The publish method will wrap the channel’s basic_publish method. In our application, we’ll publish a given payload to the exchange we declared in the setup_queues method.

def publish(self, payload):
    if self.connection.is_open and self.channel.is_open:
        self.channel.basic_publish(
            exchange='Ping_Exchange',
            routing_key='ping',
            body=payload
        )
    

The consume method will consume messages from the previously declared queue with the given on_message callback function. This method will run in the main thread, where it blocks on start_consuming. Adding the retry decorator allows our application to reconnect automatically: when an AMQPConnectionError is raised, consume is called again after a delay of one second that doubles after each failed attempt.

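@retry(pika.exceptions.AMQPConnectionError, delay=1, backoff=2)
def consume(self, on_message):
    # (Re)connect if we were never connected or the connection/channel has dropped.
    if self.connection is None or self.connection.is_closed or self.channel.is_closed:
        self.connect()
        self.setup_queues()
    try:
        self.channel.basic_consume(queue='Ping_Queue', auto_ack=True, on_message_callback=on_message)
        # Blocks here while messages and scheduled callbacks are processed.
        self.channel.start_consuming()
    except KeyboardInterrupt:
        print('Keyboard interrupt received')
        self.channel.stop_consuming()
        self.connection.close()
        # os._exit ends the process immediately, including any running threads.
        os._exit(1)
    except pika.exceptions.ChannelClosedByBroker:
        # Caught here, so the retry decorator never sees it and consume() returns.
        print('Channel Closed By Broker Exception')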
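
The delay and backoff arithmetic can be sketched in a few lines. This helper is illustrative only and is not the retry package's implementation; the attempts parameter exists just to bound the output. The retry package also documents a max_delay argument, which is modelled here as an optional cap.

```python
def backoff_delays(delay=1, backoff=2, attempts=6, max_delay=None):
    """Return the waits (in seconds) before each successive retry."""
    waits = []
    current = delay
    for _ in range(attempts):
        # Apply the optional cap without affecting the underlying growth.
        waits.append(current if max_delay is None else min(current, max_delay))
        current *= backoff
    return waits

print(backoff_delays())              # [1, 2, 4, 8, 16, 32]
print(backoff_delays(max_delay=10))  # [1, 2, 4, 8, 10, 10]
```

Without a cap the wait keeps doubling, so after a long outage the app may take a while to notice that the broker is back.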
    

Using the Connection Class

With our AmqpConnection class we can now interface with a RabbitMQ server and start on our simple application. It’ll send a randomly selected message every few seconds and, when a message is consumed, print it out and track how many times that message has been received. Let’s start by creating a file called app.py and importing our class. The outline of this file will look like the following.

from amqpconnection import AmqpConnection
from threading import Thread
import time
import random

messages = []
msg_counts = {}

def send_message_loop(mq): ...
def on_message(channel, method, properties, body): ...
def main(): ...

if __name__ == '__main__': main()
    

First we'll create a short list of random messages.

messages = ['Hello, World!', 'Foo Bar Baz', 'Crème Anglaise', 'Maddox Lost', 'Epstein didn\'t kill himself']

We’ll define our main function to create an instance of our AmqpConnection class, connect to the server, start a thread that sends random messages in a loop, and consume them with our on_message callback. The AmqpConnection constructor is smartly written to assume the default configuration for a RabbitMQ instance running on your machine, so we’ll leave it that way.

def main():
    mq = AmqpConnection()
    mq.connect()
    mq.setup_queues()
    Thread(target=send_message_loop, args=[mq]).start()
    mq.consume(on_message)      
    

The send_message_loop function will accept mq, the instance of our AmqpConnection class, as an argument. The function will run in a separate thread, so when it publishes messages it must wrap the mq.publish call in mq.do_async to schedule it on the main connection thread, ensuring thread safety. Calling mq.publish directly from this thread would use the connection from two threads at once, which pika doesn’t support and which can fail unpredictably.

def send_message_loop(mq):
    while True:
        random_message = random.choice(messages)
        mq.do_async(mq.publish, payload=random_message)
        time.sleep(3.0)
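
If you would rather be able to stop the sender cleanly, one possible variation uses a threading.Event in place of while True. The stop_event and interval parameters are additions made for this sketch, and messages is passed in explicitly so the function stands on its own.

```python
import random

def send_message_loop(mq, messages, stop_event, interval=3.0):
    """Publish a random message every `interval` seconds until stopped.

    `stop_event` is expected to be a threading.Event; `mq` is anything
    with the do_async and publish methods described above.
    """
    while not stop_event.is_set():
        mq.do_async(mq.publish, payload=random.choice(messages))
        # wait() pauses like time.sleep() but returns early once the event is set.
        stop_event.wait(interval)
```

To use it, create the event in main, pass it through the thread's args, and call stop_event.set() when shutting down. Starting the thread with daemon=True would also keep it from blocking interpreter exit.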
    

We finally need to define on_message, our callback function for the mq.consume method. Pika’s channel.basic_consume takes a callback whose signature is on_message_callback(channel, method, properties, body), so that’s what our function will look like, though we only care about body in our app, which is the message payload.

def on_message(channel, method, properties, body):
    msg = body.decode('utf8')
    if msg not in msg_counts:
        msg_counts[msg] = 1
    else:
        msg_counts[msg] = msg_counts[msg] + 1
    print(f'Time: {int(time.time()) % 1000} --- Message: {msg} --- Count: {msg_counts[msg]}')
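
The same bookkeeping can be written with collections.Counter, and the callback can be exercised without a broker by calling it by hand. This is an alternative sketch rather than a required change; the return value is added only so the result is easy to inspect, since pika doesn't use it.

```python
from collections import Counter

msg_counts = Counter()

def on_message(channel, method, properties, body):
    # body arrives as bytes, so decode it before using it as text.
    msg = body.decode('utf8')
    # Counter returns 0 for unseen keys, so no membership check is needed.
    msg_counts[msg] += 1
    return msg, msg_counts[msg]

# Simulate two deliveries; only `body` matters to this callback.
on_message(None, None, None, 'Crème Anglaise'.encode('utf8'))
print(on_message(None, None, None, 'Crème Anglaise'.encode('utf8')))
# ('Crème Anglaise', 2)
```

Encoding and decoding with the same codec matters for messages containing non-ASCII characters such as the è above.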
    

Testing the Application

I have a RabbitMQ instance running on ‘192.168.1.111’, so I changed the first line of the main function to mq = AmqpConnection(hostname='192.168.1.111'), then I ran python app.py and saw our app was working as expected. I took down the RabbitMQ server for about a minute, and once it was back up, our application automatically reconnected and resumed operation.

$ python app.py
Attempting to connect to 192.168.1.111
Connected Successfully to 192.168.1.111
Time: 583 --- Message: Crème Anglaise --- Count: 1
Time: 586 --- Message: Hello, World! --- Count: 1
Time: 589 --- Message: Epstein didn't kill himself --- Count: 1
Time: 592 --- Message: Foo Bar Baz --- Count: 1
Time: 595 --- Message: Foo Bar Baz --- Count: 2
Time: 598 --- Message: Hello, World! --- Count: 2
Time: 601 --- Message: Maddox Lost --- Count: 1
Time: 604 --- Message: Foo Bar Baz --- Count: 3
Time: 607 --- Message: Foo Bar Baz --- Count: 4
Time: 610 --- Message: Epstein didn't kill himself --- Count: 2
Time: 613 --- Message: Hello, World! --- Count: 3
Time: 616 --- Message: Hello, World! --- Count: 4
Time: 619 --- Message: Hello, World! --- Count: 5
Time: 622 --- Message: Crème Anglaise --- Count: 2
Time: 625 --- Message: Foo Bar Baz --- Count: 5
Time: 628 --- Message: Hello, World! --- Count: 6
Attempting to connect to 192.168.1.111
Attempting to connect to 192.168.1.111
Attempting to connect to 192.168.1.111
Attempting to connect to 192.168.1.111
Attempting to connect to 192.168.1.111
Attempting to connect to 192.168.1.111
Attempting to connect to 192.168.1.111
Connected Successfully to 192.168.1.111
Time: 763 --- Message: Foo Bar Baz --- Count: 6
Time: 766 --- Message: Crème Anglaise --- Count: 3
Time: 769 --- Message: Crème Anglaise --- Count: 4
Time: 772 --- Message: Maddox Lost --- Count: 2
Time: 775 --- Message: Epstein didn't kill himself --- Count: 3
Time: 778 --- Message: Maddox Lost --- Count: 3
Time: 781 --- Message: Epstein didn't kill himself --- Count: 4
Time: 784 --- Message: Epstein didn't kill himself --- Count: 5
Time: 787 --- Message: Epstein didn't kill himself --- Count: 6
Time: 790 --- Message: Hello, World! --- Count: 7
Time: 793 --- Message: Hello, World! --- Count: 8
Time: 796 --- Message: Foo Bar Baz --- Count: 7
Keyboard interrupt received
    

I Ctrl-C'd out of there before my provocative application gets its hyoid bone broken or gets sued for $20 million.

Conclusion

I hope you learned a little something about RabbitMQ and how to use Python's Pika library to easily interact with it. This project isn't hosted on my GitHub because it's very small and not a worthwhile application on its own. You can easily copy the code from here and adapt it to a real application. Enjoy.