第一章 概述

1.1 定义

传统定义:Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。

发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。

z最新定义:Kafka是 一个开源的分布式事件流平台 (Event Streaming Platform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。

采集流程:

前端浏览、点赞、收藏、评论—>日志服务器(Log—>Flume)—>kafka集群(按照topic进行存储)—>Hadoop(上传速度100m/s左右)

日常:Flume采集速度,小于100m/s。

11.11 活动:Flume采集速度,大于200m/s。

1.2 消息队列

目前主要常用的消息队列有:RabbitMQ、RocketMQ、ActiveMQ、Kafka。

在大数据场景主要采用 Kafka 作为消息队列。在 JavaEE 开发中主要采用 ActiveMQ、RabbitMQ、RocketMQ。

image.png

1.3 传统消息队列的应用场景

传统消息队列主要应用场景:缓存/削峰填谷、解耦、异步通信。

缓存/削峰填谷

缓存/削峰填谷:有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。

image.png

解耦

**解耦:**允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

image.png

异步通信

异步通信:允许用户把一个消息放入队列,但并不立即处理它,然后在需要的时候再去处理它们。

image.png

1.4 消息队列的两种模式

消息队列分为两种模式:点对点模式(不可以重复消费)、发布/订阅模式(可以重复消费)

两种消费模式也能分为两种:push模式(由消息队列推送)和pull模式(由消费者主动询问)

注意:kafka只支持pull模式。RabbitMQ支持push和pull,但是默认是push。

1.4.1 点对点模式

消费者主动拉取数据,消息收到后清除消息。

image.png

1.4.2 发布/订阅模式

  • 可以有多个topic主题(浏览、点赞、收藏、评论)
  • 消费者消费数据后,不删除数据
  • 每个消费者相互独立,都可以消费数据。

image.png

1.5 kafka基础架构

架构图:

image.png

1.Producer:消息生产者,就是向 Kafka broker 发消息的客户端。

2.Consumer:消息消费者,向 Kafka broker 取消息的客户端。

3.Consumer Group(CG):消费者组,由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

4.Broker:一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个broker 可以容纳多topic。

5.Topic:可以理解为一个队列,生产者和消费者面向的都是一个 topic。

6.Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。一个消费者组同时只能有一个消费者消费这个partition。

7.Replica:副本。一个 topic 的每个分区都有若干个副本,一个 Leader 和若干个Follower。

8.Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 Leader。

9.Follower:每个分区多个副本中的“从”,实时从 Leader 中同步数据,保持和Leader 数据的同步。Leader 发生故障时,某个 Follower 会成为新的 Leader。

在kafka2.8.0之前,是需要Zookeeper去存储leader、partition等信息。但是在kafka2.8.0之后,可以选择不使用Zookeeper。

第二章 入门

2.1 安装zookeeper

2.1.1 下载zookeeper

网址为:https://archive.apache.org/dist/zookeeper/

下载的版本为:apache-zookeeper-3.5.7-bin.tar.gz

2.1.2 解压并重命名

在centosC机器上解压到指定目录:

[root@CentosB software]# tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /usr/local/software/

重命名:

[root@CentosB software]# mv apache-zookeeper-3.5.7-bin/ zk-3.5.7

2.1.3 配置myid

在zk-3.5.7目录下创建zkData文件夹,下面创建myid文件。里面输入3。

[root@CentosB zk-3.5.7]# mkdir zkData
[root@CentosB zk-3.5.7]# cd zkData/
[root@CentosB zkData]# vim myid
[root@CentosB zkData]# cat myid 
3

2.1.4 分发zk到centosA、centosB

 scp -r zk-3.5.7/ root@centosA:/usr/local/software/zk-3.5.7/
 scp -r zk-3.5.7/ root@centosB:/usr/local/software/zk-3.5.7/

修改centosA的myid为1,修改centosB的myid为2.

2.1.5 修改zoo.cfg文件

进入/usr/local/software/zk-3.5.7/conf目录,复制一份zoo.sample.cfg为zoo.cfg。

[root@localhost conf]# ls
configuration.xsl  log4j.properties  zoo_sample.cfg
[root@localhost conf]# cp zoo_sample.cfg zoo.cfg
[root@localhost conf]# ll
总用量 16
-rw-r--r--. 1 root root  535 10月 28 16:04 configuration.xsl
-rw-r--r--. 1 root root 2712 10月 28 16:04 log4j.properties
-rw-r--r--. 1 root root  922 10月 28 16:11 zoo.cfg
-rw-r--r--. 1 root root  922 10月 28 16:04 zoo_sample.cfg

修改zoo.cfg文件,主要修改dataDir目录和添加集群信息。

外网连接的时候,需要修改为:

server.1=ip:2888:3888

本机则要修改为0.0.0.0

server.2=0.0.0.0:2888:3888

dataDir=/usr/local/software/zk-3.5.7/zkData

server.1=centosA:2888:3888
server.2=centosB:2888:3888
server.3=centosC:2888:3888

修改剩余的两台机器。

解读:server.3=centosC:2888:3888

  • 3代表myid
  • centosC:在/etc/hosts中做了映射,主机IP
  • 2888:用来follower和leader交换信息
  • 3888:用来选举leader使用

如果是使用的虚拟机,则需要加下面

quorumListenOnAllIPs=true

2.1.6 修改环境变量

在~/.bashrc中添加:

export ZOOKEEPER=/usr/local/software/zk-3.5.7
export PATH=$PATH:$ZOOKEEPER/bin
source ~/.bashrc

2.1.7 启动命令

注意:由于修改了环境变量,所以直接在控制台输入zk之后。

zkServer.sh help 查看帮助

[root@CentosC zkData]# zkServer.sh help
JMX enabled by default
Using config: /usr/local/xqm/zookeeper/zookeeper-3.4.6/bin/../conf/zoo.cfg
Usage: /usr/local/xqm/zookeeper/zookeeper-3.4.6/bin/zkServer.sh {start|start-foreground|stop|restart|status|upgrade|print-cmd}

start(启动)、start-foreground(前台启动)、stop(停止)、restart(重启)status(查看状态)

2.2 安装kafka部署

2.2.1 集群规划

机器 zk zookeeper安装路径 kafka kafka安装路径 安装包存放路径
192.168.49.135 zk1 /usr/local/software/zk kafka1 /usr/local/software/kafka /usr/local/software
192.168.49.136 zk2 /usr/local/software/zk kafka2 /usr/local/software/kafka /usr/local/software
192.168.49.137 zk3 /usr/local/software/zk kafka3 /usr/local/software/kafka /usr/local/software

2.2.2 集群安装

1.官方下载地址 & 前置准备

http://kafka.apache.org/downloads.html

1.下载kafka

版本为:kafka_2.12-3.0.0.tgz。3.0.0为kafka版本号,2.12是scala版本,因为kafka是用scala语言编写的。

2.设置三台机器的hostname和hosts

设置hosts:

vim /etc/hosts

分别新增:

192.168.49.135 centosA centosA
192.168.49.136 centosB centosB
192.168.49.137 centosC centosC

设置hostname:

vim /etc/hostname

分别修改为:centosA、centosB、centosC

3.关闭防火墙

systemctl status firewalld
systemctl stop firewalld
systemctl disabled firewalld

4.安装java

通过rpm安装java,版本为jdk-8u191-linux-x64.rpm。

先卸载自带的JDK

rpm -qa | grep jdk
rpm -e --nodeps copy-jdk-configs-3.3-10.el7_5.noarch

安装

rpm -ivh jdk-8u191-linux-x64.rpm
自动安装在/usr/java
ls /usr/

修改jdk的环境变量

vim ~/.bashrc
JAVA_HOME=/usr/java/latest
PATH=$PATH:$JAVA_HOME
CLASSPATH=.
export JAVA_HOME
export PATH
export CLASSPATH

加载环境变量

source ~/.bashrc

2.解压缩

先将下载好的压缩包放到/usr/local/software路径下

[root@CentosC software]# pwd
/usr/local/software
[root@CentosC software]# ll
总用量 84460
drwxr-xr-x. 2 root root        6 10月 28 11:16 kafka
-rw-rw-rw-. 1 root root 86486610 2月   9 2022 kafka_2.12-3.0.0.tgz
drwxr-xr-x. 2 root root        6 10月 28 11:16 zk

解压安装包

[root@CentosC software]# tar -zxvf kafka_2.12-3.0.0.tgz -C /usr/local/software/kafka

3.重命名文件夹

[root@CentosC kafka]# mv kafka_2.12-3.0.0/ kafka
[root@CentosC kafka]# ll
总用量 0
drwxr-xr-x. 7 root root 105 9月   9 2021 kafka

4.进入/usr/local/software/kafka/kafka/config目录下,修改配置文件

[root@CentosC kafka]# cd kafka/
[root@CentosC kafka]# ll
总用量 64
drwxr-xr-x. 3 root root  4096 9月   9 2021 bin
drwxr-xr-x. 3 root root  4096 9月   9 2021 config
drwxr-xr-x. 2 root root  8192 10月 28 13:30 libs
-rw-r--r--. 1 root root 14521 9月   9 2021 LICENSE
drwxr-xr-x. 2 root root   262 9月   9 2021 licenses
-rw-r--r--. 1 root root 28184 9月   9 2021 NOTICE
drwxr-xr-x. 2 root root    44 9月   9 2021 site-docs
[root@CentosC kafka]# cd config/
[root@CentosC config]# ll
总用量 72
-rw-r--r--. 1 root root  906 9月   9 2021 connect-console-sink.properties
-rw-r--r--. 1 root root  909 9月   9 2021 connect-console-source.properties
-rw-r--r--. 1 root root 5475 9月   9 2021 connect-distributed.properties
-rw-r--r--. 1 root root  883 9月   9 2021 connect-file-sink.properties
-rw-r--r--. 1 root root  881 9月   9 2021 connect-file-source.properties
-rw-r--r--. 1 root root 2103 9月   9 2021 connect-log4j.properties
-rw-r--r--. 1 root root 2540 9月   9 2021 connect-mirror-maker.properties
-rw-r--r--. 1 root root 2262 9月   9 2021 connect-standalone.properties
-rw-r--r--. 1 root root 1221 9月   9 2021 consumer.properties
drwxr-xr-x. 2 root root  102 9月   9 2021 kraft
-rw-r--r--. 1 root root 4674 9月   9 2021 log4j.properties
-rw-r--r--. 1 root root 1925 9月   9 2021 producer.properties
-rw-r--r--. 1 root root 6849 9月   9 2021 server.properties
-rw-r--r--. 1 root root 1032 9月   9 2021 tools-log4j.properties
-rw-r--r--. 1 root root 1169 9月   9 2021 trogdor.conf
-rw-r--r--. 1 root root 1205 9月   9 2021 zookeeper.properties

修改server.properties文件为:

主要修改broker.id、listeners、log.dirs、zookeeper.connect四个参数

broker.id不能相同

#broker 的全局唯一编号,不能重复,只能是数字。
broker.id=2
listeners=PLAINTEXT://centosC:9092
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以
#配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/module/kafka/datas
#topic 在当前 broker 上的分区个数
num.partitions=1
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
# 每个 topic 创建时的副本数,默认时 1 个副本
offsets.topic.replication.factor=1
#segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
#每个 segment 文件的大小,默认最大 1G
log.segment.bytes=1073741824
# 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
#配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)
zookeeper.connect=centosA:2181,centosB:2181,centosC:2181/kafka

5.复制到其他两台机器

由于一开始是在centosC上解压的,现在需要把整个kafka文件夹复制到centosA、centosB机器上。

[root@CentosC kafka]# scp -r ./kafka/ root@centosA:/usr/local/software/kafka/kafka
[root@CentosC kafka]# scp -r ./kafka/ root@centosB:/usr/local/software/kafka/kafka

6.修改其他两条机器

首先要在三台机器上的/usr/local/software/kafka/kafka下创建datas目录,作为日志和存储的文件夹。

其次要修改centosA和centosB机器的配置文件中的broker.id分别为0和1。

还有listener这个参数。

注:broker.id 不得重复,整个集群中唯一。

7.配置环境变量

环境变量有系统级和用户级。

## 废弃
这次是在/etc/profile.d目录下新建自己的配置文件:my_env.sh
source /etc/profile

需要在~/.bashrc文件中追加:

输入:

#KAFKA_HOME
export KAFKA_HOME=/usr/local/software/kafka/kafka
export PATH=$PATH:$KAFKA_HOME/bin

刷新环境变量:

[root@localhost profile.d]# source ~/.bashrc

同样在centosA、centsoB、centosC三台机器上进行配置环境变量和刷新环境变量。

测试是否添加成功:

[root@CentosB config]# echo $KAFKA_HOME
/usr/local/software/kafka/kafka

8.启动kafka

在启动kafka之前一定要先启动zookeeper。

先到kafka目录下

[root@localhost kafka]# pwd
/usr/local/software/kafka/kafka

启动

root@localhost kafka]# ./bin/kafka-server-start.sh -daemon config/server.properties 

