MQTT client

Overview

This tutorial demonstrates how Mongoose Library can be used to implement an MQTT client. We'll create an MQTT client that:

  1. Connects to the public HiveMQ MQTT server mqtt://broker.hivemq.com:1883
  2. When connected, subscribes to the topic mg/+/test
  3. Publishes a hello message to the mg/clnt/test topic
  4. Receives that message back from the subscribed topic and closes the connection
  5. Creates a timer that checks if a connection is closed, and reconnects to repeat the whole process

Build and run

  • Follow the Build Tools tutorial to setup your development environment.
  • Start a terminal in the project directory; if you've not already done so, clone the Mongoose Library repo
    git clone https://github.com/cesanta/mongoose
    
  • Build the example, this will also start the MQTT client:
    cd mongoose/examples/mqtt-client
    make clean all
    
  • Observe the log
    58c9411 2 main.c:27:fn                  1 CREATED
    58c960e 2 main.c:42:fn                  1 CONNECTED to mqtt://broker.hivemq.com:1883
    58c960e 2 main.c:48:fn                  1 SUBSCRIBED to mg/+/test
    58c960e 2 main.c:56:fn                  1 PUBLISHED hello -> mg/clnt/test
    58c9708 2 main.c:61:fn                  1 RECEIVED hello <- mg/clnt/test
    

Build with TLS support

To build with Mongoose built-in TLS support, just do

make CFLAGS_EXTRA="-DMG_TLS=MG_TLS_BUILTIN"

In order to build with mbedTLS support, run:

make TLS=mbedtls

Check the "How to build" section of the TLS tutorial for specific information on other building options for your OS

For more information on developing TLS clients, check the TLS tutorial

API

We'll only use three functions from the MQTT API:

struct mg_connection *mg_mqtt_connect(struct mg_mgr *mgr, const char *url,
                                      struct mg_mqtt_opts *opts,
                                      mg_event_handler_t fn, void *fn_data);
void mg_mqtt_pub(struct mg_connection *c, const struct mg_mqtt_opts *opts);
void mg_mqtt_sub(struct mg_connection *c, const struct mg_mqtt_opts *opts);

The function mg_mqtt_connect() tries to connect to the MQTT broker whose URL is specified in parameter url, returning a connection handler or NULL on failure. The function specified in parameter fn will be our callback function, and parameter fn_data will be available as c->fn_data. We can indicate session and last will options in a structure pointed to by the opts parameter.

The function mg_mqtt_pub() performs the action of publishing a message; while the function mg_mqtt_sub() performs the action of subscribing to a topic. In both cases, all relevant information is contained in parameter opts, a pointer to a struct mg_mqtt_opts. The message body is expected at opts->message, the topic at opts->topic; both as mg_str.

For detailed information, see the documentation.

How it works

Tunable parameters, like MQTT server address, topic names, and QoS level, are contained in global variables. Also, we make an MQTT connection pointer, s_conn, globally visible; so a reconnection timer function is able to check that pointer for NULL, and create a new connection. The connection event handler will set this pointer to NULL when a connection closes. Every condition will trigger an event that we'll process in our event handler function.

Event handler

The MG_EV_OPEN event is the very first event that is sent to every connection when it is just created and added to the event manager, while MG_EV_OPEN will be sent on connection closure.

Connection failures will trigger an MG_EV_ERROR event, an error reason is passed as a char * pointer in parameter ev_data; check the error handling tutorial for more information.

When the TCP connection to the MQTT server is established, an MG_EV_CONNECT event fires and we initialise TLS if the MQTT broker URL is mqtts://:

Note that, for that to work, the application must be built with TLS enabled. By default, the HiveMQ URL is not TLS, so building with TLS is not required.

Once the MQTT server accepts us as a client, an MG_EV_MQTT_OPEN event is sent. Then, we subscribe to the desired topic, and immediately publish to it. Note that these functions do not immediately send data to the MQTT server. As any other Mongoose output function, they just append data to the output buffer. Data gets sent when we exit the event handler and Mongoose performs the event manager poll, mg_mgr_poll().

When we receive a message - and we should, an MG_EV_MQTT_MSG event is sent containing a struct mg_mqtt_message; we extract and print the message:

Main function

The main() function is simple: we initialise an event manager and start the event loop, as usual. We also create a timer - note that the timer is created on the stack, but since it is declared in main(), it lives as long as the application is running.

Note that the timer is initialised after the event manager. This is important because here the timer is created with the MG_TIMER_RUN_NOW flag, which executes the timer function immediately. Our timer function will use the event manager, so it must be initialised beforehand.

The timer function is called with an interval of 3 seconds; and our event manager poll timeout is 1 second. For more information on timers, check the timers tutorial.

The event loop executes until a termination signal is received. After that, the mg_mgr_free() and mg_timer_free() functions are called to perform cleanup.

Timer function

The timer function implements the reconnection logic. It creates a client connection s_conn if it is NULL. This pattern should be used for any type of client connection that must be kept alive.

Here we connect with a clean session and request the broker to publish a "bye" message on our behalf, when detecting our disconnection.

