如何保证kafka的可靠性、幂等性和有序性

【Kafka系列】如何保证Kafka的可靠性、幂等性和有序性 - 知乎 (zhihu.com)

可靠性

生成者消息丢失

Producer负责将消息写入到Kafka的Broker中。如果当网络情况出现异常或者Kafka的Broker出现宕机导致Kafka Broker未收到Producer发送的消息。那么在这种情况下,Kafka认为自己的消息已经到达了Broker,而Broker却并未收到Producer发送的消息,此时就出现了消息丢失的情况

解决方案,上面的问题非常简单,就是producer和broker之间缺乏一种消息确认机制。如果带那个broker收到消息之后发送一个ACK给producer消息通知收到就可以了。对于producer来说,如果消息长时间无应答就代表消息未送达给broker,此时可以采取消息重发等策略。需要注意的是,采用ack机制会降低kafka的吞吐量,因为producer需要等待broker确认之后才能发送后续消息。

  • application.yml配置消息确认次数:
spring.kafka.producer.acks=1 # 可选,设置 ACK 级别(0、1、all)
  • 示例代码:
//消息生产者设置发送消息的监听事件
@Component
public class ProducerTest {
    @Autowired
    KafkaTemplate<String,Object> kafkaTemplate;
    
    public void send(String msg){
        kafkaTemplate.setProducerListener(new ProducerListenerTest());
        kafkaTemplate.send("topic",msg);
    }
}

//消息发送失败处理
public class ProducerListenerTest implements ProducerListener {
    @Override
    public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
        //消息发送成功,一般可以打印日志
    }

    @Override
    public void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception) {
        //消息发送失败,可以打印日志、重发消息、写入数据库等
    }
}

broker消息丢失

为了提升Kafka消息写入的效率和性能,当Broker收到Producer发送的消息之后不会立马写入磁盘。而是通过page cache缓存机制先将数据写入内存,当page cache满时在统一刷盘进行持久化。如果当page cache未满的时候Broker出现了宕机,那么所有未刷盘的消息就会出现丢失,这其实也是一种Broker的单点故障问题。

解决方法

显然Broker消息丢失的问题是由于page cache未刷盘时Broker故障导致的,那么只要设置消息写入不止一个Broker就可以在一定程度上避免Broker的消息丢失问题。

  • application.yml配置
spring.kafka.producer.acks=2 # 一个分区,2个副本均写入成功时,才响应Producer写入成功

消费者消息丢失

Producer的消息已经发送成功,Broker消息持久化成功,自然而然的Consumer肯定能够成功的拉取到消息,那么为什么Consumer端也可能造成消息丢失呢?

实际上,当Consumer错误操作offset的情况下是可能造成消息丢失的。正常情况下,当消息消费成功之后,kafka客户端会自动提交offset到本地,并且根据新的offset再去broker中拉取数据。然而,当consumer消费过程中出现异常导致offset提交失败,那么就有可能造成消息丢失的情况。

案例一:消息批量封装到线程池

问题:

Kafka为了减少消息堆积,Consumer直接将消息处理封装为一整个Task丢到线程池中,此时就默认已经提交了offset,如果线程池处理过程中抛出异常,就会导致线程池中剩下的消息丢失:

public class KafkaConsumerService {

    private final ThreadPoolExecutor threadPoolExecutor;

    public KafkaConsumerService() {
        this.threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
    }

    @KafkaListener(id = "consumerGroup", topics = "your-topic", containerFactory = "batchFactory")
    public void listen(List<ConsumerRecord<String, String>> records) {
        Runnable task = () -> { //将Kafka消息批量封装在task中丢入线程池
            try {
                for (ConsumerRecord<String, String> record : records) {
                    processMessage(record);
                }
            } catch (Exception e) {
                // 处理异常,可以记录日志或采取其他措施
                e.printStackTrace();
            }
        };
        threadPoolExecutor.submit(task);
        // 默认提交 offset,如果线程池中的任务处理异常,消息可能会丢失
    }

    private void processMessage(ConsumerRecord<String, String> record) {
        String message = record.value();
        // 模拟处理消息的业务逻辑
        System.out.println("Received and processed message: " + message);
    }
}