查看是否启动成功,出现kafka则代表启动成功。

[root@localhost kafka]# jps
37122 Jps
37052 Kafka
36285 QuorumPeerMain

启动和关闭kafka脚本

case $1 in 
"start")
 /usr/local/software/kafka/kafka/bin/kafka-server-start.sh -daemon  /usr/local/software/kafka/kafka/config/server.properties
;;
"stop")
  /usr/local/software/kafka/kafka/bin/kafka-server-stop.sh
;; 
esac

注意:停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper集群。因为 Zookeeper 集群当中记录着 Kafka 集群相关信息,Zookeeper 集群一旦先停止,Kafka 集群就没有办法再获取停止进程的信息,只能手动杀死 Kafka 进程了。

2.3 kafka命令行操作

2.3.1 基础架构

image.png

2.3.2 topic命令行操作

2.3.2.1 查看topic所有命令行

topic的所有命令都是用./bin/kafka-topics.sh文件来实现的。

下面这条命令会列举出所有的命令行命令

[root@centosA kafka]# ./bin/kafka-topics.sh 
参数 描述
–bootstrap-server<String: server toconnect to> 连接的 Kafka Broker 主机名称和端口号。
–topic<String: topic> 操作的 topic 名称。
–create 创建主题。
–delete 删除主题。
–alter 修改主题
–list 查看所有主题。
–describe 查看主题的详细描述
–partitions<Integer: num of partitions> 设置分区数
–replication-factor<Integer: replication factor> 设置分区副本
–config<String: name=value> 更新系统的默认配置

2.3.2.2 显示所有的topic

连接kafka时多台之间可以用逗号隔开,一个集群,只需要连接一个kafka就能查询到所有。

[root@centosA kafka]# pwd
/usr/local/software/kafka/kafka
[root@centosA kafka]# ./bin/kafka-topics.sh --bootstrap-server centosA:9092,centosB:9092 --list

2.3.2.3 创建一个topic

创建主题的时候要指定分区数和副本数。

