Kafka生产者解析

博客 动态
0 190
羽尘
羽尘 2022-05-18 20:59:22
悬赏:0 积分 收藏

Kafka 生产者解析

一、消息发送

1.1 数据生产流程

数据生产流程图解:

  1. Producer创建时,会创建?个Sender线程并设置为守护线程
  2. ?产消息时,内部其实是异步流程;?产的消息先经过拦截器->序列化器->分区器,然后将消息缓存在缓冲区(该缓冲区也是在Producer创建时创建)
  3. 批次发送的条件为:缓冲区数据??达到 batch.size 或者 linger.ms 达到上限,哪个先达到就算哪个
  4. 批次发送后,发往指定分区,然后落盘到 broker;如果?产者配置了retrires参数?于0并且失败原因允许重试,那么客户端内部会对该消息进?重试
  5. 落盘到broker成功,返回?产元数据给?产者
  6. 元数据返回有两种?式:?种是通过阻塞直接返回,另?种是通过回调返回

1.2 必要的参数配置

先来看看我们一般在程序中是怎么配置的:

最常用的配置项:

属性说明重要性
bootstrap.servers?产者客户端与broker集群建?初始连接需要的broker地址列表,由该初始连接发现Kafka集群中其他的所有broker。该地址列表不需要写全部的Kafka集群中broker的地址,但也不要写?个,以防该节点宕机的时候不可?。形式为:host1:port1,host2:port2,....high
key.serializer实现了接?org.apache.kafka.common.serialization.Serializer的key序列化类。high
value.serializer实现了接?org.apache.kafka.common.serialization.Serializer的value序列化类。high
acks该选项控制着已发送消息的持久性。
acks=0:?产者不等待broker的任何消息确认。只要将消息放到了socket的缓冲区,就认为消息已发送。不能保证服务器是否收到该消息,retries设置也不起作?,因为客户端不关?消息是否发送失败。客户端收到的消息偏移量永远是-1。
acks=1:leader将记录写到它本地?志,就响应客户端确认消息,?不等待follower副本的确认。如果leader确认了消息就宕机,则可能会丢失消息,因为follower副本可能还没来得及同步该消息。
acks=all:leader等待所有同步的副本确认该消息。保证了只要有?个同步副本存在,消息就不会丢失。这是最强的可?性保证。等价于acks=-1。默认值为1,字符串。可选值:[all, -1, 0, 1]
high
compression.type?产者?成数据的压缩格式。默认是none(没有压缩)。允许的值:nonegzipsnappylz4。压缩是对整个消息批次来讲的。消息批的效率也影响压缩的?例。消息批越?,压缩效率越好。字符串类型的值。默认是nonehigh
retries设置该属性为?个?于1的值,将在消息发送失败的时候重新发送消息。该重试与客户端收到异常重新发送并???。允许重试但是不设置max.in.flight.requests.per.connection为 1,存在消息乱序的可能,因为如果两个批次发送到同?个分区,第?个失败了重试,第?个成功了,则第?个消息批在第?个消息批后。int类型的值,默认:0,可选值:[0,...,2147483647]high

1.3 拦截器

1.3.1 拦截器介绍

Producer 的拦截器(Interceptor)和 Consumer 的 Interceptor 主要?于实现Client端的定制化控制逻辑。
对于Producer??,Interceptor使得?户在消息发送前以及Producer回调逻辑前有机会对消息做?些定制化需求,?如修改消息等。同时,Producer允许?户指定多个Interceptor按序作?于同?条消息从?形成?个拦截链(Interceptor Chain)。Intercetpor 的实现接?是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的?法包括:

  • onSend(ProducerRecord):该?法封装进KafkaProducer.send?法中,即运?在?户主线程中。Producer确保在消息被序列化以计算分区前调?该?法。?户可以在该?法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响?标分区的计算。
  • onAcknowledgement(RecordMetadata, Exception):该?法会在消息被应答之前或消息发送失败时调?,并且通常都是在Producer回调逻辑触发之前。onAcknowledgement运?在Producer的IO线程中,因此不要在该?法中放?很重的逻辑,否则会拖慢Producer的消息发送效率。
  • close:关闭Interceptor,主要?于执??些资源清理?作。

