多分区原子写入:
事务能够保证Kafka topic下每个分区的原?写?。事务中所有的消息都将被成功写?或者丢弃。
?先,我们来考虑?下原?读取-处理-写?周期是什么意思。简??之,这意味着如果某个应?程序在某个topic tp0的偏移量X处读取到了消息A,并且在对消息A进?了?些处理(如B = F(A)),之后将消息B写?topic tp1,则只有当消息A和B被认为被成功地消费并?起发布,或者完全不发布时,整个读取过程写?操作是原?的。
现在,只有当消息A的偏移量X被标记为已消费,消息A才从topic tp0消费,消费到的数据偏移量(record offset)将被标记为提交偏移量(Committing offset)。在Kafka中,我们通过写??个名为offsets topic的内部Kafka topic来记录offset commit。消息仅在其offset被提交给offsets topic时才被认为成功消费。
由于offset commit只是对Kafka topic的另?次写?,并且由于消息仅在提交偏移量时被视为成功消费,所以跨多个主题和分区的原?写?也启?原?读取-处理-写?循环:提交偏移量X到offset topic和消息B到tp1的写?将是单个事务的?部分,所以整个步骤都是原?的。
粉碎“僵尸实例”:
我们通过为每个事务Producer分配?个称为transactional.id的唯?标识符来解决僵?实例的问题。在进程重新启动时能够识别相同的Producer实例。
API要求事务性Producer的第?个操作应该是在Kafka集群中显示注册transactional.id。 当注册的时候,Kafka broker?给定的transactional.id检查打开的事务并且完成处理。 Kafka也增加了?个与transactional.id相关的epoch。Epoch存储每个transactional.id内部元数据。
?旦epoch被触发,任何具有相同的transactional.id和旧的epoch的?产者被视为僵?,Kafka拒绝来?这些?产者的后续事务性写?。
简??之:Kafka可以保证Consumer最终只能消费?事务性消息或已提交事务性消息。它将保留来?未完成事务的消息,并过滤掉已中?事务的消息。
在?个原?操作中,根据包含的操作类型,可以分为三种情况,前两种情况是事务引?的场景,最后?种没?:
consume-transform-produce模式创建消费者代码,需要:
auto.commit)进?关闭commitSync()或者commitAsync()isolation.level创建生产者,代码如下,需要:
transactional.id属性enable.idempotence属性事务相关配置
Broker configs:
| 配置项 | 说明 |
|---|---|
| transactional.id.timeout.ms | 在ms中,事务协调器在?产者TransactionalId提前过期之前等待的最?时间,并且没有从该?产者TransactionalId接收到任何事务状态更新。默认是604800000(7天)。这允许每周?次的?产者作业维护它们的id |
| max.transaction.timeout.ms | 事务允许的最?超时。如果客户端请求的事务时间超过此时间,broke将在InitPidRequest中返回InvalidTransactionTimeout错误。这可以防?客户机超时过?,从?导致?户?法从事务中包含的主题读取内容。 默认值为900000(15分钟)。这是消息事务需要发送的时间的保守上限。 |
| transaction.state.log.replication.factor | 事务状态topic的副本数量。默认值:3 |
| transaction.state.log.num.partitions | 事务状态主题的分区数。默认值:50 |
| transaction.state.log.min.isr | 事务状态主题的每个分区ISR最?数量。默认值:2 |
| transaction.state.log.segment.bytes | 事务状态主题的segment??。默认值:104857600字节 |
Producer configs:
| 配置项 | 说明 |
|---|---|
| enable.idempotence | 开启幂等 |
| transaction.timeout.ms | 事务超时时间 事务协调器在主动中?正在进?的事务之前等待?产者更新事务状态的最?时间。这个配置值将与InitPidRequest?起发送到事务协调器。如果该值?于max.transaction.timeout。在broke中设置ms时,请求将失败,并出现InvalidTransactionTimeout错误。 默认是60000。这使得交易不会阻塞下游消费超过?分钟,这在实时应?程序中通常是允许的。 |
| transactional.id | ?于事务性交付的TransactionalId。这?持跨多个?产者会话的可靠性语义,因为它允许客户端确保使?相同TransactionalId的事务在启动任何新事务之前已经完成。如果没有提供TransactionalId,则?产者仅限于幂等交付。 |
Consumer configs:
| 配置项 | 说明 |
|---|---|
| isolation.level | - read_uncommitted:以偏移顺序使?已提交和未提交的消息。 - read_committed:仅以偏移量顺序使??事务性消息或已提交事务性消息。为了维护偏移排序,这个设置意味着我们必须在使?者中缓冲消息,直到看到给定事务中的所有消息。 |

