此版本仍在开发中,尚未被视为稳定版本。如需最新稳定版本,请使用 Spring Boot 4.0.4!spring-doc.cadn.net.cn

Apache Pulsar 支持

Apache Pulsar 通过提供 Spring for Apache Pulsar 项目的自动配置来支持。spring-doc.cadn.net.cn

org.springframework.pulsar:spring-pulsar 在类路径上时,Spring Boot 将自动配置并注册经典的(命令式)Spring for Apache Pulsar 组件。 当 org.springframework.pulsar:spring-pulsar-reactive 在类路径上时,它也会对响应式组件执行相同的操作。spring-doc.cadn.net.cn

分别有 spring-boot-starter-pulsarspring-boot-starter-pulsar-reactive Starters,用于方便地收集命令式和响应式使用所需的依赖项。spring-doc.cadn.net.cn

连接到 Pulsar

当您使用 Pulsar starter 时,Spring Boot 将自动配置并注册一个 PulsarClient bean。spring-doc.cadn.net.cn

默认情况下,应用程序会尝试连接到位于 pulsar://localhost:6650 的本地 Pulsar 实例。 可以通过将 spring.pulsar.client.service-url 属性设置为不同的值来调整此配置。spring-doc.cadn.net.cn

该值必须是一个有效的 Pulsar 协议 URL

您可以通过指定任何以 spring.pulsar.client.* 为前缀的应用程序属性来配置客户端。spring-doc.cadn.net.cn

如果您需要对配置进行更多控制,请考虑注册一个或多个 PulsarClientBuilderCustomizer bean。spring-doc.cadn.net.cn

身份验证

要连接到需要身份验证的 Pulsar 集群,您需要通过设置 pluginClassName 以及插件所需的任何参数来指定要使用的身份验证插件。 您可以将参数设置为从参数名称到参数值的映射。 以下示例展示了如何配置 AuthenticationOAuth2 插件。spring-doc.cadn.net.cn

spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param.issuerUrl=https://auth.server.cloud/
spring.pulsar.client.authentication.param.privateKey=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
        param:
          issuerUrl: https://auth.server.cloud/
          privateKey: file:///Users/some-key.json
          audience: urn:sn:acme:dev:my-instance

你需要确保在 spring.pulsar.client.authentication.param.* 下定义的名称与你的身份验证插件所期望的名称完全匹配(通常采用驼峰命名法)。 Spring Boot 不会对这些条目尝试任何形式的宽松绑定。spring-doc.cadn.net.cn

例如,如果要为 AuthenticationOAuth2 身份验证插件配置颁发者 URL,则必须使用 spring.pulsar.client.authentication.param.issuerUrl。 如果使用其他形式,如 issuerurlissuer-url,则该设置不会应用于该插件。spring-doc.cadn.net.cn

这种缺乏宽松绑定的情况也使得使用环境变量进行身份验证参数变得有问题,因为在转换过程中会丢失大小写敏感性。 如果使用环境变量来设置参数,那么您需要按照 这些步骤 在 Spring for Apache Pulsar 参考文档中进行操作,以便正常工作。spring-doc.cadn.net.cn

SSL

默认情况下,Pulsar 客户端以明文形式与 Pulsar 服务通信。 您可以按照 这些步骤 在 Spring for Apache Pulsar 参考文档中启用 TLS 加密。spring-doc.cadn.net.cn

有关客户端和身份验证的完整详细信息,请参阅 Spring for Apache Pulsar 参考文档spring-doc.cadn.net.cn

以响应式方式连接到 Pulsar

当 Reactive 自动配置被激活时,Spring Boot 将自动配置并注册一个 ReactivePulsarClient Bean。spring-doc.cadn.net.cn

ReactivePulsarClient 适配了前述 PulsarClient 的实例。 因此,请参考前面的部分来配置 ReactivePulsarClient 所使用的 PulsarClientspring-doc.cadn.net.cn

连接到 Pulsar 管理端

Spring for Apache Pulsar 的 PulsarAdministration 客户端也会自动配置。spring-doc.cadn.net.cn

默认情况下,应用程序会尝试连接到位于 http://localhost:8080 的本地 Pulsar 实例。 可以通过将 spring.pulsar.admin.service-url 属性设置为 (http|https)://<host>:<port> 形式的不同值来调整此配置。spring-doc.cadn.net.cn

