A Story of MQTT 5.0

The MQTT protocol has been around since the late 90s when it was created to enable the monitoring of a long distance oil pipeline. It went through several iterations before landing on version 3.1, published by IBM.

The next step was standardisation, at the OASIS standards body. As anyone who has taken part in a standardisation committee will know, this process is necessarily bureaucratic and slow. To speed up adoption, the main imperative was minimising disruption to existing implementations, as set out in the Technical Committee (TC) charter.

As a result, wholesale changes to the MQTT 3.1 specification were not allowed in the 3.1.1 standard. This meant that many irritating flaws could not be fixed nor widely sought enhancements included. This is where MQTT 5.0 comes in. While we still wanted to minimise disruption (no-one wanted to repeat the experiences of say AMQP 0.9 to 1.0), we also wanted to address the MQTT wish-list as far as possible so that major changes would not be needed for a long time to come. Whether we succeeded in that aim, time will tell.

To help us make sense of the multitude of items on that wish list, as I wrote in September 2016, they were grouped into four Big Ideas:

  • Improved error reporting
  • Extensible metadata
  • Scalability and large scale systems
  • Resource Constrained Clients and Performance Improvements

At that time, many of the solutions were not decided upon, but now with the availability of Committee Specification 01, I can write about the details. We are in the final stages of the standardisation process for 5.0. We hope to complete the process of rubber stamping in the next few months, and expect no substantive changes during that time.

Improved Error Reporting

Negative responses, or nacks, were the biggest omission from earlier versions of MQTT. If the client or server had a problem with the request or packet from the other end, the only recourse in many circumstances was to close the TCP connection. Connect packets were the exception to this: the connack always had a return code. MQTT 3.1.1 added a negative response code to subscribe requests because there was space available in the “granted QoS” field of the suback packet. Publish requests, however, were still not catered for. This is now remedied.

Reason Codes

As all ack packets now have reason codes, they have been consolidated into one set, which starts like this:

Reason codes are one byte. Values from 0 to 0x127 inclusive indicate successful outcomes, those from 0x128 to 0xFF unsuccessful. So as the subscribe response can have an error code:

msc {
    arcgradient="4";

    c [label="client"], b [label="broker"];

    c => b [label="connect"];
    b => c [label="connack(rc=0)"];

    c => b [label="subscribe"];
    b => c [label="suback(rc=0x80)"];

    ...;
}

so can the publish, when the QoS is greater than 0:

msc {
    arcgradient="4";

    c [label="client"], b [label="broker"];

    c => b [label="connect"];
    b => c [label="connack(rc=0)"];
    ...;
    c => b [label="publish, qos=1"];
    b => c [label="puback(rc=0x80)"];
    ...;
    c => b [label="publish, qos=2"];
    b => c [label="pubrec(rc=0x80)"];
    ...;
    c => b [label="publish, qos=2"];
    b => c [label="pubrec(rc=0x0)"];
    c => b [label="pubrel(rc=0x80)"];
    ...;
    c => b [label="publish, qos=2"];
    b => c [label="pubrec(rc=0x0)"];
    c => b [label="pubrel(rc=0x0)"];
    b => c [label="pubcomp(rc=0x80)"];
    ...;
}

For the QoS 2 exchange, it stops if any of the reason codes are 0x80 or above. This is a major improvement on previous versions of MQTT, where continuing with the exchange or terminating the connection were the only options.

Server initiated disconnect

In MQTT versions prior to 5.0, only the client could send a disconnect packet. This meant that in any case where the server wanted to end the conversation with a client, there was no option but to just terminate the TCP connection. A common case is when the server shuts down – there is no error in the interaction between broker and client, but the client has no idea what’s happening. In MQTT 5.0, the server can send a disconnect packet with a “Server shutting down” reason code:

msc {
    arcgradient="4";

    c [label="client"], b [label="broker"];

    c => b [label="connect"];
    b => c [label="connack(rc=0)"];
    ...;
    c => b [label="publish, qos=1"];
    b => c [label="disconnect(rc=139)"];
}

In this case, the client might wait for a while before attempting a reconnect, knowing that the server might not be available for a while.

Extensible metadata

The big change here is in addition to reason codes, each packet (apart from pings) can have properties. This is an extract of the full list:

Properties can be used to add extra information to responses, such as a reason string, or extra parameters to requests. A lot of the rest of the changes rely on properties because now we had a mechanism for adding that extra information to packets, we had to use it!

Request/response

The new request/response capability makes good use of properties. The requester subscribes to the topic it expects to receive responses on, then sets the value of the “Response Topic” property to that topic name. The responder simply uses that property to set the topic name for its response.

msc {
    arcgradient="4";

    c1 [label="client1"], b [label="broker"], c2 [label="client2"];

    c1 => b [label="subscribe, topic=resp"];
    b => c1 [label="suback"];

    c1 => b [label="publish, response_topic=resp"];
    b => c2 [label="publish, response_topic=resp"];

    ...;

    c2 => b [label="publish, topic=resp"];
    b => c1 [label="publish, topic=resp"];
}

The “Correlation Data” property can be used to set an id for each request, so that replies can be matched to requests by the requester.

