使用 Spring 进行 Apache Pulsar

目录

1. 序言

我们建议对于基于Apache Pulsar的应用使用Spring-Boot-First方法,因为这大大简化了流程。 为此,你可以添加Spring-脉冲-Spring-靴-Starters模块作为依赖。
本书大部分内容都期望读者使用起始机,并以此为前提给出了大部分配置指导。 不过,当说明针对Spring BootStarters使用时,会特别说明。

2. 快速巡演

我们将快速介绍Apache Pulsar的Spring Boot应用示例,它能生成并消耗数据。 这是一个完整的应用程序,只要你在默认位置运行一个Pulsar集群,就不需要额外配置——本地主持:6650.spring-doc.cadn.net.cn

2.1. 依赖关系

Spring Boot 应用程序只需Spring-脉冲-Spring-靴-StartersDependency。以下列表展示了如何分别定义Maven和Gradle的依赖关系:spring-doc.cadn.net.cn

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-pulsar</artifactId>
        <version>3.2.12</version>
    </dependency>
</dependencies>
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-pulsar:3.2.12'
}

使用版本 0.2.x上述坐标变化如下:spring-doc.cadn.net.cn

<dependencies>
    <dependency>
        <groupId>org.springframework.pulsar</groupId>
        <artifactId>spring-pulsar-spring-boot-starter</artifactId>
        <version>0.2.0</version>
    </dependency>
</dependencies>
dependencies {
    implementation 'org.springframework.pulsar:spring-pulsar-spring-boot-starter:0.2.0'
}

2.2. 应用代码

以下列表展示了该示例的 Spring Boot 应用案例:spring-doc.cadn.net.cn

@SpringBootApplication
public class PulsarBootHelloWorld {

    public static void main(String[] args) {
        SpringApplication.run(PulsarBootHelloWorld.class, args);
    }

    @Bean
    ApplicationRunner runner(PulsarTemplate<String> pulsarTemplate) {
        return (args) -> pulsarTemplate.send("hello-pulsar-topic", "Hello Pulsar World!");
    }

    @PulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
    void listen(String message) {
        System.out.println("Message Received: " + message);
    }
}

让我们快速介绍该应用的更高级细节。 在文档后面,我们会更详细地介绍这些组件。spring-doc.cadn.net.cn

在之前的示例中,我们高度依赖 Spring Boot 的自动配置。 Spring Boot 会自动配置我们的应用中的多个组件。 它会自动提供脉冲星客户端,生产者和消费者都使用,用于应用。spring-doc.cadn.net.cn

Spring Boot还能自动配置脉冲星模板我们会在应用程序中注入,并开始向Pulsar主题发送记录。 应用程序向名为你好,脉冲星. 请注意,应用程序并未指定任何模式信息,因为 Spring for Apache Pulsar 库会自动根据你发送的数据类型推断模式类型。spring-doc.cadn.net.cn

我们使用脉冲星听者从中取用的注释你好,脉冲星我们发布数据的主题。脉冲星听者是一个方便注释,将消息监听器容器基础设施包裹在 Apache Pulsar 的 Spring 中。 在幕后,它创建了一个消息监听器容器,用于创建和管理 Pulsar 消费者。 与普通Pulsar用户一样,使用时默认订阅类型脉冲星听者独家模式。 随着记录被发布到......你好,脉冲星主题,脉冲监听器消耗它们并在控制台上打印。 该框架还从数据类型推断所使用的模式类型脉冲星听者方法作为有效载荷的使用——字符串,在这种情况下。spring-doc.cadn.net.cn

3. 脉冲星客户端

当你使用脉冲星Spring靴Starters时,你会得到脉冲星客户端自动配置。spring-doc.cadn.net.cn

默认情况下,应用程序尝试连接到本地的 Pulsar 实例pulsar://localhost:6650. 这可以通过设置Spring.pulsar.client.service-url财产价值变为不同。spring-doc.cadn.net.cn

该值必须是有效的脉冲星协议URL。

你可以通过指定任意Spring.Pulsar.client.*应用属性。spring-doc.cadn.net.cn

如果你没有使用Starters,你需要配置并注册脉冲星客户端你自己。 有一个DefaultPulsarClientFactory那个可以帮忙的建造者定制工具。

3.1. TLS加密(SSL)

默认情况下,Pulsar客户端以明文与Pulsar服务通信。 以下部分将介绍如何配置Pulsar客户端以使用TLS加密(SSL)。 前提是经纪人也配置了使用 TLS 加密。spring-doc.cadn.net.cn

Spring Boot 自动配置目前不支持任何 TLS/SSL 配置属性。 你可以提供PulsarClientBuilderCustomizer该系统在 Pulsar 客户端构建器上设置必要的属性。 Pulsar 支持隐私增强邮件(PEM)和 Java KeyStore(JKS)证书格式。spring-doc.cadn.net.cn

按照以下步骤配置TLS:spring-doc.cadn.net.cn

  1. 调整Pulsar客户端服务网址以使用以下内容脉冲星+SSL键scheme 和 TLS 端口(通常如此)6651).spring-doc.cadn.net.cn

  2. 调整管理员客户端服务的网址以使用https://scheme 和 TLS 网页端口(通常)8443).spring-doc.cadn.net.cn

  3. 为客户提供架构商定制服务,将相关属性设定在架构商身上。spring-doc.cadn.net.cn

你可以在官方Pulsar TLS加密文档中找到更多上述信息。spring-doc.cadn.net.cn

3.2. 认证

要连接需要认证的Pulsar集群,你需要指定使用哪个认证插件以及该插件所需的参数。 使用Spring Boot自动配置时,你可以通过配置属性设置插件和插件参数(大多数情况下)。spring-doc.cadn.net.cn

你需要确保定义在以下spring.pulsar.client.authentication.param.*完全符合你验证插件的预期(通常是骆驼壳的)。 Spring Boot不会尝试为这些条目做任何放松装订。spring-doc.cadn.net.cn

例如,如果你想配置发行者的URL,AuthenticationOAuth2你必须使用的认证插件spring.pulsar.client.authentication.param.issuerUrl. 如果你使用其他形式,比如发行人url发行者网址,该设置不会应用到插件上。spring-doc.cadn.net.cn

使用环境变量作为认证参数通常存在问题,因为在翻译过程中会丢失大小写敏感性。 例如,考虑以下内容发行者网址通过环境变量设置的认证参数:spring-doc.cadn.net.cn

SPRING_PULSAR_CLIENT_AUTHENTICATION_PARAM_ISSUERURL=https://some.server.com

当 Spring Boot 加载该属性时,它会使用发行人url(小写)而非预期发行者网址(卡尔卡壳)。 你可以通过用环境变量的值来绕过这个限制,就像你application.yml中相关授权属性的值一样。 继续上述例子:spring-doc.cadn.net.cn

spring:
  pulsar:
    client:
      authentication:
        param:
          issuerUrl: ${SPRING_PULSAR_CLIENT_AUTHENTICATION_PARAM_ISSUERURL}

当不使用Spring Boot自动配置时,你可以使用org.apache.pulsar.client.api.AuthenticationFactory创建认证后,直接在你提供给客户端工厂的客户端定制器中设置。spring-doc.cadn.net.cn

以下列表展示了如何配置每种支持的认证机制。spring-doc.cadn.net.cn

点击这里查看Athenz
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationAthenz
        param:
          tenantDomain: ...
          tenantService: ...
          providerDomain: ...
          privateKey: ...
          keyId: ...
这也需要TLS加密
点击这里获取Tokens
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationToken
        param:
          token: some-token-goes-here
点击这里了解基础内容
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationBasic
        param:
          userId: ...
          password: ...
点击这里查看OAuth2
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
        param:
          issuerUrl: ...
          privateKey: ...
          audience: ...
          scope: ...
点击这里查看Sasl
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationSasl
        param:
          saslJaasClientSectionName: ...
          serverType: ...
点击此处查看mTLS(PEM)
由于该选项需要TLS加密,而TLS本身就需要你提供客户端构建器定制器,建议直接在提供的TLS定制器中添加认证。 你可以使用org.apache.pulsar.client.api.AuthenticationFactory帮助创建认证对象如下:
Authentication auth = AuthenticationFactory.TLS("/path/to/my-role.cert.pem", "/path/to/my-role.key-pk8.pem");