[root@centosA kafka]# pwd
/usr/local/software/kafka/kafka
[root@centosA kafka]# ./bin/kafka-topics.sh --bootstrap-server centosA:9092 --topic firstTopic --create --partitions 1 --replication-factor 3
Created topic firstTopic.
[root@centosA kafka]# ./bin/kafka-topics.sh --bootstrap-server centosA:9092 --list
firstTopic

2.3.2.4 查看主题的详细描述

能够显示出主题名、分区数、副本数、主题ID、大小配置、leader。

kafka分区存储是按1G的数量不断的切割。假如有10G,会切割成10个1G来存储。

下面就显示了第0个分区的leader是broker为2的机器。

[root@centosA kafka]# ./bin/kafka-topics.sh --bootstrap-server centosA:9092 --topic firstTopic --describe
Topic: firstTopic       TopicId: WKknbh9vShOcc4LaM-qSng PartitionCount: 1  
 ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: firstTopic       Partition: 0    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0

2.3.2.5 修改topic

修改topic使用alter命令,修改分区数只能增加,不能减少。

[root@centosA kafka]# ./bin/kafka-topics.sh --bootstrap-server centosA:9092 --topic firstTopic  --alter --partitions 2
[root@centosA kafka]# ./bin/kafka-topics.sh --bootstrap-server centosA:9092 --topic firstTopic  --list
firstTopic
[root@centosA kafka]# ./bin/kafka-topics.sh --bootstrap-server centosA:9092 --topic firstTopic  --describe
Topic: firstTopic       TopicId: WKknbh9vShOcc4LaM-qSng PartitionCount: 2       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: firstTopic       Partition: 0    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
        Topic: firstTopic       Partition: 1    Leader: 2       Replicas: 2,1,0 Isr: 2,1,0

2.3.2.6 删除topic

[root@centosA kafka]# ./bin/kafka-topics.sh --bootstrap-server centosA:9092 --topic twoTopic --create --partitions 1 --replication-factor 3
Created topic twoTopic.
[root@centosA kafka]# ./bin/kafka-topics.sh --bootstrap-server centosA:9092 --topic twoTopic --list
twoTopic
[root@centosA kafka]# ./bin/kafka-topics.sh --bootstrap-server centosA:9092 --topic twoTopic --delete
[root@centosA kafka]# ./bin/kafka-topics.sh --bootstrap-server centosA:9092  --list
firstTopic

2.3.3 生产者命令行操作

2.3.3.1 查看生产者所有命令行操作

同样的,生产者命令是通过./bin/kafka-console-producer.sh脚本来实现的。

[root@centosA kafka]# ./bin/kafka-console-producer.sh 
参数 描述
–bootstrap-server<String: server toconnect to> 连接的 Kafka Broker 主机名称和端口号。
–topic<String: topic> 操作的 topic 名称。

2.3.3.2 发送消息

[root@centosA kafka]# ./bin/kafka-console-producer.sh --bootstrap-server centosA:9092 --topic firstTopic
>hello
>I will send you first message

2.3.4 消费者命令行操作

2.3.4.1 查看所有消费者命令行

消费者命令是通过./bin/kafka-console-consumer.sh脚本来实现的。

[root@centosB kafka]# ./bin/kafka-console-consumer.sh 
参数 描述
–bootstrap-server<String: server toconnect to> 连接的 Kafka Broker 主机名称和端口号。
–topic<String: topic> 操作的 topic 名称。
–from-beginning 从头开始消费。
–group<String: consumer group id> 指定消费者组名称。
–partition<Integer: partition> 指定从哪个分区进行消费
–offset<String: consume offset> 指定从哪个位置开始消费

注意:消费者默认是从最新latest位置开始消费,如果想要从头消费,那么就必须使用–from-beginning命令。

2.3.4.2 从开始的位置消费

注意:如果设置了多个分区,那么分区之间的消息是无序的,分区的内部是有序的。

下面这个消息因为在不同的分区,因此消费是无序的。

[root@centosB kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server centosA:9092 --topic firstTopic --from-beginning
I will send you first message
hello

2.3.4.3 从指定分区的偏移量开始消费

消息:

[root@centosA kafka]# ./bin/kafka-console-producer.sh --bootstrap-server centosA:9092 --topic firstTopic
>hello
>I will send you first message
>hello
>hello1
>o
>1
>0
>1

分区0的消息为:hello、hello、o、0。

[root@centosB kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server centosA:9092 --topic firstTopic  --partition 0 --offset 2
o
0

第三章 kafka生产者

3.1 生产者消息发送流程

3.1.1 发送原理

在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从RecordAccumulator 中拉取消息发送到 Kafka Broker。

发送流程:

1.由main线程创建Producer对象,调用send方法发送一条数据,经过拦截器(可选)、序列化器(将消息序列化,自己自带的序列化机制,不用java的序列化)、分区器(负责挑选消息应该发送到哪个分区)。最后记录到(缓存队列)双端队列中。队列默认大小是32M,每个批次的数据默认是16K。

2.如果消息到达了batch.size或者linger.ms,那么就直接由sender线程主动拉取数据,数据以节点的形式发送到Broker。默认每个Broker节点缓存5个请求。

发送数据之后,由Broker进行ack应答。

如果发送失败,可以进行重试,重试的次数retries默认是int的最大值,是可以进行调整的。

如果发送成功,那么就会清理到双端队列中的数据。

image.png

3.1.2 生产者重要的参数列表

参数名称 描述
bootstrap.servers 生产者连接集群所需的 broker 地 址 。比如centosA:9092,centosB:9092。<br />这里并不需要所有的broker地址,因为生产者会从指定的broker中寻找其他的Broker消息
key.serializer value.serializer 指定发送消息的 key 和 value 的序列化类型。一定要写全类名。
buffer.memory RecordAccumulator 缓冲区总大小,默认 32m。
batch.size 缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加
linger.ms 如果数据迟迟未达到 batch.size,sender 等待 linger.time之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间。
acks 0:生产者发送过来的数据,不需要等数据落盘应答。<br />1:生产者发送过来的数据,Leader 收到数据后应答。<br />-1(all):生产者发送过来的数据,Leader+和 isr 队列里面的所有节点收齐数据后应答。默认值是-1,-1 和all 是等价的。
max.in.flight.requests.per.connection 允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字。
retries 当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是 int 最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。
retry.backoff.ms 两次重试之间的时间间隔,默认是 100ms。
enable.idempotence 是否开启幂等性,默认 true,开启幂等性。
compression.type 生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。支持压缩类型:none、gzip、snappy、lz4 和 zstd。

3.2 异步发送API

3.2.1 同步发送和异步发送

同步发送:同步就是逐条发送。用户线程选择同步,效果是逐条发送,因为请求队列InFlightRequest中永远最多有一条数据。异步+设置 后台线程的异步发送参数:max.in.flight.requests.per.connection=1 & batch.size=1,效果也是逐条发送。一定是逐条发送的,第一条响应到达后,才会请求第二条

异步发送:异步就是批量发送。如果设置成异步的模式,可以运行生产者以batch的形式push数据,这样会极大的提高broker的性能,但是这样会增加丢失数据的风险。也就是异步是不需要等响应就可以发送第二批

异步方式,可以发送一条,也可以批量发送多条,特性是不需等第一次(注意这里单位是次,因为单次可以是单条,也可以是批量数据)响应,就立即发送第二次。

3.2.2 普通异步发送,生产者代码编写

普通的发送方法:send(ProducerRecord)

(1).创建工程

(2).导入kafka客户端依赖

<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>

(3).创建生产者类

创建生产者主要步骤:1.连接(创建key和value的序列化) 2.创建生产者 3.创建record,发送消息 4.关闭资源

package com.xqm.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducer {
    public static void main(String[] args) {

        // 0.连接
        Properties properties = new Properties();

        // 设置bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centosA:9092,centosB:9092,centosC:9092");

        // 指定key和value的序列化,必须要配置
        // key.serializer,这两个等价,可以使用全限定类名,或者使用getName
        // properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // value的序列化
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 指定的topic
        // 1.创建kafka生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        // 2.发送消息
        // 可以指定topic、partition、timestamp、key、value
        ProducerRecord<String, String> record =null;
        for (int i = 1; i <= 10; i++) {
            record = new ProducerRecord<String, String>("firstTopic", "this is send message" + i);
            producer.send(record);
        }
        // 3.关闭资源
        producer.close();
    }
}

测试:

运行java类。在服务器上可以看到下面的信息,说明生产者生产了数据:

注意:必须要先登录消费者,因为是从最新的消息进行消费的。

[root@centosB kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server centosB:9092 --topic firstTopic 
this is send message1
this is send message2
this is send message3
this is send message4
this is send message5
this is send message6
this is send message7
this is send message8
this is send message9
this is send message10

3.2.3 带回调函数的异步发送方法

带回调函数的发送方法:send(ProducerRecord,Callback)

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

package com.xqm.kafka.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerCallback {
    public static void main(String[] args) {

        // 0.连接
        Properties properties = new Properties();

        // 设置bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centosA:9092,centosB:9092,centosC:9092");

        // 指定key和value的序列化,必须要配置
        // key.serializer,这两个等价,可以使用全限定类名,或者使用getName
        // properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // value的序列化
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 指定的topic
        // 1.创建kafka生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        // 2.发送消息
        // 可以指定topic、partition、timestamp、key、value
        ProducerRecord<String, String> record = null;
        for (int i = 1; i <= 10; i++) {
            record = new ProducerRecord<String, String>("firstTopic", "this is send message" + i);
            // 带回调函数的异步发送
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.print("发送的主题为" + metadata.topic()+" ");
                        System.out.print("发送的分区为" + metadata.partition()+" ");
                        System.out.print("发送的偏移量为" + metadata.offset()+" ");
                        System.out.println();
                    }
                }
            });
        }
        // 3.关闭资源
        producer.close();
    }
}

