領先一步
VMware 提供培訓和認證,以加速您的進度。
了解更多我很高興宣布 spring-integration-kafka
(Spring Integration Kafka Support) 的第一個里程碑版本 2.0
現已推出。
Spring Integration Kafka 擴充專案為 Apache Kafka 提供 inbound
和 outbound
頻道適配器。
從 2.0 版本開始,該專案是基於新的 spring-kafka
專案的完整重寫,該專案使用 Kafka 0.9.x.x
提供的純 Java Producer
和 Consumer
客戶端。
artifact org.springframework.integration:spring-integration-kafka:2.0.0.M1
可在 里程碑 儲存庫中找到。
由於 spring-kafka
專案提供了 MessageListenerContainer
基礎,因此現在 KafkaMessageDrivenChannelAdapter
的定義非常簡單
@Bean
public MessageProducer kafkaProducer(
AbstractMessageListenerContainer<Integer, String> container) {
KafkaMessageDrivenChannelAdapter<Integer, String> adapter =
new KafkaMessageDrivenChannelAdapter<>(container);
adapter.setMessageConverter(new StringJsonMessageConverter());
adapter.setOutputChannel(fromKafkaChannel());
adapter.setErrorChannel(myErrorChannel());
return adapter;
}
透過 XML 配置,我們只需要聲明單一組件即可
<int-kafka:message-driven-channel-adapter
id="kafkaListener"
listener-container="container1"
auto-startup="false"
phase="100"
send-timeout="5000"
channel="nullChannel"
message-converter="messageConverter"
error-channel="errorChannel" />
由於 spring-kafka
專案提供了 KafkaTemplate
基礎,因此 KafkaProducerMessageHandler
也非常簡單
@Bean
@ServiceActivator(inputChannel = "toKafka")
public MessageHandler kafkaProducerHandler(
KafkaTemplate<Integer, String> template) {
KafkaProducerMessageHandler<Integer, String> handler =
new KafkaProducerMessageHandler<>(template);
handler.setTopicExpression(PARSER.parseExpression("headers.myTopic"));
handler.setPartitionIdExpression(
PARSER.parseExpression("headers.myPartition"));
return handler;
}
XML 配置也已簡化
<int-kafka:outbound-channel-adapter
kafka-template="template"
channel="inputToKafka"
topic="foo"/>
從版本 1.2
開始,Spring Integration Java DSL 引入了 Kafka09
Factory,以涵蓋來自這個新的 2.0
版本的前述頻道適配器的功能。 例如,生產部分可能如下所示
.handle(Kafka09.outboundChannelAdapter(producerFactory())
.defaultTopic("foo")
.partitionId(m -> m.getHeaders().get("myPartition", Integer.class)))
最後,請不要錯過 Spring for Apache Kafka 的公告!
連同下一個 Spring for Apache Kafka,我們可能會考慮為 Kafka Streams 實作一些適配器。
由於該專案的代碼庫變得非常簡單,並且看起來 Apache Kafka API 將會 穩定,因此我們打算在時機成熟時,將此專案合併到 Spring Integration Code 5.0 中。
同時,我們期待您的回饋,如果一切順利,計劃在接下來的幾週內發布 2.0.0.RELEASE
!