Payload format indicator

There were fairly contentious discussions about how much flexibility there should be in payload format settings. Some were in favour of user definable payload formats. Others felt that if people could define their own formats it was no better than the current position, unless some body kept an approved list of format indicators and their meanings. That seemed a step too far for MQTT. MIME types were discussed, but the final approach is minimalistic – just two values: binary, as 3.1.1, or UTF-8 data.

Enhancements for Scalability

Improved error reporting helps scalability because exchanges between servers and clients become more efficient. Properties are again crucial to the following functions.

Simplified session state

One of the other big irritations with 3.1.1, along with the lack of nacks for publish commands, is the behaviour of the “clean session” flag. In earlier versions of MQTT, this started out as the “clean start” flag, where the session state was only cleaned up at the start of a session, not at the end. This was good for clients, because it meant you could ensure a clean starting point, and leave the session around in case you needed to reconnect. Not so good for servers, because clients would tend to leave the state lying around for ever.

Later on, this flag was changed to “clean session”, cleaning the session state both at the start and end of the session. Good for servers. For clients, if they want to ensure a clean slate to start with, but then want to have session state saved, they have to connect twice:

msc {
    arcgradient="4";

    c [label="client"], b [label="broker"];

    c => b [label="connect cleansession=true"];
    b => c [label="connack"];
    c => b [label="disconnect"];
    ...;
    c => b [label="connect cleansession=false"];
}

We knew we should fix this situation once and for all. The “clean session” flag becomes “clean start” once more – session state is only cleaned up at the start of the session. Then there is the “session expiry interval” property, a four-byte integer value in seconds which defaults to zero if omitted. If it is set to 0xFFFFFFFF (UINT_MAX), the session does not expire. To accomplish the above scenario:

msc {
    arcgradient="4", wordwraparcs=on;

    c [label="client"], b [label="broker"];

    c => b [label="connect cleanstart=true, expiry_interval=0xFFFFFFFF"];
    ...;
}

The MQTT-SN “offline keep alive” scenario is also catered for. By setting the expiry interval to a suitable non-zero value, the client can ensure that the session state is saved as long as it reconnects regularly. If the client disappears entirely, the session state will be cleaned up at some point. Both clients and servers are happy.

Shared subscriptions

To allow load balancing of high throughput topics, the concept of shared subscriptions is introduced to MQTT. Messages on these topics are sent to one of a group of subscribers rather than to them all. The subscriber indicates that the subscription is shared simply by subscribing to a special topic pattern:


$share/{ShareName}/{filter}

where ShareName is the name of the shared subscription group, and filter is the usual topic filter used in the subscribe request.

Optional server capabilities

Some server functionality is expensive to implement at large scale. In MQTT 5.0, the server can advertise the limitations on the functionality it provides in the connack properties. Some examples:

Retain Available
are retained messages supported?
Maximum QoS
the maximum publish QoS the server will accept
Maximum Packet Size
the maximum packet size the server will accept
Receive maximum
the maximum number of concurrent QoS 1 and 2 message the server will handle

Resource Constrained Clients and Performance Improvements

Various features fall into this category, including some already described. Some further examples follow.

Nolocal subscriptions

Up until MQTT 5.0, the publisher of a message will receive that message back if it is subscribed to the same topic. People often find this out in their first experience of writing an MQTT application, when they implement a shared chat room. There is now a subscribe option noLocal which when set, indicates that the publishing application should not receive its own messages.

msc {
    arcgradient="4", wordwraparcs=on;

    c [label="client"], b [label="broker"];

    c => b [label="subscribe topic=a"];
    b => c [label="suback"];
    c => b [label="publish topic=a"];
    b => c [label="publish topic=a"];
    ...;
    c => b [label="subscribe topic=b, noLocal"];
    b => c [label="suback"];
    c => b [label="publish topic=b"];
    ...;
}

Retained message control

Options on the subscribe request have been added to:

  • 0 = Send retained messages at the time of the subscribe
  • 1 = Send retained messages at subscribe only if the subscription does not currently exist
  • 2 = Do not send retained messages at the time of the subscribe

This could help particularly with the implementation of MQTT bridges from one broker to another.

Topic aliases

This capability exists in MQTT-SN, to reduce the size of the publish packet when long topic names are used. The publish request allows a numeric topic alias to be specified, which can be used in subsequent publish packets. Topic aliases on the client and the server are independent of each other, in much the same way as packet ids are.

msc {
    arcgradient="4", wordwraparcs=on;

    c [label="client"], b [label="broker"];

    c => b [label="publish topic=long_name,alias=1"];
    c => b [label="publish alias=1"];
    ...;
    b => c [label="publish topic=server_long_name, alias=1"];
    b => c [label="publish alias=1"];
    ...;
}

Topic aliases only exist for the lifetime of a TCP connection.

Specifying client limitations

To help protect implementations on small devices, the client can specify its limitations using properties on the connect packet. Some examples:

Maximum Packet Size
the maximum packet size the client can accept
Receive maximum
the maximum number of concurrent QoS 1 and 2 message the client can handle