事务协调器和事务?志
事务协调器是每个Kafka内部运?的?个模块。事务?志是?个内部的主题。每个协调器拥有事务?志所在分区的?集,即这些 borker 中的分区都是Leader。
每个transactional.id都通过?个简单的哈希函数映射到事务?志的特定分区,事务?志?件__transaction_state-0。这意味着只有?个Broker拥有给定的transactional.id。
通过这种?式,我们利?Kafka可靠的复制协议和Leader选举流程来确保事务协调器始终可?,并且所有事务状态都能够持久化。
值得注意的是,事务?志只保存事务的最新状态?不是事务中的实际消息。消息只存储在实际的Topic的分区中。事务可以处于诸如“Ongoing”,“prepare commit”和“Completed”之类的各种状态中。正是这种状态和关联的元数据存储在事务?志中。
事务数据流
数据流在抽象层?上有四种不同的类型
initTransactions API向coordinator注册?个transactional.id。 此时,coordinator使?该transactional.id关闭所有待处理的事务,并且会避免遇到僵?实例,由具有相同的transactional.id的Producer的另?个实例启动的任何事务将被关闭和隔离。每个Producer会话只发??次。coordinator注册分区commitTransaction或abortTransaction时,会向coordinator发送?个请求以开始两阶段提交协议。Kafka在引?幂等性之前,Producer向Broker发送消息,然后Broker将消息追加到消息流中后给Producer返回Ack信号值。实现流程如下:
?产中,会出现各种不确定的因素,?如在Producer在发送给Broker的时候出现?络异常。?如以下这种异常情况的出现:
上图这种情况,当Producer第?次发送消息给Broker时,Broker将消息(x2,y2)追加到了消息流中,但是在返回Ack信号给Producer时失败了(?如?络异常) 。此时,Producer端触发重试机制,将消息(x2,y2)重新发送给Broker,Broker接收到消息后,再次将该消息追加到消息流中,然后成功返回Ack信号给Producer。这样下来,消息流中就被重复追加了两条相同的(x2,y2)的消息。
幂等性
保证在消息重发的时候,消费者不会重复处理。即使在消费者收到重复消息的时候,重复处理,也要保证最终结果的?致性。
所谓幂等性,数学概念就是:f(f(x)) = f(x)。f函数表示对消息的处理。
?如,银?转账,如果失败,需要重试。不管重试多少次,都要保证最终结果?定是?致的。
幂等性实现
添加唯?ID,类似于数据库的主键,?于唯?标记?个消息。
Kafka为了实现幂等性,它在底层设计架构中引?了ProducerID和SequenceNumber。

