Receiving messages with the Paho embedded C client

In my previous post I described the motivation behind writing a new MQTT C client, and the first sample – how to publish a message with a small application program.

Now I want to go onto the second sample pub0sub1.c. This is meant to mimic the sort of program that subscribes to a command topic, and publishes a value from sensor at a regular interval – in this case 1 second.

The first part of the program is similar to the first, sending a connect packet. But now because we want to subscription, we should wait for the connack packet to be sent back from the server. First we set the timeout on the socket, so when we read from it, if no data arrives, it will return rather than block. Because we want to send a sensor reading every 1 second, we set the timeout to 1 second.

struct timeval tv;
tv.tv_sec = 1;  
tv.tv_usec = 0;  
setsockopt(mysock, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(struct timeval));

A helper function, MQTTPacket_read is used to get the data for one packet into a supplied buffer buf. The buffer length is specified in buflen – to avoid overruns. MQTT packet lengths are defined in a specific variable-length field, which this function reads, and then attempts to read the rest of that packet.

MQTTPacket_read(buf, buflen, getdata);

The third parameter is a pointer to helper function which is called to read the data from whatever source is required. In this case, a socket:

int mysock = 0;

int getdata(char* buf, size_t count)
{
	return recv(mysock, buf, (size_t)count, 0);
}

The received data, of length count, is placed into the buffer, and the length of received data returned. MQTTPacket_read will return the code of the packet type received, or -1 if there was no data. So we check that the packet was a CONNACK, and then parse it with a call to MQTTDeserialize_connack. This allows us to check the connack return code.

if (MQTTPacket_read(buf, buflen, getdata) == CONNACK)
{
	int connack_rc;

	if (MQTTDeserialize_connack(&connack_rc, buf, buflen) != 1 || connack_rc != 0)
	{
		printf("Unable to connect, return code %d\n", connack_rc);
		goto exit;
	}
}
else
	goto exit;

Next we follow a similar process with sending a subscribe packet and waiting for the suback.

topicString.cstring = "substopic";
len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos);

rc = write(mysock, buf, len);
if (MQTTPacket_read(buf, buflen, getdata) == SUBACK) 	/* wait for suback */
{
	int submsgid;
	int subcount;
	int granted_qos;

	rc = MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, buflen);
	if (granted_qos != 0)
	{
		printf("granted qos != 0, %d\n", granted_qos);
		goto exit;
	}
}
else
	goto exit;

Finally we create the main loop. It calls MQTTPacket_read to wait for incoming publish packets. The read times out after 1 second if no packet arrives. If a publish does arrive, we parse it into variables we can use, like payload_in.

while (!toStop)
{
	if (MQTTPacket_read(buf, buflen, getdata) == PUBLISH)
	{
		int dup, qos, retained, msgid, payloadlen_in, rc;
		char* payload_in;
		MQTTString receivedTopic;

		rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic,
					&payload_in, &payloadlen_in, buf, buflen);
		printf("message arrived %.*s\n", payloadlen_in, payload_in);
	}

	printf("publishing reading\n");
	len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, payload, payloadlen);
	rc = write(mysock, buf, len);
}

Whether we receive an incoming message or not, we do send a reading from the sensor every time.

The structure MQTTString has two parts – a length delimited string and a C style null-delimited string.

typedef struct
{
	int len;
	char* data;
} MQTTLenString;

typedef struct
{
	char* cstring;
	MQTTLenString lenstring;
} MQTTString;

#define MQTTString_initializer {NULL, {0, NULL}}

MQTT strings are length delimited. To avoid unnecessary copying of received data, by default when a packet is parsed, or deserialized, the string data is not copied into an MQTTString. Instead, the length delimited type has its length set, along with a pointer to the appropriate place in the buffer into which MQTTPacket_read placed the data read from the socket.

Later, if we want to, we can convert the length delimited string into a null-delimited C string (and I will add a helper function for this). This will probably involve allocating some memory, and copying the data so we can add the null character. For efficiency, we were trying to avoid both actions, which is why C-style strings are not created by default.

All the possible error conditions are by no means catered for in this program. Adding that error handling would improve reliability at the cost of size. As it is, whereas the QoS 0 publisher came in at under 5k compiled (64-bit Linux), this program is less than 10k (more than a completely empty program). I’m reasonably happy with that.

These examples are using the standard socket calls. Next step is to try real embedded environments to see how well this MQTT client fits in.

New “Embedded” Paho MQTT C Client

Yesterday I added the first phase of a new C client for “embedded” platforms to the Paho project in a new repository. It might seem odd to start another C client library, but I think the logic is sound.

When I started writing the first version of the Paho C client it was to work with IBM MQTT servers like Microbroker, Message Broker (now IBM Integration Bus Advanced) and my own RSMB. In spite of trying to keep it as lightweight as I could, I had to respond to many requirements without which the library would not be ‘complete’. So it grew. The main priority was to make MQTT programming simple – size was not that important.

