Kafka

一、kafka架构

kafka 是一个分布式的基于发布和订阅模式消息队列(message queue),主要用于大数据实时处理领域。

kafka图标

1.1 消息队列

同步通信

同步通信

异步通信

异步通信

使用消息队列的好处如下:

  • 解耦:允许独立的扩展或修改两边的处理过程
  • 可恢复性:系统的一部分组件失效时,不会影响到整个系统
  • 缓冲:控制和优化数据经过系统的速度
  • 灵活性和峰值处理能力:在特殊情况下,通信中的请求量会急剧增大(如双十一等活动),但这种情况并不常见,如果投入资源来待命是很浪费的,使用消息队列就可以顶住突发的访问压力,而保护集群不会因为请求量过多而崩溃
  • 异步通信:有时候用户不想也不需要立即处理消息,这种时候就允许用户把消息放在队列中,等待处理

消息队列分为点对点模式发布/订阅模式

点对点模式

消息生产者生产消息发送到 queue 中,消费者再从 queue 拉取消息并消费,消息被消费之后就不再存在于 queue 中,queue 可以有多个消费者,但对于消息而言只能有一个消费者。

点对点模式

发布/订阅模式

消息生产者将消息发送到 topic 中,可以同时有多个消费者订阅该消息,发布到 topic 的消息可以被所有消费者订阅。

发布订阅模式

1.2 kafka基础架构

kafka 有四个核心的 API:

  • The Producer API:允许一个应用程序发布一串流式的数据到一个或者多个 topic
  • The Consumer API:允许一个应用程序订阅一个或多个 topic ,并且对发布给他们的流式数据进行处理
  • The Streams API:允许一个应用程序作为一个流处理器,消费一个或者多个 topic 产生的输入流,然后生产一个输出流到一个或多个 topic 中去,在输入输出流中进行有效的转换
  • The Connector API:允许构建并运行可重用的生产者或者消费者,将 topics 连接到已存在的应用程序或者数据系统

kafka架构

  • producer:消息生产者
  • consumer:消息消费者
  • topic:数据主题,是数据订阅/发布的地方,可以被多个消费者订阅
  • partition:分区,每个 topic 包含一个或多个分区,分区是一个有序的队列,分区中的消息都会被分配一个 offset,kafka 保证按每一个分区中的顺序将消息发送给消费者,但不保证按每一个 topic 中的顺序将消息发送给消费者;且 partition 是消费者和生产者操作的最小单元
  • offset(偏移量):kafka 的存储文件都是按照 offset.kafka 来命名,方便查找数据
  • leader 副本:消息的订阅/发布都由 leader 来完成
  • follower 副本:进行消息数据的备份,当 leader 挂了之后,就成为新的 leader 来代替旧的 leader
  • broker:一个 kafka 就是一个 broker,一个集群由多个 broker 组成
  • consumer group:消费者组,组中有多个消费者,组中的消费者都只能够接收一个分区的消息
  • zookeeper:保存 kafka 集群状态的信息,2.8.0版本开始支持 kraft 模式,可不依赖 zk 运行 kafka 集群
  • replicas:副本数

1.3 kafka 存储机制

kafka 内部会自己创建一个 _consumer_offsets 并包含 50 个分区,这些主题用来保存消费者消费某个 topic 的偏移量。

每个 partitions 都被切割成相同大小的 segment,segment 由四个部分构成,具体如下:

kafka存储机制

  • log:数据文件
  • index:索引文件,用来保存消息的索引,能够使查找数据更加高效
  • timeindex:具体时间日志
  • leader-epoch-checkpoint:保存了每一任 leader 开始写入消息时的 offset,会定时更新

1.4 生产者分区策略

kafka 允许为每条消息定义消息 key,可以根据 key 来为消息选择分区。

分区策略即决定生产者将消息发送到哪个分区的算法,主要有以下几种:

  • 轮询策略(没给定分区号和 key 值):可以提供优秀的负载均衡能力,保证消息被平均的分配到各个分区上,但不能保证消息的有序性。
  • 随即策略:瞎分,基本不用。
  • hash 策略(没给定分区号,但给了 key 值):将 key 的 hash 值与 topic 的 partition 数进行取余得到
    partition值
  • 自定义分区

