
Ross founded the open source Mule® project in 2003. Frustrated by integration "donkey work," he set out to create a new platform that emphasized ease of development and re-use of components. He started the Mule project to bring a modern approach, one of assembly, rather than repetitive coding, to developers worldwide. He is now the Founder and CTO of Mulesoft. Ross is a DZone MVB and is not an employee of DZone and has posted 66 posts at DZone.

Lightweight publish/subscribe with Mule and MQTT

06.10.2013

Originally authored by David Dossot

If you think that telemetry should only be dealt with by Mr. Chekov, think again… When the “Internet of things” met publish/subscribe, the need for a lightweight messaging protocol became more acute. This is when MQ Telemetry Transport (MQTT in acronym parlance) came into play. In its own words, this connectivity protocol “was designed as an extremely lightweight publish/subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium”.

In a world where everything will eventually have an IP address, messages will be constantly flowing between devices, message brokers and service providers. And when there are messages to massage, who you gonna call? Mule ESB of course! With this new MQTT Connector, built on Dan Miller’s solid groundwork, the Internet of things, which had its rabbit, will now have its Mule!

In this blog we will look at an example where Mule is used to integrate conference booth QR Code scanners with an MQTT broker. Why use MQTT? If you’ve ever been to a technical conference and expo, you’ve surely tasted how abysmal the quality of a Wi-Fi network can be. Besides confirming that the shoemaker’s children always go barefoot, it is also an encouragement to use a messaging protocol that’s both resilient and modest in its network needs. With this said, let’s start by looking at the overall architecture.

Architecture

The main components in this architecture are:

  • A laptop webcam that captures QR Codes (using the excellent ZBar toolkit) and writes their content into files,
  • Mule running on the same laptop that controls the webcam,
  • A single Mule flow that picks up the generated files, transforms their CSV content into JSON and publishes the captured data to an MQTT broker,
  • Any number of remote systems subscribed to the relevant MQTT topic in order to be informed of all attendee QR Code scans that occur.

A direct connection between ZBar and Mule would have been possible, for example using socat and a TCP/IP channel, but the file approach was preferred because it is more robust, decouples the two applications and doesn’t introduce any adverse latency.
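To make the decoupling argument concrete, here is a minimal Python sketch of a directory used as a handoff point between a producer and a poller. It is not how ZBar or Mule are implemented: the function names and the `.part` and `.scan` suffixes are hypothetical, and the write-then-rename step is an added precaution of mine rather than something the setup in this post does.

```python
import os
import tempfile


def drop_reading(inbox, text):
    """Producer side: store one reading as its own file in the inbox."""
    # Write under a '.part' name first, then rename, so that a poller
    # filtering on '.scan' never sees a half-written file.
    # (Hypothetical suffixes, chosen for this sketch only.)
    fd, tmp_path = tempfile.mkstemp(dir=inbox, suffix=".part")
    with os.fdopen(fd, "w") as f:
        f.write(text)
    final_path = tmp_path[: -len(".part")] + ".scan"
    os.rename(tmp_path, final_path)
    return final_path


def poll_once(inbox, handle):
    """Consumer side: process, then delete, every completed file."""
    names = [n for n in os.listdir(inbox) if n.endswith(".scan")]
    paths = sorted(
        (os.path.join(inbox, n) for n in names), key=os.path.getmtime
    )
    for path in paths:
        with open(path) as f:
            handle(f.read())
        os.remove(path)  # a processed reading is never picked up twice
    return len(paths)
```

Because the readings sit on disk until they are consumed, the consumer can be restarted without losing scans, which is the kind of robustness the file approach buys.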

Implementation

To get started with the MQTT connector we need to either add it as a Maven dependency in our project’s POM, as explained in the installation instructions available online, or add it to Mule Studio, via the connector’s update site.

We also need to install ZBar to control the laptop’s webcam: if it’s not available in our operating system’s package manager, we’ll have to follow the online installation instructions.

Let’s peek into the very simple Mule configuration that implements this scenario. It is fully shown below, without the XML schema namespace declarations.

<configuration>
    <expression-language>
        <global-functions file="qrcode_data_to_map.mel" />
    </expression-language>
</configuration>

<mqtt:config name="mqttClient"
    brokerServerUri="${mqttBrokerServerUri}"
    clientId="booth-reader-${boothId}">
    <reconnect-forever frequency="5000" />
</mqtt:config>

<flow name="fileToMqttBridge">
    <file:inbound-endpoint path="/tmp/mule/in" />

    <object-to-string-transformer />

    <!-- extract attendee info and transform to JSON payload -->
    <expression-transformer
        expression="#[extractAttendeeData(message.payload, '${boothId}')]" />
    <json:object-to-json-transformer />

    <mqtt:publish topicName="scans/booths" />

    <logger level="INFO"
        message="Booth ${boothId} has scanned attendee #[attendeeId]" />
</flow>

The file inbound endpoint takes care of polling the file system for readings coming from ZBar, while the MQTT publisher sends the transformed data to the topic on the remote broker. Notice that the MQTT connector has been configured with a “reconnect-forever” strategy to ensure that Mule reconnects to the remote broker in case of issues.
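For readers unfamiliar with reconnection strategies, the intent of the “reconnect-forever” element with a frequency of 5000 can be sketched in a few lines of Python: keep retrying, with a fixed pause between attempts, until the connection succeeds. This only illustrates the idea under the assumption that the frequency is expressed in milliseconds. It is not Mule's implementation, and `connect_forever` and its parameters are hypothetical names.

```python
import time


def connect_forever(connect, frequency_ms=5000, sleep=time.sleep):
    """Call connect() until it succeeds, pausing between failed attempts.

    Returns a (connection, attempts) tuple. There is deliberately no
    upper bound on the number of attempts.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return connect(), attempts
        except ConnectionError:
            # Broker unreachable: wait for the configured delay, then retry.
            sleep(frequency_ms / 1000.0)
```

The `sleep` argument is injectable only so the loop can be exercised without actually waiting. On a flaky conference network, a loop of this shape means a booth laptop recovers on its own once the broker is reachable again.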

The bulk of the work is done by the expression transformer, whose MEL code is shown below:

import org.mule.util.StringUtils;

def extractAttendeeData(payload, boothId) {
    csv = StringUtils.strip(StringUtils.substringAfter(payload, 'QR-Code:'));
    data = StringUtils.splitAndTrim(csv, ',');
    flowVars.attendeeId = data[0];
    [
       'boothId':      boothId,
       'attendeeId' :  data[0],
       'fullName' :    data[1],
       'companyName' : data[2],
       'email' :       data[3]
    ];
}

It extracts the QR Code information, parses it and outputs a Map ready to be serialized as JSON. Notice that the unique booth ID is used twice: to build a unique MQTT client ID, and as a field injected into the published message, so that it’s possible to know where an attendee’s badge was scanned. Mule’s File and MQTT connectors take care of the rest: they are respectively in charge of picking the files up and publishing messages to the “scans/booths” MQTT topic.
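If MEL is unfamiliar, the same transformation can be approximated in plain Python. This is a rough equivalent for illustration, not code from the project. The sample payload is an assumption, reconstructed from the prefix the MEL function strips and the field order it reads.

```python
import json


def extract_attendee_data(payload, booth_id):
    """Turn a raw reading such as 'QR-Code:<csv>' into a dict."""
    # Keep what follows the 'QR-Code:' prefix. Like substringAfter,
    # partition yields an empty string when the prefix is missing.
    _, _, csv = payload.partition("QR-Code:")
    data = [field.strip() for field in csv.strip().split(",")]
    if len(data) < 4:
        raise ValueError("expected 4 comma-separated fields: %r" % payload)
    return {
        "boothId": booth_id,
        "attendeeId": data[0],
        "fullName": data[1],
        "companyName": data[2],
        "email": data[3],
    }


if __name__ == "__main__":
    # Assumed sample reading, not taken from a real scan.
    sample = "QR-Code:123456,John Q. Doe,Anonymous Inc.,jqdoe@anon.com\n"
    print(json.dumps(extract_attendee_data(sample, "E234"), indent=4))
```

As in the MEL version, a naive split on commas would misread a value that itself contains a comma, so this only suits QR Codes whose fields are known to be comma-free.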

We then use a simple Python script to dump the QR Code readings from ZBar into individual files:

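#!/usr/bin/env python3

import sys
import tempfile

# Read zbarcam's output line by line until the pipe is closed.
while True:
    data = sys.stdin.readline()
    if not data:
        break  # EOF: zbarcam has exited, so stop instead of spinning
    # One file per reading, written where Mule's file endpoint polls
    # (the directory must already exist). delete=False keeps the file
    # on disk after it is closed.
    with tempfile.NamedTemporaryFile(mode='w', dir='/tmp/mule/in',
                                     delete=False) as f:
        f.write(data)
    print('Wrote %s to %s' % (data.rstrip(), f.name))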

We then start ZBar with the following command:

zbarcam --nodisplay | dump_sysin.py

With this in place, scanning a sample attendee QR Code leads to the following JSON message being published on the “scans/booths” MQTT topic:

{
    "attendeeId": "123456",
    "boothId": "E234",
    "email": "jqdoe@anon.com",
    "fullName": "John Q. Doe",
    "companyName": "Anonymous Inc."
}

Consumers

When this data is published on the MQTT topic, it is available to all the consumers that are registered to listen to this particular topic. This allows different unrelated applications to act on the fact that an attendee has scanned his or her QR Code at a booth. For example, one of these applications could aggregate all the visits and send a summary to each booth owner, while another could gamify the conference and award badges and prizes to the most active attendees.
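As a rough idea of what such consumers might compute, here is a small Python sketch that works on scan messages shaped like the JSON shown earlier. It leaves out the MQTT subscription itself and simply takes an iterable of JSON strings. The function names are hypothetical, and counting distinct attendees and distinct booths is only one possible definition of a "visit".

```python
import json
from collections import defaultdict


def visits_per_booth(messages):
    """Map each booth ID to the number of distinct attendees scanned there."""
    attendees = defaultdict(set)
    for raw in messages:
        scan = json.loads(raw)
        attendees[scan["boothId"]].add(scan["attendeeId"])
    return {booth: len(ids) for booth, ids in attendees.items()}


def most_active_attendees(messages, top=3):
    """Rank attendees by how many distinct booths they visited."""
    booths = defaultdict(set)
    for raw in messages:
        scan = json.loads(raw)
        booths[scan["attendeeId"]].add(scan["boothId"])
    # Most booths first; ties are broken by attendee ID for stable output.
    ranked = sorted(booths.items(), key=lambda kv: (-len(kv[1]), kv[0]))
    return [(attendee, len(visited)) for attendee, visited in ranked[:top]]
```

Using sets means that a badge scanned twice at the same booth counts once, which seems a reasonable default for both the booth summary and the gamification ranking.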

Since the MQTT connector also supports listening to topics, these consuming applications could be powered by Mule too. But this is a story for another blog post…



Published at DZone with permission of Ross Mason, author and DZone MVB. (source)
