Solving Problems with Rabbit

By Alvaro Videla and Jason J.W. Williams, authors of RabbitMQ in Action

RabbitMQ makes it easy to connect any and all parts of your infrastructure in any way you want. So the first thing you should always ask is: how can you break your apps apart? Or rather, which parts of your app are order takers and which parts are order processors? With that in mind, this article, based on chapter 6 of RabbitMQ in Action, dives into some real-world examples of using Rabbit and messaging to solve real problems, and answers those questions.

Normally, when programmers hear asynchronous programming they either go running for the hills or think "Cool. Like Node.js right?" Sometimes both. The problem with normal approaches to asynchronous programming is that they're all-or-nothing propositions. Either you rewrite all of your code so none of it blocks, or you're just wasting your time. RabbitMQ gives you a different path. It allows you to fire off a request for processing elsewhere so that your synchronous application can go back to what it was doing. When you start using messaging, you can achieve most of the benefits of pure asynchronous programming without throwing your existing code or knowledge away.

In this article, we'll show you how to use Rabbit to solve a number of real-world problems.

Fire-and-forget models

When we look at the types of problems messaging can solve, one of the main areas that messaging fits is fire-and-forget processing. Whether you need to add contacts to a mailing list or convert 1,000 pictures into thumbnails, you're interested in the jobs getting done but there's no reason they need to be done in real-time. In fact, you usually want to avoid blocking the user who triggered the jobs. We describe these types of jobs as fire-and-forget: you create the job, put it on the exchange, and let your app get back to what it was doing. Depending on your requirements, you may not even need to notify the user when the jobs complete.

Two general types of jobs fit into this pattern:

* Notifications: events that simply need to be delivered to someone (or something) else, like an administrator alert
* Batch processing: operations over a set of data, like converting an uploaded image into multiple sizes and formats

We're going to show you two different real-world examples of fire-and-forget apps that fit into these two categories. The first is an alerting framework that will allow the apps in your infrastructure to generate administrator alerts without worrying about where they need to go or how to get them there. The second example is a perfect demonstration of batch processing: taking a single image upload and converting into multiple image sizes and formats. When you're done with this section you'll have the most fundamental type of RabbitMQ programming under your belt: triggering work with messages that need no reply. Let's start generating some alerts!

Sending alerts

No matter what type of apps you write, getting notifications when things go awry is critical. Typically you run some sort of service monitor like Nagios to let you know when your app is down or services that it relies upon are unavailable. But what about getting notified when your app is experiencing an unusual number of requests for user logins, all from a single IP? Or perhaps you'd like to allow your customers to be notified when unusual events occur to their data? What you need is for your app to generate alerts, but this opens up a whole new set of questions and adds a lot of complexity to your app. What events do you alert on, and more important, how do you alert? SMS? IM? No matter how you slice it, you're looking at adding a lot of new surface area to your code for bugs to hide in. For example, what happens when the SMS gateway is down? All of your web apps that need to alert now need error-handling code to deal with the SMS server being unavailable.

Worry not, for RabbitMQ is riding to your rescue. The only thing about alerting that inherently needs to be done in your web apps is generating the contents of the alert. There's no reason why your web app needs to know whom the alert should go to, how to get it there, or what to do when the alert deliveries go awry. All you need to do is write a new alerting server app that receives alert messages via Rabbit, and then enhance your web app to publish these alert messages where appropriate.

How should you design this new alerting framework? Particularly, what type of AMQP exchange should you use? You could use a fanout exchange and then create a queue for each alert transmission type (IM, Twitter, SMS, and so on). The advantage is that your web app doesn't have to know anything about how the alerts will be delivered to the ultimate receiver. It just publishes and moves on. The disadvantage is that every alert transmitter gets a copy, so you get flooded with an IM, a text message, and a Twitter direct message every time an alert happens.

A better way to organize your alerting system would be to create three severity levels for your alerts: info, warning, and critical. But with the fanout exchange, any alert published would get sent to all three severity level queues. You could instead create your exchange as a direct exchange, which would allow your web app to tag the alert messages with the severity level as the routing key. But what would happen if you chose a topic exchange? Topic exchanges let you create flexible tags for your messages that target them to multiple queues, but only the queues providing the services you want (unlike the fanout exchange). If you were to use a topic exchange for your alerting framework, you wouldn't be limited to just one severity level per alert. In fact, you could now tag your messages not only with a severity level, but also with the type of alert it is. For example, let's say Joe Don Hacker is hitting your statistics server with 10,000 requests per second for map data on your dog walking reservations. In your organization, you need an alert about this to go both to the infrastructure admins (who get all alerts flagged as critical), and to your API dev team (who get all alerts tagged rate_limit). Since you've chosen a topic exchange for the alerting framework, your web app can tag the alert about such underhanded activity with critical.rate_limit. Presto! The alert message is automatically routed by RabbitMQ to both the critical and rate_limit queues, because of the exchange bindings you've created: critical.* and *.rate_limit. Figure 1 shows how the flow of your alerting system will work.

Figure 1 Alerting system flow

To build this alerting framework you'll need the Pika library. Here are some quick steps to get Pika installed (assuming you don't have easy_install yet either):

$ wget http://peak.telecommunity.com/dist/ez_setup.py
...
  (25.9 KB/s) - ez_setup.py saved [10285/10285]

$ python ez_setup.py
...
Installed /Library/Python/2.6/site-packages/setuptools-0.6...
$ easy_install pika
...
Installed /Library/Python/2.6/site-packages/pika-0.9.6-py2.6.egg
Processing dependencies for pika
Finished processing dependencies for pika

Next you need to set up the RabbitMQ user and password the applications will use to publish and receive alert messages. Let's call the user alert_user and give it the password alertme. Let's also grant alert_user read/write/configure permissions on the default vhost /.

From the ./sbin directory of your RabbitMQ install, run the following:

$ ./rabbitmqctl add_user alert_user alertme
Creating user "alert_user" ...
...done.
$ ./rabbitmqctl set_permissions alert_user ".*" ".*" ".*"
Setting permissions for user "alert_user" in vhost "/" ...
...done.

With the setup out of the way, you're ready to work on the most important part of your alerting system: the AMQP consumer that will receive the alert messages and transmit them to their destinations. Create a file called alert_consumer.py and put the code in the following listing inside.

Listing 1 Connect to the broker


import json, smtplib 
import pika

if __name__ == "__main__":
    AMQP_SERVER = "localhost"
    AMQP_USER = "alert_user"
    AMQP_PASS = "alertme"
    AMQP_VHOST = "/"
    AMQP_EXCHANGE = "alerts"

    creds_broker = pika.PlainCredentials(AMQP_USER, AMQP_PASS)
    conn_params = pika.ConnectionParameters(AMQP_SERVER,
                                            virtual_host=AMQP_VHOST,
                                            credentials=creds_broker)
    conn_broker = pika.BlockingConnection(conn_params)
    channel = conn_broker.channel()

The first thing this code does is import the libraries you'll need to make the consumer tick, and tell Python where the main body of your program is (if __name__ == "__main__":). Next, you establish the settings you need to make a successful connection to your broker (username, password, virtual host, and so forth). The settings assume you have RabbitMQ running locally on your development workstation and that you're using the username and password you just created. For simplicity, let's use the default virtual host /, where you're going to create an exchange called alerts. Here's where the real action starts:

channel.exchange_declare( exchange=AMQP_EXCHANGE,
                          type="topic",
                          auto_delete=False)

You're declaring the alerts exchange as a topic exchange with the type="topic" parameter that's passed to channel.exchange_declare. Setting auto_delete=False on the exchange (and later on the queue declarations) ensures that they'll stick around when the last consumer disconnects from them.

Remember that we talked about two tagging patterns for alerts:

* Alerts tagged critical.* should be routed to the critical queue.
* Alerts tagged *.rate_limit should be routed to the rate_limit queue.

What you need to do is create bindings that implement these rules so that the alert messages go to the queues you want. For your example, let's create a binding that routes any messages with tags starting with critical. to the critical queue. Let's also create a different binding that routes any messages with tags ending in .rate_limit to the rate_limit queue. Go ahead and create the critical and rate_limit queues and bind them, as shown in the following listing.

Listing 2 Declare and bind queues and exchanges for the alert topics

channel.queue_declare(queue="critical", auto_delete=False)
channel.queue_bind(queue="critical", 
                   exchange="alerts", 
                   routing_key="critical.*")
channel.queue_declare(queue="rate_limit", auto_delete=False)
channel.queue_bind(queue="rate_limit",
                   exchange="alerts",
                   routing_key="*.rate_limit")

You'll notice that the binding rule created for critical alerts is critical.* and not critical*. This is because RabbitMQ uses . as a separator for different parts of a tag. If you were to use critical* as the rule, only messages tagged exactly critical* would match. What you want is to match critical.mywebapp, critical.rate_limit, or anything else that starts with critical.; hence the binding rule should be critical.*. When using topic exchanges, design your tagging patterns carefully so that . separates the parts of the tag you want to match on independently.
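To make the wildcard rules concrete, here's a small standalone sketch (not part of the alerting code, and only an approximation of RabbitMQ's real matcher) showing how a binding pattern like critical.* behaves differently from critical*:

```python
import re

def topic_matches(binding, routing_key):
    """Approximate RabbitMQ topic matching: '*' matches exactly one
    dot-separated word, '#' matches one or more words (simplified)."""
    parts = []
    for word in binding.split('.'):
        if word == '*':
            parts.append(r'[^.]+')                # exactly one word, no dots
        elif word == '#':
            parts.append(r'[^.]+(\.[^.]+)*')      # one or more words (simplified)
        else:
            parts.append(re.escape(word))         # literal word
    return re.match('^' + r'\.'.join(parts) + '$', routing_key) is not None

print(topic_matches('critical.*', 'critical.rate_limit'))    # True
print(topic_matches('critical*', 'critical.rate_limit'))     # False
print(topic_matches('*.rate_limit', 'critical.rate_limit'))  # True
```

As the last two lines show, critical* only matches the literal tag critical*, while critical.* matches any two-part tag whose first word is critical.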

You could've also passed durable=True to the queue declarations and bindings, which would ensure that they survived a restart of RabbitMQ. Since restarting your consumer will automatically create the exchange, queues, and bindings it needs, you don't need to worry about durability for your alerting system. The other reason you're not concerned about making the queues durable is because you're not going to flag your alert messages as durable either. Your system could be handling very high volumes of alerts, so you want to ensure the highest performance and not use durable messaging, which is persisted to relatively slow disk.

You might be thinking, "We have exchanges, queues, and bindings … where do we turn an alert message into an actual alert?" You do that by setting up your consumer subscriptions and starting the listener loop, as in the following listing.

Listing 3 Attach the alert processors


channel.basic_consume( critical_notify, 
                       queue="critical", 
                       no_ack=False, 
                       consumer_tag="critical")

channel.basic_consume( rate_limit_notify, 
                       queue="rate_limit", 
                       no_ack=False, 
                       consumer_tag="rate_limit")

print "Ready for alerts!" 
channel.start_consuming()

Let's take the channel.basic_consume call apart and explain what each parameter does:

* The first argument is the callback function (here critical_notify or rate_limit_notify) that will be invoked for each message received from the queue.
* queue names the queue the consumer subscribes to.
* no_ack=False tells RabbitMQ to wait for an explicit acknowledgement from the consumer before it considers a message delivered.
* consumer_tag names the subscription, so it can be cancelled later if needed.

Once you've established the consumer subscriptions, you only need to call channel.start_consuming() to start your consumer listening for messages. You may have noticed that the callback functions (critical_notify and rate_limit_notify) you specified for your subscriptions haven't been defined yet. Let's go ahead and specify one of those in the following listing.

Listing 4 Critical alerts processor

def critical_notify(channel, method, header, body):
    """Sends CRITICAL alerts to administrators via e-mail.""" 

    EMAIL_RECIPS = ["ops.team@ourcompany.com",]

    message = json.loads(body)
    send_mail(EMAIL_RECIPS, "CRITICAL ALERT", message)
    print ("Sent alert via e-mail! Alert Text: %s " + \
           "Recipients: %s") % (str(message), str(EMAIL_RECIPS))
    channel.basic_ack(delivery_tag=method.delivery_tag)

When a consumer callback is called, Pika passes in four parameters related to the message:

* channel: the channel the message was delivered on, which you use to send acknowledgements
* method: delivery information, including the delivery tag, consumer tag, and routing key
* header: the message's properties, such as content_type
* body: the message payload itself

In critical_notify the first thing to look at is the content_type header. Your alerts will be JSON encoded, so the producer sets the content type to application/json. The content_type is optional, but it's useful when you want to communicate encoding information about the message between producer and consumer. With the content type established, you decode the message body from JSON to text and construct an email to the Ops Team (ops.team@ourcompany.com) containing the alert text. Once the email alert has been successfully sent, you send an acknowledgement back to RabbitMQ that you've received the message. The acknowledgement is important because RabbitMQ won't give you another message from the queue until you've acknowledged the last one you received. By putting the acknowledgement as the last operation, you ensure that if your consumer were to crash, RabbitMQ would assign the message to another consumer.
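As a sketch of that content-type convention (decode_alert is a hypothetical helper, not part of the book's listing), a stricter consumer could guard the decode step like this:

```python
import json

def decode_alert(content_type, body):
    """Hypothetical guard: only decode message bodies that the
    producer declared as JSON via the content_type property."""
    if content_type != "application/json":
        raise ValueError("unexpected content type: %r" % content_type)
    return json.loads(body)

# Inside a consumer callback you would call it as:
#   message = decode_alert(header.content_type, body)
```

This keeps a consumer from blindly json.loads()-ing a body that a misbehaving producer published with some other encoding.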

With all of the pieces of your consumer explained, let's look at the whole thing put together in the following listing.

Listing 5 Alert consumer—alert_consumer.py, start to finish


import json, smtplib 
import pika

def send_mail(recipients, subject, message): 
    """E-mail generator for received alerts.""" 
    headers = ("From: %s\r\nTo: \r\nDate: \r\n" + \
               "Subject: %s\r\n\r\n") % ("alerts@ourcompany.com", 
                                         subject)

    smtp_server = smtplib.SMTP() 
    smtp_server.connect("mail.ourcompany.com", 25)    
    smtp_server.sendmail("alerts@ourcompany.com",
                         recipients,
                         headers + str(message))
    smtp_server.close()

def critical_notify(channel, method, header, body):
    """Sends CRITICAL alerts to administrators via e-mail."""
    EMAIL_RECIPS = ["ops.team@ourcompany.com",]
    message = json.loads(body)
    send_mail(EMAIL_RECIPS, "CRITICAL ALERT", message)
    print ("Sent alert via e-mail! Alert Text: %s " + \
           "Recipients: %s") % (str(message), str(EMAIL_RECIPS))
    channel.basic_ack(delivery_tag=method.delivery_tag)

def rate_limit_notify(channel, method, header, body):
    """Sends the message to the administrators via e-mail."""
    EMAIL_RECIPS = ["api.team@ourcompany.com",]
    message = json.loads(body)
    # Transmit e-mail to SMTP server
    send_mail(EMAIL_RECIPS, "RATE LIMIT ALERT!", message)
    print ("Sent alert via e-mail! Alert Text: %s " + \
           "Recipients: %s") % (str(message), str(EMAIL_RECIPS))
    channel.basic_ack(delivery_tag=method.delivery_tag)

if __name__ == "__main__":
    AMQP_SERVER = "localhost"
    AMQP_USER = "alert_user"
    AMQP_PASS = "alertme"
    AMQP_VHOST = "/"
    AMQP_EXCHANGE = "alerts"

    creds_broker = pika.PlainCredentials(AMQP_USER, AMQP_PASS)
    conn_params = pika.ConnectionParameters(AMQP_SERVER,
                                            virtual_host=AMQP_VHOST,
                                            credentials=creds_broker)
    conn_broker = pika.BlockingConnection(conn_params)
    channel = conn_broker.channel()

    channel.exchange_declare(exchange=AMQP_EXCHANGE,
                             type="topic",
                             auto_delete=False)

    channel.queue_declare(queue="critical", auto_delete=False)
    channel.queue_bind(queue="critical",
                       exchange="alerts",
                       routing_key="critical.*")
    channel.queue_declare(queue="rate_limit", auto_delete=False)
    channel.queue_bind(queue="rate_limit",
                       exchange="alerts",
                       routing_key="*.rate_limit")

    channel.basic_consume(critical_notify,
                          queue="critical",
                          no_ack=False,
                          consumer_tag="critical")
    channel.basic_consume(rate_limit_notify,
                          queue="rate_limit",
                          no_ack=False,
                          consumer_tag="rate_limit")

    print "Ready for alerts!"
    channel.start_consuming()

You now have an elegant consumer that will translate alert AMQP messages into email alerts targeted at different groups simply by manipulating the message tag. Adding additional alert types and transmission methods is simple. All you need to do is create a consumer callback to provide the new alert processing and connect it to a queue that's populated via a binding rule for the new alert type. Your consumer wouldn't be very useful without alerts for it to process. So let's see what it takes to produce alerts that your consumer can act on.

Our goal when we started this section was to make producing alerts simple and uncomplicated for existing apps. If you look at the following listing, you'll see that, though the consumer takes some 90 lines of code to process an alert, the alert itself can be generated in less than 20 lines.

Listing 6 Alert generator example—alert_producer.py

import json, pika
from optparse import OptionParser  
opt_parser = OptionParser()
opt_parser.add_option("-r", "--routing-key",
                      dest="routing_key",
                      help="Routing key for message (e.g. myalert.im)")
opt_parser.add_option("-m", "--message",
                      dest="message",
                      help="Message text for alert.")
args = opt_parser.parse_args()[0]

creds_broker = pika.PlainCredentials("alert_user", "alertme")
conn_params = pika.ConnectionParameters("localhost",
                                        virtual_host="/",
                                        credentials=creds_broker)
conn_broker = pika.BlockingConnection(conn_params)
channel = conn_broker.channel()

msg = json.dumps(args.message)
msg_props = pika.BasicProperties()
msg_props.content_type = "application/json"
msg_props.durable = False

channel.basic_publish(body=msg,
                      exchange="alerts",
                      properties=msg_props,
                      routing_key=args.routing_key)
print ("Sent message %s tagged with routing key '%s' to " + \
       "exchange '/'.") % (json.dumps(args.message), args.routing_key)

The sample producer can be run from the command line to generate alerts with any contents and routing tags you like. The first part of the program simply extracts the message and the routing key from the command line. From there you're connecting to the RabbitMQ broker identically to the way you did in the alert consumer. Where things get interesting is when you publish the message:

msg = json.dumps(args.message) 
msg_props = pika.BasicProperties() 
msg_props.content_type = "application/json" 
msg_props.durable = False

channel.basic_publish(body=msg, 
                      exchange="alerts", 
                      properties=msg_props, 
                      routing_key=args.routing_key)

Five lines of code is all it takes for you to create the alert message and tag it with the appropriate routing key (say, critical.mywebapp). After you JSON-encode the alert's message text, you create a BasicProperties object called msg_props. This is where you can set the AMQP message's optional content type header, and also where you'd make the message durable if you wanted persistence. Finally, in one line of code you publish the message to the alerts exchange with the routing key that classifies what type of alert it is. Since messages with routing keys that don't match any bindings will be discarded, you can even tag alerts with routing keys for alert types you don't support yet. As soon as you do support those alert types, any alert messages with those routing keys will be routed to the right consumer. The last bit to note about the producer is the block_on_flow_control flag you can pass to channel.basic_publish. This tells Pika to hold off on returning from basic_publish if RabbitMQ's flow control mechanism tells it to stop publishing. When RabbitMQ tells Pika it's okay to proceed, it'll finally return, allowing more publishing to occur. This makes your producer play nicely with RabbitMQ so that if Rabbit becomes overloaded, it can throttle the producer to slow it down. If you're publishing alerts from another program that can't afford to be blocked, be sure to set block_on_flow_control to False.

In only 100 lines of code total, you've given your web apps a flexible and scalable way to issue alerts that then get transmitted asynchronously to their recipients. You've also seen how beneficial the fire-and-forget messaging pattern can be when you need to transmit information to be processed quickly but don't need to know the result of the processing. For example, you could easily extend the alert consumer to add an additional processor that uses the binding pattern *.* to log a copy of all alerts to a database. But alerting and logging are far from the only uses of the fire-and-forget messaging pattern. Let's look at an example where you need to perform CPU-intensive processing on the contents of the message, and how RabbitMQ can help you move that into an asynchronous operation.

Parallel processing

Say you started running your own social network website and you just deployed a shiny new feature: picture uploads. People want to share their holiday pictures with friends and family—perhaps you've seen this somewhere. Also, to improve the interaction among users, you want to notify their friends when one of their contacts has uploaded a new picture. A week after the new feature release, the marketing guys come to your desk asking you to give some points to the users, a reward for the pictures they upload to encourage them to keep submitting pictures and improve the activity on the site. You agree and add a few lines of code, and now you hook a reward system into the upload picture process. It looks a bit nasty for your coder eyes, but it's working as expected and the boss is happy with the results.

Next month the bandwidth bill arrives and the ops guy is angry because the bandwidth usage has doubled. The external API offered to clients is displaying full-size images when it should be offering links to small thumbnails. So you'd better get your uploading code generating those thumbnails too. What to do? The easy way would be to add one more hook in there and execute the thumbnail generation directly from the upload controller, but wait … If for every picture upload you have to execute a picture resize operation, this means the frontend web servers will get overloaded, so you can't just do that. And users of your website don't want to wait for your picture processing script to get a confirmation that their upload is okay. This means you need a smarter solution, something that allows you to run tasks in parallel and in a different machine than the one serving requests to the users.

You can see that resizing a picture, rewarding the user, and notifying friends are three separate tasks. Those tasks are independent in that they don't have to wait for each other's results to be able to run, which means that you can refactor your code not only to process the image resize separately, but also to do those other things in parallel. Also, if you achieve such design, you can cope with new requirements easily. You need to log every picture upload? You just add a new worker to do the logging, and so on.

This sounds nice, almost like a dream, but all this parallelization stuff seems hard to accomplish. How much do you have to code to achieve message multicast? Not much; enter the fanout exchange.

As we said when we described the exchange types, the fanout exchange will put a copy of the message into the bound queues, as simple as that, and that's what you need for your upload picture module. Every time the user uploads a picture, instead of doing all the processing work right away, you'll publish a message with the picture metainformation and then let your asynchronous workers do the rest in parallel. RabbitMQ will ensure that each consumer gets a copy of the message. It's the worker's duty to process it accordingly.
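The copy-to-every-queue behavior can be pictured with a toy in-memory model (an illustration only, not real RabbitMQ code): a fanout exchange simply drops a copy of each published message into every bound queue.

```python
class FanoutExchange(object):
    """Toy model of a fanout exchange: every bound queue gets a copy."""
    def __init__(self):
        self.queues = []

    def bind(self, queue):
        self.queues.append(queue)

    def publish(self, message):
        for queue in self.queues:
            queue.append(message)      # each bound queue receives the message

# three queues, one per worker type, all bound to the same exchange
resize, add_points, notify_friends = [], [], []
exchange = FanoutExchange()
for q in (resize, add_points, notify_friends):
    exchange.bind(q)

exchange.publish({'image_id': 123456, 'user_id': 6543})
print(len(resize), len(add_points), len(notify_friends))  # 1 1 1
```

One publish, three deliveries: each worker consumes from its own queue at its own pace, which is exactly the property the upload-pictures design relies on.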

The messages will contain the following metainformation about the picture: the image ID of the picture, the user ID, and the path to locate the picture on the filesystem. You'll use JSON as the data exchange format. This will make it easier in the future if you need to support several languages for the different tasks. Your messages will look like this:

{
    "image_id": 123456,
    "user_id": 6543,
    "image_path": "/path/to/pic.jpg"
}

Figure 2 shows that you'll declare an upload-pictures exchange and will bind three queues to it: resize-picture, add-points, and notify-friends.

Figure 2 Uploading pictures

From this design you can tell that adding a new kind of task, like logging, is just a matter of declaring a new queue and binding it to the upload-pictures exchange. Your focus as developers will be to code each of the workers and the publishing logic; RabbitMQ will do the rest.

So, let's start by adding the publisher logic into the upload picture module, as in the following listing. You omit the logic for taking the picture from the POST request and moving it to some place on the filesystem.

Listing 7 Upload pictures publisher

<?php

$channel->exchange_declare('upload-pictures',
  'fanout', false, true, false); 
$metadata = json_encode(array(
    'image_id' => $image_id,
    'user_id' => $user_id,
    'image_path' => $image_path
));

$msg = new AMQPMessage($metadata, array(
    'content_type' => 'application/json',
    'delivery_mode' => 2));
$channel->basic_publish($msg, 'upload-pictures');
?>

Let's see what you did here. The code for obtaining an AMQP channel isn't present since we covered that in previous examples. First you declare the upload-pictures exchange as a fanout type with the durable property set. Then you create the message metadata encoded as JSON; $image_id, $user_id, and $image_path were initialized during the upload process. Next you create a new instance of the message, specifying delivery_mode as 2 to make it persistent. Finally you publish the message to the upload-pictures exchange. You don't need to provide a routing key, since the message will be fanned out to all the bound queues.

Next let's create one of the consumers, the one for adding points to the users after each upload. Check inside add-points-consumer.php for the complete code, since the following listing omits bits that we've covered before, like including the AMQP libraries or instantiating the connection and the channel.

Listing 8 Add points consumer

<?php

$channel->exchange_declare('upload-pictures',
    'fanout', false, true, false); 
$channel->queue_declare('add-points', false, true, false, false);
$channel->queue_bind('add-points', 'upload-pictures');
$consumer = function($msg){};
$channel->basic_consume('add-points', '',
    false, false, false, false, $consumer);
?>

The code is straightforward. You declare the fanout exchange just as you did when publishing the message; then you create the add-points queue where the messages will be delivered by RabbitMQ. You bind that queue to the exchange using the empty routing key. The callback function is left empty for now, and you send the basic_consume command to prepare the consumer. The wait loop and the channel and connection cleanup code are also omitted here. The following listing shows the callback function.

Listing 9 Add points callback function

<?php

function add_points_to_user($user_id){
    echo sprintf("Adding points to user: %s\n", $user_id);
}

$consumer = function($msg){
    if($msg->body == 'quit'){
        $msg->delivery_info['channel']->
            basic_cancel($msg->delivery_info['consumer_tag']);
    }
    $meta = json_decode($msg->body, true);
    add_points_to_user($meta['user_id']);
    $msg->delivery_info['channel']->
        basic_ack($msg->delivery_info['delivery_tag']);
};
?>

In listing 9 you have the code for actually processing the message. First you add a dummy function that for now just echoes that it's giving points to the user; in a real-world application you'd include the logic for increasing the user's points, say in a Redis database. Then you define the consumer callback function. The tricky bit is the hook to stop consuming messages: if the message body equals quit, you cancel the consumer, which lets the wait loop exit so the channel and the connection can be closed cleanly. Next you pass the message body to the json_decode function to obtain the metadata, giving true as the second parameter to make sure PHP decodes the JSON object as an associative array. Finally you call the add_points_to_user function, passing the user_id that you obtained from the decoded message.

Let's test the implementation. You'll just copy the code from the publisher and modify the logic for creating the message to have a simple test script. In this case you'll take three arguments from the command line: image ID, user ID, and image path.

You'll encode them and send them over RabbitMQ to the consumer that you created before. We won't explain the following listing because it's the same as you saw before in listing 7.

Listing 10 Upload pictures test

<?php
require_once('../lib/php-amqplib/amqp.inc');
require_once('../config/config.php');

$conn = new AMQPConnection(HOST, PORT, USER, PASS, VHOST);
$channel = $conn->channel();

$channel->exchange_declare('upload-pictures',
  'fanout', false, true, false);

$metadata = json_encode(array(
      'image_id' => $argv[1],
      'user_id' => $argv[2],
      'image_path' => $argv[3]
));

$msg = new AMQPMessage($metadata, array(
      'content_type' => 'application/json',
      'delivery_mode' => 2));

$channel->basic_publish($msg, 'upload-pictures');

$channel->close();
$conn->close();
?>

Save this code in a file called fanout-publisher.php and open two terminal windows. In the first window, launch the add-points-consumer.php script:

$ php add-points-consumer.php

In the other window, execute the publisher, passing some random parameters to simulate a request:

$ php fanout-publisher.php 1 2 /path/to/pic.jpg

If everything went well, you can switch to the first terminal to see the following message:

Adding points to user: 2

So far nothing impressive. Let's add another consumer to see a fanout exchange and parallel processing in action. Put the code from the following listing in the file resize-picture-consumer.php.

Listing 11 Resize picture consumer

<?php

require_once('../lib/php-amqplib/amqp.inc');
require_once('../config/config.php');

$conn = new AMQPConnection(HOST, PORT, USER, PASS, VHOST);
$channel = $conn->channel();

$channel->exchange_declare('upload-pictures',
    'fanout', false, true, false);

$channel->queue_declare('resize-picture', 
    false, true, false, false); 
$channel->queue_bind('resize-picture', 'upload-pictures');
$consumer = function($msg){
    if($msg->body == 'quit'){
        $msg->delivery_info['channel']->
            basic_cancel($msg->delivery_info['consumer_tag']);
    }
    $meta = json_decode($msg->body, true);
    resize_picture($meta['image_id'], $meta['image_path']);
    $msg->delivery_info['channel']->
        basic_ack($msg->delivery_info['delivery_tag']);
};

function resize_picture($image_id, $image_path){
    echo sprintf("Resizing picture: %s %s\n", $image_id, $image_path);
}

$channel->basic_consume('resize-picture', '',
    false, false, false, false, $consumer);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$conn->close();
?>

The code in listing 11 is basically the same as listing 8. The interesting bits are where you declare the resize-picture queue and bind it to the upload-pictures exchange. You can see that this uses the same exchange as the previous example. As always with AMQP, messages are published to one exchange and then, depending on the bindings, routed to one or several queues (or none at all).

The code continues straightforwardly: inside the consumer callback you call the resize_picture function, passing the image_id and image_path that you got from the metadata. The resize_picture function itself just echoes a message to tell you that it's resizing the image. As before, in a real setup this is where you'd put the code to actually resize the image.

Now, open a third window on the terminal and type

$ php resize-picture-consumer.php

Then go back to the window where you have the publisher script and run it again:

$ php fanout-publisher.php 1 2 /path/to/pic.jpg

If everything went fine, then you should see on each consumer window the following messages:

Adding points to user: 2

and

Resizing picture: 1 /path/to/pic.jpg

Based on the add-points consumer example, you can see that if you integrate RabbitMQ into your solution, scaling the code to meet new requirements is simple. To add the image resize consumer you just need a function that takes the image ID and path, loads the picture from the filesystem, resizes it (probably using a tool like ImageMagick), and then updates the record in the database based on the image ID. The same goes for notifying the user's friends. Taking the user ID as a parameter, you can retrieve the user's contacts from the database and then send a notification, perhaps in the form of an email, to each of those friends.

What you can learn from this example is that the power of messaging with RabbitMQ resides in how you combine exchanges and queues. If you need some way to filter out messages, you can use a topic exchange as in the previous section. Does one action in your application trigger several others that can run in parallel? Then use a fanout exchange. If you want to "spy" on a flow of messages and then quit without leaving traces, use anonymous queues set to be auto-deleted. Once you get used to thinking about messaging patterns, you'll start seeing how simple some programming tasks can become.

But the advantages of this design over the one where everything happens in the same module don't stop here. Imagine now that the pictures are being resized too slowly; you need more computing power and you don't want to change the code. That's easy to solve. You can launch more consumer processes and RabbitMQ will take care of distributing the messages accordingly. Even if they're on different machines, it's no problem. Try to imagine now how you'd scale the original code, where everything happened sequentially while you were serving the request to the user. As you saw, parallel processing with RabbitMQ is simple.
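The way a queue spreads work across several competing consumers can be pictured with a toy round-robin model (an illustration only; real RabbitMQ delivery also depends on acknowledgements and prefetch settings, and distribute here is a hypothetical helper):

```python
from itertools import cycle

def distribute(messages, consumers):
    """Toy sketch: deliver each queued message to the next consumer in turn."""
    assignments = dict((c, []) for c in consumers)
    turn = cycle(consumers)
    for msg in messages:
        assignments[next(turn)].append(msg)
    return assignments

# four resize jobs spread across two worker processes
jobs = ['pic1.jpg', 'pic2.jpg', 'pic3.jpg', 'pic4.jpg']
print(distribute(jobs, ['worker-1', 'worker-2']))
```

Launching a third worker just adds another entry to the rotation: no publisher code changes, which is exactly why scaling out the resize step is as easy as starting more consumer processes.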

Here are some other Manning titles you might be interested in:


Spring Batch in Action
Arnaud Cogoluegnes, Thierry Templier, Gary Gregory, and Olivier Bazoud

Spring in Practice
Willie Wheeler, John Wheeler, and Joshua White

Spring Integration in Action
Mark Fisher, Jonas Partner, Marius Bogoevici, and Iwein Fuld