解决方案:

要解决消息丢失问题,可以使用 Spring Kafka 提供的事务功能和手动提交每次的offset来解决这个问题,确保在消息处理期间的异常情况下不会丢失消息。以下是如何修改你的代码以实现这一点:

@EnableKafka
@EnableTransactionManagement
public class KafkaConsumerService {

    private final KafkaTemplate<String,String> kafkaTemplate;

    public KafkaConsumerService(KafkaTemplate<String,String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @KafkaListener(id = "consumerGroup", topics = "your-topic", containerFactory = "batchFactory")
    @Transactional
    public void listen(List<ConsumerRecord<String, String>> records) {
        try {
            for (ConsumerRecord<String,String>; record : records) {
                processMessage(record);
            }
        } catch (Exception e) {
            // 处理异常,可以记录日志或采取其他措施
            e.printStackTrace();
        }
    }

    private void processMessage(ConsumerRecord&lt;String, String&gt; record) {
        String message = record.value();
        // 模拟处理消息的业务逻辑
        System.out.println("Received and processed message: " + message);
        // 手动提交 offset
        kafkaTemplate.sendOffsetsToTransaction(Collections.singletonMap(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1)), "consumerGroup");
    }
}

这段代码中使用了 Spring Kafka 的事务管理功能。当异常发生时,事务会回滚,确保消息不会丢失。同时,在成功处理每条消息后,手动提交 offset,以确保不会重复消费消息。你需要配置适当的事务管理器以及事务的属性,以便使事务生效。确保在项目中配置了 Kafka 事务管理器,并在消费者中使用 @Transactional 注解来启用事务。

案例二:错误使用try-catch,导致一场捕获的同时也提交了offset

原因:

在没有设置手动提交offset的情况下,当消息被处理的时候会自动提交offset,但是当消息处理过程中出现了异常,而offset又已经被提交,那么就会导致当前这条消息丢失。

@EnableKafka
public class KafkaConsumerService {

    @KafkaListener(id = "consumerGroup", topics = "your-topic")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        String message = record.value();
        try {
            // 模拟处理消息的业务逻辑
            processMessage(message);
        } catch (Exception e) {
            // 处理异常,可以记录日志或采取其他措施
            e.printStackTrace();
        }
    }

    private void processMessage(String message) {
        // 模拟消息处理逻辑,例如打印消息,这里消息在处理的时候会自动提交offset
        System.out.println("Received and processed message: " + message);
        i = 1 /0 ; //这里模拟处理程序抛出异常
    }
}

解决方法:

解决方法也很简单,只需要在处理成功的时候才手动提交offset即可。

@EnableKafka
public class KafkaConsumerService {

    @KafkaListener(id = "consumerGroup", topics = "your-topic")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        String message = record.value();
        try {
            // 模拟处理消息的业务逻辑
            processMessage(message);
            // 手动提交 offset
            acknowledgment.acknowledge();
        } catch (Exception e) {
            // 处理异常,可以记录日志或采取其他措施
            e.printStackTrace();
        }
    }

    private void processMessage(String message) {
        // 模拟消息处理逻辑,例如打印消息,这里消息在处理的时候会自动提交offset
        System.out.println("Received and processed message: " + message);
        i = 1 /0 ; //这里模拟处理程序抛出异常
    }
}

幂等性

消费者幂等

常见的消息队列如Rabbit MQ、Rocket MQ和Kafka等都有可能出现消息重复消费的问题,因为这个问题不是MQ来保证的,而是需要开发者自己来保证消息不会被重复消费。

再kafka中有offset的概念,每个消息在写入broker的时候都会有一个offset来代表消息序号,并且log文件的名字也是消息序号offset.log。consumer在消费数据之后,每隔一段时间(定期)会将消费过的offset提交到kafka,表示已经被消费过了,consumer重启之后可以从原来的offset进行消费。但是,如果consumer出现意外宕机或者直接被手动kill掉进程了,就会导致有些已经被消费过的消息,其offset还没来得及提交,这就会导致有些消息在重启之后会被再消费一次

