第五章 kafka消费者

5.1 kafka消费方式

消费方式一共有两种:pull(拉取)模式和push(推送)模式

  • pull模式

consumer采用从broker中主动拉取数据。

Kafka采用这种方式

  • push模式

Kafka没有采用这种方式,因为由broker决定消息发送速率,很难适应所有消费者的

消费速率。例如推送的速度是50m/s,Consumer1、Consumer2就来不及处理消息。

pull模式不足之处是,如 果Kafka没有数据,消费者可能会陷入循环中,一直返回空数据。

注意:kafka只有pull模式,没有push模式。

5.2 消费者的工作流程

注意:同一个消费者组中不能有两个消费者消费同一个分区数据。

5.2.1 消费者总体工作流程

在0.9版本之前,offset保存在zookeeper中。后面迁移到系统主题中保存,为了消费者能不与zk进行多次通信。否则会消耗系统资源。

image.png

5.2.2 消费者组原理

Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。

  • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。
  • 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
  • 如果向消费组中添加更多的消费者,超过主题分区数量,则有一部分消费者就会闲置,不会接收任何消息。
  • 消费者组之间互不影响。所有的消费者都属于某个消费者 组,即消费者组是逻辑上的一个订阅者。

image.png

5.2.3 消费者组初始化流程

1.coordinator:辅助实现消费者组的初始化和分区的分配。

例如: groupid的hashcode值 = 1,1% 50 = 1,那么__consumer_offsets 主题的1号分区,在哪个broker上,就选择这个节点的coordinator作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset。

总结:就是groupid的hashcode%分区数(默认50),得到的值就是分区号,比如得到的是0,那就找0号分区的leader在哪个broker上,这个消费者就去哪个broker消费。

重点:超时时间45s,处理时间:5min,会触发再平衡。

image.png

5.2.4 消费者消费流程

当每批次到达1字节或者到达超时时间,会自动抓取数据。

image.png

5.2.5 消费者重要参数

参数名称 描述
bootstrap.servers 向 Kafka 集群建立初始连接用到的 host/port 列表。
key.deserializer<br />value.deserializer 指定接收消息的 key 和 value 的反序列化类型。一定要写全类名。
group.id 标记消费者所属的消费者组。
enable.auto.commit 默认值为 true,消费者会自动周期性地向服务器提交偏移量。
auto.commit.interval.ms 如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。
auto.offset.reset 当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。
offsets.topic.num.partitions __consumer_offsets 的分区数,默认是 50 个分区。
heartbeat.interval.ms Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。该条目的值必须小于 session.timeout.ms也不应该高于session.timeout.ms 的 1/3。
session.timeout.ms Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超过该值,该消费者被移除,消费者组执行再平衡。
max.poll.interval.ms 消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。
fetch.min.bytes 默认 1 个字节。消费者获取服务器端一批消息最小的字节数。
fetch.max.wait.ms 默认 500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。
fetch.max.bytes 默认 Default: 52428800(50M).消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50M)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes (brokerconfig)or max.message.bytes (topic config)影响。
max.poll.records 一次 poll 拉取数据返回消息的最大条数,默认是 500 条。

5.3 消费者API

5.3.1 独立消费者(订阅主题)

需求:创建一个独立消费者,消费 first 主题中数据。first主题有三个分区。

image.png

注意:在消费者 API 代码中必须配置消费者组 id。命令行启动消费者不填写消费者组id 会被自动填写随机的消费者组 id。

package com.xqm.kafka.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class CustomConsumer {
    public static void main(String[] args) {

        // 1.配置
        Properties properties=new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"centosA:9092,centosB:9092");
        // 反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        // 必须配置消费者组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");

        // 2.创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // 3.订阅主题,可以同时消费多个主题
        List<String> topicList=new ArrayList<>();
        topicList.add("firstTopic");
        consumer.subscribe(topicList);

        // 4.消费数据
        while (true){
            // 每秒拉取一次数据
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            // 遍历
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
            }
        }
    }
}

5.3.2 独立消费者(订阅分区)

需求:创建一个消费者,消费firstTopic主题下的0号分区的内容。

image.png

package com.xqm.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class CustomConsumerPartiton {
    public static void main(String[] args) {

        // 1.配置
        Properties properties=new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"centosA:9092,centosB:9092");
        // 反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        // 必须配置消费者组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");

        // 2.创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // 3.订阅主题对应的分区
        List<TopicPartition> topicPartitionList=new ArrayList<>();
        // 订阅firstTopic的0号分区
        topicPartitionList.add(new TopicPartition("firstTopic",0));
        consumer.assign(topicPartitionList);


        // 4.消费数据
        while (true){
            // 每秒拉取一次数据
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            // 遍历
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
            }
        }
    }
}

5.3.3 消费者组

需求:测试同一个主题的分区数据,只能由一个消费者组中的一个消费。

image.png

只要设置消费者的消费者组ID相同,那么消费者就会在同一个组里。测试的时候发现同一个消费者组中不同消费者会消费不同分区。

下面消费者代码复制三份同时启动即可。

package com.xqm.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class CustomConsumer1 {
    public static void main(String[] args) {

        // 1.配置
        Properties properties=new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"centosA:9092,centosB:9092");
        // 反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        // 必须配置消费者组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");

        // 2.创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // 3.订阅主题,可以同时消费多个主题
        List<String> topicList=new ArrayList<>();
        topicList.add("firstTopic");
        consumer.subscribe(topicList);

        // 4.消费数据
        while (true){
            // 每秒拉取一次数据
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            // 遍历
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
            }
        }
    }
}

5.4 生产经验-分区的分配以及再平衡

1、一个consumer group中有多个consumer组成,一个 topic有多个partition组成,现在的问题是,到底由哪个consumer来消费哪个partition的数据。

2、Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky

可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range + CooperativeSticky。Kafka可以同时使用多个分区分配策略

image.png

参数名称 描述
heartbeat.interval.ms Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。<br />该条目的值必须小于 session.timeout.ms也不应该高于session.timeout.ms 的 1/3。
session.timeout.ms Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超过该值,该消费者被移除,消费者组执行再平衡。
max.poll.interval.ms 消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。
partition.assignment.strategy y 消 费 者 分 区 分 配 策 略 , 默 认 策 略 是 Range +CooperativeSticky。Kafka 可以同时使用多个分区分配策略。可以选 择的策略包括 : Range 、 RoundRobin 、 Sticky CooperativeSticky

5.4.1Range以及再平衡

理论

