执行消费者再平衡

Kafka客户端现在支持触发强制重配的选项。 从版本开始3.1.2Spring for Apache Kafka 提供了通过消息监听器容器调用 Kafka 消费者的选项。 调用该 API 时,它只是提醒 Kafka 用户触发强制再平衡;实际的再平衡只会在下一次时发生poll()操作。 如果已经在进行再平衡,强制再平衡是不允许的。 叫权者必须等待当前的再平衡完成后才能调用另一次再平衡。 参见javadoc强制执行Rebalance更多细节请阅读。spring-doc.cadn.net.cn

以下代码片段展示了使用消息监听器容器强制重新平衡的本质。spring-doc.cadn.net.cn

@KafkaListener(id = "my.id", topics = "my-topic")
void listen(ConsumerRecord<String, String> in) {
    System.out.println("From KafkaListener: " + in);
}

@Bean
public ApplicationRunner runner(KafkaTemplate<String, Object> template, KafkaListenerEndpointRegistry registry) {
    return args -> {
        final MessageListenerContainer listenerContainer = registry.getListenerContainer("my.id");
        System.out.println("Enforcing a rebalance");
        Thread.sleep(5_000);
        listenerContainer.enforceRebalance();
        Thread.sleep(5_000);
    };
}

如上文代码所示,该应用程序使用KafkaListenerEndpointRegistry以获取消息监听器容器的访问权限,然后调用强制执行RebalanceAPI 上。 当调用强制执行Rebalance在听取器容器中,它将调用委派给底层的 Kafka 消费者。 Kafka消费者将在下一次中触发一次再平衡poll()操作。spring-doc.cadn.net.cn