如上图所示的案例:

  1. 数据1、数据2、数据3依次被push到Kafka中,Kafka会为这3条数据分配offset,依次为152、153、154。
  2. consumer按照顺序对数据进行消费,假设消费到offset=153时还未提交offset就被重启,那么此时消费过的152和153数据的offset并未被提交,Kafka就不知道这两个数据已经被消费过了。
  3. consumer重启之后会去Kafka中拉取offset,就会导致数据152、153被重复消费,如果消费者的任务是将消息写入数据库,那么此时就会出现数据重复。

如何保证消费者幂等

消息的重复消费从机制上来说是可以避免的,就是每次消费完成之后进行消息确认并更新offset,这样就能保证每次提交的offset都是最新的位置,但是这样一来消费者消费效率会降低。另外一方面可以从业务层面对消息进行去重。实际上重复消费不可怕,可怕的是数据重复,只要我们能保证最终的数据结果是唯一的即可。

我们可以为消息声称一个全局的唯一标志id,当消费者消费消息的时候先手动从本地的唯一id来判断消息是否已经被消费过了来避免重复消费的问题,代码如下所示:

public class KafkaConsumerService {

    @KafkaListener(id = "consumerGroup", topics = "my-topic")
    public void listen(ConsumerRecord<String, String> record) {
        // 获取消息的唯一标识符(假设消息中包含了 ID)
        String messageId = record.key();

        // 检查消息是否已经处理过
        if (!isMessageProcessed(messageId)) {
            // 处理消息
            processMessage(record.value());
            
            // 标记消息为已处理
            markMessageAsProcessed(messageId);
        }
    }

    private void processMessage(String message) {
        // 实现消息处理逻辑
        System.out.println("Processing message: " + message);
    }

    private boolean isMessageProcessed(String messageId) {
        // 查询是否已经处理过消息,可以使用数据库、缓存或其他存储来记录已处理的消息
        // 返回 true 表示已经处理过,返回 false 表示未处理
        return false;
    }

    private void markMessageAsProcessed(String messageId) {
        // 将消息标记为已处理,以避免重复处理
        // 可以更新数据库、缓存或其他存储来记录已处理的消息
    }
}

消息去重

这个方案需要与具体的业务相结合进行讨论,例如:如果你的消费者就是将消息写入到MySQL数据库中,那么可以先根据“数据库主键”或者“布隆过滤器”来进行查询,以保证消息最终的幂等性。当需要确保消息的幂等性时,可以根据不同的场景采取相应的策略,以下是几个例子:

  • 数据库操作:如果你需要将数据写入数据库,可以在写入之前先根据主键或唯一键查找记录。如果记录已经存在,可以选择执行更新操作而不是插入。这样可以确保不会出现重复数据。
  • Redis:在 Redis 中,每次都是使用 SET 命令,这本身天然支持幂等性,因为 SET 命令无论是设置新值还是更新已存在的键,都不会导致数据的重复。
  • 自定义标识符:如果你处理的场景不适用于上述两种情况,你可以要求生产者在每条消息中添加一个全局唯一的标识符,例如订单 ID。在消费者端,你可以根据这个标识符去查询一个持久化存储(如 Redis),检查消息是否已经被处理过。如果消息尚未被处理,执行消息处理逻辑,并将该标识符写入存储中。如果消息已经处理过,可以选择跳过处理,以确保不会重复处理相同的消息。
  • 数据库唯一键:如果你使用数据库来存储数据,可以利用数据库的唯一键约束。通过设置唯一键,可以确保在尝试插入重复数据时会触发唯一键约束,而不会导致数据库中出现重复或脏数据。

通过采取这些策略,你可以有效地保证消息的幂等性,无论是在数据库操作、Redis 写入还是其他场景中。

生产者幂等性

producer的幂等性指的是当发送相同消息时,数据在服务端进会被持久化一次,确保数据不会丢失也不会重复存储。

  1. 幂等性仅能在producer单个会话内得到保障,如果producer遇到意外故障并重新启动跨会话的幂等性无法保证。因为在幂等条件下,无法获取之前状态信息,所以不能实现会话间的不丢失和重复
  2. 幂等性不能跨多个topic-partition的幂等性。当涉及多个topic-partition时,它们之间的状态不会同步。

