该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 Spring for Apache Kafka 4.0.0spring-doc.cadn.net.cn

Apache Kafka Streams 支持

从1.1.4版本开始,Apache Kafka的Spring为Kafka流提供了一流的支持。 要从Spring应用中使用它,以下卡夫卡流JAR必须出现在ClassPath上。 它是 Spring for Apache Kafka 项目的一个可选依赖,且不会传递下载。spring-doc.cadn.net.cn

基本

参考文献中的Apache Kafka Streams建议使用该API的方法如下:spring-doc.cadn.net.cn

// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.

StreamsBuilder builder = ...;  // when using the Kafka Streams DSL

// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;

KafkaStreams streams = new KafkaStreams(builder, config);

// Start the Kafka Streams instance
streams.start();

// Stop the Kafka Streams instance
streams.close();

所以,我们有两个主要组成部分:spring-doc.cadn.net.cn

KStream暴露于卡夫卡流由单一实例StreamsBuilder即使逻辑不同,也要同时启动和停止。 换句话说,所有由StreamsBuilder与单一生命周期控制相关联。 曾经卡夫卡流实例已被关闭streams.close(),无法重启。 相反,是新的卡夫卡流必须创建实例以重启流处理。

春季管理

为了简化从 Spring 应用上下文角度使用 Kafka Streams,并通过容器实现生命周期管理,Spring for Apache Kafka 引入了StreamsBuilderFactoryBean. 这是一颗摘要FactoryBean实现以暴露StreamsBuilder作为豆子的单例实例。 以下示例构成了这样一个豆子:spring-doc.cadn.net.cn

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
从2.2版本开始,流配置现作为KafkaStreamsConfiguration宾格而非StreamsConfig.

StreamsBuilderFactoryBean其他实现SmartLifecycle管理内部的生命周期卡夫卡流实例。 类似于 Kafka Streams API,你必须定义KStream在你开始之前的实例卡夫卡流. 这同样适用于Kafka Streams的Spring API。 因此,当你使用默认值时autoStartup = trueStreamsBuilderFactoryBean你必须声明KStream实例StreamsBuilder在应用上下文刷新之前。 例如KStream可以是普通的 bean 定义,而 Kafka Streams API 则使用且不会产生任何影响。 以下示例展示了如何实现:spring-doc.cadn.net.cn

@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
    KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
    // Fluent KStream API
    return stream;
}

如果你想手动控制生命周期(例如通过某种条件停止和启动),你可以引用StreamsBuilderFactoryBean通过使用工厂豆()前缀直接进行豆子。 因为&StreamsBuilderFactoryBean利用其内部卡夫卡流例如,停止并重新开始是安全的。 一个新的卡夫卡流在每个开始(). 你也可以考虑用不同的方法StreamsBuilderFactoryBean实例,如果你想控制 的生命周期KStream单独实例。spring-doc.cadn.net.cn

你也可以指定KafkaStreams.StateListener,Thread.UncaughtExceptionHandlerStateRestoreListener关于StreamsBuilderFactoryBean,这些都委托给了内部卡夫卡流实例。spring-doc.cadn.net.cn

另外,除非间接设置这些选项StreamsBuilderFactoryBean,你可以用KafkaStreamsCustomizer回调接口至:spring-doc.cadn.net.cn

  1. (从版本2.1.5开始)配置一个内部卡夫卡流实例使用customize(KafkaStreams)spring-doc.cadn.net.cn

  2. (从版本3.3.0)实例化一个自定义实现卡夫卡流initKafkaStreams(Topology, Properties, KafkaClientSupplier)spring-doc.cadn.net.cn

注意KafkaStreamsCustomizer覆盖StreamsBuilderFactoryBean.spring-doc.cadn.net.cn

如果你需要执行一些卡夫卡流作,你可以直接访问内部卡夫卡流实例通过使用StreamsBuilderFactoryBean.getKafkaStreams().spring-doc.cadn.net.cn

你可以自动接线StreamsBuilderFactoryBean按类型分类,但你应确保在豆子定义中使用完整类型,如下示例所示:spring-doc.cadn.net.cn

@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

或者,你也可以添加@Qualifier如果你用接口豆定义,那就是按名称注入。 以下示例展示了如何实现:spring-doc.cadn.net.cn

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

从版本 2.4.1 开始,工厂豆子有了新的属性基础设施定制器类型为KafkaStreamsInfrastructureCustomizer;这使得对StreamsBuilder(例如添加州商店)和/或拓扑学在溪流形成之前。spring-doc.cadn.net.cn

public interface KafkaStreamsInfrastructureCustomizer {

