对于最新稳定版本,请使用 Spring Integration 7.0.0spring-doc.cadn.net.cn

响应式流支持

Spring Integration 在框架的某些部分和不同方面支持响应式流的交互。 我们将在这里讨论大部分内容,并在必要时附上目标章节的相关链接以获取详细信息。spring-doc.cadn.net.cn

前言

总结一下,Spring Integration 扩展了 Spring 编程模型,以支持著名的企业集成模式。 Spring 集成支持基于 Spring 的应用中的轻量级消息传递,并支持通过声明式适配器与外部系统的集成。 Spring Integration的主要目标是提供一个简单的企业集成解决方案模型,同时保持对生成可维护、可测试代码至关重要的关注点分离。 这一目标通过一流公民如消息,渠道端点这使我们能够构建一个集成流程(流水线),在大多数情况下,一个端点将消息发送到一个通道,供另一个端点使用。 通过这种方式,我们可以区分集成交互模型与目标业务逻辑。 关键在于中间通道:流行为依赖于实现,端点保持不变。spring-doc.cadn.net.cn

另一方面,反应流是异步流处理的标准,具有非阻塞性背压。 响应式流的主要目标是规范跨异步边界的流数据交换——比如将元素传递到另一个线程或线程池——同时确保接收端不被迫缓冲任意数量的数据。 换句话说,背压是该模型的不可或缺的一部分,以便允许在线程间的队列被界定。 响应式流实现(如 Project Reactor)的目的是在流应用的整个处理图中保留这些优势和特性。 响应式流库的最终目标是尽可能以现有编程语言结构的透明流畅方式为目标应用提供类型、作符集和支持 API,但最终解决方案并不像普通函数链调用那样迫切。 它分为几个阶段:定义和执行,执行在订阅最终响应式发布商期间进行,数据需求会从定义底部推到顶部,根据需要施加反压——我们请求尽可能多的事件。 响应式应用看起来像是“流”或者用我们习惯的春季集成术语——“流动”. 事实上,自 Java 9 以来,反应流的 SPI 以java.util.concurrent.Flow类。spring-doc.cadn.net.cn

从这里看,当我们在端点上应用一些响应式框架作符时,Spring Integration 流程似乎非常适合编写响应式流应用,但实际上问题范围更广,我们需要记住并非所有端点(例如,JdbcMessageHandler)可以在响应式流中透明处理。 当然,春季集成中响应式流支持的主要目标是让整个过程完全被动、按需启动并随时准备后压。 只有在目标协议和通道适配器系统提供反应流交互模型之前,这才有可能实现。 在以下章节中,我们将介绍 Spring Integration 中为开发响应式应用保留集成流程结构所提供的组件和方法。spring-doc.cadn.net.cn

Spring Integration 中所有通过 Project Reactor 类型实现的反应流交互,例如通量.

消息网关

与反应流最简单的交互点是@MessagingGateway我们只需将网关方法的返回类型设为单核细胞增多症<?>- 而网关方法调用背后的整个集成流程将在订阅发生时执行实例。 看反应器更多信息请见。 类似的-reply 方法在框架内部用于完全基于反应流兼容协议的入站网关(详见下方反应通道适配器)。 发送和接收作被包裹成Mono.deffer()通过将 从回复频道只要有头部就该。 这样,特定响应式协议(例如 Netty)的入站组件将作为响应式流程的订阅者和发起者,执行在 Spring 集成上执行。 如果请求有效载荷是响应式类型,最好通过反应流定义来处理,将进程推迟给发起方订阅。 为此,处理程序方法还必须返回一个反应类型。 更多信息请见下一节。spring-doc.cadn.net.cn

响应式回复有效载荷

当回复产生时消息处理器返回反应型负载,回复消息以异步方式处理,常规消息频道实现方式包括输出通道异步必须设置为true)并且当输出频道为ReactiveStreamsSubscribeableChannel(可订阅频道)实现,例如:流信息频道. 带有标准命令消息频道如果一个回复有效载荷是多值发布者(参见ReactiveAdapter.isMultiValue()更多信息),它被包裹成一个Mono.just(). 因此,必须在下游明确订阅或被流信息频道下游。 其中ReactiveStreamsSubscribeableChannel(可订阅频道)对于输出通道无需担心退税类型和订阅;所有作都由框架内部流畅处理。spring-doc.cadn.net.cn

更多信息请参见 Kotlin 协程spring-doc.cadn.net.cn