It is an administrative action or decision on the part of the server to decide what to do with messages that it receives bound for a client for which that message exceeds the constraints. This is not particularly different from 3.1.1 where the message would be sent anyway, and then the client might be forced to disconnect as its only recourse. At a minimum, the server should probably emit a warning log message.

Eclipse Paho Progress

A release of the Eclipse Paho project is planned for June 2018 with its first implementations of MQTT 5.0. I first implemented a broker to test against in the Paho test project. It combines 3.1.1 and 5.0 implementations, and has been used by James Sutton to implement the Java MQTT 5.0 support. It is used in Travis and AppVeyor continuous integration tests for the MQTT 5.0 branches. Example output when you start it up is shown below.

My own C clients, embedded and main are planned to have a June release. The MQTT 5.0 implementations continue in the embedded mqttv5 and mqttv5 branches. Please do give your feedback or thoughts on these implementations as they progress via the GitHub issues:

A New Fate for the Eclipse Paho “Test” Broker?

If you’re familiar with the Eclipse Paho test material, you’ll know that I wrote an MQTT 3.1.1 broker in Python with the purpose of providing a benchmark or oracle against which other implementations could be compared. Last year, I added support for the new MQTT V5.0 definition, so that client implementations of V5, including my own, could be easily tested. It has been used by Paho colleagues of mine, James Sutton and Allan Stockdill-Mander to develop MQTT V5 Java and Golang client libraries.

Then, I added the missing TLS support, so that it could satisfy testing requirements for secured connections. Adding TLS support in Python turned out to be really easy, much easier than I expected. I know how hard it is to add OpenSSL support in C, and I was prepared for at least some proportion of that effort, but there was none. I enabled multiple MQTT listeners to be configured, a la RSMB/Mosquitto. That meant the broker was comprehensive enough to be used for automated testing of existing client implementations. I used it to replace a remote Mosquitto broker in the automated Paho C client testing for Windows using AppVeyor, as installing Mosquitto on Windows is a little tricky, and it works fine in that capacity. All I had to do in the AppVeyor configuration file was to add these lines:


- cmd: git clone https://github.com/eclipse/paho.mqtt.testing.git
- cmd: cd paho.mqtt.testing\interoperability
- ps: Start-Process C:\Python36\python -ArgumentList 'startbroker.py -c client_testing.conf'

to get the broker running locally. WebSockets support was already included, so it means that each MQTT listener supports both 3.1.1 and V5, WebSockets and TLS. The 3.1.1 and V5 clients can communicate with each other (although this work is not complete and tested yet), both through normal publish commands and retained messages.

In a recent project at work I had added persistence to a Python project by using the ZODB object database, and that had turned out to be really easy too. These experiences got me to thinking that it should be easy to add MQTT-SN support as well, and create a broker with the capabilities that I had envisaged for RSMB before it was superseded by Mosquitto. Roger Light, the author of Mosquitto, has intended to add MQTT-SN support for a long time, but wanted to restructure the internals first. Due to other commitments, and some licensing obstacles, this has not yet come about, and with the advent of MQTT V5 which will have a higher priority, looks even further out.

A broker written in Python will not be as efficient as Mosquitto, nor is it particularly scalable as currently written because it uses Python’s threading model which does not take advantage of multiple cores. But as it is so easy to add new capabilities, it could well be one of the most complete MQTT implementations, soonest. In this way it would be complementary to Mosquitto.

I have now experimented with adding MQTT-SN support, a framework for bridges, and an HTTP listener which can provide APIs to query and update the broker state and configuration. We can add APIs to allow queries to return information about the currently connected clients, their subscriptions and messages for instance. The set of capabilities I envisage looks like this:

  • TCP listener supporting WebSockets, TLS, MQTT 3.1.1 and 5.0
  • UDP listener supporting MQTT-SN with DTLS, multicast
  • other potential MQTT-SN listeners: BLE, ZigBee
  • HTTP listener for state/configuration APIs. The broker behaviour for test purposes can potentially be reconfigured dynamically
  • TCP bridge capable supporting MQTT 3.1.1 and 5.0, WebSockets and TLS
  • UDP bridge supporting MQTT-SN with DTLS, multicast
  • HTTP bridge supporting webhooks?
  • dynamic bridge connections – adding/deleting bridges
  • variety of persistence options including ZODB

I think it’s obvious to state that with these capabilities it could serve as an MQTT-SN gateway for services that support MQTT, as a conversion mechanism between MQTT V3.1.1 and V5, and a dynamic flexible component of an MQTT network. I would still keep a focus on enabling MQTT testing, including for test suite generation. A lot of this work could be completed in the first half of 2018, other commitments depending.

This brings me on to my main motivation for writing this blog post. Perhaps this broker should have a life of its own, separate to Paho, as an Eclipse IoT project. It could quite well do the job as part of Paho, conversely it could have a higher profile and encourage more community participation if stood on its own. So, if you have an interest in seeing this broker as an Eclipse IoT standalone project, from the point of view of using it, or also contributing to it, please let me know – thanks.

What’s happening with RSMB?