请参阅官方的 Pulsar 文档,内容包括 mTLS(PEM)。spring-doc.cadn.net.cn

点击这里查看mTLS(JKS)
由于该选项需要TLS加密,而TLS本身就需要你提供客户端构建器定制器,建议直接在提供的TLS定制器中添加认证。 你可以使用org.apache.pulsar.client.api.AuthenticationFactory帮助创建认证对象如下:
Authentication auth = AuthenticationFactory.create(
        "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls",
        Map.of("keyStoreType", "JKS", "keyStorePath", "/path/to/my/keystore.jks", "keyStorePassword", "clientpw"));

请参见官方的 Pulsar 文档,内容是 mTLS(JKS)。spring-doc.cadn.net.cn

你可以在官方 Pulsar 安全文档中找到关于每个支持插件及其所需属性的更多信息。spring-doc.cadn.net.cn

4. 消息生产

4.1. 脉冲星模板

在Pulsar生产者端,Spring套自动配置提供了脉冲星模板用于出版记录。该模板实现了一个名为脉冲星运营并通过合同提供发布记录的方法。spring-doc.cadn.net.cn

这些发送API方法分为两类:发送sendAsync. 这发送方法通过 Pulsar 生产器上的同步发送功能阻断调用。 他们会退还MessageId(信息ID)消息一旦在经纪人上持久化后发布。 这sendAsync方法调用是非阻塞的异步调用。 他们会回应完成未来,你可以用它在消息发布后异步接收消息ID。spring-doc.cadn.net.cn

对于不包含主题参数的 API 变体,采用主题解析过程来确定目标主题。

4.1.1. 简单 API

该模板提供了几种方法(前头带“send”)用于简单的发送请求。对于更复杂的发送请求,流畅API可以让你配置更多选项。spring-doc.cadn.net.cn

4.1.2. 流畅 API

该模板提供了一个流畅的构建工具,可以处理更复杂的发送请求。spring-doc.cadn.net.cn

4.1.3. 消息定制

你可以指定一个TypedMessageBuilderCustomizer配置外发消息。例如,以下代码展示了如何发送带密钥的消息:spring-doc.cadn.net.cn

template.newMessage(msg)
    .withMessageCustomizer((mb) -> mb.key("foo-msg-key"))
    .send();

4.1.4. 生产者定制

你可以指定一个制作者构建定制器配置底层的Pulsar生产器构建器,最终构建用于发送消息的生产者。spring-doc.cadn.net.cn

使用时要谨慎,因为这样可以完全访问生产者构建器并调用其一些方法(例如创造)可能出现意想不到的副作用。

例如,以下代码展示了如何禁用批处理并启用分块:spring-doc.cadn.net.cn

template.newMessage(msg)
    .withProducerCustomizer((pb) -> pb.enableChunking(true).enableBatching(false))
    .send();

另一个例子展示了如何在将记录发布到分区主题时使用自定义路由。 请指定你的自定义消息路由器制作人建造者如:spring-doc.cadn.net.cn

template.newMessage(msg)
    .withProducerCustomizer((pb) -> pb.messageRouter(messageRouter))
    .send();
注意,当使用消息路由器,唯一有效的设置是spring.pulsar.producer.message-routing-mode习惯.

另一个例子展示了如何添加制片人拦截者它会拦截并变异制作者在发布给经纪人之前收到的消息:spring-doc.cadn.net.cn

template.newMessage(msg)
    .withProducerCustomizer((pb) -> pb.intercept(interceptor))
    .send();

定制工具只适用于用于发送作的生产商。 如果你想对所有生产者应用定制器,必须按照全球生产者定制描述,向生产者工厂提供定制器。spring-doc.cadn.net.cn

使用Lambda定制器时必须遵守《注意Lambda定制器》中描述的规则。

4.2. 规范信息的指定

如果你使用 Java 原语类型,框架会自动检测该模式,发布数据时无需指定任何模式类型。 对于非原始类型,如果在调用发送作时未明确指定模式,脉冲星模板Spring for Apache Pulsar 框架将尝试构建Schema.JSON从类型上看。spring-doc.cadn.net.cn

目前支持的复杂模式类型有 JSON、AVRO、PROTOBUF 和带内联编码的 KEY_VALUE。

4.2.1. 自定义模式映射

作为调用发送作时指定模式的替代方案脉冲星模板对于复杂类型,模式解析器可以配置该类型的映射。 这样就无需指定模式,因为框架通过发送消息类型访问解析器。spring-doc.cadn.net.cn

模式映射可以通过以下配置配置spring.pulsar.defaults.type-mappings财产。 以下示例使用application.yml为 添加映射用户地址复杂对象使用阿弗罗JSON分别是 schema:spring-doc.cadn.net.cn

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON
消息类型是消息类的全限定名称。

添加映射的首选方法是上述属性。 不过,如果需要更多控制,你可以提供模式解析器自定义工具来添加映射。spring-doc.cadn.net.cn

以下示例使用模式解析器自定义工具来添加映射用户地址复杂对象使用阿弗罗JSON分别是 schema:spring-doc.cadn.net.cn

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
	return (schemaResolver) -> {
		schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
	}
}

有了这个配置,发送作时就无需设置指定模式。spring-doc.cadn.net.cn

4.3. 脉冲星生产工厂

脉冲星模板依赖于脉冲星生产工厂真正创造出潜在的生产者。 Spring Boot 自动配置还提供生产者工厂,您可以通过指定任意Spring。脉冲星。制作人。*应用属性。spring-doc.cadn.net.cn

如果在直接使用生产者工厂 API 时未指定主题信息,则使用脉冲星模板使用的唯一例外是省略了“消息类型默认”步骤。

4.3.1. 全球生产者定制化

该框架提供了制作者构建定制器合同允许你配置用于建造每个生产商的底层架构。 要自定义所有生产者,你可以将定制器列表传递到脉冲星生产工厂构造 函数。 使用多个自定义工具时,按列表中出现的顺序应用。spring-doc.cadn.net.cn

如果你用 Spring Boot 自动配置,你可以把自定义器指定为豆子,它们会自动传递给脉冲星生产工厂根据他们的顺序排列@Order注解。

如果你只想对单个生产者应用定制器,可以使用 Fluent API,并在发送时指定定制器spring-doc.cadn.net.cn

4.4. 脉冲星生产者缓存

每个底层脉冲星生产者都会消耗资源。 为了提升性能并避免持续生产者,生产者工厂会缓存其所创造的生产者。 它们以LRU方式缓存,并在未在配置时间内被使用时被驱逐。 缓存密钥包含足够的信息,确保调用者在后续创建请求中返回同一个生产者。spring-doc.cadn.net.cn

此外,你还可以通过指定任意一个spring.pulsar.producer.cache.*应用属性。spring-doc.cadn.net.cn

4.4.1. 对Lambda定制器的注意

任何用户提供的生产者自定义工具也包含在缓存键中。 因为缓存密钥依赖于有效的实现等值/哈希码使用Lambda定制器时必须谨慎。spring-doc.cadn.net.cn

统治:两个自定义器作为 Lambda 实现,会匹配于等值/哈希码 当且仅当它们使用相同的 Lambda 实例,且不要求定义在闭包之外的任何变量。

为了澄清上述规则,我们将看几个例子。 在以下示例中,自定义器被定义为内联 Lambda,这意味着每次调用sendUser使用相同的 Lambda 实例。此外,它不要求闭包外的变量。因此,它作为缓存密钥匹配。spring-doc.cadn.net.cn

void sendUser() {
    var user = randomUser();
    template.newMessage(user)
        .withTopic("user-topic")
        .withProducerCustomizer((b) -> b.producerName("user"))
        .send();
}

在接下来的情况下,定制器被定义为内联 Lambda,这意味着每次调用sendUser使用相同的 Lambda 实例。然而,它需要一个闭包外的变量。因此,它无法匹配为缓存密钥。spring-doc.cadn.net.cn

void sendUser() {
    var user = randomUser();
    var name = randomName();
    template.newMessage(user)
        .withTopic("user-topic")
        .withProducerCustomizer((b) -> b.producerName(name))
        .send();
}