如前所述,Interceptor可能被运?在多个线程中,因此在具体实现时?户需要??确保线程安全。另外倘若指定了多个Interceptor,则Producer将按照指定顺序调?它们,并仅仅是捕获每个Interceptor可能抛出的异常记录到错误?志中??在向上传递。这在使?过程中要特别留意。

1.3.2 自定义拦截器

自定义拦截器步骤:

  1. 实现ProducerInterceptor接?
  2. 在KafkaProducer的设置中设置?定义的拦截器

自定义拦截器 1

public class InterceptorOne<Key, Value> implements ProducerInterceptor<Key, Value> {    private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorOne.class);    @Override    public ProducerRecord<Key, Value> onSend(ProducerRecord<Key, Value> record) {        System.out.println("拦截器1---go");        // 此处根据业务需要对相关的数据作修改        String topic = record.topic();        Integer partition = record.partition();        Long timestamp = record.timestamp();        Key key = record.key();        Value value = record.value();        Headers headers = record.headers();        // 添加消息头        headers.add("interceptor", "interceptorOne".getBytes());        ProducerRecord<Key, Value> newRecord = new ProducerRecord<Key, Value>(topic,                partition, timestamp, key, value, headers);        return newRecord;    }    @Override    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {        System.out.println("拦截器1---back");        if (exception != null) {            // 如果发?异常,记录?志中            LOGGER.error(exception.getMessage());        }    }    @Override    public void close() {    }    @Override    public void configure(Map<String, ?> configs) {    }}

照着 拦截器 1 再加两个拦截器

生产者

public class MyProducer1 {    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {        Map<String, Object> configs = new HashMap<>();        // 设置连接Kafka的初始连接?到的服务器地址        // 如果是集群,则可以通过此初始连接发现集群中的其他broker        configs.put("bootstrap.servers", "192.168.0.102:9092");        // 设置key的序列化器        configs.put("key.serializer", IntegerSerializer.class);        // 设置?定义的序列化类        configs.put("value.serializer", UserSerializer.class);        // 设置自定义分区器        configs.put("partitioner.class", "com.mfc.config.MyPartitioner");        // 设置拦截器        configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,                "com.mfc.interceptor.InterceptorOne,"                        + "com.mfc.interceptor.InterceptorTwo,"                        + "com.mfc.interceptor.InterceptorThree");        KafkaProducer<Integer, User> producer = new KafkaProducer<>(configs);        User user = new User();        user.setUserId(1001);        user.setUsername("阿彪");        // ?于封装Producer的消息        ProducerRecord<Integer, User> record = new ProducerRecord<>(                "topic_1", // 主题名称                0, // 分区编号                user.getUserId(), // 数字作为key                user // user 对象作为value        );        producer.send(record, new Callback() {            @Override            public void onCompletion(RecordMetadata metadata, Exception e) {                if (e == null) {                    System.out.println("消息发送成功:" + metadata.topic() + "\t"                            + metadata.partition() + "\t"                            + metadata.offset());                } else {                    System.out.println("消息发送异常");                }            }        });        // 关闭?产者        producer.close();    }}

1.4 序列化器

1.4.1 Kafka 自带序列化器

Kafka使?org.apache.kafka.common.serialization.Serializer接??于定义序列化器,将泛型指定类型的数据转换为字节数组。

package org.apache.kafka.common.serialization;import java.io.Closeable;import java.util.Map;/**将对象转换为byte数组的接?该接?的实现类需要提供?参构造器@param <T> 从哪个类型转换*/public interface Serializer<T> extends Closeable {    /*    类的配置信息    @param configs key/value pairs    @param isKey key的序列化还是value的序列化    */    void configure(Map<String, ?> var1, boolean var2);    /*    将对象转换为字节数组     @param topic 主题名称     @param data 需要转换的对象     @return 序列化的字节数组    */    byte[] serialize(String var1, T var2);    /*    关闭序列化器    该?法需要提供幂等性,因为可能调?多次。    */    void close();}

系统提供了该接?的?接?以及实现类:

org.apache.kafka.common.serialization.ByteArraySerializer

org.apache.kafka.common.serialization.ByteBufferSerializer

org.apache.kafka.common.serialization.BytesSerializer

org.apache.kafka.common.serialization.DoubleSerializer

org.apache.kafka.common.serialization.FloatSerializer

org.apache.kafka.common.serialization.IntegerSerializer

org.apache.kafka.common.serialization.StringSerializer

org.apache.kafka.common.serialization.LongSerializer

org.apache.kafka.common.serialization.ShortSerializer

1.4.2 自定义序列化器

数据的序列化?般?产中使? avro

?定义序列化器需要实现 org.apache.kafka.common.serialization.Serializer<T> 接?,并实现其中的serialize?法。

实体类

public class User {    private Integer userId;    private String username;    // set、get方法省略}

自定义序列化器

public class UserSerializer implements Serializer<User> {    @Override    public void configure(Map<String, ?> map, boolean b) {        // do Nothing    }    @Override    public byte[] serialize(String topic, User user) {        try {            // 如果数据是null,则返回null            if (user == null) return null;            Integer userId = user.getUserId();            String username = user.getUsername();            int length = 0;            byte[] bytes = null;            if (null != username) {                bytes = username.getBytes("utf-8");                length = bytes.length;            }            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);            buffer.putInt(userId);            buffer.putInt(length);            buffer.put(bytes);            return buffer.array();        } catch (UnsupportedEncodingException e) {            throw new SerializationException("序列化数据异常");        }    }    @Override    public void close() {        // do Nothing    }}

生产者

public class MyProducer1 {    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {        Map<String, Object> configs = new HashMap<>();        // 设置连接Kafka的初始连接?到的服务器地址        // 如果是集群,则可以通过此初始连接发现集群中的其他broker        configs.put("bootstrap.servers", "192.168.0.102:9092");        // 设置key的序列化器        configs.put("key.serializer", IntegerSerializer.class);        // 设置?定义的序列化类        configs.put("value.serializer", UserSerializer.class);        KafkaProducer<Integer, User> producer = new KafkaProducer<>(configs);        User user = new User();        user.setUserId(1001);        user.setUsername("阿彪");        // ?于封装Producer的消息        ProducerRecord<Integer, User> record = new ProducerRecord<>(                "topic_1", // 主题名称                0, // 分区编号                user.getUserId(), // 数字作为key                user // user 对象作为value        );        producer.send(record, new Callback() {            @Override            public void onCompletion(RecordMetadata metadata, Exception e) {                if (e == null) {                    System.out.println("消息发送成功:" + metadata.topic() + "\t"                            + metadata.partition() + "\t"                            + metadata.offset());                } else {                    System.out.println("消息发送异常");                }            }        });        // 关闭?产者        producer.close();    }}

1.5 分区器

1.5.1 Kafka 自带分区器

默认(DefaultPartitioner)分区计算:

