Spring Integration Kafka Support 2.0.0.M1 現已推出

發布 | Artem Bilan | 2016 年 4 月 11 日 | ...

我很高興宣布 spring-integration-kafka (Spring Integration Kafka Support) 的第一個里程碑版本 2.0 現已推出。

Spring Integration Kafka 擴充專案為 Apache Kafka 提供 inboundoutbound 頻道適配器。

從 2.0 版本開始,該專案是基於新的 spring-kafka 專案的完整重寫,該專案使用 Kafka 0.9.x.x 提供的純 Java ProducerConsumer 客戶端。

artifact org.springframework.integration:spring-integration-kafka:2.0.0.M1 可在 里程碑 儲存庫中找到。

主要特性

Kafka 消費者頻道適配器

由於 spring-kafka 專案提供了 MessageListenerContainer 基礎,因此現在 KafkaMessageDrivenChannelAdapter 的定義非常簡單

@Bean
public MessageProducer kafkaProducer(
                   AbstractMessageListenerContainer<Integer, String> container) {
    KafkaMessageDrivenChannelAdapter<Integer, String> adapter = 
                              new KafkaMessageDrivenChannelAdapter<>(container);
    adapter.setMessageConverter(new StringJsonMessageConverter());
    adapter.setOutputChannel(fromKafkaChannel());
    adapter.setErrorChannel(myErrorChannel());
    return adapter;
}

透過 XML 配置,我們只需要聲明單一組件即可

<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        channel="nullChannel"
        message-converter="messageConverter"
        error-channel="errorChannel" />

Kafka 生產者頻道適配器

由於 spring-kafka 專案提供了 KafkaTemplate 基礎,因此 KafkaProducerMessageHandler 也非常簡單

@Bean
@ServiceActivator(inputChannel = "toKafka")
public MessageHandler kafkaProducerHandler(
                            KafkaTemplate<Integer, String> template) {
    KafkaProducerMessageHandler<Integer, String> handler = 
                         new KafkaProducerMessageHandler<>(template);
    handler.setTopicExpression(PARSER.parseExpression("headers.myTopic"));
    handler.setPartitionIdExpression(
                            PARSER.parseExpression("headers.myPartition"));
    return handler;
}

XML 配置也已簡化

<int-kafka:outbound-channel-adapter 
                kafka-template="template" 
                channel="inputToKafka"
                topic="foo"/>

Java DSL 變更

從版本 1.2 開始,Spring Integration Java DSL 引入了 Kafka09 Factory,以涵蓋來自這個新的 2.0 版本的前述頻道適配器的功能。 例如,生產部分可能如下所示

.handle(Kafka09.outboundChannelAdapter(producerFactory())
             .defaultTopic("foo")
             .partitionId(m -> m.getHeaders().get("myPartition", Integer.class)))

最後,請不要錯過 Spring for Apache Kafka 的公告!

下一步

連同下一個 Spring for Apache Kafka,我們可能會考慮為 Kafka Streams 實作一些適配器。

由於該專案的代碼庫變得非常簡單,並且看起來 Apache Kafka API 將會 穩定,因此我們打算在時機成熟時,將此專案合併到 Spring Integration Code 5.0 中。

同時,我們期待您的回饋,如果一切順利,計劃在接下來的幾週內發布 2.0.0.RELEASE

專案頁面 | 說明

取得 Spring 電子報

隨時關注 Spring 電子報

訂閱

領先一步

VMware 提供培訓和認證,以加速您的進度。

了解更多

取得支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進制檔案。

了解更多

即將舉行的活動

查看 Spring 社群中所有即將舉行的活動。

查看全部