Range 是对每个 topic 而言的。

  • 首先对同一个 topic 里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序
  • 假如现在有 7 个分区,3 个消费者,排序后的分区将会是0,1,2,3,4,5,6;消费者排序完之后将会是C0,C1,C2。
  • 例如,7/3 = 2 余 1 ,除不尽,那么 消费者 C0 便会多消费 1 个分区。 8/3=2余2,除不尽,那么C0和C1分别多消费一个。
  • 通过 partitions数/consumer数 来决定每个消费者应该消费几个分区。如果除不尽,那么前面几个消费者将会多消费 1 个分区。
  • 注意:如果只是针对 1 个 topic 而言,C0消费者多消费1个分区影响不是很大。但是如果有 N 多个 topic,那么针对每个 topic,消费者 C0都将多消费 1 个分区,topic越多,C0消费的分区会比其他消费者明显多消费 N 个分区。

容易产生数据倾斜!

image.png

案例

1.修改主题 first 为 7 个分区。

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 7

注意:分区数可以增加,但是不能减少。

2.复制 CustomConsumer 类,创建 CustomConsumer2。这样可以由三个消费者CustomConsumer、CustomConsumer1、CustomConsumer2 组成消费者组,组名都为“test”,同时启动 3 个消费者。

3.启动 CustomProducer 生产者,发送 500 条消息,随机发送到不同的分区。

说明:Kafka 默认的分区分配策略就是 Range + CooperativeSticky,所以不需要修改策略。

package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducer {
 public static void main(String[] args) throws InterruptedException {
 	Properties properties = new Properties();
 	properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
 	properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 	properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 	KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
 	for (int i = 0; i < 7; i++) {
		kafkaProducer.send(new ProducerRecord<>("first", i, "test", "atguigu"));
 	}
 	kafkaProducer.close();
 	}
}

4.观看 3 个消费者分别消费哪些分区的数据。

发现CustomConsumer消费的是0,1,2三个分区

CustomConsumer1消费的是3,4两个分区

CustomConsumer2消费的是5,6两个分区

再平衡案例

1.停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。

1 号消费者:消费到 3、4 号分区数据。

2 号消费者:消费到 5、6 号分区数据。

0 号消费者的任务会整体被分配到 1 号消费者或者 2 号消费者。(也就是由1号消费者或2号消费者接过了0号消费者的任务)

说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

2.再次重新发送消息观看结果(45s 以后)。

1 号消费者:消费到 0、1、2、3 号分区数据。

2 号消费者:消费到 4、5、6 号分区数据。

说明:消费者 0 已经被踢出消费者组,所以重新按照 range 方式分配。

5.4.2 RoundRobin 以及再平衡

理论

公司中使用比较多。

RoundRobin 针对集群中所有Topic而言。

RoundRobin 轮询分区策略,是把所有的 partition 和所有的consumer 都列出来,然后按照 hashcode 进行排序,最后通过轮询算法来分配 partition 给到各个消费者。

image.png

案例

1.依次在 CustomConsumer、CustomConsumer1、CustomConsumer2 三个消费者代码中修改分区分配策略为 RoundRobin。

// 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");

2.重启 3 个消费者,重复发送消息的步骤,观看分区结果。

发现CustomConsumer消费了0,3,6三个分区数据。

CustomConsumer1消费了1,4两个分区数据。

CustomConsumer2消费了2,5两个分区数据。

再平衡案例

1.停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。

1 号消费者:消费到 2、5 号分区数据

2 号消费者:消费到 4、1 号分区数据

0 号消费者的任务会按照 RoundRobin 的方式把数据轮询分成 0 、6 和 3 号分区数据,分别由 1 号消费者或者 2 号消费者消费。(比如0和6分给1号消费者,3分给2号消费者)

说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

2.再次重新发送消息观看结果(45s 以后)。

1 号消费者:消费到 0、2、4、6 号分区数据。

2 号消费者:消费到 1、3、5 号分区数据。

说明:消费者 0 已经被踢出消费者组,所以重新按照 RoundRobin 方式分配。

5.4.3 Sticky 以及再平衡

理论

粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。

粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面(和range不同在于分区分配是无序的),在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化

案例

需求:设置主题为 first,7 个分区;准备 3 个消费者,采用粘性分区策略,并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。

1.修改分区分配策略为粘性。

注意:3 个消费者都应该注释掉,之后重启 3 个消费者,如果出现报错,全部停止等会再重启,或者修改为全新的消费者组。

// 修改分区分配策略
ArrayList<String> startegys = new ArrayList<>();
startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, startegys);

2.使用同样的生产者发送 500 条消息。

可以看到会尽量保持分区的个数近似划分分区。

consumer会消费0,1

consumer1会消费2,3,5

consumer会消费4,6

再平衡案例

1.停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。

1 号消费者:消费到 2、5、3 号分区数据。

2 号消费者:消费到 4、6 号分区数据。

0 号消费者的任务会按照粘性规则,尽可能均衡的随机分成 0 和 1 号分区数据,分别由 1 号消费者或者 2 号消费者消费。

说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

2.再次重新发送消息观看结果(45s 以后)。

1 号消费者:消费到 2、3、5 号分区数据。

2 号消费者:消费到 0、1、4、6 号分区数据。

说明:消费者 0 已经被踢出消费者组,所以重新按照粘性方式分配。

5.5 offset偏移量

5.5.1 offset默认维护位置

从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为__consumer_offsets。

Kafka0.9版本之前,consumer默认将offset保存在Zookeeper中。

改为内置的原因在于,如果放在zk中,会进行大量的网络通信从而造成效率降低。

image.png

__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行compact,也就是每个group.id+topic+分区号就保留最新数据。

5.5.2 消费offset案例

1.思想:__consumer_offsets 为 Kafka 中的 topic,那就可以通过消费者进行消费。

2.在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false,默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。

3.采用命令行方式,创建一个新的 topic。

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic atguigu --partitions 2 --replication-factor 2

4.启动生产者往 atguigu 生产数据。