流信息频道ReactiveStreamsConsumer

流信息频道是 的组合实现消息频道出版商<信息<?>>. 一个通量作为热源,内部创建用于接收来自发送()实现。 这Publisher.subscribe()实现工作委托给该内部通量. 此外,对于按需上游用电,还有流信息频道提供了ReactiveStreamsSubscribeableChannel(可订阅频道)合同。 任何上游发行人(例如,参见下方的源轮询信道适配器和分配器)在订阅准备好时自动订阅该信道。 这些委派出版商的事件被纳入内部通量上述。spring-doc.cadn.net.cn

消费者流信息频道必须是org.reactivestreams.Subscriber为尊重反应流合同而提出的实例。 幸运的是,所有消息处理器Spring 集成中的实现还实现了核心订阅者来自反应堆项目。 感谢一位ReactiveStreamsConsumer实现过程中,整个集成流程配置对目标开发者保持透明。 在这种情况动行为从命令式推力模型转变为响应式拉力模型。 一个ReactiveStreamsConsumer也可以用来转任何消息频道通过IntegrationReactiveUtils使积分流部分被动。spring-doc.cadn.net.cn

流信息频道更多信息请见。spring-doc.cadn.net.cn

从5.5版本开始,消费者端点规格引入了响应式()将流程中的端点设为ReactiveStreamsConsumer与输入通道无关。 可选功能<?超级Flux<Message<?>>,?扩展出版商<信息<?>>>可以提供以定制源通量从输入通道通过Flux.transform()运算,例如:publishOn(),doOnNext(),重试()等。 该功能表示为@Reactive所有消息注释的子注释(@ServiceActivator,@Splitter等等)通过他们的响应式()属性。spring-doc.cadn.net.cn

源轮询通道适配器

通常,SourcePollingChannelAdapter依赖于由任务调度器. 轮询触发器由提供的选项构建,用于定期调度任务以轮询目标数据源或事件。 当输出通道ReactiveStreamsSubscribeableChannel(可订阅频道)一样触发用于确定下一次执行时间,但不是调度任务,而是SourcePollingChannelAdapter生成一个Flux<Message<?>>基于Flux.generate()对于下一个执行时间值和单声道延迟(Mono.delay)从上一步开始的持续时间内。 一个Flux.flatMapMany()随后用于轮询maxMessagesPerPoll并将其沉入输出中通量. 这个生成器通量订阅者为提供的ReactiveStreamsSubscribeableChannel(可订阅频道)尊重下游的背压。 从5.5版本开始,当maxMessagesPerPoll == 0,源根本不被调用,且flatMapMany()通过 立即完成Mono.empty()结果直到maxMessagesPerPoll在后续时间点,例如通过控制总线,将值变为非零。 这样,任何消息源实现可以转化为反应热源。spring-doc.cadn.net.cn

更多信息请参见民调消费者spring-doc.cadn.net.cn

事件驱动信道适配器

MessageProducerSupport是事件驱动信道适配器的基类,通常其sendMessage(Message<?>)作为监听器回调使用,用于生成驱动程序 API。 这个回调也可以很容易地插入到doOnNext()当消息生产者实现构建通量而非基于监听者的功能。 事实上,当输出通道消息生成者不是ReactiveStreamsSubscribeableChannel(可订阅频道). 然而,为了提升终端用户体验,并允许更多后压响应功能,MessageProducerSupport提供subscribeToPublisher(Publisher<? 扩展消息<?>>)出版商<信息<?>>>是目标系统数据的来源。 通常,它从doStart()当目标驱动API被调用时实现发行人源数据。 建议结合反应性物质MessageProducerSupport流信息频道作为输出通道用于按需订阅和下游活动观看。 当订阅发行人取消了。 叫停止()在这样的通道适配器上,完成了从源头的生产发行人. 通道适配器可以通过自动订阅新创建的源来重启发行人.spring-doc.cadn.net.cn

消息源到反应流

从5.3版本开始,aReactiveMessageSourceProducer提供。 它是提供的组合消息源以及事件驱动生产,并整合到配置中输出通道. 内部包裹消息源进入反复重新订阅的人群制作Flux<Message<?>>订阅于subscribeToPublisher(Publisher<? 扩展消息<?>>)上述。 订阅是通过Schedulers.boundedElastic()以避免可能的阻挡目标消息源. 消息源返回时(无数据可提取),该被转化为重复HenEmpty()带有延迟基于IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY 期间从订阅者上下文进入。 默认是1秒。 如果消息源生成的消息是IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK头部中的信息,如有必要,会在doOnSuccess()原版并且在doOnError()如果下游流抛出消息异常带着失败的信息拒绝。 这ReactiveMessageSourceProducer它可以用于任何需要将轮询通道适配器功能转化为响应式、按需解决方案的场景,适用于任何现有用户MessageSource<?>实现。spring-doc.cadn.net.cn