控制台接收到的消息为:

this is send message1
this is send message2
this is send message3
this is send message4
this is send message5
this is send message6
this is send message7
this is send message8
this is send message9
this is send message10

回调函数的输出为:

发送的主题为firstTopic 发送的分区为0 发送的偏移量为14 
发送的主题为firstTopic 发送的分区为0 发送的偏移量为15 
发送的主题为firstTopic 发送的分区为0 发送的偏移量为16 
发送的主题为firstTopic 发送的分区为0 发送的偏移量为17 
发送的主题为firstTopic 发送的分区为0 发送的偏移量为18 
发送的主题为firstTopic 发送的分区为0 发送的偏移量为19 
发送的主题为firstTopic 发送的分区为0 发送的偏移量为20 
发送的主题为firstTopic 发送的分区为0 发送的偏移量为21 
发送的主题为firstTopic 发送的分区为0 发送的偏移量为22 
发送的主题为firstTopic 发送的分区为0 发送的偏移量为23 

3.3 同步发送API

3.3.1 同步发送

同步发送和异步发送的代码区别只在于send的时候,加上get就是同步,不加get就是异步。

 producer.send(record).get();

全部代码为:

package com.xqm.kafka.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * 同步发送消息
 */
public class CustomProducerSync {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 0.连接
        Properties properties = new Properties();

        // 设置bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centosA:9092,centosB:9092,centosC:9092");

        // 指定key和value的序列化,必须要配置
        // key.serializer,这两个等价,可以使用全限定类名,或者使用getName
        // properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // value的序列化
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 指定的topic
        // 1.创建kafka生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        // 2.发送消息
        // 可以指定topic、partition、timestamp、key、value
        ProducerRecord<String, String> record = null;
        for (int i = 1; i <= 10; i++) {
            record = new ProducerRecord<String, String>("firstTopic", "this is send message" + i);
            // 同步发送
            producer.send(record).get();
        }
        // 3.关闭资源
        producer.close();
    }
}

3.4 生产者分区

3.4.1 分区的好处

(1)便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一

块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。

(2)提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

image.png

3.4.2 生产者发送消息的分区策略

3.4.2.1默认的分区器 DefaultPartitioner

分为下面三种情况。

image.png

3.4.2.2 第一种-指定分区

package com.xqm.kafka.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;


/**
 * 指定partition分区
 */
public class CustomProducerCallbackPartition {
    public static void main(String[] args) {

        // 0.连接
        Properties properties = new Properties();

        // 设置bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centosA:9092,centosB:9092,centosC:9092");

        // 指定key和value的序列化,必须要配置
        // key.serializer,这两个等价,可以使用全限定类名,或者使用getName
        // properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // value的序列化
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 指定的topic
        // 1.创建kafka生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        // 2.发送消息
        // 可以指定topic、partition、timestamp、key、value
        ProducerRecord<String, String> record = null;
        for (int i = 1; i <= 5; i++) {
            // 设置分区为1,key为""
            record = new ProducerRecord<String, String>("firstTopic",1,"", "this is send message" + i);
            // 带回调函数的异步发送
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.print("发送的主题为" + metadata.topic() + " ");
                        System.out.print("发送的分区为" + metadata.partition() + " ");
                        System.out.print("发送的偏移量为" + metadata.offset() + " ");
                        System.out.println();
                    }
                }
            });
        }
        // 3.关闭资源
        producer.close();
    }
}

控制台打印:

发送的主题为firstTopic 发送的分区为1 发送的偏移量为14 
发送的主题为firstTopic 发送的分区为1 发送的偏移量为15 
发送的主题为firstTopic 发送的分区为1 发送的偏移量为16 
发送的主题为firstTopic 发送的分区为1 发送的偏移量为17 
发送的主题为firstTopic 发送的分区为1 发送的偏移量为18 
发送的主题为firstTopic 发送的分区为1 发送的偏移量为19 
发送的主题为firstTopic 发送的分区为1 发送的偏移量为20 
发送的主题为firstTopic 发送的分区为1 发送的偏移量为21 
发送的主题为firstTopic 发送的分区为1 发送的偏移量为22 
发送的主题为firstTopic 发送的分区为1 发送的偏移量为23 

3.4.2.3 第二种-不指定分区,指定key

不指定分区,指定key的情况下,就获得key的hash值,对分区数进行取模,获得的值就是消息进入分区的值。

package com.xqm.kafka.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;


/**
 * 指定partition分区
 */
public class CustomProducerCallbackPartitionKey {
    public static void main(String[] args) {

        // 0.连接
        Properties properties = new Properties();

        // 设置bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centosA:9092,centosB:9092,centosC:9092");

        // 指定key和value的序列化,必须要配置
        // key.serializer,这两个等价,可以使用全限定类名,或者使用getName
        // properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // value的序列化
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 指定的topic
        // 1.创建kafka生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        // 2.发送消息
        // 可以指定topic、partition、timestamp、key、value
        ProducerRecord<String, String> record = null;
        for (int i = 1; i <= 5; i++) {
            // 不设置分区,消息发送到分区就是按key的hash值对分区数进行取模
            record = new ProducerRecord<String, String>("firstTopic",String.valueOf(i), "this is send message" + i);
            // 带回调函数的异步发送
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.print("发送的主题为" + metadata.topic() + " ");
                        System.out.print("发送的分区为" + metadata.partition() + " ");
                        System.out.print("发送的偏移量为" + metadata.offset() + " ");
                        System.out.println();
                    }
                }
            });
        }
        // 3.关闭资源
        producer.close();
    }
}