在这个最后一个例子中,自定义器被定义为内联 Lambda,这意味着每个调用sendUser使用相同的 Lambda 实例。虽然它确实使用变量名,但该变量名称不源自其闭包,因此匹配为缓存键。 这说明变量可以在Lambda闭包中使用,甚至可以调用静态方法。spring-doc.cadn.net.cn

void sendUser() {
    var user = randomUser();
    template.newMessage(user)
        .withTopic("user-topic")
        .withProducerCustomizer((b) -> {
           var name = SomeHelper.someStaticMethod();
           b.producerName(name);
        })
        .send();
}
统治:如果你的 Lambda 自定义器没有定义一次且仅定义一次(后续调用中仍使用同一个实例),或者它需要在闭包之外定义变量,那么你必须提供一个有效的等值/哈希码实现。
如果不遵守这些规则,生产者缓存总是会失败,应用性能也会受到负面影响。

4.5. 对生产者的截获消息

添加一个制片人拦截者它允许你在发送给经纪人之前拦截并变异制作人收到的消息。 为此,你可以将拦截器列表传递到脉冲星模板构造 函数。 使用多个拦截器时,应用顺序即为它们在列表中的出现顺序。spring-doc.cadn.net.cn

如果你用 Spring Boot 自动配置,可以把拦截器指定为 Beans。 这些数据会自动传递给脉冲星模板. 拦截器的排序通过使用@Order注释如下:spring-doc.cadn.net.cn

@Bean
@Order(100)
ProducerInterceptor firstInterceptor() {
  ...
}

@Bean
@Order(200)
ProducerInterceptor secondInterceptor() {
  ...
}
如果你不使用起动机,你需要自己配置并注册上述组件。

5. 消息消费

5.1. 脉冲星监听器

对于Pulsar消费者,我们建议终端用户应用使用脉冲星听者注解。 使用脉冲星听者,你需要使用@EnablePulsar注解。 当你使用Spring Boot支持时,它会自动启用这个注释,并配置所有必要的组件脉冲星听者,例如消息监听者基础设施(负责创建Pulsar消费者)。PulsarMessageListenerContainer使用脉冲星消费者工厂创建和管理 Pulsar 消费者,即其用于接收消息的底层 Pulsar 消费者。spring-doc.cadn.net.cn

Spring Boot 提供了这个消费者工厂,你可以通过指定Spring.脉冲星.消费者。*应用属性。工厂中的大多数配置属性都会被监听器尊重,但有以下例外spring-doc.cadn.net.cn

spring.pulsar.consumer.subscription.name属性被忽略,当注释未指定时才生成。
spring.pulsar.consumer.subscription-type属性被忽略,而是取自注释上的值。不过,你可以设置subscriptionType = {}在注释上,改用属性值作为默认值。

让我们重新审视脉冲星听者我们在快速参观区看到的代码片段:spring-doc.cadn.net.cn

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

你还可以进一步简化这个方法:spring-doc.cadn.net.cn

@PulsarListener
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

在这种最基本的形式中,当订阅名称@PulsarListener注释:将使用自动生成的订阅名称。 同样地,当主题这些内容并未直接提供,而是通过主题解决过程来确定目标主题。spring-doc.cadn.net.cn

脉冲星听者前面展示的方法,我们接收数据为字符串但我们没有指定任何模式类型。 在内部,该框架依赖于Pulsar的模式机制来将数据转换为所需的类型。 框架检测到你期望字符串类型,然后根据这些信息推断模式类型。 然后它向消费者提供该图式。 对于 Java 中所有原语类型,框架都做了这种推理。 对于任何复杂类型(如 JSON、AVRO 等),框架无法进行这种推理,用户需要在注释中提供模式类型,使用schemaType财产。spring-doc.cadn.net.cn

以下示例展示了另一个脉冲星听者方法,该方法具有整数:spring-doc.cadn.net.cn

@PulsarListener(subscriptionName = "my-subscription-1", topics = "my-topic-1")
public void listen(Integer message) {
   System.out.println(message);
}

如下脉冲星听者方法说明我们如何从一个主题中获取复杂型:spring-doc.cadn.net.cn

@PulsarListener(subscriptionName = "my-subscription-2", topics = "my-topic-2", schemaType = SchemaType.JSON)
public void listen(Foo message) {
    System.out.println(message);
}

注意添加了aschemaType脉冲星听者. 这是因为库无法从提供的类型推断模式类型:.我们必须告诉框架应该使用哪种模式。spring-doc.cadn.net.cn

让我们再看看几种方式。spring-doc.cadn.net.cn

你可以直接接收脉冲星消息:spring-doc.cadn.net.cn

@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.apache.pulsar.client.api.Message<String> message) {
    System.out.println(message.getValue());
}

以下示例通过使用 Spring 消息封函来消耗记录:spring-doc.cadn.net.cn

@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.springframework.messaging.Message<String> message) {
    System.out.println(message.getPayload());
}

现在让我们看看如何批量消费记录。 以下示例使用脉冲星听者以POJO批量消费记录:spring-doc.cadn.net.cn

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Foo> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message));
}

注意,在这个例子中,我们以集合的形式接收记录(列表)的对象。 此外,为了实现批量消费脉冲星听者你需要设置Batch注释上的属性为true.spring-doc.cadn.net.cn

根据实际类型列表框架尝试推断要使用的模式。 如果列表包含复类型,你仍然需要提供schemaType脉冲星听者.spring-doc.cadn.net.cn

以下内容使用了消息由 Pulsar Java 客户端提供的包络:spring-doc.cadn.net.cn

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Message<Foo>> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}

以下示例使用带有Spring消息封装的批处理记录消息类型:spring-doc.cadn.net.cn

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<org.springframework.messaging.Message<Foo>> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message.getPayload()));
}

最后,你还可以使用消息Pulsar 中批次监听器的 holder 对象:spring-doc.cadn.net.cn

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(org.apache.pulsar.client.api.Messages<Foo>> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}

当你使用脉冲星听者你可以直接在注释本身上提供Pulsar的消费者属性。 如果你不想使用前面提到的启动配置属性或有多个配置,这很方便脉冲星听者方法。spring-doc.cadn.net.cn

以下示例直接使用脉冲星消费者属性于脉冲星听者:spring-doc.cadn.net.cn

@PulsarListener(properties = { "subscriptionName=subscription-1", "topicNames=foo-1", "receiverQueueSize=5000" })
void listen(String message) {
}
所用的属性是直接的Pulsar消费者特性,而不是spring.pulsar.consumer应用配置属性

5.1.1. 定制消费者构建器

你可以通过以下方式自定义任何可用的字段消费者建设者使用PulsarListenerConsumerBuilderCustomizer通过提供一个@Bean类型PulsarListenerConsumerBuilderCustomizer然后将它提供给脉冲星听者如下所示。spring-doc.cadn.net.cn

@PulsarListener(topics = "hello-topic", consumerCustomizer = "myCustomizer")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

@Bean
PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
    return (builder) -> builder.consumerName("myConsumer");
}
如果你的申请只有一个@PulsarListener以及一首单曲PulsarListenerConsumerBuilderCustomizerBean注册后,自定义器会自动应用。

5.2. 规范模式信息

如前所述,对于 Java 原语,Spring for Apache Pulsar 框架可以推断出用于脉冲星听者. 对于非原语类型,如果注释中没有明确指定模式,Spring for Apache Pulsar 框架将尝试构建Schema.JSON从类型上看。spring-doc.cadn.net.cn

目前支持的复杂模式类型有 JSON、AVRO、PROTOBUF 和带内联编码的 KEY_VALUE。

5.2.1. 自定义模式映射

作为指定模式的替代方案脉冲星听者对于复杂类型,模式解析器可以配置该类型的映射。 这样就无需在监听器上设置模式,因为框架会根据收到的消息类型访问解析器。spring-doc.cadn.net.cn

模式映射可以通过以下配置配置spring.pulsar.defaults.type-mappings财产。 以下示例使用application.yml为 添加映射用户地址复杂对象使用阿弗罗JSON分别是 schema:spring-doc.cadn.net.cn

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON
消息类型是消息类的全限定名称。

添加映射的首选方法是上述属性。 不过,如果需要更多控制,你可以提供模式解析器自定义工具来添加映射。spring-doc.cadn.net.cn