分路器和聚合器

摘要消息分流器得到发行人对于其逻辑,过程自然地覆盖发行人将这些数据映射为发送给输出通道. 如果该信道是ReactiveStreamsSubscribeableChannel(可订阅频道)通量封装器用于发行人是按需订阅的,这种分路器的行为更像是平面地图反应器算符,当我们将输入事件映射为多值输出时发行人. 当整个集成流程都用流信息频道分配器之前和之后,将Spring Integration配置与反应流需求及其事件处理作符对齐。 使用普通通道时,一个发行人转换为可迭代用于标准迭代生成的拆分逻辑。spring-doc.cadn.net.cn

一个FluxAggregatorMessageHandler是另一种特定反应流逻辑实现的示例,可以视为“响应式算子”就反应堆项目而言。 它基于Flux.groupBy()Flux.window()(或缓冲区()) 操作员。 收到的消息会被沉入Flux.create()FluxAggregatorMessageHandler被创造出来,使其成为一个热源。 这通量由 订阅ReactiveStreamsSubscribeableChannel(可订阅频道)按需,或直接在FluxAggregatorMessageHandler.start()输出通道不是被动反应的。 这消息处理器当整个积分流由 a 构建时,具有其能力流信息频道在此组件之前和之后,使整个逻辑处于后压状态。spring-doc.cadn.net.cn

更多信息请参见“溪流与通量分裂”和“通量聚合器”。spring-doc.cadn.net.cn

Java DSL

集成流程在 Java 中,DSL 可以从任意发行人实例(参见IntegrationFlow.from(Publisher<Message<T>>)). 此外,还有一个IntegrationFlowBuilder.toReactivePublisher()算符,集成流程可以被转化为反应性热源。 一个流信息频道在这两种情况下都用于内部;它可以订阅入站发行人根据其ReactiveStreamsSubscribeableChannel(可订阅频道)合同,这是一份出版商<信息<?>>对于下游用户来说,单独使用。 具有动态性集成流程注册我们可以实现一个强大的逻辑,将反应流与这种集成流结合起来,连接/连接发行人.spring-doc.cadn.net.cn

