|
This version is still under development and is not yet considered stable. For the latest stable version, please use Spring for Apache Kafka 4.0.0! |
Kafka Queues (Share Consumers)
Starting with version 4.0, Spring for Apache Kafka supports Kafka Queues through share consumers, which are part of Apache Kafka 4.0.0 and implement KIP-932 (Queues for Kafka). This feature is currently in early access.
Kafka Queues implement a different consumption model from traditional consumer groups. Instead of the partition-based assignment model, share consumers consume cooperatively from the same partitions, and records are distributed among the consumers within a share group.
ShareConsumerFactory
The ShareConsumerFactory is responsible for creating share consumer instances. Spring Kafka provides DefaultShareConsumerFactory as the default implementation.
Configuration
You can configure a DefaultShareConsumerFactory in much the same way as a regular consumer factory:
@Bean
public ShareConsumerFactory<String, String> shareConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-share-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultShareConsumerFactory<>(props);
}
Constructor Options
The DefaultShareConsumerFactory provides several constructor options:
// Basic configuration
new DefaultShareConsumerFactory<>(configs);
// With deserializer suppliers
new DefaultShareConsumerFactory<>(configs, keyDeserializerSupplier, valueDeserializerSupplier);
// With deserializer instances
new DefaultShareConsumerFactory<>(configs, keyDeserializer, valueDeserializer, configureDeserializers);
Deserializer Configuration
You can configure deserializers in several ways:
- Via configuration properties (recommended for simple cases):
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- Via setters:
factory.setKeyDeserializer(new StringDeserializer());
factory.setValueDeserializer(new StringDeserializer());
- Via suppliers (useful when a new deserializer instance must be created for each consumer):
factory.setKeyDeserializerSupplier(() -> new StringDeserializer());
factory.setValueDeserializerSupplier(() -> new StringDeserializer());
Set configureDeserializers to false if your deserializers are already fully configured and the factory should not re-configure them.
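For example, a minimal sketch (the bean name is illustrative, and it assumes DefaultShareConsumerFactory exposes the same setConfigureDeserializers setter as the regular consumer factory):

```java
@Bean
public ShareConsumerFactory<String, String> preconfiguredShareConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    DefaultShareConsumerFactory<String, String> factory = new DefaultShareConsumerFactory<>(props);
    // These deserializer instances are assumed to be fully configured already
    factory.setKeyDeserializer(new StringDeserializer());
    factory.setValueDeserializer(new StringDeserializer());
    // Tell the factory not to call configure() on them again
    factory.setConfigureDeserializers(false);
    return factory;
}
```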
Lifecycle Listeners
You can add listeners to monitor the lifecycle of share consumers:
factory.addListener(new ShareConsumerFactory.Listener<String, String>() {
@Override
public void consumerAdded(String id, ShareConsumer<String, String> consumer) {
// Called when a new consumer is created
System.out.println("Consumer added: " + id);
}
@Override
public void consumerRemoved(String id, ShareConsumer<String, String> consumer) {
// Called when a consumer is closed
System.out.println("Consumer removed: " + id);
}
});
Share Message Listener Containers
ShareKafkaMessageListenerContainer
The ShareKafkaMessageListenerContainer provides a container for share consumers with support for concurrent processing:
@Bean
public ShareKafkaMessageListenerContainer<String, String> container(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ContainerProperties containerProps = new ContainerProperties("my-topic");
containerProps.setGroupId("my-share-group");
ShareKafkaMessageListenerContainer<String, String> container =
new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);
container.setupMessageListener(new MessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> record) {
System.out.println("Received: " + record.value());
}
});
return container;
}
Container Properties
The share container supports a subset of the container properties used by regular consumers:
- topics: array of topic names to subscribe to
- groupId: the share group ID
- clientId: the client ID for the consumer
- kafkaConsumerProperties: additional consumer properties
|
Share consumers do not support the remaining container properties; see the Differences from Regular Consumers section below.
|
Concurrency
The ShareKafkaMessageListenerContainer supports concurrent processing by creating multiple consumer threads within a single container. Each thread runs its own ShareConsumer instance that participates in the same share group.
Unlike traditional consumer groups, where concurrency involves partition distribution, share consumers leverage Kafka's record-level distribution at the broker. This means that multiple consumer threads in the same container work together as part of the share group, with the Kafka broker distributing records across all consumer instances.
|
Concurrency is additive across application instances. From the share group's perspective, every consumer thread of every application instance is a member of the group. For example, if application instance 1 runs 3 consumer threads and application instance 2 runs 3 consumer threads, the share group contains 6 consumers in total. This means the concurrency setting controls the number of threads per instance, not the group-wide total. |
Configuring Concurrency Programmatically
@Bean
public ShareKafkaMessageListenerContainer<String, String> concurrentContainer(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ContainerProperties containerProps = new ContainerProperties("my-topic");
containerProps.setGroupId("my-share-group");
ShareKafkaMessageListenerContainer<String, String> container =
new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);
// Set concurrency to create 5 consumer threads
container.setConcurrency(5);
container.setupMessageListener(new MessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> record) {
System.out.println("Received on " + Thread.currentThread().getName() + ": " + record.value());
}
});
return container;
}
Configuring Concurrency via the Factory
You can set a default concurrency at the factory level, which applies to all containers created by that factory:
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
// Set default concurrency for all containers created by this factory
factory.setConcurrency(3);
return factory;
}
Per-Listener Concurrency
The factory-level concurrency can be overridden for individual listeners via the concurrency attribute:
@Component
public class ConcurrentShareListener {
@KafkaListener(
topics = "high-throughput-topic",
containerFactory = "shareKafkaListenerContainerFactory",
groupId = "my-share-group",
concurrency = "10" // Override factory default
)
public void listen(ConsumerRecord<String, String> record) {
// This listener will use 10 consumer threads
System.out.println("Processing: " + record.value());
}
}
Concurrency Considerations
- Thread safety: each consumer thread has its own ShareConsumer instance and manages its own acknowledgments independently
- Client IDs: each consumer thread gets a unique client ID with a numeric suffix (for example, my-container-0, my-container-1, and so on)
- Metrics: metrics from all consumer threads are aggregated and accessible via container.metrics()
- Lifecycle: all consumer threads start and stop together as a unit
- Work distribution: the Kafka broker handles record distribution across all consumer instances in the share group
- Explicit acknowledgment: each thread manages acknowledgment of its own records independently; unacknowledged records in one thread do not block other threads
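As a small illustration of the metrics point above, the aggregated metrics can be inspected once the container is running (a sketch; it assumes metrics() mirrors the regular container's contract of a per-client-ID map, and the helper method name is illustrative):

```java
// Sketch: print how many metrics each consumer thread exposes.
// Each thread appears under its own suffixed client ID (my-container-0, my-container-1, ...).
public void logContainerMetrics(ShareKafkaMessageListenerContainer<String, String> container) {
    Map<String, Map<MetricName, ? extends Metric>> metrics = container.metrics();
    metrics.forEach((clientId, clientMetrics) ->
            System.out.println(clientId + ": " + clientMetrics.size() + " metrics"));
}
```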
Concurrency with Explicit Acknowledgment
Concurrency works seamlessly with explicit acknowledgment mode. Each consumer thread tracks and acknowledges its own records independently:
@KafkaListener(
topics = "order-queue",
containerFactory = "explicitShareKafkaListenerContainerFactory",
groupId = "order-processors",
concurrency = "5"
)
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
try {
// Process the order
processOrderLogic(record.value());
acknowledgment.acknowledge(); // ACCEPT
}
catch (RetryableException e) {
acknowledgment.release(); // Will be redelivered
}
catch (Exception e) {
acknowledgment.reject(); // Permanent failure
}
}
|
Record acquisition and distribution behavior: Share consumers use a pull model; each consumer thread calls poll() to acquire records from the broker. While a record is acquired by one consumer, other consumers cannot fetch it. When the acquisition lock expires, unacknowledged records automatically revert to the "available" state and can be delivered to other consumers. The broker also limits the number of records that can be acquired per partition at a time (a broker-level share group configuration), which bounds the effective concurrency and may need tuning for high concurrency settings.
|
Annotation-Driven Listeners
@KafkaListener with Share Consumers
You can use @KafkaListener with share consumers by configuring a ShareKafkaListenerContainerFactory:
@Configuration
@EnableKafka
public class ShareConsumerConfig {
@Bean
public ShareConsumerFactory<String, String> shareConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultShareConsumerFactory<>(props);
}
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
return new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
}
}
Then use it in your listeners:
@Component
public class ShareMessageListener {
@KafkaListener(
topics = "my-queue-topic",
containerFactory = "shareKafkaListenerContainerFactory",
groupId = "my-share-group"
)
public void listen(ConsumerRecord<String, String> record) {
System.out.println("Received from queue: " + record.value());
// Record is automatically acknowledged with ACCEPT
}
}
Share Group Offset Reset
Unlike regular consumer groups, share groups use a different configuration for offset reset behavior. You can configure it programmatically:
private void configureShareGroup(String bootstrapServers, String groupId) throws Exception {
Map<String, Object> adminProps = new HashMap<>();
adminProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
try (Admin admin = Admin.create(adminProps)) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
ConfigEntry configEntry = new ConfigEntry("share.auto.offset.reset", "earliest");
Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
configResource, List.of(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET))
);
admin.incrementalAlterConfigs(configs).all().get();
}
}
Record Acknowledgment
Share consumers support two acknowledgment modes that control how records are acknowledged after processing.
Implicit Acknowledgment (Default)
In implicit mode, records are automatically acknowledged based on the processing outcome:
- Successful processing: the record is acknowledged as ACCEPT
- Processing error: the record is acknowledged as REJECT
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
// Implicit mode is the default - no additional configuration needed
return new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
}
Explicit Acknowledgment
In explicit mode, the application must manually acknowledge each record using the provided ShareAcknowledgment.
There are two ways to configure explicit acknowledgment mode:
Option 1: Using the Kafka Client Configuration
@Bean
public ShareConsumerFactory<String, String> explicitShareConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"); // Official Kafka client config
return new DefaultShareConsumerFactory<>(props);
}
Option 2: Using Spring Container Configuration
@Bean
public ShareKafkaListenerContainerFactory<String, String> explicitShareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
// Configure acknowledgment mode at container factory level
// true means explicit acknowledgment is required
factory.getContainerProperties().setExplicitShareAcknowledgment(true);
return factory;
}
Acknowledgment Types
Share consumers support three acknowledgment types:
- ACCEPT: record processed successfully; mark it as completed
- RELEASE: temporary failure; make the record available for redelivery
- REJECT: permanent failure; do not retry
ShareAcknowledgment API
The ShareAcknowledgment interface provides the methods for explicit acknowledgment:
public interface ShareAcknowledgment {
void acknowledge();
void release();
void reject();
}
Listener Interfaces
Share consumers support specialized listener interfaces for different use cases:
Basic MessageListener
For simple cases, use the standard MessageListener:
@KafkaListener(topics = "my-topic", containerFactory = "shareKafkaListenerContainerFactory")
public void listen(ConsumerRecord<String, String> record) {
System.out.println("Received: " + record.value());
// Automatically acknowledged in implicit mode
}
AcknowledgingShareConsumerAwareMessageListener
This interface provides access to the ShareConsumer instance and supports optional acknowledgment.
The acknowledgment parameter is nullable, depending on the container's acknowledgment mode:
Implicit Mode Example (acknowledgment is null)
@KafkaListener(
topics = "my-topic",
containerFactory = "shareKafkaListenerContainerFactory" // Implicit mode by default
)
public void listen(ConsumerRecord<String, String> record,
@Nullable ShareAcknowledgment acknowledgment,
ShareConsumer<?, ?> consumer) {
// In implicit mode, acknowledgment is null
System.out.println("Received: " + record.value());
// Access consumer metrics if needed
Map<MetricName, ? extends Metric> metrics = consumer.metrics();
// Record is auto-acknowledged as ACCEPT on success, REJECT on error
}
Explicit Mode Example (acknowledgment is non-null)
@Component
public class ExplicitAckListener {
@KafkaListener(
topics = "my-topic",
containerFactory = "explicitShareKafkaListenerContainerFactory"
)
public void listen(ConsumerRecord<String, String> record,
@Nullable ShareAcknowledgment acknowledgment,
ShareConsumer<?, ?> consumer) {
// In explicit mode, acknowledgment is non-null
try {
processRecord(record);
acknowledgment.acknowledge(); // ACCEPT
}
catch (RetryableException e) {
acknowledgment.release(); // Will be redelivered
}
catch (Exception e) {
acknowledgment.reject(); // Permanent failure
}
}
private void processRecord(ConsumerRecord<String, String> record) {
// Business logic here
}
}
Acknowledgment Constraints
In explicit acknowledgment mode, the container enforces important constraints:
- Poll blocking: subsequent polls are blocked until all records from the previous poll are acknowledged
- One-time acknowledgment: each record can only be acknowledged once
- Error handling: if processing throws an exception, the record is automatically acknowledged as REJECT
| In explicit mode, unacknowledged records block subsequent message processing. Make sure records are acknowledged on all code paths. |
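One defensive pattern (a sketch, not the only option; the topic name and processPayment method are hypothetical) is to catch every failure and acknowledge explicitly, so no code path leaves a record unacknowledged and blocks the next poll:

```java
@KafkaListener(topics = "payments", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void listen(ConsumerRecord<String, String> record, ShareAcknowledgment ack) {
    try {
        processPayment(record.value()); // hypothetical business method
        ack.acknowledge(); // ACCEPT
    }
    catch (Exception e) {
        // Catching here prevents the container's automatic REJECT and makes the
        // outcome explicit; the record stays eligible for redelivery.
        ack.release(); // RELEASE
    }
}
```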
Acknowledgment Timeout Detection
To help identify missing acknowledgments, Spring Kafka provides configurable timeout detection. When a record is not acknowledged within the specified timeout, a warning is logged with details of the unacknowledged record.
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
// Set acknowledgment timeout (default is 30 seconds)
factory.getContainerProperties().setShareAcknowledgmentTimeout(Duration.ofSeconds(30));
return factory;
}
When a record exceeds the timeout, you will see a warning similar to:
WARN: Record not acknowledged within timeout (30 seconds). In explicit acknowledgment mode, you must call ack.acknowledge(), ack.release(), or ack.reject() for every record.
This feature helps developers quickly identify missing acknowledgment calls in their code, avoiding the common problem of Spring Kafka appearing to stop consuming new records because an acknowledgment was forgotten.
Acknowledgment Examples
Mixed Acknowledgment Outcomes
@KafkaListener(topics = "order-processing", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
String orderId = record.key();
String orderData = record.value();
try {
if (isValidOrder(orderData)) {
if (processOrder(orderData)) {
acknowledgment.acknowledge(); // Success - ACCEPT
}
else {
acknowledgment.release(); // Temporary failure - retry later
}
}
else {
acknowledgment.reject(); // Invalid order - don't retry
}
}
catch (Exception e) {
// Exception automatically triggers REJECT
throw e;
}
}
Conditional Acknowledgment
@KafkaListener(topics = "data-validation", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void validateData(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
ValidationResult result = validator.validate(record.value());
switch (result.getStatus()) {
case VALID:
acknowledgment.acknowledge(); // ACCEPT
break;
case INVALID_RETRYABLE:
acknowledgment.release(); // RELEASE - retry later
break;
case INVALID_PERMANENT:
acknowledgment.reject(); // REJECT - do not retry
break;
}
}
Poison Message Protection and Delivery Count
KIP-932 includes broker-side poison message protection that prevents unprocessable records from being redelivered indefinitely.
How It Works
Each time a consumer in the share group acquires a record, the broker increments an internal delivery count. The first acquisition sets the delivery count to 1, and every subsequent acquisition increments it. When the delivery count reaches the configured limit (default: 5), the record moves to the archived state and is no longer eligible for additional delivery attempts.
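The broker-side bookkeeping described above can be sketched in plain Java (an illustration of the rule only, not actual broker code; attemptLimit corresponds to group.share.delivery.attempt.limit):

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: how the broker conceptually tracks delivery counts (KIP-932).
public class DeliveryCountSketch {

    private final int attemptLimit;
    private final Map<Long, Integer> deliveryCounts = new HashMap<>(); // offset -> count

    public DeliveryCountSketch(int attemptLimit) {
        this.attemptLimit = attemptLimit;
    }

    /**
     * Returns true if the record at this offset may be delivered (and counts
     * the attempt); returns false once the record is archived.
     */
    public boolean tryDeliver(long offset) {
        int count = deliveryCounts.getOrDefault(offset, 0);
        if (count >= attemptLimit) {
            return false; // archived: no further delivery attempts
        }
        deliveryCounts.put(offset, count + 1);
        return true;
    }

    public static void main(String[] args) {
        DeliveryCountSketch sketch = new DeliveryCountSketch(5); // broker default is 5
        int deliveries = 0;
        // Simulate a consumer that always releases the record, forcing redelivery
        while (sketch.tryDeliver(42L)) {
            deliveries++;
        }
        System.out.println("Delivered " + deliveries + " times, then archived");
    }
}
```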
Configuration
The maximum number of delivery attempts can be configured per share group via the Admin API:
private void configureMaxDeliveryAttempts(String bootstrapServers, String groupId) throws Exception {
Map<String, Object> adminProps = new HashMap<>();
adminProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
try (Admin admin = Admin.create(adminProps)) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
// Default is 5, adjust based on your retry tolerance
ConfigEntry maxAttempts = new ConfigEntry("group.share.delivery.attempt.limit", "10");
Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
configResource, List.of(new AlterConfigOp(maxAttempts, AlterConfigOp.OpType.SET))
);
admin.incrementalAlterConfigs(configs).all().get();
}
}
|
Delivery counts are not exposed to applications. The delivery count is managed internally by the broker and is not available to consumer applications; this is a deliberate design decision in KIP-932. The delivery count is approximate and serves as a poison message protection mechanism, not a precise redelivery counter. Applications cannot query or access this value through any API. For application-level retry logic, use the acknowledgment types described above.
The broker automatically prevents endless redelivery. |
Recommended Retry Strategy
@KafkaListener(topics = "orders", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment ack) {
try {
// Attempt to process the order
orderService.process(record.value());
ack.acknowledge(); // ACCEPT - successfully processed
}
catch (TransientException e) {
// Temporary failure (network issue, service unavailable, etc.)
// Release the record for redelivery
// Broker will retry up to group.share.delivery.attempt.limit times
logger.warn("Transient error processing order, will retry: {}", e.getMessage());
ack.release(); // RELEASE - make available for retry
}
catch (ValidationException e) {
// Permanent semantic error (invalid data format, business rule violation, etc.)
// Do not retry - this record will never succeed
logger.error("Invalid order data, rejecting: {}", e.getMessage());
ack.reject(); // REJECT - permanent failure, do not retry
}
catch (Exception e) {
// Unknown error - typically safer to reject to avoid infinite loops
// But could also release if you suspect it might be transient
logger.error("Unexpected error processing order, rejecting: {}", e.getMessage());
ack.reject(); // REJECT - avoid poison message loops
}
}
The broker's poison message protection ensures that even if you always use RELEASE on errors, records will not be retried indefinitely.
Once the delivery attempt limit is exceeded, such records are automatically archived.
Differences from Regular Consumers
Share consumers differ from regular consumers in several key ways:
- No partition assignment: share consumers cannot be assigned specific partitions
- No topic patterns: share consumers do not support subscribing by topic pattern
- Cooperative consumption: multiple consumers in the same share group can consume from the same partition simultaneously
- Record-level acknowledgment: explicit acknowledgment is supported with the ACCEPT, RELEASE, and REJECT types
- Different group management: share groups use a different coordinator protocol
- No batch processing: share consumer records are processed individually, not in batches
- Broker-side retry management: delivery count tracking and poison message protection are managed by the broker and are not exposed to applications