The new MQTT V5 API for the Eclipse Paho C client

I’ve started to write the new MQTT version 5.0 support for the Eclipse Paho C clients. As I’ve reached the stage where some very simple tests are running, I thought this was a good time to try to elicit some feedback. I’ve had to make some compromises in the API, balancing minimal disruption to existing application programs against the natural incorporation of the new MQTT 5.0 capabilities.

If you’ve used the main Paho C client, you’ll know it comes in two flavours:

MQTTClient
The synchronous version – intended to be easy to get started with. Most calls block, waiting for a response from the server. This was the first API, and the only one when I created it. I modelled it after the existing Java API for consistency.
MQTTAsync
The asynchronous version – (almost) entirely non-blocking. Responses are reported in callback functions. This was a response to the need to run in asynchronous environments such as iOS and Windows.

The mqttv5 branch in the GitHub repo contains all the MQTT V5 updates to date. The test15 and test45 test programs have the first tests for the MQTTClient and MQTTAsync libraries respectively. (At the moment, only test1 in each file has been migrated to MQTT V5, so only look at those.)

MQTTClient for V5

First we have to create the client object with MQTTClient_create – this stays the same.


rc = MQTTClient_create(&client, options.connection,
 "single_threaded_test", MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);

Next we must connect to the server.  As outlined in my previous post the main difference in MQTT V5 packets is the addition of the properties section, and for acknowledgements and disconnect, a reason code byte. Properties exist on both requests and their acknowledgements.
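To make the properties section concrete, here is a minimal sketch of how it is laid out on the wire according to the MQTT 5.0 specification: a variable byte integer giving the length, then each property as an identifier followed by its value. The helper names encode_varint and encode_session_expiry are made up for illustration and are not part of the Paho API.

```c
#include <stddef.h>
#include <stdint.h>

/* Session Expiry Interval identifier from the MQTT 5.0 specification. */
#define PROP_SESSION_EXPIRY_INTERVAL 0x11

/* Hypothetical helper: write n as an MQTT variable byte integer.
 * Returns the number of bytes written (1 to 4). */
size_t encode_varint(uint32_t n, uint8_t *out)
{
    size_t len = 0;
    do {
        uint8_t byte = (uint8_t)(n % 128);
        n /= 128;
        if (n > 0)
            byte |= 0x80;               /* continuation bit: more bytes follow */
        out[len++] = byte;
    } while (n > 0 && len < 4);
    return len;
}

/* Hypothetical helper: write a properties section holding a single
 * Session Expiry Interval. Layout: property length, identifier, then a
 * four-byte big-endian value. out needs room for at least 6 bytes.
 * Returns the number of bytes written. */
size_t encode_session_expiry(uint32_t seconds, uint8_t *out)
{
    size_t pos = encode_varint(5, out); /* 1 identifier byte + 4 value bytes */
    out[pos++] = PROP_SESSION_EXPIRY_INTERVAL;
    out[pos++] = (uint8_t)(seconds >> 24);
    out[pos++] = (uint8_t)(seconds >> 16);
    out[pos++] = (uint8_t)(seconds >> 8);
    out[pos++] = (uint8_t)seconds;
    return pos;
}
```

Calling encode_session_expiry(30, buf) fills buf with the six bytes 05 11 00 00 00 1E.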

The connect call thus gains properties objects for the connect request itself, for the will message, and as part of the response. The MQTT V3 code:


MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;

int rc = MQTTClient_connect(client, &conn_opts);

becomes:


MQTTClient_connectOptions opts = MQTTClient_connectOptions_initializer;
MQTTProperties props = MQTTProperties_initializer;
MQTTProperties willProps = MQTTProperties_initializer;
MQTTResponse response = {SUCCESS, NULL};

response = MQTTClient_connect5(c, &opts, &props, &willProps);

The property and will property fields can be NULL if not required. The properties structure is manipulated with the add, free and copy functions. To add properties:


MQTTProperty property;

/* ask the server to keep the session for 30 seconds after disconnect */
property.identifier = SESSION_EXPIRY_INTERVAL;
property.value.integer4 = 30;
MQTTProperties_add(&props, &property);

and


property.identifier = USER_PROPERTY;
property.value.data.data = "test user property";
property.value.data.len = strlen(property.value.data.data);
property.value.value.data = "test user property value";
property.value.value.len = strlen(property.value.value.data);
MQTTProperties_add(&props, &property);

Memory is allocated when a property is added to a properties structure, so each structure must be freed once you’ve finished with it:


MQTTProperties_free(response.properties);
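As a rough illustration of why the free call matters, here is a minimal sketch of a growable property list. The Demo types and functions are simplified stand-ins invented for this example; they are not the Paho definitions, and the real implementation may manage its memory differently.

```c
#include <stdlib.h>

/* Hypothetical stand-ins, NOT the Paho types. */
typedef struct {
    int identifier;
    unsigned int value;
} DemoProperty;

typedef struct {
    int count;            /* properties currently stored */
    int capacity;         /* slots allocated in array */
    DemoProperty *array;  /* heap storage, owned by this structure */
} DemoProperties;

#define DemoProperties_initializer {0, 0, NULL}

/* Append a copy of prop, growing the array on demand.
 * Returns 0 on success, -1 if memory could not be allocated. */
int DemoProperties_add(DemoProperties *props, const DemoProperty *prop)
{
    if (props->count == props->capacity) {
        int newcap = props->capacity ? props->capacity * 2 : 4;
        DemoProperty *grown = realloc(props->array, newcap * sizeof *grown);
        if (grown == NULL)
            return -1;
        props->array = grown;
        props->capacity = newcap;
    }
    props->array[props->count++] = *prop;
    return 0;
}

/* Release the heap storage and reset the structure so it can be reused. */
void DemoProperties_free(DemoProperties *props)
{
    free(props->array);
    props->array = NULL;
    props->count = 0;
    props->capacity = 0;
}
```

Because the add function copies the property into storage the list owns, the caller's property variable can be reused for the next property, as in the snippets above.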

The response structure looks like this:


typedef struct MQTTResponse
{
  enum MQTTReasonCodes reasonCode;
  MQTTProperties* properties; /* optional */
} MQTTResponse;
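A caller will usually want to turn the reason code into a simple pass or fail. In MQTT 5.0, reason codes below 0x80 indicate success and values of 0x80 or above indicate failure, so a check might look like the sketch below. The Demo names are stand-ins invented for this example; only the numeric values come from the MQTT 5.0 specification.

```c
#include <stddef.h>

/* Hypothetical stand-in for the reason code enum; values are from the
 * MQTT 5.0 specification. */
enum DemoReasonCodes {
    DEMO_SUCCESS           = 0x00,
    DEMO_GRANTED_QOS_2     = 0x02,  /* a success code that is not zero */
    DEMO_UNSPECIFIED_ERROR = 0x80,
    DEMO_NOT_AUTHORIZED    = 0x87
};

/* Hypothetical stand-in for the response structure. */
typedef struct {
    enum DemoReasonCodes reasonCode;
    void *properties;               /* optional, unused in this sketch */
} DemoResponse;

/* Returns non-zero if the reason code reports a failure. */
int DemoResponse_failed(const DemoResponse *response)
{
    return response->reasonCode >= 0x80;
}
```

Note that comparing against zero is not enough: a subscribe acknowledgement granting QoS 2 carries reason code 0x02 and is still a success.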

I’m not particularly happy with the name of the function, MQTTClient_connect5, but it does have the benefit of being short with a minimal change of name. Other options I considered were:

  • MQTT5Client_connect
  • MQTTClient_connectWithProperties
  • MQTTClient_connectExt

I quite like the “connectWithProperties” option, but it didn’t get much approval from the audience I’ve questioned so far — it also doesn’t reflect all the parameters required.

The other MQTT functions have similar changes: an MQTTResponse return structure instead of a single int, and an extra input parameter for properties. For the publishMessage function, the properties are added to the MQTTClient_message structure:


MQTTClient_message pubmsg = MQTTClient_message_initializer;

property.identifier = USER_PROPERTY;
property.value.data.data = "test user property";
property.value.data.len = strlen(property.value.data.data);
property.value.value.data = "test user property value";
property.value.value.len = strlen(property.value.value.data);
MQTTProperties_add(&pubmsg.properties, &property);

response = MQTTClient_publishMessage5(c, test_topic, &pubmsg, &dt);

And for the subscribe function, there are added subscribe-specific options:


MQTTSubscribe_options subopts = MQTTSubscribe_options_initializer;

subopts.retainAsPublished = 1;
subopts.noLocal = 0;
subopts.retainHandling = 0; /* 0, 1 or 2*/

response = MQTTClient_subscribe5(client, test_topic, subsqos, &subopts, &props);
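For reference, the MQTT 5.0 specification packs these options into a single byte per topic filter in the SUBSCRIBE packet: bits 0 and 1 hold the maximum QoS, bit 2 is No Local, bit 3 is Retain As Published, and bits 4 and 5 hold Retain Handling. The sketch below shows that packing; DemoSubscribeOptions and encode_subscription_options are names invented for this example, not Paho functions.

```c
/* Hypothetical stand-in for the subscribe options structure. */
typedef struct {
    unsigned char noLocal;            /* 0 or 1 */
    unsigned char retainAsPublished;  /* 0 or 1 */
    unsigned char retainHandling;     /* 0, 1 or 2 */
} DemoSubscribeOptions;

/* Pack the QoS and options into the subscription options byte.
 * Returns the byte value, or -1 if any field is out of range. */
int encode_subscription_options(int qos, const DemoSubscribeOptions *opts)
{
    if (qos < 0 || qos > 2)
        return -1;
    if (opts->noLocal > 1 || opts->retainAsPublished > 1 || opts->retainHandling > 2)
        return -1;
    return qos
         | (opts->noLocal << 2)
         | (opts->retainAsPublished << 3)
         | (opts->retainHandling << 4);
}
```

With the values used above (retainAsPublished set, noLocal and retainHandling zero) and QoS 2, the result is 0x0A.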

Finally, the disconnect function takes both input properties and a reason code:


enum MQTTReasonCodes reasonCode = SUCCESS;

rc = MQTTClient_disconnect5(client, 0, reasonCode, &props);

At the moment, the return from disconnect is still just an integer, as there is no MQTT response to a disconnect packet other than closing the connection.

I haven’t implemented the new MQTT V5 auth exchange packets yet, but I envisage this callback and call:



MQTTResponse MQTTClient_authArrived(void* context, enum MQTTReasonCodes rc, MQTTProperties props);

response = MQTTClient_auth(client, reasonCode, &props);

The callback will be used to implement an extended auth exchange initiated by the server; the function will be used when the exchange is started by the client.

MQTTAsync for V5

Most of the changes to the MQTTAsync API are in parameter structures and in callback signatures rather than the API signatures themselves. The “create” and “setCallbacks” calls remain the same. V5 properties for both the connect and will messages are added to the existing connect options along with new callback signatures for success and failure:


MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
opts.onSuccess5 = test1_onConnect;
opts.onFailure = NULL;
opts.context = c;

property.identifier = SESSION_EXPIRY_INTERVAL;
property.value.integer4 = 30;
MQTTProperties_add(&props, &property);

property.identifier = USER_PROPERTY;
property.value.data.data = "test user property";
property.value.data.len = strlen(property.value.data.data);
property.value.value.data = "test user property value";
property.value.value.len = strlen(property.value.value.data);
MQTTProperties_add(&props, &property);

opts.connectProperties = &props;
opts.willProperties = &willProps;

rc = MQTTAsync_connect(c, &opts);

The return from the connect call remains an integer, as the expanded response information is made available in the callbacks in the MQTTAsync_successData5 structure:


typedef struct
{
	...
	enum MQTTReasonCodes reasonCode; /* MQTT V5 reason code returned */
	MQTTProperties props; /* MQTT V5 properties returned, if any */
	/** A union of the different values that can be returned for subscribe, unsubscribe and publish. */
	union
	{
		...
		/* For connect, the server connected to, MQTT version used, and sessionPresent flag */
		struct
		{
			char* serverURI;
			int MQTTVersion;
			int sessionPresent;
		} connect;
		...
	} alt;
} MQTTAsync_successData5;

In the test, the contents of the properties returned by the connack are logged in the success callback:


void test1_onConnect(void* context, MQTTAsync_successData5* response)
{
	...
	MyLog(LOGA_INFO, "Connack properties:");
	logProperties(&response->props);
	...
}
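The callback pattern can be sketched in isolation: the application passes a context pointer along with the callback, and the library hands both back together with the success data. Everything named Demo or demo_ below is a simplified stand-in invented for this example (including the server URI), and demo_deliver_connack merely plays the part of the library; none of it is Paho code.

```c
#include <stddef.h>

/* Hypothetical, cut-down stand-in for the V5 success data. */
typedef struct {
    int reasonCode;
    union {
        struct {
            const char *serverURI;
            int MQTTVersion;
            int sessionPresent;
        } connect;
    } alt;
} DemoSuccessData5;

/* Callback signature: application context plus the response data. */
typedef void DemoOnSuccess5(void *context, DemoSuccessData5 *response);

/* Application state reached through the context pointer. */
typedef struct {
    int connected;
    int sessionPresent;
} DemoAppState;

/* Application callback: record what the connack reported. */
void demo_onConnect(void *context, DemoSuccessData5 *response)
{
    DemoAppState *state = context;
    state->connected = 1;
    state->sessionPresent = response->alt.connect.sessionPresent;
}

/* Plays the library's role: build the success data and invoke the
 * callback, if one was supplied. */
void demo_deliver_connack(DemoOnSuccess5 *onSuccess5, void *context, int sessionPresent)
{
    DemoSuccessData5 data = {0};
    data.alt.connect.serverURI = "tcp://localhost:1883";  /* assumed example URI */
    data.alt.connect.MQTTVersion = 5;
    data.alt.connect.sessionPresent = sessionPresent;
    if (onSuccess5 != NULL)
        onSuccess5(context, &data);
}
```

The success data lives only for the duration of the callback in this sketch, so anything the application needs later has to be copied into its own state, as demo_onConnect does.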

In subscribe, unsubscribe and send (publish) calls, the response options structure contains the properties as well as the new success and failure callback function pointers. As the options are no longer just for responses, there is a synonym for the structure, called callOptions:


MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTAsync_callOptions opts = MQTTAsync_callOptions_initializer;
MQTTProperty property;
MQTTProperties props = MQTTProperties_initializer;

property.identifier = USER_PROPERTY;
property.value.data.data = "test user property";
property.value.data.len = strlen(property.value.data.data);
property.value.value.data = "test user property value";
property.value.value.len = strlen(property.value.value.data);
MQTTProperties_add(&props, &property);
opts.properties = props;

pubmsg.payload = "a much longer message that we can shorten to the extent that we need to payload up to 11";
pubmsg.payloadlen = 11;
pubmsg.qos = 2;
pubmsg.retained = 0;
rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
MQTTProperties_free(&props);

The call options structure also includes the new V5 subscribe options:


MQTTAsync_callOptions opts = MQTTAsync_callOptions_initializer;
opts.properties = props;

opts.subscribe_options.retainAsPublished = 1;
rc = MQTTAsync_subscribe(c, test_topic, 2, &opts);
MQTTProperties_free(&props);

For disconnect, as for connect, the disconnectOptions structure is extended:


MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
MQTTProperty property;
int rc;

opts.onSuccess = test1_onDisconnect;
opts.context = c;
opts.reasonCode = UNSPECIFIED_ERROR;

property.identifier = SESSION_EXPIRY_INTERVAL;
property.value.integer4 = 0;
MQTTProperties_add(&opts.properties, &property);

rc = MQTTAsync_disconnect(c, &opts);
MQTTProperties_free(&opts.properties);

As with the MQTTClient API, I’ve not added the AUTH packet capabilities yet. They will echo the other APIs:


int rc = MQTTAsync_setAuthReceived(c, context, callback_pointer);

rc = MQTTAsync_auth(c, &opts);

The MQTTAsync_auth call, to send an AUTH packet, will be able to be called from the AuthReceived callback in response to an AUTH packet from the server, or separately to enable the client to initiate an authentication exchange.

That rounds up the summary for now. I’m very interested to hear any thoughts.

Author: Ian Craggs

I am the project lead of Eclipse Paho, a member of the Eclipse IoT working group and Eclipse IoT PMC, and co-chair of the OASIS MQTT-SN standardization Technical Sub-Committee.