[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh --topic atguigu --bootstrap-server hadoop102:9092

5.启动消费者消费 atguigu 数据。

[atguigu@hadoop104 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic atguigu --group test

注意:指定消费者组名称,更好观察数据存储位置(key 是 group.id+topic+分区号)。

6.查看消费者消费主题__consumer_offsets。

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server hadoop102:9092 --consumer.config config/consumer.properties --formatter 
"kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning


[offset,atguigu,1]::OffsetAndMetadata(offset=7, leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203, expireTimestamp=None)

[offset,atguigu,0]::OffsetAndMetadata(offset=8, 
leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203, 
expireTimestamp=None)

5.5.3 自动提交offset

为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。

自动提交offset的相关参数:

  • enable.auto.commit:是否开启自动提交offset功能,默认是true
  • auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s

image.png

参数名称 描述
enable.auto.commit 默认值为 true,消费者会自动周期性地向服务器提交偏移量。
auto.commit.interval.ms 如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。

消费者修改自动offset参数:

// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 提交 offset 的时间周期 1000ms,默认 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

5.5.4 手动提交offset

5.5.4.1 手动提交

虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因

此Kafka还提供了手动提交offset的API。

手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相

同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,同步提交阻塞当前线程,一直到提交成

功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故

有可能提交失败。

  • commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
  • commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。

image.png

5.5.4.2 同步提交offset

主要两步:

  • 第一步:先在连接配置中关闭自动提交
  • 第二步:在消费完数据后,进行commitSync()
package com.xqm.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class CustomConsumerSync {
    public static void main(String[] args) {

        // 1.配置
        Properties properties=new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"centosA:9092,centosB:9092");
        // 反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        // 必须配置消费者组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");

        // 将自动提交修改为false
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

        // 2.创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // 3.订阅主题,可以同时消费多个主题
        List<String> topicList=new ArrayList<>();
        topicList.add("firstTopic");
        consumer.subscribe(topicList);

        // 4.消费数据
        while (true){
            // 每秒拉取一次数据
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            // 遍历
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
            }

            // 手动提交offset,已同步的方式提交
            consumer.commitSync();
        }
    }
}

5.5.4.3 异步提交offset

虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。

主要两步:

  • 第一步:先在连接配置中关闭自动提交
  • 第二步:在消费完数据后,进行commitASync()
package com.xqm.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class CustomConsumerSync {
    public static void main(String[] args) {

        // 1.配置
        Properties properties=new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"centosA:9092,centosB:9092");
        // 反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        // 必须配置消费者组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");

        // 将自动提交修改为false
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

        // 2.创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // 3.订阅主题,可以同时消费多个主题
        List<String> topicList=new ArrayList<>();
        topicList.add("firstTopic");
        consumer.subscribe(topicList);

        // 4.消费数据
        while (true){
            // 每秒拉取一次数据
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            // 遍历
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
            }

            // 手动提交offset,已同步的方式提交
            consumer.commitSync();
        }
    }
}

5.5.4.4 指定offset消费

auto.offset.reset = earliest | latest | none 默认是 latest。

当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?

(1)earliest:自动将偏移量重置为最早的偏移量,–from-beginning。

(2)latest(默认值):自动将偏移量重置为最新偏移量

(3)none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。

image.png

注意:每次执行完,需要修改消费者组名;

package com.xqm.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

/**
 * 从指定的偏移量进行消费
 */
public class CustomConsumerOffset {
    public static void main(String[] args) {

        // 1.配置
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centosA:9092,centosB:9092");
        // 反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 必须配置消费者组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");

        // 2.创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // 3.订阅主题,可以同时消费多个主题
        List<String> topicList = new ArrayList<>();
        topicList.add("firstTopic");
        consumer.subscribe(topicList);

        // 指定位置进行消费
        Set<TopicPartition> assignment = new HashSet<>();
        // 保证分区的分配方案已经执行完毕
        while (assignment.size() == 0) {
            ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(1));
            //  获取消费者分区分配信息(有了分区分配信息才能开始消费)
            assignment = consumer.assignment();
        }
        // 遍历所有分区,并指定偏移量从1700开始消费
        for (TopicPartition topicPartition : assignment) {
            consumer.seek(topicPartition,1700);
        }

        // 4.消费数据
        while (true) {
            // 每秒拉取一次数据
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            // 遍历
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
            }
        }
    }
}

5.5.4.5 指定时间消费

需求:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。

例如要求按照时间消费前一天的数据,怎么处理?

获取一天前的时间戳,然后转换为offset。

package com.xqm.kafka.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

/**
 * 从指定的偏移量进行消费
 */
public class CustomConsumerOffsetTime {
    public static void main(String[] args) {

        // 1.配置
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centosA:9092,centosB:9092");
        // 反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 必须配置消费者组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");

        // 2.创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // 3.订阅主题,可以同时消费多个主题
        List<String> topicList = new ArrayList<>();
        topicList.add("firstTopic");
        consumer.subscribe(topicList);

        // 指定位置进行消费
        Set<TopicPartition> assignment = new HashSet<>();
        // 保证分区的分配方案已经执行完毕
        while (assignment.size() == 0) {
            ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(1));
            //  获取消费者分区分配信息(有了分区分配信息才能开始消费)
            assignment = consumer.assignment();
        }
        HashMap<TopicPartition, Long> timestampToSearch = new
                HashMap<>();
        // 封装集合存储,每个分区对应一天前的数据
        for (TopicPartition topicPartition : assignment) {
            timestampToSearch.put(topicPartition,
                    System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
        }
        // 获取从 1 天前开始消费的每个分区的 offset
        Map<TopicPartition, OffsetAndTimestamp> offsets =
                consumer.offsetsForTimes(timestampToSearch);
        // 遍历每个分区,对每个分区设置消费时间。
        for (TopicPartition topicPartition : assignment) {
            OffsetAndTimestamp offsetAndTimestamp =
                    offsets.get(topicPartition);
            // 根据时间指定开始消费的位置
            if (offsetAndTimestamp != null){
                consumer.seek(topicPartition,
                        offsetAndTimestamp.offset());
            }
        }

        // 4.消费数据
        while (true) {
            // 每秒拉取一次数据
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            // 遍历
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
            }
        }
    }
}

5.6 漏消费和重复消费

重复消费:已经消费了数据,但是 offset 没提交。

漏消费:先提交 offset 后消费,有可能会造成数据的漏消费。

image.png

思考:怎么能做到既不漏消费也不重复消费呢?详看消费者事务。

5.7 消费者事务

如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交offset过程做原子绑定。此时我们需要将Kafka的offset保存到支持事务的自定义介质(比如MySQL)。

image.png

5.8 生产经验——数据积压(消费者如何提高吞吐量)

因为kafka默认7天才删除消费过的数据,所以可能会产生数据积压的问题。

image.png

参数名称 描述
fetch.max.bytes 默认 Default: 52428800(50M)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50M)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes (brokerconfig)or max.message.bytes (topic config)影响。
max.poll.records 一次 poll 拉取数据返回消息的最大条数,默认是 500 条