控制台打印的数为:

发送的主题为firstTopic 发送的分区为0 发送的偏移量为34 
发送的主题为firstTopic 发送的分区为0 发送的偏移量为35 
发送的主题为firstTopic 发送的分区为1 发送的偏移量为24 
发送的主题为firstTopic 发送的分区为1 发送的偏移量为25 
发送的主题为firstTopic 发送的分区为1 发送的偏移量为26 

消费者接收到的消息为:

this is send message2
this is send message5
this is send message1
this is send message3
this is send message4

3.4.2.4 自定义分区器

需求:如果发送的消息中包含kafka,那就发往0分区,如果不包含,那就发往1分区。

实现步骤:

  • 1.定义一个类,实现 Partitioner 接口
  • 2.重写partition方法

自定义的分区器为:

package com.xqm.kafka.config;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class MyPartitioner implements Partitioner {

    /**
     * 返回消息对应的分区
     *
     * @param topic      主题
     * @param key        消息的key
     * @param keyBytes   消息的key序列化后的字节数组
     * @param value      消息的value
     * @param valueBytes 消息的value序列化后的字节数组
     * @param cluster    集群的元数据,可以查看分区等信息
     * @return
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 获取数据的value
        String s = value.toString();
        if (s.contains("kafka")) {
            return 0;
        }
        return 1;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

测试代码为:

package com.xqm.kafka.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;


/**
 * 使用自己自定义的分区器
 */
public class CustomProducerOwnPartition {
    public static void main(String[] args) {

        // 0.连接
        Properties properties = new Properties();

        // 设置bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centosA:9092,centosB:9092,centosC:9092");

        // 指定key和value的序列化,必须要配置
        // key.serializer,这两个等价,可以使用全限定类名,或者使用getName
        // properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // value的序列化
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());


        // 关联自己自定义的分区器
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.xqm.kafka.config.MyPartitioner");


        // 指定的topic
        // 1.创建kafka生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);


        // 2.发送消息
        // 可以指定topic、partition、timestamp、key、value
        ProducerRecord<String, String> record = null;
        for (int i = 1; i <= 5; i++) {
            String message = "";
            if (i % 2 == 0) {
                message = "kafka";
            }
            record = new ProducerRecord<String, String>("firstTopic", message + i);
            // 带回调函数的异步发送
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.print("发送的主题为" + metadata.topic() + " "+"发送的分区为" + metadata.partition() + " ");
                        System.out.print("发送的偏移量为" + metadata.offset() + " ");
                        System.out.println();
                    }
                }
            });
        }
        // 3.关闭资源
        producer.close();
    }
}

控制台打印的结果为:

发送的主题为firstTopic 发送的分区为0 发送的偏移量为36 
发送的主题为firstTopic 发送的分区为0 发送的偏移量为37 
发送的主题为firstTopic 发送的分区为1 发送的偏移量为27 
发送的主题为firstTopic 发送的分区为1 发送的偏移量为28 
发送的主题为firstTopic 发送的分区为1 发送的偏移量为29 

消费者接收到的数据为:

kafka2
kafka4
1
3
5

可以看到发送的2和4都包含kafka,发送到0分区。其他的发送到1分区。

3.5 生产者如何提高吞吐量

3.5.1 默认

  • RecordAccumulator:缓冲区大小,默认是32M
  • batch.size:批次大小,默认是16K
  • linger.ms:等待时间,默认是0ms
  • compression.type:压缩snappy

3.5.2 提高吞吐量

  • 1.修改RecordAccumulator缓冲区的大小,修改为64M,这样异步发送的时候,消息就能一次性发送更多
  • 2.修改batch.size,变得更大,那么每次发送给Broker的数据就会更多,如果太大,延迟也会更高
  • 3.修改linger.ms,等待时间可以修改为5-100ms,和批次大小同理
  • 4.开启压缩,那么相同的空间,存储的数据也会更多

3.5.3 开启的代码

package com.xqm.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProductorParamter {
    public static void main(String[] args) {
        Properties properties=new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"centosA:9092.centosB:9092,centosC:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 设置缓冲区大小 buffer.memory 33554432是32M
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,67108864);

        // 批次大小,默认是16k
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);

        // linger.ms,默认是0ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG,10);

        // 开启压缩,compression.type,默认是none,可以设置为gzip,snappy,lz4,zstd
        //<code>none</code>, <code>gzip</code>, <code>snappy</code>, <code>lz4</code>, or <code>zstd</code>
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");


        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 5; i++) {
            producer.send(new ProducerRecord<String, String>("firstTopic","message"+i));
        }

        producer.close();
    }
}

3.6 发送数据的可靠性

3.6.1 ack应答机制

数据发送到Broker有三种ack应答机制:

  • 0:将ack设置为0,代表生产者发送过来的数据,不需要等数据落盘就可以发送下一批数据。这种机制发送数据的效率最高,但是最容易丢数据,一般都不会用的
  • 1:将ack设置为1,代表生产者发送过来的数据,需要等leader收到之后应答,才能继续发送下一批数据
  • -1/all:将ack设置为-1或者all,代表发送过来的数据,必须等leader收到并且所有的follower都同步之后,才能发送下一批数据,这种是最安全的,但是效率最低。因为如果一个follower挂掉之后,会导致整个集群都无法接收数据。

分析:

image.png

当ack=-1时,怎么才能让一个follower挂掉之后不影响整个集群?

使用一个ISR队列来解决,如果一个follower长时间没有通讯,就直接将follower从队列中踢掉。

image.png

可靠性总结:

  • acks=0,生产者发送过来数据就不管了,可靠性差,效率高;
  • acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
  • acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低;

在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,

对可靠性要求比较高的场景。

3.6.2 代码设置ack

package com.xqm.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;


/**
 * ack应答机制
 */
public class CustomProductorACK {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centosA:9092.centosB:9092,centosC:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // ack应答,默认是all
        properties.put(ProducerConfig.ACKS_CONFIG, "all");

        // 配置重试次数,默认是 int 最大值,2147483647
        properties.put(ProducerConfig.RETRIES_CONFIG,3);


        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 5; i++) {
            producer.send(new ProducerRecord<String, String>("firstTopic", "message" + i));
        }

        producer.close();
    }
}

3.7 如何解决消息的重复发送

3.7.1 消息重复前景

生产者发送过来数据,leader和follower都已经同步了,这时候将要进行ack应答的时候,突然leader挂掉了,就会重新选举一个leader,继续进行接收上一个数据,就会导致数据的重复发送。

image.png

3.7.2 数据的传递语义

  • 至少一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2。含义就是,生产者发送数据,集群收到至少一次。
  • 最多一次(At Most Once)= ACK级别设置为0。集群收到生产者发送的数据,最多收到一次。
  • 精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。

Kafka 0.11版本以后,引入了一项重大特性:幂等性和事务

总结:

At Least Once可以保证数据不丢失,但是不能保证数据不重复;

At Most Once可以保证数据不重复,但是不能保证数据不丢失。

3.7.3 幂等性

含义:多次请求和一次请求造成的影响相同。

kafka中的幂等性:在这里就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。

精确一次(Exactly Once) = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2) 。

重复数据的判断标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其

