MQTT协议基础(4):MQTT协议中的发布
MQTT协议中发布相关知识包括PUBLISH数据包、PUBACK数据包、PUBREC数据包、PUBREL数据包、PUBCOMP数据包
一个典型的MQTT消息发送&接收的流程图如下:
具体流程:
- ClientA连接到Broker
- ClientB连接到Broker,并订阅主题Topic 1;
- ClientA给 Broker发送一个Publish数据包,主题为 Topic 1;
- Broker 收到 ClientA 的消息,发现ClientB 订阅了Topic1,然后通过发送PUBLISH数据包的方式将消息转发到ClientB
- ClientB 从Broker接收到该消息。
发布消息(PUBLISH)
固定头
关键点
- PUBLISH报文类型为3.
- DUP标识位,当DUP置0时,表明是第一次发送该消息。当置1时,表明是重发消息,这种情况只有QoS大于0的时候才能使用。
- QoS标识位,服务质量等级
- Retain标识位,客户端发布的RETAIN=1的消息,服务端保留为问候消息。当新客户订阅相应主题时,服务端将消息发送给订阅者。
可变头
可变头按顺序包含主题名和报文标识符两个字段。其中报文标识符只有在QoS大于0的时候才存在。
主题名字段的格式
报文标识符只有两个字节,具体格式如下
举个例子
消息体
PUBLISH的消息体就是该数据包要发送的数据,它可以是任意格式的数据,比如二进制数据,文本,JSON等
相关演示代码
node.js下代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31//引入mqtt库
var mqtt= require('mqtt')
//建立连接
var client= mqtt.connect('mqtt://mqtt.eclipse.org', {
clientId:"mqtt_client_id",
clean:false
})
//捕获返回码&当前会话标志
client.on('connect', function (connack){
if(connack.returnCode== 0)
{
client.publish("topic_a", JSON.stringify({current:25}), {qos:1}, function (err)
{
if(err == undefinded)
{console.log("publish finished")
client.end()
}
else
{
console.log("publish failed")}
}
}
)
}
else
{
console.log("connection finished")
}
})c++的publish函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27/** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
* @param topic - the topic to publish to
* @param message - the message to send
* @return success code -
*/
int publish(const char* topicName, Message& message);
/** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
* @param topic - the topic to publish to
* @param payload - the data to send
* @param payloadlen - the length of the data
* @param qos - the QoS to send the publish at
* @param retained - whether the message should be retained
* @return success code -
*/
int publish(const char* topicName, void* payload, size_t payloadlen, enum QoS qos = QOS0, bool retained = false);
/** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
* @param topic - the topic to publish to
* @param payload - the data to send
* @param payloadlen - the length of the data
* @param id - the packet id used - returned
* @param qos - the QoS to send the publish at
* @param retained - whether the message should be retained
* @return success code -
*/
int publish(const char* topicName, void* payload, size_t payloadlen, unsigned short& id, enum QoS qos = QOS1, bool retained = false);
发布确认(PUBACK)
在QoS1的情况下,Sender向Receiver发送PUBLISH数据包,Receiver返回一个PUBACK数据包
固定头
关注点:报文类型为4,剩余长度为2,只有可变头,没有消息体
可变头
可变头中只包含一个2字节的等待确认的包标识符
在QoS2情况下,面对PUBLISH数据包的响应过程如下
在这种情况下,包含另外三种数据包形式:PUBREC PUBREL PUBCOMP
发布收到(PUBREC)
固定头
关注的点:类型为5,剩余长度为2,只包含可变头,无消息体
可变头
只包含待确定的报文标识符
发布释放(PUBREL)
固定头
关注的点:类型为6,剩余长度为2,只包含可变头,无消息体
可变头
注意报文标识符的统一
发布完成(PUBCOMP)
固定头
关注的点:类型为7,剩余长度为2,只包含可变头,无消息体
可变头
至此QoS2下的发布才结束,保证只有一次的设计目的。
示意图如下