以下示例使用模式解析器自定义工具来添加映射用户地址复杂对象使用阿弗罗JSON分别是 schema:spring-doc.cadn.net.cn

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
	return (schemaResolver) -> {
		schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
	}
}

有了这种配置,就无需在监听者上设置模式,例如:spring-doc.cadn.net.cn

@PulsarListener(subscriptionName = "user-sub", topics = "user-topic")
public void listen(User user) {
    System.out.println(user);
}

5.3. 访问脉冲星消费者对象

有时,你需要直接访问Pulsar消费者对象。 以下示例展示了如何获得它:spring-doc.cadn.net.cn

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message, org.apache.pulsar.client.api.Consumer<String> consumer) {
    System.out.println("Message Received: " + message);
    ConsumerStats stats = consumer.getStats();
    ...
}
访问消费者通过调用接收方法,不要调用任何会改变消费者光标位置的作。 所有此类作都必须由容器完成。

5.4. 脉冲星消息监听器容器

现在我们看到了消费者端的基本交互脉冲星听者.现在让我们深入了解其内部运作脉冲星听者与底层的脉冲星消费者互动。 请记住,对于终端用户应用,在大多数情况下,我们建议使用脉冲星听者在使用 Spring for Apache Pulsar 时,直接从 Pulsar 主题中获取注释,因为该模型涵盖了广泛的应用场景。 然而,理解其原理非常重要脉冲星听者内部工作。本节将详细介绍这些细节。spring-doc.cadn.net.cn

如前所述,消息监听器容器是使用 Spring for Apache Pulsar 消息消费的核心。脉冲星听者在幕后使用消息监听器容器基础设施来创建和管理Pulsar消费者。 Spring for Apache Pulsar 通过以下方式为该消息监听器容器提供合同PulsarMessageListenerContainer. 该消息监听器容器的默认实现通过以下方式提供DefaultPulsarMessageListenerContainer. 顾名思义,PulsarMessageListenerContainer包含消息监听器。 容器创建 Pulsar 消费者,然后运行一个独立线程来接收和处理数据。 数据由提供的消息监听器实现处理。spring-doc.cadn.net.cn

消息监听器容器通过使用消费者的batchReceive方法。 数据接收后,会交给所选的消息监听器实现。spring-doc.cadn.net.cn

当你使用 Apache Pulsar 的 Spring 时,可以使用以下消息监听器类型。spring-doc.cadn.net.cn

我们将在接下来的章节中了解这些不同信息监听者的详细信息。spring-doc.cadn.net.cn

不过在此之前,让我们仔细看看这个容器本身。spring-doc.cadn.net.cn

5.4.1. DefaultPulsarMessageListenerContainer

这是一个基于消费者的单一消息监听器容器。 以下列表展示了其制造商:spring-doc.cadn.net.cn

public DefaultPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
			PulsarContainerProperties pulsarContainerProperties)
}

它获得了脉冲星消费者工厂(它用来创建消费者)以及一个脉冲星容器属性对象(包含容器属性信息)。脉冲星容器属性具有以下构造子:spring-doc.cadn.net.cn

public PulsarContainerProperties(String... topics)

public PulsarContainerProperties(Pattern topicPattern)

你可以通过以下方式提供相关主题信息脉冲星容器属性或者作为提供给消费工厂的消费品。 以下示例使用了DefaultPulsarMessageListenerContainer:spring-doc.cadn.net.cn

Map<String, Object> config = new HashMap<>();
config.put("topics", "my-topic");
PulsarConsumerFactory<String> pulsarConsumerFactorY = DefaultPulsarConsumerFactory<>(pulsarClient, config);

PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties();

pulsarContainerProperties.setMessageListener((PulsarRecordMessageListener<?>) (consumer, msg) -> {
		});

DefaultPulsarMessageListenerContainer<String> pulsarListenerContainer = new DefaultPulsarMessageListenerContainer(pulsarConsumerFacotyr,
        pulsarContainerProperties);

return pulsarListenerContainer;
如果在直接使用监听器容器时未指定主题信息,则使用相同的主题解析过程脉冲星听者使用的唯一例外是省略了“消息类型默认”步骤。

DefaultPulsarMessageListenerContainer只能创造一个消费者。 如果你想通过多个线程管理多个消费者,你需要使用ConcurrentPulsarMessageListenerContainer.spring-doc.cadn.net.cn

5.4.2. ConcurrentPulsarMessageListenerContainer

ConcurrentPulsarMessageListenerContainer具有如下构造子:spring-doc.cadn.net.cn

public ConcurrentPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
    PulsarContainerProperties pulsarContainerProperties)

ConcurrentPulsarMessageListenerContainer你可以指定一个并发通过二传手获得财产。 并发于1仅允许在非独家订阅上使用(备援切换,共享密钥共享). 你只能用默认的1当你有独家订阅模式时,这主要是为了并发。spring-doc.cadn.net.cn

以下示例使得并发通过脉冲星听者注释备援切换订阅。spring-doc.cadn.net.cn

@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
				subscriptionType = SubscriptionType.Failover, concurrency = "3")
void listen(String message, Consumer<String> consumer) {
    ...
    System.out.println("Current Thread: " + Thread.currentThread().getName());
    System.out.println("Current Consumer: " + consumer.getConsumerName());
}

在前一位听者中,假设主题为我的话题有三个分区。 如果是非分区主题,并发设置为3什么都没做。除了主要的活跃用户外,还有两个闲置用户。 如果主题有超过三个分区,则消息会在容器创建的消费者之间实现负载均衡。 如果你运行这个脉冲星听者你会看到来自不同分区的消息通过不同的消费者被消耗,正如前述示例中线程名称和消费者名打印出来的所示。spring-doc.cadn.net.cn

当你使用故障切换通过这种方式订阅分区主题,Pulsar保证消息顺序。

以下列表展示了另一个例子脉冲星听者,但共享订阅及并发启用。spring-doc.cadn.net.cn

@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
				subscriptionType = SubscriptionType.Shared, concurrency = "5")
void listen(String message) {
    ...
}

在前面的例子中,脉冲星听者创建五个不同的消费者(这次假设主题有五个分区)。spring-doc.cadn.net.cn

在此版本中,没有消息排序,因为共享订阅不保证在Pulsar中会有消息排序。

如果你需要消息排序,同时又想要共享订阅类型,你需要使用Key_Shared订阅类型。spring-doc.cadn.net.cn

5.4.3. 消息消费

让我们来看看消息监听器容器如何支持单记录和批处理消息的使用。spring-doc.cadn.net.cn

单次创纪录消耗

让我们重新回顾一下基本情况脉冲星听者为了讨论方便:spring-doc.cadn.net.cn

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

用这个脉冲星听者方法中,我们本质上要求 Spring for Apache Pulsar 每次调用一个记录的监听者方法。 我们提到消息监听器容器会通过batchReceive对消费者采取方法。 该框架检测到脉冲星听者在这种情况下,接收到一条记录。这意味着每次调用该方法时,都需要一个单记录。 虽然记录被消息监听器容器批量消耗,但它会遍历接收批次,并通过适配器调用监听器方法,脉冲星记录消息监听器. 正如你在上一节看到的,脉冲星记录消息监听器消息监听器由 Pulsar Java 客户端提供,并且支持基础收到方法。spring-doc.cadn.net.cn

批量消耗

以下示例展示了脉冲星听者批量消费记录:spring-doc.cadn.net.cn

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen4(List<Foo> messages) {
    System.out.println("records received :" + messages.size());
    messages.forEach((message) -> System.out.println("record : " + message));
}

当你使用这种类型的脉冲星听者框架检测你处于批处理模式。 因为它已经通过使用消费者的batchReceive方法,它通过适配器将整个批次交接给监听者方法,PulsarBatchMessageListener.spring-doc.cadn.net.cn

5.5. 脉冲星接收器

Pulsar 消息元数据可以作为 Spring 消息头部使用。 可用头部列表可在PulsarHeaders.java中找到。spring-doc.cadn.net.cn

5.5.1. 基于单记录的消费者访问

以下示例展示了如何在使用单记录消费模式的应用中访问各种脉冲星头部:spring-doc.cadn.net.cn

