RabbitMQ概念和运维


一、消息中间件概述

1、什么是消息中间件

MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法。

  • 为什么使用MQ

    在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高系统吞吐量

  • 开发中消息队列通常有如下应用场景:

    1、任务异步处理

    将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。

    2、应用程序解耦合

    MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。

    3、削峰填谷

    如订单系统,在下单的时候就会往数据库写数据。但是数据库只能支撑每秒1000左右的并发写入,并发量再高就容易宕机。低峰期的时候并发也就100多个,但是在高峰期时候,并发量会突然激增到5000以上,这个时候数据库肯定卡死了。
      消息被MQ保存起来了,然后系统就可以按照自己的消费能力来消费,比如每秒1000个数据,这样慢慢写入数据库,这样就不会卡死数据库了。
      但是使用了MQ之后,限制消费消息的速度为1000,但是这样一来,高峰期产生的数据势必会被积压在MQ中,高峰就被“削”掉了。但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000QPS,直到消费完积压的消息,这就叫做“填谷”

2、AMQP 和 JMS

MQ是消息通信的模型;实现MQ的大致有两种主流方式:AMQP、JMS。

(1)AMQP

AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。

(2)JMS

JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

(3)AMQP 与 JMS 区别

  • JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
  • JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
  • JMS规定了两种消息模式;而AMQP的消息模式更加丰富

3、RabbitMQ

RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。

RabbitMQ官方地址:http://www.rabbitmq.com/

安装:https://blog.csdn.net/unique_perfect/article/details/109380996

RabbitMQ提供了6种模式:简单模式,work模式,Publish/Subscribe发布与订阅模式,Routing路由模式,Topics主题模式,RPC远程调用模式

官网对应模式介绍:https://www.rabbitmq.com/getstarted.htm
在这里插入图片描述

二、AMQP

1、基本概念

AMQP 一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

AMQP是一个二进制协议,拥有一些现代化特点:多信道、协商式,异步,安全,扩平台,中立,高效。

RabbitMQ是AMQP协议的Erlang的实现。

概念 说明
连接Connection 一个网络连接,比如TCP/IP套接字连接。
会话Session 端点之间的命名对话。在一个会话上下文中,保证“恰好传递一次”。
信道Channel 多路复用连接中的一条独立的双向数据流通道。为会话提供物理传输介质。
客户端Client AMQP连接或者会话的发起者。AMQP是非对称的,客户端生产和消费消息,服务器存储和路由这些消息。
服务节点Broker 消息中间件的服务节点;一般情况下可以将一个RabbitMQ Broker看作一台RabbitMQ 服务器。
端点 AMQP对话的任意一方。一个AMQP连接包括两个端点(一个是客户端,一个是服务器)。
消费者Consumer 一个从消息队列里请求消息的客户端程序。
生产者Producer 一个向交换机发布消息的客户端应用程序。

2、RabbitMQ运转流程

在入门案例中:

  • 生产者发送消息
    1. 生产者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker;
    2. 声明队列并设置属性;如是否排它,是否持久化,是否自动删除;
    3. 将路由键(空字符串)与队列绑定起来;
    4. 发送消息至RabbitMQ Broker;
    5. 关闭信道;
    6. 关闭连接;
  • 消费者接收消息
    1. 消费者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker
    2. 向Broker 请求消费相应队列中的消息,设置相应的回调函数;
    3. 等待Broker回应相关投递响应队列中的消息,消费者接收消息;
    4. 确认(ack,自动确认)接收到的消息;
    5. RabbitMQ从队列中删除相应已经被确认的消息;
    6. 关闭信道;
    7. 关闭连接;

3、生产者流转过程说明

  1. 客户端与代理服务器Broker建立连接。会调用newConnection() 方法,这个方法会进一步封装Protocol Header 0-9-1 的报文头发送给Broker ,以此通知Broker 本次交互采用的是AMQPO-9-1 协议,紧接着Broker 返回Connection.Start 来建立连接,在连接的过程中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok 这6 个命令的交互。
  2. 客户端调用connection.createChannel方法。此方法开启信道,其包装的channel.open命令发送给Broker,等待channel.basicPublish方法,对应的AMQP命令为Basic.Publish,这个命令包含了content Header 和content Body()。content Header 包含了消息体的属性,例如:投递模式,优先级等,content Body 包含了消息体本身。
  3. 客户端发送完消息需要关闭资源时,涉及到Channel.Close和Channl.Close-Ok 与Connetion.Close和Connection.Close-Ok的命令交互。

