测试应用
这春季卡夫卡测试JAR包含一些有用的工具,帮助测试你的应用程序。
嵌入式Kafka代理
自从Kafka 4.0完全过渡到KRaft模式后,只有嵌入KafkaKraftBroker现已实现:
-
嵌入KafkaKraftBroker-使用牛皮纸在控制器和经纪人结合模式下。
如下章节所述,有几种配置经纪人的技术。
KafkaTestUtils
org.springframework.kafka.test.utils.KafkaTestUtils提供多种静态辅助方法,用于消耗记录、检索各种记录偏移量等。完整详情请参阅其 Javadocs。
JUnit
org.springframework.kafka.test.utils.KafkaTestUtils提供了一些静态方法来设置生产者和消费者属性。以下列表展示了这些方法签名:
/**
* Set up test properties for an {@code <Integer, String>} consumer.
* @param group the group id.
* @param autoCommit the auto commit.
* @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
* @return the properties.
*/
public static Map<String, Object> consumerProps(String group, String autoCommit,
EmbeddedKafkaBroker embeddedKafka) { ... }
/**
* Set up test properties for an {@code <Integer, String>} producer.
* @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
* @return the properties.
*/
public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka) { ... }
|
从2.5版本开始, 使用嵌入式中介时,通常最佳做法是每次测试使用不同的主题,以防止串扰。
如果因某种原因无法做到,请注意 |
|
Spring for Apache Kafka 不再支持 JUnit 4。 建议迁移到朱尼特木星。 |
这EmbeddedKafkaBroker类有一个实用方法,可以让你对它创建的所有主题进行消费。
以下示例展示了如何使用它:
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
embeddedKafka.consumeFromAllEmbeddedTopics(consumer);
这KafkaTestUtils有一些实用方法可以从消费者那里获取结果。
以下列表展示了这些方法签名:
/**
* Poll the consumer, expecting a single record for the specified topic.
* @param consumer the consumer.
* @param topic the topic.
* @return the record.
* @throws org.junit.ComparisonFailure if exactly one record is not received.
*/
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic) { ... }
/**
* Poll the consumer for records.
* @param consumer the consumer.
* @return the records.
*/
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) { ... }
以下示例展示了如何使用KafkaTestUtils:
...
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = KafkaTestUtils.getSingleRecord(consumer, "topic");
...
当嵌入的Kafka代理由EmbeddedKafkaBroker,一个名为spring.embedded.kafka.brokers设置为卡夫卡经纪人的地址。
方便常量(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS)为本物业提供了这些条件。
而不是默认spring.embedded.kafka.brokers系统属性,即Kafka经纪人的地址可以暴露在任何任意且方便的属性之下。
为此,一个Spring.嵌入.卡夫卡.brokers.property。 (EmbeddedKafkaBroker.BROKER_LIST_PROPERTY)系统属性可以在启动嵌入的卡夫卡之前设置。
例如,Spring靴 aspring.kafka.bootstrap-servers配置属性分别被设置为自动配置 Kafka 客户端。
所以,在用嵌入的Kafka在随机端口上运行测试之前,我们可以先设置spring.embedded.kafka.brokers.property=spring.kafka.bootstrap-servers作为系统属性——以及EmbeddedKafkaBroker将用它来暴露其经纪人地址。
这现在是该属性的默认值(从版本 3.0.10 开始)。
与EmbeddedKafkaBroker.brokerProperties(Map<String, String>)你可以为Kafka服务器提供额外的属性。
有关可能的经纪人属性的更多信息,请参见 Kafka 配置。
主题配置
以下示例配置创建了名为猫和帽子有五个分区,一个主题为东西1有10个分区,以及一个名为东西2共有15个分区:
@SpringJUnitConfig
@EmbeddedKafka(
partitions = 5,
topics = {"cat", "hat"}
)
public class MyTests {
@Autowired
private EmbeddedKafkaBroker broker;
@Test
public void test() {
broker.addTopics(new NewTopic("thing1", 10, (short) 1), new NewTopic("thing2", 15, (short) 1));
...
}
}
默认情况下,addTopics当出现问题时(例如添加已存在的主题),会抛出例外。
2.6版本新增了该方法,返回Map<String,例外>;密钥是主题名称,值为零为了成功,或者例外为了失败。
使用同一代理处理多个测试类别
你可以用同一个代理进行多个测试类别,类似于以下内容:
public final class EmbeddedKafkaHolder {
private static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaZKBroker(1, false)
.brokerListProperty("spring.kafka.bootstrap-servers");
private static volatile boolean started;
public static EmbeddedKafkaBroker getEmbeddedKafka() {
if (!started) {
synchronized (EmbeddedKafkaBroker.class) {
try {
embeddedKafka.afterPropertiesSet();
}
catch (Exception e) {
throw new KafkaException("Embedded broker failed to start", e);
}
started = true;
}
}
return embeddedKafka;
}
}
这假设使用Spring Boot环境,嵌入式代理取代了引导服务器的属性。
然后,在每个测试类别中,你可以使用类似的作:
static {
EmbeddedKafkaHolder.getEmbeddedKafka().addTopics("topic1", "topic2");
}
private static final EmbeddedKafkaBroker broker = EmbeddedKafkaHolder.getEmbeddedKafka();
如果你没有使用 Spring Boot,可以通过以下方式获取引导式服务器broker.getBrokersAsString().
上述示例未提供在所有测试完成后关闭经纪人的机制。
如果你在Gradle守护进程中运行测试,这可能会成为问题。
在这种情况下你不应该使用这种技巧,或者你应该用某种东西来呼唤摧毁()在EmbeddedKafkaBroker等你的检查完成后。 |
从3.0版本开始,框架会暴露一个GlobalEmbeddedKafkaTestExecutionListenerJUnit平台;默认情况下是被禁用的。
这需要 JUnit 平台 1.8 或更高。
这位听众的目的是创建一个全球化的EmbeddedKafkaBroker整个测试计划都在计划结束时停止。
为了支持该监听器,从而为项目中的所有测试建立一个全局嵌入的 Kafka 集群,spring.kafka.global.embedded.enabled属性必须设置为true通过系统属性或 JUnit Platform 配置。
此外,还可以提供以下这些属性:
-
春季.卡夫卡.嵌入。计数- 需管理的Kafka经纪人数量; -
spring.kafka.embedded.ports- 每个Kafka中介起始端口(逗号分隔值),0如果偏好随机端口;取值数必须等于计数上述; -
spring.kafka.embedded.topics- 在起始 Kafka 集群中创建的主题(逗号分隔值); -
Spring.kafka.embedded.partitions- 为创建的主题提供分区数量; -
spring.kafka.embedd.broker.properties.location- 用于其他Kafka代理配置属性的文件位置;该属性的价值必须遵循Spring资源抽象模式。
本质上,这些特性模拟了一些@EmbeddedKafka属性。
关于配置属性的更多信息及如何提供,请参阅JUnit Jupiter用户指南。
例如,一个Spring.embedded.kafka.brokers.property=my.bootstrap-servers可以添加到一个junit-platform.properties(junit-platform.properties)文件存放在测试类路径中。
从3.0.10版本开始,经纪人会自动将此设置为spring.kafka.bootstrap-servers默认情况下,用于与 Spring Boot 应用程序的测试。
| 建议不要将全局嵌入式Kafka和每个测试类合并在单一测试套件中。 它们共享相同的系统属性,因此很可能导致意想不到的行为。 |
春季卡夫卡测试具有传递依赖于朱尼特-朱庇特-API和Junit平台发射器(后者用于支持全球嵌入式经纪人)。
如果你想使用嵌入式代理但不使用 JUnit,可能需要排除这些依赖。 |
@EmbeddedKafka注解
我们通常建议你使用单一代理实例,以避免在测试之间启动和停止代理(并且每次测试使用不同的主题)。
从2.0版本开始,如果你使用Spring的测试应用上下文缓存,还可以声明一个EmbeddedKafkaBroker因此,一个代理可以在多个测试类别中使用。
为了方便起见,我们提供了一个称为@EmbeddedKafka注册EmbeddedKafkaBroker豆。
以下示例展示了如何使用它:
@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka(partitions = 1,
topics = {
KafkaStreamsTests.STREAMING_TOPIC1,
KafkaStreamsTests.STREAMING_TOPIC2 })
public class KafkaStreamsTests {
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Test
void someTest() {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", this.embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, KafkaStreamsTests.STREAMING_TOPIC2);
ConsumerRecords<Integer, String> replies = KafkaTestUtils.getRecords(consumer);
assertThat(replies.count()).isGreaterThanOrEqualTo(1);
}
@Configuration
@EnableKafkaStreams
public static class TestKafkaStreamsConfiguration {
@Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
private String brokerAddresses;
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
return new KafkaStreamsConfiguration(props);
}
}
}
从2.2.4版本开始,你还可以使用@EmbeddedKafka注释以指定Kafka端口属性。
自4.0版本起,所有与ZooKeeper相关的属性已从该版本中移除。@EmbeddedKafka自《卡夫卡4.0》以来,注释专用于KRAFT。 |
以下示例设置主题,brokerProperties(brokerProperties)和经纪房产地点属性@EmbeddedKafka支持属性占位符决议:
@TestPropertySource(locations = "classpath:/test.properties")
@EmbeddedKafka(topics = { "any-topic", "${kafka.topics.another-topic}" },
brokerProperties = { "log.dir=${kafka.broker.logs-dir}",
"listeners=PLAINTEXT://localhost:${kafka.broker.port}",
"auto.create.topics.enable=${kafka.broker.topics-enable:true}" },
brokerPropertiesLocation = "classpath:/broker.properties")
在上述例子中,属性占位符为${Kafka.topics.another-topic},${kafka.broker.logs-dir}和${kafka.broker.port}从春季中解决环境.
此外,经纪人房产还会从中加载broker.properties由经纪房产地点.
属性占位符被解析为经纪房产地点以及资源中发现的任何属性占位符的网址。
性质定义为brokerProperties(brokerProperties)覆盖性质在经纪房产地点.
你可以使用@EmbeddedKafka与朱尼特·朱庇特的注释。
@EmbeddedKafka与JUnit Jupiter的注释
从2.3版本开始,有两种方式可以使用@EmbeddedKafka与朱尼特·朱庇特的注释。
当与@SpringJunitConfig注释,嵌入代理被添加到测试应用上下文中。
你可以在类别或方法层面自动将经纪人接入测试,获取经纪人地址列表。
当不使用春季测试上下文时,EmbdeddedKafkaCondition创建经纪人;该条件包含参数解析器,方便你在测试方法中访问代理。
@EmbeddedKafka
public class EmbeddedKafkaConditionTests {
@Test
public void test(EmbeddedKafkaBroker broker) {
String brokerList = broker.getBrokersAsString();
...
}
}
除非类被注释,否则会创建一个独立的代理(不包含 Spring 的 TestContext@EmbeddedKafka也被注释(或元注释)为ExtendWith(SpringExtension.class).@SpringJunitConfig和@SpringBootTest因此,当这些注释同时存在时,将使用基于上下文的中介。
| 当有 Spring 测试应用上下文时,主题和代理属性可以包含属性占位符,只要属性在某处被定义,占位符就会被解决。 如果没有 Spring 上下文,这些占位符就无法解决。 |
嵌入式经纪人@SpringBootTest附注
Spring Initializr 现在会自动添加春季卡夫卡测试测试范围对项目配置的依赖。
|
如果你的应用使用了Kafka活页夹
|
在 Spring Boot 应用测试中使用嵌入式代理有多种方法。
包括:
@EmbeddedKafka跟@SpringJunitConfig
使用@EmbeddedKafka跟@SpringJUnitConfig,建议使用@DirtiesContext考试课上。
这是为了防止在测试套件中多次测试后,JVM关闭时出现潜在的竞赛条件。
例如,不使用@DirtiesContext这EmbeddedKafkaBroker在应用上下文仍需资源时,可能会提前关闭。
因为每一个嵌入卡夫卡测试运行会创建自己的临时目录,当出现该竞态条件时,会生成错误日志消息,表示试图删除或清理的文件已不可用。
添加@DirtiesContext确保每次测试后应用上下文被清理且不缓存,从而减少潜在资源竞态的脆弱性。
@EmbeddedKafka注释或EmbeddedKafkaBroker豆
以下示例展示了如何使用@EmbeddedKafka用于创建嵌入式代理的注释:
@SpringJUnitConfig
@EmbeddedKafka(topics = "someTopic",
bootstrapServersProperty = "spring.kafka.bootstrap-servers") // this is now the default
public class MyApplicationTests {
@Autowired
private KafkaTemplate<String, String> template;
@Test
void test() {
...
}
}
这bootstrapServersProperty自动设置为spring.kafka.bootstrap-servers默认情况下,从3.0.10版本开始。 |
哈姆克雷斯特匹配者
这org.springframework.kafka.test.hamcrest.KafkaMatchers提供以下匹配器:
/**
* @param key the key
* @param <K> the type.
* @return a Matcher that matches the key in a consumer record.
*/
public static <K> Matcher<ConsumerRecord<K, ?>> hasKey(K key) { ... }
/**
* @param value the value.
* @param <V> the type.
* @return a Matcher that matches the value in a consumer record.
*/
public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value) { ... }
/**
* @param partition the partition.
* @return a Matcher that matches the partition in a consumer record.
*/
public static Matcher<ConsumerRecord<?, ?>> hasPartition(int partition) { ... }
/**
* Matcher testing the timestamp of a {@link ConsumerRecord} assuming the topic has been set with
* {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime}.
*
* @param ts timestamp of the consumer record.
* @return a Matcher that matches the timestamp in a consumer record.
*/
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(long ts) {
return hasTimestamp(TimestampType.CREATE_TIME, ts);
}
/**
* Matcher testing the timestamp of a {@link ConsumerRecord}
* @param type timestamp type of the record
* @param ts timestamp of the consumer record.
* @return a Matcher that matches the timestamp in a consumer record.
*/
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(TimestampType type, long ts) {
return new ConsumerRecordTimestampMatcher(type, ts);
}
断言J条件
您可以使用以下 AssertJ 条件:
/**
* @param key the key
* @param <K> the type.
* @return a Condition that matches the key in a consumer record.
*/
public static <K> Condition<ConsumerRecord<K, ?>> key(K key) { ... }
/**
* @param value the value.
* @param <V> the type.
* @return a Condition that matches the value in a consumer record.
*/
public static <V> Condition<ConsumerRecord<?, V>> value(V value) { ... }
/**
* @param key the key.
* @param value the value.
* @param <K> the key type.
* @param <V> the value type.
* @return a Condition that matches the key in a consumer record.
* @since 2.2.12
*/
public static <K, V> Condition<ConsumerRecord<K, V>> keyValue(K key, V value) { ... }
/**
* @param partition the partition.
* @return a Condition that matches the partition in a consumer record.
*/
public static Condition<ConsumerRecord<?, ?>> partition(int partition) { ... }
/**
* @param value the timestamp.
* @return a Condition that matches the timestamp value in a consumer record.
*/
public static Condition<ConsumerRecord<?, ?>> timestamp(long value) {
return new ConsumerRecordTimestampCondition(TimestampType.CREATE_TIME, value);
}
/**
* @param type the type of timestamp
* @param value the timestamp.
* @return a Condition that matches the timestamp value in a consumer record.
*/
public static Condition<ConsumerRecord<?, ?>> timestamp(TimestampType type, long value) {
return new ConsumerRecordTimestampCondition(type, value);
}
示例
以下示例汇总了本章涵盖的大部分主题:
@EmbeddedKafka(topics = KafkaTemplateTests.TEMPLATE_TOPIC)
public class KafkaTemplateTests {
public static final String TEMPLATE_TOPIC = "templateTopic";
public static EmbeddedKafkaBroker embeddedKafka;
@BeforeAll
public static void setUp() {
embeddedKafka = EmbeddedKafkaCondition.getBroker();
}
@Test
public void testTemplate() throws Exception {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false",
embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps);
ContainerProperties containerProperties = new ContainerProperties(TEMPLATE_TOPIC);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProperties);
final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>();
container.setupMessageListener(new MessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> record) {
System.out.println(record);
records.add(record);
}
});
container.setBeanName("templateTests");
container.start();
ContainerTestUtils.waitForAssignment(container,
embeddedKafka.getPartitionsPerTopic());
Map<String, Object> producerProps =
KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf =
new DefaultKafkaProducerFactory<>(producerProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(TEMPLATE_TOPIC);
template.sendDefault("foo");
assertThat(records.poll(10, TimeUnit.SECONDS), hasValue("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
assertThat(received, hasKey(2));
assertThat(received, hasPartition(0));
assertThat(received, hasValue("bar"));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
assertThat(received, hasKey(2));
assertThat(received, hasPartition(0));
assertThat(received, hasValue("baz"));
}
}
前面的例子使用了Hamcrest匹配器。
跟断言J,最后部分看起来如下代码:
assertThat(records.poll(10, TimeUnit.SECONDS)).has(value("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
// using individual assertions
assertThat(received).has(key(2));
assertThat(received).has(value("bar"));
assertThat(received).has(partition(0));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
// using allOf()
assertThat(received).has(allOf(keyValue(2, "baz"), partition(0)));
模拟消费者与制作人
这卡夫卡客户端图书馆提供模拟消费者和模拟制片人课程用于测试。
如果你想在某些测试中使用这些类,使用听者容器或卡夫卡模板从3.0.7版本开始,框架现提供模拟消费者工厂和模拟制作工厂实现。
这些工厂可以在监听器容器和模板中使用,而非默认工厂,后者需要运行(或嵌入)代理。
这里是一个简单实现返回单个消费者的示例:
@Bean
ConsumerFactory<String, String> consumerFactory() {
MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
TopicPartition topicPartition0 = new TopicPartition("topic", 0);
List<TopicPartition> topicPartitions = Collections.singletonList(topicPartition0);
Map<TopicPartition, Long> beginningOffsets = topicPartitions.stream().collect(Collectors
.toMap(Function.identity(), tp -> 0L));
consumer.updateBeginningOffsets(beginningOffsets);
consumer.schedulePollTask(() -> {
consumer.addRecord(
new ConsumerRecord<>("topic", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test1",
new RecordHeaders(), Optional.empty()));
consumer.addRecord(
new ConsumerRecord<>("topic", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test2",
new RecordHeaders(), Optional.empty()));
});
return new MockConsumerFactory(() -> consumer);
}
如果你想测试并发,可以提供商工厂构造函数中的lambda每次都需要创建一个新的实例。
与模拟制作工厂,有两个构造子;一个用于创建简单工厂,另一个用于创建支持交易的工厂。
以下是一些例子:
@Bean
ProducerFactory<String, String> nonTransFactory() {
return new MockProducerFactory<>(() ->
new MockProducer<>(true, new StringSerializer(), new StringSerializer()));
}
@Bean
ProducerFactory<String, String> transFactory() {
MockProducer<String, String> mockProducer =
new MockProducer<>(true, new StringSerializer(), new StringSerializer());
mockProducer.initTransactions();
return new MockProducerFactory<String, String>((tx, id) -> mockProducer, "defaultTxId");
}
注意在第二种情况下,λ 是一个BiFunction<布尔,字符串>其中第一个参数为真,如果调用者需要交易生产者;可选的第二个参数包含事务ID。
这可以是默认值(如构造函数所提供),也可以被KafkaTransactionManager(或卡夫卡模板用于本地事务),如果配置如此。
交易ID会提供,以防你想使用其他模拟制片人基于这个数值。
如果你在多线程环境中使用 producers,双功能应该返回多个生产者(也许用线程绑定,使用ThreadLocal).
事务模拟制片人s 必须通过呼叫初始化交易initTransaction(). |
当使用模拟制片人如果你不想每次发送后关闭生产者,那么可以提供自定义服务模拟制片人覆盖关闭不调用关闭来自超级阶层的方法。
这对于测试很方便,因为可以在同一生产商上验证多个发布内容而不关闭该平台。
这里有一个例子:
@Bean
MockProducer<String, String> mockProducer() {
return new MockProducer<>(false, new StringSerializer(), new StringSerializer()) {
@Override
public void close() {
}
};
}
@Bean
ProducerFactory<String, String> mockProducerFactory(MockProducer<String, String> mockProducer) {
return new MockProducerFactory<>(() -> mockProducer);
}