@PulsarListener(topics = "simpleListenerWithHeaders")
void simpleListenerWithHeaders(String data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
                @Header(PulsarHeaders.RAW_DATA) byte[] rawData,
                @Header("foo") String foo) {

}

在前面的例子中,我们访问了messageId(信息ID原始数据消息元数据以及一个名为. Spring@Header每个头字段都使用注释。spring-doc.cadn.net.cn

你也可以用脉冲星消息作为承载有效载荷的包层。 在此过程中,用户可以直接调用 Pulsar 消息中的相应方法来获取元数据。 不过,为了方便,你也可以通过以下方式取回页眉注解。 注意你也可以使用春季消息消息包络 以携带有效载荷,然后通过以下方式获取 Pulsar 头部@Header.spring-doc.cadn.net.cn

5.5.2. 批处理记录访问基于消费者的应用

在本节中,我们将了解如何访问使用批处理消费者的应用程序中的各种脉冲星头:spring-doc.cadn.net.cn

@PulsarListener(topics = "simpleBatchListenerWithHeaders", batch = true)
void simpleBatchListenerWithHeaders(List<String> data,
					@Header(PulsarHeaders.MESSAGE_ID) List<MessageId> messageIds,
					@Header(PulsarHeaders.TOPIC_NAME) List<String> topicNames, @Header("foo") List<String> fooValues) {

}

在前面的例子中,我们以List<String>. 在提取各个头部时,我们会以名单<>也。 Spring for Apache Pulsar 确保头部列表对应于数据列表。spring-doc.cadn.net.cn

你也可以像用批处理监听器一样提取头部,接收有效载荷时,像List<org.apache.pulsar.client.api.Message<?>,org.apache.pulsar.client.api.Messages<?>org.springframework.messaging.Messsge<?>.spring-doc.cadn.net.cn

5.6. 消息确认

当你使用 Spring 进行 Apache Pulsar 时,消息确认由框架处理,除非应用程序选择退出。 在本节中,我们将详细介绍该框架如何处理消息确认。spring-doc.cadn.net.cn

5.6.1. 消息确认模式

Apache Pulsar 的 Spring 提供了以下确认消息的模式:spring-doc.cadn.net.cn

Batch确认模式是默认的,但你可以在消息监听器容器中更改。 在接下来的章节中,我们将展示当你同时使用单个版本和批量版本时,确认是如何工作的脉冲星听者以及它们如何转化为支持信息的听取器容器(最终转化为Pulsar消费者)。spring-doc.cadn.net.cn

5.6.2. 单记录模式下的自动消息确认

让我们重新审视基于单一信息的基本内容脉冲星听者:spring-doc.cadn.net.cn

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

当你使用时,承认是如何运作的,这是很自然的脉冲星听者尤其是如果你熟悉直接使用Pulsar Consumer的话。 答案归结于消息监听器容器,因为这是Apache Pulsar春季中协调所有消费者相关活动的核心平台。spring-doc.cadn.net.cn

假设你没有覆盖默认行为,这就是你使用前述方法时幕后发生的事情脉冲星听者:spring-doc.cadn.net.cn

  1. 首先,监听器容器以批量方式接收来自 Pulsar 消费者的消息。spring-doc.cadn.net.cn

  2. 接收到的消息会传递给脉冲星听者一条消息一条地来。spring-doc.cadn.net.cn

  3. 当所有记录都传递给监听器方法并成功处理后,容器会确认原始批次中的所有消息。spring-doc.cadn.net.cn

这是正常的流程。如果原始批次中的任何记录触发异常,Spring for Apache Pulsar会单独跟踪这些记录。 当批次中的所有记录都处理完毕后,Spring for Apache Pulsar会确认所有成功消息,并对所有失败消息负面确认(nack)。 换句话说,当使用单条记录时,使用脉冲星记录消息监听器以及默认的 ACK 模式Batch被使用时,框架等待从batchReceive调用 处理成功,然后调用承认在脉冲星消费者上进行方法。 如果某个特定记录在调用处理方法时抛出异常,Apache Pulsar 的 Spring 会跟踪这些记录并单独调用否定确认在整批处理完毕后,记录上会有记录。spring-doc.cadn.net.cn

如果应用程序希望每条记录都发生确认或负面确认,则记录可以启用ACK模式。 在这种情况下,处理完每条记录后,如果没有错误,消息会被确认;如果有错误,则会被否定确认。 以下示例使得记录Pulsar监听器的ack模式:spring-doc.cadn.net.cn

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.RECORD)
public void listen(String message) {
    System.out.println("Message Received: " + message);
}

5.6.3. 单记录模式下的手动消息 ACK

你可能不总是希望框架发送确认,而是直接从应用程序本身发送确认。 Spring for Apache Pulsar 提供了几种方式来启用手动消息确认。以下示例展示了其中一个:spring-doc.cadn.net.cn

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Acknowledgment acknowledgment) {
    System.out.println("Message Received: " + message.getValue());
	acknowledgment.acknowledge();
}

这里有几点值得解释。首先,我们通过设置来启用手动获取模式ack模式脉冲星听者. 启用手动 ack 模式时,Spring for Apache Pulsar 允许应用程序注入确认对象。 该框架通过选择兼容的消息监听器容器来实现这一目标:脉冲确认消息听众对于基于单条记录的消费,这让你能够访问确认对象。spring-doc.cadn.net.cn

确认object 提供了以下 API 方法:spring-doc.cadn.net.cn

void acknowledge();

void acknowledge(MessageId messageId);

void acknowledge(List<MessageId> messageIds);

void nack();

void nack(MessageId messageId);

你可以注射这个确认物体进入你的脉冲星听者在使用手动然后调用相应的方法之一。spring-doc.cadn.net.cn

在前述中脉冲星听者例如,我们称 无参数承认方法。 这是因为框架知道具体情况消息目前正在运营。 呼唤时确认(),你不必通过消息但实际上,使用目标类型——字符串,在本例中。 你也可以调用另一个变体承认通过提供消息ID:acknowledge.acknowledge(message.getMessageId());当你使用acknowledge(messageId),你必须通过使用留言<?>信封。spring-doc.cadn.net.cn

类似于确认的可能范围,确认API 还提供负向确认的选项。 参见前面展示的nack方法。spring-doc.cadn.net.cn

你也可以打电话承认直接在Pulsar消费者端:spring-doc.cadn.net.cn

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Consumer<String> consumer) {
    System.out.println("Message Received: " + message.getValue());
	try {
		consumer.acknowledge(message);
	}
	catch (Exception e) {
		....
	}
}

呼唤时承认直接针对底层消费者,你需要自己处理错误。 使用确认不需要,因为框架可以帮你做到这一点。 因此,你应该使用确认使用手动确认时采用对象方式。spring-doc.cadn.net.cn

使用手动确认时,重要的是要明白该框架完全不包含任何确认。 因此,在设计应用程序时,思考正确的确认策略极为重要。

5.6.4. 批量消费中的自动消息 ack

当你批量消费记录(参见“消息确认模式”)并使用默认的确认模式Batch当整个批次成功处理时,整个批次都会被确认。 如果有任何记录抛出异常,整个批次都会被负面确认。 请注意,这可能不是生产方批次的同一批。而是那批刚从电话回来的batchReceive关于消费者spring-doc.cadn.net.cn

考虑以下批次监听器:spring-doc.cadn.net.cn

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", batch = true)
public void batchListen(List<Foo> messages) {
    for (Foo foo : messages) {
		...
    }
}

当所有消息都出现在来的集合中(消息在此示例中)被处理,框架承认所有这些。spring-doc.cadn.net.cn

批量使用时,记录不允许使用ACK模式。 这可能会引发问题,因为应用程序可能不希望整个批次再次被重新交付。 在这种情况下,你需要使用手动确认模式。spring-doc.cadn.net.cn

5.6.5. 批量使用中的手动消息 ack

如前一节所示,当手动ack 模式设置为消息监听器容器,框架不进行任何确认,无论是正面还是负面。 这些问题完全取决于申请方来处理。 什么时候手动ack 模式已设置,Spring for Apache Pulsar 选择一个兼容的消息监听器容器:PulsarBatchAcknowledgeledgingMessageListener批量使用,这能让你获得确认对象。 以下是可用的方法确认应用程序接口:spring-doc.cadn.net.cn

