RabbitMQ 是一个开源的消息代理(Message Broker),由 Rabbit Technologies(现 VMware)于 2007 年开发。RabbitMQ 基于 AMQP(高级消息队列协议) 实现,是最流行的开源消息中间件之一。
RabbitMQ 的核心定位是 可靠、灵活的消息队列系统。它提供了:
RabbitMQ 由 Rabbit Technologies 于 2007 年开发,最初是为了提供 AMQP 协议的开源实现。2010 年,Rabbit Technologies 被 VMware 收购,后转入 Pivotal Software。
// 使用 php-amqplib 库
require_once 'vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// 建立连接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// ========== 发送消息 ==========
// 声明队列
$channel->queue_declare('hello', false, true, false, false);
// 发送消息
$msg = new AMQPMessage('Hello World!', [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]);
$channel->basic_publish($msg, '', 'hello');
// ========== 接收消息 ==========
$callback = function($msg) {
echo '收到消息: ', $msg->body, "\n";
$msg->ack();
};
$channel->basic_consume('hello', '', false, false, false, false, $callback);
while($channel->is_consuming()) {
$channel->wait();
}
// 关闭连接
$channel->close();
$connection->close();
// 创建 Exchange
$channel->exchange_declare('logs', 'fanout', false, true, false);
// 绑定队列到 Exchange
$channel->queue_bind('queue1', 'logs');
$channel->queue_bind('queue2', 'logs');
// 发送消息到 Exchange
$msg = new AMQPMessage('Broadcast message');
$channel->basic_publish($msg, 'logs');
// Topic Exchange
$channel->exchange_declare('topic_logs', 'topic', false, true, false);
// 绑定带有通配符的 Routing Key
$channel->queue_bind('queue_critical', 'topic_logs', '*.critical');
$channel->queue_bind('queue_all', 'topic_logs', '#');
// 发送消息到 Topic Exchange
$msg = new AMQPMessage('Error message');
$channel->basic_publish($msg, 'topic_logs', 'error.critical');
# 配置文件 /etc/rabbitmq/rabbitmq.conf
cluster_formation.classic_config.nodes.1 = rabbit@node1
cluster_formation.classic_config.nodes.2 = rabbit@node2
cluster_formation.classic_config.nodes.3 = rabbit@node3
# 启用镜像队列
# 在管理界面或通过命令设置
rabbitmqctl set_policy ha-all "^ha\." '{"ha-mode":"all"}'
# 内存和磁盘限制
vm_memory_high_watermark.relative = 0.8
disk_free_limit.relative = 1.0
// 生产者确认
$channel->confirm_select();
$msg = new AMQPMessage('Important message', [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]);
$channel->basic_publish($msg, '', 'task_queue');
$channel->wait_for_pending_acks();
// 消费者手动确认
$callback = function($msg) {
try {
// 处理消息
processMessage($msg->body);
$msg->ack(); // 确认成功
} catch (Exception $e) {
$msg->nack(true); // 拒绝并重新入队
}
};
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
RabbitMQ 安装、核心概念、简单收发消息
Exchange 类型、Bindings、队列属性、消息持久化
集群配置、镜像队列、高可用、消息确认机制
多语言集成、延迟消息、死信队列、监控运维
RabbitMQ 是微服务时代的"消息高速公路"。
它用 灵活的路由、可靠的消息传递、多协议支持,成为分布式系统、微服务架构、事件驱动架构的核心组件。
无论你用 Java、Python、PHP 还是 Go,RabbitMQ 都是 跨语言异步通信的最佳解决方案。
"RabbitMQ 让服务之间的对话变得可靠而优雅。" 🐰