|
该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 Spring for Apache Kafka 4.0.0! |
寻求特定偏移
为了寻求,你的听者必须实现消费者寻求,具有以下方法:
void registerSeekCallback(ConsumerSeekCallback callback);
void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
void onPartitionsRevoked(Collection<TopicPartition> partitions);
void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
这registerSeekCallback在容器启动和分配分区时调用。在初始化后的任意时间寻找时,应使用该回调。你应保存回调的引用。如果你在多个容器中使用同一个监听器(或在ConcurrentMessageListenerContainer),你应该把回调存储在一个ThreadLocal或由监听者键控的其他结构线.
使用组管理时,onPartitionsAssigned当分配分区时调用。
例如,你可以用这种方法来设置分区的初始偏移量,方法是调用回调。
你也可以用这种方法将该线程的回调与分配的分区关联起来(见下面的示例)。
你必须使用回调参数,而不是输入的那个参数registerSeekCallback.
从版本 2.5.5 开始,即使使用手动分区分配,也会调用该方法。
在被取消的分区上当容器被停止或卡夫卡撤销任务时,会被叫走。
你应该丢弃本帖的回调,并删除与被撤销分区的任何关联。
回调方法如下:
void seek(String topic, int partition, long offset);
void seek(String topic, int partition, Function<Long, Long> offsetComputeFunction);
void seekToBeginning(String topic, int partition);
void seekToBeginning(Collection<TopicPartitions> partitions);
void seekToEnd(String topic, int partition);
void seekToEnd(Collection<TopicPartitions> partitions);
void seekRelative(String topic, int partition, long offset, boolean toCurrent);
void seekToTimestamp(String topic, int partition, long timestamp);
void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);
String getGroupId();
两种不同的变体寻求方法提供了一种寻找任意偏移量的方法。
采用功能作为计算偏移的论据,在框架3.2版本中加入了。
该函数提供当前偏移量(消费者返回的当前位置,即下一个要取的偏移量)。
用户可以根据用户当前的偏移量,作为函数定义的一部分,决定寻求哪个偏移量。
seekRelative在2.3版本中加入,用于执行相对寻址。
-
抵消负 和至今false- 相对于划分末端进行寻觅。 -
抵消正且至今false- 相对于划分起始的寻觅。 -
抵消负 和至今true- 相对于当前位置进行寻道(倒带)。 -
抵消正且至今true- 相对于当前位置的寻道(快进)。
这seekToTimestamp方法也在2.3版本中被添加。
当为多个分区寻找相同的时间戳时,onIdleContainer或onPartitionsAssigned第二种方法更受青睐,因为在一次呼叫消费者的时间偏移方法。
当从其他地点呼叫时,容器会收集所有时间戳寻道请求,并对时间偏移. |
你也可以从 执行寻道作onIdleContainer()当检测到空闲容器时。
有关如何启用闲置容器检测,请参见“检测闲置和无响应消费者”。
这开始接受集合的方法非常有用,例如在处理压缩主题时,你希望每次启动应用时都回到开头: |
public class MyListener implements ConsumerSeekAware {
...
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
callback.seekToBeginning(assignments.keySet());
}
}
要在运行时任意寻觅,请使用以下回调引用registerSeekCallback针对合适的讨论串。
这里有一个简单的 Spring Boot 应用,演示如何使用回调;它会发送10条记录来讨论该主题;打<进来>在控制台中,所有分区都指向起始。
@SpringBootApplication
public class SeekExampleApplication {
public static void main(String[] args) {
SpringApplication.run(SeekExampleApplication.class, args);
}
@Bean
public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
return args -> {
IntStream.range(0, 10).forEach(i -> template.send(
new ProducerRecord<>("seekExample", i % 3, "foo", "bar")));
while (true) {
System.in.read();
listener.seekToStart();
}
};
}
@Bean
public NewTopic topic() {
return new NewTopic("seekExample", 3, (short) 1);
}
}
@Component
class Listener implements ConsumerSeekAware {
private static final Logger logger = LoggerFactory.getLogger(Listener.class);
private final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();
private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();
@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
this.callbackForThread.set(callback);
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> this.callbacks.put(tp, this.callbackForThread.get()));
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
partitions.forEach(tp -> this.callbacks.remove(tp));
this.callbackForThread.remove();
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
}
@KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
public void listen(ConsumerRecord<String, String> in) {
logger.info(in.toString());
}
public void seekToStart() {
this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
}
}
为了简化作,2.3 版本增加了摘要消费者寻求感知类,用于跟踪主题/分区需要使用的回调。
以下示例展示了如何在容器每次空闲时,在每个分区中寻找最后处理的记录。
它还支持允许任意外部调用将分区倒带一条记录的方法。
public class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {
@KafkaListener(id = "seekOnIdle", topics = "seekOnIdle")
public void listen(String in) {
...
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments,
ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, true));
}
/**
* Rewind all partitions one record.
*/
public void rewindAllOneRecord() {
getTopicsAndCallbacks()
.forEach((tp, callbacks) ->
callbacks.forEach(callback -> callback.seekRelative(tp.topic(), tp.partition(), -1, true))
);
}
/**
* Rewind one partition one record.
*/
public void rewindOnePartitionOneRecord(String topic, int partition) {
getSeekCallbacksFor(new TopicPartition(topic, partition))
.forEach(callback -> callback.seekRelative(topic, partition, -1, true));
}
}
2.6版本为抽象类增加了便利方法:
-
seekToBeginning()- 寻找所有分配到起始的分区。 -
seekToEnd()- 寻求所有分配到末尾的分区。 -
seekToTimestamp(长时间戳)- 寻找所有分配给该时间戳所代表偏移量的分区。
例:
public class MyListener extends AbstractConsumerSeekAware {
@KafkaListener(...)
void listen(...) {
...
}
}
public class SomeOtherBean {
MyListener listener;
...
void someMethod() {
this.listener.seekToTimestamp(System.currentTimeMillis() - 60_000);
}
}
从3.3版本开始,新增方法getGroupId()在ConsumerSeekAware.ConsumerSeekCallback接口。
当您需要识别与特定寻呼相关的用户群体时,这种方法尤其有用。
当使用扩展的类时摘要消费者寻求感知,在一个监听器上执行的寻道作可能影响同一类中的所有监听者。
这并不总是理想的行为。
针对这个问题,你可以使用getGroupId()回调提供了方法。
这使你能够有选择地执行搜索作,只针对感兴趣的消费者群体。 |