|
对于最新稳定版本,请使用 Spring for Apache Kafka 4.0.0! |
交易
本节介绍了 Spring for Apache Kafka 如何支持事务。
概述
0.11.0.0客户端库增加了对事务的支持。 Spring for Apache Kafka 通过以下方式增加了支持:
-
KafkaTransactionManager: 与正常的 Spring 事务支持一起使用 (@Transactional,交易模板,等等) -
事务
KafkaMessageListenerContainer -
本地交易
卡夫卡模板 -
与其他事务管理器的事务同步
通过提供默认KafkaProducerFactory其中transactionId前缀.
在这种情况下,与其管理单一共享制作人工厂维护着一批交易生产者。
当用户呼叫时接近()在生产者中,它会被返回缓存进行重复利用,而不是被关闭。
这transactional.id每个生产者的属性为transactionId前缀 + n哪里n开头为0并且会根据每新增一个生产者而增加。
在之前版本的 Apache Kafka Spring 中,transactional.id为由带有记录监听器的监听器容器启动的交易,以支持围栏僵尸,但现在已不再必要,且EOSMode.V2从3.0开始,是唯一的选择。
对于运行多个实例的应用程序,transactionId前缀每个实例必须是唯一的。
另见“恰好一次语义学”。
使用 Spring Boot 时,只需设置spring.kafka.producer.transaction-id-prefix属性 - Spring Boot 会自动配置KafkaTransactionManager然后接线到监听器容器里。
从2.5.8版本开始,你现在可以配置最大年龄生产工厂的财产。
这在使用可能为经纪人闲置的交易型生产者时非常有用transactional.id.expiration.ms.
与当前的卡夫卡客户端,这可能导致制片人受限例外没有重新平衡。
通过设置最大年龄减少到transactional.id.expiration.ms工厂会刷新生产者,如果超过其最大寿命。 |
用KafkaTransactionManager
这KafkaTransactionManager是 Spring Framework 的实现PlatformTransactionManager.
其构建器中附有生产工厂的引用。
如果你提供定制生产工厂,它必须支持交易。
看ProducerFactory.transactionCapable().
你可以使用KafkaTransactionManager支持正常的 Spring 事务 (@Transactional,交易模板,以及其他。
如果交易是活跃的,任何卡夫卡模板在事务范围内执行的作使用事务的制作人.
管理者根据成功或失败来提交或回滚该事务。
你必须配置卡夫卡模板用同样的生产工厂作为交易管理器。
事务同步
本节涉及仅生产者事务(未由监听器容器发起的交易);有关容器开始交易时串联交易的信息,请参见“使用消费者发起的交易”。
如果你想把记录发送到 Kafka 并进行一些数据库更新,你可以用普通的 Spring 事务管理,比如DataSourceTransactionManager.
@Transactional
public void process(List<Thing> things) {
things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
updateDb(things);
}
拦截机用于@Transactional注释启动事务,且卡夫卡模板将与该事务管理器同步交易;每个发送者都会参与该交易。
当方法退出时,数据库事务将提交,随后是 Kafka 事务。
如果你希望提交顺序相反(先卡夫卡),可以使用嵌套@Transactional方法,外层方法配置为使用DataSourceTransactionManager,以及配置为使用KafkaTransactionManager.
参见“与其他事务管理器的 Kafka 事务示例”,了解一个应用程序在 Kafka 优先或数据库优先配置下同步 JDBC 和 Kafka 事务的示例。
| 从2.5.17、2.6.12、2.7.9和2.8.0版本开始,如果主事务提交后同步事务提交失败,异常将抛给调用者。 此前,这些信息被默许(在调试级别记录)。 申请应采取补救措施(如有必要),以补偿已承诺的主要交易。 |
使用消费者发起的交易
这ChainedKafkaTransactionManager自2.7版本起已被弃用;参见 JavaDocs 的超级类ChainedTransactionManager更多信息请见。
相反,使用一个KafkaTransactionManager在容器中启动Kafka事务,并对监听器方法进行注释@Transactional开始另一笔交易。
请参阅“与其他事务管理器的 Kafka 交易示例”,这是一个串联 JDBC 和 Kafka 事务的示例应用。
卡夫卡模板本地交易
你可以使用卡夫卡模板在本地事务中执行一系列作。
以下示例展示了如何实现:
boolean result = template.executeInTransaction(t -> {
t.sendDefault("thing1", "thing2");
t.sendDefault("cat", "hat");
return true;
});
回调中的参数就是模板本身 (这).
如果回调正常退出,则该事务被提交。
如果抛出异常,交易会被回滚。
如果存在KafkaTransactionManager(或同步)事务正在进行中,但不被使用。
取而代之的是使用新的“嵌套”事务。 |
TransactionIdPrefix
跟EOSMode.V2(又名试用版),这是唯一支持的模式,现在不再需要使用相同的模式transactional.id即使是消费者发起的交易;事实上,每个实例上必须与生产者发起的交易一样唯一。
该属性在每个应用实例上必须有不同的值。
TransactionIdSuffix 已修复
自3.2版本起,新的交易Id后缀策略引入接口以管理transactional.id后缀。
默认实现为DefaultTransactionIdSuffixStrategy设置时maxCache大于 0 可以重复使用transactional.id在特定范围内,否则通过递增计数器即时生成后缀。
当请求交易生产者时transactional.id全部使用,投掷a无生产者可用例外.
用户随后可以使用重试模板配置为重新尝试该异常,并设置相应的退回。
public static class Config {
@Bean
public ProducerFactory<String, String> myProducerFactory() {
Map<String, Object> configs = producerConfigs();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
TransactionIdSuffixStrategy ss = new DefaultTransactionIdSuffixStrategy(5);
pf.setTransactionIdSuffixStrategy(ss);
return pf;
}
}
设置时maxCache到5,transactional.id是我的.txid。+'{0-4}'。
使用KafkaTransactionManager其中ConcurrentMessageListenerContainer以及赋能maxCache,必须设maxCache到一个大于或等于 的值并发.
如果MessageListenerContainer无法获得transactional.id后缀,它会抛出无生产者可用例外.
在使用 嵌套事务时ConcurrentMessageListenerContainer需要调整maxCache设置以应对嵌套事务数量的增加。 |
卡夫卡模板交易式与非事务式出版
通常,当卡夫卡模板是交易型(配置为具备交易能力的生产者工厂),则需要交易。
交易可以由交易模板一个@Transactional方法,调用执行交易,或通过监听器容器,配置为KafkaTransactionManager.
任何试图在交易范围之外使用该模板的行为,都会导致模板抛出非法州例外.
从2.4.3版本开始,你可以设置模板允许非交易属性到true.
在这种情况下,模板将允许作无需事务即可运行,方法是调用生产工厂的createNonTransactionalProducer()方法;生产者将像往常一样被缓存或线程绑定以便重用。
看用默认KafkaProducerFactory.
与批处理监听器的事务
当监听器在交易使用时失败,AfterRollback处理器在回滚发生后被调用以采取某些作。
使用默认时AfterRollback处理器在唱片听众中,会进行寻曲,以便将失败的唱片重新交付。
然而,使用批处理监听器时,整个批次会被重新交付,因为框架不知道批次中哪个记录失败了。
更多信息请参见后回滚处理器。
使用批处理监听器时,2.4.2 版本引入了处理批处理失败的替代机制:批处理记录适配器.
当集装箱工厂批处理听器设置为 true,配置为批处理记录适配器,听者一次调用一条记录。
这允许批处理错误,同时根据异常类型停止整个批处理。
默认批处理记录适配器提供的,可以配置为标准消费者记录恢复器例如:死信出版恢复者.
以下测试用例配置摘要展示了如何使用此功能:
public static class TestListener {
final List<String> values = new ArrayList<>();
@KafkaListener(id = "batchRecordAdapter", topics = "test")
public void listen(String data) {
values.add(data);
if ("bar".equals(data)) {
throw new RuntimeException("reject partial");
}
}
}
@Configuration
@EnableKafka
public static class Config {
ConsumerRecord<?, ?> failed;
@Bean
public TestListener test() {
return new TestListener();
}
@Bean
public ConsumerFactory<?, ?> consumerFactory() {
return mock(ConsumerFactory.class);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>((record, ex) -> {
this.failed = record;
}));
return factory;
}
}