一、Java操作Kafka
1、依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
2、生产者
Properties props = new Properties();
//指定kafka集群
props.put("bootstrap.servers", "127.0.0.1:9092");
//消息确认的级别 acks 配置设置为 all
//在此配置下,生产者将等待所有副本都成功写入后才会认为消息发送成功。这种配置级别可以确保数据不会丢失,但可能会影响性能。
props.put("acks", "all");
//指定 key 序列化方式为 StringSerializer
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//指定 value 序列化方式为 StringSerializer
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建 kafka 生产者, 生产者需要调用 close() 关闭
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props);) {
// 创建生产者消息对象
// 参数1: 主题topic
// 参数2: key。在 Kafka 中,每条消息都可以有一个键值对,键是一个可选参数,可为 null。
// 参数3: 消息内容
ProducerRecord<String, String> record = new ProducerRecord<>("topic-test", null, "消息内容");
// 发送消息,异步
Future<RecordMetadata> future = producer.send(record);
// 发送消息,异步,传入callback回调
Future<RecordMetadata> future2 = producer.send(record, (RecordMetadata metadata, Exception e) -> {
System.out.println(metadata);
System.out.println(e);
});
// 阻塞等待获取结果
RecordMetadata metadata = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
- acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
- acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
- acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
3、消费者
Properties props = new Properties();
// kafka ip port
props.setProperty("bootstrap.servers", "175.178.14.202:9092");
//消费者组的唯一标识符。所有属于同一组的消费者将共享一个消费者组ID。
props.setProperty("group.id", "test");
//消费者是否应该自动提交偏移量
props.setProperty("enable.auto.commit", "true");
//消费者自动提交偏移量的时间间隔,以毫秒为单位。
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//创建kafka消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
//订阅要消费的主题
//指定消费者从哪个topic中拉取数据
kafkaConsumer.subscribe(Collections.singletonList("topic-test"));
//使用一个while循环,不断从kafka的topic中拉取消息
while(true){
//kafka一次拉取一批数据
ConsumerRecords<String,String> poll = kafkaConsumer.poll(Duration.ofSeconds(5));
//将记录record的offset、key、value都打印出来
for (ConsumerRecord<String,String> consumerRecord:poll){
//主题
String topic = consumerRecord.topic();
//offset:这条消息处于kafka分区中的哪个位置
long offset=consumerRecord.offset();
//key\value
String key=consumerRecord.key();
String value=consumerRecord.value();
System.out.println("topic:"+topic+"offset:"+offset+"key:"+key+"value:"+value);
}
}
二、SpringBoot整合Kafka
1、依赖
spring-boot
版本为 2.2.4.RELEASE
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.4.13.RELEASE</version>
</dependency>
依赖版本
spring-kafka | Spring版本 | kafka-clients |
Spring Boot |
---|---|---|---|
3.1.x (pre-release) | 6.2.x (pre-release) | 3.5.1 | 3.2.x (pre-release) |
3.0.x | 6.0.x/6.1.x | 3.3.2/3.4.1/3.5.1 | 3.0.x/3.1.x |
2.9.x | 5.5.x | 3.2.3/3.4.1/3.5.1 | 2.7.x (not managed) |
2、配置
最简配置
spring:
kafka:
bootstrap-servers: "localhost:9092"
consumer:
group-id: "myGroup"
常用配置
spring:
kafka:
# 集群以,间隔
bootstrap-servers: 175.178.14.202:9092
producer:
# 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
# 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
# 默认Integer.MAX_VALUE
retries: 0
#procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
#acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
#acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
#acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
#可以设置的值为:all, -1, 0, 1
acks: 1
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: testGroup
# smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest
auto-offset-reset: earliest
# 设置自动提交offset
enable-auto-commit: true
# 如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
auto-commit-interval: 100
max-poll-records: 5
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3、发送消息
(1)API
public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data){}
public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data){}
public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data){}
public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, @Nullable V data){}
public ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record){}
public ListenableFuture<SendResult<K, V>> send(Message<?> message){}
(2)示例
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@GetMapping("send")
public String send() throws ExecutionException, InterruptedException {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("topic-test", "123456");
SendResult<String, String> result = future.get();
System.out.println(result.getProducerRecord().toString());
System.out.println(result.getRecordMetadata().toString());
return "1";
}
(3)异步回调
kafkaTemplate.send("topic-test", "123456")
.addCallback(
// 发送成功回调
success ->{
RecordMetadata recordMetadata = success.getRecordMetadata();
ProducerRecord<String, String> producerRecord = success.getProducerRecord();
System.out.println(producerRecord.value());
}
// 发送失败回调
,failure -> {
System.out.println(failure);
}
);
或下面这种写法
kafkaTemplate.send("topic-test", "123456")
.addCallback(
new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
}
@Override
public void onSuccess(SendResult<String, String> stringStringSendResult) {
}
}
);
4、监听消息
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class TestTopicListener {
@KafkaListener(topics = {"topic-test"})
public void test1Listener(String message){
System.out.println(message);
}
@KafkaListener(topics = {"topic-test"})
public void test2Listener(ConsumerRecord<?, ?> record){
System.out.println(record.value());
}
}