Some people will be aware that I wrote a small MQTT broker in about 2008, made available on IBM’s alphaWorks website. It was called RSMB (Really Small Message Broker) when released, because the name we used first for it, Nanobroker, was already taken. Somewhat amusingly, an IBM website for RSMB still exists.

A year or two later, Roger Light asked Andy Stanford-Clark why RSMB wasn’t open source (not my decision), so Andy suggested Roger write his own. And that’s how Mosquitto started, as a drop in replacement for RSMB. When MQTT software was being contributed to the Eclipse foundation, IBM contributed a Java client and my C client to the Paho project, and Mosquitto was contributed as the broker. IBM did contribute the RSMB source to the Mosquitto project on my encouragement, to serve as a repository of potentially useful code, and because it had support for MQTT-SN.

So in my mind, Mosquitto became the official replacement for RSMB, and I expected RSMB to outlive its usefulness pretty quickly. As it happens, MQTT-SN support in Mosquitto has been on the back burner ever since, because Roger wanted to rebase the internals of Mosquitto on an event library before tackling it. Unfortunately, this ran into a number of issues, social and technical. I’m still hoping that it will happen.

But one of the alternative approaches to MQTT-SN support is now available in Paho (written by Tomoaki Yamaguchi) – a transparent gateway which converts MQTT-SN into MQTT. Transparent because it creates a new MQTT connection for each MQTT-SN client, so that the MQTT broker has visibility of those clients. RSMB acts as an aggregating gateway, where one MQTT bridge connection carries the traffic for all MQTT-SN clients, and the MQTT broker sees only one.

I do wonder if an MQTT-SN to MQTT gateway will in fact be a better solution because it may allow easier support of additional underlying transports. The gateway has UDP and XBee right now, others such as BLE and even serial could be useful.

Mosquitto in the meantime has added further capabilities such as TLS and WebSocket support, and many more. If there were a niche that Mosquitto has left open then I would be happy to support RSMB in that, but I don’t think there is. The combination of Mosquitto and the Paho transparent gateway will do a better job all round.

Where are we with MQTT-SN?

This question was posed to me recently, with the added observation that it seemed no progress had been made in the last two years.  From my perspective this isn’t quite true.  Although the specification is still maintained by IBM, no movement has been made to standardize it at OASIS like MQTT has been. That is not to say that the subject of MQTT-SN has not arisen in the MQTT OASIS Technical Committee (TC) discussions, it has. But amongst the numerous improvements that we wanted to make to MQTT 3.1.1, and an ambitious timescale — publishing the new version MQTT 5.0 this year (2017) — addressing non TCP networks in MQTT was put to one side.

There is a continual wavering in my mind as to the importance of MQTT-SN. On one hand, edge computing platforms are getting ever more powerful, on the other, low power consumption is very important for battery power. The powerful edge processors are likely to have TCP stacks for which MQTT is appropriate: but there are still use cases for which low power use is the crucial factor. UDP, Bluetooth Low Energy (BLE) are typical transports used in this case.

If there is enough interest in a standardized MQTT-SN, its features could be addressed by the MQTT OASIS TC. There are a number of options. Incorporating MQTT-SN or its features into the main MQTT specification is one. However MQTT currently requires a reliable underlying network transport (TCP). To change that assumption we would have to consider very carefully the implications, which could take a significant amount of time. It has been suggested that MQTT-SN could be dealt with as a committee note: I’m not sure what form that would take.

In the meantime, I’ve been trying to get more MQTT-SN capabilities added to the Eclipse® Paho project. After I contributed an initial MQTT-SN packet library a couple of years ago, and the RSMB broker with MQTT-SN support to the Eclipse Mosquitto project at its inception, things stalled for a while (but not for a lack of interest on my part). There are forks of RSMB with fixes to the MQTT-SN support, and various gateways and clients around. One of those, written by Tomoaki Yamaguchi, had gained some support and I forget exactly how it happened, but Tomoaki has now contributed an MQTT-SN transparent gateway to Paho.

Over the past months this gateway has matured nicely. I recently used it to replace the use of RSMB in this experiment of Benjamin Cabé’s and it worked well. If we add BLE support to the gateway, then the Node.js BLE to UDP MQTT-SN forwarder in that experiment would not be required either. The gateway is written with the intention of allowing other transports to be added, so this should be eminently feasible.

I do remember seeing an email or post or something recently describing some other MQTT-SN components written by the poster. But now I can’t find or remember where I saw it. If this wasn’t a dream, and you know of such a thing, or it was you, please do let me know.

Where do we go from here?  Two avenues: practical implementations and the specification.  After the MQTT 5.0 standard is published, we can see if there is any interest in the OASIS TC in pursuing the application of MQTT to non-TCP networks.  In the meantime, I will continue working on and encouraging MQTT-SN contributions of the current specification to Paho and elsewhere.

If you are interested in MQTT-SN and would like to see it considered by the OASIS MQTT TC, or make any other comments to the TC, then you can use the mailing list.  If you know of any other implementations or have any questions, or suggestions for Paho, then I will be happy to hear of them

Using the Eclipse Paho “Test” Broker to Help Test MQTT

I may not have finished all my goals for the test material in the Eclipse Paho project, as outlined in this blog post of mine, but some components are still useful in their current state.

