重新平衡听众

容器属性有一个性质,称为consumerRebalanceListener,该实现取了 Kafka 客户端的消费者RebalanceListener接口。 如果没有提供该属性,容器会配置一个日志监听器,记录在信息水平。 该框架还增加了一个子接口消费者意识RebalanceListener. 以下列表显示了消费者意识RebalanceListener界面定义:spring-doc.cadn.net.cn

public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {

    void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

}

注意,当分区被撤销时,有两次回调。 第一个人马上被叫来。 第二个是在提交任何待处理的抵消后调用。 如果你想在某个外部仓库中保持偏移量,这非常有用,如下示例所示:spring-doc.cadn.net.cn

containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {

    @Override
    public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // acknowledge any pending Acknowledgments (if using manual acks)
    }

    @Override
    public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // ...
        store(consumer.position(partition));
        // ...
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // ...
        consumer.seek(partition, offsetTracker.getOffset() + 1);
        // ...
    }
});
从2.4版本开始,采用了一种新方法onPartitionsLost()已被添加(类似于同名的方法,在ConsumerRebalanceLister). 默认实现于ConsumerRebalanceLister简单呼叫在被取消的分区上. 默认实现于消费者意识RebalanceListener什么都没做。 在为监听器容器提供自定义监听器(任一类型)时,重要的是你的实现不要调用在被取消的分区上onPartitionsLost(分区丢失). 如果你实现消费者RebalanceListener你应该覆盖默认方法。 这是因为监听器容器会调用自己的在被取消的分区上从其实现onPartitionsLost(分区丢失)在调用实现中的方法后, 如果你的实现委托给默认行为,在被取消的分区上每次消费者调用容器监听器上的该方法。

Kafka 4.0 消费者再平衡协议

Spring for Apache Kafka 4.0 支持 Apache Kafka 4.0 的新消费者重平衡协议(KIP-848),通过服务器驱动的增量分区分配提升性能。 这减少了消费者群体的重新平衡停机时间。spring-doc.cadn.net.cn

要启用新协议,请配置group.protocol财产:spring-doc.cadn.net.cn

spring.kafka.consumer.properties.group.protocol=consumer

请记住,上述房产属于春季靴房产。 如果你没有使用Spring Boot,建议手动设置,如下图所示。spring-doc.cadn.net.cn

或者,你可以用程序方式设置:spring-doc.cadn.net.cn

Map<String, Object> props = new HashMap<>();
props.put("group.protocol", "consumer");
ConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(props);

新协议与以下协议无缝兼容消费者意识RebalanceListener. 由于渐进式再平衡,onPartitionsAssigned可以多次调用,使用更小的分区集,这与传统协议中典型的单一回调不同。spring-doc.cadn.net.cn

新协议采用服务器端分区分配,忽略客户端自定义分配器通过spring.kafka.consumer.partition-assignment-strategy. 如果检测到自定义分配器,会记录警告。 要使用自定义分配器,设置group.protocol=classic(如果你没有指定 的值,这是默认的group.protocol).spring-doc.cadn.net.cn