如果需要实现跨多个Topic-partition的幂等性,需要考虑使用kafka的事务机制

要解决的问题

在探讨如何实现 Producer 的幂等性之前,我们首先要理解幂等性的出发点:它是为了解决什么问题而存在的?

在正常情况下,Producer向Broker推送消息,Broker将消息写入到某一Topic的Partition中,并向Producer返回ACK信号表示消息收到:

然而,由于网络或其它原因,Producer和Broker之间通讯可能会出现异常,从而导致ACK在途中丢失,这样就会造成Producer再次推送消息导致消息重复:

在 Kafka 0.11.0 之前,Kafka 已经能够通过 Producer 和 Server 端的相关配置来保证消息不会丢失,即至少一次交付(at least once)。然而,像上面例子中消息的重复传递的情况,却无法靠Kafka自身解决。

虽然,对于大多数应用程序来说,确保消息不会丢失已经足够满足需求。但对于某些应用场景,如支付数据等,它们需要精确的消息计数。在这种情况下,如果上游数据存在重复,下游应用程序必须在处理消息时执行去重操作。一种常见的去重方法是使用唯一标识键,根据此键来检查消息是否重复。

在这种情况下,因为上游生产者可能引起消息重复问题,这就需要所有需要精确计数的下游应用执行复杂的去重操作。现在,想象一下,如果系统在消息发送时就能够确保"仅一次交付",那对于下游应用来说将是多么大的解脱。这正是幂等性的目标,主要是为了解决生产者发送消息重复的问题。

Producer如何保证幂等性

配置:在最新的kafka中通过producer配置enable.idmpotence就可以实现消息的幂等性,它会确保相同的消息在发送时只会被写入一次,即使生产者发生重试或失败。当启用幂等性时,producer会自动为每个消息分配一个序列号,并在Broker上维护一个日志,每个producer发送的消息序列号,以确保不会写入重复消息。

在 application.yml中,你可以配置 Kafka Producer 幂等性如下:

spring:
  kafka:
    producer:
      # 配置生产者在接收到消息后的应答方式。acks=1 表示只需要 Leader 分区确认消息写入即可。
      acks: 1
      # 启用 Producer 的幂等性配置,这个属性设置为 true。
      enable-idempotence: true      

producer其幂等性的视线原来主要是kafka加入了2个标记值:

  • PID:在producer初始化分配的时候,作为每个producer会话的唯一标识
  • 序列号(Sequence Number):producer发送的每条消息都会带有序列号,从0开始递增,Broker通过序列号来判断消息是否可以接受;

Broker会为每个Topic partition组合维护PID和序列号。对每条接受到的消息,都会检查它序列号是否比Broker所维护的值严格+1,只有这样才是合法的,其他情况都会丢弃。如下图所示,加上PID和sequence number之后,Broker就会检测到有两条PID = 100且seq = 1的消息写入了Partition,并忽略掉重发的那一条,成功避免了重复。

上面所说的幂等性保证了最细粒度的消息不重不漏,具体来说,它确保了以下几个方面的情况:

  • 单个Producer会话:在单个Producer会话内,消息不会重复发送,即使Producer重启,也能保证消息不重复。这是通过PID(Producer ID)来实现的,PID唯一标识了Producer的会话。
  • 单个Topic Partition级别:对于单个Topic Partition(主题分区),消息也不会重复,确保了在特定分区内的幂等性。

然而,幂等性的保证在以下情况下会失效:

  • Producer重启(PID发生变化):如果Producer重启,PID会发生变化,这可能导致某些消息在新PID下被认为是不同的消息,从而无法保证幂等性。
  • 跨Topic和跨Partition:如果消息涉及跨多个Topic或不同的分区,幂等性保证可能失效,因为Producer会话在不同的Topic和Partition之间不共享。

需要注意的是,实现事务性要比实现幂等性复杂得多,需要协调组件(Transaction Coordinator)来确保跨多个Producer会话、多个Topic和多个Partition的事务性操作。虽然实现事务性更复杂,但它可以提供更高级别的一致性和可靠性,适用于需要严格事务性保证的应用场景。