Sending QoS 1 messages

Mongoose will publish the message, but it won't retry. Given that MQTT runs over TCP, the most common case is that the message will get through except when the connection is lost or the broker crashes. If you want to retry on failure, you can start a timer on send, catch the PUBACK message and stop the timer or retry when it expires.The following is a conceptual piece of pseudo-C to illustrate that, you might want to catch MG_EV_POLL when writing the code:

static void mqtt_cb(struct mg_connection *c, int ev, void *evd, void *fnd) {
  if (ev == MG_EV_MQTT_CMD) {
    struct mg_mqtt_message *mm = (struct mg_mqtt_message *) evd;
    if (mm->cmd == MQTT_CMD_PUBACK) {
      OK = true;
      stop_timer();
    }
  }
}

  opts.topic = topic, opts.message = data, opts.qos = 1, opts.retain = false;
  opts.retransmit_id = 0;
  retries = 1;
  do {
    opts.retransmit_id = mg_mqtt_pub(c, &opts); // this takes care of setting the PUB flag when opts.retransmit_id != 0
    start_timer();
    while (timer is running);
  } while (!OK && retries--);

Note the example is not actually checking the return code inside the PUBACK message

Sending QoS 2 messages

Mongoose will publish the message, but it won't retry. Given that MQTT runs over TCP, the most common case is that the process will just complete in most cases. To fully implement the process, you have to start a timer on send, catch the PUBREC message (Mongoose will reply with PUBREL for you) and restart the timer, or retry when it expires. Then you will wait to catch the PUBCOMP message, but the broker might send PUBREC again if the PUBREL message didn't get through (e.g.: in cases where the network path is very congested in one way).

The following is a conceptual piece of pseudo-C to illustrate that, you might want to catch MG_EV_POLL when writing the code:

static void mqtt_cb(struct mg_connection *c, int ev, void *evd, void *fnd) {
  if (ev == MG_EV_MQTT_CMD) {
    struct mg_mqtt_message *mm = (struct mg_mqtt_message *) evd;
  if (mm->cmd == MQTT_CMD_PUBREC) {
      PUB = true;
      id = mg_ntohs(mm->id);
      stop_timer();
  } else if (mm->cmd == MQTT_CMD_PUBCOMP) {
      OK = true;
      stop_timer();
    }
  }
}

  opts.topic = topic, opts.message = data, opts.qos = 2, opts.retain = false;
  opts.retransmit_id = 0;
  retries = 1;
  do {
    opts.retransmit_id = mg_mqtt_pub(c, &opts);
    start_timer();
    while (timer is running);
  } while (!PUB && retries--);
  if (!PUB) goto error;
  retries = 1;
  do {
    PUB = false;
    start_timer();
    while (timer is running);
    if(PUB) { // resend PUBREL
      mg_mqtt_send_header(c, MQTT_CMD_PUBREL, 2, remaining_len);  // MQTT5 3.6.1-1, flags = 2
      mg_send(c, &opts.retransmit_id, sizeof(opts.retransmit_id));
    }
  } while (!OK && retries--);

Note the example is not actually checking the return code inside the PUBREC and PUBCOMP messages (MQTT5)

Receiving QoS 2 messages

Mongoose will PUBREC and handle the message to the event handler, it is up to the user to verify this is not a repeated message. Given that MQTT runs over TCP, the most common case is that the process will just complete in most cases. To fully implement the process, you have to hold the message and start a timer, then catch the PUBREL packet (Mongoose will reply with PUBCOMP for you) and process the message, but the broker might send the message again if the PUBREC packet didn't get through (e.g.: in cases where the network path is very congested in one way). In case the broker would send PUBREL again because the PUBCOMP message didn't make it, Mongoose will reply with PUBCOMP for you as said, but you'll still get the event.

The following is a conceptual piece of pseudo-C to illustrate that, you might want to catch MG_EV_POLL when writing the code:

static void mqtt_cb(struct mg_connection *c, int ev, void *evd, void *fnd) {
  if (ev == MG_EV_MQTT_MSG ) {
    struct mg_mqtt_message *mm = (struct mg_mqtt_message *) evd;
    if (mm->cmd == MQTT_CMD_PUBLISH) {
      id = mg_ntohs(mm->id);
      // check if this is the first time (do I have a message with that id ?, check the dup flag, ...)
      // then store the message. If it is a dup ignore it, Mongoose will send PUBREC for you
      // but you might want to restart the timer.
      start_timer();
    }
  } else if (ev == MG_EV_MQTT_CMD) {
    struct mg_mqtt_message *mm = (struct mg_mqtt_message *) evd;
    if (mm->cmd == MQTT_CMD_PUBREL) {
      id = mg_ntohs(mm->id);  // check we have an outstanding message with this id
      if (timer is running) { // we are waiting for this
        OK = true;
        stop_timer();
      }                       // otherwise, it is a resend, ignore it
    }
  }
}

  while (timer is running);
  OK ? // we have a unique message;

Note the example is not actually checking the return code inside the PUBREL message (MQTT5)