1.5 生产者ISR

ISR 即同步副本,leader 维护了一个动态的 ISR,为 leader 保持同步的 follower 集合,当 ISR 中的 follower 完成数据的同步之后,leader 就会给 follower 发送 acks,如果 follower 长时间没有向 leader 同步数据,则该 follower 将被踢出 ISR。leader 发生故障之后就会在 ISR 中选取出新的 leader。

1.6 生产者ACKS

kafka 发送确认参数 acks 的几种模式:

  • acks = 0:意味着生产者能够通过网络把消息发送出去,broker 接收到数据还没写入就发送,容易丢数据
  • acks = 1:等待 leader 写入数据(不等待 follower 同步),就可以发送数据,可能会丢数据
  • acks = all/-1:等待 leader 和 follower(ISR 中的)写入数据,就可以发送数据,数据不容易丢,但效率底

1.7 kafka数据一致性原理

  • HW:所有副本中的最小 LEO
  • LEO:每个副本的最后一个(最大的) offset

假设分区的副本为3,副本0为 leader,副本1、2为 follower,并且都在 ISR 列表中。虽然 leader 已经写入了 message3,但消费者只能读到 message1,因为所有的 ISR 都同步了 message1,只有在 HW 之上的消息才支持被消费,而 HW 取决于 LEO,原理类似于木桶原理。

这么做的原因是如果 leader 崩溃了,一个 follower 成为新的 leader 后,由于该新的 leader 没有做好数据的同步,如果允许消费者读取 HW 之下的数据的话,那么就会出现数据不一致的问题。

kafka数据一致性原理

1.7 Exactly-once

当生产者发送消息到 topic 时,很可能会出现宕机的情况,或者出现了网络故障,导致生产者消息发送失败,而生产者要如何处理这样的错误,就产生了如下几种不同的语义:

  • At least once:至少一次,如果生产者收到了 broker 发送过来的 acks,且 acks 设置为了 all/-1,这就代表消息已经被写入 topic 且同步好了。如果生产者在时间内没受到 acks 或收到的 acks 有误,那么就会重新发送消息。如果 broker 恰好在消息已经写入 topic 时,发送 acks 前出了故障,那么就会导致生产者发送同样的消息两次,消费者消费两次。
  • At more once:最多一次,如果生产者在接收 acks 超时或返回有问题的时候不重新发送消息,那么消息很可能没写入 topic 中,因此消费者也不会消费该条消息,不会出现数据重复的现象,但很容易缺数据。
  • Exactly once:精确一次,即使生产者重新发送消息,也只会被消费者消费一次,实际上就是在 kafka 集群中做一次去重的操作。kafka 集群会给生产者分配一个 PID,生产者每次发送消息到 broker 时都会附带 PID、分区以及序列号,broker 就会对数据做一个保存,如果生产者再发送同样消息,那么 broker 就会对数据进行去重,但如果生产者宕机重启了,就会被分配一个新的 PID,所以去重无法做到精准的数据写入。要开启 exactly-once,需要在 broker 中配置 enable.idempotence = true ,这时候 acks 默认被设置为 -1。

1.8 消费者分区分配策略

消费者是采用 pull 的方式在 kafka 集群中获取消息的。

push 模式很难适应消费速率不同的消费者,因为消息的发送速率是由 broker 决定的;而 pull 方式当 kafka 集群没有数据的时候,消费者可能会陷入循环之中,一直取到空数据。

一个消费者组里由多个消费者,一个 topic 里由多个分区,那么数据在消费的时候就会涉及到分配的问题,即确定那些分区由消费者组里的哪些消费者来消费。