如果您需要对配置进行更多控制,请考虑注册一个或多个 PulsarAdminBuilderCustomizer bean。spring-doc.cadn.net.cn

身份验证

当访问需要身份验证的 Pulsar 集群时,管理员客户端需要与普通 Pulsar 客户端相同的安全配置。 您可以使用上述 身份验证配置,将 spring.pulsar.client.authentication 替换为 spring.pulsar.admin.authenticationspring-doc.cadn.net.cn

要在启动时创建主题,请添加一个类型为 PulsarTopic 的 bean。 如果该主题已存在,则会忽略此 bean。

发送消息

Spring 的 PulsarTemplate 是自动配置的,您可以使用它来发送消息,如下例所示:spring-doc.cadn.net.cn

import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final PulsarTemplate<String> pulsarTemplate;

	public MyBean(PulsarTemplate<String> pulsarTemplate) {
		this.pulsarTemplate = pulsarTemplate;
	}

	public void someMethod() {
		this.pulsarTemplate.send("someTopic", "Hello");
	}

}
import org.apache.pulsar.client.api.PulsarClientException
import org.springframework.pulsar.core.PulsarTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val pulsarTemplate: PulsarTemplate<String>) {

	@Throws(PulsarClientException::class)
	fun someMethod() {
		pulsarTemplate.send("someTopic", "Hello")
	}

}

PulsarTemplate 依赖于 PulsarProducerFactory 来创建底层的 Pulsar 生产者。 Spring Boot 自动配置也提供了该生产者工厂,默认情况下,它会缓存所创建的生产者。 您可以通过指定任何以 spring.pulsar.producer.*spring.pulsar.producer.cache.* 为前缀的应用程序属性来配置生产者工厂和缓存设置。spring-doc.cadn.net.cn

如果您需要对生产者工厂配置进行更多控制,请考虑注册一个或多个 ProducerBuilderCustomizer bean。 这些定制器将应用于所有创建的生产者。 您还可以在发送消息时传入 ProducerBuilderCustomizer,以仅影响当前生产者。spring-doc.cadn.net.cn

如果您需要对发送的消息进行更多控制,可以在发送消息时传入一个 TypedMessageBuilderCustomizerspring-doc.cadn.net.cn

以响应式方式发送消息

当 Reactive 自动配置被激活时,Spring 的 ReactivePulsarTemplate 会被自动配置,您可以使用它来发送消息,如下例所示:spring-doc.cadn.net.cn

import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final ReactivePulsarTemplate<String> pulsarTemplate;

	public MyBean(ReactivePulsarTemplate<String> pulsarTemplate) {
		this.pulsarTemplate = pulsarTemplate;
	}

	public void someMethod() {
		this.pulsarTemplate.send("someTopic", "Hello").subscribe();
	}

}
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val pulsarTemplate: ReactivePulsarTemplate<String>) {

	fun someMethod() {
		pulsarTemplate.send("someTopic", "Hello").subscribe()
	}

}

ReactivePulsarTemplate 依赖于 ReactivePulsarSenderFactory 来实际创建底层的发送器。 Spring Boot 自动配置也提供了这个发送器工厂,默认情况下,它会缓存其创建的生产者。 您可以通过指定任何以 spring.pulsar.producer.*spring.pulsar.producer.cache.* 为前缀的应用程序属性来配置发送器工厂和缓存设置。spring-doc.cadn.net.cn

如果您需要对发送者工厂配置进行更多控制,请考虑注册一个或多个 ReactiveMessageSenderBuilderCustomizer bean。 这些定制器将应用于所有创建的发送者。 您还可以在发送消息时传入 ReactiveMessageSenderBuilderCustomizer,以仅影响当前发送者。spring-doc.cadn.net.cn

如果您需要对发送的消息进行更多控制,可以在发送消息时传入一个 MessageSpecBuilderCustomizerspring-doc.cadn.net.cn

接收消息

当存在 Apache Pulsar 基础设施时,可以使用 @PulsarListener 注解任何 bean 以创建监听器端点。 以下组件在 someTopic 主题上创建了一个监听器端点:spring-doc.cadn.net.cn

