MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是由 IBM 的 Andy Stanford-Clark 和 Arlen Nipper 于 1999 年创建的轻量级消息传输协议,专为 低带宽、高延迟、不可靠网络环境 设计。
MQTT 的核心定位是 物联网(IoT)的通信标准。它提供了:
# Python MQTT 发布者
import paho.mqtt.client as mqtt
import json
import time
# MQTT 配置
MQTT_BROKER = "broker.emqx.io"
MQTT_PORT = 1883
MQTT_TOPIC = "/sensors/temperature"
# 创建客户端
client = mqtt.Client(client_id="publisher_001")
# 连接
client.connect(MQTT_BROKER, MQTT_PORT, 60)
# 发布消息
while True:
data = {
"device_id": "sensor_01",
"temperature": 22.5 + (time.time() % 5),
"humidity": 45 + (time.time() % 10),
"timestamp": int(time.time())
}
payload = json.dumps(data)
result = client.publish(MQTT_TOPIC, payload, qos=1)
print(f"发送消息: {payload}")
time.sleep(5)
client.disconnect()
# Python MQTT 订阅者
import paho.mqtt.client as mqtt
import json
# MQTT 配置
MQTT_BROKER = "broker.emqx.io"
MQTT_PORT = 1883
MQTT_TOPIC = "/sensors/#"
# 回调:连接成功
def on_connect(client, userdata, flags, rc):
print(f"连接成功: {rc}")
client.subscribe(MQTT_TOPIC, qos=1)
# 回调:收到消息
def on_message(client, userdata, msg):
try:
payload = json.loads(msg.payload)
print(f"收到消息: topic={msg.topic}, data={payload}")
except:
print(f"收到消息: topic={msg.topic}, payload={msg.payload}")
# 创建客户端
client = mqtt.Client(client_id="subscriber_001")
client.on_connect = on_connect
client.on_message = on_message
# 连接
client.connect(MQTT_BROKER, MQTT_PORT, 60)
# 开始循环
client.loop_forever()
// Node.js MQTT 发布者
const mqtt = require("mqtt");
const broker = "mqtt://broker.emqx.io:1883";
const topic = "/sensors/temperature";
// 连接
const client = mqtt.connect(broker, {
clientId: "publisher_001"
});
client.on("connect", () => {
console.log("已连接到 MQTT Broker");
// 定时发布消息
setInterval(() => {
const data = {
device_id: "sensor_01",
temperature: 22.5 + Math.random() * 5,
humidity: 45 + Math.random() * 10,
timestamp: Math.floor(Date.now() / 1000)
};
const payload = JSON.stringify(data);
client.publish(topic, payload, { qos: 1 });
console.log(`发送消息: ${payload}`);
}, 5000);
});
// Node.js MQTT 订阅者
const mqtt = require("mqtt");
const broker = "mqtt://broker.emqx.io:1883";
const topic = "/sensors/#";
// 连接
const client = mqtt.connect(broker, {
clientId: "subscriber_001"
});
client.on("connect", () => {
console.log("已连接到 MQTT Broker");
client.subscribe(topic, { qos: 1 });
});
client.on("message", (topic, message) => {
try {
const data = JSON.parse(message.toString());
console.log(`收到消息: topic=${topic}, data=`, data);
} catch {
console.log(`收到消息: topic=${topic}, payload=${message.toString()}`);
}
});
// PHP MQTT 发布者(需要 php-mosquitto 扩展)
$broker = "broker.emqx.io";
$port = 1883;
$topic = "/sensors/temperature";
$client = new Mosquitto\Client();
$client->connect($broker, $port);
while (true) {
$data = [
"device_id" => "sensor_01",
"temperature" => 22.5 + rand(0, 50) / 10,
"humidity" => 45 + rand(0, 100) / 10,
"timestamp" => time()
];
$payload = json_encode($data);
$client->publish($topic, $payload, 1);
echo "发送消息: $payload\n";
sleep(5);
}
// Arduino/ESP8266 MQTT 示例
#include
#include
const char* ssid = "your-ssid";
const char* password = "your-password";
const char* mqtt_server = "broker.emqx.io";
WiFiClient espClient;
PubSubClient client(espClient);
// 连接 WiFi
void setup_wifi() {
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.println("WiFi connected");
}
// MQTT 回调
void callback(char* topic, byte* payload, unsigned int length) {
String message;
for (int i = 0; i < length; i++) {
message += (char)payload[i];
}
Serial.printf("收到消息: %s\n", message.c_str());
}
void setup() {
Serial.begin(115200);
setup_wifi();
client.setServer(mqtt_server, 1883);
client.setCallback(callback);
if (client.connect("esp8266_client")) {
client.subscribe("/sensors/#");
Serial.println("MQTT 连接成功");
}
}
void loop() {
if (!client.connected()) {
client.connect("esp8266_client");
}
client.loop();
// 发送传感器数据
float temperature = 22.5 + random(0, 50) / 10.0;
String payload = "{\"temp\": " + String(temperature) + "}";
client.publish("/sensors/temperature", payload.c_str());
delay(5000);
}
| 对比项 | MQTT | HTTP | WebSocket | AMQP |
|---|---|---|---|---|
| 消息头大小 | 2 字节 | ~200 字节 | ~20 字节 | ~10 字节 |
| 模式 | 发布/订阅 | 请求/响应 | 全双工 | 发布/订阅 |
| QoS 支持 | ✅ 3 级 | ❌ | ❌ | ✅ |
| 遗嘱消息 | ✅ | ❌ | ❌ | ❌ |
| 资源消耗 | 极低 | 高 | 中 | 中 |
| 适用场景 | IoT 设备 | Web 应用 | 实时 Web | 企业消息 |
MQTT 核心概念、发布/订阅模式、QoS 级别
Broker 搭建(EMQX/Mosquitto)、Topic 设计、通配符使用
遗嘱消息、持久会话、认证授权、SSL/TLS 加密
物联网项目、传感器数据采集、与云平台集成
MQTT 是物联网世界的"通用语言"。
它用 轻量级、发布/订阅、三种 QoS 解决了低带宽、不可靠网络环境下的设备通信问题。MQTT 是 IoT 工程师的必修课。
"MQTT 让万物互联变得简单可靠。" 🌐