PID(Producer id)是Kafka每次重启都会分配一个新的;Partition 表示分区号;Sequence Number是单调自增的。

注意:幂等性只能保证的是在单分区单会话内不重复

如果要多分区,就必须将所有分区的数拉取过来,进行统一的排序。

image.png

3.7.4 开启幂等性

开启参数 enable.idempotence 默认为 true,false 关闭。

// 幂等性 enable.idempotence,默认为true
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);

3.7.5 事务

幂等性只能保证单分区、单会话的消息不重复。

事务:要想开启事务,必须先开启幂等性。

image.png

3.7.6 事务的API

事务一共有五个API。

// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
 String consumerGroupId) throws 
ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;
// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

3.7.7 测试事务

记住:必须要设置事务ID.

在第五次的时候,设置1/0,那么就会发生异常,消息也就不会发送到broker了。

但是如果不加事务的话,就会发送前4条到消息队列,第五条才会拒绝发送到消息队列。

package com.xqm.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;


/**
 * 事务
 */
public class CustomProductorTransaction {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centosA:9092.centosB:9092,centosC:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 必须要设置事务ID
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaction_2");


        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // 初始化事务
        producer.initTransactions();
        // // 启动事务
        producer.beginTransaction();

        try {
            // 业务流程
            for (int i = 0; i < 5; i++) {
                if (i == 4) {
                    int a = 1 / 0;
                }
                producer.send(new ProducerRecord<String, String>("firstTopic", "message" + i));
            }
            // 业务完成之后,就提交事务
            producer.commitTransaction();
        } catch (Exception e) {
            //  终止事务
            producer.abortTransaction();
        } finally {
            producer.close();
        }

    }
}

3.8 消息的有序问题

3.8.1 消息的有序性

在同一个分区内,消息是有序的,但是对于不同的分区来说,消息是无序的。

image.png

如果需要全部分区有序,那就得

3.8.2 如何保证顺序的有序性

1.kafka在1.x版本之前保证数据单分区有序,条件如下:

max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)。

2.kafka在1.x及以后版本保证数据单分区有序,条件如下:

(1)未开启幂等性

max.in.flight.requests.per.connection需要设置为1。

(2)开启幂等性

max.in.flight.requests.per.connection需要设置小于等于5。

原因说明:因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,

故无论如何,都可以保证最近5个request的数据都是有序的。

image.png

3.9 自动创建topic

生产者使用send方法发送数据到kafka,如果kafka中没有这个主题,那么会自动创建这个主题(默认可以,可以在配置文件中关闭),但是创建的主题,默认是1个分区一个副本。

所以想要改变创建的默认topic的分区数和副本数,需要修改配置文件。

num.partitions=3
auto.create.topics.enable=true
default.replication.factor=3

第四章 Kafka Broker

4.1 kafka Broker工作流程

4.1.1 zookeeper中存储的kafka信息

1.启动zookeeper客户端

通过zkCli.sh或者bin下面的./zkCli.sh来启动zookeeper客户端

[root@centosC bin]# pwd
/usr/local/software/zk-3.5.7/bin
[root@centosC bin]# zkCli.sh 

2.可以通过ls命令查看根节点下信息

[zk: localhost:2181(CONNECTED) 0] ls /
[cluster, controller_epoch, brokers, zookeeper, kafka, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]

3.通过ls命令,查看kafka下面信息

[zk: localhost:2181(CONNECTED) 1] ls /kafka
[cluster, controller_epoch, controller, brokers, feature, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]

4.查看/kafka/brokers/ids下面的主机信息

0,1,2代表三台brokers

[zk: localhost:2181(CONNECTED) 4] ls /kafka/brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 5] ls /kafka/brokers/ids
[0, 1, 2]

5.使用zookeeper可视化工具prettyZoo来查看信息

image.png

6.zookeeper中kafka下存储的数据

1./kafka/brokers/ids

[0,1,2] 记录有哪些服务器

2./kafka/brokers/topics/first/partitions/0/state

{“leader”:1 ,“isr”:[1,0,2] } 记录谁是Leader,有哪些服务器可用

3./kafka/controller

{“brokerid”:0}辅助选举Leader

示意图如下:

image.png

4.1.2 broker总体工作流程

1.broker启动的时候,会在zookeeper中注册,存储在/kafka/brokers/ids中。

2.每个broker中都有controller模块,哪个controller模块先抢占注册到zk中,谁就来进行辅助选举。

3.由选举出来的controller监听brokers节点变化。

4.controller决定leader选举。

AR概念:kafka分区中所有副本统称。

选举规则:在isr中存活为前提,按照在AR中排在前面的优先。比如在isr中是[0,1,2],三个副本都存在,在AR中是[1,0,2],那么优先的选举规则就是1号broker是leader,然后问询0号broker,最后2号broker。

5.选举完成后,controller会将节点信息(leader和isr)上传到zookeeper中,其他controller会拉取zookeeper中信息进行同步。

6.假设broker1中leader挂掉,controller会监听到节点变化。

7.controller会从zk中获取到isr信息,然后选举新的leader(还是按照之前的选举规则,isr中存活为前提,AR中按顺序)

总体流程为:

image.png

测试:

如果停掉其中一台机器的kafka,那么zookeeper中/kakka/brokers/ids就会删除这台机器的broker_id

4.1.3 broker中重要参数

参数名称 重要参数
replica.lag.time.max.ms ISR 中,如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值,默认 30s
auto.leader.rebalance.enable 默认是 true。 自动 Leader Partition 平衡。
leader.imbalance.per.broker.percentage 默认是 10%。每个 broker 允许的不平衡的 leader的比率。如果每个 broker 超过了这个值,控制器会触发 leader 的平衡。
leader.imbalance.check.interval.seconds 默认值 300 秒。检查 leader 负载是否平衡的间隔时间。
log.segment.bytes Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分 成块的大小,默认值 1G。
log.index.interval.bytes 默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。
log.retention.hours Kafka 中数据保存的时间,默认 7 天。
log.retention.minutes Kafka 中数据保存的时间,分钟级别,默认关闭。
log.retention.ms Kafka 中数据保存的时间,毫秒级别,默认关闭。
log.retention.check.interval.ms 检查数据是否保存超时的间隔,默认是 5 分钟。
log.retention.bytes 默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的 segment。
log.cleanup.policy 默认是 delete,表示所有数据启用删除策略;如果设置值为 compact,表示所有数据启用压缩策略。
num.io.threads 默认是 8。负责写磁盘的线程数。整个参数值要占总核数的 50%。
num.replica.fetchers 副本拉取线程数,这个参数占总核数的 50%的 1/3
num.network.threads 默认是 3。数据传输线程数,这个参数占总核数的50%的 2/3
log.flush.interval.messages 强制页缓存刷写到磁盘的条数,默认是 long 的最大值,9223372036854775807。一般不建议修改,交给系统自己管理。
log.flush.interval.ms 每隔多久,刷数据到磁盘,默认是 null。一般不建议修改,交给系统自己管理。

4.2 生产环境下新增节点和删除broker

4.2.1 服役新节点

4.2.1.1.节点准备

在这里是使用的虚拟机。

(1).关闭centosC机器,并进行全量复制为centosD。

(2).开启centosD,修改ip地址和hosts、hostname。

[root@hadoop104 ~]# vim /etc/sysconfig/network-scripts/ifcfgens33
DEVICE=ens33
TYPE=Ethernet
ONBOOT=yes
BOOTPROTO=static
NAME="ens33"
IPADDR=192.168.10.105
PREFIX=24
GATEWAY=192.168.10.2
DNS1=192.168.10.2
[root@centosD ~]# vim /etc/hostname
centosD

