一、模式总览
生产者工程:
application.yml文件配置RabbitMQ相关信息;
在生产者工程中编写配置类,用于创建交换机和队列,并进行绑定
注入RabbitTemplate对象,通过RabbitTemplate对象发送消息到交换机
消费者工程:
application.yml文件配置RabbitMQ相关信息
创建消息处理类,用于接收队列中的消息并进行处理
模式总结
1、简单模式 HelloWorld
一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)
人话:队列和消费者一对一
2、工作队列模式 Work Queue
一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)
人话:相当于多线程(消费者)处理一批数据(队列)
3、广播模式 Fanout
需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到所有绑定的队列
人话:一队列绑定多个消费者,一有消息,所有消费者都要处理
4、路由模式 Routing
需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
人话:生产者指定(通过routing key)发送到那个队列
5、通配符模式 Topic
需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
人话:生产者指定(通过routing key)发送到那个队列,但是可以用通配符
二、公共配置
(1)依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(2)配置文件
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: / #虚拟 host
username: guest
password: guest
# ----------------- 下面为可选项 --------------------
#开启消息确认模式
publisher-returns: true
publisher-confirms: true
template:
# 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
mandatory: true
listener:
simple:
acknowledge-mode: manual #手动ACK
default-requeue-rejected: false #个字段一定要设置成 false 不然无法消费的数据不会进入死信队列的
concurrency: 1 #同一个队列启动几个消费者
max-concurrency: 1 #启动消费者最大数量
prefetch: 1 #限制每次发送一条数据
retry:
enabled: true #是否支持重试
1、HelloWorld模式
生产者
@Autowired
private RabbitTemplate rabbitTemplate;
// helloworld模式
@Test
void testHello(){
// 参数:队列名,消息内容(可以是对象)
rabbitTemplate.convertAndSend("hello","hello world");
// 或
Map<String,Object> map = new HashMap<>();
map.put("s1","hello world");
rabbitTemplate.convertAndSend("hello",map);
}
消费者
@Configuration
public class RabbitSimpleConfig {
@Bean
public Queue simpleQueue(){
/*
* 可用以下形式:
* new Queue("simpleQueue") - 持久,非排他,非自动删除
* new Queue("simpleQueue",false,false,false,null)
*/
return new Queue("hello");
}
}
@Component
public class MyListener {
/**
* 监听某个队列的消息
* @param message 接收到的消息,参数类型必须与发送的对象一致
* @RabbitListenerd的queues为队列名
*/
@RabbitListener(queues = "hello")
public void myListener1(String message){
System.out.println("消费者接收到的消息为:" + message);
}
// 接收map参数
@RabbitListener(queues = "hello")
public void myListener1(Map<String,Object> message){
System.out.println("消费者接收到的消息为:" + message);
}
}
也可以采用以下的写法
@Component
@RabbitListener(queuesToDeclare = @Queue("hello"))
public class MyListener {
@RabbitHandler
public void receive1(String message){
System.out.println("message = " + message);
}
}
输出
消费者接收到的消息为:hello world
2、WorkQueue模式
一条消息仅被一个消费者消费
说明:默认在Spring AMQP实现中Work这种方式就是公平调度,如果需要实现能者多劳需要外配置
基本同上,写多个消费者即可
生产者
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testWork(){
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("work","hello work!");
}
}
消费者
配置类同上
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receiveWork1(String message){
System.out.println("work message1 = " + message);
}
// concurrency:并发数量,为这个消费者开启 10 个线程
@RabbitListener(queuesToDeclare = @Queue("work"),concurrency = "10")
public void receiveWork2(String message){
System.out.println("work message2 = " + message + "-->" + Thread.currentThread().getName());
}
3、Fanout广播模式
一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。需要注意的是,如果将消息发送到一个没有队列绑定的 Exchange上面,那么该消息将会丢失,这是因为在 RabbitMQ 中 Exchange 不具备存储消息的能力,只有队列具备存储消息的能力。
Fanout模式(广播模式)
FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上,在这种策略中,routingkey 将不起任何作用,FanoutExchange 配置方式如下:
1)临时队列方式
生产者
注意这里发送消息时不需要 routingkey
,指定 exchange
即可,routingkey
可以直接传一个 null
。
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testFanout() throws InterruptedException {
//convertAndSend(String exchange, String routingKey, Object object)
// 参数为:交换机名,routingKey,消息
rabbitTemplate.convertAndSend("logs","","这是日志广播");
}
消费者
@Component
public class FanoutCustomer {
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(name="logs",type = "fanout")
))
public void receive1(String message){
System.out.println("message1 = " + message);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue, //创建临时队列
exchange = @Exchange(name="logs",type = "fanout") //绑定交换机类型
))
public void receive2(String message){
System.out.println("message2 = " + message);
}
}
输出
message1 = 这是日志广播
message2 = 这是日志广播
2)配置类定义方式
详细说明见 Routing 路由模式的配置类定义方式
@Configuration
public class RabbitFanoutConfig {
public final static String FANOUTNAME = "sang-fanout";
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUTNAME, true, false);
}
@Bean
Queue queueOne() {
return new Queue("queue-one");
}
@Bean
Queue queueTwo() {
return new Queue("queue-two");
}
@Bean
Binding bindingOne() {
return BindingBuilder.bind(queueOne()).to(fanoutExchange());
}
@Bean
Binding bindingTwo() {
return BindingBuilder.bind(queueTwo()).to(fanoutExchange());
}
}
在这里首先创建 FanoutExchange,参数含义与创建 DirectExchange 参数含义一致,然后创建两个 Queue,再将这两个 Queue 都绑定到 FanoutExchange 上。接下来创建两个消费者,如下:
@Component
public class FanoutReceiver {
@RabbitListener(queues = "queue-one")
public void handler1(String message) {
System.out.println("FanoutReceiver:handler1:" + message);
}
@RabbitListener(queues = "queue-two")
public void handler2(String message) {
System.out.println("FanoutReceiver:handler2:" + message);
}
}
4、Routing路由模式
也被成为 Direct (直连交换机模式)
路由模式特点:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey
。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Routingkey
与消息的Routing key
完全一致,才会接收到消息
图解:
- P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
- X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
- C1:消费者,其所在队列指定了需要routing key 为 error 的消息
- C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
总结:Routing模式要求队列在绑定交换机时要指定routing key,消息会转发到符合routing key的队列。
1)临时队列方式
生产者
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testDirect(){
//convertAndSend(String exchange, String routingKey, Object object)
// 参数为:交换机名,routingKey,消息
rabbitTemplate.convertAndSend("MyDirects","error","error 的日志信息");
rabbitTemplate.convertAndSend("MyDirects","info","info 的日志信息");
}
消费者
@Component
public class DirectCustomer {
@RabbitListener(bindings ={
@QueueBinding(
value = @Queue(), //创建临时队列
key={"info","error"}, //绑定 routingKey,可绑定多个
exchange = @Exchange(type = "direct",name="MyDirects") //绑定交换机
)})
public void receive1(String message){
System.out.println("message1 = " + message);
}
@RabbitListener(bindings ={
@QueueBinding(
value = @Queue(),
key={"error"},
exchange = @Exchange(type = "direct",name="MyDirects")
)})
public void receive2(String message){
System.out.println("message2 = " + message);
}
}
输出
message1 = error 的日志信息
message2 = error 的日志信息
message1 = info 的日志信息
2)配置类定义方式
就是将 @RabbitListener
的内容剥离到配置类中
(1)创建队列 Bean
方式一
new Queue(ITEM_QUEUE,true,true,false);
三个参数说明
durable
:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列,当前连接有效exclusive
:是否开启排他性,默认也是false,只能被创建的连接使用,而且当连接关闭后队列即被删除。此参数优先级高于durableautoDelete
:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
@Configuration
public class RabbitMQConfig {
//队列名称
public static final String ITEM_QUEUE = "item_queue";
//声明队列
@Bean("itemQueue")
public Queue itemQueue(){
return new Queue(ITEM_QUEUE,true,true,false);
}
}
方式二:推荐
采用链式编程,相关方法的说明同上
durable(队列名)
:持久化nonDurable(队列名)
:不进行持久化exclusive()
:开启排他性autoDelete()
:自动删除build()
:创建队列
@Configuration
public class RabbitMQConfig {
//队列名称
public static final String ITEM_QUEUE = "item_queue";
//声明队列
@Bean("itemQueue")
public Queue itemQueue(){
// QueueBuilder.nonDurable(ITEM_QUEUE).exclusive().autoDelete().build();
return QueueBuilder.durable(ITEM_QUEUE).build();
}
}
(2)创建交换机 Bean
创建交换机
直连交换机:ExchangeBuilder.directExchange(交换机名);
交换机配置
durable()
:交换机是否持久化,指重启后该交换机是否存在autoDelete()
:如果该交换机没有队列的话,是否自动删除
ExchangeBuilder.directExchange("交换机名").durable(true).autoDelete().build();
完整配置
//交换机名称
public static final String DIRECT_EXCHANGE = "direct_exchange";
//声明交换机, 交换机名自动将大写变成 _ : direct_exchange
@Bean("directExchange")
public Exchange directExchange(){
return ExchangeBuilder.directExchange(DIRECT_EXCHANGE).durable(true).build();
}
(3)队列交换机绑定
交换机根据消息的 routing_key ,决定将消息发送到那个队列
//绑定 将队列和交换机绑定, 并设置用于匹配键
@Bean
public Binding itemQueueExchange(@Qualifier("itemQueue") Queue queue,
@Qualifier("directExchange") Exchange exchange){
return BindingBuilder
.bind(queue) // 设置队列
.to(exchange) // 设置交换机
.with("item.#");// 设置 routing_key
}
(4)生产者
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testFanout() throws InterruptedException {
//convertAndSend(String exchange, String routingKey, Object object)
// 参数为:交换机名,routingKey,消息
rabbitTemplate.convertAndSend("item_queue","item","这是路由模式");
}
(5)消费者
@Component
public class FanoutCustomer {
@RabbitListener(queues = "item_queue")
public void receive1(String message){
System.out.println("message1 = " + message);
}
@RabbitListener(queues = "item_queue")
public void receive2(String message){
System.out.println("message2 = " + message);
}
}
输出
message1 = 这是路由模式
message2 = 这是路由模式
5、Topics通配符模式
Topic
类型与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。只不过Topic
类型Exchange
可以让队列在绑定Routing key
的时候使用通配符!(路由模式升级版)
Routingkey
一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#
:匹配一个或多个词
*
:匹配不多不少恰好1个词
举例:
item.#
:能够匹配item.insert.abc
或者 item.insert
item.*
:只能匹配item.insert
图解:
- 红色Queue:绑定的是
usa.#
,因此凡是以usa.
开头的routing key
都会被匹配到 - 黄色Queue:绑定的是
#.news
,因此凡是以.news
结尾的routing key
都会被匹配
1)临时队列方式
生产者
@Autowired
private RabbitTemplate rabbitTemplate;
//topic
@Test
public void testTopic(){
rabbitTemplate.convertAndSend("topics","user.save.findAll","user.save.findAll 的消息");
}
消费者
@Component
public class TopCustomer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
key = {"user.*"}, //指定以user.开头的routing key
exchange = @Exchange(type = "topic",name = "topics")
)
})
public void receive1(String message){
System.out.println("message1 = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
key = {"user.#"},
exchange = @Exchange(type = "topic",name = "topics")
)
})
public void receive2(String message){
System.out.println("message2 = " + message);
}
}
2)配置类定义方式
@Configuration
public class RabbitTopicConfig {
public final static String TOPICNAME = "sang-topic";
@Bean
TopicExchange topicExchange() {
return new TopicExchange(TOPICNAME, true, false);
}
@Bean
Queue xiaomi() {
return new Queue("xiaomi");
}
@Bean
Queue huawei() {
return new Queue("huawei");
}
@Bean
Queue phone() {
return new Queue("phone");
}
@Bean
Binding xiaomiBinding() {
return BindingBuilder.bind(xiaomi()).to(topicExchange())
.with("xiaomi.#");
}
@Bean
Binding huaweiBinding() {
return BindingBuilder.bind(huawei()).to(topicExchange())
.with("huawei.#");
}
@Bean
Binding phoneBinding() {
return BindingBuilder.bind(phone()).to(topicExchange())
.with("#.phone.#");
}
}
- 首先创建 TopicExchange,参数和前面的一致。然后创建三个 Queue,第一个 Queue 用来存储和 “xiaomi” 有关的消息,第二个 Queue 用来存储和 “huawei” 有关的消息,第三个 Queue 用来存储和 “phone” 有关的消息。
- 将三个 Queue 分别绑定到 TopicExchange 上,第一个 Binding 中的 “xiaomi.#” 表示消息的 routingkey 凡是以 “xiaomi” 开头的,都将被路由到名称为 “xiaomi” 的 Queue 上,第二个 Binding 中的 “huawei.#” 表示消息的 routingkey 凡是以 “huawei” 开头的,都将被路由到名称为 “huawei” 的 Queue 上,第三个 Binding 中的 “#.phone.#” 则表示消息的 routingkey 中凡是包含 “phone” 的,都将被路由到名称为 “phone” 的 Queue 上。
接下来针对三个 Queue 创建三个消费者,如下:
@Component
public class TopicReceiver {
@RabbitListener(queues = "phone")
public void handler1(String message) {
System.out.println("PhoneReceiver:" + message);
}
@RabbitListener(queues = "xiaomi")
public void handler2(String message) {
System.out.println("XiaoMiReceiver:"+message);
}
@RabbitListener(queues = "huawei")
public void handler3(String message) {
System.out.println("HuaWeiReceiver:"+message);
}
}
6、Header模式
HeadersExchange 是一种使用较少的路由策略,HeadersExchange 会根据消息的 Header 将消息路由到不同的 Queue 上,这种策略也和 routingkey无关,配置如下:
@Configuration
public class RabbitHeaderConfig {
public final static String HEADERNAME = "javaboy-header";
@Bean
HeadersExchange headersExchange() {
return new HeadersExchange(HEADERNAME, true, false);
}
@Bean
Queue queueName() {
return new Queue("name-queue");
}
@Bean
Queue queueAge() {
return new Queue("age-queue");
}
@Bean
Binding bindingName() {
Map<String, Object> map = new HashMap<>();
map.put("name", "sang");
return BindingBuilder.bind(queueName())
.to(headersExchange()).whereAny(map).match();
}
@Bean
Binding bindingAge() {
return BindingBuilder.bind(queueAge())
.to(headersExchange()).where("age").exists();
}
}
这里的配置大部分和前面介绍的一样,差别主要体现的 Binding 的配置上,第一个 bindingName 方法中,whereAny 表示消息的 Header 中只要有一个 Header 匹配上 map 中的 key/value,就把该消息路由到名为 “name-queue” 的 Queue 上,这里也可以使用 whereAll 方法,表示消息的所有 Header 都要匹配。whereAny 和 whereAll 实际上对应了一个名为 x-match 的属性。bindingAge 中的配置则表示只要消息的 Header 中包含 age,不管 age 的值是多少,都将消息路由到名为 “age-queue” 的 Queue 上。
接下来创建两个消息消费者:
@Component
public class HeaderReceiver {
@RabbitListener(queues = "name-queue")
public void handler1(byte[] msg) {
System.out.println("HeaderReceiver:name:"
+ new String(msg, 0, msg.length));
}
@RabbitListener(queues = "age-queue")
public void handler2(byte[] msg) {
System.out.println("HeaderReceiver:age:"
+ new String(msg, 0, msg.length));
}
}
注意这里的参数用 byte 数组接收。然后在单元测试中创建消息的发送方法,这里消息的发送也和 routingkey 无关,如下:
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void headerTest() {
Message nameMsg = MessageBuilder
.withBody("hello header! name-queue".getBytes())
.setHeader("name", "sang").build();
Message ageMsg = MessageBuilder
.withBody("hello header! age-queue".getBytes())
.setHeader("age", "99").build();
rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, ageMsg);
rabbitTemplate.send(RabbitHeaderConfig.HEADERNAME, null, nameMsg);
}
}
这里创建两条消息,两条消息具有不同的 header,不同 header 的消息将被发到不同的 Queue 中去。
三、API
1、发送消息
/**
* 将java对象转为amqp的message,并且以默认的路由key发送到默认的队列
*/
void convertAndSend(Object message) throws AmqpException;
/**
* 将java对象转为amqp的message,并且以指定的 routingKey 发送到默认的队列
*/
void convertAndSend(String routingKey, Object message);
/**
* 将java对象转为amqp的message,并且以指定的 routingKey 发送到指定的队列
*/
void convertAndSend(String exchange, String routingKey, Object message);
上面三个为基本的 api ,同时还有 3 个重载方法是上面的扩展(增加了 MessagePostProcessor
参数)
MessagePostProcessor
:在消息发送之前对其进行处理的处理器
void convertAndSend(Object message, MessagePostProcessor messagePostProcessor);
MessagePostProcessor
一般可抽取为公共配置
// MessagePostProcessor 是接口无法直接创建,需要实现其中的方法
MessagePostProcessor messagePostProcessor = message -> {
// messageProperties 是用来配置消息的
MessageProperties messageProperties = message.getMessageProperties();
// 设置延迟时间,下面两句效果是一样的,需要安装插件
messageProperties.setHeader(MessageProperties.X_DELAY, 3000);
messageProperties.setDelay(3000);
// 设置超时时间
messageProperties.setExpiration("10000");
// 设置是否持久化
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
};
rabbitTemplate.convertAndSend("test_queue", "", order, messagePostProcessor);
发送对象注意要实现序列化接口 Serializable
指定唯一id
在前几参数的基础上还可以加一个参数 CorrelationData correlationData
,用于指定消息的唯一id
rabbitTemplate.convertAndSend("test","", "测试消息内容",
new CorrelationData(UUID.randomUUID().toString()));
2、接收消息
@RabbitListener官方介绍
将方法标记为指定 queues()
(或bindings()
)上的Rabbit消息侦听器的目标的注释。
containerFactory()
标识用于构建 RabbitMQ 监听器容器的 RabbitListenerContainerFactory
。如果没有设置,会默认将 beanName 为 rabbitListenerContainerFactory
的 bean 作为容器工厂,除非配置中已经提供了显式的默认值。
@RabbitListener
注解的处理是通过注册一个 RabbitListenerAnnotationBeanPostProcessor
来完成的。这可以手动完成,或者更方便地通过 <rabbit:annotation-driven/>
元素或 @EnableRabbit
也就是说,允许带该注解的方法具有与MessageMapping提供的类似的灵活参数,如下:
com.rabbitmq.client.Channel
访问Channelorg.springframework.amqp.core.Message
来访问原始的AMQP消息@payload
注释的方法参数,即消息体内容@header
注释的方法参数来提取特定的头值,包括由AmqpHeaders定义的标准AMQP头@headers
注释的参数必须也可分配给java.util.Map
以获得对所有头文件的访问。MessageHeaders
参数,用于访问所有标头。MessageHeaderAccessor
或AmqpMessageHeaderAccessor
,方便地访问所有方法参数。
// 可以监听多个队列
// Message 原生消息信息。消息头+消息体
@RabbitListener(queues = {QUEUE_NAME, QUEUE_NAME_2})
public void test1(Message message){
// 获取消息体
byte[] body = message.getBody();
// 获取消息的参数和头信息
MessageProperties messageProperties = message.getMessageProperties();
Map<String, Object> headers = messageProperties.getHeaders();
Object header = messageProperties.getHeader("__TypeId__");
System.out.println(Arrays.toString(body));
System.out.println(headers);
System.out.println(header);
}
@RabbitListener(queues = {QUEUE_NAME})
public void test2(@Payload Order order,
@Header("__TypeId__") String type,
@Headers Map<String, Object> headers,
Channel channel){
System.out.println(order);
System.out.println(type);
System.out.println(headers);
System.out.println(channel);
}
带注释的方法可以有一个非空返回类型。当它们这样做时,方法调用的结果将作为应答发送到传入消息的ReplyTo头(应答头)
定义的队列。当没有设置此值时,可以通过向方法声明添加 @SendTo
来提供默认队列。
当提供 bindings()
时,应用程序上下文包含 org.springframework.amqp.rabbit.core
。RabbitAdmin、队列、交换器和绑定会自动声明。
在方法级别定义时,将为每个方法创建侦听器容器。messagelistener
是一个org.springframework.amqp.rabbit.listener.adapter
。MessagingMessageListenerAdapter
,配置为org.springframework.amqp.rabbit.listener.MethodRabbitListenerEndpoint
。
当在类级别定义时,单个消息侦听器容器用于服务所有用 @RabbitHandler
注释的方法,即将整个类作为一个监听器, @RabbitHandler
修饰具体执行业务的方法。此类带注释方法的方法签名不能造成任何歧义,以至于单个方法可以针对特定的入站消息进行解析,意思就是可以根据消息体的参数类型自动选择使用哪个方法。
org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter配置了一个org.springframework.amqp.rabbit.listener.MultiMethodRabbitListenerEndpoint。
演示@RabbitHandler
/**
* 消费者
*/
@Component
@RabbitListener(queues = {"test"})
public class MyListener2 {
@RabbitHandler
public void handleString(String content){
System.out.println("1");
System.out.println(content);
}
@RabbitHandler
public void handleOrder(Order order){
System.out.println("2");
System.out.println(order);
}
@RabbitHandler
public void handleUser(User user){
System.out.println("3");
System.out.println(user);
}
}
生产者
/**
* 生产者
*/
rabbitTemplate.convertAndSend("test","", "测试消息内容");
Order order = new Order(1L, "西瓜", 100, new BigDecimal("100"));
rabbitTemplate.convertAndSend("test","", order);
User user = new User("张三", 11);
rabbitTemplate.convertAndSend("test","", user);
结果
1
测试消息内容
2
Order(id=1, goodName=西瓜, goodNum=100, price=100)
3
User(name=张三, age=11)
3、修改对象传输格式
默认 rabbitTemplate
发送对象时通过序列化后发送的,可通过下面的配置修改为通过 json 格式传输
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
四、过期时间
1、默认情况
首先我们来看看默认情况。
默认情况下,消息是不会过期的,也就是我们平日里在消息发送时,如果不设置任何消息过期的相关参数,那么消息是不会过期的,即使消息没被消费掉,也会一直存储在队列中。
2、TTL
TTL(Time-To-Live),消息存活的时间,即消息的有效期。如果我们希望消息能够有一个存活时间,那么我们可以通过设置 TTL 来实现这一需求。如果消息的存活时间超过了 TTL 并且还没有被消息,此时消息就会变成死信
,关于死信
以及死信队列
,松哥后面再和大家介绍。
TTL 的设置有两种不同的方式:
- 在声明队列的时候,我们可以在队列属性中设置消息的有效期,这样所有进入该队列的消息都会有一个相同的有效期。
- 在发送消息的时候设置消息的有效期,这样不同的消息就具有不同的有效期。
那如果两个都设置了,以时间短的为准。
当我们设置了消息有效期后,消息过期了就会被从队列中删除了(进入到死信队列),但是两种方式对应的删除时机有一些差异:
- 对于第一种方式,当消息队列设置过期时间的时候,那么消息过期了就会被删除,因为消息进入 RabbitMQ 后是存在一个消息队列中,队列的头部是最早要过期的消息,所以 RabbitMQ 只需要一个定时任务,从头部开始扫描是否有过期消息,有的话就直接删除。
- 对于第二种方式,当消息过期后并不会立马被删除,而是当消息要投递给消费者的时候才会去删除,因为第二种方式,每条消息的过期时间都不一样,想要知道哪条消息过期,必须要遍历队列中的所有消息才能实现,当消息比较多时这样就比较耗费性能,因此对于第二种方式,当消息要投递给消费者的时候才去删除。
3、单条消息过期
Message message = MessageBuilder.withBody("hello world".getBytes())
.setExpiration("10000")
.build();
rabbitTemplate.convertAndSend(QueueConfig.JAVABOY_QUEUE_DEMO, message);
在创建 Message 对象的时候我们可以设置消息的过期时间,这里设置消息的过期时间为 10 秒。
4、队列消息过期
修改声明队列的 bean 即可
@Bean
Queue queue() {
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 10000);
return new Queue(JAVABOY_QUEUE_DEMO, true, false, false, args);
}
5、特殊情况
还有一种特殊情况,就是将消息的过期时间 TTL 设置为 0,这表示如果消息不能立马消费则会被立即丢掉,这个特性可以部分替代 RabbitMQ3.0 以前支持的 immediate 参数,之所以所部分代替,是因为 immediate 参数在投递失败会有 basic.return 方法将消息体返回(这个功能可以利用死信队列来实现)。
五、死信队列
1、死信交换机
死信交换机,Dead-Letter-Exchange 即 DLX。
死信交换机用来接收死信消息(Dead Message)的,那什么是死信消息呢?一般消息变成死信消息有如下几种情况:
- 消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false
- 消息过期
- 队列达到最大长度,排在前面的消息会进入死信队列
当消息在一个队列中变成了死信消息后,此时就会被发送到 DLX,绑定 DLX 的消息队列则称为死信队列。
DLX 本质上也是一个普普通通的交换机,我们可以为任意队列指定 DLX,当该队列中存在死信时,RabbitMQ 就会自动的将这个死信发布到 DLX 上去,进而被路由到另一个绑定了 DLX 的队列上(即死信队列)。
死信队列:绑定了死信交换机的队列就是死信队列。
2、配置死信队列
这里的死信队列采用 routing 路由模式为例,其他模式也可以采用
首先我们来创建一个死信交换机,接着创建一个死信队列,再将死信交换机和死信队列绑定到一起:
public static final String DLX_EXCHANGE_NAME = "dlx_exchange_name";
public static final String DLX_QUEUE_NAME = "dlx_queue_name";
public static final String DLX_ROUTING_KEY = "dlx_routing_key";
/**
* 配置死信交换机
*
* @return
*/
@Bean
DirectExchange dlxDirectExchange() {
return new DirectExchange(DLX_EXCHANGE_NAME, true, false);
}
/**
* 配置死信队列
* @return
*/
@Bean
Queue dlxQueue() {
return new Queue(DLX_QUEUE_NAME);
}
/**
* 绑定死信队列和死信交换机
* @return
*/
@Bean
Binding dlxBinding() {
return BindingBuilder.bind(dlxQueue())
.to(dlxDirectExchange())
.with(DLX_ROUTING_KEY);
}
这其实跟普通的交换机,普通的消息队列没啥两样。
接下来为普通的消息队列配置死信交换机,如下:
@Bean
Queue queue() {
Map<String, Object> args = new HashMap<>();
//设置消息过期时间
args.put("x-message-ttl", 0);
//设置队列对应的死信交换机
args.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
//设置死信 routing_key
args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
return new Queue(JAVABOY_QUEUE_DEMO, true, false, false, args);
}
就两个参数:
- x-dead-letter-exchange:配置死信交换机。
- x-dead-letter-routing-key:配置死信
routing_key
。
这就配置好了。
将来发送到这个消息队列上的消息,如果发生了 nack、reject 或者过期等问题,就会被发送到 DLX 上,进而进入到与 DLX 绑定的消息队列上。
死信消息队列的消费和普通消息队列的消费并无二致:
@RabbitListener(queues = QueueConfig.DLX_QUEUE_NAME)
public void dlxHandle(String msg) {
System.out.println("dlx msg = " + msg);
}
六、延迟队列
定时任务各种各样,常见的定时任务例如日志备份,我们可能在每天凌晨 3 点去备份,这种固定时间的定时任务我们一般采用 cron 表达式就能轻松的实现,还有一些比较特殊的定时任务,向大家看电影中的定时炸弹,3分钟后爆炸,这种定时任务就不太好用 cron 去描述,因为开始时间不确定,我们开发中有的时候也会遇到类似的需求,例如:
- 在电商项目中,当我们下单之后,一般需要 20 分钟之内或者 30 分钟之内付款,否则订单就会进入异常处理逻辑中,被取消,那么进入到异常处理逻辑中,就可以当成是一个延迟队列。
- 我买了一个智能砂锅,可以用来煮粥,上班前把素材都放到锅里,然后设置几点几分开始煮粥,这样下班后就可以喝到香喷喷的粥了,那么这个煮粥的指令也可以看成是一个延迟任务,放到一个延迟队列中,时间到了再执行。
- 公司的会议预定系统,在会议预定成功后,会在会议开始前半小时通知所有预定该会议的用户。
- 安全工单超过 24 小时未处理,则自动拉企业微信群提醒相关责任人。
- 用户下单外卖以后,距离超时时间还有 10 分钟时提醒外卖小哥即将超时。
整体上来说,在 RabbitMQ 上实现定时任务有两种方式:
- 利用 RabbitMQ 自带的消息过期和死信队列机制,实现定时任务。
- 使用 RabbitMQ 的 rabbitmq_delayed_message_exchange 插件来实现定时任务,这种方案较简单。
1、插件实现
(1)插件安装
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez
选择适合自己的版本,我这里选择最新的 3.9.0 版。
下载完成后在命令行执行如下命令将下载文件拷贝到 Docker 容器中去(/plugins 目录不可修改):
docker cp ./rabbitmq_delayed_message_exchange-3.9.0.ez some-rabbit:/plugins
这里第一个参数是宿主机上的文件地址,第二个参数是拷贝到容器的位置。
接下来再执行如下命令进入到 RabbitMQ 容器中:
docker exec -it some-rabbit /bin/bash
进入到容器之后,执行如下命令启用插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
启用成功之后,还可以通过如下命令查看所有安装的插件,看看是否有我们刚刚安装过的插件,如下:
rabbitmq-plugins list
OK,配置完成之后,接下来我们执行 exit
命令退出 RabbitMQ 容器。然后开始编码。
(2)创建队列交换机
@Configuration
public class RabbitConfig {
public static final String QUEUE_NAME = "javaboy_delay_queue";
public static final String EXCHANGE_NAME = "javaboy_delay_exchange";
// 交换机类型为延迟队列
public static final String EXCHANGE_TYPE = "x-delayed-message";
@Bean
Queue queue() {
return new Queue(QUEUE_NAME, true, false, false);
}
@Bean
CustomExchange customExchange() {
Map<String, Object> args = new HashMap<>();
// 指定交换机的模式为 路由模式(其他模式也可以)
args.put("x-delayed-type", "direct");
return new CustomExchange(EXCHANGE_NAME, EXCHANGE_TYPE, true, false,args);
}
@Bean
Binding binding() {
return BindingBuilder.bind(queue())
.to(customExchange()).with(QUEUE_NAME).noargs();
}
}
这里主要是交换机的定义有所不同
这里我们使用的交换机是 CustomExchange,这是一个 Spring 中提供的交换机,创建 CustomExchange 时有五个参数,含义分别如下:
- 交换机名称。
- 交换机类型,这个地方是固定的。
- 交换机是否持久化。
- 如果没有队列绑定到交换机,交换机是否删除。
- 其他参数。
最后一个 args 参数中,指定了交换机消息分发的类型,这个类型就是大家熟知的 direct、fanout、topic 以及 header 几种,用了哪种类型,将来交换机分发消息就按哪种方式来。
(3)消费者
接下来我们再创建一个消息消费者:(和其他的相同,没什么特别的)
@Component
public class MsgReceiver {
private static final Logger logger = LoggerFactory.getLogger(MsgReceiver.class);
@RabbitListener(queues = RabbitConfig.QUEUE_NAME)
public void handleMsg(String msg) {
logger.info("handleMsg,{}",msg);
}
}
打印一下消息内容即可。
(4)生产者
接下来再写一个单元测试方法来发送消息:
// MessagePostProcessor 是接口无法直接创建,需要实现其中的方法
MessagePostProcessor messagePostProcessor = message -> {
// messageProperties 是用来配置消息的
MessageProperties messageProperties = message.getMessageProperties();
// 设置延迟时间,需要安装插件,以下两种方法是一样的
messageProperties.setHeader(MessageProperties.X_DELAY, 30000);
messageProperties.setDelay(30000);
return message;
};
rabbitTemplate.convertAndSend("test", "", order, messagePostProcessor);
在消息头中设置消息的延迟时间。
(5)管理页面创建
通过管理平台创建队列时,必须添加参数 x-delayed-type
,值为所需的交换机类型,否则会出现以下错误
Invalid argument, 'x-delayed-type' must be an existing exchange type"
(6)注意事项
基于RabbitMQ插件的方式可以实现延迟消息,并且不存在消息阻塞的问题,但是因为是基于插件的,而这个插件支持的最大延长时间是 (2^32)-1
毫秒,大约49天,超过这个时间就会被立即消费。但是他基于RabbitMQ实现,所以在可用性、性能方便都很不错
2、DLX+TTL实现
DLX(死信交换机)+TTL(消息超时时间):我们可以把死信队列就当成延迟队列。
假如一条消息需要延迟 30 分钟执行,我们就设置这条消息的有效期为 30 分钟,同时为这条消息配置死信交换机和死信 routing_key
,并且不为这个消息队列设置消费者,那么 30 分钟后,这条消息由于没有被消费者消费而进入死信队列,此时我们有一个消费者就在“蹲点”这个死信队列,消息一进入死信队列,就立马被消费了。
重点:遵循一个服务一个交换机的原则
@Configuration
public class MyMQConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public Queue orderDelayQueue() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "order-event-exchange");
arguments.put("x-dead-letter-routing-key", "order.release.order");
arguments.put("x-message-ttl", 60000);
Queue orderDelayQueue = new Queue("order.delay.queue", true, false, false, arguments);
return orderDelayQueue;
}
@Bean
public Queue orderReleaseQueue() {
return new Queue("order.release.order.queue", true, false, false);
}
@Bean
//监听秒杀业务队列
public Queue orderSeckillOrderQueue(){
return new Queue("order.seckill.order.queue",true,false,false);
}
@Bean
public Exchange orderEventExchange() {
return new TopicExchange("order-event-exchange", true, false);
}
@Bean
public Binding orderCreateBingding() {
return new Binding("order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.create.order", null);
}
@Bean
public Binding orderReleaseBingding() {
return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.order", null);
}
@Bean
public Binding orderReleaseOtherBingding() {
return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.other.#", null);
}
@Bean
//秒杀业务绑定关系
public Binding orderSeckillOrderQueueBinding(){
return new Binding("order.seckill.order.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.seckill.order",null);
}
}
七、优先级队列
八、惰性队列
一般情况下,消息存放在内存中
而惰性队列,消息存放在磁盘中
九、消息可靠性
1、消息的发送可靠性
以下内容我主要描述如何确保消息生产者将消息发送成功,并不涉及消息消费的问题。
(1)消息发送机制
RabbitMQ 中的消息发送引入了 Exchange(交换机)的概念,消息的发送首先到达交换机上,然后再根据既定的路由规则,由交换机将消息路由到不同的 Queue(队列)中,再由不同的消费者去消费。所以要确保消息发送的可靠性,主要从两方面去确认:
- 消息成功到达 Exchange
- 消息成功到达 Queue
如果能确认这两步,那么我们就可以认为消息发送成功了。
如果这两步中任一步骤出现问题,那么消息就没有成功送达,此时我们可能要通过重试等方式去重新发送消息,多次重试之后,如果消息还是不能到达,则可能就需要人工介入了。
经过上面的分析,我们可以确认,要确保消息成功发送,我们只需要做好三件事就可以了:
- 确认消息到达 Exchange。
- 确认消息到达 Queue。
- 开启定时任务,定时投递那些发送失败的消息。
如何确保消息成功到达 RabbitMQ?RabbitMQ 给出了两种方案:
- 开启事务机制(不推荐)
- 发送方确认机制(重要)
这是两种不同的方案,不可以同时开启,只能选择其中之一,如果两者同时开启,则会报错
(2)开启事务
首先需要先提供一个事务管理器,如下:
@Bean
RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
接下来,在消息生产者上面做两件事:添加事务注解并设置通信信道为事务模式:
@Service
public class MsgService {
@Autowired
RabbitTemplate rabbitTemplate;
@Transactional
public void send() {
//设置通信信道为事务模式
rabbitTemplate.setChannelTransacted(true);
rabbitTemplate.convertAndSend(
RabbitConfig.JAVABOY_EXCHANGE_NAME,
RabbitConfig.JAVABOY_QUEUE_NAME,
"hello rabbitmq!".getBytes());
int i = 1 / 0;
}
}
这里注意两点:
- 发送消息的方法上添加
@Transactional
注解标记事务。 - 调用 setChannelTransacted 方法设置为 true 开启事务模式。
这就 OK 了。
在上面的案例中,我们在结尾来了个 1/0 ,这在运行时必然抛出异常,我们可以尝试运行该方法,发现消息并未发送成功。
当我们开启事务模式之后,RabbitMQ 生产者发送消息会多出四个步骤:
- 客户端发出请求,将信道设置为事务模式。
- 服务端给出回复,同意将信道设置为事务模式。
- 客户端发送消息。
- 客户端提交事务。
- 服务端给出响应,确认事务提交。
上面的步骤,除了第三步是本来就有的,其他几个步骤都是平白无故多出来的。所以大家看到,事务模式其实效率有点低,这并非一个最佳解决方案。我们可以想想,什么项目会用到消息中间件?一般来说都是一些高并发的项目,这个时候并发性能尤为重要。
所以,RabbitMQ 还提供了发送方确认机制(publisher confirm)来确保消息发送成功,这种方式,性能要远远高于事务模式
(3)confirm(发布确认)机制
原理
生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。
实现
在 application.properties 中配置开启消息发送方确认机制
# 消息到达交换器的确认回调
spring.rabbitmq.publisher-confirm-type=correlated
# 下面两个都要配置才能使用ReturnCallback回调
# 配置消息到达队列的回调
spring.rabbitmq.publisher-returns=true
# 只要抵达队列,以异步发送优先回调我们的回调方法
# 开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息到RabbitTemplate.ReturnCallback中的returnedMessage方法
spring.rabbitmq.template.mandatory=true
第一行是配置消息到达交换器的确认回调,第二行则是配置消息到达队列的回调。
第一行属性的配置有三个取值:
none
:表示禁用发布确认模式,默认即此。correlated
:表示成功发布消息到交换器后会触发的回调方法。simple
:类似 correlated,并且支持waitForConfirms()
和waitForConfirmsOrDie()
方法的调用。
接下来我们要开启两个监听,具体配置如下:
@Configuration
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
private static final Logger logger =
LoggerFactory.getLogger(RabbitConfig.class);
@Autowired
RabbitTemplate rabbitTemplate;
@PostConstruct
public void initRabbitTemplate() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
// 确定消息到达交换器后的回调方法
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
logger.info("{}:消息成功到达交换器",correlationData.getId());
}else{
logger.error("{}:消息发送失败", correlationData.getId());
}
}
// 消息路由到队列失败时被调用
@Override
public void returnedMessage(ReturnedMessage returned) {
logger.error("{}:消息未成功路由到队列",returned.getMessage().getMessageProperties().getMessageId());
}
}
关于这个配置类,我说如下几点:
- 定义配置类,实现
RabbitTemplate.ConfirmCallback
和RabbitTemplate.ReturnCallback
两个接口,这两个接口,前者的回调用来确定消息到达交换器,后者则会在消息路由到队列失败时被调用。 - 定义
initRabbitTemplate
方法并添加@PostConstruct
注解,在该方法中为rabbitTemplate
分别配置这两个 Callback。
这就可以了。
2、失败重试
失败重试分两种情况,一种是压根没找到 MQ 导致的失败重试,另一种是找到 MQ 了,但是消息发送失败了。
(1)自带重试机制
前面所说的事务机制和发送方确认机制,都是发送方确认消息发送成功的办法。如果发送方一开始就连不上 MQ,那么 Spring Boot 中也有相应的重试机制,但是这个重试机制就和 MQ 本身没有关系了,这是利用 Spring 中的 retry 机制来完成的,具体配置如下:
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=1000ms
spring.rabbitmq.template.retry.max-attempts=10
spring.rabbitmq.template.retry.max-interval=10000ms
spring.rabbitmq.template.retry.multiplier=2
从上往下配置含义依次是:
- 开启重试机制。
- 重试起始间隔时间。
- 最大重试次数。
- 最大重试间隔时间。
- 间隔时间乘数。(这里配置间隔时间乘数为 2,则第一次间隔时间 1 秒,第二次重试间隔时间 2 秒,第三次 4 秒,以此类推)
配置完成后,再次启动 Spring Boot 项目,然后关掉 MQ,此时尝试发送消息,就会发送失败,进而导致自动重试。
(2)业务重试
业务重试主要是针对消息没有到达交换器的情况。
如果消息没有成功到达交换器,根据我们第二小节的讲解,此时就会触发消息发送失败回调,在这个回调中,我们就可以做文章了!
整体思路是这样:
1、首先创建一张表,用来记录发送到中间件上的消息,像下面这样:
每次发送消息的时候,就往数据库中添加一条记录。这里的字段都很好理解,有三个我额外说下:
- status:表示消息的状态,有三个取值,0,1,2 分别表示消息发送中、消息发送成功以及消息发送失败。
- tryTime:表示消息的第一次重试时间(消息发出去之后,在 tryTime 这个时间点还未显示发送成功,此时就可以开始重试了)。
- count:表示消息重试次数。
2、在消息发送的时候,我们就往该表中保存一条消息发送记录,并设置状态 status 为 0,tryTime 为 1 分钟之后。
3、在 confirm 回调方法中,如果收到消息发送成功的回调,就将该条消息的 status 设置为1(在消息发送时为消息设置 msgId,在消息发送成功回调时,通过 msgId 来唯一锁定该条消息)。
4、另外开启一个定时任务,定时任务每隔 10s 就去数据库中捞一次消息,专门去捞那些 status 为 0 并且已经过了 tryTime 时间记录,把这些消息拎出来后,首先判断其重试次数是否已超过 3 次,如果超过 3 次,则修改该条消息的 status 为 2,表示这条消息发送失败,并且不再重试。对于重试次数没有超过 3 次的记录,则重新去发送消息,并且为其 count 的值+1。
当然这种思路有两个弊端:
- 去数据库走一遭,可能拖慢 MQ 的 Qos,不过有的时候我们并不需要 MQ 有很高的 Qos,所以这个应用时要看具体情况。
- 按照上面的思路,可能会出现同一条消息重复发送的情况,不过这都不是事,我们在消息消费时,解决好幂等性问题就行了。
此处可用Redis、ES替代mysql
3、消费可靠性
(1)两种消费思路
RabbitMQ 的消息消费,整体上来说有两种不同的思路:
- 推(push):MQ 主动将消息推送给消费者,这种方式需要消费者设置一个缓冲区去缓存消息,对于消费者而言,内存中总是有一堆需要处理的消息,所以这种方式的效率比较高,这也是目前大多数应用采用的消费方式。
- 拉(pull):消费者主动从 MQ 拉取消息,这种方式效率并不是很高,不过有的时候如果服务端需要批量拉取消息,倒是可以采用这种方式。
两种方式我都举个例子看下。
先来看推(push):
这种方式大家比较常见,就是通过 @RabbitListener
注解去标记消费者,如下:
@Component
public class ConsumerDemo {
@RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
public void handle(String msg) {
System.out.println("msg = " + msg);
}
}
当监听的队列中有消息时,就会触发该方法。
再来看拉(pull):
@Test
public void test01() throws UnsupportedEncodingException {
Object o = rabbitTemplate.receiveAndConvert(RabbitConfig.JAVABOY_QUEUE_NAME);
System.out.println("o = " + new String(((byte[]) o),"UTF-8"));
}
调用 receiveAndConvert 方法,方法参数为队列名称,方法执行完成后,会从 MQ 上拉取一条消息下来,如果该方法返回值为 null,表示该队列上没有消息了。receiveAndConvert 方法有一个重载方法,可以在重载方法中传入一个等待超时时间,例如 3 秒。此时,假设队列中没有消息了,则 receiveAndConvert 方法会阻塞 3 秒,3 秒内如果队列中有了新消息就返回,3 秒后如果队列中还是没有新消息,就返回 null,这个等待超时时间要是不设置的话,默认为 0。
这是消息两种不同的消费模式。
如果需要从消息队列中持续获得消息,就可以使用推模式;如果只是单纯的消费一条消息,则使用拉模式即可。切忌将拉模式放到一个死循环中,变相的订阅消息,这会严重影响 RabbitMQ 的性能。
(2)确保消费成功
为了保证消息能够可靠的到达消息消费者,RabbitMQ 中提供了消息消费确认机制。当消费者去消费消息的时候,可以通过指定 autoAck 参数来表示消息消费的确认方式。
- 当 autoAck 为 false 的时候,此时即使消费者已经收到消息了,RabbitMQ 也不会立马将消息移除,而是等待消费者显式的回复确认信号后,才会将消息打上删除标记,然后再删除。
- 当 autoAck 为 true 的时候,此时消息消费者就会自动把发送出去的消息设置为确认,然后将消息移除(从内存或者磁盘中),即使这些消息并没有到达消费者。
我们来看一张图:
如上图所示,在 RabbitMQ 的 web 管理页面:
- Ready 表示待消费的消息数量。
- Unacked 表示已经发送给消费者但是还没收到消费者 ack 的消息数量。
这是我们可以从 UI 层面观察消息的消费情况确认情况。
当我们将 autoAck 设置为 false 的时候,对于 RabbitMQ 而言,消费分成了两个部分:
- 待消费的消息
- 已经投递给消费者,但是还没有被消费者确认的消息
换句话说,当设置 autoAck 为 false 的时候,消费者就变得非常从容了,它将有足够的时间去处理这条消息,当消息正常处理完成后,再手动 ack,此时 RabbitMQ 才会认为这条消息消费成功了。如果 RabbitMQ 一直没有收到客户端的反馈,并且此时客户端也已经断开连接了,那么 RabbitMQ 就会将刚刚的消息重新放回队列中,等待下一次被消费。
综上所述,确保消息被成功消费,无非就是手动 Ack 或者自动 Ack,无他。当然,无论这两种中的哪一种,最终都有可能导致消息被重复消费,所以一般来说我们还需要在处理消息时,解决幂等性问题。
(3)消息拒绝
当客户端收到消息时,可以选择消费这条消息,也可以选择拒绝这条消息。我们来看下拒绝的方式:
@Component
public class ConsumerDemo {
@RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
public void handle(Channel channel, Message message) {
//获取消息编号
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//拒绝消息
channel.basicReject(deliveryTag, true);
} catch (IOException e) {
e.printStackTrace();
}
}
}
消费者收到消息之后,可以选择拒绝消费该条消息,拒绝的步骤分两步:
- 获取消息编号 deliveryTag。
- 调用 basicReject 方法拒绝消息。
调用 basicReject 方法时,第二个参数是 requeue,即是否重新入队。如果第二个参数为 true,则这条被拒绝的消息会重新进入到消息队列中,等待下一次被消费;如果第二个参数为 false,则这条被拒绝的消息就会被丢掉,不会有新的消费者去消费它了。
需要注意的是,basicReject 方法一次只能拒绝一条消息。
(4)自动确认
消息确认分为自动确认和手动确认,我们分别来看。
先来看看自动确认,在 Spring Boot 中,默认情况下,消息消费就是自动确认的。
我们来看如下一个消息消费方法:
@Component
public class ConsumerDemo {
@RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
public void handle2(String msg) {
System.out.println("msg = " + msg);
int i = 1 / 0;
}
}
通过 @Componet
注解将当前类注入到 Spring 容器中,然后通过 @RabbitListener
注解来标记一个消息消费方法,默认情况下,消息消费方法自带事务,即如果该方法在执行过程中抛出异常,那么被消费的消息会重新回到队列中等待下一次被消费,如果该方法正常执行完没有抛出异常,则这条消息就算是被消费了。
(5)手动确认
推模式手动确认
要开启手动确认,需要我们首先关闭自动确认,关闭方式如下:
spring.rabbitmq.listener.simple.acknowledge-mode=manual
这个配置表示将消息的确认模式改为手动确认。
接下来我们来看下消费者中的代码:
@RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
public void handle3(Message message,Channel channel) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//消息消费的代码写到这里
String s = new String(message.getBody());
System.out.println("s = " + s);
//消费完成后,手动 ack
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
//手动 nack
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
将消费者要做的事情放到一个 try..catch
代码块中。
如果消息正常消费成功,则执行 basicAck
完成确认。
如果消息消费失败,则执行 basicNack
方法,告诉 RabbitMQ 消息消费失败。
这里涉及到两个方法:
- basicAck:这个是手动确认消息已经成功消费,该方法有两个参数:
- 第一个参数表示消息的 id;(id在该通道内自增)
- 第二个参数 multiple 如果为 false,表示仅确认当前消息消费成功,如果为 true,则表示当前消息之前所有未被当前消费者确认的消息都消费成功。
- basicNack:这个是告诉 RabbitMQ 当前消息未被成功消费,该方法有三个参数:
- 第一个参数表示消息的 id;
- 第二个参数 multiple 如果为 false,表示仅拒绝当前消息的消费,如果为 true,则表示拒绝当前消息之前所有未被当前消费者确认的消息;
- 第三个参数 requeue 含义和前面所说的一样,被拒绝的消息是否重新入队。true:重新进入当前队列,false:进入死信队列
当 basicNack 中最后一个参数设置为 false 的时候,还涉及到一个死信队列的问题
拉模式手动确认
拉模式手动 ack 比较麻烦一些,在 Spring 中封装的 RabbitTemplate 中并未找到对应的方法,所以我们得用原生的办法,如下:
public void receive2() {
Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(false);
long deliveryTag = 0L;
try {
GetResponse getResponse = channel.basicGet(RabbitConfig.JAVABOY_QUEUE_NAME, false);
deliveryTag = getResponse.getEnvelope().getDeliveryTag();
System.out.println("o = " + new String((getResponse.getBody()), "UTF-8"));
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
(6)接口幂等性
消费者在消费完一条消息后,向 RabbitMQ 发送一个 ack 确认,此时由于网络断开或者其他原因导致 RabbitMQ 并没有收到这个 ack,那么此时 RabbitMQ 并不会将该条消息删除,当重新建立起连接后,消费者还是会再次收到该条消息,这就造成了消息的重复消费。同时,由于类似的原因,消息在发送的时候,同一条消息也可能会发送两次。种种原因导致我们在消费消息时,一定要处理好幂等性问题。
幂等性问题的处理倒也不难,基本上都是从业务上来处理,我来大概说说思路。
基于redis
采用 Redis,在消费者消费消息之前,现将消息的 id 放到 Redis 中,存储方式如下:
- id-0(正在执行业务)
- id-1(执行业务成功)
如果 ack 失败,在 RabbitMQ 将消息交给其他的消费者时,先执行 setnx,如果 key 已经存在(说明之前有人消费过该消息),获取他的值,如果是 0,当前消费者就什么都不做,如果是 1,直接 ack。
极端情况:第一个消费者在执行业务时,出现了死锁,在 setnx 的基础上,再给 key 设置一个生存时间。生产者,发送消息时,指定 messageId。
乐观锁
十、消息可靠性代码实现
此处我们将用 ES 作为消息队列日志持久化数据库,将消息在各个节点成功或失败通过日志进行持久化,并可实现消息的重新发送
0、公共类
(1)es日志实体类
@Data
public class MqLogModel {
private String id;
/**
* 交换机类型
*/
private String type;
/**
* 交换机
*/
private String exchange;
/**
* 队列名
*/
private String queueKey;
/**
* 路由键
*/
private String routingKey;
/**
* 内容
*/
private String body;
/**
* 发送者服务器IP
*/
private String sender;
/**
* 发送时间
*/
private Date sendTime;
/**
* 发送状态,0-失败,1-成功
*/
private String sendStatus;
/**
* 发送异常信息
*/
private String sendExceptionMsg;
/**
* 消费者服务器IP
*/
private String consumer;
/**
* 消费时间
*/
private Date consumeTime;
/**
* 结束时间,即消费执行完的时间
*/
private Date endTime;
/**
* 消费状态,0-失败,1-成功
*/
private String consumeStatus;
/**
* 失败原因
*/
private String exceptionMsg;
/**
* 重试次数, 执行失败且大于0时生效,每重试一次加一
*/
private Integer retryCount;
/**
* 重发最新时间
*/
private Date retryTime;
}
(2)MqLogService
@Component
public class MqLogService {
public final static String SUCCESS_STATUS = "1";
public final static String ERROR_STATUS = "0";
public final static String MQ_LOG_INDEX = "mq_log_index";
@Autowired
private RestHighLevelClient restHighLevelClient;
/**
* Mq 消息发送前存储日志
*/
public MqLogModel preSend(String exchange, String routingKey, Message message, CorrelationData correlationData, MessageConverter messageConverter) {
Object data = messageConverter.fromMessage(message);
int retryCount = 0;
if (correlationData != null && correlationData.getId() != null){
String correlationDataId = correlationData.getId();
MqLogModel oldModel = getMqLogById(correlationDataId);
if (oldModel != null){
// 重发的消息
retryCount = oldModel.getRetryCount() + 1;
}
}
MqLogModel model = new MqLogModel();
model.setId(correlationData == null || correlationData.getId() == null ? UUID.randomUUID().toString() : correlationData.getId());
//model.setType();
model.setExchange(exchange);
model.setQueueKey(routingKey);
model.setRoutingKey(routingKey);
model.setBody(JSONUtil.toJsonStr(data));
model.setSender(getHost());
model.setSendTime(new Date());
model.setSendStatus(ERROR_STATUS);
model.setSendExceptionMsg("");
model.setConsumer("");
model.setConsumeStatus(ERROR_STATUS);
model.setConsumeTime(new Date());
model.setExceptionMsg("");
model.setEndTime(new Date());
model.setRetryCount(retryCount);
saveLog(model);
return model;
}
/**
* 保存日志
*/
public boolean saveLog(MqLogModel model){
IndexRequest request = new IndexRequest(MQ_LOG_INDEX);
request.id(model.getId());
request.timeout(TimeValue.timeValueSeconds(10));
request.source(JSONUtil.toJsonStr(model), XContentType.JSON);
IndexResponse response = null;
try {
response = restHighLevelClient.index(request, RequestOptions.DEFAULT);
if (!Objects.equals(response.status(), RestStatus.OK) &&
!Objects.equals(response.status(), RestStatus.CREATED)){
throw new RuntimeException(response.toString());
}
return true;
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
}
/**
* 根据id查询
*/
public MqLogModel getMqLogById(String messageId) {
try {
GetRequest getRequest = new GetRequest().index(MQ_LOG_INDEX).id(messageId);
GetResponse getResponse = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
if (!getResponse.isExists()){
return null;
}
String modelJson = getResponse.getSourceAsString();
return JSONUtil.toBean(modelJson, MqLogModel.class);
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
}
/**
* 根据id修改
*/
public boolean updateById(MqLogModel mqLogModel) {
UpdateRequest request = new UpdateRequest(MQ_LOG_INDEX, mqLogModel.getId());
request.timeout(TimeValue.timeValueSeconds(10));
request.doc(JSONUtil.toJsonStr(mqLogModel), XContentType.JSON);
try{
UpdateResponse response = restHighLevelClient.update(request, RequestOptions.DEFAULT);
return Objects.equals(response.status(), RestStatus.OK);
}catch (IOException e){
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
}
/**
* 获取ip地址
*/
public static String getHost(){
InetAddress address = null;
try {
address = InetAddress.getLocalHost();
} catch (UnknownHostException e) {
e.printStackTrace();
}
return address == null ? "" : address.getHostAddress();
}
}
(3)ExceptionUtil
public class ExceptionUtil {
/**
* 将 异常的栈帧 转化为字符串
*/
public static String getStackTrace(Exception e) {
if (e == null){
return "";
}
Throwable throwable = e.getCause();
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw, true);
throwable.printStackTrace(pw);
return sw.getBuffer().toString();
}
}
1、消息发送的可靠性
下面是三种前置增强的实现方法,第一种写的较为完整,其他两种均提供一个思路
(1)自定义RabbitTemplate子类
/**
* 自定 RabbitTemplate 子类
* 1、通过 doSend 方法重写的方式,在消息发送前存储 es 日志数据
* 2、通过 ConfirmCallback 回调,在消息到达交换机时,修改 es 日志数据
* 3、通过 ReturnCallback 回调,在消息路由到队列失败时,修改 es 日志数据
*/
public class MyRabbitTemplate extends RabbitTemplate implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
private final static String SUCCESS_STATUS = "1";
private final static String ERROR_STATUS = "0";
private final MqLogService mqLogService;
public MyRabbitTemplate(ConnectionFactory connectionFactory,
MqLogService mqLogService) {
super(connectionFactory);
this.mqLogService = mqLogService;
}
/**
* 重写父类方法,在消息发送前存储 es 日志数据
*/
@Override
public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Message message, boolean mandatory, CorrelationData correlationData) {
MqLogModel model = null;
try{
System.out.println("preSend...");
model = mqLogService.preSend(exchangeArg, routingKeyArg, message, correlationData, getMessageConverter());
if (correlationData == null){
correlationData = new CorrelationData(model.getId());
}
message.getMessageProperties().setMessageId(model.getId());
super.doSend(channel, exchangeArg, routingKeyArg, message, mandatory, correlationData);
}catch (Exception e){
e.printStackTrace();
if (model != null){
// 发送消息失败
handleSendException(model.getId(), e);
}
}
}
/**
* 消息发送出现异常
*/
private void handleSendException(String messageId, Exception e) {
MqLogModel updateModel = new MqLogModel();
updateModel.setId(messageId);
updateModel.setSendStatus(ERROR_STATUS);
updateModel.setSendExceptionMsg(ExceptionUtil.getStackTrace(e));
boolean res = mqLogService.updateById(updateModel);
if (!res){
throw new RuntimeException("更新ES日志失败");
}
}
/**
* 到达交换机回调
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("到达交换机回调");
if (correlationData == null || StringUtils.isEmpty(correlationData.getId())){
return;
}
String messageId = correlationData.getId();
MqLogModel model = new MqLogModel();
model.setId(messageId);
if (ack){
// 成功到达交换机
model.setSendStatus(SUCCESS_STATUS);
}else {
model.setSendStatus(ERROR_STATUS);
model.setSendExceptionMsg(cause);
}
// 存在问题,可能是消费方先修改为消费成功,在执行此处的发送成功导致错误
// 所以发送状态和消费状态为两个字段,避免更新问题
mqLogService.updateById(model);
}
/**
* 消息路由到队列失败时被调用
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("消息路由到队列失败时被调用");
String messageId = message.getMessageProperties().getMessageId();
MqLogModel updateModel = new MqLogModel();
updateModel.setId(messageId);
updateModel.setSendStatus(ERROR_STATUS);
updateModel.setSendExceptionMsg(replyText);
mqLogService.updateById(updateModel);
}
}
配置类
@Configuration
public class RabbitBindingConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
/* --------------- 以下代码均 copy 自源码 ---------------------*/
@Bean
//@ConditionalOnSingleCandidate(ConnectionFactory.class)
//@ConditionalOnMissingBean(RabbitOperations.class)
public RabbitTemplate rabbitTemplate(RabbitProperties properties,
ObjectProvider<MessageConverter> messageConverter,
ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers,
ConnectionFactory connectionFactory,
MqLogService mqLogService) {
PropertyMapper map = PropertyMapper.get();
// -----------------------------需要修改的地方--------------------------------------
// 只需将此处创建的对象改为自定义的类型即可
MyRabbitTemplate template = new MyRabbitTemplate(connectionFactory, mqLogService);
messageConverter.ifUnique(template::setMessageConverter);
template.setMandatory(determineMandatoryFlag(properties));
// 发布确认机制回调, 消息成功到达交换机后执行
template.setConfirmCallback(template);
// 退回模式回调, 消息从交换机路由到队列失败时执行
template.setReturnCallback(template);
// --------------------------------------------------------------------------------
RabbitProperties.Template templateProperties = properties.getTemplate();
if (templateProperties.getRetry().isEnabled()) {
template.setRetryTemplate(
new RetryTemplateFactory(retryTemplateCustomizers.orderedStream().collect(Collectors.toList()))
.createRetryTemplate(templateProperties.getRetry(),
RabbitRetryTemplateCustomizer.Target.SENDER));
}
map.from(templateProperties::getReceiveTimeout).whenNonNull().as(Duration::toMillis)
.to(template::setReceiveTimeout);
map.from(templateProperties::getReplyTimeout).whenNonNull().as(Duration::toMillis)
.to(template::setReplyTimeout);
map.from(templateProperties::getExchange).to(template::setExchange);
map.from(templateProperties::getRoutingKey).to(template::setRoutingKey);
map.from(templateProperties::getDefaultReceiveQueue).whenNonNull().to(template::setDefaultReceiveQueue);
return (RabbitTemplate) template;
}
private boolean determineMandatoryFlag(RabbitProperties properties) {
Boolean mandatory = properties.getTemplate().getMandatory();
return (mandatory != null) ? mandatory : properties.isPublisherReturns();
}
static class RetryTemplateFactory {
private final List<RabbitRetryTemplateCustomizer> customizers;
RetryTemplateFactory(List<RabbitRetryTemplateCustomizer> customizers) {
this.customizers = customizers;
}
RetryTemplate createRetryTemplate(RabbitProperties.Retry properties, RabbitRetryTemplateCustomizer.Target target) {
PropertyMapper map = PropertyMapper.get();
RetryTemplate template = new RetryTemplate();
SimpleRetryPolicy policy = new SimpleRetryPolicy();
map.from(properties::getMaxAttempts).to(policy::setMaxAttempts);
template.setRetryPolicy(policy);
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
map.from(properties::getInitialInterval).whenNonNull().as(Duration::toMillis)
.to(backOffPolicy::setInitialInterval);
map.from(properties::getMultiplier).to(backOffPolicy::setMultiplier);
map.from(properties::getMaxInterval).whenNonNull().as(Duration::toMillis).to(backOffPolicy::setMaxInterval);
template.setBackOffPolicy(backOffPolicy);
if (this.customizers != null) {
for (RabbitRetryTemplateCustomizer customizer : this.customizers) {
customizer.customize(target, template);
}
}
return template;
}
}
}
(2)beforePublishPostProcessors
// 发送消息前回调,在消息发送前执行
template.setBeforePublishPostProcessors(new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
System.out.println("Before-" + new String(message.getBody()));
return message;
}
});
// 发送消息前回调,在上面BeforePublishPostProcessor 之后执行
// 可用于更新、替换或创建用于发布者确认的相关数据(CorrelationData)
template.setCorrelationDataPostProcessor(new CorrelationDataPostProcessor() {
@Override
public CorrelationData postProcess(Message message, CorrelationData correlationData) {
System.out.println("CorrelationDataPostProcessor" + correlationData);
if (correlationData == null){
correlationData = new CorrelationData(UUID.randomUUID().toString());
}
return correlationData;
}
});
// 接收到消息之后进行回调,用于拉模式的接收消息rabbitTemplate.receiveAndConvert()
// 对于 @RabbitListener 推模式的接收消息不生效
template.setAfterReceivePostProcessors(new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
System.out.println("After-" + new String(message.getBody()));
return message;
}
});
(3)代理实现增强
@Configuration
public class RabbitTemplateBeanPostProcessor implements BeanPostProcessor {
@Autowired
private MqLogService mqLogService;
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof RabbitTemplate) {
Enhancer enhancer = new Enhancer();
// 用来设置父类型,(被代理类)
enhancer.setSuperclass(RabbitTemplate.class);
final RabbitTemplate template = (RabbitTemplate) bean;
MessageConverter messageConverter = template.getMessageConverter();
ConnectionFactory connectionFactory = template.getConnectionFactory();
enhancer.setCallback(new MethodInterceptor() {
// 方法拦截器
// 参数: 被代理对象,被代理的方法(当前执行的方法),方法的参数,代理方法
@Override
public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable {
if ("send".equals(method.getName()) && objects.length == 4) {
String exchange = (String) objects[0];
String routingKey = (String) objects[1];
Message message = (Message) objects[2];
CorrelationData correlationData = (CorrelationData) objects[3];
// 往ES中存入日志,发送状态默认为失败
mqLogService.preSend(exchange, routingKey, message, correlationData, messageConverter);
// 正确调用方式,内部不是使用反射调用的,spring使用的方式,传入目标对象
methodProxy.invoke(template, objects);
// 修改日志发送状态为成功
// 返回方法执行的返回值
return null;
}
return methodProxy.invoke(template, objects);
}
});
// 创建增强对象的,其提供了很多不同参数的方法用来匹配被增强类的不同构造方法。
RabbitTemplate rabbitTemplate = (RabbitTemplate) enhancer.create(new Class[]{ConnectionFactory.class}, new Object[]{connectionFactory});
// 调用这个因为 RabbitTemplate 的空参构造会重新初始化 messageConverter
rabbitTemplate.setMessageConverter(messageConverter);
return rabbitTemplate;
}
return bean;
}
}
2、自动ACK下消息可靠性
重点:使用这个的情况下,@RabbitListener
的方法必须有 Message 类型的参数
/**
* Bean后处理器
* 通过在 SimpleRabbitListenerContainerFactory 加入切面拦截器链
* 实现在 @RabbitListener 消费消息方法前后的增强
*
* 在消息处理完成后,Spring根据是否抛出异常确定消息,来确定消息是否 ack,这里也是这样的机制,来修改es消息日志的消费状态
*/
@Component
public class RabbitListenerPostProcess implements BeanPostProcessor {
@Autowired
private MqLogService mqLogService;
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof SimpleRabbitListenerContainerFactory) {
SimpleRabbitListenerContainerFactory factory = (SimpleRabbitListenerContainerFactory)bean;
factory.setAdviceChain(new MethodInterceptor(){
@Override
public Object invoke(MethodInvocation methodInvocation) throws Throwable {
System.out.println("Listener Advice");
Object[] args = methodInvocation.getArguments();
Exception ex = null;
Object resultObj = null;
String messageId = null;
try{
for (Object arg : args) {
if (arg instanceof Message){
Message message = (Message) arg;
messageId = message.getMessageProperties().getMessageId();
}
}
resultObj = methodInvocation.proceed();
} catch (Exception e){
e.printStackTrace();
ex = e;
throw e;
} finally {
if (messageId != null){
// 只更新需要更新的字段
MqLogModel updateModel = new MqLogModel();
updateModel.setId(messageId);
updateModel.setConsumeStatus(ex == null ? MqLogService.SUCCESS_STATUS : MqLogService.ERROR_STATUS);
updateModel.setConsumeTime(new Date());
updateModel.setConsumer(MqLogService.getHost());
if (ex != null){
updateModel.setExceptionMsg(ExceptionUtil.getStackTrace(ex));
}
updateModel.setEndTime(new Date());
mqLogService.updateById(updateModel);
}
}
return resultObj;
}
});
bean = factory;
}
return bean;
}
}
3、测试接口
@RestController
@RequestMapping("/test")
public class TestController {
@Autowired
private RabbitTemplate rabbitTemplate;
// 发送消息
@GetMapping("/send")
public String test(){
String uuid = UUID.randomUUID().toString();
HashMap<String, Object> map = new HashMap<>();
map.put("name", uuid);
map.put("date", new Date());
rabbitTemplate.convertAndSend("test", map);
return uuid;
}
// 消息重试
@GetMapping("/retry/{id}")
public String test4(@PathVariable("id") String id){
MqLogModel model = mqLogService.getMqLogById(id);
HashMap map = JSONUtil.toBean(model.getBody(), HashMap.class);
CorrelationData correlationData = new CorrelationData(model.getId());
rabbitTemplate.convertAndSend("test", map, correlationData);
return "200";
}
}