从版本5.5.6开始,atoReactivePublisher(布尔 autoStartOnSubscribe)存在作变量以控制整体生命周期集成流程归来者之后出版商<信息<?>>. 通常,响应式发布者的订阅和消费发生在后期运行阶段,而非响应式流组合期间,甚至不在响应式流组合期间应用上下文启动。 为了避免用于生命周期管理的样板代码集成流程出版商<信息<?>>订阅点,为了更好的终端用户体验,这家新运营商配备了自动开始订阅旗帜已经引入。 它标记(如果true集成流程及其分量autoStartup = false,所以一个应用上下文在流程中不会自动启动消息的生产和使用。 相反,开始()对于集成流程是从内部启动的Flux.doOnSubscribe(). 独立于自动开始订阅值,流动被从一个Flux.doOn取消()Flux.doOnTerminate()- 如果没有任何信息可供消费,生成消息就没有意义。spring-doc.cadn.net.cn

在完全相反的使用场景下,当集成流程应调用响应式流并在完成后继续,a通量变换()算符在集成流程定义. 此时流动被转化为流信息频道该 被传播为通量函数, 在Flux.transform()算子。 函数的结果被包裹为单声<消息<?>>用于将平坦映射为输出通量该账户由另一家订阅流信息频道用于下游流。spring-doc.cadn.net.cn

更多信息请参见 Java DSL 章节spring-doc.cadn.net.cn

响应式消息处理器

从5.3版本开始,响应式消息处理器框架中原生支持。 这种类型的消息处理器专为响应式客户端设计,它们会返回响应式类型以按需订阅进行低级作执行,且不提供任何回复数据以继续响应式流组合。 当响应式消息处理器在命令式积分流程中被使用,handleMessage()回流后立即被认购,仅仅因为此类流中没有反应性流组成以抵消背压。 在这种情况下,框架会对此进行封装响应式消息处理器变成了响应式消息处理适配器- 一个简单的实现消息处理器. 然而,当ReactiveStreamsConsumer参与了流量(例如当消耗通道为流信息频道),这样的响应式消息处理器由整个反应流组成,其中flatMap()反应堆操作员在消耗时应遵守背压。spring-doc.cadn.net.cn

其中一个开箱易用的响应式消息处理器实现是ReactiveMongoDbStoringMessageHandler用于出站通道适配器。 更多信息请参见 MongoDB 反应性通道适配器spring-doc.cadn.net.cn

从6.1版本开始,集成流程定义暴露了方便的handleReactive(ReactiveMessageHandler)航站操作员。 任何响应式消息处理器实现(即使只是用API)可用于该运算符。 框架认同返回的单<虚空>自然而然。 以下是该算符可能配置的一个简单示例:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow wireTapFlow1() {
    return IntegrationFlow.from("tappedChannel1")
            .wireTap("tapChannel", wt -> wt.selector(m -> m.getPayload().equals("foo")))
            .handleReactive((message) -> Mono.just(message).log().then());
}

该算符的超载版本接受Consumer<GenericEndpointSpec<ReactiveMessageHandlerAdapter>>围绕所提供的 定制消费者端点响应式消息处理器.spring-doc.cadn.net.cn

此外,还有一个ReactiveMessageHandlerSpec还提供了基于 的变体。 在大多数情况下,它们用于协议特定的通道适配器实现。 请参见下一节,链接指向目标技术及其相应的反应性通道适配器。spring-doc.cadn.net.cn

反应性通道适配器

当目标集成协议提供响应式流解决方案时,在 Spring Integration 中实现通道适配器就变得简单。spring-doc.cadn.net.cn

入站事件驱动的通道适配器实现是将请求(如有必要)包裹进延迟通量并且只有在协议组件发起订阅时才执行发送(并产生回复,如有),并且只有在从听众法中回归。 这样我们就有一个反应性流溶液被精确封装在这个组分中。 当然,下游集成流应遵守反应流规范,按需、背压准备方式进行。spring-doc.cadn.net.cn

这并非总能通过 的特性(或当前实现)提供消息处理器处理器用于集成流程。这一限制可以通过线程池和队列来解决,或者流信息频道(见上文)在没有响应式实现的情况下,集成端点的前后。spring-doc.cadn.net.cn

一个响应式事件驱动的入站通道适配器示例:spring-doc.cadn.net.cn

public class CustomReactiveMessageProducer extends MessageProducerSupport {

    private final CustomReactiveSource customReactiveSource;

    public CustomReactiveMessageProducer(CustomReactiveSource customReactiveSource) {
        Assert.notNull(customReactiveSource, "'customReactiveSource' must not be null");
        this.customReactiveSource = customReactiveSource;
    }

    @Override
    protected void doStart() {
        Flux<Message<?>> messageFlux =
            this.customReactiveSource
                .map(event - >
                    MessageBuilder
                    .withPayload(event.getBody())
                    .setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
                    .build());

        subscribeToPublisher(messageFlux);
    }
}

使用方式大致如下:spring-doc.cadn.net.cn

public class MainFlow {
  @Autowired
  private CustomReactiveMessageProducer customReactiveMessageProducer;

  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(customReactiveMessageProducer)
        .channel(outputChannel)
        .get();
  }
}

或者用声明语的方式:spring-doc.cadn.net.cn

public class MainFlow {
  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(new CustomReactiveMessageProducer(new CustomReactiveSource()))
        .handle(outputChannel)
        .get();
  }
}

或者即使没有通道适配器,我们也可以以以下方式使用 Java DSL:spring-doc.cadn.net.cn

public class MainFlow {
  @Bean
  public IntegrationFlow buildFlow() {
    Flux<Message<?>> myFlux = this.customReactiveSource
                .map(event ->
                    MessageBuilder
                    .withPayload(event.getBody())
                    .setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
                    .build());
     return IntegrationFlow.from(myFlux)
        .handle(outputChannel)
        .get();
  }
}

响应式出站通道适配器实现是指根据目标协议提供的响应式API,发起(或延续)响应式流与外部系统交互。入站有效载荷可以是响应式类型,也可以作为整个集成流的事件,作为响应式流顶部的一部分。如果处于单向、发射后遗忘的情景,则可立即订阅回传的响应式类型,或者在下游(请求-回复情景)传播以进一步集成流或目标业务逻辑中显式订阅,但仍保留响应式流语义。spring-doc.cadn.net.cn