kakfa 有三种分配策略,如下:

  • RangeAssignor(默认):RangeAssignor 是针对 topic 而言的,首先会对 topic 中的分区按照序号进行排列,并对消费者根据字典序排列,然后用分区个数除以消费者线程的总数来决定分区的分配,但如果除不尽,那么前面的消费者就会多分配一些分区。
  • RoundRobin:将消费组内所有消费者以及消费者所订阅的所有 topic 的 partition 按照字典序排序,然后通过轮询的方式逐个分配给组内的消费者。如果同时消费多个 topic,那么消费者会将这些 topic 视为一个。但如果同一个消费组内的消费者所订阅的信息是不相同的,那么在执行分区分配的时候就不是完全的轮询分配,有可能会导致分区分配的不均匀。
  • StickyAssignor:StickyAssignor 要实现的是分区的分配要尽可能的均匀,分配给消费者者的主题分区数最多相差一个,假设一个消费者组有三个消费者,都订阅了四个 topic,且每个 topic 中有二个分区,那么这时候分配的结果与 RoundRobin 会很相似,即消费者A三个分区、消费者B三个分区、消费者C两个分区,假设消费者C退出了消费者组,这时候 StickyAssignor 会保留之前消费者A和B分配到的分区,然后再将消费者C之前分配到的分区再分配给消费者A和B,即体现了粘性,不需要消费者将之前处理过的数据送到新的消费者再处理一次。

1.9 offset存储

kafka 在0.9版本之前,offset 是存储在 zk 中,在之后的版本则是存储在 kafka 内置的一个 topic 中,即 _consumer_offsets,一个 offset 提交的信息包括以下:

Fields Content
Key Consumer Group、Topic、Partition
Payload Offset、Metadata、Timestamp

offset 的提交会根据消费者组的 key(GTP) 进行分区,对于一个给定的消费者,它所有的消息都会发送到唯一的 broker,这样消费者就不需要对多个 broker 拉取数据,但如果消费者组消费很多数量的分区,会对 broker 造成性能瓶颈。

1.10 kafka写机制

  • 顺序写入磁盘
  • 零拷贝

1.11 kafka-controller

kafka 集群中会有一个 broker 被选举为 controller,用来负责管理 broker 的上下线,所有 topic 的分区、副本分配和 leader 选举等。

1.12 kafka事务

事务可以保证在 Exactly-Once 的基础上,生产和消费可以跨分区跨会话,即生产者在同一个事务内提交到多个分区的消息,要么同时提交成功,要么同时失败,这样就保证了生产者在运行时出现异常或宕机之后仍然成立。

生产者事务

为了实现跨分区跨会话的事务,需要引入全局唯一的 Transaction ID(由客户端给定),并将生产者的 PID 与之绑定,这样以来生产者即使宕机重启也可以与原来的 PID 进行绑定。

为了管理 Transaction ID,kafka 中引入了一个新的组件 Transaction Coordinator,生产者就是与 Transaction Coordinator 交互来获得 Transaction ID 对应的任务状态。Transaction Coordinator 还负责将事务写入 topic,这样即使服务重启,也可以得到恢复。

消费者事务

事务主要是为生产者作保证,无法保证消费者的精准消费,是因为消费者可以根据 offset 来访问消息。

二、kafka基础

2.1 kafka安装

通过二进制包部署

  1. 下载 kafka
1
2
curl -O https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz
tar -xf kafka_2.13-2.8.0.tgz
  1. 修改 server.properties
1
2
3
4
# broker的id必须为唯一的整数,设置为-1时随机生成
broker.id=-1
# 修改为实际zookeeper节点地址+2181端口
zookeeper.connect=...
  1. 开启 zookeeper 和 kafka
1
2
3
# daemon参数:不输出启动日志信息
./bin/zookeeper-server-start.sh -daemon ../config/zookeeper.properties
./bin/kafka-server-start.sh -daemon ./config/server.properties

通过 docker-compose 部署

  1. docker-compose.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
version: "3.8"
services:
zookeeper:
container_name: zookeeper
image: docker.io/bitnami/zookeeper:3.7
ports:
- "12181:2181"
volumes:
- "./data/zookeeper_data:/zookeeper_data"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka1:
container_name: kafka1
image: docker.io/bitnami/kafka:2.8.0
ports:
- "19092:9092"
volumes:
- "./data/kafka1_data:/data"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper
kafka2:
container_name: kafka2
image: docker.io/bitnami/kafka:2.8.0
ports:
- "19093:9092"
volumes:
- "./data/kafka2_data:/data"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper
kafka3:
container_name: kafka3
image: docker.io/bitnami/kafka:2.8.0
ports:
- "19094:9092"
volumes:
- "./data/kafka3_data:/data"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper
  1. 进入 zookeeper 内部查看 brokers 是否存在
