Java操作Kafka


一、Java操作Kafka

1、依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version>
</dependency>

2、生产者

Properties props = new Properties();
//指定kafka集群
props.put("bootstrap.servers", "127.0.0.1:9092");
//消息确认的级别 acks 配置设置为 all
//在此配置下,生产者将等待所有副本都成功写入后才会认为消息发送成功。这种配置级别可以确保数据不会丢失,但可能会影响性能。
props.put("acks", "all");
//指定 key 序列化方式为 StringSerializer
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//指定 value 序列化方式为 StringSerializer
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 创建 kafka 生产者, 生产者需要调用 close() 关闭
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props);) {

    // 创建生产者消息对象
    // 参数1: 主题topic
    // 参数2: key。在 Kafka 中,每条消息都可以有一个键值对,键是一个可选参数,可为 null。
    // 参数3: 消息内容
    ProducerRecord<String, String> record = new ProducerRecord<>("topic-test", null, "消息内容");

    // 发送消息,异步
    Future<RecordMetadata> future = producer.send(record);
    // 发送消息,异步,传入callback回调
    Future<RecordMetadata> future2 = producer.send(record, (RecordMetadata metadata, Exception e) -> {
        System.out.println(metadata);
        System.out.println(e);
    });

    // 阻塞等待获取结果
    RecordMetadata metadata = future.get();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:

  • acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
  • acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
  • acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。

3、消费者

Properties props = new Properties();
// kafka ip port
props.setProperty("bootstrap.servers", "175.178.14.202:9092");
//消费者组的唯一标识符。所有属于同一组的消费者将共享一个消费者组ID。
props.setProperty("group.id", "test");
//消费者是否应该自动提交偏移量
props.setProperty("enable.auto.commit", "true");
//消费者自动提交偏移量的时间间隔,以毫秒为单位。
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

//创建kafka消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
//订阅要消费的主题
//指定消费者从哪个topic中拉取数据
kafkaConsumer.subscribe(Collections.singletonList("topic-test"));
//使用一个while循环,不断从kafka的topic中拉取消息
while(true){
    //kafka一次拉取一批数据
    ConsumerRecords<String,String> poll = kafkaConsumer.poll(Duration.ofSeconds(5));
    //将记录record的offset、key、value都打印出来
    for (ConsumerRecord<String,String> consumerRecord:poll){
        //主题
        String topic = consumerRecord.topic();
        //offset:这条消息处于kafka分区中的哪个位置
        long offset=consumerRecord.offset();
        //key\value
        String key=consumerRecord.key();
        String value=consumerRecord.value();
        System.out.println("topic:"+topic+"offset:"+offset+"key:"+key+"value:"+value);
    }
}

二、SpringBoot整合Kafka

1、依赖

spring-boot 版本为 2.2.4.RELEASE

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.4.13.RELEASE</version>
</dependency>

依赖版本

spring-kafka Spring版本 kafka-clients Spring Boot
3.1.x (pre-release) 6.2.x (pre-release) 3.5.1 3.2.x (pre-release)
3.0.x 6.0.x/6.1.x 3.3.2/3.4.1/3.5.1 3.0.x/3.1.x
2.9.x 5.5.x 3.2.3/3.4.1/3.5.1 2.7.x (not managed)
2.8.x 5.5.x 3.0.0 2.6.x or 2.7.x (End of Life)
2.7.x 5.5.x 2.7.0 - 2.8.1 2.4.x or 2.5.x (End of Life)
2.6.x 5.3.x or 5.4.x 2.6.0 - 2.8.1 2.3.x or 2.4.x (End of Life)
2.5.x 3.3.x 2.5.1 - 2.8.1 2.3.x (End of Life)
2.4.x 3.2.x 2.4.1 2.2.x (End of Life)
2.3.x 3.2.x 2.3.1 2.2.x (End of Life)
2.2.x 3.1.x 2.0.1, 2.1.x, 2.2.x 2.1.x (End of Life)
2.1.x 3.0.x 1.0.2 2.0.x (End of Life)
1.3.x 2.3.x 0.11.0.x, 1.0.x 1.5.x (End of Life)

2、配置

最简配置

spring:
  kafka:
    bootstrap-servers: "localhost:9092"
    consumer:
      group-id: "myGroup"

常用配置

spring:
  kafka:
    # 集群以,间隔
    bootstrap-servers: 175.178.14.202:9092
    producer:
      # 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
      # 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
      # 默认Integer.MAX_VALUE
      retries: 0
      #procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
      #acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
      #acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
      #acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
      #可以设置的值为:all, -1, 0, 1
      acks: 1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: testGroup
      # smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest
      auto-offset-reset: earliest
      # 设置自动提交offset
      enable-auto-commit: true
      # 如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
      auto-commit-interval: 100
      max-poll-records: 5
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3、发送消息

(1)API

public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data){}

public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data){}

public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data){}

public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, @Nullable V data){}

public ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record){}

public ListenableFuture<SendResult<K, V>> send(Message<?> message){}

(2)示例

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@GetMapping("send")
public String send() throws ExecutionException, InterruptedException {

    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("topic-test", "123456");
    SendResult<String, String> result = future.get();
    System.out.println(result.getProducerRecord().toString());
    System.out.println(result.getRecordMetadata().toString());
    return "1";
}

(3)异步回调

kafkaTemplate.send("topic-test", "123456")
    .addCallback(
        // 发送成功回调
        success ->{
            RecordMetadata recordMetadata = success.getRecordMetadata();
            ProducerRecord<String, String> producerRecord = success.getProducerRecord();
            System.out.println(producerRecord.value());
        }
        // 发送失败回调
        ,failure -> {
            System.out.println(failure);
        }
    );

或下面这种写法

kafkaTemplate.send("topic-test", "123456")
    .addCallback(
        new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {

            }

            @Override
            public void onSuccess(SendResult<String, String> stringStringSendResult) {

            }
        }
    );

4、监听消息

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class TestTopicListener {

    @KafkaListener(topics = {"topic-test"})
    public void test1Listener(String message){
        System.out.println(message);
    }

    @KafkaListener(topics = {"topic-test"})
    public void test2Listener(ConsumerRecord<?, ?> record){
        System.out.println(record.value());
    }
}

  目录