一、消息的投递可靠性
在使用RabbitMQ时,应杜绝消息任何消息的丢失或投递失败的场景。RabiitMQ提供2种方式控制消息的投递可靠性模式:
- confirm 确认模式
- return 退回模式
RabbitMQ的整个消息的投递路径:
producer –> rabbitmq broker –> exchange –> queue –> consumer
- 消息从producer –> exchange 会返回一个 confirmcallback
- 消息从 exange –> queue 会返回一个 returncallback
可利用者两个callback来控制消息投递的可靠性。
这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。
1、确认模式(confirmcallback):
通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调。
实现步骤:
确认模式开启:ConnectionFactory中开启publisher-confirms=”true”
<!-- 定义rabbitmq connectionFactory --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}" publisher-confirms="true" publisher-returns="true" />
在rabbitTemplate定义ConfirmCallBack回调函数
@Test public void testConfirm() { //2. 定义回调 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * * @param correlationData 相关配置信息 * @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败 * @param cause 失败原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("confirm方法被执行了...."); if (ack) { //接收成功 System.out.println("接收成功消息" + cause); } else { //接收失败 System.out.println("接收失败消息" + cause); //做一些处理,让消息再次发送。 } } }); //3. 发送消息 rabbitTemplate.convertAndSend("test_exchange_confirm111", "confirm", "message confirm...."); }
2、退回模式(returncallback):
通过实现ReturnCallback接口,如果消息从交换器发送到对应队列失败时触发(比如根据发送消息时指定的routingKey找不到队列时会触发)。
实现步骤:
开启回退模式:publisher-returns=”true”
同确认模式的1设置ReturnCallBack
设置Exchange处理消息的模式:
- 如果消息没有路由到Queue,则丢弃消息(默认)
- 如果消息没有路由到Queue,返回给消息发送方ReturnCallBack
@Test
public void testReturn() {
//3. 设置交换机处理失败消息的模式
rabbitTemplate.setMandatory(true);
//2.设置ReturnCallBack
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
*
* @param message 消息对象
* @param replyCode 错误码
* @param replyText 错误信息
* @param exchange 交换机
* @param routingKey 路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("return 执行了....");
System.out.println(message);
System.out.println(replyCode);
System.out.println(replyText);
System.out.println(exchange);
System.out.println(routingKey);
//处理
}
});
//3. 发送消息
rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");
}
二、consumer Ack 消费者确认
ack指acknowledge,确认。
确认模式
Acknowledge = “NONE”:不确认
Acknowledge = “MANUAL”:手动确认
Acknowledge = “AUTO” :自动确认
实现步骤
- 设置手动签收。acknowledge=”manual”
<!--定义监听器容器--> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual"> <rabbit:listener ref="ackListener" queue-names="test_queue_confirm"></rabbit:listener> </rabbit:listener-container>
2. 让监听器类实现ChannelAwareMessageListener接口
3. 如果消息成功处理,则调用channel的 basicAck()签收
4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer
```java
@Component
public class AckListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//1.接收转换消息
System.out.println(new String(message.getBody()));
//2. 处理业务逻辑
System.out.println("处理业务逻辑...");
int i = 3/0;//出现错误
//3. 手动签收
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
//e.printStackTrace();
//4.拒绝签收
/*
第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
*/
channel.basicNack(deliveryTag,true,true);
//channel.basicReject(deliveryTag,true);
}
}
}
三、消费端限流
设置消费端一次最多拉取多少条消息,当消息确认后才能继续拉取。
Consumer 限流机制
- 确保ack机制为手动确认。acknowledge=”manual”
- listener-container配置属性
perfetch = 1,表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息。
<!--定义监听器容器-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1" >
<rabbit:listener ref="qosListener" queue-names="test_queue_confirm"></rabbit:listener>
</rabbit:listener-container>
@Component
public class QosListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Thread.sleep(1000);
//1.获取消息
System.out.println(new String(message.getBody()));
//2. 处理业务逻辑
//3. 签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
}
四、TTL(过期时间)
TTL全称time to live,过期时间/存活时间。
当消息到过期时间还没有被消费时,会自动删除。
队列统一过期
消息单独过期
如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。
队列过期后,会将队列所有消息全部移除。(常用)
消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉)
1、队列统一过期时间
<!--ttl-->
<rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
<!--设置queue的参数-->
<rabbit:queue-arguments>
<!--x-message-ttl指队列的过期时间-->
<entry key="x-message-ttl" value="100000" value-type="java.lang.Integer"></entry>
</rabbit:queue-arguments>
</rabbit:queue>
<!--绑定交换机-->
<rabbit:topic-exchange name="test_exchange_ttl" >
<rabbit:bindings>
<rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
for (int i = 0; i < 10; i++) {
// 发送消息
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....");
}
2、消息单独过期
// 消息后处理对象,设置一些消息的参数信息
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//1.设置message的信息
message.getMessageProperties().setExpiration("5000");//消息的过期时间
//2.返回该消息
return message;
}
};
//消息单独过期
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....",messagePostProcessor);
五、DLX(死信队列)
DLX全称Dead Letter Exchange,之所以叫死信队列,是因为其他的MQ产品没有交换机。
当消息成为死信后,可以被重新发送到另一个交换机,这和个交换机就是DLX,死信交换机。
消息成为死信的方式:
- 队列消息长度达到限制
- 消费者拒收消息,basicNack/basicReject ,并且把不把消息重新放回原目标队列 requeue = false
- 原队列存在消息过期设置,消息到达超时间未被消费。
实现步骤
死信队列:
- 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
- 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
- 正常队列绑定死信交换机
设置两个参数: x-dead-letter-exchange:死信交换机名称 x-dead-letter-routing-key:发送给死信交换机的routingkey
<!--
1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
-->
<rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
<!--3. 正常队列绑定死信交换机-->
<rabbit:queue-arguments>
<!--3.1 x-dead-letter-exchange:死信交换机名称-->
<entry key="x-dead-letter-exchange" value="exchange_dlx" />
<!--3.2 x-dead-letter-routing-key:发送给死信交换机的routingkey-->
<entry key="x-dead-letter-routing-key" value="dlx.hehe" />
<!--4.1 设置队列的过期时间 ttl-->
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
<!--4.2 设置队列的长度限制 max-length -->
<entry key="x-max-length" value="10" value-type="java.lang.Integer" />
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="test_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--
2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
-->
<rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue>
<rabbit:topic-exchange name="exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
测试
/**
* 发送测试死信消息:
* 1. 过期时间
* 2. 长度限制
* 3. 消息拒收
*/
@Test
public void testDlx(){
//1. 测试过期时间,死信消息
rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");
//2. 测试长度限制后,消息死信
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");
}
//3. 测试消息拒收
rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");
}
六、延迟队列(重点)
延迟队列:即消息进入队列后不会立即被删除,只有到达指定的时间,之后才会被消费。
在RabbitMQ中没有提供延迟队列的功能,但是可以使用 TTL(过期时间)+ DLX(死信队列)来实现。
案例:客户下单,30分钟未支付,取消订单,回滚库存。
实现步骤
- 定义正常交换机(order_exchange)和队列(order_queue)
- 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)
- 绑定,设置正常队列过期时间为30分钟(为了便于查看,时间改为10s)
<!-- 1. 定义正常交换机(order_exchange)和队列(order_queue)-->
<rabbit:queue id="order_queue" name="order_queue">
<!-- 3. 绑定,设置正常队列过期时间为30分钟-->
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="order_exchange_dlx" />
<entry key="x-dead-letter-routing-key" value="dlx.order.cancel" />
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="order_exchange">
<rabbit:bindings>
<rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- 2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)-->
<rabbit:queue id="order_queue_dlx" name="order_queue_dlx"></rabbit:queue>
<rabbit:topic-exchange name="order_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
发送消息
@Test
public void testDelay() throws InterruptedException {
//1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息
rabbitTemplate.convertAndSend("order_exchange","order.msg","订单信息:id=1,time=2019年8月17日16:41:47");
//2.打印倒计时10秒
for (int i = 10; i > 0 ; i--) {
System.out.println(i+"...");
Thread.sleep(1000);
}
}
消费方
@Component
public class OrderListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//1.接收转换消息
System.out.println(new String(message.getBody()));
//2. 处理业务逻辑
System.out.println("处理业务逻辑...");
System.out.println("根据订单id查询其状态...");
System.out.println("判断状态是否为支付成功");
System.out.println("取消订单,回滚库存....");
//3. 手动签收
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
//e.printStackTrace();
System.out.println("出现异常,拒绝接受");
//4.拒绝签收,不重回队列 requeue=false
channel.basicNack(deliveryTag,true,false);
}
}
}
七、日志
默认日志路径
/var/log/rabbitmq/rabbitmq@主机名.log
日志包含Rabbitmq版本号、Erlang版本号、RabbitMQ服务节点名称、cookie的hash值、RabbitMQ的配置文件地址、内存限制、默认账户guest的创建、权限的配置
八、RabbitMQ相关命令
九、消息的追踪
在任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。对于RabbitMQ而言,可能是使用因为生产者或消费者与RabbitMQ断开了连接,而它们与RabbitMQ又采用了不同的确认机制;也有可能是因为交换器与队列之间不同的转发策略;甚至是交换器并没有与任何队列进行绑定,生产者又不感知或者没有采取相应的措施;另外RabbitMQ本身的集群策略也可能导致消息的丢失。这个时候就需要有一个较好的机制跟踪记录消息的投递过程,以此协助开发和运维人员进行问题的定位。
在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能来实现消息追踪。
firehose的机制是将生产者投递给rabbitmq的消息,rabbitmq投递给消费者的消息按照指定的格式发送到默认的exchange上。这个默认的exchange的名称为amq.rabbitmq.trace,它是一个topic类型的exchange。发送到这个exchange上的消息的routing key为 publish.exchangename 和 deliver.queuename。其中exchangename和queuename为实际exchange和queue的名称,分别对应生产者投递到exchange的消息,和消费者从queue上获取的消息。
注意:打开 trace 会影响消息写入功能,适当打开后请关闭。
rabbitmqctl trace_on:开启Firehose命令
rabbitmqctl trace_off:关闭Firehose命令
rabbitmq_tracing和Firehose在实现上如出一辙,只不过rabbitmq_tracing的方式比Firehose多了一层GUI的包装,更容易使用和管理。
启用插件:rabbitmq-plugins enable rabbitmq_tracing
十、消息的可靠性保障
十一、消息的幂等性保障
幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。
在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。