    void configureBuilder(StreamsBuilder builder);

    void configureTopology(Topology topology);

}

默认提供无作实现,以避免在不需要时同时实现两种方法。spring-doc.cadn.net.cn

一个CompositeKafkaStreamsInfrastructureCustomizer提供给你需要应用多个自定义器时使用。spring-doc.cadn.net.cn

KafkaStreams 千米表支持

在2.5.3版本中引入,你可以配置KafkaStreamsMicrometerListener自动注册用于卡夫卡流由工厂豆管理的对象:spring-doc.cadn.net.cn

streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
        Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));

Streams JSON 序列化与反序列化

在读取或写入主题或状态存储时以 JSON 格式进行序列化和反序列化数据时,Spring for Apache Kafka 提供了JsonSerde该实现使用 JSON,委派给JsonSerializerJsonDeserializer详见串行化、反序列化和消息转换。 这JsonSerde实现通过其构造体(目标类型或对象映射器). 在下面的例子中,我们使用JsonSerde串行化和反串行化Kafka流的有效载荷(该流JsonSerde在需要实例时也可以以类似方式使用):spring-doc.cadn.net.cn

stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");

自2.3版本起,在为生产者/消费工厂编程构建串行化器/反串行器时,可以使用流流API简化配置。spring-doc.cadn.net.cn

stream.through(
    new JsonSerde<>(MyKeyType.class)
        .forKeys()
        .noTypeInfo(),
    new JsonSerde<>(MyValueType.class)
        .noTypeInfo(),
    "myTypes");

KafkaStreamBrancher

KafkaStreamBrancher类引入了一种更便捷的方式,可以在 之上构建条件分支KStream.spring-doc.cadn.net.cn

请考虑以下不使用KafkaStreamBrancher:spring-doc.cadn.net.cn

KStream<String, String>[] branches = builder.stream("source").branch(
        (key, value) -> value.contains("A"),
        (key, value) -> value.contains("B"),
        (key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");

以下示例使用KafkaStreamBrancher:spring-doc.cadn.net.cn

new KafkaStreamBrancher<String, String>()
        .branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
        .branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
        //default branch should not necessarily be defined in the end of the chain!
        .defaultBranch(ks -> ks.to("C"))
        .onTopOf(builder.stream("source"));
        //onTopOf method returns the provided stream so we can continue with method chaining

配置

要配置Kafka Streams环境,StreamsBuilderFactoryBean需要一个KafkaStreamsConfiguration实例。 请参阅Apache Kafka文档了解所有可能的选项。spring-doc.cadn.net.cn

从2.2版本开始,流配置现作为KafkaStreamsConfiguration对象,而非作为StreamsConfig.

为了避免大多数情况下的模板代码,尤其是在开发微服务时,Apache Kafka 的 Spring 提供了@EnableKafkaStreams注释,你应该在@Configuration类。 你只需要声明一个KafkaStreamsConfiguration豆子命名defaultKafkaStreamsConfig. 一个StreamsBuilderFactoryBean比恩,名字defaultKafkaStreamsBuilder在应用上下文中自动声明。 你可以申报并使用任何额外的StreamsBuilderFactoryBean豆子也是。 你还可以对这颗豆子进行额外定制,比如提供一个实现以下功能的豆子StreamsBuilderFactoryBeanConfigurer. 如果有多个这样的豆子,则会根据它们的特性进行施用有序。有序财产。spring-doc.cadn.net.cn

清理与停止配置

当工厂停止运营时,KafkaStreams.close()用两个参数称为:spring-doc.cadn.net.cn

  • 关闭超时:等待线程关闭多久(默认为DEFAULT_CLOSE_TIMEOUT设置为10秒)。可以配置为StreamsBuilderFactoryBean.setCloseTimeout().spring-doc.cadn.net.cn

  • leaveGroupOnClose : 以触发来自该组的消费者离开呼叫(默认为false).可以配置为StreamsBuilderFactoryBean.setLeaveGroupOnClose().spring-doc.cadn.net.cn

默认情况下,当工厂豆子停止时,KafkaStreams.cleanUp()称为 方法。 从2.1.2版本开始,工厂豆子增加了额外的构造子,取一个CleanupConfig具有属性的对象,可以让你控制清理()方法称为开始()停止()或者两者都不是。 从2.7版本开始,默认状态是永远不清理本地状态。spring-doc.cadn.net.cn

集中器

3.0版本增加了头部Enricher处理器的扩展上下文处理器;提供与已弃用版本相同的功能HeaderEnricher实现了被弃用的转换器接口。 这可以用来在流处理中添加头部;头部值为SpEL表达式;表达式评估的根对象具有三个性质:spring-doc.cadn.net.cn

这些表达式必须返回一个字节[]或者字符串(将转换为字节[]UTF-8).spring-doc.cadn.net.cn

在溪流中使用增浓剂:spring-doc.cadn.net.cn

.process(() -> new HeaderEnricherProcessor(expressions))

处理器不会更改钥匙;它只是添加了页眉。spring-doc.cadn.net.cn

你需要为每条记录创建一个新的实例。
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))