4、消费者流转过程说明

  1. 消费者客户端与代理服务器Broker建立连接。会调用newConnection() 方法,这个方法会进一步封装Protocol Header 0-9-1 的报文头发送给Broker ,以此通知Broker 本次交互采用的是AMQPO-9-1 协议,紧接着Broker 返回Connection.Start 来建立连接,在连接的过程中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok 这6 个命令的交互。
  2. 消费者客户端调用connection.createChannel方法。和生产者客户端一样,协议涉及Channel . Open/Open-Ok命令。
  3. 在真正消费之前,消费者客户端需要向Broker 发送Basic.Consume 命令(即调用channel.basicConsume 方法〉将Channel 置为接收模式,之后Broker 回执Basic . Consume - Ok 以告诉消费者客户端准备好消费消息。
  4. Broker 向消费者客户端推送(Push) 消息,即Basic.Deliver 命令,这个命令和Basic.Publish 命令一样会携带Content Header 和Content Body。
  5. 消费者接收到消息并正确消费之后,向Broker 发送确认,即Basic.Ack 命令。
  6. 客户端发送完消息需要关闭资源时,涉及到Channel.Close和Channl.Close-Ok 与Connetion.Close和Connection.Close-Ok的命令交互。

三、安装

1、docker下安装

创建挂载目录

mkdir /opt/rabbitmq/data

运行docker容器

docker run -d --hostname my-rabbit --name rabbitmq     \
    --restart=always \
    -e RABBITMQ_DEFAULT_VHOST=my_vhost  \
    -e RABBITMQ_DEFAULT_USER=admin \
    -e RABBITMQ_DEFAULT_PASS=admin \ 
    -v /opt/rabbitmq/data:/var/lib/rabbitmq\
    -p 15672:15672 -p 5672:5672 \
    rabbitmq:3-management
  • -d:表示在后台运行容器;
  • –restart=always:随docker启动自动运行
  • -p:将容器的端口 5672(应用访问端口)和 15672 (控制台Web端口号)映射到主机中;
  • -e:指定环境变量:
    • RABBITMQ_DEFAULT_VHOST:默认虚拟机名;
    • RABBITMQ_DEFAULT_USER:默认的用户名;
    • RABBITMQ_DEFAULT_PASS:默认的用户密码;
  • -v :挂载数据卷
  • –hostname:指定主机名(RabbitMQ 的一个重要注意事项是它根据所谓的 节点名称 存储数据,默认为主机名);
  • –name rabbitmq:设置容器名称;
  • rabbitmq:容器使用的镜像名称;

15672:web管理页面端口

5672:通信端口

然后访问 ip:15672 即可看到web 管理页面

日志地址

进入容器

docker exec -it 容器id /bin/bash

查看日志

tail -f /var/log/rabbitmq/rabbit@主机名.log

2、原生安装

(0)环境

保证主机名不包含数字

查看主机名

hostname

修改主机名

vi /etc/hostname

(1)安装

1、RabbitMQ版本 和 Erlang 版本兼容性关系

https://www.rabbitmq.com/which-erlang.html

2、官方安装包下载地址

【erlang下载地址】:https://github.com/rabbitmq/erlang-rpm/releases

【socat下载地址】:http://www.rpmfind.net/linux/rpm2html/search.php?query=socat(x86-64)

【rabbitmq下载地址】:https://github.com/rabbitmq/rabbitmq-server/releases

https://share.weiyun.com/FDiX7OFy

3、安装包中说明,请下载对应的安装包

​ el6:CentOS 6.x 的下载

​ el7:CentOS 7.x 的下载

​ el8:CentOS 8.x 的下载

安装C++依赖环境

yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz

准备安装包

erlang-23.3.4.8-1.el7.x86_64.rpm
rabbitmq-server-3.9.11-1.el7.noarch.rpm
socat-1.7.3.2-2.el7.x86_64.rpm(本文用yun install)

将下载的安装包上传到自己创建的 rabbitmq 文件夹下,cd到对应的文件夹,安装el

rpm -ivh erlang-23.3.4.8-1.el7.x86_64.rpm

检查Erlang是否安装成功

******************命令如下******************
# 安装成功,按两次ctrl+c退出命令模式
erl -v
yum install -y socat
# 安装命令
rpm -ivh rabbitmq-server-3.9.11-1.el7.noarch.rpm

# 检查是否安装成功命令
rpm -qa|grep rabbitmq

# 开启管理界面命令
rabbitmq-plugins enable rabbitmq_management

添加配置文件,解决只能localhost访问的问题

# 进入【/etc/rabbitmq】文件夹下
cd /etc/rabbitmq

# 编辑【rabbitmq.config】文件
vim rabbitmq.config

在rabbitmq.config文件中写入下面的命令,不要忘了后面的点

[{rabbit,[{loopback_users,[]}]}].

修改默认密码

rabbitmqctl change_password guest 123456

(2)启动命令

# 启动rabbitmq命令:
systemctl start rabbitmq-server

# 查看启动状态命令:
systemctl status rabbitmq-server

# 停止rabbitmq命令:
systemctl stop rabbitmq-server

# 重启rabbitmq命令:
systemctl restart rabbitmq-server

(3)卸载

1、卸载rabbitmq相关文件

  • 1.1、卸载前先停止rabbitmq服务
systemctl stop rabbitmq-server
  • 1.2、查看rabbitmq安装的相关列表
yum list | grep rabbitmq
  • 1.3、卸载rabbitmq-server.noarch
yum -y remove rabbitmq-server.noarch

2、卸载erlang

  • 2.1、查看erlang安装的相关列表
yum list | grep erlang
  • 2.2、卸载erlang已安装的相关内容
yum -y remove erlang-*

3、删除有关的所有文件

rm -rf /usr/lib64/erlang 
rm -rf /var/lib/rabbitmq
rm -rf /usr/local/erlang
rm -rf /usr/local/rabbitmq

四、web 管理页面

0、开启管理页面

再来说说如何开启 Web 管理页面,整体上来说,我们有两种方式开启 Web 管理页面:

  1. 安装 RabbitMQ 的时候,直接选择 rabbitmq:3-management 镜像,安装命令如下:
docker run -d --rm --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management

这样安装好的 RabbitMQ 就可以直接使用 Web 管理页面了。

  1. 安装的时候就选择正常的普通镜像 rabbitmq:3,安装命令如下:
docker run -d --hostname my-rabbit --name some-rabbit2 -p 5673:5672 -p 25672:15672 rabbitmq:3

这个安装好之后,需要我们进入到容器中,然后手动开启 Web 管理插件,命令如下:

docker exec -it some-rabbit2 /bin/bash
rabbitmq-plugins enable rabbitmq_management

第一条命令是进入到容器中,第二条命令开启 Web 管理插件

首先一共有六个选项卡:

  1. Overview:这里可以概览 RabbitMQ 的整体情况,如果是集群,也可以查看集群中各个节点的情况。包括 RabbitMQ 的端口映射信息等,都可以在这个选项卡中查看。
  2. Connections:这个选项卡中是连接上 RabbitMQ 的生产者和消费者的情况。
  3. Channels:这里展示的是“通道”信息,关于“通道”和“连接”的关系,松哥在后文再和大家详细介绍。
  4. Exchange:这里展示所有的交换机信息。
  5. Queue:这里展示所有的队列信息。
  6. Admin:这里展示所有的用户信息。

1、Overview

Totals:

Totals 里面有 准备消费的消息数、待确认的消息数、消息总数以及消息的各种处理速率(发送速率、确认速率、写入硬盘速率等等)。

Nodes:

Nodes 其实就是支撑 RabbitMQ 运行的一些机器,相当于集群的节点。

Churn statistics:

里边展示的是 Connection、Channel 以及 Queue 的创建/关闭速率。

Export definitions && Import definitions:

最后面这两个可以导入导出当前实例的一些配置信息:

2、Connections

这里主要展示的是当前连接上 RabbitMQ 的信息,无论是消息生产者还是消息消费者,只要连接上来了这里都会显示出来。

注意协议中的 AMQP 0-9-1 指的是 AMQP 协议的版本号。

其他属性含义如下:

  • User name:当前连接使用的用户名。
  • State:当前连接的状态,running 表示运行中;idle 表示空闲。
  • SSL/TLS:表示是否使用 ssl 进行连接。
  • Channels:当前连接创建的通道总数。
  • From client:每秒发出的数据包。
  • To client:每秒收到的数据包。

点击连接名称可以查看每一个连接的详情。

在详情中可以查看每一个连接的通道数以及其他详细信息,也可以强制关闭一个连接。

3、Channels

这个地方展示的是通道的信息

那么什么是通道呢?

一个连接(IP)可以有多个通道,这个多个通道通过多线程实现,一般情况下,我们在通道中创建队列、交换机等。

生产者的通道一般会立马关闭;消费者是一直监听的,通道几乎是会一直存在。

上面各项参数含义分别如下:

  • Channel:通道名称。
  • User name:该通道登录使用的用户名。
  • Model:通道确认模式,C 表示 confirm;T 表示事务。
  • State:通道当前的状态,running 表示运行中;idle 表示空闲。
  • Unconfirmed:待确认的消息总数。
  • Prefetch:Prefetch 表示每个消费者最大的能承受的未确认消息数目,简单来说就是用来指定一个消费者一次可以从 RabbitMQ 中获取多少条消息并缓存在消费者中,一旦消费者的缓冲区满了,RabbitMQ 将会停止投递新的消息到该消费者中直到它发出有消息被 ack 了。总的来说,消费者负责不断处理消息,不断 ack,然后只要 unAcked 数少于 prefetch * consumer 数目,RabbitMQ 就不断将消息投递过去。
  • Unacker:待 ack 的消息总数。
  • publish:消息生产者发送消息的速率。
  • confirm:消息生产者确认消息的速率。
  • unroutable (drop):表示未被接收,且已经删除了的消息。
  • deliver/get:消息消费者获取消息的速率。
  • ack:消息消费者 ack 消息的速率。

4、Exchange

这里会展示交换机的各种信息。

Type 表示交换机的类型。

Features 有两个取值 D 和 I。

  • D 表示交换机持久化,将交换机的属性在服务器内部保存,当 MQ 的服务器发生意外或关闭之后,重启 RabbitMQ 时不需要重新手动或执行代码去建立交换机,交换机会自动建立,相当于一直存在。

  • I 表示这个交换机不可以被消息生产者用来推送消息,仅用来进行交换机和交换机之间的绑定。

Message rate in 表示消息进入的速率。

Message rate out 表示消息出去的速率。

点击下方的 Add a new exchange 可以创建一个新的交换机。

5、Queue

这个选项卡就是用来展示消息队列的:

各项含义如下:

  • Name:表示消息队列名称。
  • Type:表示消息队列的类型,classic、Quorum。
  • Features:表示消息队列的特性,D 表示消息队列持久化。
  • State:表示当前队列的状态,running 表示运行中;idle 表示空闲。
  • Ready:表示待消费的消息总数。
  • Unacked:表示待应答的消息总数。
  • Total:表示消息总数 Ready+Unacked。
  • incoming:表示消息进入的速率。
  • deliver/get:表示获取消息的速率。
  • ack:表示消息应答的速率。

点击下方的 Add a new queue 可以添加一个新的消息队列。

点击每一个消息队列的名称,可以进入到消息队列中。进入到消息队列后,可以完成对消息队列的进一步操作,例如:

  • 将消息队列和某一个交换机进行绑定。
  • 发送消息。
  • 获取一条消息。
  • 移动一条消息(需要插件的支持)。
  • 删除消息队列。
  • 清空消息队列中的消息。

6、Admin

这里是做一些用户管理操作,各项属性含义如下:

  • Name:表示用户名称。
  • Tags:表示角色标签,只能选取一个。
  • Can access virtual hosts:表示允许进入的虚拟主机。
  • Has password:表示这个用户是否设置了密码。

常见的两个操作是管理用户和虚拟主机。

点击下方的 Add a user 可以添加一个新的用户,添加用户的时候需要给用户设置 Tags,其实就是用户角色,如下:

  • none:不能访问 management plugin
  • management:用户可以通过 AMQP 做的任何事 列出自己可以通过 AMQP 登入的 virtual hosts 查看自己的 virtual hosts 中的 queues, exchanges 和 bindings 查看和关闭自己的 channels 和 connections 查看有关自己的 virtual hosts 的“全局”的统计信息,包含其他用户在这些 virtual hosts 中的活动
  • policymaker:management 可以做的任何事 查看、创建和删除自己的 virtual hosts 所属的 policies 和 parameters
  • monitoring:management 可以做的任何事 列出所有 virtual hosts,包括他们不能登录的 virtual hosts 查看其他用户的 connections 和 channels 查看节点级别的数据如 clustering 和 memory 使用情况 查看真正的关于所有 virtual hosts 的全局的统计信息
  • administrator:policymaker 和 monitoring 可以做的任何事 创建和删除 virtual hosts 查看、创建和删除 users 查看创建和删除 permissions 关闭其他用户的 connections
  • impersonator(模拟者) 模拟者,无法登录管理控制台。

五、用户权限和限流

image-20210705224849436

1、用户

可用来管理用户,可点击表格中的用户名进入配置 可访问的虚拟主机权限

image-20210705225733028

用户角色分为6类,超级管理员, 监控者, 策略制定者, 普通管理者,模仿者,以及其他。

  • 超级管理员(administrator):
    可登陆管理控制台(启用management plugin的情况下),可查看所有的信息,并且可以对用户,策略(policy)进行操作。
  • 监控者(monitoring):
    可登陆管理控制台(启用management plugin的情况下),同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
  • 策略制定者(policymaker):
    可登陆管理控制台(启用management plugin的情况下), 同时可以对policy进行管理。
  • 普通管理者(management):
    仅可登陆管理控制台(启用management plugin的情况下),无法看到节点信息,也无法对策略进行管理。
  • 模仿者(Impersonator):
    不能登录后台,通常就是普通的生产者和消费者。
  • 其他(none):
    无法登陆管理控制台。

(1)命令行操作

因为 vhost 通常跟用户一起出现,所以这里我也顺便说下 user 的相关操作。

添加一个用户名为 javaboy,密码为 123 的用户,方式如下:

rabbitmqctl add_user javaboy 123

通过如下命令可以修改用户密码(将 javaboy 的密码改为 123456):

rabbitmqctl change_password javaboy 123456

通过如下命令可以验证用户密码:

rabbitmqctl authenticate_user javaboy 123456

查看所有用户及其角色

rabbitmqctl list_users

给用户设置角色的命令如下(给 javaboy 设置 administrator 角色):

rabbitmqctl set_user_tags javaboy administrator

最后,删除一个用户的命令如下:

rabbitmqctl delete_user javaboy

2、虚拟主机VirtualHost

虚拟主机:多个交换机的集合,用来表示用户可访问的交换机组;一个RabbitMQ 服务内部有多个虚拟主机

本质上,每一个 vhost 都是一个独立的小型 RabbitMQ 服务器,这个 vhost 中会有自己的消息队列、消息交换机以及相应的绑定关系等等,并且拥有自己独立的权限,不同的 vhost 中的队列和交换机不能互相绑定,这样技能保证运行安全又能避免命名冲突。

简单来说就是多租户的概念(权限管理),和项目中按部门过滤数据的概念一样,即每个虚拟主机有多个交换机,且只能管理自己的交换机,而用户能有多个虚拟主机的权限

  • 虚拟主机 == 部门
  • 交换机 == 数据
  • 用户 == 用户

具体:订单虚拟主机中所有的交换机都是用来处理订单相关的,货运虚拟主机中的所有交换机都是用来处理运输的;订单微服务使用拥有订单虚拟主机的用户进行处理订单,货运同理,而管理员用户则可以查看和处理所有的交换机

(1)命令行操作

其实用web管理页面就可以操作了

首先进入到 docker 容器中:

docker exec -it some-rabbit /bin/bash

新增

创建一个名为 /myvh 的 vhost:

rabbitmqctl add_vhost myvh

查询

查看已有的 vhost:

rabbitmqctl list_vhosts

这个命令也可以添加两个选项 name 和 tracing

  • name 表示展示 vhost 的名称(默认已经展示)
  • tracing 则表示是否使用了 tracing 功能(tracing 可以帮助追踪 RabbitMQ 中消息的流入流出情况)
rabbitmqctl list_vhosts name tracing

删除

可以通过如下命令删除一个 vhost:

rabbitmqctl delete_vhost myvh

当删除一个 vhost 的时候,与这个 vhost 相关的消息队列、交换机以及绑定关系等,统统都会被删除。

给一个用户设置 vhost:

rabbitmqctl set_permissions -p myvh guest ".*" ".*" ".*"
  • myvh:虚拟主机名
  • guest:用户名

前面参数都好说,最后面三个 ".*" 含义分别如下:

  • 用户在所有资源上都拥有可配置权限(创建/删除消息队列、创建/删除交换机等)。
  • 用户在所有资源上都拥有写权限(发消息)。
  • 用户在所有资源上都拥有读权限(消息消费,清空队列等)。

禁止一个用户访问某个 vhost:

rabbitmqctl clear_permissions -p myvh guest

3、权限

这里涉及到三种不同的权限:

  • 读:和消息消费有关的所有操作,包括清除整个队列的消息。
  • 写:发布消息。
  • 配置:消息队列、交换机等的创建和删除。

接下来,下图展示了操作和权限的对应关系:

图片

权限操作命令

RabbitMQ 中权限操作命令格式如下:

rabbitmqctl set_permissions [-p vhosts] {user} {conf} {write} {read}

这里有几个参数:

  • [-p vhost]:授予用户访问权限的 vhost 名称,如果不写默认为 /
  • user:用户名。
  • conf:用户在哪些资源上拥有可配置权限(支持正则表达式)。
  • write:用户在哪些资源上拥有写权限(支持正则表达式)。
  • read:用户在哪些资源上拥有读权限(支持正则表达式)。

假设我们有一个名为 zhangsan 的用户,我们希望该用户在 myvh 虚拟主机下具备所有权限,那么我们的操作命令如下:

rabbitmqctl set_permissions -p myvh zhangsan ".*" ".*" ".*"

接下来执行如下命令可以验证授权是否成功:

rabbitmqctl -p myvh list_permissions

在上面的授权命令中,我们用的都是 ".*"

  • ".*":这个表示匹配所有的交换机和队列。
  • "javaboy-.*":这个表示匹配名字以 javaboy- 开头的交换机和队列。
  • "":这个表示不匹配任何队列与交换机(如果想撤销用户的权限可以使用这个)。

我们可以使用如下命令来移除某一个用户在某一个 vhost 上的权限,例如移除 zhangsan 在 myvh 上的所有权限,如下:

rabbitmqctl clear_permissions -p myvh zhangsan

执行完成后,我们可以通过 rabbitmqctl -p myvh list_permissions 命令来查看执行结果是否生效

如果一个用户在多个 vhost 上都有对应的权限,按照上面的 rabbitmqctl -p myvh list_permissions 命令只能查看一个 vhost 上的权限,此时我们可以通过如下命令来查看 lisi 在所有 vhost 上的权限:

rabbitmqctl list_user_permissions lisi

当然,在网页上还有一个 Topic Permissions,这是 RabbitMQ3.7 开始的一个新功能,可以针对某一个 topic exchange 设置权限,主要针对 STOMP 或者 MQTT 协议,我们日常 Java 开发用上这个配置的机会很少。如果用户不设置的话,相应的 topic exchange 也总是有权限的。

六、管理 rabbit mq

需要开启 web 管理页面

RabbitMQ 的管理页面,点击下方的 HTTP API 按钮,里边有一个完整的文档:

1、REST API

(1)查看队列统计数据

例如我们想查看虚拟主机 myvh 下 hello-queue 队列的数据统计,我们可以通过如下方式来查看:

curl -i -u javaboy:123 http://localhost:15672/api/queues/myvh/hello-queue

-i 表示显示响应头信息。

(2)创建队列

在 /myvh 虚拟主机下创建一个名为 javaboy-queue 的队列,使用 CURL 请求方式如下:

curl -i -u javaboy:123 -XPUT -H "Content-Type:application/json" -d '{"auto_delete":false,"durable":true}' http://localhost:15672/api/queues/myvh/javaboy-queue

注意请求方式是 PUT 请求,请求参数是 JSON 形式,JSON 里边有两个东西,一个 auto_delete 是说如果该队列没有任何消费者订阅的话,该队列是否会被自动删除(如果是一些临时队列,则该属性可以设置为 true);另外一个 durable 则是说队列是否持久化(持久化的队列,在 RabbitMQ 重启之后,队列依然存在),如果大家用 Java 代码创建过队列,这两个参数很好理解,因为我们用 Java 代码创建队列的时候这两个参数也会经常用到。

不过要注意在 Authorization 选项卡中设置用户名/密码

image-20220405162606332

(3)查看当前连接信息

我们可以通过如下请求查看当前连接信息:

请求如下:

curl -i -u javaboy:123 http://localhost:15672/api/connections

(4)查看当前用户信息

curl -i -u javaboy:123 http://localhost:15672/api/users

(5)创建一个用户

创建一个名为 zhangsan,密码是 123 ,角色是 administrator 的用户。

curl -i -u javaboy:123 -H "{Content-Type:application/json}" -d '{"password":"123","tags":"administrator"}' -XPUT http://localhost:15672/api/users/zhangsan

(5)为新用户设置 vhost

将名为 zhangsan 的用户设置到名为 myvh 的 vhost 下:

curl -i -u javaboy:123 -H "{Content-Type:application/json}" -d '{"configure":".*","write":".*","read":".*"}' -XPUT http://localhost:15672/api/permissions/myvh/zhangsan

2、rabbitmqadmin

我们自己平时做练习,一般都会开启 RabbitMQ 的 Web 管理页面,然而在生产环境下,经常是没有 Web 管理页面的,只能通过 CLI 命令去管理 MQ。

其实呀,Web 管理页面虽然友好,但是很多时候没有 CLI 快捷,而且通过 CLI 命令行的操作,我们可以做更多的定制,例如将关键信息查出来后提供给集中的监控系统以触发报警。

直接操作 CLI 命令行有点麻烦,RabbitMQ 提供了 CLI 管理工具 rabbitmqadmin ,其实就是基于 RabbitMQ 的 HTTP API,用 Python 写的一个脚本。因为 REST API 手动写请求还是挺麻烦的,这些脚本刚好替我们简化了这个操作,让这个事情变得更加简单了。

使用 rabbitmqadmin 要先会安装它。

如果我们创建 RabbitMQ 容器的时候使用的是 rabbitmq:3-management 镜像,那么默认情况下,rabbitmqadmin 就是安装好的。

否则可能需要我们自己安装 rabbitmqadmin,安装方式很简单,

首先确认你的设备上安装了 Python,这是最基本的,因为 rabbitmqadmin 这个工具就是 Python 脚本。

然后开启 RabbitMQ 的 Web 管理页面,然后输入如下地址(我的管理页面度那口映射为 25672):

http://localhost:25672/cli/index.html

在打开的页面中就可以看到 rabbitmqadmin 的下载链接。将 rabbitmqadmin 下载下来后,然后赋予其可执行权限即可:

chmod +x rabbitmqadmin

下载后的 rabbitmqadmin 我们可以直接用记事本打开,里边其实就是一堆 Python 脚本。

这套流程操作下来还是挺麻烦的,所以,我建议大家直接使用 rabbitmq:3-management 镜像,一步到位。

功能简介

  • 列出 exchanges, queues, bindings, vhosts, users, permissions, connections and channels。
  • 创建和删除 exchanges, queues, bindings, vhosts, users and permissions。
  • 发布和获取消息,以及消息详情。
  • 关闭连接和清空队列。
  • 导入导出配置。

(1)列出各种信息

查看所有交换机:

rabbitmqadmin list exchanges

查看所有队列:

rabbitmqadmin list queues

查看所有 Binding:

rabbitmqadmin list bindings

查看所有虚拟主机:

rabbitmqadmin list vhosts

查看所有用户信息:

rabbitmqadmin list users

查看所有权限信息:

rabbitmqadmin list permissions

查看所有连接信息:

rabbitmqadmin list connections

查看所有通道信息:

rabbitmqadmin list channels

(2)一个完整的例子

接下来我们用 rabbitmqadmin 来写一个完整的消息收发例子看看。

首先创建一个名为 javaboy-exchange 的交换机:

rabbitmqadmin declare exchange name=javaboy-exchange durable=true auto_delete=false type=direct

接下来创建一个名为 javaboy-queue 的队列:

rabbitmqadmin declare queue name=javaboy-queue durable=true auto_delete=false

接下来再创建一个 Binding,将交换机和消息队列绑定起来:

rabbitmqadmin declare binding source=javaboy-exchange destination=javaboy-queue routing_key=javaboy-routing

这里涉及到到三个概念:

  • source:源,其实就是指交换机。
  • destination:目标,其实就是指消息队列。
  • routing_key:这个就是路由的 key。

接下来发布一条消息:

rabbitmqadmin publish routing_key=javaboy-queue payload="hello javaboy"

查看队列中的消息(只查看,不消费,看完之后消息还在):

rabbitmqadmin get queue=javaboy-queue

清空一个队列中的消息:

rabbitmqadmin purge queue name=javaboy-queue

七、集群

1、普通集群

普通集群模式,就是将 RabbitMQ 部署到多台服务器上,每个服务器启动一个 RabbitMQ 实例,多个实例之间进行消息通信。

此时我们创建的队列 Queue,它的元数据(主要就是 Queue 的一些配置信息)会在所有的 RabbitMQ 实例中进行同步,但是队列中的消息只会存在于一个 RabbitMQ 实例上,而不会同步到其他队列。

当我们消费消息的时候,如果连接到了另外一个实例,那么那个实例会通过元数据定位到 Queue 所在的位置,然后访问 Queue 所在的实例,拉取数据过来发送给消费者。

这种集群可以提高 RabbitMQ 的消息吞吐能力,但是无法保证高可用,因为一旦一个 RabbitMQ 实例挂了,消息就没法访问了,如果消息队列做了持久化,那么等 RabbitMQ 实例恢复后,就可以继续访问了;如果消息队列没做持久化,那么消息就丢了。

大致的流程图如下图:

图片

2、镜像集群

它和普通集群最大的区别在于 Queue 数据和原数据不再是单独存储在一台机器上,而是同时存储在多台机器上。也就是说每个 RabbitMQ 实例都有一份镜像数据(副本数据)。每次写入消息的时候都会自动把数据同步到多台实例上去,这样一旦其中一台机器发生故障,其他机器还有一份副本数据可以继续提供服务,也就实现了高可用。

大致流程图如下图:

图片

3、节点类型

RabbitMQ 中的节点类型有两种:

  • RAM node:内存节点将所有的队列、交换机、绑定、用户、权限和 vhost 的元数据定义存储在内存中,好处是可以使得交换机和队列声明等操作速度更快。
  • Disk node:将元数据存储在磁盘中,单节点系统只允许磁盘类型的节点,防止重启 RabbitMQ 的时候,丢失系统的配置信息

RabbitMQ 要求在集群中至少有一个磁盘节点,所有其他节点可以是内存节点,当节点加入或者离开集群时,必须要将该变更通知到至少一个磁盘节点。如果集群中唯一的一个磁盘节点崩溃的话,集群仍然可以保持运行,但是无法进行其他操作(增删改查),直到节点恢复。为了确保集群信息的可靠性,或者在不确定使用磁盘节点还是内存节点的时候,建议直接用磁盘节点。

4、搭建普通集群

使用 docker 来搭建。

  1. 搭建集群时,节点中的 Erlang Cookie 值要一致,默认情况下,文件在 /var/lib/rabbitmq/.erlang.cookie,我们在用 docker 创建 RabbitMQ 容器时,可以为之设置相应的 Cookie 值。
  2. RabbitMQ 是通过主机名来连接服务,必须保证各个主机名之间可以 ping 通。可以通过编辑 /etc/hosts 来手工添加主机名和 IP 对应关系。如果主机名 ping 不通,RabbitMQ 服务启动会失败(如果我们是在不同的服务器上搭建 RabbitMQ 集群,大家需要注意这一点,接下来,我们将通过 Docker 的容器连接 link 来实现容器之间的访问,略有不同)。

(1)创建三个容器

执行如下命令创建三个 RabbitMQ 容器:

docker run -d --hostname rabbit01 --name mq01 -p 5671:5672 -p 15671:15672 -e RABBITMQ_ERLANG_COOKIE="javaboy_rabbitmq_cookie" rabbitmq:3-management

docker run -d --hostname rabbit02 --name mq02 --link mq01:mylink01 -p 5672:5672 -p 15672:15672 -e RABBITMQ_ERLANG_COOKIE="javaboy_rabbitmq_cookie" rabbitmq:3-management

docker run -d --hostname rabbit03 --name mq03 --link mq01:mylink02 --link mq02:mylink03 -p 5673:5672 -p 15673:15672 -e RABBITMQ_ERLANG_COOKIE="javaboy_rabbitmq_cookie" rabbitmq:3-management

三个节点现在就启动好了,注意在 mq02 和 mq03 中,分别使用了 --link (容器连接)参数来实现容器连接

接下来进入到 mq02 容器中,首先查看一下 hosts 文件,可以看到我们配置的容器连接已经生效了

图片

将来在 mq02 容器中,就可以通过 mylink01 或者 rabbit01 访问到 mq01 容器了。

接下来我们开始集群的配置。

分别执行如下命令将 mq02 容器加入集群中:

rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@rabbit01
rabbitmqctl start_app

接下来输入如下命令我们可以查看集群的状态:

rabbitmqctl cluster_status

可以看到,集群中已经有两个节点了。

接下来通过相同的方式将 mq03 也加入到集群中:

rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@rabbit01
rabbitmqctl start_app

这个时候,我们也可以通过网页来查看集群信息,在三个 RabbitMQ 实例的 Web 端首页,都可以看到如下内容:

图片

(2)java操作集群

配置 applicaiton.properties,内容如下(注意集群配置):其他操作均不变

spring.rabbitmq.addresses=localhost:5671,localhost:5672,localhost:5673
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

消息发送成功之后,在 RabbitMQ 的 Web 管理端,我们会看到三个 RabbitMQ 实例上都会显示有一条消息,但是实际上消息本身只存在于一个 RabbitMQ 实例。

接下来我们再创建一个消息消费者,消息消费者的依赖以及配置和消息生产者都是一模一样

当消息消费者启动成功后,这个方法中只收到一条消息,进一步验证了我们搭建的 RabbitMQ 集群是没问题的。

反向测试

确保三个 RabbitMQ 实例都是启动状态,关闭掉 Consumer,然后通过 provider 发送一条消息,发送成功之后,关闭 mq01 实例,然后启动 Consumer 实例,此时 Consumer 实例并不会消费消息,反而会报错说 mq01 实例连接不上,这个例子就可以说明消息在 mq01 上,并没有同步到另外两个 MQ 上。相反,如果 provider 发送消息成功之后,我们没有关闭 mq01 实例而是关闭了 mq02 实例,那么你就会发现消息的消费不受影响。

5、镜像集群

所谓的镜像集群模式并不需要额外搭建(基于普通集群),只需要我们将队列配置为镜像队列即可。

这个配置可以通过网页配置,也可以通过命令行配置,我们分别来看。

(1)网页配置镜像队列

先来看看网页上如何配置镜像队列。

点击 Admin 选项卡,然后点击右边的 Policies,再点击 Add/update a policy,如下图:

图片

接下来添加一个策略,如下图:

图片

各参数含义如下:

  • Name: policy 的名称。

  • Pattern: queue 的匹配模式(正则表达式)。

  • Definition:镜像定义,主要有三个参数:ha-mode, ha-params, ha-sync-mode。

    • ha-mode:指明镜像队列的模式,有效值为 all、exactly、nodes。其中 all 表示在集群中所有的节点上进行镜像(默认即此);exactly 表示在指定个数的节点上进行镜像,节点的个数由 ha-params 指定;nodes 表示在指定的节点上进行镜像,节点名称通过 ha-params 指定。
    • ha-params:ha-mode 模式需要用到的参数。
    • ha-sync-mode:进行队列中消息的同步方式,有效值为 automatic 和 manual。
  • priority 为可选参数,表示 policy 的优先级。

配置完成后,点击下面的 add/update policy 按钮,完成策略的添加,如下:

图片

添加完成后,我们可以进行一个简单的测试。

首先确认三个 RabbitMQ 都启动了,然后用上面的 provider 向消息队列发送一条消息。

发完之后关闭 mq01 实例。

接下来启动 consumer,此时发现 consumer 可以完成消息的消费(注意和前面的反向测试区分),这就说明镜像队列已经搭建成功了。

(2)命令行配置镜像队列

命令行的配置格式如下:

rabbitmqctl set_policy [-p vhost] [--priority priority] [--apply-to apply-to] {name} {pattern} {definition}

举一个简单的配置案例:

rabbitmqctl set_policy -p / --apply-to queues my_queue_mirror "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

  目录