有序性

什么是消息有序性

消息队列(MQ)中的消息有序性指的是确保消息按照特定顺序进行传递和处理,如对于支付系统来说通常要保障消息在时间上的有序性。消息的有序性通常可以分为全局有序和局部有序两种情况:

全局有序: 在一个Topic中的所有消息都需要按照它们被生产的顺序进行消费,确保全局有序性;

局部有序: 在一个Topic中,只有同一个业务字段的消息需要按照它们被生产的顺序进行消费。例如,如果Topic中的消息代表订单的流水记录,包含订单orderId,业务要求只有相同orderId的消息需要保持生产顺序的消费;

全局消息有序性

Kafka的一个Topic可以被分成多个Partition,而Producer发送消息时,这些消息会被分散存储到不同的Partition中。即使Producer按照顺序发送消息到Broker,一旦消息进入Kafka,它们就不一定会按照相同的顺序进入相同的Partition,这可能导致消息的顺序在不同Partition中变得混乱。Kafka 本身并不提供全局有序性的保证,因为 Kafka 的设计目标之一是提供高吞吐量和分布式处理的能力,而不是强制性的全局有序性。

单一topic有序性

将一个特定的 Topic 配置为只有一个 Partition。这样,消息将被按照发送的顺序存储和传递到这个 Partition 中,从而保持了该 Partition 内的全局有序性。但这会牺牲 Kafka 的并行性和吞吐量,因为所有的消息都只能由一个 Partition 处理。

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-bootstrap-servers");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 只设置一个Partition
        configProps.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, FixedPartitioner.class.getName());

        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

有序key

在Kafka中,key相同的消息会被强制放在痛一个分区里面,根据这个特点,可以在消息中使用有序的键(Key),并确保相同键的消息发送到相同的 Partition。这可以通过自定义分区策略来实现,确保具有相同键的消息都被分配到相同的 Partition,从而在该 Partition 中保持有序。

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-bootstrap-servers");
        //设置有序的key,来保证消息在partition中有序
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

严格的Acknowledgment

确保 Producer 等待每条消息得到 Kafka 的确认(acknowledgment)后才发送下一条消息,以确保消息的有序性。这种方法属于杀敌1000自损800的方法,因为将极大降低Kafka的性能,失去作为MQ的意义。

@Service
public class KafkaMessageProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                // 消息成功发送后的处理逻辑
            }

            @Override
            public void onFailure(Throwable ex) {
                // 消息发送失败后的处理逻辑
            }
        });
    }
}

需要注意的是,这些方法都会牺牲 Kafka 的分布式性能和可伸缩性,因此在实际应用中需要仔细权衡需求和性能。如果全局有序性对你的应用非常关键,那么 Kafka 可能不是最佳选择,你可能需要考虑其他消息队列系统或分布式数据存储,它们更强调全局有序性。

消费者保证有序消费

除了Producer需要保证讲消息全局有序的推送到Kafka中之外,Consumer消费的时候也需要采用单线程消费模型或者保证消息顺序的消费模型来保证消息消费的有序性。

如下图所示,如果消费者端采用多线程消费模型,那么即使partition中的消息是全局有序的,最后消费的过程中也会导致消息的顺序出现错乱:

Kafka消息局部有序性

Kafka的数据在同一个Partition下默认是有序的,但在多个Partition中不能保证其顺序性。Kafka适用于高吞吐的流式大数据,更容忍数据的松散顺序要求。如果Kafka要保证多个Partition的全局有序性,这会导致潜在性能问题。

举例来说,如果一个Partition发生了堵塞,为了保持有序性,其他分区也不能被消费。这会导致系统失去并发性,性能急剧下降。因此,Kafka采用多分区的概念,只保证单个分区内的消息有序。这种方式确保不同分区之间不会相互干扰,允许更好地平衡性能和顺序性需求。

  1. 总结 本文比较系统的介绍了MQ中的消息可靠性、幂等性和有序性分别是什么含义,同时介绍了Kafka是如何保证消息的可靠性、幂等性和有序性的。
Last modification:April 29, 2024
如果觉得我的文章对你有用,请随意赞赏