  1. 如果record提供了分区号,则使?record提供的分区号
  2. 如果record没有提供分区号,则使?key的序列化后的值的hash值对分区数量取模
  3. 如果record没有提供分区号,也没有提供key,则使?轮询的?式分配分区号。
    • 会?先在可?的分区中分配分区号
    • 如果没有可?的分区,则在该主题所有分区中分配分区号。

看一下kafka的生产者(KafkaProducer)源码:

再看Kafka自带的默认分区器(DefaultPartitioner):

默认的分区器实现了 Partitioner 接口,先看一下接口:

public interface Partitioner extends Configurable, Closeable {    /**     * 为指定的消息记录计算分区值     *     * @param topic 主题名称     * @param key 根据该key的值进?分区计算,如果没有则为null     * @param keyBytes key的序列化字节数组,根据该数组进?分区计算。如果没有key,则为null     * @param value 根据value值进?分区计算,如果没有,则为null     * @param valueBytes value的序列化字节数组,根据此值进?分区计算。如果没有,则为null     * @param cluster 当前集群的元数据     */    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);    /**     * 关闭分区器的时候调?该?法     */    public void close();}
1.5.2 自定义分区器

如果要?定义分区器,则需要

  1. ?先开发Partitioner接?的实现类
  2. 在KafkaProducer中进?设置:configs.put("partitioner.class", "xxx.xx.Xxx.class")

实现Partitioner接??定义分区器:

public class MyPartitioner implements Partitioner {    @Override    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {        return 0;    }    @Override    public void close() {    }    @Override    public void configure(Map<String, ?> configs) {    }}

然后在?产者中配置:

二、消息发送原理

原理图解:

由上图可以看出:KafkaProducer 有两个基本线程:

