120+ Engineers
20+ Countries
850+ Projects
750+ Satisfied Clients
4.9 Clutch

Building a Pub/Sub System in Python with RabbitMQ

  • Install and configure RabbitMQ locally or on a server

  • Connect Python apps to RabbitMQ using the pika library

  • Set up publishers to send messages to exchanges

  • Create subscribers to consume messages from queues

  • Choose the right exchange type: direct, fanout, or topic

  • Enable message acknowledgments for reliable delivery

  • Use durable queues and persistent messages for fault tolerance

  • Test with multiple publishers/subscribers to ensure scalability

Last Update: 28 Nov 2024


RabbitMQ is a powerful message broker that supports multiple messaging patterns, one of the most popular being publish/subscribe (pub/sub). This pattern allows a producer to send a message that multiple subscribers can receive and process independently.

In this blog, we’ll build a simple pub/sub system in Python using RabbitMQ. Our setup will include:

  • A producer that publishes messages.
  • Multiple subscribers that receive every message published by the producer.

Prerequisites

  1. Install RabbitMQ on your system. Follow the official installation guide for your operating system, or run it in Docker (I use Docker in this blog).
  2. Install the pika library for Python:
    pip install pika

     

 

Install RabbitMQ in Docker (Optional)

Ensure Docker is installed on your system. If not, download and install it from the Docker website.

1. Pull the RabbitMQ Docker Image

RabbitMQ provides an official Docker image. Pull the latest version using the following command:

docker pull rabbitmq:management

The management tag includes the RabbitMQ Management Plugin, providing a user-friendly web interface for monitoring and managing RabbitMQ.


2. Run the RabbitMQ Container

Run the RabbitMQ container with the management plugin enabled:

docker run --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
  • 5672: Default port for RabbitMQ messaging.
  • 15672: Port for the RabbitMQ Management Web Interface.
  • --name rabbitmq: Assigns the name rabbitmq to the container.

 

3. Access the RabbitMQ Web Interface

  1. Open a browser and navigate to:
    http://localhost:15672
  2. Use the default login credentials:
    • Username: guest
    • Password: guest
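
Before running any Python code against the broker, it can help to confirm that both mapped ports accept connections. The following is a minimal sketch using only the standard library; the helper names port_is_open and broker_status are hypothetical, and the host and ports assume the Docker command shown earlier.

```python
import socket


def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts
        return False


def broker_status(host="127.0.0.1", ports=(5672, 15672)):
    """Map each port to whether it currently accepts TCP connections."""
    # Assumption: 5672 is the messaging port and 15672 the management UI
    return {port: port_is_open(host, port) for port in ports}
```

Calling print(broker_status()) while the container is running should report both ports as reachable. This only checks that something is listening; it does not verify credentials or that the listener is RabbitMQ.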

How Pub/Sub Works in RabbitMQ

In RabbitMQ, the pub/sub pattern is implemented using exchanges and queues:

  • The producer sends messages to an exchange.
  • The exchange routes these messages to the queues bound to it, according to its exchange type.
  • Each subscriber listens to a specific queue and processes incoming messages.

We’ll use the fanout exchange type, which ensures that every queue bound to the exchange receives a copy of the message.
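
To make the routing rule concrete, here is a toy in-memory model of a fanout exchange. It is only an illustration of the behavior described above, not how RabbitMQ is implemented, and the class name FanoutExchange and the queue names are made up for this sketch.

```python
from collections import deque


class FanoutExchange:
    """Toy stand-in for a fanout exchange: copies each message to every bound queue."""

    def __init__(self):
        self.queues = {}

    def bind(self, queue_name):
        # Create the queue on first bind and return it so a subscriber can read it
        return self.queues.setdefault(queue_name, deque())

    def publish(self, body):
        # A fanout exchange ignores routing keys, so none is modeled here
        for queue in self.queues.values():
            queue.append(body)


exchange = FanoutExchange()
q1 = exchange.bind("subscriber-1")
q2 = exchange.bind("subscriber-2")
exchange.publish("hello")

# A queue bound after the publish does not see the earlier message
q3 = exchange.bind("late-subscriber")
print(list(q1), list(q2), list(q3))
```

Both early queues hold their own copy of the message, while the late queue is empty. The same thing happens with the real broker: a subscriber only receives messages published after its queue is bound.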

Setting Up the Producer

Here’s how the producer sends messages to the exchange:

# producer.py
import pika

def publish_message(message):
    # Connect to RabbitMQ
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(
            host="127.0.0.1",
            port=5672,
            credentials=pika.credentials.PlainCredentials(
                'guest',
                'guest',
            )
        ),
    )
    channel = connection.channel()

    # Declare the exchange
    channel.exchange_declare(exchange='logs', exchange_type='fanout')

    # Publish the message
    channel.basic_publish(exchange='logs', routing_key='', body=message)
    print(f"Sent '{message}'")

    # Close the connection
    connection.close()

if __name__ == "__main__":
    message = "Hello, Subscribers! This is a test message."
    publish_message(message)
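
The producer above opens and closes a connection for every message, which is fine for a demo. If you need to send several messages, one option is to reuse a single channel. The sketch below assumes a channel object with the same exchange_declare and basic_publish methods used in producer.py; publish_batch and RecordingChannel are hypothetical names, and RecordingChannel exists only so the helper can be tried without a broker.

```python
def publish_batch(channel, messages, exchange="logs"):
    """Publish every message over one already-open channel; return the count sent."""
    # Same declaration as producer.py, so the exchange exists before publishing
    channel.exchange_declare(exchange=exchange, exchange_type="fanout")
    sent = 0
    for message in messages:
        # Fanout exchanges ignore the routing key, so it stays empty
        channel.basic_publish(exchange=exchange, routing_key="", body=message)
        sent += 1
    return sent


class RecordingChannel:
    """Hypothetical stand-in for a pika channel, used only to dry-run the helper."""

    def __init__(self):
        self.declared = []
        self.published = []

    def exchange_declare(self, exchange, exchange_type):
        self.declared.append((exchange, exchange_type))

    def basic_publish(self, exchange, routing_key, body):
        self.published.append((exchange, routing_key, body))


fake = RecordingChannel()
count = publish_batch(fake, ["first", "second", "third"])
print(count, fake.published)
```

With a real broker, you would pass the channel returned by connection.channel() instead of the fake one, and close the connection once after the whole batch has been sent.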

Creating a Subscriber

Each subscriber listens to messages from a queue bound to the exchange. Here’s the subscriber code:

# subscriber.py
import sys

import pika

def consume_messages(subscriber_id):
    # Connect to RabbitMQ
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(
            host="127.0.0.1",
            port=5672,
            credentials=pika.credentials.PlainCredentials(
                'guest',
                'guest',
            )
        ),
    )
    channel = connection.channel()

    # Declare the exchange (safe to repeat; it must match the producer's declaration)
    channel.exchange_declare(exchange='logs', exchange_type='fanout')

    # Let RabbitMQ generate a queue name, then bind the queue to the exchange.
    # exclusive=True removes the queue when this connection closes.
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue
    channel.queue_bind(exchange='logs', queue=queue_name)

    print(f"Subscriber {subscriber_id} waiting for messages. To exit press CTRL+C")

    # Callback function to process and acknowledge each message
    def callback(ch, method, properties, body):
        print(f"Subscriber {subscriber_id} received {body.decode()}")
        ch.basic_ack(delivery_tag=method.delivery_tag)

    # Start consuming messages; stop cleanly on CTRL+C
    channel.basic_consume(queue=queue_name, on_message_callback=callback)
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()
    connection.close()

if __name__ == "__main__":
    # Read the subscriber ID from the command line, defaulting to "1"
    subscriber_id = sys.argv[1] if len(sys.argv) > 1 else "1"
    consume_messages(subscriber_id)

Testing the System

  • Start RabbitMQ: Ensure RabbitMQ is running on your system.

  • Run Subscribers: Open multiple terminal windows and start the subscriber script with different IDs:

python subscriber.py 1
python subscriber.py 2

  • Send Messages from the Producer: In another terminal, run the producer script:

python producer.py

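Each subscriber should print the received message. Start the subscribers before the producer: the exchange does not store messages, so anything published while no queue is bound to it is discarded.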

Understanding the Code

  1. Exchange: The fanout exchange type ensures every queue bound to it gets a copy of the message.
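  2. Temporary Queues: Passing queue='' asks RabbitMQ to generate a unique queue name for each subscriber, and exclusive=True deletes that queue when the subscriber's connection closes.
  3. Callback: The subscriber’s callback function processes each incoming message and then acknowledges it with basic_ack.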
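
The callback in subscriber.py always acknowledges the message. If processing can fail, one possible variation is to acknowledge only on success and reject the message otherwise. The sketch below is one way to do that under stated assumptions: make_callback, handler, and FakeChannel are hypothetical names, failed messages are dropped rather than requeued, and FakeChannel only mimics the basic_ack and basic_nack calls so the logic can be tried without a broker.

```python
from types import SimpleNamespace


def make_callback(handler):
    """Wrap a handler so the message is acked on success and rejected on failure."""

    def callback(ch, method, properties, body):
        try:
            handler(body.decode())
        except Exception:
            # Assumption: drop failed messages instead of requeueing them,
            # which avoids an endless redelivery loop for a bad payload
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        else:
            ch.basic_ack(delivery_tag=method.delivery_tag)

    return callback


class FakeChannel:
    """Hypothetical stand-in that records acknowledgments instead of sending them."""

    def __init__(self):
        self.acked = []
        self.nacked = []

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)

    def basic_nack(self, delivery_tag, requeue):
        self.nacked.append((delivery_tag, requeue))


def handler(text):
    # Example handler: treat an empty message as a processing failure
    if not text:
        raise ValueError("empty message")


channel = FakeChannel()
callback = make_callback(handler)
callback(channel, SimpleNamespace(delivery_tag=1), None, b"hello")
callback(channel, SimpleNamespace(delivery_tag=2), None, b"")
print(channel.acked, channel.nacked)
```

In the subscriber, the wrapped function would be passed as on_message_callback to basic_consume. Whether to requeue or drop a failed message depends on your use case, so treat requeue=False here as a placeholder choice.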

Real-World Applications

The pub/sub pattern is commonly used in:

  • Broadcasting notifications to multiple services.
  • Log aggregation systems.
  • Real-time event streaming for microservices.

Conclusion

In this blog, we demonstrated how to implement a simple pub/sub system using RabbitMQ and Python. This foundation can be extended to build more complex distributed systems.

Let me know if you have any suggestions, or feel free to share your thoughts in the comments!

 