第六章 Kafka-Eagle监控

6.1 Mysql环境准备

自行安装mysql。

6.2 Kafka环境准备

6.2.1 关闭集群

要先关闭kafka集群和zookeeper集群。

6.2.2 修改内存大小

修改/opt/module/kafka/bin/kafka-server-start.sh 命令中

修改下面参数值

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

修改为:

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
        export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
        export JMX_PORT="9999"
#    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

注意:每个kafka节点都要修改。

6.3 Kafka-Eagle 安装

6.3.1 官网

官网:https://www.kafka-eagle.org/

下载的版本为: kafka-eagle-bin-2.0.8.tar.gz

6.3.2 上传 & 解压

上传到/usr/local/software目录下,并解压。

[root@centosA software]# ll
总用量 251204
-rw-r--r--. 1 root root 176154027 10月 28 15:19 jdk-8u191-linux-x64.rpm
drwxr-xr-x. 3 root root        19 10月 28 14:09 kafka
-rw-rw-rw-. 1 root root  81074069 2月   9 2022 kafka-eagle-bin-2.0.8.tar.gz
drwxr-xr-x. 8 root root       160 10月 28 16:27 zk-3.5.7
[root@centosA software]# pwd
/usr/local/software

解压

[root@centosA software]# tar -zxvf kafka-eagle-bin-2.0.8.tar.gz 
kafka-eagle-bin-2.0.8/
kafka-eagle-bin-2.0.8/efak-web-2.0.8-bin.tar.gz
[root@centosA software]# ll
总用量 251204
-rw-r--r--. 1 root root 176154027 10月 28 15:19 jdk-8u191-linux-x64.rpm
drwxr-xr-x. 3 root root        19 10月 28 14:09 kafka
drwxrwxr-x. 2 root root        39 10月 13 2021 kafka-eagle-bin-2.0.8
-rw-rw-rw-. 1 root root  81074069 2月   9 2022 kafka-eagle-bin-2.0.8.tar.gz
drwxr-xr-x. 8 root root       160 10月 28 16:27 zk-3.5.7

6.3.3 继续解压

解压完上面之后,进入目录,发现里面还有一个压缩包。

[root@centosA software]# cd kafka-eagle-bin-2.0.8/
[root@centosA kafka-eagle-bin-2.0.8]# ll
总用量 79164
-rw-rw-r--. 1 root root 81062577 10月 13 2021 efak-web-2.0.8-bin.tar.gz

继续解压到指定目录

[root@centosA kafka-eagle-bin-2.0.8]# tar -zxvf efak-web-2.0.8-bin.tar.gz -C /usr/local/software/

查看解压情况:

[root@centosA software]# ll
总用量 251204
drwxr-xr-x. 8 root root        74 11月  3 11:27 efak-web-2.0.8
-rw-r--r--. 1 root root 176154027 10月 28 15:19 jdk-8u191-linux-x64.rpm
drwxr-xr-x. 3 root root        19 10月 28 14:09 kafka
drwxrwxr-x. 2 root root        39 10月 13 2021 kafka-eagle-bin-2.0.8
-rw-rw-rw-. 1 root root  81074069 2月   9 2022 kafka-eagle-bin-2.0.8.tar.gz
drwxr-xr-x. 8 root root       160 10月 28 16:27 zk-3.5.7

6.3.4 修改名称

[root@centosA software]# mv efak-web-2.0.8/ efak
[root@centosA software]# ll
总用量 251204
drwxr-xr-x. 8 root root        74 11月  3 11:27 efak
-rw-r--r--. 1 root root 176154027 10月 28 15:19 jdk-8u191-linux-x64.rpm
drwxr-xr-x. 3 root root        19 10月 28 14:09 kafka
-rw-rw-rw-. 1 root root  81074069 2月   9 2022 kafka-eagle-bin-2.0.8.tar.gz
drwxr-xr-x. 8 root root       160 10月 28 16:27 zk-3.5.7

6.3.5 查看目录结构

bin:启动脚本

conf:配置文件

db:存储数据库

[root@centosA software]# cd ./efak/
[root@centosA efak]# ll
总用量 0
drwxr-xr-x. 2 root root 33 11月  3 11:27 bin
drwxr-xr-x. 2 root root 62 11月  3 11:27 conf
drwxr-xr-x. 2 root root  6 9月  13 2021 db
drwxr-xr-x. 2 root root 23 11月  3 11:27 font
drwxr-xr-x. 9 root root 91 9月  13 2021 kms
drwxr-xr-x. 2 root root  6 9月  13 2021 logs

6.3.6 修改配置文件

[root@centosA conf]# vim system-config.properties 

1.修改zk集群

######################################
# multi zookeeper & kafka cluster list
# Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead
######################################
efak.zk.cluster.alias=cluster1
cluster1.zk.list=centosA:2181,centosB:2181,centosC:2181/kafka

2.注销掉zk存储,偏移量保存在kafka中

######################################
# kafka offset storage
######################################
cluster1.efak.offset.storage=kafka
# cluster2.efak.offset.storage=zk

3.配置mysql连接

######################################
# kafka sqlite jdbc driver address
######################################
efak.driver=com.mysql.jdbc.Driver
efak.url=jdbc:mysql://ip:port/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=自己的密码

6.3.7 配置环境变量

进入并配置环境变量

[root@centosA conf]# vim ~/.bashrc

加入:

# kafkaEFAK
export KE_HOME=/usr/local/software/efak
export PATH=$PATH:$KE_HOME/bin

重加载环境变量

[root@centosA conf]# source ~/.bashrc

测试:

[root@centosA conf]# echo $KE_HOME
/usr/local/software/efak

6.3.8 开启监控

1.注意:启动之前需要先启动 ZK 以及 KAFKA

2.启动

[root@centosA efak]# ./bin/ke.sh start
Welcome to
    ______    ______    ___     __ __
   / ____/   / ____/   /   |   / //_/
  / __/     / /_      / /| |  / ,<   
 / /___    / __/     / ___ | / /| |  
/_____/   /_/       /_/  |_|/_/ |_|  
( Eagle For Apache Kafka® )

Version 2.0.8 -- Copyright 2016-2021
*******************************************************************
* EFAK Service has started success.
* Welcome, Now you can visit 'http://192.168.49.135:8048'
* Account:admin ,Password:123456
*******************************************************************
* <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
* <Usage> https://www.kafka-eagle.org/ </Usage>
*******************************************************************

最下面会显示url以及登录的用户名和密码。