The end result is a library which builds to about 80k (if you strip the debugging symbols), and is not easily stripped down if you want something smaller. A later addition was the asynchronous client, in the same repository, which changed the programming model to be entirely non-blocking, but didn’t make the library any smaller.

Even though most devices are getting ever more capable, there are still the smallest, most frugal motes that have limited capabilities. MQTT was intended to work on these devices, which is why the protocol is public – you can write a bespoke client library if you want. The existing client libraries are too big and inflexible, so this is the first stage in attempting to fix that.

I didn’t want to destabilize the existing libraries, so decided to start afresh. These small devices might not use the usual libraries for networking, threads, timers or even memory allocation. So I thought I would start from the bottom up, with a basic lower layer that would omit all network or memory allocation calls. This lower layer merely serializes and deserializes MQTT packet data so that it can be written to or read from the data destination or source. The end goal is a totally asynchronous API – perhaps with an external definition akin to the existing asynchronous API.

The best way to see it in action is to look at some samples. The simplest sample qos0pub.c is to send a QoS 0 MQTT message. It connects, publishes and disconnects all in one go, making it one of the shortest MQTT applications you can write. The core of the program looks like:

MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
int rc = 0;
char buf[200];
MQTTString topicString = MQTTString_initializer;
char* payload = "mypayload";
int payloadlen = strlen(payload);int buflen = sizeof(buf);

data.clientID.cstring = "me";
data.keepAliveInterval = 20;
data.cleansession = 1;
len = MQTTSerialize_connect(buf, buflen, &data); /* 1 */

topicString.cstring = "mytopic";
len += MQTTSerialize_publish(buf + len, buflen - len, 0, 0, 0, 0, topicString, payload, payloadlen); /* 2 */

len += MQTTSerialize_disconnect(buf + len, buflen - len); /* 3 */

rc = Socket_new("127.0.0.1", 1883, &mysock);
rc = write(mysock, buf, len);
rc = close(mysock);

We set up the data we want to connect with, and add that to a buffer (1). Then we add the MQTT publish command to the end of the same buffer (2). Lastly we add the disconnect command to the same buffer (3). The disconnect is not strictly necessary – it’s an act of kindness to the server, for the cost of a few bytes.

We can write this buffer directly to a socket which is connected to an MQTT server, and then close the socket immediately, and we’ve published a message. The entire program comes in at about 5k over and above a completely empty one using RedHat and gcc – which is more like it, although I’d prefer smaller.

I’ll talk about receiving messages in a second post.

Use of Gotos in C code (in RSMB)

I saw that John Donovan criticised my use of gotos in the Really Small Message Broker (RSMB) C code a little while back in his article MQTT and the language of the Internet of Things.

To quote:

And, despite some missing features, a crash bug (for which I have posted a fix), and a scary number of goto statements (i.e. more than 0), it does the job admirably.

The crash bug was due to an unimplemented feature of MQTT-SN, but let’s get onto the use of gotos.  The RSMB code is written in C, not C++.  I made that decision in 2008 when I started to implement RSMB in C++.  I thought I would use all the features of C++ to build a nicely object oriented architecture, while making the code as efficient and small as it could be.  That approach lasted all of a few hours, as I  discovered that template support in the version of gcc I had to hand was decidedly dodgy.

Nothing for it, I had to resort to standard, portable C.  On the bright side, that meant I didn’t have to contend with huge standard libraries being included in the executable.  On the down side (and this is a big downside) I had to obtain or write my own collection libraries.

As RSMB became part of WebSphere MQ for a while, under the name of Telemetry Daemon for Devices, I was required to add some features to aid bug fixing.  One of these is a stack trace (the pros and cons of that belong in a different post).  What that meant is that each function had to end with a macro:


  FUNC_EXIT;
}

In those functions where some code inside a nested loop or two found an error and needed to leave the function, I had two potential approaches (apart from restructuring the loops):

  1. have multiple returns from the function, with multiple FUNC_EXIT macros, or
  2. use a goto to the single exit point of the function.

I chose the latter, as it meant only one FUNC_EXIT call was needed, meaning less scope for forgetting to include it before any return statement. This means that each function only has one exit point, which some people prefer in any case.

The use of goto is therefore constrained to the following pattern:


int function(int parm)
{
...
   goto exit;  /* an error condition */
...
   goto exit;  /* another error condition */
...

exit:
  FUNC_EXIT;
}

It struck me recently, rather belatedly, that this is what exceptions are. So if you dislike my use of gotos, please consider them to be exceptions instead.

With the right sort of macros, maybe I could even make them look like exceptions



#define raise(label) goto label;

#define handle(label) label:

but I think that would be wilfully obscure 🙂