void acknowledge();

void acknowledge(MessageId messageId);

void acknowledge(List<MessageId> messageIds);

void nack();

void nack(MessageId messageId);

你可以注射这个确认物体进入你的脉冲星听者在使用手动ACK模式。 以下列表展示了基于批处理的监听器的基本示例:spring-doc.cadn.net.cn

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(List<Message<String>> messgaes, Acknowlegement acknowledgment) {
    for (Message<String> message : messages) {
		try {
			...
	        acknowledgment.acknowledge(message.getMessageId());
		}
		catch (Exception e) {
			acknowledgment.nack(message.getMessageId());
		}
    }
}

使用批处理监听器时,消息监听器容器无法知道当前正在作哪个记录。 因此,要手动确认,你需要使用其中一个超载的承认方法MessageId(信息ID)或者List<MessageId>. 你也可以用MessageId(信息ID)给批次听众。spring-doc.cadn.net.cn

5.7. 消息重发与错误处理

既然我们已经看到了两者脉冲星听者以及消息监听器容器基础设施及其各种功能,现在让我们尝试理解消息重投和错误处理。 Apache Pulsar 提供了多种原生策略用于消息重发和错误处理。我们会看看这些工具,看看如何在Spring for Apache Pulsar中使用它们。spring-doc.cadn.net.cn

5.7.1. 指定消息重投的确认超时

默认情况下,Pulsar 消费者不会重投消息,除非消费者崩溃,但你可以通过设置 Pulsar 消费者的 ack 超时来改变这种行为。 如果 ack 超时属性值大于零,且 Pulsar 用户在超时期间未确认消息,则消息会被重新传递。spring-doc.cadn.net.cn

当你使用 Spring 为 Apache Pulsar 设计时,可以通过消费者定制工具或原生 Pulsar 来设置该属性ackTimeout财产在性能属性@PulsarListener:spring-doc.cadn.net.cn

@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
                properties = {"ackTimeout=60s"})
public void listen(String s) {
    ...
}

当你指定确认超时时,如果消费者在60秒内没有发送确认,Pulsar会将消息重新转达给消费者。spring-doc.cadn.net.cn

如果你想指定一些带有不同延迟的ack超时高级退回选项,可以这样做:spring-doc.cadn.net.cn

@EnablePulsar
@Configuration
class AckTimeoutRedeliveryConfig {

    @PulsarListener(subscriptionName = "withAckTimeoutRedeliveryBackoffSubscription",
            topics = "withAckTimeoutRedeliveryBackoff-test-topic",
            ackTimeoutRedeliveryBackoff = "ackTimeoutRedeliveryBackoff",
            properties = { "ackTimeout=60s" })
    void listen(String msg) {
        // some long-running process that may cause an ack timeout
    }

    @Bean
    RedeliveryBackoff ackTimeoutRedeliveryBackoff() {
        return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
                .build();
    }

}

在前面的例子中,我们指定一个豆子来表示脉冲星的重送退让最小延迟为1秒,最大延迟为10秒,退避乘数为2。 在初始确认超时后,消息的重送通过该退回豆进行控制。 我们会把后退豆提供给脉冲星听者通过设置ackTimeoutRedeliveryBackoff与实际Bean名称的财产——ackTimeoutRedeliveryBackoff,在这种情况下。spring-doc.cadn.net.cn

5.7.2. 明确负面确认重送

当确认为负面时,Pulsar 消费者允许你指定应用希望如何重新传递消息。 默认是一分钟内重新发送消息,但你可以通过消费者自定义工具或原生 Pulsar 来更改负面AckRedeliveryDelay财产在性能属性@PulsarListener:spring-doc.cadn.net.cn

@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
                properties = {"negativeAckRedeliveryDelay=10ms"})
public void listen(String s) {
    ...
}

你还可以通过提供重送退让豆子并提供豆子名称否定AckRedeliveryBackoffPulsarProducer 上的财产,具体如下:spring-doc.cadn.net.cn

@EnablePulsar
@Configuration
class NegativeAckRedeliveryConfig {

    @PulsarListener(subscriptionName = "withNegRedeliveryBackoffSubscription",
            topics = "withNegRedeliveryBackoff-test-topic", negativeAckRedeliveryBackoff = "redeliveryBackoff",
            subscriptionType = SubscriptionType.Shared)
    void listen(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    @Bean
    RedeliveryBackoff redeliveryBackoff() {
        return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
                .build();
    }

}

5.7.3. 使用 Apache Pulsar 中的死字母主题进行消息重发和错误处理

Apache Pulsar 允许应用程序在消费者上使用死符主题,且共享订阅类型。 对于独家故障切换订阅类型,此功能不可用。 基本思路是,如果消息被重试一定次数(可能是因为确认超时或 nack 重投),当重试次数用尽后,消息可以发送到一个称为死字母队列(DLQ)的特殊主题。 让我们通过检查一些代码片段来了解该功能的具体作细节:spring-doc.cadn.net.cn

@EnablePulsar
@Configuration
class DeadLetterPolicyConfig {

    @PulsarListener(id = "deadLetterPolicyListener", subscriptionName = "deadLetterPolicySubscription",
            topics = "topic-with-dlp", deadLetterPolicy = "deadLetterPolicy",
            subscriptionType = SubscriptionType.Shared, properties = { "ackTimeout=1s" })
    void listen(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    @PulsarListener(id = "dlqListener", topics = "my-dlq-topic")
    void listenDlq(String msg) {
        System.out.println("From DLQ: " + msg);
    }

    @Bean
    DeadLetterPolicy deadLetterPolicy() {
        return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
    }

}

首先,我们有一种特殊的豆子DeadLetterPolicy,其名称为deadLetterPolicy(你可以随意取任何名字)。 这个豆子指定了许多内容,比如最大送达时间(这里是10)和死字母主题的名称——我的DLQ主题,在这种情况下。 如果你没有指定DLQ主题名称,默认为<topicname>-<subscriptionname>-DLQ在脉冲星。 接下来,我们将这个豆子命名为脉冲星听者通过设置deadLetterPolicy财产。 注意脉冲星听者拥有一种订阅类型共享,因为DLQ功能只适用于共享订阅。 本代码主要用于演示,因此我们提供ackTimeout值为1秒。 其理念是代码抛出异常,如果 Pulsar 在 1 秒内未收到确认,则进行重试。 如果这个循环持续十次(因为这是我们最大的重送次数DeadLetterPolicy),Pulsar 消费者将消息发布到 DLQ 主题。 我们又来了脉冲星听者它监听 DLQ 主题,接收发布到该主题的数据。spring-doc.cadn.net.cn

关于使用分区主题时的DLQ主题的特别说明

如果主主题被分割,幕后每个分割都会被Pulsar视为独立主题。 脉冲星附录分区<n>哪里n代表主主题名称的分区号。 问题是,如果你没有指定DLQ主题(与我们上面做的相反),Pulsar会发布到默认主题名称中,该主题名称包含以下内容'分<n>其中的信息——例如:topic-with-dlp-partition-0-deadLetterPolicySubscription-DLQ. 解决这个问题的简单方法是始终提供一个DLQ主题名称。spring-doc.cadn.net.cn

5.7.4. Apache Pulsar Spring 中的原生错误处理

正如我们之前提到的,Apache Pulsar 中的 DLQ 功能仅适用于共享订阅。 如果应用程序需要对非共享订阅使用类似功能,该怎么办? Pulsar 不支持 DLQ 的独占和备用切换订阅的主要原因是这些订阅类型是订单保证的。 允许重投、DLQ等实际上是报纸顺序错乱。 但是,如果某个应用对此没问题,但更重要的是,非共享订阅需要这个DLQ功能呢? 为此,Spring for Apache Pulsar 提供了脉冲星消费者错误处理你可以在Pulsar中的任何订阅类型中使用:独家,故障切换,共享Key_Shared.spring-doc.cadn.net.cn

当你使用脉冲星消费者错误处理来自Apache Pulsar的Spring,确保不要在监听器上设置ack超时属性。spring-doc.cadn.net.cn

让我们通过查看几个代码片段来了解一些细节:spring-doc.cadn.net.cn

@EnablePulsar
@Configuration
class PulsarConsumerErrorHandlerConfig {