一个响应式出站通道适配器的示例:spring-doc.cadn.net.cn

public class CustomReactiveMessageHandler extends AbstractReactiveMessageHandler {

    private final CustomEntityOperations customEntityOperations;

    public CustomReactiveMessageHandler(CustomEntityOperations customEntityOperations) {
        Assert.notNull(customEntityOperations, "'customEntityOperations' must not be null");
        this.customEntityOperations = customEntityOperations;
    }

    @Override
    protected Mono<Void> handleMessageInternal(Message<?> message) {
        return Mono.fromSupplier(() -> message.getHeaders().get("queryType", Type.class))
                .flatMap(mode -> {
                    switch (mode) {
                        case INSERT:
                            return handleInsert(message);
                        case UPDATE:
                            return handleUpdate(message);
                        default:
                            return Mono.error(new IllegalArgumentException());
                    }
                }).then();
    }

    private Mono<Void> handleInsert(Message<?> message) {
        return this.customEntityOperations.insert(message.getPayload())
                .then();
    }

    private Mono<Void> handleUpdate(Message<?> message) {
        return this.r2dbcEntityOperations.update(message.getPayload())
                .then();
    }

    public enum Type {
        INSERT,
        UPDATE,
    }
}

我们将能够同时使用两个通道适配器:spring-doc.cadn.net.cn

public class MainFlow {

  @Autowired
  private CustomReactiveMessageProducer customReactiveMessageProducer;

  @Autowired
  private CustomReactiveMessageHandler customReactiveMessageHandler;

  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(customReactiveMessageProducer)
        .transform(someOperation)
        .handle(customReactiveMessageHandler)
        .get();
  }
}

目前,Spring Integration为WebFluxRSocketMongoDbR2DBCZeroMQ、GraphQLApache Cassandra提供通道适配器(或网关)实现。Redis Stream Channel 适配器也是响应式的,并使用ReactiveStream作来自 Spring Data。更多响应式通道适配器正在推出,例如基于ReactiveKafkaProducer模板ReactiveKafkaConsumerTemplate来自Spring的Apache Kafka等。对于许多其他非响应式通道适配器,建议使用线程池以避免响应式流处理中的阻塞。spring-doc.cadn.net.cn

对命令式上下文传播的响应式

上下文传播库位于类路径上时,项目反应器可以ThreadLocal数值(例如,微米观测安全上下文持有者)并将其存储在订户上下文。 相反的作也可行,比如需要填充日志MDC进行追踪,或让我们从反应流调用的服务恢复作用域中的观察值。请参见Project Reactor文档中关于其上下文传播特殊作符的更多信息。如果整个解决方案是单一响应式流组合,存储和恢复上下文就能顺畅运行,因为订户上下文从下游一直可见到合成的起始(通量). 但是,如果应用在不同应用间切换通量实例或命令式处理,然后返回,然后上下文与订户可能无法使用。针对此类用例,Spring Integration提供了额外功能(从版本开始)6.0.5)用于存储反应堆ContextView进入IntegrationMessageHeaderAccessor.REACTOR_CONTEXT消息头部由反应流生成,例如当我们执行直接处理时发送()操作。 该头部随后用于FluxMessageChannel.subscribeTo()以恢复反应堆上下文消息该信道将要发射的。目前,该头部是从WebFluxInboundEndpointRSocketInboundGateway但可用于任何执行响应式对命令式积分的解决方案中。填充该头部的逻辑如下:spring-doc.cadn.net.cn

return requestMono
        .flatMap((message) ->
                Mono.deferContextual((context) ->
                        Mono.just(message)
                                .handle((messageToSend, sink) ->
                                        send(messageWithReactorContextIfAny(messageToSend, context)))));
...

private Message<?> messageWithReactorContextIfAny(Message<?> message, ContextView context) {
    if (!context.isEmpty()) {
        return getMessageBuilderFactory()
                .fromMessage(message)
                .setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, context)
                .build();
    }
    return message;
}

注意,我们还需要使用handle()操作员以恢复反应堆ThreadLocal上下文中的值。即使以报头形式发送,框架也无法假设它将被恢复到ThreadLocal下游的价值。spring-doc.cadn.net.cn

从一个消息另一方面通量该逻辑可执行:spring-doc.cadn.net.cn

Mono.just(message)
        .handle((messageToHandle, sink) -> ...)
        .contextWrite(StaticMessageHeaderAccessor.getReactorContext(message)));