1
2
3
zkCli.sh
ls /brokers/ids
...

kraft 模式部署

  1. kraft/server.properties
1
2
3
4
# 根据节点数改
node.id=1
# 控制节点
controller.quorum.voters=1@master:9093,2@slave1:9093,3@slave2:9093
  1. 生成集群ID
1
./bin/kafka-storage.sh random-uuid
  1. 生成 /tmp/kraft-combined-logs 目录
1
./bin/kafka-storage.sh format -t <uuid> -c ./config/kraft/server.properties
  1. 各节点启动 kafka
1
./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties

kraft 模式 docker-compose 部署

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
version: "3.8"
services:
kafka1:
container_name: kafka1
image: docker.io/bitnami/kafka:2.8.0
command:
- /bin/bash
- -c
- |
kafka-storage.sh format -t <uuid> -c /opt/bitnami/kafka/config/kraft/server.properties
kafka-server-start.sh /opt/bitnami/kafka/config/kraft/server.properties
ports:
- "19092:9092"
volumes:
- "./conf/kafka1:/opt/bitnami/kafka/config/kraft"
environment:
- ALLOW_PLAINTEXT_LISTENER=yes
kafka2:
container_name: kafka2
image: docker.io/bitnami/kafka:2.8.0
command:
- /bin/bash
- -c
- |
kafka-storage.sh format -t <uuid> -c /opt/bitnami/kafka/config/kraft/server.properties
kafka-server-start.sh /opt/bitnami/kafka/config/kraft/server.properties
ports:
- "19093:9092"
volumes:
- "./conf/kafka2:/opt/bitnami/kafka/config/kraft"
environment:
- ALLOW_PLAINTEXT_LISTENER=yes
kafka3:
container_name: kafka3
image: docker.io/bitnami/kafka:2.8.0
command:
- /bin/bash
- -c
- |
kafka-storage.sh format -t <uuid> -c /opt/bitnami/kafka/config/kraft/server.properties
kafka-server-start.sh /opt/bitnami/kafka/config/kraft/server.properties
ports:
- "19094:9092"
volumes:
- "./conf/kafka3:/opt/bitnami/kafka/config/kraft"
environment:
- ALLOW_PLAINTEXT_LISTENER=yes

2.2 kafka基本使用

  1. 列出某个 zookeeper 中的 topic
1
./kafka-topics.sh --list --zookeeper zookeeper_ip:2181
  1. 创建 topic
1
2
3
# --replication-factor:副本数,总副本数为(分区数 * 副本数参数),下面的例子总副本数为4
# --partitions:分区数
./kafka-topics.sh --create --zookeeper zookeeper_ip:2181 --replication-factor 2 --partitions 2 --topic test
  1. 删除 topic
1
2
# delete.topic.enable为true时才会真正删除
./kafka-topics.sh --delete --zookeeper zookeeper_ip:2181 --topic test
  1. 查看 topic
1
./kafka-topics.sh --describe --zookeeper zookeeper_ip:2181 --topic test
  1. 启动 producer
1
2
# --broker-list:针对生产者使用,指定集群中的一个或者多个kafka服务器
./kafka-console-producer.sh --broker-list kafka_id:9092 --topic test
  1. 启动 consumer
1
2
3
# --bootstrap-server:针对消费者使用,指定集群中的一个或者多个kafka服务器
# --from-beginning:查看所有消息数据
./kafka-console-consumer.sh --bootstrap-server kafka_id:9092 --topic test --from-beginning

2.3 单播消息

如果多个消费者在同一个消费者组,只有一个消费者可以收到同一个订阅的 topic 的消息。

如果 topic 进行了分区,那么一个 partition 只能被消费者组内的一个消费者消费,但消费者可以消费多个不同的 partition,而这些 partition 也可以被不同消费者组的消费者消费。

1
2
# --consumer-property group.id:指定该消费者从属于哪个消费者组
./kafka-console-consumer.sh --bootstrap-server kafka_id:9092 --consumer-property group.id=testgroup --topic test

2.4 多播消息