import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@PulsarListener(topics = "someTopic")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.pulsar.annotation.PulsarListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@PulsarListener(topics = ["someTopic"])
	fun processMessage(content: String?) {
		// ...
	}

}

Spring Boot 自动配置提供了 PulsarListener 所需的所有组件,例如 PulsarListenerContainerFactory 以及它用于构建底层 Pulsar 消费者的消费者工厂。 您可以通过指定任何以 spring.pulsar.listener.*spring.pulsar.consumer.* 为前缀的应用程序属性来配置这些组件。spring-doc.cadn.net.cn

如果您需要对消费者工厂的配置有更多控制,请考虑注册一个或多个 ConsumerBuilderCustomizer beans。 这些自定义器会应用于工厂创建的所有消费者,因此所有 @PulsarListener 实例都会受到影响。 您还可以通过设置 @PulsarListener 注解的 consumerCustomizer 属性来定制单个监听器。spring-doc.cadn.net.cn

如果您需要对实际容器工厂配置进行更多控制,请考虑注册一个或多个 PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> bean。spring-doc.cadn.net.cn

以响应式方式接收消息

当存在 Apache Pulsar 基础设施且启用了响应式自动配置时,可以使用 @ReactivePulsarListener 注解任何 Bean 以创建响应式监听器端点。 以下组件在 someTopic 主题上创建了一个响应式监听器端点:spring-doc.cadn.net.cn

import reactor.core.publisher.Mono;

import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@ReactivePulsarListener(topics = "someTopic")
	public Mono<Void> processMessage(String content) {
		// ...
		return Mono.empty();
	}

}
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono

@Component
class MyBean {

	@ReactivePulsarListener(topics = ["someTopic"])
	fun processMessage(content: String?): Mono<Void> {
		// ...
		return Mono.empty()
	}

}

Spring Boot 自动配置提供了 ReactivePulsarListener 所需的所有组件,例如 ReactivePulsarListenerContainerFactory 以及它用于构建底层响应式 Pulsar 消费者的消费者工厂。 你可以通过指定任何以 spring.pulsar.listener.*spring.pulsar.consumer.* 为前缀的应用程序属性来配置这些组件。spring-doc.cadn.net.cn

如果您需要对消费者工厂的配置有更多控制,请考虑注册一个或多个 ReactiveMessageConsumerBuilderCustomizer beans。 这些自定义器会应用于工厂创建的所有消费者,因此所有 @ReactivePulsarListener 实例都会受到影响。 您还可以通过设置 @ReactivePulsarListener 注解的 consumerCustomizer 属性来定制单个监听器。spring-doc.cadn.net.cn

如果您需要对实际容器工厂配置进行更多控制,请考虑注册一个或多个 PulsarContainerFactoryCustomizer<DefaultReactivePulsarListenerContainerFactory<?>> bean。spring-doc.cadn.net.cn

读取消息

Pulsar 读取器接口使应用程序能够手动管理游标。 当您使用读取器连接到主题时,需要指定读取器在连接到主题时开始读取的消息。spring-doc.cadn.net.cn

当存在 Apache Pulsar 基础设施时,可以使用 @PulsarReader 注解任何 bean,以便使用读取器消费消息。 以下组件创建了一个读取器端点,该端点从 someTopic 主题的开头开始读取消息:spring-doc.cadn.net.cn

import org.springframework.pulsar.annotation.PulsarReader;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@PulsarReader(topics = "someTopic", startMessageId = "earliest")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.pulsar.annotation.PulsarReader
import org.springframework.stereotype.Component

@Component
class MyBean {

	@PulsarReader(topics = ["someTopic"], startMessageId = "earliest")
	fun processMessage(content: String?) {
		// ...
	}

}

@PulsarReader 依赖于 PulsarReaderFactory 来创建底层的 Pulsar 读取器。 Spring Boot 自动配置提供了此读取器工厂,可以通过设置任何以 spring.pulsar.reader.* 为前缀的应用程序属性来自定义它。spring-doc.cadn.net.cn

如果您需要对读者工厂的配置有更多控制,请考虑注册一个或多个 ReaderBuilderCustomizer beans。 这些自定义程序将应用于工厂创建的所有读者,因此所有 @PulsarReader 实例。 您还可以通过设置 @PulsarReader 注解的 readerCustomizer 属性来自定义单个监听器。spring-doc.cadn.net.cn

