In my previous post I described the motivation behind writing a new MQTT C client, and the first sample – how to publish a message with a small application program.
Now I want to go on to the second sample, pub0sub1.c. This is meant to mimic the sort of program that subscribes to a command topic, and publishes a value from a sensor at a regular interval – in this case 1 second.
The first part of the program is similar to the first sample: it sends a connect packet. But now, because we want to subscribe, we should wait for the connack packet to be sent back from the server. First we set the timeout on the socket, so that when we read from it and no data arrives, the read returns rather than blocking. Because we want to send a sensor reading every second, we set the timeout to 1 second.
    struct timeval tv;
    tv.tv_sec = 1;
    tv.tv_usec = 0;
    setsockopt(mysock, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(struct timeval));
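To see what the timeout looks like from the caller's side, here is a minimal sketch, assuming a POSIX system. The helper names set_recv_timeout and recv_timed_out are hypothetical and not part of the MQTT client; they only wrap the standard setsockopt and recv calls. When the timeout expires with no data, recv returns -1 and sets errno to EAGAIN or EWOULDBLOCK.

```c
#include <errno.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical helper: set a receive timeout, in whole seconds, on a socket. */
static int set_recv_timeout(int sock, long seconds)
{
    struct timeval tv;

    tv.tv_sec = seconds;
    tv.tv_usec = 0;
    return setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
}

/*
 * Hypothetical helper: try to read up to count bytes.
 * Returns 1 if recv() gave up because the timeout expired,
 * 0 if recv() returned normally (data, or 0 bytes on an orderly close),
 * and -1 for any other error. The byte count is stored in *received.
 */
static int recv_timed_out(int sock, char *buf, size_t count, ssize_t *received)
{
    *received = recv(sock, buf, count, 0);
    if (*received >= 0)
        return 0;
    return (errno == EAGAIN || errno == EWOULDBLOCK) ? 1 : -1;
}
```

In the sample program the distinction is not made: any negative result from recv is simply treated as "no packet this time", which is enough to drive the one second loop.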
A helper function, MQTTPacket_read is used to get the data for one packet into a supplied buffer buf. The buffer length is specified in buflen – to avoid overruns. MQTT packet lengths are defined in a specific variable-length field, which this function reads, and then attempts to read the rest of that packet.
MQTTPacket_read(buf, buflen, getdata);
The third parameter is a pointer to a helper function which is called to read the data from whatever source is required. In this case, a socket:
    int mysock = 0;

    int getdata(char* buf, size_t count)
    {
        return recv(mysock, buf, (size_t)count, 0);
    }
The received data, up to count bytes, is placed into the buffer, and the length of the received data is returned. MQTTPacket_read will return the code of the packet type received, or -1 if there was no data. So we check that the packet was a CONNACK, and then parse it with a call to MQTTDeserialize_connack. This allows us to check the connack return code.
    if (MQTTPacket_read(buf, buflen, getdata) == CONNACK)
    {
        int connack_rc = -1; /* initialized so the message below is defined if parsing fails */

        if (MQTTDeserialize_connack(&connack_rc, buf, buflen) != 1 || connack_rc != 0)
        {
            printf("Unable to connect, return code %d\n", connack_rc);
            goto exit;
        }
    }
    else
        goto exit;
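For anyone curious about the variable-length field mentioned earlier, here is a minimal sketch of how the start of an MQTT packet can be decoded, following the MQTT 3.1.1 fixed header layout. It is not the code inside MQTTPacket_read; the names packet_type, decode_remaining_length and the PKT_ constants are hypothetical and chosen only for illustration.

```c
#include <stddef.h>

/* Packet type codes from the MQTT 3.1.1 specification (names are illustrative). */
enum { PKT_CONNACK = 2, PKT_PUBLISH = 3, PKT_SUBACK = 9 };

/* The packet type is held in the top four bits of the first header byte. */
static int packet_type(unsigned char header)
{
    return header >> 4;
}

/*
 * Decode the "remaining length" field that follows the first header byte.
 * Each byte carries 7 bits of the value, least significant group first;
 * if bit 7 is set, another length byte follows. At most 4 bytes are allowed.
 * Returns the number of bytes consumed (1 to 4), or -1 if the field is
 * truncated or longer than 4 bytes. The decoded length is stored in *value.
 */
static int decode_remaining_length(const unsigned char *buf, size_t avail, int *value)
{
    int multiplier = 1;
    size_t i;

    *value = 0;
    for (i = 0; i < avail && i < 4; i++)
    {
        *value += (buf[i] & 0x7F) * multiplier;
        if ((buf[i] & 0x80) == 0)
            return (int)i + 1;
        multiplier *= 128;
    }
    return -1;
}
```

A reader built this way knows exactly how many more bytes to ask the data callback for, which is how a whole packet can be pulled into the buffer without overrunning it.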
Next we follow a similar process: sending a subscribe packet and waiting for the suback.
    topicString.cstring = "substopic";
    len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos);
    rc = write(mysock, buf, len);

    if (MQTTPacket_read(buf, buflen, getdata) == SUBACK) /* wait for suback */
    {
        int submsgid;
        int subcount;
        int granted_qos;

        rc = MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, buflen);
        if (granted_qos != 0)
        {
            printf("granted qos != 0, %d\n", granted_qos);
            goto exit;
        }
    }
    else
        goto exit;
Finally we create the main loop. It calls MQTTPacket_read to wait for incoming publish packets. The read times out after 1 second if no packet arrives. If a publish does arrive, we parse it into variables we can use, like payload_in.
    while (!toStop)
    {
        if (MQTTPacket_read(buf, buflen, getdata) == PUBLISH)
        {
            int dup, qos, retained, msgid, payloadlen_in, rc;
            char* payload_in;
            MQTTString receivedTopic;

            rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic,
                    &payload_in, &payloadlen_in, buf, buflen);
            printf("message arrived %.*s\n", payloadlen_in, payload_in);
        }

        printf("publishing reading\n");
        len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, payload, payloadlen);
        rc = write(mysock, buf, len);
    }
Whether we receive an incoming message or not, we do send a reading from the sensor every time.
The structure MQTTString has two parts – a length delimited string and a C style null-delimited string.
    typedef struct
    {
        int len;
        char* data;
    } MQTTLenString;

    typedef struct
    {
        char* cstring;
        MQTTLenString lenstring;
    } MQTTString;

    #define MQTTString_initializer {NULL, {0, NULL}}
MQTT strings are length delimited. To avoid unnecessary copying of received data, by default when a packet is parsed, or deserialized, the string data is not copied into an MQTTString. Instead, the length delimited type has its length set, along with a pointer to the appropriate place in the buffer into which MQTTPacket_read placed the data read from the socket.
Later, if we want to, we can convert the length delimited string into a null-delimited C string (and I will add a helper function for this). This will probably involve allocating some memory, and copying the data so we can add the null character. For efficiency, we were trying to avoid both actions, which is why C-style strings are not created by default.
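As a rough idea of what such a conversion might look like, here is a minimal sketch. The function name mqttstring_to_cstring is hypothetical (the real helper does not exist yet), and the two typedefs are repeated from above only so the fragment compiles on its own. It assumes the caller is happy to own and free the returned memory.

```c
#include <stdlib.h>
#include <string.h>

typedef struct { int len; char* data; } MQTTLenString;
typedef struct { char* cstring; MQTTLenString lenstring; } MQTTString;

/*
 * Hypothetical helper: return a newly allocated, null-terminated copy of
 * the string held in s. If s->cstring is set it is used; otherwise the
 * length delimited part is copied. The caller must free() the result.
 * Returns NULL if the allocation fails.
 */
static char *mqttstring_to_cstring(const MQTTString *s)
{
    const char *src;
    size_t len;
    char *out;

    if (s->cstring != NULL)
    {
        src = s->cstring;
        len = strlen(src);
    }
    else
    {
        src = s->lenstring.data;
        len = s->lenstring.len > 0 ? (size_t)s->lenstring.len : 0;
    }

    out = malloc(len + 1);
    if (out == NULL)
        return NULL;
    if (len > 0)
        memcpy(out, src, len);  /* copy out of the packet buffer */
    out[len] = '\0';            /* add the terminator the wire format lacks */
    return out;
}
```

The sketch shows both costs mentioned above: one allocation and one copy per string. If all that is needed is to print the topic, passing the length and data pointer to printf with %.*s, as the main loop does for the payload, avoids both.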
All the possible error conditions are by no means catered for in this program. Adding that error handling would improve reliability at the cost of size. As it is, whereas the QoS 0 publisher came in at under 5k compiled (64-bit Linux), this program is less than 10k (more than a completely empty program). I’m reasonably happy with that.
These examples use the standard socket calls. The next step is to try real embedded environments to see how well this MQTT client fits in.