Kafka


一、Kafka概述

1、基本介绍

官网:https://kafka.apache.org/

Kafka传统定义:Kafka是一个分布式的基于发布/订阅模式的消息队列(MessageQueue),主要应用于大数据实时处理领域。

Kafka最新定义:Kafka是一个开源的分布式事件流平台(Event StreamingPlatform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。

传统的消息队列的主要应用场景包括:缓存消峰解耦异步通信。

2、消息队列两种模式

image-20220902124921601

区别: 点对点消费 -> 消息只能发布到一个主题, 消费完成就删除消息,且只有一个消费者

发布订阅模式 -> 消息可以发布到多个主题, 消息一般保留七天,且有多个消费者

3、基础架构

image-20220902125656203

4、概念

(1)broker

即 kafka 节点

  • 一个Kafka的集群通常由多个broker组成,这样才能实现负载均衡、以及容错

  • broker是无状态(Sateless)的,它们是通过ZooKeeper来维护集群状态

  • 一个Kafka的broker每秒可以处理数十万次读写,每个broker都可以处理TB消息而不影响性能

(2)zookeeper

  • ZK用来管理和协调broker,并且存储了Kafka的元数据(例如:有多少topic、partition、consumer)

  • ZK服务主要用于通知生产者和消费者Kafka集群中有新的broker加入、或者Kafka集群中出现故障的broker。

PS:Kafka正在逐步想办法将ZooKeeper剥离,维护两套集群成本较高,社区提出KIP-500就是要替换掉ZooKeeper的依赖。“Kafka on Kafka”——Kafka自己来管理自己的元数据

(3)consumer group(消费者组)

消费者组,由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

一个分区只能被一个组的某一个消费者消费,其他组的也可以同时消费

案例:存在一个主题有两个分区,两个消费者的情况下,发送100条消息

两个消费者属于同一个消费者组,两个消费者分别消费两个分区的数据,分别50次,总计100次,不重复

两个消费者不属于同一个消费者组,两个消费者都消费了100条消息,总计消费200次,即不同消费者组可重复消费消息

(4)Partition分区

分区,为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。

默认一个topic有一个分区(partition),自己可设置多个分区(分区分散存储在服务器不同节点上)解决了一个海量数据如何存储的问题

例如:有2T的数据,一台服务器有1T,一个topic可以分多个区,分别存储在多台服务器上,解决海量数据存储问题。

(5)Replica副本

image-20231016151248236

副本。一个 topic 的每个分区都有若干个副本,一个 Leader 和若干个Follower。

  • 副本可以确保某个服务器出现故障时,确保数据依然可用
  • 在Kafka中,一般都会设计副本的个数>1

Leader 和Follower

  • Leader(主副本):每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 Leader。

  • Follower(备份副本):每个分区多个副本中的“从”,实时从 Leader 中同步数据,保持和Leader 数据的同步。Leader 发生故障时,某个 Follower 会成为新的 Leader。

(6)topic主题

  • 主题是一个逻辑概念,用于生产者发布数据,消费者拉取数据

  • Kafka中的主题必须要有标识符,而且是唯一的,Kafka中可以有任意数量的主题,没有数量上的限制

  • 在主题中的消息是有结构的,一般一个主题包含某一类消息

  • 一旦生产者发送消息到主题中,这些消息就不能被更新(更改)

(7)offset偏移量

image-20231016152154276

offset 是 Kafka 为每条消息分配的一个唯一的编号(同一主题同一分区中唯一),它表示消息在分区中的顺序位置。offset 是从 0 开始的,每当有新的消息写入分区时,offset 就会加 1。offset 是不可变的,即使消息被删除或过期,offset 也不会改变或重用。

  • 在Kafka2.8版本前,Zookeeper的Consumer文件中存放消息被消费的记录(offset)

  • 在Kafka2.8版本之后,消息被消费的记录(offset)存放在Kafka 的 _consumer_offsets 主题中。

  • 在一个分区中,消息是有顺序的方式存储着,每个在分区的消费都是有一个递增的id。这个就是偏移量offset

  • 偏移量在分区中才是有意义的。在分区之间,offset是没有任何意义的

offset 的作用主要有两个:

  • 一是用来定位消息。通过指定 offset,消费者可以准确地找到分区中的某条消息,或者从某个位置开始消费消息。
  • 二是用来记录消费进度。消费者在消费完一条消息后,需要提交 offset 来告诉 Kafka broker 自己消费到哪里了。这样,如果消费者发生故障或重启,它可以根据保存的 offset 来恢复消费状态。

二、Kafka安装

官网安装教程:https://kafka.apache.org/quickstart

下载:https://kafka.apache.org/downloads

1、配置文件

tar -xzf kafka_2.12-3.0.0.tgz

解压后,编辑配置文件 ./kafka/config/server.properties

#broker 的全局唯一编号,不能重复,只能是数字。
broker.id=0
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
# /tmp/ 是 Linux 临时目录,所以必须修改为其他目录
log.dirs=/tmp/kafka-logs
#topic 在当前 broker 上的分区个数
num.partitions=1
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
# 每个 topic 创建时的副本数,默认时 1 个副本
offsets.topic.replication.factor=1
#segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
#每个 segment 文件的大小,默认最大 1G
log.segment.bytes=1073741824

# 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
#配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)
#zookeeper.connect=localhost:2181/kafka
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka

# 需要修改为客户端可以访问的ip,否则java客户端无法连接
advertised.listeners=PLAINTEXT://外网ip:9092

如果只是简单启动,只需要修改 broker.idlog.dirszookeeper.connect

  • broker.id 不得重复,整个集群中唯一

后续新增节点,只需加入 Zookeeper 集群即可

kafka 默认启动端口 9092

2、ZK方式启动

ZK启动

nohup bin/zookeeper-server-start.sh config/zookeeper.properties 2>$1 &

Kafka 启动

bin/kafka-server-start.sh config/server.properties

3、KRaft方式启动

生成集群UUID

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

格式化日志目录

bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

启动 Kafka

bin/kafka-server-start.sh config/kraft/server.properties

4、脚本统一启动关闭

为了方便将来进行一键启动、关闭Kafka,我们可以编写一个shell脚本来操作。将来只要执行一次该脚本就可以快速启动/关闭Kafka。

配置KAFKA_HOME环境变量

vim /etc/profile

export KAFKA_HOME=/export/server/kafka_2.12-2.4.1
export PATH=:$PATH:${KAFKA_HOME}
#源文件无下面这条需手动添加
export PATH

#每个节点加载环境变量
source /etc/profile

准备slave配置文件,用于保存要启动哪几个节点上的kafka

10.211.55.8
10.211.55.9
10.211.55.7

编写 start-kafka.sh 脚本

#!/bin/bash

cat /export/onekey/slave | while read line
do
{
 echo $line
 ssh $line "source /etc/profile;export JMX_PORT=9988;nohup ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties >/dev/nul* 2>&1 & "
 wait
}&
done

编写 stop-kafka.sh 脚本

#!/bin/bash

cat /export/onekey/slave | while read line
do
{
 echo $line
 ssh $line "source /etc/profile;jps |grep Kafka |cut -d' ' -f1 |xargs kill -s 9"
 wait
}&
done

start-kafka.shstop-kafka.sh 配置执行权限

chmod u+x start-kafka.sh
chmod u+x stop-kafka.sh

执行shell脚本需实现服务期间ssh免密登录

5、kafka tool

kafka tool 工具下载地址

https://www.kafkatool.com/download.html

创建连接

image-20231011154004895

查看消息

image-20231011154531959

三、命令行操作

1、主题操作

(1)查看操作主题命令参数

 bin/kafka-topics.sh

image-20220902153652112

(2)查看当前服务器中的所有 topic

kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list

(3)创建 first topic

kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --partitions 1 --replication-factor 3 --topic first

选项说明:

  • --topic 定义 topic 名
  • --replication-factor 定义副本数
  • --partitions 定义分区数

(4)修改分区数

注意:分区数只能增加,不能减少

kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --alter --topic first --partitions 3

(5)查看 first 主题的详情

kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic first

(6)删除 topic

kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --delete --topic first

2、生产者操作

(1)连接kafka生产者

kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic first

参数 描述

  • --bootstrap-server <String: server toconnect to> 连接的 Kafka Broker 主机名称和端口号。
  • --topic <String: topic> 操作的 topic 名称。

(2)发送消息

[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh --
bootstrap-server 127.0.0.1:9092 --topic first
>hello world
>Hi HI

3、消费者操作

(1)查看操作消费者命令参数

kafka-console-consumer.sh

参数 描述

  • –bootstrap-server <String: server toconnect to> 连接的 Kafka Broker 主机名称和端口号。
  • –topic <String: topic> 操作的 topic 名称。
  • –from-beginning 从头开始消费。
  • –group <String: consumer group id> 指定消费者组名称。

(2)消费 first 主题中的数据

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first

阻塞等待生产者数据并消费,不会消费历史数据(即消费者启动之前生产者发送的数据)

(3)把主题中所有的数据读取出来

(包括历史数据)

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic first

四、Broker 原理

1、Broker设计

以下主题 TopicA 为 三节点、两分区、三副本的架构图

image-20231016160944698

为了提高吞吐量,每个topic也会都多个分区,同时为了保持可靠性,每个分区还会有多个副本。这些分区副本被均匀的散落在每个broker上,其中每个分区副本中有一个副本为leader,其他的为follower。

2、Zookeeper

(1)ZK作用

Zookeeper在Kafka中扮演了重要的角色,kafka使用zookeeper进行元数据管理,保存broker注册信息,包括主题(Topic)、分区(Partition)信息等,选择分区leader。

image-20220902200249692

(2)Broker选举Leader

这里需要先明确一个概念leader选举,因为kafka中涉及多处选举机制,容易搞混,Kafka由三个方面会涉及到选举:

  • broker(控制器)选leader
  • 分区多副本选leader
  • 消费者选Leader

在kafka集群中由很多的broker(也叫做控制器),但是他们之间需要选举出一个leader,其他的都是follower。broker的leader有很重要的作用,诸如:创建、删除主题、增加分区并分配leader分区;集群broker管理,包括新增、关闭和故障处理;分区重分配(auto.leader.rebalance.enable=true,后面会介绍),分区leader选举。

每个broker都有唯一的brokerId,他们在启动后会去竞争注册zookeeper上的Controller结点,谁先抢到,谁就是broker leader。而其他broker会监听该结点事件,以便后续leader下线后触发重新选举。

https://blog.csdn.net/weixin_46075832/article/details/126703474


  目录