登录进来的页面为:

image.png

6.3.9 停止监控

[root@centosA efak]# ./bin/ke.sh stop

6.4 Kafka-Eagle 页面操作

6.4.1 进入的主页面

可以看到kafka集群信息、主题信息、zookeeper集群、消费者信息等。

image.png

6.4.2 大屏展示

image.png

第七章 Kafka-Kraft 模式

7.1 Kafka-Kraft 架构

image.png

左图为 Kafka 现有架构,元数据在 zookeeper 中,运行时动态选举 controller,由controller 进行 Kafka 集群管理。右图为 kraft 模式架构(实验性),不再依赖 zookeeper 集群,而是用三台 controller 节点代替zookeeper,元数据保存在 controller 中,由 controller 直接进行 Kafka 集群管理。

这样做的好处有以下几个:

  • Kafka 不再依赖外部框架,而是能够独立运行;
  • controller 管理集群时,不再需要从 zookeeper 中先读取数据,集群性能上升;
  • 由于不依赖 zookeeper,集群扩展时不再受到 zookeeper 读写能力限制;
  • controller 不再动态选举,而是由配置文件规定。这样我们可以有针对性的加强

controller 节点的配置,而不是像以前一样对随机 controller 节点的高负载束手无策。

7.2 Kafka-Kraft 集群部署

7.2.1 重新解压一份kafka安装包

解压到/usr/local/software目录下

[root@centosA software]# tar -zxvf kafka_2.12-3.0.0.tgz 
[root@centosA software]# ll
总用量 335664
drwxr-xr-x. 8 root root       101 11月  3 12:26 efak
-rw-r--r--. 1 root root 176154027 10月 28 15:19 jdk-8u191-linux-x64.rpm
drwxr-xr-x. 3 root root        19 10月 28 14:09 kafka
drwxr-xr-x. 7 root root       105 9月   9 2021 kafka_2.12-3.0.0
-rw-rw-rw-. 1 root root  86486610 2月   9 2022 kafka_2.12-3.0.0.tgz
-rw-rw-rw-. 1 root root  81074069 2月   9 2022 kafka-eagle-bin-2.0.8.tar.gz
drwxr-xr-x. 8 root root       160 10月 28 16:27 zk-3.5.7

重命名

[root@centosA software]# mv kafka_2.12-3.0.0 kafka_kraft
[root@centosA software]# ll
总用量 335664
drwxr-xr-x. 8 root root       101 11月  3 12:26 efak
-rw-r--r--. 1 root root 176154027 10月 28 15:19 jdk-8u191-linux-x64.rpm
drwxr-xr-x. 3 root root        19 10月 28 14:09 kafka
-rw-rw-rw-. 1 root root  86486610 2月   9 2022 kafka_2.12-3.0.0.tgz
-rw-rw-rw-. 1 root root  81074069 2月   9 2022 kafka-eagle-bin-2.0.8.tar.gz
drwxr-xr-x. 7 root root       105 9月   9 2021 kafka_kraft
drwxr-xr-x. 8 root root       160 10月 28 16:27 zk-3.5.7
[root@centosA software]# pwd
/usr/local/software

7.2.2 修改配置文件

修改/usr/local/software/kafka_kraft/config/kraft/server.properties这个配置文件。

  • process.roles=broker,controller这个属性代表此台服务器起到什么作用,是Broker还是Controller,或者同时承担两种角色。ps:controller用来选举leader。
# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller
# The node id associated with this instance's roles
node.id=2
# The connect string for the controller quorum
controller.quorum.voters=2@centosA:9093,3@centosB:9093,4@centosC:9093
  • 对外保留的端口
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://centosA:9092
  • log.dir:数据存储的位置
# A comma separated list of directories under which to store log files
log.dirs=/usr/local/software/kafka_kraft/data

7.2.3 将kafka_kraft目录复制到另外两台服务器上

[root@centosA software]# scp -r ./kafka_kraft/ root@centosB:/usr/local/software/
[root@centosA software]# scp -r ./kafka_kraft/ root@centosC:/usr/local/software/

修改另外两台的配置文件:

# The node id associated with this instance's roles
node.id=4
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://centosC:9092

7.2.4 初始化集群数据目录

1.生成存储目录的唯一ID

centosA:

[root@centosA kafka_kraft]# bin/kafka-storage.sh random-uuid
Co6vSgHFQluE2EJiec38qQ
[root@centosA kafka_kraft]# pwd
/usr/local/software/kafka_kraft

centosB:

[root@centosB kafka_kraft]# pwd
/usr/local/software/kafka_kraft
[root@centosB kafka_kraft]# bin/kafka-storage.sh random-uuid
eumSngAsQsi2RuXUSP7vQQ

centosC:

[root@centosC kafka_kraft]# pwd
/usr/local/software/kafka_kraft
[root@centosC kafka_kraft]# bin/kafka-storage.sh random-uuid
BaXQaznrR5O0Ppoaff-iiQ

2.用该 ID 格式化 kafka 存储目录(三台节点)。

注意:三台机器只能使用同一个UUID。这里使用的是centsoA生成的UUID。

如果不注意使用不同的UUID,那么就删掉data目录,重新绑定。

centosA:

[root@centosA kafka_kraft]# ./bin/kafka-storage.sh format -t Co6vSgHFQluE2EJiec38qQ -c /usr/local/software/kafka_kraft/config/kraft/server.properties 
Formatting /usr/local/software/kafka_kraft/data

centosB:

[root@centosB kafka_kraft]# ./bin/kafka-storage.sh format -t Co6vSgHFQluE2EJiec38qQ -c /usr/local/software/kafka_kraft/config/kraft/server.properties 
Formatting /usr/local/software/kafka_kraft/data

centosC:

[root@centosC kafka_kraft]# ./bin/kafka-storage.sh format -t Co6vSgHFQluE2EJiec38qQ -c /usr/local/software/kafka_kraft/config/kraft/server.properties 
Formatting /usr/local/software/kafka_kraft/data

7.2.5 启动kafka集群

启动centosA:

[root@centosA kafka_kraft]# ./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties 

启动centosB:

[root@centosB kafka_kraft]# ./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties 

启动centosC:

[root@centosC kafka_kraft]# ./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties 

7.2.6 关闭kafka集群

关闭centosA:

[root@centosA kafka_kraft]$ bin/kafka-server-stop.sh

关闭centosB:

[root@centosB kafka_kraft]$ bin/kafka-server-stop.sh

关闭centosC:

