RabbitMQ高级特性


一、消息的投递可靠性

在使用RabbitMQ时,应杜绝消息任何消息的丢失或投递失败的场景。RabiitMQ提供2种方式控制消息的投递可靠性模式:

  1. confirm 确认模式
  2. return 退回模式

RabbitMQ的整个消息的投递路径:
producer –> rabbitmq broker –> exchange –> queue –> consumer

  1. 消息从producer –> exchange 会返回一个 confirmcallback
  2. 消息从 exange –> queue 会返回一个 returncallback

可利用者两个callback来控制消息投递的可靠性。

这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。

1、确认模式(confirmcallback):

通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调。

实现步骤

  1. 确认模式开启:ConnectionFactory中开启publisher-confirms=”true”

    <!-- 定义rabbitmq connectionFactory -->
     <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                                port="${rabbitmq.port}"
                                username="${rabbitmq.username}"
                                password="${rabbitmq.password}"
                                virtual-host="${rabbitmq.virtual-host}"
                                publisher-confirms="true"
                                publisher-returns="true"
     />
  2. 在rabbitTemplate定义ConfirmCallBack回调函数

    @Test
     public void testConfirm() {
    
         //2. 定义回调
         rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
             /**
              *
              * @param correlationData 相关配置信息
              * @param ack   exchange交换机 是否成功收到了消息。true 成功,false代表失败
              * @param cause 失败原因
              */
             @Override
             public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                 System.out.println("confirm方法被执行了....");
    
                 if (ack) {
                     //接收成功
                     System.out.println("接收成功消息" + cause);
                 } else {
                     //接收失败
                     System.out.println("接收失败消息" + cause);
                     //做一些处理,让消息再次发送。
                 }
             }
         });
    
         //3. 发送消息
         rabbitTemplate.convertAndSend("test_exchange_confirm111", "confirm", "message confirm....");
     }

2、退回模式(returncallback):

通过实现ReturnCallback接口,如果消息从交换器发送到对应队列失败时触发(比如根据发送消息时指定的routingKey找不到队列时会触发)。

实现步骤

  1. 开启回退模式:publisher-returns=”true”
    同确认模式的1

  2. 设置ReturnCallBack

  3. 设置Exchange处理消息的模式:

    1. 如果消息没有路由到Queue,则丢弃消息(默认)
    2. 如果消息没有路由到Queue,返回给消息发送方ReturnCallBack
 @Test
    public void testReturn() {

        //3. 设置交换机处理失败消息的模式
        rabbitTemplate.setMandatory(true);

        //2.设置ReturnCallBack
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             *
             * @param message   消息对象
             * @param replyCode 错误码
             * @param replyText 错误信息
             * @param exchange  交换机
             * @param routingKey 路由键
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("return 执行了....");

                System.out.println(message);
                System.out.println(replyCode);
                System.out.println(replyText);
                System.out.println(exchange);
                System.out.println(routingKey);
                //处理
            }
        });

        //3. 发送消息
        rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");
    }

在这里插入图片描述

二、consumer Ack 消费者确认

ack指acknowledge,确认。

确认模式

Acknowledge = “NONE”:不确认

Acknowledge = “MANUAL”:手动确认

Acknowledge = “AUTO” :自动确认

实现步骤

  1. 设置手动签收。acknowledge=”manual”
    <!--定义监听器容器-->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
     <rabbit:listener ref="ackListener" queue-names="test_queue_confirm"></rabbit:listener>
    </rabbit:listener-container>
    
2. 让监听器类实现ChannelAwareMessageListener接口
3. 如果消息成功处理,则调用channel的 basicAck()签收
4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer

