MQTT Command Queuing

I'm not sure how real this problem might be, but in working through the Gateway Node software design it occurred to me that there may be more than a single outstanding MQTT callback command awaiting action at any point in time. Recall that the Gateway Node generates MQTT messages, and forwards them to an MQTT broker, on receipt of packets from Sensor Nodes. It also receives MQTT instructions from the broker, via a callback mechanism, to pass on to relevant Sensor Nodes. My original interface with the callback mechanism assumed that there'd never be more than a single instruction to process at any point in time, but on reflection, it seemed that this may not always be the case.

Having implemented a queuing mechanism in support of RPDP, it seemed appropriate to use the same mechanism to resolve this issue. As such, instead of setting a few global variables and assuming that nothing would change before the processor next cycled around into TX_State, Gateway Node packets assembled in response to MQTT callbacks are assembled and placed in the mqttBuffer to be processed during the next transmission cycle.

The essential declarations of variables involved is similar to those used for the ackBuffer, as illustrated.

Packet Buffer Construct

#include <PacketHandler.h>// LoRa Packet management
#include <LinkedList.h>// Linked List support for MQTT and ACK buffers

struct bufferElement {
PacketHandler packet;
unsigned long startTime;
int resendCount;
};

LinkedList<bufferElement> *mqttBuffer = new LinkedList<bufferElement>();

Only the packet component of the buffer element is used in this particular application, but the use of a common structure for buffer elements makes transferring packets from the mqttBuffer to the ackBuffer, when acknowledgement of the original packet is required, a trivial matter.

Another class was defined to simplify the task of splitting out the individual components of an MQTT topic. This is not a 'universal' class in that it effectively only splits out the first three components of a topic (that's all that I have used to date).

The mqttTopic Class

// mqttTopic.h
#ifndef mqttTopic_h
#define mqttTopic_h

#include <Arduino.h>

class mqttTopic {
private:
String _node, _type, _qualifier;
void init();
public:
mqttTopic();
void begin(String _topic);
String node();
String type();
String qualifier();
};
#endif

// mqttTopic.cpp
#include "mqttTopic.h"

mqttTopic::mqttTopic() {
}

void mqttTopic::init() {
_node = "";
_type= "";
_qualifier = "";
}

void mqttTopic::begin(String _topic) {
init();
char delimiter = '/';
unsigned int _length = _topic.length();
int _i = 0;
while ((_i < _length) && (_topic.charAt(_i) != delimiter)) {
_node += _topic.charAt(_i);
_i++;
}
_i++;
while ((_i < _length) && (_topic.charAt(_i) != delimiter)) {
_type += _topic.charAt(_i);
_i++;
}
_i++;
while ((_i < _length) && (_topic.charAt(_i) != delimiter)) {
_qualifier += _topic.charAt(_i);
_i++;
}
}

String mqttTopic::node() {
return _node;
}

String mqttTopic::type() {
return _type;
}

String mqttTopic::qualifier() {
return _qualifier;
}

Operation

The following is the mqttCallback function that includes the MQTT packet buffering process. In this case, as distinct from the RPDP case, we do not need a timer function—we simply empty the buffer at an appropriate point in the transmission cycle. This process may nonetheless simply involve the transfer of one or more packets from the mqttBuffer to the ackBuffer for subsequent RPDP transmission.

MQTT Callback

void mqttCallback(String topic, byte* message, unsigned int length) {

// If a message is received on a */pump/* topic, check if the message is either on or off.
// A 'pump' topic will be of the form node/type/qualifier => node/pump/<n>
// Send an appropriate instruction to the pump controller Node

mqttTopic localTopic;
localTopic.begin(topic);
bool queuePacket = true;
if ( localTopic.type() == "pump" ) {
uint8_t topicQualifier = localTopic.qualifier().toInt();
switch ( topicQualifier ) {
case 1: {
pumpMAC = pump1MAC;
Serial.print("Pump 1 to ");
break;
}
case 2: {
pumpMAC = pump2MAC;
Serial.print("Pump 2 to ");
break;
}
case 3: {
pumpMAC = pump3MAC;
Serial.print("Pump 3 to ");
break;
}
// Add cases for additional qualifiers as required
default: {
Serial.println("[mqttCallback] WARNING: No topic qualifier specified");
Serial.print(" TOPIC: ");
Serial.println(topic);
queuePacket = false;
}
}
if ( queuePacket ) {
if ( messageString == "ON" ) {
pumpPowerState = PH_POWER_ON;
} else if ( messageString == "OFF" ) {
pumpPowerState = PH_POWER_OFF;
}
// The sequence number will be inserted before the packet is sent
element.packet.begin(pumpMAC,myMAC,0,PUMP);
element.packet.setRelFlag();
element.packet.setPumpId(topicQualifier);
element.packet.setPumpState(pumpPowerState);
mqttBuffer->add(element);
checkMqttBuffer = true;
state = TX_State;
}
} else {// Add code to process additional topics as required
Serial.println("[mqttCallback] WARNING: No matching topic type '" + localTopic.type() + "'");
}
}

The main loop of the sketch then includes the following in the TX_State case of the switch statement:

MQTT Buffer Processing

if ( checkMqttBuffer ) {
// Send any packets in the MQTT buffer
bufferElement element;
bufferSize = mqttBuffer->size();
while ( bufferSize > 0 ) {
element = mqttBuffer->remove(0);
bufferSize = mqttBuffer->size();
element.packet.setSequenceNumber(messageCounter);
element.packet.mqttOut(mqttClientPtr);
sendMessage(element.packet);
// We need to wait a tick here to let the packet go out
// before doing anything more with the radio
delay(100);

if ( element.packet.relFlag() ) {
// Add the packet to the buffer of packets awaiting ACK
element.resendCount = 1;
element.startTime = millis();
ackBuffer->add(element);
}
}
checkMqttBuffer = false;
}

Note that it is important for the Gateway Node to check the various buffers in the correct sequence, to ensure that one process does not block the other:

  1. Any ACK packet needs to be responded to with a priority, because the sender is awaiting a reply;
  2. The mqttBuffer is processed next, because its processing is not time-critical;
  3. The ackBuffer needs to be checked last because we will need to transition immediately to the RX_State if we are to receive any subsequent ACK.

The Gateway Node sketch provided at the bottom of the RPDP page includes an implementation of the MQTT queuing feature.

19-12-2024