Open source messaging middleware

RabbitMQ Messaging

There are a variety of ways to which AMQP messages can be published and subscribed to (Figure 4). The simplest way is to create a queue, and then messages can be published and subscribed to from that queue.

Figure 4: RabbitMQ messaging overview.

To help with the distribution and filtering of messages, AMQP supports a number of different exchange types. Messages in an exchange use bindings based on a routing key to link them to a queue.

The main types of exchanges are direct, fanout, headers, and topic. An IoT example of a direct exchange would be if a group of Raspberry Pi sensor values were going into a Rasp Pi sensor exchange. When the Rasp Pi publishes a sensor result to the exchange, the message also includes a routing key to link the message to the correct queue (Figure 5).

Figure 5: RabbitMQ direct exchange routing.

An IoT example of a fanout exchange would be critical sensor failures. A sensor failure message is sent to Bill and Sam's work queues and the All Maintenance Points queue all at the same time (Figure 6).

Figure 6: RabbitMQ fanout exchange routing.

Connecting MQTT

After the MQTT plugin is installed, RabbitMQ can act as a standalone MQTT broker.

For an MQTT project, any ESP8266-supported Arduino hardware can be used. There are a number of MQTT Arduino libraries that are available. For this project, I used the PubSubClient library that can be installed using the Arduino Library Manager.

As a test project, I used a low cost MQ-2 Smoke Gas Sensor ($3) [3] that measures a combination of LPG, alcohol, propane, hydrogen, CO, and even methane. Note to fully use this sensor, some calibration is required. On the MQ-2 sensor, the analog signal is connected to Arduino pin A0 and the analogRead(thePin) function is used to read the sensor value.

Listing 2 is an Arduino example that reads the MQ-2 sensor and publishes the result to the RabbitMQ MQTT broker with a topic name of mq2_mqtt.

Listing 2


01 #include <ESP8266WiFi.h>
02 #include <PubSubClient.h>
04 // Update these with values suitable for your network.
05 const char* ssid = "your_ssid";
06 const char* password = "your_password";
07 const char* mqtt_server = "";
08 const char* mqtt_user = "admin1";
09 const char* mqtt_pass= "admin1";
11 const int mq2pin = A0; //the MQ2 analog input pin
13 WiFiClient espClient;
14 PubSubClient client(espClient);
16 void setup_wifi() {
17   // Connecting to a WiFi network
18   WiFi.begin(ssid, password);
19   while (WiFi.status() != WL_CONNECTED) {
20     delay(500);
21     Serial.print(".");
22   }
23   Serial.println("WiFi connected");
24   Serial.println("IP address: ");
25   Serial.println(WiFi.localIP());
26 }
28 void reconnect() {
29   // Loop until we're reconnected
30   Serial.println("In reconnect...");
31   while (!client.connected()) {
32     Serial.print("Attempting MQTT connection...");
33     // Attempt to connect
34     if (client.connect("Arduino_Gas", mqtt_user, mqtt_pass)) {
35       Serial.println("connected");
36     } else {
37       Serial.print("failed, rc=");
38       Serial.print(client.state());
39       Serial.println(" try again in 5 seconds");
40       delay(5000);
41     }
42   }
43 }
45 void setup() {
46   Serial.begin(9600);
47   setup_wifi();
48   client.setServer(mqtt_server, 1883);
49 }
51 void loop() {
52   char msg[8];
53   if (!client.connected()) {
54     reconnect();
55   }
57   sprintf(msg,"%i",analogRead(mq2pin));
58   client.publish("mq2_mqtt", msg);
59   Serial.print("MQ2 gas value:");
60   Serial.println(msg);
62   delay(5000);
63 }

Once the MQTT value is published, any MQTT client can subscribe to it. Listing 3 is a Python MQTT subscribe example.

Listing 3

01 import paho.mqtt.client as mqtt
03 def on_message(client, userdata, message):
04     print ("Message received: "  + message.payload)
06 client = mqtt.Client()
07 client.username_pw_set("admin1", password='admin1')
08 client.connect("", 1883)
10 client.on_message = on_message       #attach function to callback
12 client.subscribe("mq2_mqtt")
13 client.loop_forever()                 #start the loop

Read MQTT Messages Using AMQP

MQTT clients can subscribe to MQTT messages directly, or it's possible to configure RabbitMQ to have the MQTT data accessible using AMQP. The routing of MQTT messages to AMQP queues is done using the amp.topic direct exchange (Figure 7).

Figure 7: Routing MQTT messages to AMQP queues.

To configure RabbitMQ to forward MQTT, the following steps are required:

  1. Create a new RabbitMQ queue.
  2. Create a binding between the MQTT exchange and the queue.

These two steps can be done in a number of ways: manually, in the RabbitMQ config file, using the rabbitmqadmin command-line tool, or via a program. To use rabbitmqadmin enter:

./rabbitmqadmin declare queue name=mq2_amqp durable=true
./rabbitmqadmin declare binding source=amq.topic \
  destination_type=queue destination=mq2_amqp routing_key=mq2_mqtt

rabbitmqadmin can also be used to check the queue for messages (Listing 4).

Listing 4

Checking the Queue for Messages

01 ./rabbitmqadmin get queue=mq2_amqp
02 +-------------+-----------+---------------+---------+
03 | routing_key | exchange  | message_count | payload |
04 +-------------+-----------+---------------+---------+
05 | mq2_mqtt    | amq.topic | 77            | 157     |
06 +-------------+-----------+---------------+---------+

Buy this article as PDF

Express-Checkout as PDF
Price $2.95
(incl. VAT)

Buy Linux Magazine

Get it on Google Play

US / Canada

Get it on Google Play

UK / Australia

Related content

  • WiFi Thermo-Hygrometer

    A WiFi sensor monitors indoor humidity and temperature and a Node-RED dashboard reports the results, helping you to maintain a pleasant environment.

  • Sensu Monitoring Software

    When the Twitter hashtag #monitoringsucks gained popularity a few years ago, it seemed as though monitoring software had reached its limits and stagnated. Will Sensu launch a new golden age?

  • Logstash

    When something goes wrong on a system, the logfile is the first place to look for troubleshooting clues. Logstash, a log server with built-in analysis tools, consolidates logs from many servers and even makes the data searchable.

  • Adafruit IO API

    The Adafruit IO API offers a convenient means for network-ready sensors and other components.

  • Monitoring Old Devices

    Create monitoring dashboards with SSH, command-line tools, and Node-RED.

comments powered by Disqus

Direct Download

Read full article as PDF:

Price $2.95