Receiving messages with the Paho embedded C client

In my previous post I described the motivation behind writing a new MQTT C client, and the first sample – how to publish a message with a small application program.

Now I want to move on to the second sample, pub0sub1.c. It is meant to mimic the sort of program that subscribes to a command topic and publishes a value from a sensor at a regular interval, in this case every second.

The first part of the program is similar to the first sample: it sends a connect packet. But because we now want to subscribe, we should wait for the connack packet to be sent back from the server. First we set a timeout on the socket, so that a read returns rather than blocks if no data arrives. Because we want to send a sensor reading every second, we set the timeout to 1 second.

struct timeval tv;
tv.tv_sec = 1;  
tv.tv_usec = 0;  
setsockopt(mysock, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(struct timeval));
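To see what the timeout does to a read, here is a minimal sketch that is not part of the sample. It assumes a POSIX system and uses a local socket pair in place of the broker connection; the function name recv_times_out is made up for illustration. With SO_RCVTIMEO set and no data arriving, recv returns -1 and sets errno to EAGAIN or EWOULDBLOCK.

```c
#include <errno.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <unistd.h>

/* Hypothetical helper: returns 1 if recv() gave up because the receive
 * timeout expired, 0 if it returned for another reason, -1 on setup failure. */
int recv_times_out(int timeout_ms)
{
	int sv[2];
	char byte;
	struct timeval tv;
	ssize_t n;
	int timed_out;

	/* A connected pair of local sockets stands in for the broker connection. */
	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0)
		return -1;

	tv.tv_sec = timeout_ms / 1000;
	tv.tv_usec = (timeout_ms % 1000) * 1000;
	if (setsockopt(sv[0], SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) != 0)
	{
		close(sv[0]);
		close(sv[1]);
		return -1;
	}

	/* Nothing is ever written to sv[1], so this read can only time out. */
	n = recv(sv[0], &byte, 1, 0);
	timed_out = (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK));

	close(sv[0]);
	close(sv[1]);
	return timed_out;
}
```

Calling recv_times_out(50) should come back after roughly 50 milliseconds with the value 1. In the sample the same mechanism is what lets the main loop wake up once a second even when no message arrives.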

A helper function, MQTTPacket_read, gets the data for one packet into a supplied buffer, buf. The buffer length is passed in buflen to avoid overruns. The length of an MQTT packet is carried in a variable-length field, which this function reads before attempting to read the rest of the packet.

MQTTPacket_read(buf, buflen, getdata);

The third parameter is a pointer to a helper function which is called to read the data from whatever source is required. In this case, a socket:

int mysock = 0;

int getdata(char* buf, size_t count)
{
	return recv(mysock, buf, (size_t)count, 0);
}

Up to count bytes of received data are placed into the buffer, and the number of bytes actually received is returned. MQTTPacket_read returns the code of the packet type received, or -1 if there was no data. So we check that the packet was a CONNACK, and then parse it with a call to MQTTDeserialize_connack. This allows us to check the connack return code.

if (MQTTPacket_read(buf, buflen, getdata) == CONNACK)
{
	int connack_rc = -1;	/* stays -1 if the packet cannot be parsed */

	if (MQTTDeserialize_connack(&connack_rc, buf, buflen) != 1 || connack_rc != 0)
	{
		printf("Unable to connect, return code %d\n", connack_rc);
		goto exit;
	}
}
else
	goto exit;
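To make the packet-reading step more concrete, here is a rough sketch of how a reader in the style of MQTTPacket_read could work. It is not the Paho implementation, and the names read_packet, mem_getdata and mem_source_set are made up for illustration. It relies on the MQTT fixed header layout: the packet type sits in the top four bits of the first byte, followed by a "remaining length" of one to four bytes, each carrying seven bits of the value plus a continuation bit. An in-memory source replaces the socket so that the sketch runs on its own.

```c
#include <stddef.h>
#include <string.h>

/* In-memory data source standing in for the socket (hypothetical). */
static const unsigned char* src_data;
static size_t src_len, src_pos;

void mem_source_set(const unsigned char* data, size_t len)
{
	src_data = data;
	src_len = len;
	src_pos = 0;
}

/* Same shape as getdata above: copy up to count bytes, return how many. */
int mem_getdata(char* buf, size_t count)
{
	size_t left = src_len - src_pos;

	if (count > left)
		count = left;
	memcpy(buf, src_data + src_pos, count);
	src_pos += count;
	return (int)count;
}

/* Reads one whole packet into buf.
 * Returns the packet type (top 4 bits of the first byte), or -1 on failure. */
int read_packet(char* buf, int buflen, int (*getfn)(char*, size_t))
{
	int len = 0, remaining = 0, multiplier = 1, count = 0;
	unsigned char c;

	/* First byte: packet type and flags. */
	if (buflen < 2 || getfn(buf, 1) != 1)
		return -1;
	len = 1;

	/* Remaining length: 7 bits per byte, top bit set means "more follows". */
	do
	{
		if (count == 4 || len >= buflen || getfn(buf + len, 1) != 1)
			return -1;
		c = (unsigned char)buf[len++];
		remaining += (c & 0x7F) * multiplier;
		multiplier *= 128;
		count++;
	} while (c & 0x80);

	/* Rest of the packet, refusing anything that would overrun the buffer. */
	if (remaining > buflen - len)
		return -1;
	while (remaining > 0)
	{
		int n = getfn(buf + len, (size_t)remaining);

		if (n <= 0)
			return -1;
		len += n;
		remaining -= n;
	}
	return ((unsigned char)buf[0]) >> 4;
}
```

Feeding it the four bytes 0x20 0x02 0x00 0x00, which form a CONNACK with return code 0, gives packet type 2, with the return code in the last byte of the buffer. A second call on the exhausted source returns -1, much as a timed-out socket read would.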

Next we follow a similar process with sending a subscribe packet and waiting for the suback.

topicString.cstring = "substopic";
len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos);

rc = write(mysock, buf, len);
if (MQTTPacket_read(buf, buflen, getdata) == SUBACK) 	/* wait for suback */
{
	int submsgid;
	int subcount;
	int granted_qos = -1;	/* stays -1 if the packet cannot be parsed */

	rc = MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, buflen);
	if (granted_qos != 0)
	{
		printf("granted qos != 0, %d\n", granted_qos);
		goto exit;
	}
}
else
	goto exit;

Finally we create the main loop. It calls MQTTPacket_read to wait for incoming publish packets. The read times out after 1 second if no packet arrives. If a publish does arrive, we parse it into variables we can use, like payload_in.

while (!toStop)
{
	if (MQTTPacket_read(buf, buflen, getdata) == PUBLISH)
	{
		int dup, qos, retained, msgid, payloadlen_in, rc;
		char* payload_in;
		MQTTString receivedTopic;

		rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic,
					&payload_in, &payloadlen_in, buf, buflen);
		printf("message arrived %.*s\n", payloadlen_in, payload_in);
	}

	printf("publishing reading\n");
	len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, payload, payloadlen);
	rc = write(mysock, buf, len);
}

Whether we receive an incoming message or not, we do send a reading from the sensor every time.
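The loop above runs until toStop becomes non-zero, but the excerpt does not show where that happens. One common arrangement, shown here as an assumption rather than as what the sample necessarily does, is to set the flag from a SIGINT handler so that Ctrl-C ends the loop cleanly. The function names are made up for illustration.

```c
#include <signal.h>

/* Loop flag; sig_atomic_t is the type that is safe to write from a handler. */
volatile sig_atomic_t toStop = 0;

/* Hypothetical handler: only sets the flag, the main loop does the rest. */
void stop_handler(int sig)
{
	(void)sig;
	toStop = 1;
}

/* Call once before entering the main loop. Returns 0 on success. */
int install_stop_handler(void)
{
	return signal(SIGINT, stop_handler) == SIG_ERR ? -1 : 0;
}
```

Because the socket read times out every second, the loop notices the flag within about a second of the signal arriving.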

The structure MQTTString has two parts – a length delimited string and a C style null-delimited string.

typedef struct
{
	int len;
	char* data;
} MQTTLenString;

typedef struct
{
	char* cstring;
	MQTTLenString lenstring;
} MQTTString;

#define MQTTString_initializer {NULL, {0, NULL}}

MQTT strings are length delimited. To avoid unnecessary copying of received data, by default when a packet is parsed, or deserialized, the string data is not copied into an MQTTString. Instead, the length delimited type has its length set, along with a pointer to the appropriate place in the buffer into which MQTTPacket_read placed the data read from the socket.

Later, if we want to, we can convert the length delimited string into a null-delimited C string (and I will add a helper function for this). This will probably involve allocating some memory, and copying the data so we can add the null character. For efficiency, we were trying to avoid both actions, which is why C-style strings are not created by default.
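Such a helper could look roughly like the sketch below. The name lenstring_to_cstring is hypothetical and this is not the function that was later added to the library. The struct is repeated from above only so that the sketch compiles on its own.

```c
#include <stdlib.h>
#include <string.h>

/* Same layout as the MQTTLenString shown above. */
typedef struct
{
	int len;
	char* data;
} MQTTLenString;

/* Hypothetical helper: copies a length-delimited string into a newly
 * allocated, null-terminated C string. The caller must free() the result.
 * Returns NULL on bad input or if the allocation fails. */
char* lenstring_to_cstring(const MQTTLenString* ls)
{
	char* out;

	if (ls == NULL || ls->len < 0 || (ls->len > 0 && ls->data == NULL))
		return NULL;

	out = malloc((size_t)ls->len + 1);
	if (out == NULL)
		return NULL;

	if (ls->len > 0)
		memcpy(out, ls->data, (size_t)ls->len);
	out[ls->len] = '\0';
	return out;
}
```

For example, with data pointing into a buffer that holds "substopichello" and len set to 9, the helper returns a fresh copy of "substopic". The allocation and the copy are exactly the two costs the default parsing path avoids.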

All the possible error conditions are by no means catered for in this program. Adding that error handling would improve reliability at the cost of size. As it is, whereas the QoS 0 publisher came in at under 5k compiled (64-bit Linux), this program is less than 10k (more than a completely empty program). I’m reasonably happy with that.

These examples use the standard socket calls. The next step is to try real embedded environments to see how well this MQTT client fits in.

New “Embedded” Paho MQTT C Client

Yesterday I added the first phase of a new C client for “embedded” platforms to the Paho project in a new repository. It might seem odd to start another C client library, but I think the logic is sound.

When I started writing the first version of the Paho C client it was to work with IBM MQTT servers like Microbroker, Message Broker (now IBM Integration Bus Advanced) and my own RSMB. In spite of trying to keep it as lightweight as I could, I had to respond to many requirements without which the library would not be ‘complete’. So it grew. The main priority was to make MQTT programming simple – size was not that important.

The end result is a library which builds to about 80k (if you strip the debugging symbols), and is not easily stripped down if you want something smaller. A later addition was the asynchronous client, in the same repository, which changed the programming model to be entirely non-blocking, but didn’t make the library any smaller.

Even though most devices are getting ever more capable, there are still the smallest, most frugal motes that have limited capabilities. MQTT was intended to work on these devices, which is why the protocol is public – you can write a bespoke client library if you want. The existing client libraries are too big and inflexible, so this is the first stage in attempting to fix that.

I didn’t want to destabilize the existing libraries, so decided to start afresh. These small devices might not use the usual libraries for networking, threads, timers or even memory allocation. So I thought I would start from the bottom up, with a basic lower layer that would omit all network or memory allocation calls. This lower layer merely serializes and deserializes MQTT packet data so that it can be written to or read from the data destination or source. The end goal is a totally asynchronous API – perhaps with an external definition akin to the existing asynchronous API.

The best way to see it in action is to look at some samples. The simplest sample qos0pub.c is to send a QoS 0 MQTT message. It connects, publishes and disconnects all in one go, making it one of the shortest MQTT applications you can write. The core of the program looks like:

MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
int rc = 0;
char buf[200];
MQTTString topicString = MQTTString_initializer;
char* payload = "mypayload";
int payloadlen = strlen(payload);
int buflen = sizeof(buf);
int len = 0;

data.clientID.cstring = "me";
data.keepAliveInterval = 20;
data.cleansession = 1;
len = MQTTSerialize_connect(buf, buflen, &data); /* 1 */

topicString.cstring = "mytopic";
len += MQTTSerialize_publish(buf + len, buflen - len, 0, 0, 0, 0, topicString, payload, payloadlen); /* 2 */

len += MQTTSerialize_disconnect(buf + len, buflen - len); /* 3 */

rc = Socket_new("127.0.0.1", 1883, &mysock);
rc = write(mysock, buf, len);
rc = close(mysock);

We set up the data we want to connect with, and add that to a buffer (1). Then we add the MQTT publish command to the end of the same buffer (2). Lastly we add the disconnect command to the same buffer (3). The disconnect is not strictly necessary – it’s an act of kindness to the server, for the cost of a few bytes.

We can write this buffer directly to a socket which is connected to an MQTT server, and then close the socket immediately, and we’ve published a message. The entire program comes in at about 5k over and above a completely empty one using RedHat and gcc – which is more like it, although I’d prefer smaller.
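To give a feel for how little ends up in the buffer, here is a hand-rolled sketch of the bytes in a QoS 0 publish packet. It is not the library's MQTTSerialize_publish; the function name is made up, and for brevity it only handles packets whose remaining length fits in a single byte (under 128). A QoS 0 publish is a first byte of 0x30, the remaining length, the topic prefixed by its two-byte length, and then the payload.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical sketch: writes a QoS 0 PUBLISH packet into buf.
 * Returns the number of bytes written, or -1 if the packet does not fit
 * or would need a multi-byte remaining length. */
int sketch_publish_qos0(unsigned char* buf, int buflen, const char* topic,
		const char* payload, int payloadlen)
{
	int topiclen = (int)strlen(topic);
	int remaining = 2 + topiclen + payloadlen;
	int pos = 0;

	if (payloadlen < 0 || remaining > 127 || buflen < 2 + remaining)
		return -1;

	buf[pos++] = 0x30;				/* PUBLISH, dup 0, QoS 0, retain 0 */
	buf[pos++] = (unsigned char)remaining;		/* single-byte remaining length */
	buf[pos++] = (unsigned char)(topiclen >> 8);	/* topic length, high byte */
	buf[pos++] = (unsigned char)(topiclen & 0xFF);	/* topic length, low byte */
	memcpy(buf + pos, topic, (size_t)topiclen);
	pos += topiclen;
	memcpy(buf + pos, payload, (size_t)payloadlen);	/* QoS 0: no message id */
	pos += payloadlen;
	return pos;
}
```

With the topic "mytopic" and the payload "mypayload" from the sample, this comes to 20 bytes: 2 bytes of fixed header, 2 bytes of topic length, 7 bytes of topic and 9 bytes of payload.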

I’ll talk about receiving messages in a second post.

MQTT, QoS and Persistence

Over the years, every now and again I get asked “what is the point of supporting QoS 1 or 2 in MQTT without full persistence?” I drew up this table a while back to help answer that question.

First a reminder of MQTT definitions of reliability of QoS:

  • 0 – at most once (fire and forget)
  • 1 – at least once
  • 2 – once only

| Error situation | No persistence | Persistence |
|---|---|---|
| Communications errors: broken connection from whatever cause (intermittent signals e.g. over wireless, hardware problem, etc.) | All QoSs apply with any persistence (will hold to their reliability criteria). | All QoSs apply with any persistence (will hold to their reliability criteria). |
| Power failure to remote host machine (bridge connection) | All QoSs apply with any persistence (will hold to their reliability criteria). | All QoSs apply with any persistence (will hold to their reliability criteria). |
| Power failure to local host machine | All in-progress messages lost. Retained messages and subscriptions lost. | Shutdown persistence: all state since last restart lost. Full persistence: full protection for all QoSs, as long as all persistence store write caching is turned off, both in the operating system and in the hardware. Otherwise, data can still be lost. |
| Filesystem corruption | All state lost. | All state lost. Normal persistence will not help. RAID or other disk mirroring needed. |
| Hard drive failure | All state lost. | All state lost. Normal persistence will not help. RAID or other disk mirroring needed. |
| Complete system destruction | All state lost. | All state lost. Normal persistence will not help. Mirroring to another system needed. |
| Destruction of location | All state lost. | All state lost. Normal persistence will not help. Mirroring to system in remote location needed. |
| End of the world or universe | All state lost. | All state lost. Normal persistence will not help. Remote location needs to be on another planet or in another universe. |

Which Paho MQTT C API to use? (And some history)

It may not be obvious if you take a look at the Paho MQTT (http://mqtt.org) C client libraries (http://www.eclipse.org/paho/), but my intention all along was to avoid proliferation of C APIs.

When I first started writing a C client for MQTT, around 2008, there was one other C MQTT API that I knew about – the now withdrawn IA93 (http://www-01.ibm.com/support/docview.wss?uid=swg24006525).  With all respect to the author, this had an interface which I did not want to copy.  Firstly, there were the variable length option structures, which meant that you had to copy topic strings, for instance, into their correct location in the structure before making the API call.  Secondly, there was the overly complicated method of setting up the API to operate with background threads.

IA93 did have one feature that I felt it was desirable to copy: the ability to work using no background threads.  As MQTT is intended for embedded devices, we would like the library to work in very simple environments.

My other starting point was the MQTT Java API at the time.  I saw no reason why the C API should be much more difficult to use.  So I set out to create a C API that was close to the Java API, with the addition that it could be optionally run with no background threads.  This became the MQTTClient API (http://www.eclipse.org/paho/files/mqttdoc/Cclient/index.html).  The parameters are similar to the Java API – no unnecessary variable length structures.  The method I chose to indicate whether you want background threads or not is to simply make a call to MQTTClient_setCallbacks().  Asking for callbacks means that you have to have a background thread, right?

I was developing Really Small Message Broker (RSMB) around the same time, which is single threaded, using multiplexed I/O on the TCP port.  At the time, every MQTT server I knew of was multi-threaded – at least one thread for every connecting MQTT client.  I thought it would be interesting to see how far you could get with a single thread: reminding me of the techniques we had to use in the 1980s to write games on ZX81s and the like.  So, I thought reusing the same approach would be good for the C client – hence only one background thread no matter how many client objects you create (limited to whatever the ‘select’ system call will handle).

Another conscious decision was to use C rather than C++.  I started RSMB in C++, but quickly found out that compiler template support was pretty buggy at the time, so dropped back to ANSI standard C.  That was somewhat disappointing, but several years later does have the advantage of not being incompatible with Objective-C.

The final part of the story so far is the widespread use of graphical environments, or more accurately, environments where the application itself is a series of callbacks.   The ubiquitous example is the JavaScript program running in the web browser.  When the Paho MQTT JavaScript API was devised, it needed to be completely asynchronous.  The previous C API, in the mould of the Java API, had some calls which would block, to make programming easier.  We needed a totally asynchronous C API too, so I followed the model of the JavaScript API – which unfortunately meant a new interface, MQTTAsync (http://www.eclipse.org/paho/files/mqttdoc/Casync/index.html).

So, which to use when?

  1. MQTTClient, single-threaded: when you want to use only one thread for your entire application.
  2. MQTTClient, multi-threaded: when you want ease of use, aren’t bothered about some calls blocking, but still want messages to be sent and received in the background.
  3. MQTTAsync: when you want no calls to block at all.  (Messages will be sent and received on two background threads).

The MQTTAsync API happens to perform better than the MQTTClient API, but that wasn’t the rationale behind it.

Thanks to a Paho contributor, Frank Pagliughi, there is also now a C++ layer over the asynchronous C API.  His intent is to have an API which mirrors the corresponding Java one as much as possible.

Hmm, I think we’ve been here before!