    @Bean
    PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
            PulsarTemplate<String> pulsarTemplate) {
        return new DefaultPulsarConsumerErrorHandler<>(
                new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
    }

    @PulsarListener(id = "pulsarConsumerErrorHandler-id", subscriptionName = "pulsatConsumerErrorHandler-subscription",
                    topics = "pulsarConsumerErrorHandler-topic",
					pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler")
    void listen(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    @PulsarListener(id = "pceh-dltListener", topics = "my-foo-dlt")
    void listenDlt(String msg) {
        System.out.println("From DLT: " + msg);
    }

}

考虑pulsarConsumerErrorHandler豆。 这就形成了 的脉冲星消费者错误处理并采用了 Spring 为 Apache Pulsar 默认提供的实现:默认脉冲消耗者错误处理.默认脉冲消耗者错误处理有一个构造子,使得脉冲星消息恢复工厂以及一个org.springframework.util.backoff.Backoff.脉冲星消息恢复工厂是一个功能接口,支持以下 API:spring-doc.cadn.net.cn

@FunctionalInterface
public interface PulsarMessageRecovererFactory<T> {

	/**
	 * Provides a message recoverer {@link PulsarMessageRecoverer}.
	 * @param consumer Pulsar consumer
	 * @return {@link PulsarMessageRecoverer}.
	 */
	PulsarMessageRecoverer<T> recovererForConsumer(Consumer<T> consumer);

}

recovererForConsumer(恢复者消费者)方法接收一个脉冲星消费者并返回脉冲星消息恢复器,这是另一个功能接口。 以下是 的 API脉冲星消息恢复器:spring-doc.cadn.net.cn

public interface PulsarMessageRecoverer<T> {

	/**
	 * Recover a failed message, for e.g. send the message to a DLT.
	 * @param message Pulsar message
	 * @param exception exception from failed message
	 */
	void recoverMessage(Message<T> message, Exception exception);

}

Apache Pulsar 的 Spring 提供了一个实现脉冲星消息恢复工厂脉冲星死信出版恢复者该系统提供默认实现,可以通过发送消息到死信主题(DLT)来恢复消息。 我们向前述构造子提供该实现默认脉冲消耗者错误处理. 作为第二个参数,我们给出一个固定退后. 你也可以提供指数退缩来自Spring,用于高级退场功能。 然后我们给出这个豆名脉冲星消费者错误处理作为脉冲星听者. 该性质称为pulsarConsumerErrorHandler. 每当脉冲星听者如果该方法对消息失败,则会重新尝试。 重试次数由退避提供实现值。在我们的例子中,我们做了10次重试(总共11次——第一次再10次)。 当所有重试次数用尽后,消息会发送到DLT主题。spring-doc.cadn.net.cn

脉冲星死信出版恢复者我们提供的实现是脉冲星模板用于向DLT发布消息。 在大多数情况下,同样的自动配置脉冲星模板仅仅针对分区主题有个前提。 使用分区主题并对主主题使用自定义消息路由时,必须使用不同的脉冲星模板这不需要自动配置脉冲星生产工厂该 值为自定义分区消息路由模式. 你可以用脉冲星消费者错误处理并附有以下蓝图:spring-doc.cadn.net.cn

@Bean
PulsarConsumerErrorHandler<Integer> pulsarConsumerErrorHandler(PulsarClient pulsarClient) {
    PulsarProducerFactory<Integer> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Map.of());
        PulsarTemplate<Integer> pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory);

        BiFunction<Consumer<?>, Message<?>, String> destinationResolver =
                (c, m) -> "my-foo-dlt";

        PulsarDeadLetterPublishingRecoverer<Integer> pulsarDeadLetterPublishingRecoverer =
                new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, destinationResolver);