```java
@Component
public class AckListener implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            //1.接收转换消息
            System.out.println(new String(message.getBody()));

            //2. 处理业务逻辑
            System.out.println("处理业务逻辑...");
            int i = 3/0;//出现错误
            //3. 手动签收
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            //e.printStackTrace();

            //4.拒绝签收
            /*
            第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
             */
            channel.basicNack(deliveryTag,true,true);
            //channel.basicReject(deliveryTag,true);
        }
    }
}

三、消费端限流

设置消费端一次最多拉取多少条消息,当消息确认后才能继续拉取。

Consumer 限流机制

  1. 确保ack机制为手动确认。acknowledge=”manual”
  2. listener-container配置属性

perfetch = 1,表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息。

  <!--定义监听器容器-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1" >
    <rabbit:listener ref="qosListener" queue-names="test_queue_confirm"></rabbit:listener>
</rabbit:listener-container>
@Component
public class QosListener implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {

        Thread.sleep(1000);
        //1.获取消息
        System.out.println(new String(message.getBody()));

        //2. 处理业务逻辑

        //3. 签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);

    }
}

四、TTL(过期时间)

TTL全称time to live,过期时间/存活时间。

当消息到过期时间还没有被消费时,会自动删除。

  1. 队列统一过期

  2. 消息单独过期

如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。

队列过期后,会将队列所有消息全部移除。(常用)

消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉)

1、队列统一过期时间

 <!--ttl-->
<rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
    <!--设置queue的参数-->
        <rabbit:queue-arguments>
            <!--x-message-ttl指队列的过期时间-->
            <entry key="x-message-ttl" value="100000" value-type="java.lang.Integer"></entry>
    </rabbit:queue-arguments>

</rabbit:queue>

<!--绑定交换机-->    
<rabbit:topic-exchange name="test_exchange_ttl" >
        <rabbit:bindings>
            <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding>
        </rabbit:bindings>
</rabbit:topic-exchange>    
for (int i = 0; i < 10; i++) {
      // 发送消息
      rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....");
}

在这里插入图片描述

2、消息单独过期

// 消息后处理对象,设置一些消息的参数信息
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        //1.设置message的信息
        message.getMessageProperties().setExpiration("5000");//消息的过期时间
        //2.返回该消息
        return message;
    }
};


//消息单独过期
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....",messagePostProcessor);

五、DLX(死信队列)

DLX全称Dead Letter Exchange,之所以叫死信队列,是因为其他的MQ产品没有交换机。

当消息成为死信后,可以被重新发送到另一个交换机,这和个交换机就是DLX,死信交换机。

消息成为死信的方式:

  1. 队列消息长度达到限制
  2. 消费者拒收消息,basicNack/basicReject ,并且把不把消息重新放回原目标队列 requeue = false
  3. 原队列存在消息过期设置,消息到达超时间未被消费。

实现步骤

死信队列:

  1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
  2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
  3. 正常队列绑定死信交换机
    设置两个参数:
     x-dead-letter-exchange:死信交换机名称
     x-dead-letter-routing-key:发送给死信交换机的routingkey
 <!--
        1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
    -->

    <rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
        <!--3. 正常队列绑定死信交换机-->
        <rabbit:queue-arguments>
            <!--3.1 x-dead-letter-exchange:死信交换机名称-->
            <entry key="x-dead-letter-exchange" value="exchange_dlx" />

            <!--3.2 x-dead-letter-routing-key:发送给死信交换机的routingkey-->
            <entry key="x-dead-letter-routing-key" value="dlx.hehe" />

            <!--4.1 设置队列的过期时间 ttl-->
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
            <!--4.2 设置队列的长度限制 max-length -->
            <entry key="x-max-length" value="10" value-type="java.lang.Integer" />
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:topic-exchange name="test_exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>


    <!--
       2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
   -->

    <rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue>
    <rabbit:topic-exchange name="exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

测试

 /**
     * 发送测试死信消息:
     *  1. 过期时间
     *  2. 长度限制
     *  3. 消息拒收
     */
    @Test
    public void testDlx(){
        //1. 测试过期时间,死信消息
        rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");

        //2. 测试长度限制后,消息死信
       for (int i = 0; i < 20; i++) {
            rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");
        }

        //3. 测试消息拒收
        rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");

    }

在这里插入图片描述

六、延迟队列(重点)

延迟队列:即消息进入队列后不会立即被删除,只有到达指定的时间,之后才会被消费。

在RabbitMQ中没有提供延迟队列的功能,但是可以使用 TTL(过期时间)+ DLX(死信队列)来实现。

案例:客户下单,30分钟未支付,取消订单,回滚库存。
在这里插入图片描述

在这里插入图片描述

实现步骤

  1. 定义正常交换机(order_exchange)和队列(order_queue)
  2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)
  3. 绑定,设置正常队列过期时间为30分钟(为了便于查看,时间改为10s)
 <!-- 1. 定义正常交换机(order_exchange)和队列(order_queue)-->
    <rabbit:queue id="order_queue" name="order_queue">
        <!-- 3. 绑定,设置正常队列过期时间为30分钟-->
        <rabbit:queue-arguments>
            <entry key="x-dead-letter-exchange" value="order_exchange_dlx" />
            <entry key="x-dead-letter-routing-key" value="dlx.order.cancel" />
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />

        </rabbit:queue-arguments>

    </rabbit:queue>
    <rabbit:topic-exchange name="order_exchange">
        <rabbit:bindings>
            <rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!--  2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)-->
    <rabbit:queue id="order_queue_dlx" name="order_queue_dlx"></rabbit:queue>
    <rabbit:topic-exchange name="order_exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

发送消息

@Test
    public  void testDelay() throws InterruptedException {
        //1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息
        rabbitTemplate.convertAndSend("order_exchange","order.msg","订单信息:id=1,time=2019年8月17日16:41:47");
        //2.打印倒计时10秒
        for (int i = 10; i > 0 ; i--) {
            System.out.println(i+"...");
            Thread.sleep(1000);
        }
    }

消费方

@Component
public class OrderListener implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            //1.接收转换消息
            System.out.println(new String(message.getBody()));

            //2. 处理业务逻辑
            System.out.println("处理业务逻辑...");
            System.out.println("根据订单id查询其状态...");
            System.out.println("判断状态是否为支付成功");
            System.out.println("取消订单,回滚库存....");
            //3. 手动签收
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            //e.printStackTrace();
            System.out.println("出现异常,拒绝接受");
            //4.拒绝签收,不重回队列 requeue=false
            channel.basicNack(deliveryTag,true,false);
        }
    }
}

七、日志

默认日志路径

/var/log/rabbitmq/rabbitmq@主机名.log

日志包含Rabbitmq版本号、Erlang版本号、RabbitMQ服务节点名称、cookie的hash值、RabbitMQ的配置文件地址、内存限制、默认账户guest的创建、权限的配置

八、RabbitMQ相关命令

在这里插入图片描述

九、消息的追踪

在任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。对于RabbitMQ而言,可能是使用因为生产者或消费者与RabbitMQ断开了连接,而它们与RabbitMQ又采用了不同的确认机制;也有可能是因为交换器与队列之间不同的转发策略;甚至是交换器并没有与任何队列进行绑定,生产者又不感知或者没有采取相应的措施;另外RabbitMQ本身的集群策略也可能导致消息的丢失。这个时候就需要有一个较好的机制跟踪记录消息的投递过程,以此协助开发和运维人员进行问题的定位。

在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能来实现消息追踪。

firehose的机制是将生产者投递给rabbitmq的消息,rabbitmq投递给消费者的消息按照指定的格式发送到默认的exchange上。这个默认的exchange的名称为amq.rabbitmq.trace,它是一个topic类型的exchange。发送到这个exchange上的消息的routing key为 publish.exchangename 和 deliver.queuename。其中exchangename和queuename为实际exchange和queue的名称,分别对应生产者投递到exchange的消息,和消费者从queue上获取的消息。

注意:打开 trace 会影响消息写入功能,适当打开后请关闭。
rabbitmqctl trace_on:开启Firehose命令

rabbitmqctl trace_off:关闭Firehose命令

rabbitmq_tracing和Firehose在实现上如出一辙,只不过rabbitmq_tracing的方式比Firehose多了一层GUI的包装,更容易使用和管理。

启用插件:rabbitmq-plugins enable rabbitmq_tracing

十、消息的可靠性保障

在这里插入图片描述

十一、消息的幂等性保障

幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。

在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。

在这里插入图片描述


  目录