发布:2023/11/2 20:47:39作者:管理员 来源:本站 浏览次数:485
1.起步
安装步骤自行下载安装。
可视化界面:http://localhost:15672 默认登录用户名guest,密码guest
5672:通信默认端口号
15672:可视化管理控制台默认端口号
25672:集群通信端口号
注意: 阿里云 ECS 服务器如果出现 RabbitMQ 安装成功,外网不能访问是因为安全组的问题没有开放端口 解决方案
2.基本概念
1.生产者:即发送消息客户端
释义:给队列发送消息,俗称生产者。
2.消费者:即接收消息服务端
释义:接收生产者发送过来的消息,并加以处理。
3.channel:连接通道
释义:RabbitMQ的连接对象通道,在代码中全部操作基于channel通道。
4.exchange:交换机
释义:消息生产者发送消息到交换机(也可以说交换机是消息第一步到的地方),交换机基于一系列路由关系将消息传递到消息队列(queue)。生产者发消息的时候必须指定一个 exchange,否则消息无法直接到达消息队列,Exchange将消息路由到一个或多个Queue中(或者丢弃)。若不指定 exchange(为空)会默认指向 AMQP default 交换机,AMQP default 路由规则是根据 routingKey 和 mq 上有没有相同名字的队列进行匹配路由。
5.queue:消息队列
释义:消息队列,真正意义上消息存储的地方。
6.routingKey:路由键(queue可满足多种routingKey)
释义:关系路由键。例:一个房间(queue),一个钥匙(exchange),一段暗号(routingKey),只有当三者同时满足消息就会匹配成功。
7.durable:消息持久化
释义:消息队列是存在于内存中的,如果当服务器挂掉会导致消息丢失,所以消息持久化非常有必要。
8.Ack消息应答:收到消息消费后告诉队列消息---{ noAck: false }
释义:Ack为消息应答机制。消费端成功处理完消息后向将消息回传至消息队列,RabbitMQ会将此条消息删除。如果消费端在消息处理过程中出现宕机,挂掉了没有将消息回传至消息队列,此时RabbitMQ会视为此条消息没有被处理将消息重新放回消息队列等待重新被处理,所以消息应答机制也是非要有必要的。
2.1交换机类型:exchange交换机有四种类型
1.fanout类型(广播模式):
640?wx_fmt=png
它的路由规则很简单,它会把所以发送到该exchange的消息全部路由到与他绑定的Queue里面,不需要设置路由键(类似广播). 例:所有与exchange交换机有关的Queue有关的消息队列都会发送。
2.direct类型:
640?wx_fmt=png
它根据routingKey定制的规则进行路由(在基本概念中6处有解释)
生产者发送消息的routingKey为【IOS】,则消费者接收的routingKey也要为【IOS】
生产者发送消息的routingKey为空,则消费者接收的routingKey也要为空。
生产者发送消息的routingKey为【IOS】,消费者接收的routingKey为空。消费者将接收不到任何消息。
如果生产者发送消息的routingKey为【IOS,空】,消费者接收的routingKey为空和为IOS的都可以接收消息。如果有2个消费者同时开启他们routingKey一个为【IOS】一个为空,以10条消息为例2个消费者分别消费5条消息。(轮训分发)
3.topic类型:
640?wx_fmt=png
生产者指定 routingKey 消息根据消费端指定的队列通过模糊匹配的方式进行相应转发,两种通配符模式: #:可匹配一个或多个关键字 *:只能匹配一个关键字
4.header类型:
exchange(头交换机)和主题交换机有点相似,但是不同于主题交换机的路由是基于路由键,头交换机的路由值基于消息的 header 数据。 主题交换机路由键只有是字符串,而头交换机可以是整型和哈希值 header Exchange 类型用的比较少,可以自行 google 了解。
3.轮询调度与公平调度
3.1轮训调度:
在不开启公平调度模式下,默认即开启轮询调度。(direct类型中第四条有描述到)
3.2公平调度:
实现公平调度,当消费者1在消费消息的过程中MQ没有收到Ack的消息反馈将不会再次发送消息给消息1。(意思就是活干完在给你活,干多少活给多少消息,谁干的快谁给的多)
//开启公平调度
await channel.prefetch(1,false)
/**
*prefetch 参数说明
*count:每次推送给消费端 N 条消息数目,如果这 N 条消息没有被ack,生产端将不会再次推送直到这 N 条消息被消费
*
*global:在哪个级别上做限制,ture 为 channel 上做限制,false 为消费端上做限制,默认为 false
*/
4.代码实例
生产者:
const amqp = require('amqplib');
async function product(params) {
//1.创建连接对象
const connection = await amqp.connect('amqp://localhost:5672');
//2.获取通道对象
const channel = await connection.createChannel();
//3.声明交换机参数
const exchangeName = 'exchange_direct1';
//4.声明路由key参数
const routingKey = 'ios';
//5.消息参数
const msg = "hello direct";
//6.关联交换机,交换机设置类型,并将消息持久化 { durable: true } 参数为将消息持久化
await channel.assertExchange(exchangeName, 'direct', { durable: true })
//7.设置公平调度
await channel.prefetch(1,false)
for (var i = 0; i < 10; i++) {
//8.发送消息
await channel.publish(exchangeName, routingKey, Buffer.from(`${msg}--------第${i}条消息`))
}
console.log("生产者消息发送完毕")
//9.关闭通道
await channel.close();
//10.关闭连接
await connection.close();
}
product()
消费者:(开启2个消费者,这里同时实现公平调度,消息应答机制)
const amqp = require('amqplib')
async function consumer() {
//1.创建连接对象
const connection = await amqp.connect("amqp://localhost:5672");
//2.获取通道
const channel = await connection.createChannel();
//3.声明交换机参数
const exchangeName = 'exchange_direct1';
//4.声明消息队列参数
const queueName = 'Queue_direct1';
//5.声明路由key参数
const routingKey = "ios";
//6.关联交换机,设置交换机类型,并将消息持久化 { durable: true } 参数为将消息持久化
await channel.assertExchange(exchangeName, 'direct', { durable: true });
//7.关联消息队列 autoDelete:true 设置队列为空时自动删除
await channel.assertQueue(queueName,{autoDelete:true, durable: false});
//8.绑定关系(队列,交换机,路由键)
await channel.bindQueue(queueName, exchangeName, routingKey)
//9.设置公平调度
await channel.prefetch(1, false)
//10.消费队列消息
await channel.consume(queueName, msg => {
console.log("routingKey---ios:" + msg.content.toString());
//11.开启消息通知
channel.ack(msg);
}, { noAck: false });//12.开启消息应答机制
console.log("消费端启动成功")
}
async function consumer2() {
//1.创建连接对象
const connection = await amqp.connect("amqp://localhost:5672");
//2.获取通道对象
const channel = await connection.createChannel();
//3.声明交换机参数
const exchangeName = 'exchange_direct1';
//4.声明队列参数
const queueName = 'Queue_direct1';
//5.声明路由key参数
const routingKey = "ios";
//6.关联交换机,设置交换机类型,并将消息持久化{ durable: true } 参数为将消息持久化
await channel.assertExchange(exchangeName, 'direct', { durable: true });
//7.关联消息队列 autoDelete:true 设置队列为空时自动删除
await channel.assertQueue(queueName,{autoDelete:true, durable: false});
//8.绑定关系(队列,交换机,路由键)
await channel.bindQueue(queueName, exchangeName, routingKey)
//9.设置公平调度
await channel.prefetch(1, false)
//10.消费队列消息
await channel.consume(queueName, msg => {
console.log("routingKey:" + msg.content.toString());
//11.开启消息通知
channel.ack(msg);
}, { noAck: false });//12.开启消息应答机制
console.log("消费端启动成功")
}
consumer()
consumer2()
五.实现RPC模式
生产者:
const amqp = require('amqplib');
const uuid = require('uuid')
async function product(params) {
//1.创建连接对象
const connection = await amqp.connect('amqp://localhost:5672');
//2.获取通道对象
const channel = await connection.createChannel();
const exchangeName = "exchange_rpc1";
const queue = "rpc_1";
const routingKey = "routingKey_rpc1"
const replyQueue = "reply_rpc1"
await channel.assertExchange(exchangeName, 'direct', { durable: true });//消息持久化
await channel.assertQueue(queue,{autoDelete:true, durable: false})
await channel.bindQueue(queue, exchangeName, routingKey)
await channel.prefetch(1, false)
await channel.publish(exchangeName, routingKey, Buffer.from("哈喽,rpc"),
{ contentType: "application/json", deliveryMode: 2, correlationId: uuid.v1(), replyTo: replyQueue })
console.log("生产者消息发送完毕")
await channel.assertQueue(replyQueue)
await channel.consume(replyQueue, msg => {
// console.log(msg)
console.log("routingKey:" + msg.content.toString());
//11.开启消息通知
channel.ack(msg);
}, { noAck: false })
}
product()
消费者:
const amqp = require('amqplib')
async function consumer2() {
//1.创建连接对象
const connection = await amqp.connect("amqp://localhost:5672");
//2.获取通道对象
const channel = await connection.createChannel();
//3.声明交换机参数
const exchangeName = "exchange_rpc1";
//4.声明队列参数
const queue = "rpc_1";
//5.声明路由key参数
const routingKey = "routingKey_rpc1"
//6.关联交换机,设置交换机类型,并将消息持久化
await channel.assertExchange(exchangeName, 'direct', { durable: true });//消息持久化
//7.关联消息队列
await channel.assertQueue(queue,{autoDelete:true, durable: false});
//8.绑定关系(队列,交换机,路由键)
await channel.bindQueue(queue, exchangeName, routingKey)
//9.设置公平调度
await channel.prefetch(1, false)
//10.消费队列消息
await channel.consume(queue, msg => {
console.log(msg)
console.log("routingKey:" + msg.content.toString());
channel.assertQueue(msg.properties.replyTo);
channel.publish(exchangeName, msg.properties.replyTo, Buffer.from("消息反馈"),
{ contentType: "application/json", deliveryMode: 2, correlationId: msg.properties.correlationId })
//11.开启消息通知
channel.ack(msg);
}, { noAck: false });//12.开启消息应答机制
console.log("消费端启动成功")
}
consumer2()
栗子:1.用户A向用户B发送消息
实现:
1.用户A根据(用户A ID + 后缀)创建消息队列与routingKey,用户B根据(用户B ID + 后缀)创建消息队列与routingKey
2.用户A获取到用户B的消息队列名,routingKey,发送消息时附带自己的消息队列与routingKey。
3.用户B请求AMQP消息队列拿出用户A消息,并根据用户附带在消息中的replyTo,correlationId 向用户A的消息队列中发送消息(反馈动作)
4. await channel.assertQueue(queue,{autoDelete:true, durable: false}); 关联消息队列时设置{autoDelete:true, durable: false}。意思是消息队列里面消息为空时,自动删除队列。
© Copyright 2014 - 2024 柏港建站平台 ejk5.com. 渝ICP备16000791号-4