Implements an MQTT agent (or daemon task) to enable multithreaded access to coreMQTT. More...
#include <string.h>
#include <stdio.h>
#include <assert.h>
#include "core_mqtt_agent.h"
#include "core_mqtt_agent_command_functions.h"
#include "core_mqtt_agent_default_logging.h"
Functions | |
static MQTTStatus_t | addAwaitingOperation (MQTTAgentContext_t *pAgentContext, uint16_t packetId, MQTTAgentCommand_t *pCommand) |
Track an operation by adding it to a list, indicating it is anticipating an acknowledgment. More... | |
static MQTTAgentAckInfo_t * | getAwaitingOperation (MQTTAgentContext_t *pAgentContext, uint16_t incomingPacketId) |
Retrieve an operation from the list of pending acks, and optionally remove it from the list. More... | |
static MQTTStatus_t | createCommand (MQTTAgentCommandType_t commandType, const MQTTAgentContext_t *pMqttAgentContext, void *pMqttInfoParam, MQTTAgentCommandCallback_t commandCompleteCallback, MQTTAgentCommandContext_t *pCommandCompleteCallbackContext, MQTTAgentCommand_t *pCommand) |
Populate the parameters of a MQTTAgentCommand struct. More... | |
static MQTTStatus_t | addCommandToQueue (const MQTTAgentContext_t *pAgentContext, MQTTAgentCommand_t *pCommand, uint32_t blockTimeMs) |
Add a command to the global command queue. More... | |
static MQTTStatus_t | processCommand (MQTTAgentContext_t *pMqttAgentContext, MQTTAgentCommand_t *pCommand, bool *pEndLoop) |
Process a MQTTAgentCommand struct. More... | |
static void | mqttEventCallback (MQTTContext_t *pMqttContext, MQTTPacketInfo_t *pPacketInfo, MQTTDeserializedInfo_t *pDeserializedInfo) |
Dispatch incoming publishes and acks to their various handler functions. More... | |
static void | handleAcks (const MQTTAgentContext_t *pAgentContext, const MQTTPacketInfo_t *pPacketInfo, const MQTTDeserializedInfo_t *pDeserializedInfo, MQTTAgentAckInfo_t *pAckInfo, uint8_t packetType) |
Mark a command as complete after receiving an acknowledgment packet. More... | |
static MQTTAgentContext_t * | getAgentFromMQTTContext (MQTTContext_t *pMQTTContext) |
Retrieve a pointer to an agent context given an MQTT context. More... | |
static MQTTStatus_t | createAndAddCommand (MQTTAgentCommandType_t commandType, const MQTTAgentContext_t *pMqttAgentContext, void *pMqttInfoParam, MQTTAgentCommandCallback_t commandCompleteCallback, MQTTAgentCommandContext_t *pCommandCompleteCallbackContext, uint32_t blockTimeMs) |
Helper function for creating a command and adding it to the command queue. More... | |
static void | concludeCommand (const MQTTAgentContext_t *pAgentContext, MQTTAgentCommand_t *pCommand, MQTTStatus_t returnCode, uint8_t *pSubackCodes) |
Helper function to mark a command as complete and invoke its callback. This function calls the releaseCommand callback. More... | |
static MQTTStatus_t | resendPublishes (MQTTAgentContext_t *pMqttAgentContext) |
Resend QoS 1 and 2 publishes after resuming a session. More... | |
static void | clearPendingAcknowledgments (MQTTAgentContext_t *pMqttAgentContext, bool clearOnlySubUnsubEntries) |
Clears the list of pending acknowledgments by invoking each callback with MQTTRecvFailed either for ALL operations in the list OR only for Subscribe/Unsubscribe operations. More... | |
static bool | validateStruct (const MQTTAgentContext_t *pMqttAgentContext, const MQTTAgentCommandInfo_t *pCommandInfo) |
Validate an MQTTAgentContext_t and a MQTTAgentCommandInfo_t from API functions. More... | |
static bool | validateParams (MQTTAgentCommandType_t commandType, const void *pParams) |
Validate the parameters for a CONNECT, SUBSCRIBE, UNSUBSCRIBE or PUBLISH. More... | |
static bool | isSpaceInPendingAckList (const MQTTAgentContext_t *pAgentContext) |
Called before accepting any PUBLISH or SUBSCRIBE messages to check there is space in the pending ACK list for the outgoing PUBLISH or SUBSCRIBE. More... | |
MQTTStatus_t | MQTTAgent_Init (MQTTAgentContext_t *pMqttAgentContext, const MQTTAgentMessageInterface_t *pMsgInterface, const MQTTFixedBuffer_t *pNetworkBuffer, const TransportInterface_t *pTransportInterface, MQTTGetCurrentTimeFunc_t getCurrentTimeMs, MQTTAgentIncomingPublishCallback_t incomingCallback, void *pIncomingPacketContext) |
Perform any initialization the MQTT agent requires before it can be used. Must be called before any other function. More... | |
MQTTStatus_t | MQTTAgent_CommandLoop (MQTTAgentContext_t *pMqttAgentContext) |
Process commands from the command queue in a loop. More... | |
MQTTStatus_t | MQTTAgent_ResumeSession (MQTTAgentContext_t *pMqttAgentContext, bool sessionPresent) |
Resume a session by resending publishes if a session is present in the broker, or clear state information if not. More... | |
MQTTStatus_t | MQTTAgent_CancelAll (MQTTAgentContext_t *pMqttAgentContext) |
Cancel all enqueued commands and those awaiting acknowledgment while the command loop is not running. More... | |
MQTTStatus_t | MQTTAgent_Subscribe (const MQTTAgentContext_t *pMqttAgentContext, MQTTAgentSubscribeArgs_t *pSubscriptionArgs, const MQTTAgentCommandInfo_t *pCommandInfo) |
Add a command to call MQTT_Subscribe() for an MQTT connection. More... | |
MQTTStatus_t | MQTTAgent_Unsubscribe (const MQTTAgentContext_t *pMqttAgentContext, MQTTAgentSubscribeArgs_t *pSubscriptionArgs, const MQTTAgentCommandInfo_t *pCommandInfo) |
Add a command to call MQTT_Unsubscribe() for an MQTT connection. More... | |
MQTTStatus_t | MQTTAgent_Publish (const MQTTAgentContext_t *pMqttAgentContext, MQTTPublishInfo_t *pPublishInfo, const MQTTAgentCommandInfo_t *pCommandInfo) |
Add a command to call MQTT_Publish() for an MQTT connection. More... | |
MQTTStatus_t | MQTTAgent_ProcessLoop (const MQTTAgentContext_t *pMqttAgentContext, const MQTTAgentCommandInfo_t *pCommandInfo) |
Send a message to the MQTT agent purely to trigger an iteration of its loop, which will result in a call to MQTT_ProcessLoop(). This function can be used to wake the MQTT agent task when it is known data may be available on the connected socket. More... | |
MQTTStatus_t | MQTTAgent_Connect (const MQTTAgentContext_t *pMqttAgentContext, MQTTAgentConnectArgs_t *pConnectArgs, const MQTTAgentCommandInfo_t *pCommandInfo) |
Add a command to call MQTT_Connect() for an MQTT connection. If a session is resumed with the broker, it will also resend the necessary QoS1/2 publishes. More... | |
MQTTStatus_t | MQTTAgent_Disconnect (const MQTTAgentContext_t *pMqttAgentContext, const MQTTAgentCommandInfo_t *pCommandInfo) |
Add a command to disconnect an MQTT connection. More... | |
MQTTStatus_t | MQTTAgent_Ping (const MQTTAgentContext_t *pMqttAgentContext, const MQTTAgentCommandInfo_t *pCommandInfo) |
Add a command to call MQTT_Ping() for an MQTT connection. More... | |
MQTTStatus_t | MQTTAgent_Terminate (const MQTTAgentContext_t *pMqttAgentContext, const MQTTAgentCommandInfo_t *pCommandInfo) |
Add a termination command to the command queue. More... | |
Implements an MQTT agent (or daemon task) to enable multithreaded access to coreMQTT.
The agent provides an equivalent API for each coreMQTT API. Whereas coreMQTT APIs are prefixed "MQTT_", the agent APIs are prefixed "MQTTAgent_". For example, that agent's MQTTAgent_Publish() API is the thread safe equivalent to coreMQTT's MQTT_Publish() API.
|
static |
Track an operation by adding it to a list, indicating it is anticipating an acknowledgment.
[in] | pAgentContext | Agent context for the MQTT connection. |
[in] | packetId | Packet ID of pending ack. |
[in] | pCommand | Pointer to command that is expecting an ack. |
|
static |
Retrieve an operation from the list of pending acks, and optionally remove it from the list.
[in] | pAgentContext | Agent context for the MQTT connection. |
[in] | incomingPacketId | Packet ID of incoming ack. |
|
static |
Populate the parameters of a MQTTAgentCommand struct.
[in] | commandType | Type of command. For example, publish or subscribe. |
[in] | pMqttAgentContext | Pointer to MQTT context to use for command. |
[in] | pMqttInfoParam | Pointer to MQTTPublishInfo_t or MQTTSubscribeInfo_t. |
[in] | commandCompleteCallback | Callback for when command completes. |
[in] | pCommandCompleteCallbackContext | Context and necessary structs for command. |
[out] | pCommand | Pointer to initialized command. |
|
static |
Add a command to the global command queue.
[in] | pAgentContext | Agent context for the MQTT connection. |
[in] | pCommand | Pointer to command to copy to queue. |
[in] | blockTimeMs | The maximum amount of time to milliseconds to wait in the Blocked state (so not consuming any CPU time) for the command to be posted to the queue should the queue already be full. |
|
static |
Process a MQTTAgentCommand struct.
[in] | pMqttAgentContext | Agent context for MQTT connection. |
[in] | pCommand | Pointer to command to process. |
[out] | pEndLoop | Whether the command loop should terminate. |
|
static |
Dispatch incoming publishes and acks to their various handler functions.
[in] | pMqttContext | MQTT Context |
[in] | pPacketInfo | Pointer to incoming packet. |
[in] | pDeserializedInfo | Pointer to deserialized information from the incoming packet. |
|
static |
Mark a command as complete after receiving an acknowledgment packet.
[in] | pAgentContext | Agent context for the MQTT connection. |
[in] | pPacketInfo | Pointer to incoming packet. |
[in] | pDeserializedInfo | Pointer to deserialized information from the incoming packet. |
[in] | pAckInfo | Pointer to stored information for the original operation resulting in the received packet. |
[in] | packetType | The type of the incoming packet, either SUBACK, UNSUBACK, PUBACK, or PUBCOMP. |
|
static |
Retrieve a pointer to an agent context given an MQTT context.
[in] | pMQTTContext | MQTT Context to search for. |
|
static |
Helper function for creating a command and adding it to the command queue.
[in] | commandType | Type of command. |
[in] | pMqttAgentContext | Handle of the MQTT connection to use. |
[in] | pCommandCompleteCallbackContext | Context and necessary structs for command. |
[in] | commandCompleteCallback | Callback for when command completes. |
[in] | pMqttInfoParam | Pointer to command argument. |
[in] | blockTimeMs | Maximum amount of time in milliseconds to wait (in the Blocked state, so not consuming any CPU time) for the command to be posted to the MQTT agent should the MQTT agent's event queue be full. |
|
static |
Helper function to mark a command as complete and invoke its callback. This function calls the releaseCommand callback.
[in] | pAgentContext | Agent context for the MQTT connection. |
[in] | pCommand | Command to complete. |
[in] | returnCode | Return status of command. |
[in] | pSubackCodes | Pointer to suback array, if command is a SUBSCRIBE. |
|
static |
Resend QoS 1 and 2 publishes after resuming a session.
[in] | pMqttAgentContext | Agent context for the MQTT connection. |
|
static |
Clears the list of pending acknowledgments by invoking each callback with MQTTRecvFailed either for ALL operations in the list OR only for Subscribe/Unsubscribe operations.
[in] | pMqttAgentContext | Agent context of the MQTT connection. |
[in] | clearOnlySubUnsubEntries | Flag indicating whether all entries OR entries pertaining to only Subscribe/Unsubscribe operations should be cleaned from the list. |
|
static |
Validate an MQTTAgentContext_t and a MQTTAgentCommandInfo_t from API functions.
[in] | pMqttAgentContext | MQTTAgentContext_t to validate. |
[in] | pCommandInfo | MQTTAgentCommandInfo_t to validate. |
true
if parameters are valid, else false
.
|
static |
Validate the parameters for a CONNECT, SUBSCRIBE, UNSUBSCRIBE or PUBLISH.
[in] | commandType | CONNECT, SUBSCRIBE, UNSUBSCRIBE, or PUBLISH. |
[in] | pParams | Parameter structure to validate. |
true
if parameter structure is valid, else false
.
|
static |
Called before accepting any PUBLISH or SUBSCRIBE messages to check there is space in the pending ACK list for the outgoing PUBLISH or SUBSCRIBE.
[in] | pAgentContext | Pointer to the context for the MQTT connection to which the PUBLISH or SUBSCRIBE message is to be sent. |
MQTTStatus_t MQTTAgent_Init | ( | MQTTAgentContext_t * | pMqttAgentContext, |
const MQTTAgentMessageInterface_t * | pMsgInterface, | ||
const MQTTFixedBuffer_t * | pNetworkBuffer, | ||
const TransportInterface_t * | pTransportInterface, | ||
MQTTGetCurrentTimeFunc_t | getCurrentTimeMs, | ||
MQTTAgentIncomingPublishCallback_t | incomingCallback, | ||
void * | pIncomingPacketContext | ||
) |
Perform any initialization the MQTT agent requires before it can be used. Must be called before any other function.
[in] | pMqttAgentContext | Pointer to struct to initialize. |
[in] | pMsgInterface | Command interface to use for allocating and sending commands. |
[in] | pNetworkBuffer | Pointer to network buffer to use. |
[in] | pTransportInterface | Transport interface to use with the MQTT library. See https://www.freertos.org/network-interface.html |
[in] | getCurrentTimeMs | Pointer to a function that returns a count value that increments every millisecond. |
[in] | incomingCallback | The callback to execute when receiving publishes. |
[in] | pIncomingPacketContext | A pointer to a context structure defined by the application writer. |
pIncomingPacketContext
context provided for the incoming publish callback MUST remain in scope throughout the period that the agent task is running.Example
MQTTStatus_t MQTTAgent_CommandLoop | ( | MQTTAgentContext_t * | pMqttAgentContext | ) |
Process commands from the command queue in a loop.
[in] | pMqttAgentContext | The MQTT agent to use. |
Example
MQTTStatus_t MQTTAgent_ResumeSession | ( | MQTTAgentContext_t * | pMqttAgentContext, |
bool | sessionPresent | ||
) |
Resume a session by resending publishes if a session is present in the broker, or clear state information if not.
[in] | pMqttAgentContext | The MQTT agent to use. |
[in] | sessionPresent | The session present flag from the broker. |
MQTT_Publish()
Example
MQTTStatus_t MQTTAgent_CancelAll | ( | MQTTAgentContext_t * | pMqttAgentContext | ) |
Cancel all enqueued commands and those awaiting acknowledgment while the command loop is not running.
Canceled commands will be terminated with return code MQTTRecvFailed.
[in] | pMqttAgentContext | The MQTT agent to use. |
Example
MQTTStatus_t MQTTAgent_Subscribe | ( | const MQTTAgentContext_t * | pMqttAgentContext, |
MQTTAgentSubscribeArgs_t * | pSubscriptionArgs, | ||
const MQTTAgentCommandInfo_t * | pCommandInfo | ||
) |
Add a command to call MQTT_Subscribe() for an MQTT connection.
[in] | pMqttAgentContext | The MQTT agent to use. |
[in] | pSubscriptionArgs | Struct describing topic to subscribe to. |
[in] | pCommandInfo | The information pertaining to the command, including:
|
pCommandInfo
parameter MUST remain in scope at least until the callback has been executed by the agent task.Example
MQTTStatus_t MQTTAgent_Unsubscribe | ( | const MQTTAgentContext_t * | pMqttAgentContext, |
MQTTAgentSubscribeArgs_t * | pSubscriptionArgs, | ||
const MQTTAgentCommandInfo_t * | pCommandInfo | ||
) |
Add a command to call MQTT_Unsubscribe() for an MQTT connection.
[in] | pMqttAgentContext | The MQTT agent to use. |
[in] | pSubscriptionArgs | List of topics to unsubscribe from. |
[in] | pCommandInfo | The information pertaining to the command, including:
|
pCommandInfo
parameter MUST remain in scope at least until the callback has been executed by the agent task.Example
MQTTStatus_t MQTTAgent_Publish | ( | const MQTTAgentContext_t * | pMqttAgentContext, |
MQTTPublishInfo_t * | pPublishInfo, | ||
const MQTTAgentCommandInfo_t * | pCommandInfo | ||
) |
Add a command to call MQTT_Publish() for an MQTT connection.
[in] | pMqttAgentContext | The MQTT agent to use. |
[in] | pPublishInfo | MQTT PUBLISH information. |
[in] | pCommandInfo | The information pertaining to the command, including:
|
pCommandInfo
parameter MUST remain in scope at least until the callback has been executed by the agent task.Example
MQTTStatus_t MQTTAgent_ProcessLoop | ( | const MQTTAgentContext_t * | pMqttAgentContext, |
const MQTTAgentCommandInfo_t * | pCommandInfo | ||
) |
Send a message to the MQTT agent purely to trigger an iteration of its loop, which will result in a call to MQTT_ProcessLoop(). This function can be used to wake the MQTT agent task when it is known data may be available on the connected socket.
[in] | pMqttAgentContext | The MQTT agent to use. |
[in] | pCommandInfo | The information pertaining to the command, including:
|
Example
MQTTStatus_t MQTTAgent_Connect | ( | const MQTTAgentContext_t * | pMqttAgentContext, |
MQTTAgentConnectArgs_t * | pConnectArgs, | ||
const MQTTAgentCommandInfo_t * | pCommandInfo | ||
) |
Add a command to call MQTT_Connect() for an MQTT connection. If a session is resumed with the broker, it will also resend the necessary QoS1/2 publishes.
[in] | pMqttAgentContext | The MQTT agent to use. |
[in,out] | pConnectArgs | Struct holding args for MQTT_Connect(). On a successful connection after the command is processed, the sessionPresent member will be filled to indicate whether the broker resumed an existing session. |
[in] | pCommandInfo | The information pertaining to the command, including:
|
pCommandInfo
parameter MUST remain in scope at least until the callback has been executed by the agent task.Example
MQTTStatus_t MQTTAgent_Disconnect | ( | const MQTTAgentContext_t * | pMqttAgentContext, |
const MQTTAgentCommandInfo_t * | pCommandInfo | ||
) |
Add a command to disconnect an MQTT connection.
[in] | pMqttAgentContext | The MQTT agent to use. |
[in] | pCommandInfo | The information pertaining to the command, including:
|
pCommandInfo
parameter MUST remain in scope at least until the callback has been executed by the agent task.Example
MQTTStatus_t MQTTAgent_Ping | ( | const MQTTAgentContext_t * | pMqttAgentContext, |
const MQTTAgentCommandInfo_t * | pCommandInfo | ||
) |
Add a command to call MQTT_Ping() for an MQTT connection.
[in] | pMqttAgentContext | The MQTT agent to use. |
[in] | pCommandInfo | The information pertaining to the command, including:
|
pCommandInfo
parameter MUST remain in scope at least until the callback has been executed by the agent task.Example
MQTTStatus_t MQTTAgent_Terminate | ( | const MQTTAgentContext_t * | pMqttAgentContext, |
const MQTTAgentCommandInfo_t * | pCommandInfo | ||
) |
Add a termination command to the command queue.
On command loop termination, all pending commands in the queue, as well as those waiting for an acknowledgment, will be terminated with error code MQTTRecvFailed.
[in] | pMqttAgentContext | The MQTT agent to use. |
[in] | pCommandInfo | The information pertaining to the command, including:
|
pCommandInfo
parameter MUST remain in scope at least until the callback has been executed by the agent task.Example