这里有一个简单的例子,添加了一个字面标题和一个变量:spring-doc.cadn.net.cn

Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
        .process(() -> supplier)
        .to(OUTPUT);

MessagingProcessor

3.0版本增加了MessagingProcessor的扩展上下文处理器,提供与被弃用的相同功能MessagingTransformer实现了被弃用的转换器接口。 这使得 Kafka Streams 拓扑能够与 Spring Messaging 组件(如 Spring 集成流程)交互。 转换器需要实现消息功能.spring-doc.cadn.net.cn

@FunctionalInterface
public interface MessagingFunction {

    Message<?> exchange(Message<?> message);

}

Spring 集成会自动提供使用其实现的方式。GatewayProxyFactoryBean. 它还需要消息信息转换器将键、值和元数据(包括头部)转换为/从Spring Messageaging中转换留言<?>. 看[从一个KStream] 了解更多信息。spring-doc.cadn.net.cn

从反序列化异常中恢复

2.3 版本引入了恢复反序列化异常处理程序当发生反序列化异常时,它可以采取一些作。 请参阅卡夫卡关于的文献DeserializationExceptionHandler,其中恢复反序列化异常处理程序是一个实现。 这恢复反序列化异常处理程序配置为消费者记录恢复器实现。 该框架提供了死信出版恢复者这会把失败的记录送到一个死信话题。 有关该回收者的更多信息,请参见“发布死信记录”。spring-doc.cadn.net.cn

要配置恢复器,请在流配置中添加以下属性:spring-doc.cadn.net.cn

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            RecoveringDeserializationExceptionHandler.class);
    props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
    ...
    return new KafkaStreamsConfiguration(props);
}

@Bean
public DeadLetterPublishingRecoverer recoverer() {
    return new DeadLetterPublishingRecoverer(kafkaTemplate(),
            (record, ex) -> new TopicPartition("recovererDLQ", -1));
}

当然,那个恢复器()BEAN 可以是你自己的实现消费者记录恢复器.spring-doc.cadn.net.cn

交互式查询支持

从3.2版本开始,Apache Kafka的Spring提供了Kafka Streams交互式查询所需的基本功能。 交互式查询在有状态 Kafka Streams 应用中非常有用,因为它们提供了一种持续查询应用中有状态存储的方式。 因此,如果应用程序想要实现当前系统视图,交互式查询提供了实现这一目标的方法。 想了解更多关于交互式查询的信息,请参阅本文。 Spring 对 Apache Kafka 的支持主要围绕一个名为KafkaStreamsInteractiveQueryService它是围绕 Kafka Streams 库中交互式查询 API 的表象。 应用程序可以创建一个该服务的实例作为豆子,然后以后用它来按状态存储的名称检索。spring-doc.cadn.net.cn

以下代码片段展示了一个示例。spring-doc.cadn.net.cn

@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
    final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
            new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    return kafkaStreamsInteractiveQueryService;
}

假设 Kafka Streams 应用程序有一个状态存储,称为应用商店那么该存储可以通过KafkStreamsInteractiveQueryAPI 如下所示。spring-doc.cadn.net.cn

@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;

ReadOnlyKeyValueStore<Object, Object>  appStore = interactiveQueryService.retrieveQueryableStore("app-store", QueryableStoreTypes.keyValueStore());

一旦应用程序获得了状态存储的访问权限,就可以从中查询键值信息。spring-doc.cadn.net.cn

在这种情况下,应用程序使用的状态存储是一个只读键值存储。 Kafka Streams 应用程序可以使用其他类型的状态存储。 例如,如果应用程序偏好查询基于窗口的存储,它可以在 Kafka Streams 应用的业务逻辑中构建该存储,然后再检索该存储。 因此,获取可查询存储的 APIKafkaStreamsInteractiveQueryService具有通用的存储类型签名,以便终端用户可以分配合适的类型。spring-doc.cadn.net.cn

这是API的类型签名。spring-doc.cadn.net.cn

public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> storeType)

调用该方法时,用户可以具体请求正确的状态存储类型,正如我们在上面示例中所做的。spring-doc.cadn.net.cn