同样,这是?种理想状态下的发送流程。实际情况下,会有很多不确定的因素,?如Broker在发送Ack信号给Producer时出现?络异常,导致发送失败。异常情况如下图所示:
当Producer发送消息(x2,y2)给Broker时,Broker接收到消息并将其追加到消息流中。此时,Broker返回Ack信号给Producer时,发?异常导致Producer接收Ack信号失败。对于Producer来说,会触发重试机制,将消息(x2,y2)再次发送,但是,由于引?了幂等性,在每条消息中附带了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber发送给Broker,?之前Broker缓存过之前发送的相同的消息,那么在消息流中的消息就只有?条(x2,y2),不会出现重复发送的情况。
客户端在?成Producer时,会实例化如下代码:
// 实例化?个Producer对象Producer<String, String> producer = new KafkaProducer<>(props);在org.apache.kafka.clients.producer.internals.Sender类中,在run()中有?个maybeWaitForPid()?法,?来?成?个ProducerID,实现代码如下:
private void maybeWaitForPid() { if (transactionState == null) return; while (!transactionState.hasPid()) { try { Node node = awaitLeastLoadedNodeReady(requestTimeout); if (node != null) { ClientResponse response = sendAndAwaitInitPidRequest(node); if (response.hasResponse() && (response.responseBody() instanceof InitPidResponse)) { InitPidResponse initPidResponse = (InitPidResponse) response.responseBody(); transactionState.setPidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch()); } else { log.error("Received an unexpected response type for an InitPidRequest from {}. " + "We will back off and try again.", node); } } else { log.debug("Could not find an available broker to send InitPidRequest to. " + "We will back off and try again."); } } catch (Exception e) { log.warn("Received an exception while trying to get a pid. Will back off and retry.", e); } log.trace("Retry InitPidRequest in {}ms.", retryBackoffMs); time.sleep(retryBackoffMs); metadata.requestUpdate(); }}在Kafka事务中,?个原?性操作,根据操作类型可以分为3种情况。情况如下:
// 初始化事务,需要注意确保transation.id属性被分配void initTransactions();// 开启事务void beginTransaction() throws ProducerFencedException;// 为Consumer提供的在事务内Commit Offsets的操作void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException;// 提交事务void commitTransaction() throws ProducerFencedException;// 放弃事务,类似于回滚事务的操作void abortTransaction() throws ProducerFencedException;案例1:单个Producer,使?事务保证消息的仅?次发送:
package com.mfc.kafka.demo.producer;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.common.serialization.StringSerializer;import java.util.HashMap;import java.util.Map;public class MyTransactionalProducer { public static void main(String[] args) { Map<String, Object> configs = new HashMap<>(); configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092"); configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 提供客户端ID configs.put(ProducerConfig.CLIENT_ID_CONFIG, "tx_producer"); // 事务ID configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my_tx_id"); // 要求ISR都确认 configs.put(ProducerConfig.ACKS_CONFIG, "all"); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs); // 初始化事务 producer.initTransactions(); // 开启事务 producer.beginTransaction(); try { // producer.send(new ProducerRecord<>("tp_tx_01", "tx_msg_01")); producer.send(new ProducerRecord<>("tp_tx_01", "tx_msg_02")); // int i = 1 / 0; // 提交事务 producer.commitTransaction(); } catch (Exception ex) { // 中?事务 producer.abortTransaction(); } finally { // 关闭?产者 producer.close(); } }} 案例2:在消费-转换-?产模式,使?事务保证仅?次发送。
package com.mfc.kafka.demo;import org.apache.kafka.clients.consumer.*;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.common.TopicPartition;import org.apache.kafka.common.serialization.StringDeserializer;import org.apache.kafka.common.serialization.StringSerializer;import java.util.Collections;import java.util.HashMap;import java.util.Map;public class MyTransactional { public static KafkaProducer<String, String> getProducer() { Map<String, Object> configs = new HashMap<>(); configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092"); configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 设置client.id configs.put(ProducerConfig.CLIENT_ID_CONFIG, "tx_producer_01"); // 设置事务id configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx_id_02"); // 需要所有的ISR副本确认 configs.put(ProducerConfig.ACKS_CONFIG, "all"); // 启?幂等性 configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs); return producer; } public static KafkaConsumer<String, String> getConsumer(String consumerGroupId) { Map<String, Object> configs = new HashMap<>(); configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092"); configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 设置消费组ID configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_grp_02"); // 不启?消费者偏移量的?动确认,也不要?动确认 configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer_client_02"); configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 只读取已提交的消息 // configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs); return consumer; } public static void main(String[] args) { String consumerGroupId = "consumer_grp_id_101"; KafkaProducer<String, String> producer = getProducer(); KafkaConsumer<String, String> consumer = getConsumer(consumerGroupId); // 事务的初始化 producer.initTransactions(); //订阅主题 consumer.subscribe(Collections.singleton("tp_tx_01")); final ConsumerRecords<String, String> records = consumer.poll(1_000); // 开启事务 producer.beginTransaction(); try { Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); for (ConsumerRecord<String, String> record : records) { System.out.println(record); producer.send(new ProducerRecord<String, String>("tp_tx_out_01", record.key(), record.value())); offsets.put( new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1)); // 偏移量表示下?条要消费的消息 } // 将该消息的偏移量提交作为事务的?部分,随事务提交和回滚(不提交消费偏移量) producer.sendOffsetsToTransaction(offsets, consumerGroupId); // int i = 1 / 0; // 提交事务 producer.commitTransaction(); } catch (Exception e) { e.printStackTrace(); // 回滚事务 producer.abortTransaction(); } finally { // 关闭资源 producer.close(); consumer.close(); } } }Kafka集群包含若?个broker,broker.id指定broker的编号,编号不要重复。
Kafka集群上创建的主题,包含若?个分区。
每个分区包含若?个副本,副本因?包括了Follower副本和Leader副本。
副本?分为ISR(同步副本分区)和OSR(?同步副本分区)。
控制器就是?个broker。
控制器除了?般broker的功能,还负责Leader分区的选举。
Broker 选举:
集群?第?个启动的broker在Zookeeper中创建临时节点<KafkaZkChroot>/controller。
其他broker在该控制器节点创建Zookeeper watch对象,使?Zookeeper的监听机制接收该节点的变更。
即:Kafka通过Zookeeper的分布式锁特性选举集群控制器。
下图中,节点/myKafka/controller是?个zookeeper临时节点,其中"brokerid":0,表示当前控制器是broker.id为 0 的broker。
每个新选出的控制器通过 Zookeeper 的条件递增操作获得?个全新的、数值更?的 controller epoch。其他 broker 在知道当前 controller epoch 后,如果收到由控制器发出的包含较旧epoch 的消息,就会忽略它们,以防?“脑裂”。
?如当?个Leader副本分区所在的broker宕机,需要选举新的Leader副本分区,有可能两个具有不同纪元数字的控制器都选举了新的Leader副本分区,如果选举出来的Leader副本分区不?样,听谁的?脑裂了。有了纪元数字,直接使?纪元数字最新的控制器结果。
当控制器发现?个 broker 已经离开集群,那些失去Leader副本分区的Follower分区需要?个新Leader(这些分区的?领刚好是在这个 broker 上)。
下图中,<KafkaChroot>/brokers/ids/0保存该broker的信息,此节点为临时节点,如果broker节点宕机,该节点丢失。
集群控制器负责监听ids节点,?旦节点?节点发送变化,集群控制器得到通知。
控制器遍历这些Follower副本分区,并确定谁应该成为新Leader分区,然后向所有包含新Leader分区和现有Follower的 broker 发送请求。该请求消息包含了谁是新Leader副本分区以及谁是Follower副本分区的信息。随后,新Leader分区开始处理来??产者和消费者的请求,?跟随者开始从新Leader副本分区消费消息。
当控制器发现?个 broker 加?集群时,它会使? broker ID 来检查新加?的 broker 是否包含现有分区的副本。如果有,控制器就把变更通知发送给新加?的 broker 和其他 broker,新 broker上的副本分区开始从Leader分区那?消费消息,与Leader分区保持同步。
结论:
--replication-factor 3,表示分区的副本数,不要超过broker的数量。replica.lag.time.max.ms默认值:10000)。acks=all。Follower收到消息后,会像Leader发送ACK。?旦Leader收到了ISR中所有Replica的ACK,Leader就commit,那么Leader就向Producer发送ACK。当某个topic的--replication-factor为N(N>1)时,每个Partition都有N个副本,称作replica。原则上是将replica均匀的分配到整个集群上。不仅如此,partition的分配也同样需要均匀分配。为了更好的负载均衡。
副本分配的三个?标:
在不考虑机架信息的情况下:

Leader的选举:
如果Leader宕机了该怎么办?很容易想到我们在Follower中重新选举?个Leader,但是选举哪个作为leader呢?Follower可能已经落后许多了,因此我们要选择的是”最新”的Follow:新的Leader必须拥有与原来Leader commit过的所有信息。
kafka动态维护?组同步leader数据的副本(ISR),只有这个组的成员才有资格当选leader,kafka副本写?不被认为是已提交,直到所有的同步副本已经接收才认为。这组ISR保存在zookeeper,正因为如此,在ISR中的任何副本都有资格当选leader。
基于Zookeeper的选举?式:
?数据很多组件都有Leader选举的概念,如HBASE等。它们?都基于ZK进?选举,所有Follow都在ZK上?注册?个Watch,?旦Leader宕机,Leader对应的Znode会?动删除,那些Follow由于在Leader节点上注册了Watcher,故可以得到通知,就去参与下?轮选举,尝试去创建该节点,ZK会保证只有?个Follow创建成功,成为新的Leader。
但是这种?式有?个缺点:
基于Controller的选举?式:
Kafka 0.8后的Leader Election?案解决了上述问题,它在所有broker中选出?个controller,所有Partition的Leader选举都由controller决定。controller会将Leader的改变直接通过RPC的?式(?ZooKeeper Queue的?式更?效)通知需为为此作为响应的Broker。同时controller也负责增删Topic以及Replica的重新分配。
如何处理Replica的恢复:
如果Replica都死了怎么办?
只要?少有?个replica,就能保证数据不丢失,可是如果某个partition的所有replica都死了怎么办?有两种?案:
unclean.leader.election.enable禁?它。Broker宕机怎么办?
Controller在Zookeeper的/brokers/ids节点上注册Watch。?旦有Broker宕机,其在Zookeeper对应的Znode会?动被删除,Zookeeper会fire Controller注册的Watch,Controller即可获取最新的幸存的Broker列表。
Controller决定set_p,该集合包含了宕机的所有Broker上的所有Partition。
对set_p中的每?个Partition:
从/brokers/topics/[topic]/partitions/[partition]/state读取该Partition当前的ISR。
决定该Partition的新Leader。如果当前ISR中有?少?个Replica还幸存,则选择其中?个作为新Leader,新的ISR则包含当前ISR中所有幸存的Replica。否则选择该Partition中任意?个幸存的Replica作为新的Leader以及ISR(该场景下可能会有潜在的数据丢失)。如果该Partition的所有Replica都宕机了,则将新的Leader设置为-1。
将新的Leader,ISR和新的leader_epoch及controller_epoch写?/brokers/topics/[topic]/partitions/[partition]/state。
[zk: localhost:2181(CONNECTED) 13] get /brokers/topics/bdstar/partitions/0/state{"controller_epoch":1272,"leader":0,"version":1,"leader_epoch":4,"isr":[0,2]}直接通过RPC向set_p相关的Broker发送LeaderAndISRRequest命令。Controller可以在?个RPC操作中发送多个命令从?提?效率。
Controller宕机怎么办?
每个Broker都会在/controller上注册?个Watch。
[zk: localhost:2181(CONNECTED) 19] get /controller{"version":1,"brokerid":1...............}当前Controller宕机时,对应的/controller会?动消失。所有“活”着的Broker竞选成为新的Controller,会创建新的Controller Path
[zk: localhost:2181(CONNECTED) 19] get /controller{"version":1,"brokerid":2...............}注意:只会有?个竞选成功(这点由Zookeeper保证)。竞选成功者即为新的Leader,竞选失败者则重新在新的Controller Path上注册Watch。因为Zookeeper的Watch是?次性的,被fire?次之后即失效,所以需要重新注册
Kafka中,?个主题可以有多个分区,增强主题的可扩展性,为了保证靠可?,可以为每个分区设置副本数。
只有Leader副本可以对外提供读写服务,Follower副本只负责poll Leader副本的数据,与Leader副本保持数据的同步。
系统维护?个ISR副本集合,即所有与Leader副本保持同步的副本列表。
当Leader宕机找不到的时候,就从ISR列表中挑选?个分区做Leader。如果ISR列表中的副本都找不到了,就剩下OSR的副本了。
此时,有两个选择:要么选择OSR的副本做Leader,优点是可以?即恢复该分区的服务。缺点是可能会丢失数据。
要么选择等待,等待ISR列表中的分区副本可?,就选择该可?ISR分区副本做Leader。优点是不会丢失数据缺点是会影响当前分区的可?性。
水位标记:
?位或?印(watermark)?词,表示位置信息,即位移(offset)。Kafka源码中使?的名字是??位,HW(high watermark)。
副本??:
Kafka分区使?多个副本(replica)提供?可?。
LEO和HW:
每个分区副本对象都有两个重要的属性:LEO和HW。

上图中,HW值是7,表示位移是07的所有消息都已经处于“已提交状态”(committed),?LEO值是14,813的消息就是未完全备份(fully replicated)——为什么没有14?LEO指向的是下?条消息到来时的位移。
消费者?法消费分区下Leader副本中位移?于分区HW的消息。
Follower副本不停地向Leader副本所在的broker发送FETCH请求,?旦获取消息后写???的?志中进?备份。那么Follower副本的LEO是何时更新的呢?Kafka有两套Follower副本LEO:
Kafka使?前者帮助Follower副本更新其HW值;利?后者帮助Leader副本更新其HW。
Follower副本的本地LEO何时更新?
Follower副本的LEO值就是?志的LEO值,每当新写??条消息,LEO值就会被更新。当Follower发送FETCH请求后,Leader将数据返回给Follower,此时Follower开始Log写数据,从??动更新LEO值。
Leader端Follower的LEO何时更新?
Leader端的Follower的LEO更新发?在Leader在处理Follower FETCH请求时。?旦Leader接收到Follower发送的FETCH请求,它先从Log中读取相应的数据,给Follower返回数据前,先更新Follower的LEO。
Follower更新HW发?在其更新LEO之后,?旦Follower向Log写完数据,尝试更新??的HW值。
?较当前LEO值与FETCH响应中Leader的HW值,取两者的?者作为新的HW值。
即:如果Follower的LEO?于Leader的HW,Follower HW值不会?于Leader的HW值。
和Follower更新LEO相同,Leader写Log时?动更新??的LEO值。
Leader的HW值就是分区HW值,直接影响分区数据对消费者的可?性 。
Leader会尝试去更新分区HW的四种情况:
结论:
当Kafka broker都正常?作时,分区HW值的更新时机有两个:
Leader如何更新??的HW值?Leader broker上保存了?套Follower副本的LEO以及??的LEO。当尝试确定分区HW时,它会选出所有满?条件的副本,?较它们的LEO(包括Leader的LEO),并选择最?的LEO值作为HW值。
需要满?的条件,(?选?):
replica.lag.time.max.ms参数值(默认是10s)如果Kafka只判断第?个条件的话,确定分区HW值时就不会考虑这些未在ISR中的副本,但这些副本已经具备了“?刻进?ISR”的资格,因此就可能出现分区HW值越过ISR中副本LEO的情况——不允许。因为分区HW定义就是ISR中所有副本LEO的最?值。
我们假设有?个topic,单分区,副本因?是2,即?个Leader副本和?个Follower副本。我们看下当producer发送?条消息时,broker端的副本到底会发?什么事情以及分区HW是如何被更新的。
初始时Leader和Follower的HW和LEO都是0(严格来说源代码会初始化LEO为-1,不过这不影响之后的讨论)。Leader中的Remote LEO指的就是Leader端保存的Follower LEO,也被初始化成0。此时,?产者没有发送任何消息给Leader,?Follower已经开始不断地给Leader发送FETCH请求了,但因为没有数据因此什么都不会发?。值得?提的是,Follower发送过来的FETCH请求因为?数据?暂时会被寄存到Leader端的purgatory中,待500ms (replica.fetch.wait.max.ms参数)超时后会强制完成。倘若在寄存期间?产者发来数据,则Kafka会?动唤醒该FETCH请求,让Leader继续处理。
producer给该topic分区发送了?条消息
此时的状态如下图所示:
如上图所示,Leader接收到PRODUCE请求主要做两件事情:
PRODUCE请求处理完成后各值如下,Leader端的HW值依然是0,?LEO是1,Remote LEO也是0。
| 属性 | 阶段 | 旧值 | 新值 | 备注 |
|---|---|---|---|---|
| Leader LEO | PRODUCE处理完成 | 0 | 1 | 写?了?条数据 |
| Remote LEO | PRODUCE处理完成 | 0 | 0 | 还未Fetch |
| Leader HW | PRODUCE处理完成 | 0 | 0 | min(LeaderLEO=1, RemoteLEO=0)=0 |
| Follower LEO | PRODUCE处理完成 | 0 | 0 | 还未Fetch |
| Follower HW | PRODUCE处理完成 | 0 | 0 | min(LeaderHW=0, FollowerLEO=0)=0 |
假设此时follower发送了FETCH请求,则状态变更如下:
本例中当follower发送FETCH请求时,Leader端的处理依次是:
?Follower副本接收到FETCH Response后依次执?下列操作:
此时,第?轮FETCH RPC结束,我们会发现虽然Leader和Follower都已经在Log中保存了这条消息,但分区HW值尚未被更新,仍为0。
| 属性 | 阶段 | 旧值 | 新值 | 备注 |
|---|---|---|---|---|
| Leader LEO | PRODUCE和Follower FETCH处理完成 | 0 | 1 | 写?了?条数据 |
| Remote LEO | PRODUCE和Follower FETCH处理完成 | 0 | 0 | 第?次fetch中offset为0 |
| Leader HW | PRODUCE和Follower FETCH处理完成 | 0 | 0 | min(LeaderLEO=1,RemoteLEO=0)=0 |
| Follower LEO | PRODUCE和Follower FETCH处理完成 | 0 | 1 | 同步了?条数据 |
| Follower HW | PRODUCE和Follower FETCH处理完成 | 0 | 0 | min(LeaderHW=0,FollowerLEO=1)=0 |
Follower第?轮FETCH
分区HW是在第?轮FETCH RPC中被更新的,如下图所示:
Follower发来了第?轮FETCH请求,Leader端接收到后仍然会依次执?下列操作:
同样地,Follower副本接收到FETCH response后依次执?下列操作:
| 属性 | 阶段 | 旧值 | 新值 | 备注 |
|---|---|---|---|---|
| Leader LEO | 第?次Follower FETCH处理完成 | 1 | 1 | 未写?新数据 |
| Remote LEO | 第?次Follower FETCH处理完成 | 0 | 1 | 第2次fetch中offset为1 |
| Leader HW | 第?次Follower FETCH处理完成 | 0 | 1 | min(RemoteLEO,LeaderLEO)=1 |
| Follower LEO | 第?次Follower FETCH处理完成 | 1 | 1 | 未写?新数据 |
| Follower HW | 第?次Follower FETCH处理完成 | 0 | 1 | 第2次fetch resp中的LeaderHW和本地FollowerLEO都是1 |
此时消息已经成功地被复制到Leader和Follower的Log中且分区HW是1,表明消费者能够消费offset = 0的消息。
当Leader?法?即满?FECTH返回要求的时候(?如没有数据),那么该FETCH请求被暂存到Leader端的purgatory中(炼狱),待时机成熟尝试再次处理。Kafka不会?限期缓存,默认有个超时时间(500ms),?旦超时时间已过,则这个请求会被强制完成。当寄存期间还没超时,?产者发送PRODUCE请求从?使之满?了条件以致被唤醒。此时,Leader端处理流程如下:
Kafka使?HW值来决定副本备份的进度,?HW值的更新通常需要额外?轮FETCH RPC才能完成。但这种设计是有问题的,可能引起的问题包括:
使?HW值来确定备份进度时其值的更新是在下?轮RPC中完成的。如果Follower副本在标记上?的的第?步与第?步之间发?崩溃,那么就有可能造成数据的丢失。
上图中有两个副本:A和B。开始状态是A是Leader。
假设?产者min.insync.replicas为1,那么当?产者发送两条消息给A后,A写?Log,此时Kafka会通知?产者这两条消息写?成功。
| 代 | 属性 | 阶段 | 旧值 | 新值 | 备注 |
|---|---|---|---|---|---|
| 1 | Leader LEO | PRODUCE和Follower FETCH处理完成 | 0 | 1 | 写?了?条数据 |
| 1 | Remote LEO | PRODUCE和Follower FETCH处理完成 | 0 | 0 | 第?次fetch中offset为0 |
| 1 | Leader HW | PRODUCE和Follower FETCH处理完成 | 0 | 0 | min(LeaderLEO=1,FollowerLEO=0)=0 |
| 1 | Follower LEO | PRODUCE和Follower FETCH处理完成 | 0 | 1 | 同步了?条数据 |
| 1 | Follower HW | PRODUCE和Follower FETCH处理完成 | 0 | 0 | min(LeaderHW=0, FollowerLEO=1)=0 |
| 2 | Leader LEO | 第?次Follower FETCH处理完成 | 1 | 2 | 写?了第?条数据 |
| 2 | Remote LEO | 第?次Follower FETCH处理完成 | 0 | 1 | 第2次fetch中offset为1 |
| 2 | Leader HW | 第?次Follower FETCH处理完成 | 0 | 1 | min(RemoteLEO=1,LeaderLEO=2)=1 |
| 2 | Follower LEO | 第?次Follower FETCH处理完成 | 1 | 2 | 写?了第?条数据 |
| 2 | Follower HW | 第?次Follower FETCH处理完成 | 0 | 1 | min(LeaderHW=1,FollowerLEO=2)=1 |
| 3 | Leader LEO | 第三次Follower FETCH处理完成 | 2 | 2 | 未写?新数据 |
| 3 | Remote LEO | 第三次Follower FETCH处理完成 | 1 | 2 | 第3次fetch中offset为2 |
| 3 | Leader HW | 第三次Follower FETCH处理完成 | 1 | 2 | min(RemoteLEO=2,LeaderLEO)=2 |
| 3 | Follower LEO | 第三次Follower FETCH处理完成 | 2 | 2 | 未写?新数据 |
| 3 | Follower HW | 第三次Follower FETCH处理完成 | 1 | 2 | 第3次fetch resp中的LeaderHW和本地FollowerLEO都是2 |
但是在broker端,Leader和Follower的Log虽都写?了2条消息且分区HW已经被更新到2,但Follower HW尚未被更新还是1,也就是上?标记的第?步尚未执?,表中最后?条未执?。
倘若此时副本B所在的broker宕机,那么重启后B会?动把LEO调整到之前的HW值1,故副本B会做?志截断(log truncation),将offset = 1的那条消息从log中删除,并调整LEO = 1。此时follower副本底层log中就只有?条消息,即offset = 0的消息!
B重启之后需要给A发FETCH请求,但若A所在broker机器在此时宕机,那么Kafka会令B成为新的Leader,?当A重启回来后也会执??志截断,将HW调整回1。这样,offset=1的消息就从两个副本的log中被删除,也就是说这条已经被?产者认为发送成功的数据丢失。
丢失数据的前提是min.insync.replicas=1时,?旦消息被写?Leader端Log即被认为是committed。延迟?轮FETCH RPC更新HW值的设计使follower HW值是异步延迟更新,若在这个过程中Leader发?变更,那么成为新Leader的Follower的HW值就有可能是过期的,导致?产者本是成功提交的消息被删除。
除了可能造成的数据丢失以外,该设计还会造成Leader的Log和Follower的Log数据不?致。
如Leader端记录序列:m1,m2,m3,m4,m5,…;Follower端序列可能是m1,m3,m4,m5,…。
看图:
假设:A是Leader,A的Log写?了2条消息,但B的Log只写了1条消息。分区HW更新到2,但B的HW还是1,同时?产者min.insync.replicas仍然为1。
假设A和B所在Broker同时宕机,B先重启回来,因此B成为Leader,分区HW = 1。假设此时?产者发送了第3条消息(红?表示)给B,于是B的log中offset = 1的消息变成了红框表示的消息,同时分区HW更新到2(A还没有回来,就B?个副本,故可以直接更新HW?不?理会A)之后A重启回来,需要执??志截断,但发现此时分区HW=2?A之前的HW值也是2,故不做任何调整。此后A和B将以这种状态继续正常?作。
显然,这种场景下,A和B的Log中保存在offset = 1的消息是不同的记录,从?引发不?致的情形出现。
造成上述两个问题的根本原因在于
但HW值的更新是异步延迟的,特别是需要额外的FETCH请求处理流程才能更新,故这中间发?的任何崩溃都可能导致HW值的过期。
Kafka从0.11引?了leader epoch来取代HW值。Leader端使?内存保存Leader的epoch信息,即使出现上?的两个场景也能规避这些问题。
所谓Leader epoch实际上是?对值:<epoch, offset>:
epoch表示Leader的版本号,从0开始,Leader变更过1次,epoch+1
offset对应于该epoch版本的Leader写?第?条消息的offset。因此假设有两对值:
<0, 0><1, 120>则表示第?个Leader从位移0开始写?消息;共写了120条[0, 119];?第?个Leader版本号是1,从位移120处开始写?消息。

只需要知道每个副本都引?了新的状态来保存??当leader时开始写?的第?条消息的offset以及leader版本。这样在恢复的时候完全使?这些信息??HW来判断是否需要截断?志。

依靠Leader epoch的信息可以有效地规避数据不?致的问题。
消息重复主要发?在以下三个阶段:
?产发送的消息没有收到正确的broke响应,导致producer重试。
producer发出?条消息,broke落盘以后因为?络等种种原因发送端得到?个发送失败的响应或者?络中断,然后producer收到?个可恢复的Exception重试消息导致消息重复。

说明:
new KafkaProducer() 后创建一个线程 KafkaThread 扫描RecordAccumulator中是否有消息KafkaProducer.send()发送消息,实际上只是把消息保存到RecordAccumulator中KafkaThread 扫描到RecordAccumulator中有消息后,将消息发送到Kafka集群RecordAccumulator中,等待后台线程KafkaThread 扫描再次发送异常是 RetriableException 类型或者 TransactionManager允许重试;RetriableException 类继承关系如下:
如果设置max.in.flight.requests.per.connection大于1 (默认5, 单个连接.上发送的未确认请求的最大数量,表示上一个发出的请求没有确认下一个请求又发出了)。大于1可能会改变记录的顺序,因为如果将两个batch发送到单个分区,第一个batch处理失败并重试, 但是第二个batch处理成功,那么第二个batch处理中的记录可能先出现被消费。
设置max.in.flight.requests.per.connection为1,可能会影响吞吐量,可以解决单个生产者发送顺序问题。如果多个生产者,生产者1先发送一一个请求, 生产者2后发送请求,此时生产者1返回可恢复异常,重试一定次数成功了。虽然生产者1先发送消息,但生产者2发送的消息会被先消费。
启动Kafka的幂等性
要启动Kafka的幂等性,设置enable.idempotence=true,以及ack=all和retries>1
ack=0,不重试
可能会丢失消息,适用于吞吐量指标重要性高于数据丢失,如:日志收集
ack=0,不重试
生产者发送消息完毕,不管结果,如果发送失败也就丢失了
ack=1,Leader crash
生产者发送消息完毕,只等待Leader写入成功就返回了,Leader 分区丢失了,此时Follower没来得及同步,消息丢失
unclean.leader.election.enable 配置true
允许选举ISR以外的副本作为leader,会导致数据丢失,默认为false。 生产者发送异步消息,只等待Lead写入成功就返回,Leader分区丢失,此时ISR中没有Follower, Leader从OSR中选举,因为OSR中本来落后于Leader造成消息丢失。
禁用unclean选举,ack=all
ack=all / -1,tries > 1,unclean.leader.election.enable:false
生产者发完消息,等待Follower同步完再返回, 如果异常则重试。副本的数量可能影响吞吐量,不超过5个,一般三个。
不允许unclean Leader选举。
配置:min.insync.replicas>1
当生产者将acks设置为all (或-1 )时,min.insync.replicas>1。指定确认消息写成功需要的最小副本数量。达不到这个最小值,生产者将引发-个异常(要么是NotEnoughReplicas, 要么是NotEnoughReplicasAfterAppend)。
当一起使用时,min.insync.replicas和ack允许执行更大的持久性保证。一个典型的场景 是创建一个复制因子为3的主题,设置min.insync复制到2个, 用 all 配置发送。将确保如果大多数副本没有收到写操作,则生产者将引发异常。
失败的 offset 单独记录
生产者发送消息,会自动重试,遇到不可恢复异常会抛出,这时可以捕获异常记录到数据库或缓存,进行单独处理。
根本原因
数据消费完没及时提交 offset 到 broker
场景
消息消费端在消费过程中挂掉没有及时提交offset到broke,另一个消费端启动拿之前记录的offset开始消费,由于offset的滞后性可能会导致新启动的客户端有少量重复消费。
取消自动提交
每次消费完或者程序退出时手动提交。这可能也没法保证一条不重复
下游做幂等
一般是让 下游做幂等或者尽量每消费-条消息都记录offset, 对于少数严格的场景可能需要把offset或唯一ID (例如订单ID)和下游状态更新放在同一个数据库里面做事务来保证精确的一次更新或者在下游数据表里面同时记录消费offset,然后更新下游数据的时候用消费位移做乐观锁拒绝旧位移的数据更新。
Zookeeper不适合?批量的频繁写?操作。
Kafka 1.0.2将consumer的位移信息保存在Kafka内部的topic中,即__consumer_offsets主题,并且默认提供了kafka_consumer_groups.sh脚本供?户查看consumer信息。
创建topic “tp_test_01”
[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --create --topic tp_test_01 --partitions 5 --replication-factor 1使?kafka-console-producer.sh脚本?产消息
[root@node1 ~]# for i in `seq 100`; do echo "hello lagou $i" >> messages.txt; done[root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 --topic tp_test_01 < messages.txt由于默认没有指定key,所以根据round-robin?式,消息分布到不同的分区上。 (本例中?产了100条消息)
验证消息?产成功
[root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 --topic tp_test_01 < messages.txt>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>[root@node1 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell --brokerlist node1:9092 --topic tp_test_01 --time -1tp_test_01:2:20tp_test_01:4:20tp_test_01:1:20tp_test_01:3:20tp_test_01:0:20[root@node1 ~]#结果输出表明100条消息全部?产成功!
创建?个console consumer group
[root@node1 ~]#kafka-console-consumer.sh --bootstrap-server node1:9092 --topic tp_test_01 --from-beginning获取该consumer group的group id(后?需要根据该id查询它的位移信息)
[root@node1 ~]# kafka-consumer-groups.sh --bootstrap-server node1:9092 --list查询__consumer_offsets topic所有内容
注意:运?下?命令前先要在consumer.properties中设置exclude.internal.topics=false
[root@node1 ~]# kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server node1:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning默认情况下__consumer_offsets有50个分区,如果你的系统中consumer group也很多的话,那么这个命令的输出结果会很多。
计算指定consumer group在__consumer_offsets topic中分区信息
这时候就?到了第5步获取的group.id(本例中是console-consumer-49366)。Kafka会使?下?公式计算该group位移保存在__consumer_offsets的哪个分区上:
Math.abs(groupID.hashCode()) % numPartitions对应的分区=Math.abs("console-consumer-49366".hashCode()) % 50 = 19,即__consumer_offsets的分区19保存了这个consumer group的位移信息。
获取指定consumer group的位移信息
[root@node1 ~]# kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 19 --broker-list node1:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"下?是输出结果:
...[console-consumer-49366,tp_test_01,3]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424702212,ExpirationTime 1596511102212][console-consumer-49366,tp_test_01,4]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424702212,ExpirationTime 1596511102212][console-consumer-49366,tp_test_01,0]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424702212,ExpirationTime 1596511102212]...上图可?,该consumer group果然保存在分区11上,且位移信息都是对的(这?的位移信息是已消费的位移,严格来说不是第3步中的位移。由于我的consumer已经消费完了所有的消息,所以这?的位移与第3步中的位移相同)。另外,可以看到__consumer_offsets topic的每??志项的格式都是:[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]。