如果您需要对实际容器工厂配置进行更多控制,请考虑注册一个或多个 PulsarContainerFactoryCustomizer<DefaultPulsarReaderContainerFactory<?>> bean。spring-doc.cadn.net.cn

以响应式方式读取消息

当存在 Apache Pulsar 基础架构并激活了响应式自动配置时,Spring 的 ReactivePulsarReaderFactory 将被提供,您可以使用它来创建读取器,以便以响应式方式读取消息。 以下组件使用提供的工厂创建读取器,并从 someTopic 主题中读取 5 分钟前的一条消息:spring-doc.cadn.net.cn

import java.time.Instant;
import java.util.List;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.StartAtSpec;
import reactor.core.publisher.Mono;

import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final ReactivePulsarReaderFactory<String> pulsarReaderFactory;

	public MyBean(ReactivePulsarReaderFactory<String> pulsarReaderFactory) {
		this.pulsarReaderFactory = pulsarReaderFactory;
	}

	public void someMethod() {
		ReactiveMessageReaderBuilderCustomizer<String> readerBuilderCustomizer = (readerBuilder) -> readerBuilder
			.topic("someTopic")
			.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)));
		Mono<Message<String>> message = this.pulsarReaderFactory
			.createReader(Schema.STRING, List.of(readerBuilderCustomizer))
			.readOne();
		// ...
	}

}
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder
import org.apache.pulsar.reactive.client.api.StartAtSpec
import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory
import org.springframework.stereotype.Component
import java.time.Instant

@Component
class MyBean(private val pulsarReaderFactory: ReactivePulsarReaderFactory<String>) {

	fun someMethod() {
		val readerBuilderCustomizer = ReactiveMessageReaderBuilderCustomizer {
			readerBuilder: ReactiveMessageReaderBuilder<String> ->
				readerBuilder
					.topic("someTopic")
					.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)))
		}
		val message = pulsarReaderFactory
				.createReader(Schema.STRING, listOf(readerBuilderCustomizer))
				.readOne()
		// ...
	}

}

Spring Boot 自动配置提供了此读取器工厂,可以通过设置任何以 spring.pulsar.reader.* 为前缀的应用程序属性来自定义它。spring-doc.cadn.net.cn

如果您需要对读取器工厂配置进行更多控制,请考虑在使用工厂创建读取器时传入一个或多个 ReactiveMessageReaderBuilderCustomizer 实例。spring-doc.cadn.net.cn

如果您需要对读取器工厂配置进行更多控制,请考虑注册一个或多个 ReactiveMessageReaderBuilderCustomizer Bean。 这些定制器将应用于所有创建的读取器。 您还可以在创建读取器时传递一个或多个 ReactiveMessageReaderBuilderCustomizer,以仅将定制应用于所创建的读取器。spring-doc.cadn.net.cn

有关上述任何组件的更多详细信息以及要了解其他可用功能,请参阅 Spring for Apache Pulsar 参考文档

事务支持

Spring for Apache Pulsar 在使用 PulsarTemplate@PulsarListener 时支持事务。spring-doc.cadn.net.cn

使用响应式变体时,目前不支持事务。

spring.pulsar.transaction.enabled 属性设置为 true 将会:spring-doc.cadn.net.cn

transactional 属性的 @PulsarListener 可用于精确调整在何时应与监听器一起使用事务。spring-doc.cadn.net.cn

如需更灵活地控制Spring for Apache Pulsar事务功能,您应定义自己的 PulsarTemplate 和/或 ConcurrentPulsarListenerContainerFactory beans。 您也可以定义一个 PulsarAwareTransactionManager bean,如果默认自动配置的 PulsarTransactionManager 不适合的话。spring-doc.cadn.net.cn

其他 Pulsar 属性

自动配置支持的属性在附录的 集成属性 部分中有说明。 请注意,这些属性(带连字符或驼峰式)通常直接映射到Apache Pulsar的配置属性。 有关详细信息,请参阅Apache Pulsar文档。spring-doc.cadn.net.cn

只有 Pulsar 支持的部分属性可以通过 PulsarProperties 类直接访问。 如果您希望使用其他不被直接支持的附加属性来调整自动配置的组件,您可以使用每个上述组件所支持的自定义器。spring-doc.cadn.net.cn