Recently my IBM and Paho colleague, James Sutton, needed to check the behaviour of the Java and Android clients when receiving an error code in response to an MQTT subscribe request. This error code, returned in the MQTT suback packet, was introduced in the last, and current, version of MQTT, 3.1.1. It takes the form of an 0x80 value in the granted QoS (Quality of Service) field, for which only 0, 1 or 2 are valid values – the set of integers which QoS can take in MQTT.

Now you could fire up a broker like Eclipse Mosquitto and configure it to disallow a subscription to a certain topic. If you know the broker well enough, this may be quick and easy. It’s possible you might have to do some fishing around, and figure out whether that broker is actually returning 0x80 as you wanted.

So James turned to the Paho test broker (startbroker.py in this repo.) (You must use Python 3 to run it, not Python 2). As it stands, this broker will return 0x80 if you try to subscribe to the topic “test/nosubscribe” (see line 322 in MQTTBrokers.py). That’s pretty easy. If you checked out that file, you will notice another two topics: “test/QoS 1 only” and “test/QoS 0 only”, which will return granted QoSs of a maximum of 1 and 0 respectively. These behaviours can be hard to elicit out of a standard MQTT broker.

Ultimately I guess I should make the specific topics which these responses are attached to configurable, but they aren’t right now.

There are some other characteristics of this broker, which single it out as a “test” broker rather than a product:

  • the goal of the coding is clarity rather than performance. You can tell me whether I succeeded
  • there is no persistence: if you want to simulate stopping and restarting a broker, this broker can just remain running, and disconnect all clients
  • MQTT specification conformance statements are embedded in the code, so that when a test suite is run against the broker, it can tell you which statements were encountered, and which weren’t.
  • the broker has parameters to choose behaviours which can vary but still conform to the MQTT specification:
    1. whether to publish QoS 2 messages on PUBREL or not
    2. whether multiple matching subscriptions result in one publication, or more than one
    3. whether queued QoS 0 messages are dropped if the client is disconnected, or not
    4. whether zero_length_clientids are allowed

    it’s possible more might be added in the future.

The main missing feature of this broker is TLS – but it does have WebSocket support. TLS is on my to do list, as will be updates for the next version of MQTT, 5, which we are working on, and is tentatively scheduled for completion next year.

As outlined in the blog post, this broker is meant to help with broker testing as well, as an oracle.

For some more information, see the Eclipse Paho website for the test tools.

You can use issues on this project to ask for new features or identify bugs, or pull requests to offer your own contributions.

Paho 1.1 Released

Finally I have produced the remaining pieces, and am pleased to be able to say that Paho 1.1 is released.

What’s new:

1) Paolo Patierno’s .Net and WinRT MQTT client, M2Mqtt, version 4.0.0
2) An Android service – a wrapper around the existing Java client. Version 1.0.2
3) Embedded C and C++ MQTT client libraries, version 1.0.0

Updated versions of the existing client libraries:

1) Roger Light’s Python MQTT client, version 1.1
2) The full fat Posix/Windows/MacOS C client, updated to version 1.0.3
3) The Java MQTT client, updated to version 1.0.2
4) The JavaScript MQTT client, updated to version 1.0.1

The downloads page is updated, and the formal release documentation is available.

Thanks to everyone who has contributed, with code, bugs, suggestions or advice — anything 🙂

I’ve updated some essentials on the web site, including references to the MQTT 3.1.1 standard, but there are updates that are still needed. If you find anything, please let me know.

An Example of Using TLS with the Paho MQTT Embedded C++ Client

Understandably, I’ve been asked a few times whether the Paho embedded client library will work with TLS. It will, but the only platform where I’ve written the code to do it so far is on mbed mbed. The reason why it hasn’t been widely publicised is that it uses the CyaSSL TLS library, which is licensed under the GPL. This means binaries linked with CyaSSL also have to be GPL, rather than the Apache license usually used on mbed, or the Eclipse licenses used for Paho.

But I can show you the code I did write. Now the client library is structured to be portable to any network library without changing the core code. It seems I need to explain this better, and further, certainly on the main Paho website as well as this blog. (Soon, I promise.) The following module is one I wrote to create a network class for CyaSSL which can be used by the embedded client class, MQTTClient. The two core methods which are needed are read and write; these are called whenever the library needs data, or has data to send. This is a basic connect — no client certificates are involved, but should serve as a model for what can be done. Here is the complete module — skip afterwards to see how to use it.


#if !defined(MQTTSSL_H)
#define MQTTSSL_H

#include "MQTT_mbed.h"
#include "mbed.h"
#include "EthernetInterface.h"

#include 


static TCPSocketConnection mysock; 

static int SocketReceive(CYASSL* ssl, char *buf, int len, void *ctx)
{
    int rc = mysock.receive(buf, len);
    if (rc == -1)
        rc = -2;  // -2 is WANT_READ
    return rc;
}
 
static int SocketSend(CYASSL* ssl, char *buf, int len, void *ctx)
{
    int rc = mysock.send(buf, len);
    return rc;
}