[root@centosC kafka_kraft]$ bin/kafka-server-stop.sh

7.2.7 测试kafka集群

新增主题:

[root@centosA kafka_kraft]# ./bin/kafka-topics.sh --bootstrap-server centosA:9092 --create --topic firstTopic --partitions 3  --replication-factor 3
Created topic firstTopic.

查看主题:

[root@centosA kafka_kraft]# ./bin/kafka-topics.sh --bootstrap-server centosA:9092 --list
firstTopic

7.2.8 启动和停止脚本

#!/bin/bash

case $1 in
"start")
 /usr/local/software/kafka_kraft/bin/kafka-server-start.sh -daemon  /usr/local/software/kafka_kraft/config/kraft/server.properties
;;
"stop")
  /usr/local/software/kafka_kraft/bin/kafka-server-stop.sh
;;
esac

第八章 kafka集成Spring boot

8.1 创建工程

image.png

导入的依赖为:

  <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

8.2 yml


server:
  port: 8093
spring:
  kafka:
  # 连接集群
    bootstrap-servers: centosA:9092,centosB:9092,centosC:9092
    producer:
    # 生产者的key和value的序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      #发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,比如网络抖动,所以需要在接收者那边做好消息接收的幂等性处理
      retries: 3
      #kafka本地线程会从缓冲区取数据,批量发送到broker,设置批量发送消息的大小,默认值是16384,即16kb,就是说一个batch满了16kb就发送出去
      batch-size: 16384
      #设置发送消息的本地缓冲区,如果设置了该缓冲区,消息会先发送到本地缓冲区,可以提高消息发送性能,默认值是33554432,即32MB
      buffer-memory: 33554432
      #发出消息持久化机制参数
      #(1)acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。
      #(2)acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。
      #(3)acks=-1或all: 需要等待 min.insync.replicas(默认为1,推荐配置大于等于2) 这个参数配置的副本个数都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。
      acks: 1
    consumer:
    # 消费者的反序列化
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
     #自动提交offset关闭,默认true
#      enable-auto-commit: false
          #earliest:第一次从头开始消费,以后按照消费offset记录继续消费
          #latest(默认) :只消费自己启动之后发送到主题的消息
#      auto-offset-reset: earliest
          # 指定消息key和消息体的编解码方式



kafka:
  topic: firstTopic
  groupId: firstGroup

8.3 生产者

package com.xqm.kafkaspringboot.controller;


import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

    @Autowired
    KafkaTemplate<String,String> kafkaTemplate;

    @RequestMapping("/kafkaSend")
    public String data(String msg){
        kafkaTemplate.send("firstTopic",msg);
        // StringSerializer
        return "发送成功";
    }
}

测试生产者:

http://localhost:8093/kafkaSend?msg=hello

消费者端:

[root@centosA kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server centosA:9092 --topic firstTopic
hello

8.4 消费者

package com.xqm.kafkaspringboot.controller;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;

@Configuration
public class KafkaConsumer {


    @KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.groupId}")
    public void consumerMessage(String msg) {
        System.out.println("收到的消息为" + msg);
    }
}

第九章 生产调优

9.1 kafka硬件配置选择

9.1.1 场景说明

100 万日活跃度,每人每天 100 条日志,每天总共的日志条数是 100 万 * 100 条 = 1 亿条。

1 亿/24 小时/60 分/60 秒 = 1150 条/每秒钟。

每条日志大小:0.5k - 2k(取 1k)。

1150 条/每秒钟 * 1k ≈ 1m/s 。

高峰期每秒钟:1150 条 * 20 倍 = 23000 条。

每秒多少数据量:20MB/s。

9.1.2 服务器台数选择

服务器台数= 2 * (生产者峰值生产速率 * 副本 / 100) + 1

= 2 * (20m/s * 2 / 100) + 1

= 3 台

建议 3 台服务器。

9.1.3 磁盘选择

kafka 底层主要是顺序写,固态硬盘和机械硬盘的顺序写速度差不多。

建议选择普通的机械硬盘。

每天总数据量:1 亿条 * 1k ≈ 100g

100g * 副本 2 * 保存时间 3 天 / 0.7 ≈ 1T

建议三台服务器硬盘总大小,大于等于 1T。

9.1.4 内存选择

Kafka 内存组成:堆内存 + 页缓存

(1)Kafka 堆内存建议每个节点:10g ~ 15g

kafka-server-start.sh 中修改

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
 export KAFKA_HEAP_OPTS="-Xmx10G -Xms10G"
fi

(2)查看 Kafka 进程号

[atguigu@hadoop102 kafka]$ jps
2321 Kafka
5255 Jps
1931 QuorumPeerMain

(3)根据 Kafka 进程号,查看 Kafka 的 GC 情况

jstat -gc -2321 ls 10

image.png

参数说明:

S0C:第一个幸存区的大小;

S1C:第二个幸存区的大小;

S0U:第一个幸存区的使用大小;

S1U:第二个幸存区的使用大小;

EC:eden区的大小;

EU:eden区的使用大小

OC:老年代大小;

OU:老年代使用大小

MC:方法区大小;

MU:方法区使用大小

CCSC:压缩类空间大小;

CCSU:压缩类空间使用大小

YGC:年轻代垃圾回收次数;

YGCT:年轻代垃圾回收消耗时间

FGC:老年代垃圾回收次数;

FGCT:老年代垃圾回收消耗时间

GCT:垃圾回收消耗总时间;

(4).根据 Kafka 进程号,查看 Kafka 的堆内存

jmap -heap 2321

Heap Usage:
G1 Heap:
 regions = 2048
 capacity = 2147483648 (2048.0MB)
 used = 246367744 (234.95458984375MB)
 free = 1901115904 (1813.04541015625MB)
 11.472392082214355% used
G1 Young Generation:
Eden Space:
 regions = 83
 capacity = 105906176 (101.0MB)
 used = 87031808 (83.0MB)
 free = 18874368 (18.0MB)
 82.17821782178218% used
Survivor Space:
 regions = 7
 capacity = 7340032 (7.0MB)
 used = 7340032 (7.0MB)
 free = 0 (0.0MB)
 100.0% used
G1 Old Generation:
 regions = 147
 capacity = 2034237440 (1940.0MB)
 used = 151995904 (144.95458984375MB)
 free = 1882241536 (1795.04541015625MB)
 7.471886074420103% used
13364 interned Strings occupying 1449608 bytes.

(5)页缓存:页缓存是 Linux 系统服务器的内存。我们只需要保证 1 个 segment(1g)中25%的数据在内存中就好。

