0%

MQTT协议中发布

MQTT协议基础(4):MQTT协议中的发布

MQTT协议中发布相关知识包括PUBLISH数据包、PUBACK数据包、PUBREC数据包、PUBREL数据包、PUBCOMP数据包

一个典型的MQTT消息发送&接收的流程图如下:

具体流程:

  • ClientA连接到Broker
  • ClientB连接到Broker,并订阅主题Topic 1;
  • ClientA给 Broker发送一个Publish数据包,主题为 Topic 1;
  • Broker 收到 ClientA 的消息,发现ClientB 订阅了Topic1,然后通过发送PUBLISH数据包的方式将消息转发到ClientB
  • ClientB 从Broker接收到该消息。

发布消息(PUBLISH)

  • 固定头

    关键点

    • PUBLISH报文类型为3.
    • DUP标识位,当DUP置0时,表明是第一次发送该消息。当置1时,表明是重发消息,这种情况只有QoS大于0的时候才能使用。
    • QoS标识位,服务质量等级
    • Retain标识位,客户端发布的RETAIN=1的消息,服务端保留为问候消息。当新客户订阅相应主题时,服务端将消息发送给订阅者。
  • 可变头

    可变头按顺序包含主题名和报文标识符两个字段。其中报文标识符只有在QoS大于0的时候才存在。

    主题名字段的格式

    报文标识符只有两个字节,具体格式如下

    举个例子

  • 消息体

    PUBLISH的消息体就是该数据包要发送的数据,它可以是任意格式的数据,比如二进制数据,文本,JSON等

    相关演示代码

    node.js下代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    //引入mqtt库
    var mqtt= require('mqtt')

    //建立连接
    var client= mqtt.connect('mqtt://mqtt.eclipse.org', {
    clientId:"mqtt_client_id",
    clean:false
    })

    //捕获返回码&当前会话标志
    client.on('connect', function (connack){
    if(connack.returnCode== 0)
    {
    client.publish("topic_a", JSON.stringify({current:25}), {qos:1}, function (err)
    {
    if(err == undefinded)
    {console.log("publish finished")
    client.end()
    }
    else
    {
    console.log("publish failed")}
    }
    }
    )
    }
    else
    {
    console.log("connection finished")
    }
    })

    c++的publish函数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
    * @param topic - the topic to publish to
    * @param message - the message to send
    * @return success code -
    */
    int publish(const char* topicName, Message& message);

    /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
    * @param topic - the topic to publish to
    * @param payload - the data to send
    * @param payloadlen - the length of the data
    * @param qos - the QoS to send the publish at
    * @param retained - whether the message should be retained
    * @return success code -
    */
    int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false);

    /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
    * @param topic - the topic to publish to
    * @param payload - the data to send
    * @param payloadlen - the length of the data
    * @param id - the packet id used - returned
    * @param qos - the QoS to send the publish at
    * @param retained - whether the message should be retained
    * @return success code -
    */
    int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false);

发布确认(PUBACK)

在QoS1的情况下,Sender向Receiver发送PUBLISH数据包,Receiver返回一个PUBACK数据包

  • 固定头

    关注点:报文类型为4,剩余长度为2,只有可变头,没有消息体

  • 可变头

    可变头中只包含一个2字节的等待确认的包标识符

在QoS2情况下,面对PUBLISH数据包的响应过程如下

在这种情况下,包含另外三种数据包形式:PUBREC PUBREL PUBCOMP

发布收到(PUBREC)

  • 固定头

    关注的点:类型为5,剩余长度为2,只包含可变头,无消息体

  • 可变头

    只包含待确定的报文标识符

发布释放(PUBREL)

  • 固定头

    关注的点:类型为6,剩余长度为2,只包含可变头,无消息体

  • 可变头

    注意报文标识符的统一

发布完成(PUBCOMP)

  • 固定头

    关注的点:类型为7,剩余长度为2,只包含可变头,无消息体

  • 可变头

    至此QoS2下的发布才结束,保证只有一次的设计目的。
    示意图如下
    深度剖析