class MQTTSSL
{
public:
    MQTTSSL() : eth()
    {
        ssl = 0;
        ctx = 0;
        
        eth.init();                          // Use DHCP
        eth.connect();
        
        CyaSSL_Init();
        CyaSSL_Debugging_ON();
        method = CyaTLSv1_2_client_method();
    }
    
    int connect(char* hostname, int port, int timeout=1000)
    {
        int rc = -1;
        
        /* Initialize CyaSSL Context */
        if ( (ctx = CyaSSL_CTX_new(method)) == NULL)
        {
            WARN("unable to get ctx");
            goto exit;
        }
        CyaSSL_CTX_set_verify(ctx, SSL_VERIFY_NONE, 0);
        CyaSSL_SetIORecv(ctx, SocketReceive); 
        CyaSSL_SetIOSend(ctx, SocketSend);
        
        mysock.set_blocking(false, timeout);    
        if ( (rc = mysock.connect(hostname, port)) != 0)
            goto exit;
        
        if ( (ssl = CyaSSL_new(ctx)) == NULL)
        {
            ERROR("unable to get SSL object");
            rc = -1; goto exit;
        }   
        CyaSSL_set_using_nonblock(ssl, 1);
        if ( (rc = CyaSSL_connect(ssl)) != SSL_SUCCESS)
        {    
            rc = CyaSSL_get_error(ssl, 0);
            WARN("err = %d, %s\n", rc, CyaSSL_ERR_error_string(rc, "\n"));
            WARN("SSL Connection Error\n");
            rc = -1;
        }
        else
        {
            LOG("SSL Connected\n") ;
            rc = 0;
        }
    exit:
        return rc;
    }

    int read(unsigned char* buffer, int len, int timeout)
    {
        int rc = 0;
                
        mysock.set_blocking(false, timeout);  
        rc = CyaSSL_read(ssl, buffer, len);
        DEBUG("called CyaSSL_read len %d rc %d\n", len, rc);
        return rc;
    }
    
    int write(unsigned char* buffer, int len, int timeout)
    {
        int rc = 0;
        mysock.set_blocking(false, timeout);  
        rc = CyaSSL_write(ssl, buffer, len);
        DEBUG("called CyaSSL_write len %d rc %d\n", len, rc);
        return rc;
    }
    
    int disconnect()
    {
        CyaSSL_free(ssl);
        int rc = mysock.close();
 
        CyaSSL_CTX_free(ctx) ;
        eth.disconnect();
        return rc;
    }
    
    EthernetInterface& getEth()
    {
        return eth;
    }
    
private:

    EthernetInterface eth;

    CYASSL_METHOD*  method;
    CYASSL_CTX*     ctx;
    CYASSL*         ssl;
    
};

#endif

To use it, you need to create an instance of MQTTSSL, and pass the class as a template parameter when you create an MQTT client instance.


MQTTSSL ipstack;
MQTT::Client client(ipstack);

Now follows the sort of function you might use to connect. You have to make the network connection first, before calling the MQTT connect.


int connect(MQTT::Client* client, MQTTSSL* ipstack)
{   
    const char* iot_ibm = ".messaging.internetofthings.ibmcloud.com";
    
    // Network connect first
    char hostname[strlen(org) + strlen(iot_ibm) + 1];
    sprintf(hostname, "%s%s", org, iot_ibm);
    int rc = ipstack->connect(hostname, IBM_IOT_PORT);
    if (rc != 0)
        return rc;
     
    // Construct clientId - d:org:type:id
    char clientId[strlen(org) + strlen(type) + strlen(id) + 5];
    sprintf(clientId, "d:%s:%s:%s", org, type, id);
    
    // Now MQTT Connect
    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
    data.clientID.cstring = clientId;    
    if ( (rc = client->connect(&data)) == 0) 
        displayMessage("Connected");
    return rc;
}

If this is successful, you can now make other MQTT calls, like subscribe and publish.


if ( (rc = client.subscribe("iot-2/cmd/+/fmt/json", MQTT::QOS1, messageArrived)) != 0)
           WARN("rc from MQTT subscribe is %d\n", rc); 

MQTT::Message message;
char* pubTopic = "iot-2/evt/status/fmt/json";
            
char buf[250];
sprintf(buf,
     "{\"d\":{\"myName\":\"IoT mbed\",\"accelX\":%0.4f,\"accelY\":%0.4f,\"accelZ\":%0.4f,\"temp\":%0.4f,\"joystick\":\"%s\",\"potentiometer1\":%0.4f,\"potentiometer2\":%0.4f}}",
            MMA.x(), MMA.y(), MMA.z(), sensor.temp(), joystickPos, ain1.read(), ain2.read());
message.qos = MQTT::QOS0;
message.retained = false;
message.dup = false;
message.payload = (void*)buf;
message.payloadlen = strlen(buf);
client.publish(pubTopic, &message);

I will write this up more fully soon. With any luck, this has also given you an idea of what needs to be done to port the client to a different network API.

Paho Embedded C++ MQTT Client on mbed and for Arduino

I’ve written previously about the embedded client libraries in Paho. There’s
MQTTPacket, and then the start of the design for the higher level APIs.