每个节点页缓存大小 =(分区数 * 1g * 25%)/ 节点数。例如 10 个分区,页缓存大小=(10 * 1g * 25%)/ 3 ≈ 1g

建议服务器内存大于等于 11G。

9.1.5 CPU选择

num.io.threads = 8 负责写磁盘的线程数,整个参数值要占总核数的 50%。

num.replica.fetchers = 1 副本拉取线程数,这个参数占总核数的 50%的 1/3。

num.network.threads = 3 数据传输线程数,这个参数占总核数的 50%的 2/3。

建议 32 个 cpu core。

9.1.6 网络选择

网络带宽 = 峰值吞吐量 ≈ 20MB/s 选择千兆网卡即可。

100Mbps 单位是 bit;10M/s 单位是 byte ; 1byte = 8bit,100Mbps/8 = 12.5M/s。

一般百兆的网卡(100Mbps )、千兆的网卡(1000Mbps)、万兆的网卡(10000Mbps)。

9.2 kafka生产者

3.1.1 Updating Broker Configs
From Kafka version 1.1 onwards, some of the broker configs can be 
updated without restarting the broker. See the Dynamic Update Mode 
column in Broker Configs for the update mode of each broker config.
read-only: Requires a broker restart for update
per-broker: May be updated dynamically for each broker
cluster-wide: May be updated dynamically as a cluster-wide default.
May also be updated as a per-broker value for testing.

9.2.1 生产者核心流程和参数配置

image.png

参数名称 描述
bootstrap.servers 生 产 者 连 接 集 群 所 需 的 broker 地 址 清 单 。 例 如centosA:9092,centosB:9092,centosC:9092。可以设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker 里查找到其他 broker 信息。
key.serializer 和 value.serializer 指定发送消息的 key 和 value 的序列化类型。一定要写全类名。
buffer.memory RecordAccumulator 缓冲区总大小,默认 32m。
batch.size 缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms 如果数据迟迟未达到 batch.size,sender 等待 linger.time之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间。
acks 0:生产者发送过来的数据,不需要等数据落盘应答。<br />1:生产者发送过来的数据,Leader 收到数据后应答。<br />-1(all):生产者发送过来的数据,Leader+和 isr 队列里面的所有节点收齐数据后应答。默认值是-1,-1 和 all是等价的。
max.in.flight.requests.per.connection 允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字。
retries 当消息发送出现错误的时候,系统会重发消息。retries 表示重试次数。默认是 int 最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。
retry.backoff.ms 两次重试之间的时间间隔,默认是 100ms。
enable.idempotence 是否开启幂等性,默认 true,开启幂等性。
compression.type 生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。<br />支持压缩类型:none、gzip、snappy、lz4 和 zstd。

9.2.2 生产者提高吞吐量

参数名称 描述
buffer.memory RecordAccumulator 缓冲区总大小,默认 32m。
batch.size 缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms 如果数据迟迟未达到 batch.size,sender 等待 linger.time之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间。
compression.type 生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。<br />支持压缩类型:none、gzip、snappy、lz4 和 zstd。

9.2.3 数据可靠性

参数名称 描述
ack 0:生产者发送过来的数据,不需要等数据落盘应答。<br />1:生产者发送过来的数据,Leader 收到数据后应答。<br />-1(all):生产者发送过来的数据,Leader+和 isr 队列里面的所有节点收齐数据后应答。<br />默认值是-1,-1 和 all是等价的。

至少一次(At Least Once)= ACK 级别设置为-1 + 分区副本大于等于 2 + ISR 里应答的最小副本数量大于等于 2。

9.2.4 数据去重

幂等性和事务。

参数名称 描述
enable.idempotence 是否开启幂等性,默认 true,表示开启幂等性。

事务API

// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
 String consumerGroupId) throws 
ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;
// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

9.2.5 数据有序

单分区内,有序(有条件的,不能乱序);多分区,分区与分区间无序;

9.2.6 数据乱序

参数名称 描述
enable.idempotence 是否开启幂等性,默认 true,表示开启幂等性。
max.in.flight.requests.per.connection 允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字。

9.3 kafka的Broker

9.3.1 核心流程和参数配置

image.png

参数名称 描述
replica.lag.time.max.ms ISR 中,如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。<br />该时间阈值,默认 30s。
auto.leader.rebalance.enable 默认是 true。 自动 Leader Partition 平衡。建议关闭。
leader.imbalance.per.broker.percentage 默认是 10%。每个 broker 允许的不平衡的 leader的比率。如果每个 broker 超过了这个值,控制器会触发 leader 的平衡。
leader.imbalance.check.interval.seconds 默认值 300 秒。检查 leader 负载是否平衡的间隔时间。
log.segment.bytes Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分 成块的大小,默认值 1G。
log.index.interval.bytes 默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。
log.retention.hours Kafka 中数据保存的时间,默认 7 天。
log.retention.minutes Kafka 中数据保存的时间,分钟级别,默认关闭。
log.retention.ms Kafka 中数据保存的时间,毫秒级别,默认关闭。
log.retention.check.interval.ms 检查数据是否保存超时的间隔,默认是 5 分钟。
log.retention.bytes 默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的 segment。
log.cleanup.policy 默认是 delete,表示所有数据启用删除策略;<br />如果设置值为 compact,表示所有数据启用压缩策略。
num.io.threads 默认是 8。负责写磁盘的线程数。整个参数值要占总核数的 50%。
num.replica.fetchers 默认是 1。副本拉取线程数,这个参数占总核数的 50%的 1/3
num.network.threads 默认是 3。数据传输线程数,这个参数占总核数的 50%的 2/3 。
log.flush.interval.messages 强制页缓存刷写到磁盘的条数,默认是 long 的最大值,9223372036854775807。一般不建议修改,交给系统自己管理。
log.flush.interval.ms 每隔多久,刷数据到磁盘,默认是 null。一般不建议修改,交给系统自己管理。

9.3.2 服役新节点和退役旧节点

服役新节点和退役旧节点、包括增加分区、增加副本因子、调整分区副本。

见broker章节。

9.3.3 Leader Partition 负载平衡

详见broker章节。

col1 col2
auto.leader.rebalance.enable 默认是 true。自动 Leader Partition 平衡。生产环境中,leader 重选举的代价比较大,可能会带来性能影响,建议设置为 false 关闭。
leader.imbalance.per.broker.percentage 默认是 10%。每个 broker 允许的不平衡的 leader的比率。如果每个 broker 超过了这个值,控制器会触发 leader 的平衡。
leader.imbalance.check.interval.seconds 默认值 300 秒。检查 leader 负载是否平衡的间隔时间。

