9-Stream消息驱动


一、基本概念

1、概念

Spring Cloud Stream是构建消息驱动微服务的框架。

2、解决问题

消息中间件太多了,RabbitMQkafkaActiveMQRocketMQ等。导致学习使用成本过大。

3、作用

屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。但Stream只支持RabbitMQkafka

4、工作原理

应用程序通过 inputs 和 outputs 来与 Spring Cloud Stream 中的 binder 对象交互。而 Spring Cloud Stream 的 binder 对象负责与消息中间件交互。

Binder

  1. Input:消费者
  2. Output:生产者

5、工作流程

  1. Binder:很方便的连接中间件,屏蔽差异
  2. Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
  3. Source和Sink:简单的可以理解为参照对象是Spring Cloud Stream 自身,从Stream发布消息就是输出,接受消息就是输入

6、常用的API和注解

二、模块构建

1、消息的生产者

(1)pom

 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <!--如果使用其他消息中间件,更换这个依赖即可-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <!--基础配置-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

(2)application.yml

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        output: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置,会爆红(不管)

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    instance-id: send-8801.com  # 在信息列表时显示主机名称
    prefer-ip-address: true     # 访问的路径变为IP地址

(3)主启动类无需其他配置

(4)ServiceImpl

接口自己写

//不再试用@Service了
@EnableBinding(Source.class)//定义消息的推送管道,Source是源
public class MessageProviderImpl implements MessageProdvider {

    @Resource
    private MessageChannel output;//消息发送管道

    @Override
    public String send() {

        //流水号
        String serial = UUID.randomUUID().toString();
        String value = "表头"+serial;

        Message<String> build = MessageBuilder
                .withPayload(serial)//设置消息内容
                .setHeader("partitionKey",value)//设置表头
                .build();//构建

        output.send(build);//发送消息

        System.out.println(value);

        //因为构建了message对象,吧他传给source,然后在给MQ,所以没用到返回值
        return null;
    }
}

(5)controller

@RestController
public class SendMessageController {

    @Resource
    private MessageProdvider messageProdvider;

    @GetMapping(value = "/sendMessage")
    public String sendMessage(){
        return messageProdvider.send();
    }
}

(6)测试

访问http://localhost:8801/sendMessage

即可在http://localhost:15672/观察到消息

2、消息消费者

(1)application.yml

pom和主启动类与生产者相同

只需将output改为input,其他就是端口、微服务名、instance-id自己改喽。

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置



eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    instance-id: receive-8802.com  # 在信息列表时显示主机名称
    prefer-ip-address: true     # 访问的路径变为IP地址

(2)controller

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {

    @Value("${server.port}")
    private String servicePort;

    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){
        System.out.println("消费者1,收到" +
                message.getPayload() + message.getHeaders()+"\t"+servicePort);
    }
}

(3)测试

当访问http://localhost:8801/sendMessage时

8802输出

消费者1,收到b3736ba1-c2bf-4df1-b75b-b3e6dd141f52    8802

三、消息重复消费问题(重点)

同组微服务不会出现重复消费问题,不设置分组的话默认不会出现同组的情况。

可以通过application.yml配置分组

spring:
  cloud:
    bindings: # 服务的整合处理
      input: # 这个名字是一个通道的名称
        destination: studyExchange # 表示要使用的Exchange名称定义
        content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
        binder: defaultRabbit # 设置要绑定的消息服务的具体设置
        group: group1 #设置分组,解决重复消费问题

四、消息持久化(重点)

当8802未启动时,8801生产了几条消息:

  1. 当8802没有设定分组时,8802启动后,不会消费以存在的消息
  2. 当8802有设定分组时,8802启动后,自动获取未消费的消息

总结:
也就是,当我们没有配置分组时,会出现消息漏消费的问题

​ 而配置分组后,我们可以自动获取未消费的数据

五、重复消费和持久化都是通过分组解决(重点)


  目录