如果多个不同消费者组中的消费者消费同一个订阅的 topic 的消息,那么这些消费者都可以消费同样的消息。

2.5 查看消费者组信息

  1. 查看指定节点有哪些消费者组
1
./kafka-consumer-groups.sh --bootstrap-server kafka_id:9092 --list
  1. 查看消费者组详细信息
1
2
3
4
./kafka-consumer-groups.sh --bootstrap-server kafka_id:9092 --describe --group testgroup
# CURRENT-OFFSET:上次消费消息偏移量
# LOG-END-OFFSET:当前topic最后消息偏移量
# LAG:未消费信息的数量

2.6 kafka集群操作

  1. 消息发送
1
./kafka-console-producer.sh --broker-list kafka_01:9092 kafka_02:9092 kafka_03:9092 --topic test
  1. 消息消费
1
./kafka-console-consumer.sh --bootstrap-server kafka_01:9092 kafka_02:9092 kafka_03:9092 --from-beginning --topic test
  1. 消费者组消费信息
1
./kafka-consumer-groups.sh --bootstrap-server kafka_01:9092 kafka_02:9092 kafka_03:9092 --from-beginning --consumer-property group.id=testgroup --topic test

三、kafka优化

3.1 如何防止消息丢失

  • 发送方:设置 ack 的值为 1 或 -1/all
  • 消费方:设置 offset 为手动提交

3.2 如何防止消息的重复消费

如果 broker 收到了消息并发送了 ack 给生产者,因为网络原因生产者在一定时间内没有收到 ack 就会进行消息的重新发送,这样 broker 就会有两条一样的消息,就可能会造成消费者重复消费。

在消费者端进行非幂等性(多次访问的结果是一样的)消费问题,就可以解决。

  • 方法一:在数据库中创建一个联合主键(id,uuid),这样只有联合主键匹配成功才能写入数据
  • 方法二:使用分布式锁,例如 Redission.lock(uuid)

3.3 如何做到顺序消费

顺序消费的使用场景并不多,因为会牺牲较多的性能。

  • 发送方:ack 不能设置为 0(否则可能会丢失消息),关闭重试并使用同步发送(重试可能会导致消息重复,异步发送会导致消息发送顺序不一致),消息发送成功后才会发送下一条,以保证消息的顺序发送
  • 消费方:消息都是发送到一个 partition 中,只能有一个消费者来消费该 partition 的消息

3.4 消息积压

当消费者的消费速度远远跟不上生产者的消息生产速度,那么 kafka 中就会有大量的消息没有被消费,消费者寻址的性能也会越来越差,最后导致服务雪崩。

  • 方法一:使用多线程
  • 方法二:创建多个消费者组和多个消费者,一起消费
  • 方法三:创建一个消费者,该消费者新建一个 topic,并划分多个分区,多个分区再划分给多个消费者,这时候该消费者将消息拉取下来但不进行消费,而是放在新的 topic 上让多个消费者进行消费。

3.5 延时队列

如果在订单创建成功后 30 分钟内没有付款,则自动取消订单,在这就可以通过延时队列来实现。

方法:

  • kafka 创建相应 topic
  • 消费者轮询消费该 topic 的消息
  • 消费者判断该 topic 的创建时间与当前时间是否超过 30 分钟(前提是订单未支付)
    • 如果是:在数据库中修改订单状态为取消
    • 如果不是:记录当前消息的 offset,并不再继续消费

四、kafka-eagle监控平台

  1. 下载 kafka-eagle
1
curl -O https://github.com/smartloli/kafka-eagle-bin/archive/v2.0.8.tar.gz
  1. 修改 system-config.properties
1
2
3
4
5
6
7
8
# zk地址
efak.zk.cluster.alias=cluster1
cluster1.zk.list=localhost:12181
# mysql地址
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=toortoor
  1. 修改环境变量
1
2
3
4
5
6
7
vim /etc/profile
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.302.b08-0.el7_9.x86_64
export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar.:$JAVA_HOME/lib/dt.jar.:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin
export KE_HOME=/root/kafka/kafka-eagle/efak-web-2.0.8
export PATH=$PATH:$KE_HOME/bin
source /etc/profile
  1. 访问

kafka-eagle