(3).修改centosD上的kafka的broker_id。并删除datas(自己创建的目录)下的log日志

(4).启动kafka集群,单独启动centosD的kafka

4.2.1.2 执行负载均衡操作

1.创建要均衡的主题

[atguigu@hadoop102 kafka]$ vim topics-to-move.json
{
 "topics": [
 {"topic": "first"}
 ],
 "version": 1
}

2.生成一个负载均衡的计划

[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate


Current partition replica assignment
{"version":1,"partitions":[{"topic":"first","partition":0,"replic
as":[0,2,1],"log_dirs":["any","any","any"]},{"topic":"first","par
tition":1,"replicas":[2,1,0],"log_dirs":["any","any","any"]},{"to
pic":"first","partition":2,"replicas":[1,0,2],"log_dirs":["any","
any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"first","partition":0,"replic
as":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"first","par
tition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},{"to
pic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","
any","any"]}]}

3.创建副本存储计划

[atguigu@hadoop102 kafka]$ vim increase-replication-factor.json
{"version":1,"partitions":[{"topic":"first","partition":0,"replic
as":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"first","par
tition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},{"to
pic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","
any","any"]}]}

4.执行副本计划

[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute

5.验证副本存储计划

[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify


Status of partition reassignment:
Reassignment of partition first-0 is complete.
Reassignment of partition first-1 is complete.
Reassignment of partition first-2 is complete.
Clearing broker-level throttles on brokers 0,1,2,3
Clearing topic-level throttles on topic first

4.2.2 退役旧节点

4.2.2.1 执行负载均衡操作

先按照退役一台节点,生成执行计划,然后按照服役时操作流程执行负载均衡。

(1)创建一个要均衡的主题。

[atguigu@hadoop102 kafka]$ vim topics-to-move.json
{
 "topics": [
 {"topic": "first"}
 ],
 "version": 1
}

(2) 创建执行计划

[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate



Current partition replica assignment
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[0,2,3],"log_dirs":["any","any","any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[1,2,0],"log_dirs":["any","any","any"]}]}

(3).创建副本存储计划(所有副本存储在 broker0、broker1、broker2 中)。

[atguigu@hadoop102 kafka]$ vim increase-replication-factor.json

{"version":1,"partitions":[{"topic":"first","partition":0,"replic
as":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"first","par
tition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"to
pic":"first","partition":2,"replicas":[1,2,0],"log_dirs":["any","
any","any"]}]}

(4).执行副本存储计划。

[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute

(5).验证副本存储计划。

[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify


Status of partition reassignment:
Reassignment of partition first-0 is complete.
Reassignment of partition first-1 is complete.
Reassignment of partition first-2 is complete.
Clearing broker-level throttles on brokers 0,1,2,3
Clearing topic-level throttles on topic first

4.2.2.2 执行停止命令

[atguigu@hadoop105 kafka]$ bin/kafka-server-stop.sh

4.3 副本

4.3.1 副本基本信息

(1)Kafka 副本作用:提高数据可靠性。

(2)Kafka 默认副本 1 个,生产环境一般配置为 2 个或以上,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。

(3)Kafka 中副本分为:Leader 和 Follower。Kafka 生产者只会把数据发往 Leader,然后 Follower 找 Leader 进行同步数据。

(4)Kafka 分区中的所有副本统称为 AR(Assigned Repllicas)。

AR = ISR + OSR

ISR,表示和 Leader 保持同步的 Follower 集合。如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms参数设定,默认 30s。Leader 发生故障之后,就会从 ISR 中选举新的 Leader。

OSR,表示 Follower 与 Leader 副本同步时,延迟过多的副本

4.3.2 leader选举流程

Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader,负责管理集群broker 的上下线,所有 topic 的分区副本分配和 Leader 选举等工作。

Controller 的信息同步工作是依赖于 Zookeeper 的。

image.png

验证:

1.创建first主题,三个分区,三个副本。

[root@centosA kafka]# ./bin/kafka-topics.sh --bootstrap-server centosA:9092 --create  --topic first --partitions 3 --replication-factor 3 
Created topic first.

2.查看创建的first主题的详细信息

[root@centosA kafka]# ./bin/kafka-topics.sh --bootstrap-server centosA:9092 --describe  --topic first 
Topic: first    TopicId: rDdXb2CHQVSbo2GtD7aKrQ PartitionCount: 3       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: first    Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
        Topic: first    Partition: 1    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
        Topic: first    Partition: 2    Leader: 2       Replicas: 2,1,0 Isr: 2,1,0

3.停掉broker_id为2的机器,然后查看first主题的详细信息

[root@centosA kafka]# ./bin/kafka-topics.sh --bootstrap-server centosA:9092 --describe  --topic first 
Topic: first    TopicId: rDdXb2CHQVSbo2GtD7aKrQ PartitionCount: 3       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: first    Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 1,0
        Topic: first    Partition: 1    Leader: 0       Replicas: 0,2,1 Isr: 0,1
        Topic: first    Partition: 2    Leader: 1       Replicas: 2,1,0 Isr: 1,0

发现Partition为2的leader由2变成了1.也就是通过controller选举之后,根据Replicas的排列,2 之后是1。

4.恢复broker_id为2的机器,然后查看first主题的详细信息

发现leader并没有改变,只是ISR中机器恢复了。

[root@centosA kafka]# ./bin/kafka-topics.sh --bootstrap-server centosA:9092 --describe  --topic first 
Topic: first    TopicId: rDdXb2CHQVSbo2GtD7aKrQ PartitionCount: 3       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: first    Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
        Topic: first    Partition: 1    Leader: 0       Replicas: 0,2,1 Isr: 0,1,2
        Topic: first    Partition: 2    Leader: 1       Replicas: 2,1,0 Isr: 1,0,2

4.3.3 leader和follower故障处理

4.3.3.1 follower故障

LEO和HW(高水位线)

image.png

image.png

消费者能够见到的最大的offset就是LEO-4,也就是4。

处理细节:主要通过高水位线和LEO来进行同步

image.png

4.3.3.2 leader故障

注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

image.png

4.3.4 分区副本分配

如果kafka服务器只有4个broker,但是创建的分区超过这个数量。那么在底层如何分配存储副本。

分配的规则就是尽量按照错开均匀分配。

1.创建16个分区,4个副本。创建一个新的topic,名称为second。

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server 
hadoop102:9092 --create --partitions 16 --replication-factor 3 --
topic second

2.查看分区和副本情况

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server 
hadoop102:9092 --describe --topic second
Topic: second4 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: second4 Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: second4 Partition: 2 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
Topic: second4 Partition: 3 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1
Topic: second4 Partition: 4 Leader: 0 Replicas: 0,2,3 Isr: 0,2,3
Topic: second4 Partition: 5 Leader: 1 Replicas: 1,3,0 Isr: 1,3,0
Topic: second4 Partition: 6 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: second4 Partition: 7 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: second4 Partition: 8 Leader: 0 Replicas: 0,3,1 Isr: 0,3,1
Topic: second4 Partition: 9 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: second4 Partition: 10 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: second4 Partition: 11 Leader: 3 Replicas: 3,2,0 Isr: 3,2,0
Topic: second4 Partition: 12 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: second4 Partition: 13 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: second4 Partition: 14 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
Topic: second4 Partition: 15 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1

每一轮都会错开偏移。

比如第一轮错开一位:

1 2 3 4
L F F
  L F F
F   L F
F F   L

第二轮错开两位,第三轮错开三位。

image.png

4.3.5 手动进行分区分配

如果服务器的配置不一样,那么就得让配置好的服务器,分配的分区更多。这就需要自己手动来分配分区了。

image.png

1.创建一个新的topic,叫three

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 4 --replication-factor 2 --
topic three

2.查看分区副本存储情况

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic three

3.创建副本存储计划(所有副本都指定存储在 broker0、broker1 中)。

[atguigu@hadoop102 kafka]$ vim increase-replication-factor.json

输入如下内容:

{
"version":1,
"partitions":[{"topic":"three","partition":0,"replicas":[0,1]},
{"topic":"three","partition":1,"replicas":[0,1]},
{"topic":"three","partition":2,"replicas":[1,0]},
{"topic":"three","partition":3,"replicas":[1,0]}]
}

4.执行副本存储计划

[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute

5.验证副本存储计划

[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify

6.再次查看分区副本存储情况

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic three

4.3.6 Leader Partition 负载平衡

正常情况下,Kafka本身会自动把Leader Partition均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。但是如果某些broker宕机会导致Leader Partition过于集中在其他少部分几台broker上,这会导致少数几台broker的读写请求压力过高,其他宕机的broker重启之后都是follower partition,读写请求很低,造成集群负载不均衡

image.png

有三个参数能够启影响负载均衡:

  • auto.leader.rebalance.enable,默认是true。自动Leader Partition 平衡
  • leader.imbalance.per.broker.percentage,默认是10%。每个broker允许的不平衡的leader的比率。如果每个broker超过了这个值,控制器会触发leader的平衡。
  • leader.imbalance.check.interval.seconds,默认值300秒。检查leader负载是否平衡的间隔时间。

举例:

image.png

参数名称 描述
auto.leader.rebalance.enable 默认是 true。 自动 Leader Partition 平衡。生产环境中,leader 重选举的代价比较大,可能会带来性能影响,建议设置为 false 关闭。
leader.imbalance.per.broker.percentage 默认是 10%。每个 broker 允许的不平衡的 leader的比率。如果每个 broker 超过了这个值,控制器会触发 leader 的平衡。
leader.imbalance.check.interval.seconds 默认值 300 秒。检查 leader 负载是否平衡的间隔时间。

4.3.7 增加副本因子

在生产环境当中,由于某个主题的重要等级需要提升,我们考虑增加副本。副本数的增加需要先制定计划,然后根据计划执行。

1.创建topic

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 3 --replication-factor 1 --topic four

2.手动增加副本存储。创建副本存储计划(所有副本都指定存储在 broker0、broker1、broker2 中)。

[atguigu@hadoop102 kafka]$ vim increase-replication-factor.json

输入如下内容:

{"version":1,"partitions":[{"topic":"four","partition":0,"replicas":[0,1,2]},{"topic":"four","partition":1,"replicas":[0,1,2]},{"topic":"four","partition":2,"replicas":[0,1,2]}]}

3.执行副本存储计划

[atguigu@hadoop102 kafka]$ bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute

4.4 文件存储

4.4.1 Topic数据的存储机制

Topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是Producer生产的数据。Producer生产的数据会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低下,Kafka采取了分片索引机制,将每个partition分为多个segment。每个segment包括:“.index”文件、“.log”文件和.timeindex等文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号,例如:first-0。segment默认大小为1G

image.png

因为kafka存储的信息不是立刻就删除的,比如7天之后删除。.timeindex时间戳索引文件就是可以用来进行删除过期文件。

可以通过命令来查看index文件和log文件:

 kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.index

index文件内容为:

Dumping ./00000000000000000000.index
offset: 3 position: 152

查看log文件:

 kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log

log内容为:

Dumping datas/first-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 1 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 
producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 
0 CreateTime: 1636338440962 size: 75 magic: 2 compresscodec: none crc: 2745337109 isvalid: 
true
baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 
producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 
75 CreateTime: 1636351749089 size: 77 magic: 2 compresscodec: none crc: 273943004 isvalid: 
true
baseOffset: 3 lastOffset: 3 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 
producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 
152 CreateTime: 1636351749119 size: 77 magic: 2 compresscodec: none crc: 106207379 isvalid: 
true
baseOffset: 4 lastOffset: 8 count: 5 baseSequence: -1 lastSequence: -1 producerId: -1 
producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 
229 CreateTime: 1636353061435 size: 141 magic: 2 compresscodec: none crc: 157376877 isvalid: 
true
baseOffset: 9 lastOffset: 13 count: 5 baseSequence: -1 lastSequence: -1 producerId: -1 
producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 
370 CreateTime: 1636353204051 size: 146 magic: 2 compresscodec: none crc: 4058582827 isvalid: 
true

4.4.2 index文件和log文件详解

image.png

如何找到offset=600的Record?

先根据相对offset+522,比如上面的65+522=587,小于600,117+522=639大于600,那么就是查询65到117之间的数据,映射到log中就是从563那一行开始往下找,不含那一行。

注意:

1.index为稀疏索引,大约每往log文件写入4kb数据,会往index文件写入一条索引

参数log.index.interval.bytes默认4kb。

2.Index文件中保存的offset为相对offset,这样能确保offset的值所占空间不会过大,因此能将offset的值控制在固定大小。

日志存储参数配置:

参数 描述
log.segment.bytes Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分成块的大小,默认值 1G。
log.index.interval.bytes 默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。 稀疏索引。

4.4.3 文件清理策略

Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。

那么日志一旦超过了设置的时间,怎么处理呢?

Kafka 中提供的日志清理策略有 delete 和 compact 两种。

1.delete:将过期数据删除

  • log.cleanup.policy = delete 所有数据启用删除策略

(1)基于时间:默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳。

(2)基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment。

log.retention.bytes,默认等于-1,表示无穷大。

思考:如果一个 segment 中有一部分数据过期,一部分没有过期,怎么处理?

答:必须等所有都过期才能删除。

2.compact 日志压缩

compact日志压缩:对于相同key的不同value值,只保留最后一个版本。

  • log.cleanup.policy = compact 所有数据启用压缩策略

image.png

压缩后的offset可能是不连续的,比如上图中没有6,当从这些offset消费消息时,将会拿到比这个offset大

的offset对应的消息,实际上会拿到offset为7的消息,并从这个位置开始消费。

这种策略只适合特殊场景,比如消息的key是用户ID,value是用户的资料,通过这种压缩策略,整个消息

集里就保存了所有用户最新的资料。

4.5 高效读写数据

1.Kafka 本身是分布式集群,可以采用分区技术,并行度高

2.读数据采用稀疏索引,可以快速定位要消费的数据

3.顺序写磁盘

Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。

4.页缓存 + 零拷贝技术

零拷贝:Kafka的数据加工处理操作交由Kafka生产者和Kafka消费者处理。Kafka Broker应用层不关心存储的数据,所以就不用走应用层,传输效率高。

PageCache页缓存linux内核的缓存。Kafka重度依赖底层操作系统提供的PageCache功 能。当上层有写操作时,操作系统只是将数据写入PageCache。当读操作发生时,先从PageCache中查找,如果找不到,再去磁盘中读取。实际上PageCache是把尽可能多的空闲内存都当做了磁盘缓存来使用。

image.png

参数 描述
log.flush.interval.messages 强制页缓存刷写到磁盘的条数,默认是 long 的最大值,9223372036854775807。一般不建议修改,交给系统自己管理。
log.flush.interval.ms 每隔多久,刷数据到磁盘,默认是 null。一般不建议修改,交给系统自己管理。