9.3.4 自动创建主题

如果 broker 端配置参数 auto.create.topics.enable 设置为 true(默认值是 true),那么当生产者向一个未创建的主题发送消息时,会自动创建一个分区数为 num.partitions(默认值为1)、副本因子为default.replication.factor(默认值为 1)的主题。除此之外,当一个消费者开始从未知主题中读取消息时,或者当任意一个客户端向未知主题发送元数据请求时,都会自动创建一个相应主题。这种创建主题的方式是非预期的,增加了主题管理和维护的难度。

生产环境建议将该参数设置为 false。

9.4 kafka消费者

9.4.1 核心流程和参数配置

image.png

image.png

参数名称 描述
bootstrap.servers 向 Kafka 集群建立初始连接用到的 host/port 列表。
key.deserializer<br />value.deserializer 指定接收消息的 key 和 value 的反序列化类型。一定要写全类名。
group.id 标记消费者所属的消费者组。
enable.auto.commit 默认值为 true,消费者会自动周期性地向服务器提交偏移量。
auto.commit.interval.ms 如果设置了 enable.auto.commit 的值为 true,则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。
auto.offset.reset 当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。
offsets.topic.num.partitions __consumer_offsets 的分区数,默认是 50 个分区。不建议修改。
heartbeat.interval.ms Kafka 消费者和 coordinator 之间的心跳时间,默认3s,该条目的值必须小于 session.timeout.ms也不应该高于session.timeout.ms 的 1/3。不建议修改。
session.timeout.ms Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超过该值,该消费者被移除,消费者组执行再平衡。
max.poll.interval.ms 消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。
fetch.min.bytes 默认 1 个字节。消费者获取服务器端一批消息最小的字节数。
fetch.max.wait.ms 默认 500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。
fetch.max.bytes 默认 Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值,仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes (brokerconfig)or max.message.bytes (topic config)影响。
max.poll.records 一次 poll 拉取数据返回消息的最大条数,默认是 500 条。

9.4.2 消费再平衡

参数名称 描述
heartbeat.interval.ms Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。该条目的值必须小于 session.timeout.ms也不应该高于session.timeout.ms 的 1/3。
session.timeout.ms Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超过该值,该消费者被移除,消费者组执行再平衡。
max.poll.interval.ms 消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。
partition.assignment.strategy 消 费 者 分 区 分 配 策 略 , 默 认 策 略 是 Range +CooperativeSticky。Kafka 可以同时使用多个分区分配策略。可以选择的策略包括:Range、RoundRobin、Sticky、CooperativeSticky

9.4.3 指定offset和时间消费消费.事务

详见消费者章节。

kafkaConsumer.seek(topic, 1000);
HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
timestampToSearch.put(topicPartition, System.currentTimeMillis() -
1 * 24 * 3600 * 1000);
kafkaConsumer.offsetsForTimes(timestampToSearch);

9.4.4 消费者提高吞吐量

增加分区数。

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3
参数名称 描述
fetch.max.bytes 默认 Default: 52428800(50M)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes (broker config)or max.message.bytes (topic config)影响。
max.poll.records 一次 poll 拉取数据返回消息的最大条数,默认是 500 条

9.5 kafka总体

9.5.1 如何提高吞吐量

  • 1.提升生产吞吐量

(1)buffer.memory:发送消息的缓冲区大小,默认值是 32m,可以增加到 64m。

(2)batch.size:默认是 16k。如果 batch 设置太小,会导致频繁网络请求,吞吐量下降;如果 batch 太大,会导致一条消息需要等待很久才能被发送出去,增加网络延时。

(3)linger.ms,这个值默认是 0,意思就是消息必须立即被发送。一般设置一个 5-100毫秒。如果 linger.ms 设置的太小,会导致频繁网络请求,吞吐量下降;如果 linger.ms 太长,会导致一条消息需要等待很久才能被发送出去,增加网络延时。

(4)compression.type:默认是 none,不压缩,但是也可以使用 lz4 压缩,效率还是不错的,压缩之后可以减小数据量,提升吞吐量,但是会加大 producer 端的 CPU 开销。

  • 2.增加分区
  • 3.消费者提高吞吐量

(1)调整 fetch.max.bytes 大小,默认是 50m。

(2)调整 max.poll.records 大小,默认是 500 条。

  • 4.增加下游消费者处理能力

9.5.2 数据精准一次

  • 1.生产者角度

acks 设置为-1 (acks=-1)。

幂等性(enable.idempotence = true) + 事务 。

  • 2.broker 服务端角度

分区副本大于等于 2 (–replication-factor 2)。

ISR 里应答的最小副本数量大于等于 2 (min.insync.replicas = 2)。

  • 3.消费者

事务+手动提交offset(enable.auto.commit = false)

消费者输出的目的地必须支持事务(MySQL、Kafka)。

9.5.3 合理设置分区数

(1)创建一个只有 1 个分区的 topic。

(2)测试这个 topic 的 producer 吞吐量和 consumer 吞吐量。

(3)假设他们的值分别是 Tp 和 Tc,单位可以是 MB/s。

(4)然后假设总的目标吞吐量是 Tt,那么分区数 = Tt / min(Tp,Tc)。

例如:producer 吞吐量 = 20m/s;consumer 吞吐量 = 50m/s,期望吞吐量 100m/s;

分区数 = 100 / 20 = 5 分区

分区数一般设置为:3-10 个

分区数不是越多越好,也不是越少越好,需要搭建完集群,进行压测,再灵活调整分区个数

9.5.4 单条日志超过1M

参数名称 描述
message.max.bytes 默认 1m,broker 端接收每个批次消息最大值。
max.request.size 默认 1m,生产者发往 broker 每个请求消息最大值。针对 topic级别设置消息体的大小
replica.fetch.max.bytes 默认 1m,副本同步数据,每个批次消息最大值。
fetch.max.bytes 默认 Default: 52428800(50M)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes (broker config)or max.message.bytes (topic config)影响

9.5.5 服务器挂掉

在生产环境中,如果某个 Kafka 节点挂掉。

正常处理办法:

(1)先尝试重新启动一下,如果能启动正常,那直接解决。

(2)如果重启不行,考虑增加内存、增加 CPU、网络带宽。

(3)如果将 kafka 整个节点误删除,如果副本数大于等于 2,可以按照服役新节点的方式重新服役一个新节点,并执行负载均衡。