Quick Tour
Prerequisites: You must install and run Apache Kafka.
Then you must put the Spring for Apache Kafka (spring-kafka) JAR and all of its dependencies on your class path.
The easiest way to do that is to declare a dependency in your build tool.
If you are not using Spring Boot, declare the spring-kafka jar as a dependency in your project.
Maven
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>4.0.0</version>
</dependency>
Gradle
implementation 'org.springframework.kafka:spring-kafka:4.0.0'
When using Spring Boot (and you have not used start.spring.io to create your project), omit the version and Boot automatically brings in the correct version that is compatible with your Boot version:
Maven
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Gradle
implementation 'org.springframework.kafka:spring-kafka'
However, the quickest way to get started is to use start.spring.io (or the wizards in Spring Tool Suite and IntelliJ IDEA) to create a project and select "Spring for Apache Kafka" as a dependency.
Getting Started
The simplest way to get started is to use start.spring.io (or the wizards in Spring Tool Suite and IntelliJ IDEA) to create a project and select "Spring for Apache Kafka" as a dependency. Refer to the Spring Boot documentation for more information about its opinionated auto-configuration of the infrastructure beans.
Here is a minimal consumer application.
Spring Boot Consumer Application
Application
Java
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("topic1")
.partitions(10)
.replicas(1)
.build();
}
@KafkaListener(id = "myId", topics = "topic1")
public void listen(String in) {
System.out.println(in);
}
}
Kotlin
@SpringBootApplication
class Application {
@Bean
fun topic() = NewTopic("topic1", 10, 1)
@KafkaListener(id = "myId", topics = ["topic1"])
fun listen(value: String?) {
println(value)
}
}
fun main(args: Array<String>) = runApplication<Application>(*args)
application.properties
spring.kafka.consumer.auto-offset-reset=earliest
This NewTopic bean causes the topic to be created on the broker; it is not needed if the topic already exists.
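TopicBuilder can also apply per-topic configuration when the topic is created. The following is a minimal sketch, not part of the example above; the topic name and configuration values are illustrative:
@Bean
public NewTopic compactedTopic() {
    // Hypothetical second topic: compact() sets cleanup.policy=compact and
    // config() passes any other broker-side topic property.
    return TopicBuilder.name("topic2")
            .partitions(3)
            .replicas(1)
            .compact()
            .config(TopicConfig.SEGMENT_BYTES_CONFIG, "1073741824")
            .build();
}
TopicConfig here is org.apache.kafka.common.config.TopicConfig from the kafka-clients jar.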
Spring Boot Producer Application
Application
Java
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("topic1")
.partitions(10)
.replicas(1)
.build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("topic1", "test");
};
}
}
Kotlin
@SpringBootApplication
class Application {
@Bean
fun topic() = NewTopic("topic1", 10, 1)
@Bean
fun runner(template: KafkaTemplate<String, String>) =
ApplicationRunner { template.send("topic1", "test") }
companion object {
@JvmStatic
fun main(args: Array<String>) = runApplication<Application>(*args)
}
}
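KafkaTemplate.send() is asynchronous; in recent spring-kafka versions (3.0 and later) it returns a CompletableFuture<SendResult<K, V>>. The following is a minimal sketch of the runner above extended to report the outcome; the log output is illustrative:
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
    return args -> template.send("topic1", "test")
            // whenComplete receives either the SendResult or the failure cause
            .whenComplete((result, ex) -> {
                if (ex == null) {
                    System.out.println("Sent to " + result.getRecordMetadata());
                }
                else {
                    ex.printStackTrace();
                }
            });
}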
With Java Configuration (No Spring Boot)
Spring for Apache Kafka is designed to be used in a Spring application context.
For example, if you create the listener container yourself outside of a Spring context, not all functions will work unless you satisfy all of the ...Aware interfaces that the container implements.
Here is an example of an application that does not use Spring Boot; it has both a consumer and a producer.
Without Spring Boot
Java
public class Sender {
public static void main(String[] args) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Config.class);
context.getBean(Sender.class).send("test", 42);
}
private final KafkaTemplate<Integer, String> template;
public Sender(KafkaTemplate<Integer, String> template) {
this.template = template;
}
public void send(String toSend, int key) {
this.template.send("topic1", key, toSend);
}
}
public class Listener {
@KafkaListener(id = "listen1", topics = "topic1")
public void listen1(String in) {
System.out.println(in);
}
}
@Configuration
@EnableKafka
public class Config {
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String>
kafkaListenerContainerFactory(ConsumerFactory<Integer, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProps());
}
private Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// ...
return props;
}
@Bean
public Sender sender(KafkaTemplate<Integer, String> template) {
return new Sender(template);
}
@Bean
public Listener listener() {
return new Listener();
}
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(senderProps());
}
private Map<String, Object> senderProps() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//...
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate(ProducerFactory<Integer, String> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
}
Kotlin
class Sender(private val template: KafkaTemplate<Int, String>) {
fun send(toSend: String, key: Int) {
template.send("topic1", key, toSend)
}
}
class Listener {
@KafkaListener(id = "listen1", topics = ["topic1"])
fun listen1(`in`: String) {
println(`in`)
}
}
@Configuration
@EnableKafka
class Config {
@Bean
fun kafkaListenerContainerFactory(consumerFactory: ConsumerFactory<Int, String>) =
ConcurrentKafkaListenerContainerFactory<Int, String>().apply {
setConsumerFactory(consumerFactory)
}
@Bean
fun consumerFactory() = DefaultKafkaConsumerFactory<Int, String>(consumerProps)
val consumerProps = mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
ConsumerConfig.GROUP_ID_CONFIG to "group",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to IntegerDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest"
)
@Bean
fun sender(template: KafkaTemplate<Int, String>) = Sender(template)
@Bean
fun listener() = Listener()
@Bean
fun producerFactory() = DefaultKafkaProducerFactory<Int, String>(senderProps)
val senderProps = mapOf(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
ProducerConfig.LINGER_MS_CONFIG to 10,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to IntegerSerializer::class.java,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java
)
@Bean
fun kafkaTemplate(producerFactory: ProducerFactory<Int, String>) = KafkaTemplate(producerFactory)
}
As you can see, you have to define several infrastructure beans when not using Spring Boot.
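Note also that, in the non-Boot example, the listener container created for the @KafkaListener is started when the AnnotationConfigApplicationContext is refreshed, so creating the context is enough to start consuming. A minimal sketch of a consumer-only entry point (the class name is illustrative):
public class ConsumerMain {

    public static void main(String[] args) {
        // Refreshing the context starts the listener containers registered via @EnableKafka
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Config.class);
        // Close the context (and stop the containers) cleanly when the JVM exits
        context.registerShutdownHook();
    }

}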