I started the first C++ API in ARM mbed as IBM and ARM were working together on an Internet of Things (IoT) kit. The ARM mbed online tools make it very easy to build and load applications onto an mbed-enabled micro controller. Just download the built application, drag to the USB-connected filesystem, then reset the micro controller. Easy as that!

To fit into mbed API conventions, the API:

  • is written in C++
  • uses templates
  • does not use the STL (uses too much resources)
  • is blocking, for ease of use

and uses no dynamic memory allocations. It’s called MQTTClient, and you can read the API description here.

The definitive source for the library is still in Paho, but both MQTTPacket and MQTTClient are also available directly in the mbed online environment. You can find them in the code for the MQTT team along with a sample application HelloMQTT, which uses the mbed Ethernet library.

If you want to use these APIs to connect to the IBM IoT service with MQTT, you can find applications that allow you to do just that in the IBM IoT team code library.

It’s easy to port the MQTTClient library to other networking libraries and other operating systems. So far, we’ve done that for Linux and Arduino.  The Linux port makes it easy to build and test the code in the normal Eclipse build environments.  The Arduino port was started to see if it would work.  It does, and the resulting API is actually quite close to the Arduino conventions too.

The prebuilt Arduino client library is available here. After downloading it, in the Arduino IDE use Sketch -> Import Library… -> Add Library… selecting the downloaded zip file. A sample sketch demonstrating the client is included in the download. I intend to add an entry on the Arduino libraries page sometime soon.

We’ll make ports for other operating systems available as we create them.  But if anyone gets around to a port before we do, we’d love that to be contributed to Paho.  Operating systems such as Contiki, TinyOS, FreeRTOS and RIOT.  If you have a favourite embedded OS that I’ve not mentioned, please suggest it.

Porting the Paho synchronous embedded C++ client

The MQTT library on mbed.org also now exists in Eclipse Paho. I intend Paho to be the master copy.

There are two APIs in this library, the largely complete synchronous API in MQTTClient.h, and the unfinished asynchronous API in MQTTAsync.h. In this post I’ll be talking about the synchronous API. I have discussed the major design points previous posts, now I’ll describe how to use the API. The documentation page on mbed can be found here.

The API implementation is intended to avoid system calls to allow it to be easily ported. Its function is limited purely to MQTT rather than networking setup for instance. This does mean that there is some work to use the API on a new OS, but we plan to collect examples in Paho for as many OSes as we can — contributions welcome!

The API is defined as a class template with these parameters:


template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE = 100, int MAX_MESSAGE_HANDLERS = 5>

There are two class parameters, Network and Timer which are needed to provide platform specific implementations of sending and receiving data from the network, and timing, respectively. The third and fourth parameters are limits on the maximum MQTT packet size to be handled and the number of message handling callbacks. For each subscription, a message handling callback is needed, so you can think of this second limit as the maximum number of subscriptions you can have active at one time. Increasing these limits will increase the storage used when allocating the MQTT client object.

The file mbed.h contains an mbed specific implmentation of the Timer class. The file MQTTSocket.h contains an mbed specific network implementation for the Ethernet interface. The only requirements for the network class that you need to supply are the two methods:


int read(unsigned char* buffer, int len, int timeout);
int write(unsigned char* buffer, int len, int timeout);

Where

buffer
is the location to write data to, or read data from
len

is the number of bytes of that datatimeoutis the maximum number of milliseconds for the operation to complete.

The timer class counts down from a value to zero. I chose this way because it is always used for timeouts, so it checks to see whether the time has expired:


bool expired();                 // has the time expired?
void countdown_ms(int ms);      // start the timer for ms milliseconds
void countdown(int seconds);    // start the timer in seconds
int left_ms();                  // return the time left in milliseconds

Looking at the mbed example HelloMQTT, you see we have to connect the network interface before calling the MQTT connect:


MQTTEthernet ipstack = MQTTEthernet();
              
MQTT::Client<MQTTEthernet, Countdown> client = MQTT::Client<MQTTEthernet, Countdown>(ipstack);

int rc = ipstack.connect("m2m.eclipse.org", 1883);
if (rc != 0)
   lcd.printf("rc from TCP connect is %d\n", rc);
 
MQTTPacket_connectData data = MQTTPacket_connectData_initializer;       
data.MQTTVersion = 3;
data.clientID.cstring = "mbed-sample";
data.username.cstring = "testuser";
data.password.cstring = "testpassword";
if ( (rc = client.connect(&data)) != 0)
   lcd.printf("rc from MQTT connect is %d\n", rc);

and then we can call subscribe, publish, unsubscribe and disconnect methods on the client in the usual sort of way for MQTT APIs.

Examples of how to use this API on Linux is in this
directory on Paho, in the hello.cpp and stdoutsub.cpp programs. The API is already running on Arduino, for which we’ll add an example, and for other platforms and operating systems later.

Portable MQTT C/C++ clients for resource-limited platforms

When I started writing the Paho MQTT C client as long ago as 2008, it looked to me like the world of embedded operating systems was converging on Linux and Windows variants. This was based on my (limited) experience with embedded systems such as the Arcom Viper and various WinCE hand held devices. Consequently I wrote the Paho MQTT C client using Posix and Windows system APIs.

