MQTT client
Overview
This tutorial demonstrates how Mongoose Library can be used to implement an MQTT client. We'll create an MQTT client that:
- Connects to the public HiveMQ MQTT server, mqtt://broker.hivemq.com:1883
- When connected, subscribes to the topic mg/+/test
- Publishes a hello message to the mg/clnt/test topic
- Receives that message back from the subscribed topic and closes the connection
- Creates a timer that checks if a connection is closed, and reconnects to repeat the whole process
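To see why a message published to mg/clnt/test comes back through the mg/+/test subscription, here is a minimal sketch of MQTT topic-filter matching as a broker might do it. The function topic_matches() is a hypothetical helper written for this illustration, not part of the Mongoose API, and it ignores special cases such as topics that start with $.

```c
#include <stdbool.h>

// Hypothetical helper, not part of Mongoose: returns true if `topic` matches
// the subscription `filter`. '+' matches exactly one level, '#' matches the
// rest of the topic and is assumed to be the last character of the filter.
static bool topic_matches(const char *filter, const char *topic) {
  while (*filter != '\0') {
    if (*filter == '#') return true;  // multi-level wildcard
    if (*filter == '+') {
      // Single-level wildcard: skip everything up to the next separator
      while (*topic != '\0' && *topic != '/') topic++;
      filter++;
    } else if (*filter == '/' && filter[1] == '#' && *topic == '\0') {
      return true;  // "a/#" also matches the parent level "a"
    } else {
      if (*filter != *topic) return false;  // literal characters must agree
      filter++;
      topic++;
    }
  }
  return *topic == '\0';  // both strings must be consumed
}
```

With this sketch, topic_matches("mg/+/test", "mg/clnt/test") is true, while a topic with an extra level such as mg/a/b/test does not match.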
Build and run
- Follow the Build Tools tutorial to set up your development environment.
- Start a terminal in the project directory; if you have not already done so, clone the Mongoose Library repo:
git clone https://github.com/cesanta/mongoose
- Build the example; this also starts the MQTT client:
cd mongoose/tutorials/mqtt/mqtt-client
make clean all
- Observe the log
58c9411 2 main.c:27:fn 1 CREATED
58c960e 2 main.c:42:fn 1 CONNECTED to mqtt://broker.hivemq.com:1883
58c960e 2 main.c:48:fn 1 SUBSCRIBED to mg/+/test
58c960e 2 main.c:56:fn 1 PUBLISHED hello -> mg/clnt/test
58c9708 2 main.c:61:fn 1 RECEIVED hello <- mg/clnt/test
Build with TLS support
To build with Mongoose built-in TLS support, run:
make CFLAGS_EXTRA="-DMG_TLS=MG_TLS_BUILTIN"
To build with mbedTLS support, run:
make TLS=mbedtls
Check the "How to build" section of the TLS tutorial for specific information on other build options for your OS.
For more information on developing TLS clients, check the TLS tutorial.
API
We'll only use three functions from the MQTT API:
struct mg_connection *mg_mqtt_connect(struct mg_mgr *mgr, const char *url,
                                      struct mg_mqtt_opts *opts,
                                      mg_event_handler_t fn, void *fn_data);
uint16_t mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts);
void mg_mqtt_sub(struct mg_connection *c, const struct mg_mqtt_opts *opts);
The function mg_mqtt_connect() tries to connect to the MQTT broker whose URL is specified in parameter url, returning a connection pointer or NULL on failure. The function specified in parameter fn will be our callback function, and parameter fn_data will be available as c->fn_data. We can indicate session and last will options in a structure pointed to by the opts parameter.
The function mg_mqtt_pub() publishes a message, while the function mg_mqtt_sub() subscribes to a topic. In both cases, all relevant information is contained in parameter opts, a pointer to a struct mg_mqtt_opts. The message body is expected at opts->message and the topic at opts->topic, both as mg_str.
For detailed information, see the documentation.
How it works
Tunable parameters, like the MQTT server address, topic names, and QoS level, are contained in global variables. We also make an MQTT connection pointer, s_conn, globally visible, so that a reconnection timer function is able to check that pointer for NULL and create a new connection. The connection event handler will set this pointer to NULL when a connection closes. Every condition will trigger an event that we'll process in our event handler function.
Event handler
The MG_EV_OPEN event is the very first event that is sent to every connection when it is just created and added to the event manager, while MG_EV_CLOSE will be sent on connection closure.
Connection failures will trigger an MG_EV_ERROR event; an error reason is passed as a char * pointer in parameter ev_data. Check the error handling tutorial for more information.
When the TCP connection to the MQTT server is established, an MG_EV_CONNECT event fires and we initialise TLS if the MQTT broker URL scheme is mqtts://.
Note that, for that to work, the application must be built with TLS enabled. By default, the HiveMQ URL is not TLS, so building with TLS is not required.
Once the MQTT server accepts us as a client, an MG_EV_MQTT_OPEN event is sent. Then, we subscribe to the desired topic, and immediately publish to it. Note that these functions do not immediately send data to the MQTT server. Like any other Mongoose output function, they just append data to the output buffer. Data gets sent when we exit the event handler and Mongoose performs the event manager poll, mg_mgr_poll().
When we receive a message, as we should, an MG_EV_MQTT_MSG event is sent containing a struct mg_mqtt_message; we extract and print the message.
Main function
The main() function is simple: we initialise an event manager and start the event loop, as usual. We also create a timer. Note that the timer is created on the stack, but since it is declared in main(), it lives as long as the application is running.
Note that the timer is initialised after the event manager. This is important because here the timer is created with the MG_TIMER_RUN_NOW flag, which executes the timer function immediately. Our timer function will use the event manager, so it must be initialised beforehand.
The timer function is called with an interval of 3 seconds, and our event manager poll timeout is 1 second. For more information on timers, check the timers tutorial.
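The interplay between the 3 second timer interval and the 1 second poll timeout can be sketched with a simplified model of a periodic timer that is checked once per poll. This is not Mongoose's timer code; the names periodic, periodic_init(), periodic_due() and count_fires() are hypothetical, and the model assumes the event loop wakes up exactly once per poll timeout.

```c
#include <stdbool.h>
#include <stdint.h>

// Hypothetical model of a periodic timer that is checked on every poll.
struct periodic {
  uint64_t period_ms;  // interval between two runs
  uint64_t next_ms;    // time of the next run
};

// run_now = true mimics a "run immediately" flag: the first check fires.
static void periodic_init(struct periodic *p, uint64_t period_ms,
                          uint64_t now_ms, bool run_now) {
  p->period_ms = period_ms;
  p->next_ms = run_now ? now_ms : now_ms + period_ms;
}

// Returns true when the timer function should run, and re-arms the timer.
static bool periodic_due(struct periodic *p, uint64_t now_ms) {
  if (now_ms < p->next_ms) return false;
  p->next_ms = now_ms + p->period_ms;
  return true;
}

// Simulates `polls` loop iterations spaced poll_ms apart, starting at time 0,
// and counts how many times the timer function would have been called.
static int count_fires(uint64_t period_ms, uint64_t poll_ms, int polls,
                       bool run_now) {
  struct periodic p;
  int fires = 0;
  periodic_init(&p, period_ms, 0, run_now);
  for (int i = 0; i < polls; i++) {
    if (periodic_due(&p, (uint64_t) i * poll_ms)) fires++;
  }
  return fires;
}
```

In this model, ten polls one second apart with a 3 second period call the timer function four times when it runs immediately, and three times when it does not. A timer can never fire more often than the loop polls, which is why the poll timeout is kept shorter than the timer interval.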
The event loop executes until a termination signal is received. After that, the mg_mgr_free() and mg_timer_free() functions are called to perform cleanup.
Timer function
The timer function implements the reconnection logic. It creates a client connection s_conn if it is NULL. This pattern should be used for any type of client connection that must be kept alive.
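The pattern can be illustrated without any networking. The sketch below is a stand-alone model, not the tutorial's actual code: struct conn, fake_connect() and on_close() are hypothetical stand-ins for a real connection, the connect call, and the close branch of the event handler. Only the s_conn pointer and the NULL check mirror what is described above.

```c
#include <stddef.h>

struct conn { int id; };            // stand-in for a real connection object

static struct conn s_slot;          // storage for the single fake connection
static struct conn *s_conn = NULL;  // NULL means "not connected"
static int s_connect_calls = 0;     // how many times we (re)connected

// Hypothetical replacement for the real connect call
static struct conn *fake_connect(void) {
  s_slot.id = ++s_connect_calls;
  return &s_slot;
}

// What the event handler does when the connection closes
static void on_close(void) {
  s_conn = NULL;
}

// Periodic timer function: reconnect only if there is no live connection
static void reconnect_timer_fn(void *arg) {
  (void) arg;
  if (s_conn == NULL) s_conn = fake_connect();
}
```

Calling reconnect_timer_fn() repeatedly while s_conn is set does nothing; a new connection is created only on the first timer run after on_close() has cleared the pointer.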
Sending QoS 1 messages
Mongoose will publish the message, but it won't retry. Given that MQTT runs over TCP, the message will normally get through unless the connection is lost or the broker crashes. If you want to retry on failure, you can start a timer on send, then catch the PUBACK message and stop the timer, or retry when the timer expires.
The following is a conceptual piece of pseudo-C to illustrate that; you might want to catch MG_EV_POLL when writing the code:
static void mqtt_cb(struct mg_connection *c, int ev, void *evd, void *fnd) {
  if (ev == MG_EV_MQTT_CMD) {
    struct mg_mqtt_message *mm = (struct mg_mqtt_message *) evd;
    if (mm->cmd == MQTT_CMD_PUBACK) {
      OK = true;
      stop_timer();
    }
  }
}

opts.topic = topic, opts.message = data, opts.qos = 1, opts.retain = false;
opts.retransmit_id = 0;
retries = 1;
do {
  // mg_mqtt_pub() takes care of setting the DUP flag when opts.retransmit_id != 0
  opts.retransmit_id = mg_mqtt_pub(c, &opts);
  start_timer();
  while (timer is running);
} while (!OK && retries--);
Note that the example is not actually checking the return code inside the PUBACK message.
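The pseudo-C above busy-waits on the timer, which an event-driven application cannot do. As a minimal sketch of how the same retry logic could be driven from poll events instead, the following stand-alone state tracker decides what to do on each tick. All names here (qos1_tx, qos1_sent(), qos1_on_puback(), qos1_on_poll()) are hypothetical and not part of Mongoose; the caller is assumed to do the actual publishing.

```c
#include <stdbool.h>
#include <stdint.h>

// Hypothetical helper types, not part of Mongoose.
enum qos1_action { QOS1_NONE, QOS1_RESEND, QOS1_GIVE_UP };

struct qos1_tx {
  uint16_t id;           // packet id of the in-flight PUBLISH, 0 = idle
  uint64_t deadline_ms;  // when to stop waiting for the PUBACK
  uint64_t timeout_ms;   // how long to wait after each (re)send
  int retries_left;      // resends still allowed
};

// Call right after publishing: remember the id and arm the deadline.
static void qos1_sent(struct qos1_tx *tx, uint16_t id, uint64_t now_ms,
                      uint64_t timeout_ms, int retries) {
  tx->id = id;
  tx->timeout_ms = timeout_ms;
  tx->deadline_ms = now_ms + timeout_ms;
  tx->retries_left = retries;
}

// Call when a PUBACK arrives; returns true if it acknowledges our message.
static bool qos1_on_puback(struct qos1_tx *tx, uint16_t id) {
  if (tx->id == 0 || tx->id != id) return false;
  tx->id = 0;  // delivered, nothing left in flight
  return true;
}

// Call on every poll tick; tells the caller what to do next.
static enum qos1_action qos1_on_poll(struct qos1_tx *tx, uint64_t now_ms) {
  if (tx->id == 0 || now_ms < tx->deadline_ms) return QOS1_NONE;
  if (tx->retries_left <= 0) {
    tx->id = 0;  // out of retries, drop the message
    return QOS1_GIVE_UP;
  }
  tx->retries_left--;
  tx->deadline_ms = now_ms + tx->timeout_ms;
  return QOS1_RESEND;  // caller republishes with the same packet id
}
```

In an event handler, qos1_on_poll() would be called from the MG_EV_POLL branch and qos1_on_puback() from the PUBACK branch shown above. On QOS1_RESEND, the application would set opts.retransmit_id to the stored id before calling mg_mqtt_pub() again, as in the pseudo-C.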
Sending QoS 2 messages
Mongoose will publish the message, but it won't retry. Given that MQTT runs over TCP, the process will just complete in most cases. To fully implement the process, you have to start a timer on send, catch the PUBREC message (Mongoose will reply with PUBREL for you) and restart the timer, or retry when it expires. Then you will wait to catch the PUBCOMP message, but the broker might send PUBREC again if the PUBREL message didn't get through (e.g. when the network path is very congested in one direction).
The following is a conceptual piece of pseudo-C to illustrate that; you might want to catch MG_EV_POLL when writing the code:
static void mqtt_cb(struct mg_connection *c, int ev, void *evd, void *fnd) {
  if (ev == MG_EV_MQTT_CMD) {
    struct mg_mqtt_message *mm = (struct mg_mqtt_message *) evd;
    if (mm->cmd == MQTT_CMD_PUBREC) {
      PUB = true;
      id = mg_ntohs(mm->id);
      stop_timer();
    } else if (mm->cmd == MQTT_CMD_PUBCOMP) {
      OK = true;
      stop_timer();
    }
  }
}

opts.topic = topic, opts.message = data, opts.qos = 2, opts.retain = false;
opts.retransmit_id = 0;
retries = 1;
do {
  opts.retransmit_id = mg_mqtt_pub(c, &opts);
  start_timer();
  while (timer is running);
} while (!PUB && retries--);
if (!PUB) goto error;
retries = 1;
do {
  PUB = false;
  start_timer();
  while (timer is running);
  if (PUB) {  // resend PUBREL
    mg_mqtt_send_header(c, MQTT_CMD_PUBREL, 2, remaining_len);  // MQTT5 3.6.1-1, flags = 2
    mg_send(c, &opts.retransmit_id, sizeof(opts.retransmit_id));
  }
} while (!OK && retries--);
Note that the example is not actually checking the return code inside the PUBREC and PUBCOMP messages (MQTT5).
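The pseudo-C leaves remaining_len and the byte order of the packet id implicit. As a sketch of what a minimal PUBREL looks like on the wire: the first byte carries packet type 6 with flags 2, the remaining length is 2, and the packet id follows with the most significant byte first. In MQTT 5, the reason code and properties may be omitted when the reason is success and there are no properties, so the same four bytes apply. The function encode_pubrel() is a hypothetical helper for illustration, not a Mongoose function.

```c
#include <stddef.h>
#include <stdint.h>

// Hypothetical helper, not part of Mongoose: writes a minimal PUBREL packet
// into buf (which must hold 4 bytes) and returns the packet length.
static size_t encode_pubrel(uint16_t id, uint8_t buf[4]) {
  buf[0] = (uint8_t) ((6 << 4) | 2);  // packet type 6 (PUBREL), flags 0b0010
  buf[1] = 2;                         // remaining length: just the packet id
  buf[2] = (uint8_t) (id >> 8);       // packet id, most significant byte first
  buf[3] = (uint8_t) (id & 0xff);     // packet id, least significant byte
  return 4;
}
```

For packet id 0x1234 this produces the bytes 62 02 12 34. Writing the id byte by byte avoids depending on the host byte order.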
Receiving QoS 2 messages
Mongoose will reply with PUBREC and hand the message to the event handler; it is up to the user to verify that this is not a repeated message. Given that MQTT runs over TCP, the process will just complete in most cases. To fully implement the process, you have to hold the message and start a timer, then catch the PUBREL packet (Mongoose will reply with PUBCOMP for you) and process the message, but the broker might send the message again if the PUBREC packet didn't get through (e.g. when the network path is very congested in one direction). If the broker sends PUBREL again because the PUBCOMP message didn't make it, Mongoose will reply with PUBCOMP for you as said, but you'll still get the event.
The following is a conceptual piece of pseudo-C to illustrate that; you might want to catch MG_EV_POLL when writing the code:
static void mqtt_cb(struct mg_connection *c, int ev, void *evd, void *fnd) {
  if (ev == MG_EV_MQTT_MSG) {
    struct mg_mqtt_message *mm = (struct mg_mqtt_message *) evd;
    if (mm->cmd == MQTT_CMD_PUBLISH) {
      id = mg_ntohs(mm->id);
      // check if this is the first time (do I have a message with that id?, check the dup flag, ...)
      // then store the message. If it is a dup, ignore it; Mongoose will send PUBREC for you
      // but you might want to restart the timer.
      start_timer();
    }
  } else if (ev == MG_EV_MQTT_CMD) {
    struct mg_mqtt_message *mm = (struct mg_mqtt_message *) evd;
    if (mm->cmd == MQTT_CMD_PUBREL) {
      id = mg_ntohs(mm->id);  // check we have an outstanding message with this id
      if (timer is running) {  // we are waiting for this
        OK = true;
        stop_timer();
      }  // otherwise, it is a resend, ignore it
    }
  }
}

while (timer is running);
OK ? // we have a unique message;
Note that the example is not actually checking the return code inside the PUBREL message (MQTT5).
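The "do I have a message with that id?" check in the comments above can be sketched as a small table of packet ids that were received but not yet released. This is a stand-alone illustration under assumptions: the names qos2_rx, qos2_on_publish() and qos2_on_pubrel() are hypothetical, the capacity of 8 is arbitrary, and the timer handling and message storage are left out.

```c
#include <stdbool.h>
#include <stdint.h>

#define QOS2_MAX_PENDING 8  // arbitrary capacity chosen for this sketch

// Hypothetical helper, not part of Mongoose: packet ids that were received
// in a PUBLISH but not yet released by a PUBREL.
struct qos2_rx {
  uint16_t ids[QOS2_MAX_PENDING];  // 0 marks a free slot (0 is not a valid id)
};

// Returns the slot index holding `id`, or -1 if it is not present.
static int qos2_find(const struct qos2_rx *rx, uint16_t id) {
  for (int i = 0; i < QOS2_MAX_PENDING; i++) {
    if (rx->ids[i] == id) return i;
  }
  return -1;
}

// Call on an incoming QoS 2 PUBLISH. Returns true if the message is new and
// should be stored, false if it is a duplicate or there is no room left.
static bool qos2_on_publish(struct qos2_rx *rx, uint16_t id) {
  if (id == 0 || qos2_find(rx, id) >= 0) return false;  // invalid or duplicate
  int slot = qos2_find(rx, 0);                           // look for a free slot
  if (slot < 0) return false;
  rx->ids[slot] = id;
  return true;
}

// Call on PUBREL. Returns true if the stored message can now be processed,
// false if the id is unknown, for example on a repeated PUBREL.
static bool qos2_on_pubrel(struct qos2_rx *rx, uint16_t id) {
  int slot = (id == 0) ? -1 : qos2_find(rx, id);
  if (slot < 0) return false;
  rx->ids[slot] = 0;  // release the id so that it can be reused
  return true;
}
```

A repeated PUBLISH with the same id is reported as a duplicate until the matching PUBREL arrives; after that, the id is free again and a new message may legitimately reuse it.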