  • 主线程:负责消息创建,拦截器,序列化器,分区器等操作,并将消息追加到消息收集器RecoderAccumulator中;
    • 消息收集器RecoderAccumulator为每个分区都维护了?个 Deque<ProducerBatch> 类型的双端队列。
    • ProducerBatch 可以理解为是 ProducerRecord 的集合,批量发送有利于提升吞吐量,降低?络影响;
    • 由于?产者客户端使? java.io.ByteBuffer 在发送消息之前进?消息保存,并维护了?个 BufferPool 实现 ByteBuffer 的复?;该缓存池只针对特定??( batch.size 指定)的 ByteBuffer进?管理,对于消息过?的缓存,不能做到重复利?。
    • 每次追加?条ProducerRecord消息,会寻找/新建对应的双端队列,从其尾部获取?个ProducerBatch,判断当前消息的??是否可以写?该批次中。若可以写?则写?;若不可以写?,则新建?个ProducerBatch,判断该消息??是否超过客户端参数配置 batch.size 的值,不超过,则以 batch.size建?新的ProducerBatch,这样?便进?缓存重复利?;若超过,则以计算的消息??建?对应的 ProducerBatch ,缺点就是该内存不能被复?了。
  • Sender线程:
    • 该线程从消息收集器获取缓存的消息,将其处理为 <Node, List<ProducerBatch> 的形式, Node 表示集群的broker节点。
    • 进?步将<Node, List<ProducerBatch>转化为<Node, Request>形式,此时才可以向服务端发送数据。
    • 在发送之前,Sender线程将消息以 Map<NodeId, Deque<Request>> 的形式保存到 InFlightRequests 中进?缓存,可以通过其获取 leastLoadedNode ,即当前Node中负载压?最?的?个,以实现消息的尽快发出。

三、更多生产者参数配置

参数名称描述
retry.backoff.ms在向?个指定的主题分区重发消息的时候,重试之间的等待时间。
?如3次重试,每次重试之后等待该时间?度,再接着重试。在?些失败的场景,避免了密集循环的重新发送请求。
long型值,默认100。可选值:[0,...]
retriesretries重试次数
当消息发送出现错误的时候,系统会重发消息。
跟客户端收到错误时重发?样。
如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1
否则在重试此失败消息的时候,其他的消息可能发送成功了
request.timeout.ms客户端等待请求响应的最?时?。如果服务端响应超时,则会重发请求,除?达到重试次数。该设置应该?replica.lag.time.max.ms (a broker configuration)要?,以免在服务器延迟时间内重发消息。int类型值,默认:30000,可选值:[0,...]
interceptor.classes在?产者接收到该消息,向Kafka集群传输之前,由序列化器处理之前,可以通过拦截器对消息进?处理。
要求拦截器类必须实现org.apache.kafka.clients.producer.ProducerInterceptor接?。默认没有拦截器。
Map<String, Object> configs中通过List集合配置多个拦截器类名。
acks默认值:all。
acks=0:
?产者不等待broker对消息的确认,只要将消息放到缓冲区,就认为消息已经发送完成。
该情形不能保证broker是否真的收到了消息,retries配置也不会?效。发送的消息的返回的消息偏移量永远是-1。

acks=1
表示消息只需要写到主分区即可,然后就响应客户端,?不等待副本分区的确认。
在该情形下,如果主分区收到消息确认之后就宕机了,?副本分区还没来得及同步该消息,则该消息丢失。

acks=all
?领分区会等待所有的ISR副本分区确认记录。
该处理保证了只要有?个ISR副本分区存活,消息就不会丢失。
这是Kafka最强的可靠性保证,等效于acks=-1
batch.size当多个消息发送到同?个分区的时候,?产者尝试将多个记录作为?个批来处理。批处理提?了客户端和服务器的处理效率。
该配置项以字节为单位控制默认批的??。
所有的批?于等于该值。
发送给broker的请求将包含多个批次,每个分区?个,并包含可发送的数据。
如果该值设置的?较?,会限制吞吐量(设置为0会完全禁?批处理)。如果设置的很?,?有?点浪费内存,因为Kafka会永远分配这么?的内存来参与到消息的批整合中。
client.id?产者发送请求的时候传递给broker的id字符串。
?于在broker的请求?志中追踪什么应?发送了什么消息。
?般该id是跟业务有关的字符串。
compression.type?产者发送的所有数据的压缩?式。默认是none,也就是不压缩。
?持的值:none、gzip、snappy和lz4。
压缩是对于整个批来讲的,所以批处理的效率也会影响到压缩的?例。
send.buffer.bytesTCP发送数据的时候使?的缓冲区(SO_SNDBUF)??。如果设置为0,则使?操作系统默认的。
buffer.memory?产者可以?来缓存等待发送到服务器的记录的总内存字节。如果记录的发送速度超过了将记录发送到服务器的速度,则?产者将阻塞max.block.ms的时间,此后它将引发异常。此设置应?致对应于?产者将使?的总内存,但并??产者使?的所有内存都?于缓冲。?些额外的内存将?于压缩(如果启?了压缩)以及维护运?中的请求。long型数据。默认值:33554432,可选值:[0,...]
connections.max.idle.ms当连接空闲时间达到这个值,就关闭连接。long型数据,默认:540000
linger.ms?产者在发送请求传输间隔会对需要发送的消息进?累积,然后作为?个批次发送。?般情况是消息的发送的速度?消息累积的速度慢。有时客户端需要减少请求的次数,即使是在发送负载不?的情况下。该配置设置了?个延迟,?产者不会?即将消息发送到broker,?是等待这么?段时间以累积消息,然后将这段时间之内的消息作为?个批次发送。该设置是批处理的另?个上限:?旦批消息达到了batch.size指定的值,消息批会?即发送,如果积累的消息字节数达不到batch.size的值,可以设置该毫秒值,等待这么?时间之后,也会发送消息批。该属性默认值是0(没有延迟)。如果设置linger.ms=5,则在?个请求发送之前先等待5ms。long型值,默认:0,可选值:[0,...]
max.block.ms控制KafkaProducer.send()KafkaProducer.partitionsFor()阻塞的时?。当缓存满了或元数据不可?的时候,这些?法阻塞。在?户提供的序列化器和分区器的阻塞时间不计?。long型值,默认:60000,可选值:[0,...]
max.request.size单个请求的最?字节数。该设置会限制单个请求中消息批的消息个数,以免单个请求发送太多的数据。服务器有??的限制批??的设置,与该配置可能不?样。int类型值,默认1048576,可选值:[0,...]
partitioner.class实现了接?org.apache.kafka.clients.producer.Partitioner 的分区器实现类。默认值为:org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytesTCP接收缓存(SO_RCVBUF),如果设置为-1,则使?操作系统默认的值。int类型值,默认32768,可选值:[-1,...]
security.protocol跟broker通信的协议:PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
string类型值,默认:PLAINTEXT
max.in.flight.requests.per.connection单个连接上未确认请求的最?数量。达到这个数量,客户端阻塞。如果该值?于1,且存在失败的请求,在重试的时候消息顺序不能保证。
int类型值,默认5。可选值:[1,...]
reconnect.backoff.max.ms对于每个连续的连接失败,每台主机的退避将成倍增加,直?达到此最?值。在计算退避增量之后,添加20%的随机抖动以避免连接?暴。
long型值,默认1000,可选值:[0,...]
reconnect.backoff.ms尝试重连指定主机的基础等待时间。避免了到该主机的密集重连。该退避时间应?于该客户端到broker的所有连接。
long型值,默认50。可选值:[0,...]
posted @ 2022-05-18 20:51 下半夜的风 阅读(0) 评论(0) 编辑 收藏 举报
回帖
    羽尘

    羽尘 (王者 段位)

    2335 积分 (2)粉丝 (11)源码

     

    温馨提示

    亦奇源码

    最新会员