Now that decision doesn’t look so appropriate. There is a proliferation of cheap and small embedded platforms – some can run Linux, some can’t. In my last two posts I starting describing a new, more portable MQTT C client at its simplest. That first layer simply takes C data and serializes it into a buffer ready for sending across the network using any network APIs you choose:


len = MQTTSerialize_connect(buf, buflen, &data); /* 1 */
len += MQTTSerialize_publish(buf + len, buflen - len, 0, 0, 0, 0, topicString, payload, payloadlen); /* 2 */
rc = write(mysock, buf, len);

– there are corresponding API calls for deserializing buffers received from the network into C data structures.

API for mbed

This works fine, but I was also asked to write an API that would work well in the mbed world (The mbed development platform is the fastest way to create products based on ARM microcontrollers). C++ is the standard language here, but non-Posix networking APIs. What I’ve come up with so far (after some discussion) is an API that has an outline like this:


template<class Network, class Timer, class Thread, class Mutex>
class Client
{
public:    
    /* connectionLost and default messageArrived callbacks to be added */
    struct Result
    {
        /* success or failure result data */
        Client<Network, Timer, Thread, Mutex>* client;
        int connack_rc;
    };
    typedef void (*resultHandler)(Result*); 

    Client(Network* network, const Limits limits = Limits()); 

    int connect(MQTTPacket_connectData* options = 0, resultHandler fn = 0);     
    int publish(const char* topic, Message* message, resultHandler rh = 0);
    int subscribe(const char* topicFilter, enum QoS qos, messageHandler mh, resultHandler rh = 0);
    int unsubscribe(const char* topicFilter, resultHandler rh = 0);
    int disconnect(int timeout, resultHandler rh = 0);

    void yield(int timeout);
}

This API will look familiar to anyone who is acquainted with the Paho MQTT APIs for C and Java. The resultHandler parameters are optional callback functions which give the outcome of the standard calls. If these are not set, then the MQTT operation calls will block until complete, using no background thread. If resultHandler callback functions are set, then a background thread will be started and will handle MQTT networking while the application can continue with other business.

I am considering splitting the API into two completely separate units, one blocking and single-threaded, the second non-blocking and multi-threaded. This would make the intended use clearer, and reduce the size of both APIs (resultHandlers would be unnecessary in the single-threaded API for instance). Other points to note:

  • C++ Standard Template Library (STL) not used – too heavyweight
  • messageHandler callback per subscription (see discussion) – there will also be a default messageHandler
  • system APIs for networking, timing and threading passed in as template parameters (for the synchronous API threading and mutex classes are not needed)
  • heap memory allocation minimized (I hope to avoid it altogether)
  • memory use limited at object construction time by the limits parameter (see below)
  • network connection handled outside of this API
  • persistence, if/when added, as another template parameter

The limits parameter looks like this:

typedef struct limits
{
    int MAX_MQTT_PACKET_SIZE;       // two buffers, for sending and receiving, will be allocated of this size
    int MAX_MESSAGE_HANDLERS;       // each subscription requires a message handler
    int MAX_CONCURRENT_OPERATIONS;  // each command which runs concurrently can have a result handler, when we are in multi-threaded mode
    int command_timeout;            // how long to wait for any command to complete, in seconds
        
    limits()
    {
        MAX_MQTT_PACKET_SIZE = 100;
        MAX_MESSAGE_HANDLERS = 5;
        MAX_CONCURRENT_OPERATIONS = 1; // 1 indicates single-threaded mode - set to >1 for multithreaded mode
        command_timeout = 30;
    }
} Limits;

Here are example programs to use this API on mbed and Linux. These demonstrate the portability of the API – it’s not just for mbed, or for one networking API.

I’ve not finished this API yet, and it will be pulled back into Paho.

Three Layers

I’ve been thinking that a third layer supplying offline message buffering would complete the picture. (I’m hoping that I will never have to write another MQTT C client API after this effort 🙂 ) This simply means the ability to send and receive messages from the API when there is no connection to the MQTT server. This feature is frequently asked for, but does increase the internal complexity of an API and thus its memory requirements. Some of the questions that arise when considering a buffered client API are:

  • what if messages are “sent” by the application, but the client library never connects successfully?
  • how many messages should we buffer? what size?
  • should the messages be sent/received when the application is not running?
  • when and how often should the API try to (re-)connect to the server?
  • how should buffered messages be stored? for how long?

There are reasonable answers to these questions which will make such an API suitable for embedded platforms, and certainly for a portable C/C++ MQTT API. The family of MQTT C/C++ APIs for portability/embedded systems would then look like this:

  1. serialization/deserialization API – currently C only
  2. blocking and non-blocking unbuffered MQTT client in a style similar to current C and Java APIs, but portable across system libraries – currently C++ only (on top of library 1)
  3. buffered MQTT API, portable across system libraries (on top of library 2).

Whether each of these layers should be in both C and C++, I’m not sure. I think it depends on the demand. I am relishing using C++ again (without STL) and am reluctant to go back to C for layers 2 and 3 unless there’s a real need.

Comments or suggestions on any of this are welcome.