        return new DefaultPulsarConsumerErrorHandler<>(pulsarDeadLetterPublishingRecoverer,
                new FixedBackOff(100, 5));
}

注意,我们为脉冲星死信出版恢复者作为第二个构造子论证。 如果没有提供,脉冲星死信出版恢复者使用<订阅名>-<topic-name>-DLT>作为DLT主题名称。 使用此功能时,你应该通过设置目标解析器来使用正确的目标名称,而不是默认名称。spring-doc.cadn.net.cn

使用单记录消息监听器时,就像我们之前做的那样PulsarConsumerErrorHnadler如果你使用手动确认,确保在异常出现时不要负面确认该消息。 而是将异常重新抛回容器。否则,容器会认为消息被单独处理,错误处理不会被触发。spring-doc.cadn.net.cn

最后,我们有第二个脉冲星听者该机构接收来自DLT主题的消息。spring-doc.cadn.net.cn

在本节迄今为止提供的示例中,我们只看到如何使用脉冲星消费者错误处理使用单一记录消息监听器。 接下来,我们会看看如何将它用于批量监听器。spring-doc.cadn.net.cn

5.7.5. 带有PulsarConsumerErrorHandler的批处理监听器

首先,让我们来看一批脉冲星听者方法:spring-doc.cadn.net.cn

@PulsarListener(subscriptionName = "batch-demo-5-sub", topics = "batch-demo-4", batch = true, concurrency = "3",
            subscriptionType = SubscriptionType.Failover,
            pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler", ackMode = AckMode.MANUAL)
void listen(List<Message<Integer>> data, Consumer<Integer> consumer, Acknowledgment acknowledgment) {
    for (Message<Integer> datum : data) {
        if (datum.getValue() == 5) {
            throw new PulsarBatchListenerFailedException("failed", datum);
        }
        acknowledgement.acknowledge(datum.getMessageId());
    }
}

@Bean
PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
        PulsarTemplate<String> pulsarTemplate) {
    return new DefaultPulsarConsumerErrorHandler<>(
            new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
}

@PulsarListener(subscriptionName = "my-dlt-subscription", topics = "my-foo-dlt")
void dltReceiver(Message<Integer> message) {
    System.out.println("DLT - RECEIVED: " + message.getValue());
}

我们再次提供pulsarConsumerErrorHandler具有脉冲星消费者错误处理豆子名。当你使用批处理监听器(如前例所示)并想使用脉冲星消费者错误处理来自Spring for Apache Pulsar,你需要使用手动确认。这样,你可以确认所有成功的单独消息。对于失败的消息,你必须抛出PulsarBatchListenerFailedException与失败消息一起。没有此例外,框架无法处理失败。重试时,容器会从失败消息开始向监听者发送新一批消息。如果再次失败,则重试,直到重试用尽,消息才发送给DLT。此时,容器确认消息,监听者会与原批次中的后续消息一同交接。spring-doc.cadn.net.cn

5.8. PulsarListener上的消费者定制

Apache Pulsar 的 Spring 提供了一种方便的方式,可以自定义由所用容器创建的消费者脉冲星听者. 应用程序可以提供一个PulsarListenerConsumerBuilderCustomizer. 这里有一个例子。spring-doc.cadn.net.cn

@Bean
public PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
    return cb -> {
        cb.subscriptionName("modified-subscription-name");
    };
}

然后,这个定制豆名可以作为属性在PuslarListener(普斯拉听者)注释如下所示。spring-doc.cadn.net.cn

@PulsarListener(subscriptionName = "my-subscription",
        topics = "my-topic", consumerCustomizer = "myCustomizer")
void listen(String message) {

}

该框架通过脉冲星听者并在创建脉冲星消费者之前,将该定制器应用到消费者构建者身上。spring-doc.cadn.net.cn

如果你有多重脉冲星听者方法,每种都有不同的自定义规则,你应该创建多个自定义豆,并在每个豆子上安装合适的定制器脉冲星听者.spring-doc.cadn.net.cn

5.9. 暂停和恢复消息监听器容器

在某些情况下,应用程序可能希望暂时暂停消息消费,然后稍后恢复。Apache Pulsar 的 Spring 提供了暂停和恢复底层消息监听器容器的功能。当 Pulsar 消息监听器容器被暂停时,容器对接收 Pulsar 消费者数据所做的任何轮询都会被暂停。同样,当容器恢复时,如果主题在暂停期间添加了任何新记录,下一次轮询会开始返回数据。spring-doc.cadn.net.cn

要暂停或恢复监听器容器,首先通过PulsarListenerEndpointRegistry然后在容器实例上调用暂停/恢复API——如下图所示:spring-doc.cadn.net.cn

@Autowired
private PulsarListenerEndpointRegistry registry;

void someMethod() {
  PulsarMessageListenerContainer container = registry.getListenerContainer("my-listener-id");
  container.pause();
}
id参数传递给getListenerContainer是容器ID——将是@PulsarListener暂停/继续时的ID属性@PulsarListener.

5.10. 脉冲星读卡器支持

该框架支持通过脉冲星读者工厂.spring-doc.cadn.net.cn

Spring Boot 提供了这个读取器工厂,你可以通过指定任意Spring脉冲星读者。*应用属性。spring-doc.cadn.net.cn

5.10.1. 脉冲星读者注释

虽然可以使用脉冲星读者工厂直接来说,Spring for Apache Pulsar 提供了脉冲星读者注释,你可以用它快速阅读主题,无需自己搭建任何阅读器工厂。这与背后的相同思路相似脉冲听众。这里有一个简短的例子。spring-doc.cadn.net.cn

@PulsarReader(id = "reader-demo-id", topics = "reader-demo-topic", startMessageId = "earliest")
void read(String message) {
    //...
}

身份证属性是可选的,但最佳实践是提供对你的应用有意义的值。如果未指定,则使用自动生成的ID。另一方面,主题startMessageId(启动消息ID)属性是必修的。 这主题属性可以是单个主题,也可以是逗号分隔的主题列表。 这startMessageId(启动消息ID)属性指示读者从主题中的某个特定消息开始。有效值为startMessageId(启动消息ID)最早最近的。假设你希望读者从非最早或最新的可用消息开始任意阅读。在这种情况下,你需要使用ReaderBuilderCustomizer以定制读者构建器所以它知道正确的权利MessageId(信息ID)从这里开始。spring-doc.cadn.net.cn

5.10.2. 定制读者构建器

你可以通过以下方式自定义任何可用的字段读者构建器使用脉冲读者读者构建器定制器春季为Apache Pulsar提供。你可以提供@Bean类型脉冲读者读者构建器定制器然后将它提供给脉冲星读者如下所示。spring-doc.cadn.net.cn

@PulsarReader(id = "reader-customizer-demo-id", topics = "reader-customizer-demo-topic",
    readerCustomizer = "myCustomizer")
void read(String message) {
    //...
}

@Bean
public PulsarReaderReaderBuilderCustomizer<String> myCustomizer() {
    return readerBuilder -> {
        readerBuilder.startMessageId(messageId); // the first message read is after this message id.
        // Any other customizations on the readerBuilder
    };
}
如果你的申请只有一个@PulsarReader以及一首单曲脉冲读者读者构建器定制器Bean注册后,自定义器会自动应用。

6. 主题解决

在生成或接收消息时需要一个目标主题。 该框架按以下顺序查找主题(从第一个发现处停止):spring-doc.cadn.net.cn

当通过默认机制找到主题时,无需在生产或消费API中指定主题。spring-doc.cadn.net.cn

当找不到某个主题时,API 会相应地抛出异常。spring-doc.cadn.net.cn

6.1. 用户指定

输入到所使用的API中的主题具有最高的优先级(例如,PulsarTemplate.send(“我的主题”,myMessage)@PulsarListener(topics = “我的主题”).spring-doc.cadn.net.cn

6.2. 消息类型默认

当API中没有传入主题时,系统会寻找针对所生成或消费消息类型配置的消息类型到主题映射。spring-doc.cadn.net.cn

映射可以通过以下配置配置spring.pulsar.defaults.type-mappings财产。 以下示例使用application.yml配置默认主题以在消费或生产时使用酒吧消息:spring-doc.cadn.net.cn

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.Foo
          topic-name: foo-topic
        - message-type: com.acme.Bar
          topic-name: bar-topic
消息类型是消息类的全限定名称。
如果消息(或第一条消息)是发行人输入)是框架无法从中确定主题。如果您的申请可能会发送,则会采用另一种方法来指定主题消息。

6.2.1. 自定义主题解析器

添加映射的首选方法是上述属性。 然而,如果需要更多控制,可以通过证明自己的实现来替换默认解析器,例如:spring-doc.cadn.net.cn

@Bean
public MyTopicResolver topicResolver() {
	return new MyTopicResolver();
}

6.3. 生产者全球违约

制作时最后咨询的地点是系统范围内的生产者默认话题。 它通过spring.pulsar.producer.topic-name使用命令式 API 时的属性spring.pulsar.reactive.sender.topic-name在使用响应式API时。spring-doc.cadn.net.cn

6.4. 消费者全球违约

最终查询的地点(在消费时)是系统范围的消费者默认话题。 它通过spring.pulsar.consumer.topicsspring.pulsar.consumer.topics-pattern当使用命令式 API 和其中一个spring.pulsar.reactive.consumer.topicsspring.pulsar.reactive.consumer.topics-pattern在使用响应式API时。spring-doc.cadn.net.cn

7. 发布与消费分割主题

在下面的例子中,我们将发布到一个名为你好-脉冲星-分割. 这是一个被划分的主题,对于这个示例,我们假设该主题已经由三个划分创建。spring-doc.cadn.net.cn

@SpringBootApplication
public class PulsarBootPartitioned {

	public static void main(String[] args) {
		SpringApplication.run(PulsarBootPartitioned.class, "--spring.pulsar.producer.message-routing-mode=CustomPartition");
	}

	@Bean
	public ApplicationRunner runner(PulsarTemplate<String> pulsarTemplate) {
		pulsarTemplate.setDefaultTopicName("hello-pulsar-partitioned");
		return args -> {
			for (int i = 0; i < 10; i++) {
				pulsarTemplate.sendAsync("hello john doe 0 ", new FooRouter());
				pulsarTemplate.sendAsync("hello alice doe 1", new BarRouter());
				pulsarTemplate.sendAsync("hello buzz doe 2", new BuzzRouter());
			}
		};
	}

	@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned")
	public void listen(String message) {
		System.out.println("Message Received: " + message);
	}

    static class FooRouter implements MessageRouter {

		@Override
		public int choosePartition(Message<?> msg, TopicMetadata metadata) {
			return 0;
		}
	}

	static class BarRouter implements MessageRouter {

		@Override
		public int choosePartition(Message<?> msg, TopicMetadata metadata) {
			return 1;
		}
	}

	static class BuzzRouter implements MessageRouter {

		@Override
		public int choosePartition(Message<?> msg, TopicMetadata metadata) {
			return 2;
		}
	}

}

在前面的例子中,我们发布到一个分区主题,并且希望将某个数据段发布到特定的分区。 如果保持 Pulsar 默认状态,它会采用轮询分区分配模式,我们希望覆盖它。 为此,我们提供了一个带有发送方法。 考虑实现的三条消息路由器。FooRouter总是将数据发送到分区0,条形路由器发送到分区1嗡嗡路由器发送到分区2. 还要注意,我们现在使用sendAsync方法脉冲星模板返回完成未来. 运行应用程序时,我们还需要设置消息路由模式制作人自定义分区 (spring.pulsar.producer.message-routing-mode).spring-doc.cadn.net.cn

在消费者端,我们使用脉冲星听者采用专属订阅类型。 这意味着所有分区的数据最终都集中在同一个消费者手中,且没有排序保证。spring-doc.cadn.net.cn

如果我们希望每个分区都被一个不同的消费者消费,我们该怎么办? 我们可以切换到备援切换订阅模式并添加三个独立的消费者:spring-doc.cadn.net.cn

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen1(String foo) {
    System.out.println("Message Received 1: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen2(String foo) {
    System.out.println("Message Received 2: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription",  topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen3(String foo) {
    System.out.println("Message Received 3: " + foo);
}

采用这种方法时,单个分区总是被专用消费者消耗。spring-doc.cadn.net.cn

类似地,如果你想用Pulsar的共享消费者类型,可以用共享订阅类型。 然而,当你使用该系统时共享模式中,你失去了任何排序保证,因为单个消费者可能会在另一个消费者之前收到来自所有分区的消息。spring-doc.cadn.net.cn

请考虑以下例子:spring-doc.cadn.net.cn

@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen1(String foo) {
    System.out.println("Message Received 1: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen2(String foo) {
    System.out.println("Message Received 2: " + foo);
}