重试状态存储检索

当尝试用KafkaStreamsInteractiveQueryService但由于各种原因,州商店可能找不到。 如果这些理由是暂时的,KafkaStreamsInteractiveQueryService提供一个选项,通过注入自定义文件,重新尝试状态存储的检索重试模板. 默认情况下,重试模板该 用于KafkaStreamsInteractiveQueryService最多尝试三次,固定回放一秒。spring-doc.cadn.net.cn

以下是你如何注入自定义的重试模板KafkaStreamsInteractiveQueryService最多尝试次数为十次。spring-doc.cadn.net.cn

@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
    final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
            new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(new FixedBackOffPolicy());
    RetryPolicy retryPolicy = new SimpleRetryPolicy(10);
    retryTemplate.setRetryPolicy(retryPolicy);
    kafkaStreamsInteractiveQueryService.setRetryTemplate(retryTemplate);
    return kafkaStreamsInteractiveQueryService;
}

查询远程状态存储

上面显示的用于获取状态存储的 API -retrieveQueryableStore旨在本地可用的键值状态存储。 在制作环境中,Kafka Streams 应用通常根据分区数量分布。 如果一个主题有四个分区,并且有四个同一个 Kafka Streams 处理器实例在运行,那么每个实例可能负责处理该主题中的单个分区。 在这种情况下,打电话retrieveQueryableStore可能无法给出实例所期望的正确结果,尽管它可能会返回有效的存储。 假设有四个分区的主题包含多个键的数据,且单个分区总是负责特定的键。 如果调用的实例retrieveQueryableStore如果需要关于该实例不托管的密钥的信息,则不会接收任何数据。 这是因为当前的 Kafka Streams 实例对该密钥一无所知。 为此,调用实例首先需要确保他们拥有该密钥所托管的Kafka Streams处理器实例的主机信息。 这些数据可以从同一 Kafka Streams 下的任何实例中检索到application.id如下所示。spring-doc.cadn.net.cn

@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;

HostInfo kafkaStreamsApplicationHostInfo = this.interactiveQueryService.getKafkaStreamsApplicationHostInfo("app-store", 12345, new IntegerSerializer());

在上述示例代码中,调用实例查询的是某个特定的键12345来自名为应用商店. API 还需要对应的密钥串行器,在这里是整数序列化器. Kafka Streams 会在同一个范畴下查看所有实例application.id并尝试查找托管该特定密钥的实例, 一旦找到,它会返回该主机信息,作为主机信息对象。spring-doc.cadn.net.cn

API的界面如下:spring-doc.cadn.net.cn

public <K> HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Serializer<K> serializer)

当使用多个Kafka Streams处理器实例时application.id通过这种分布式方式,应用应提供一个RPC层,在该层中可以通过RPC端点(如REST)查询状态存储。 更多详情请参见本文。 使用 Apache Kafka 的 Spring 时,通过 Spring-web 技术添加基于 Spring 的 REST 端点非常容易。 一旦有了 REST 端点,就可以用它查询任何 Kafka Streams 实例的状态存储,前提是主机信息密钥托管的位置是实例已知的。spring-doc.cadn.net.cn

如果托管实例的密钥是当前实例,那么应用程序无需调用RPC机制,而是在JVM内调用。 然而,问题在于应用程序可能不知道发起调用的实例是密钥托管的所在,因为特定服务器可能因消费者重新平衡而丢失分区。 为了解决这个问题,KafkaStreamsInteractiveQueryService通过API方法查询当前主机信息提供了便捷的API。getCurrentKafkaStreamsApplicationHostInfo()返回电流主机信息. 其理念是应用程序首先可以获取密钥存放位置的信息,然后对主机信息还有关于当前实例的那个。 如果主机信息数据匹配后,可以通过retrieveQueryableStore否则就用RPC选项吧。spring-doc.cadn.net.cn

卡夫卡流示例

以下示例结合了本章涵盖的多个主题:spring-doc.cadn.net.cn

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    public StreamsBuilderFactoryBeanConfigurer configurer() {
        return fb -> fb.setStateListener((newState, oldState) -> {
            System.out.println("State transition from " + oldState + " to " + newState);
        });
    }

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
        KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
        stream
                .mapValues((ValueMapper<String, String>) String::toUpperCase)
                .groupByKey()
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(1_000)))
                .reduce((String value1, String value2) -> value1 + value2,
                		Named.as("windowStore"))
                .toStream()
                .map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
                .filter((i, s) -> s.length() > 40)
                .to("streamingTopic2");

        stream.print(Printed.toSysOut());

        return stream;
    }

}