領先一步
VMware 提供培訓和認證,以加速您的進度。
了解更多親愛的 Spring 社群:
我們很高興地宣布 Spring Integration Kafka 1.0 GA 擴充功能現已推出,它為 Apache Kafka 提供了熟悉的 Spring Integration 端點。 如往常一樣,使用具有 Maven 或 Gradle 的 Release Repository
compile "org.springframework.integration:spring-integration-kafka:1.0.0.RELEASE"
或下載發佈版本壓縮檔,來試用看看。
首先,感謝所有為該專案做出貢獻的人,特別感謝該專案的創始人 Soby Chacko,他實現了基礎架構,以及基於 High Level Consumer 的訊息來源和生產者,以及 Marius Bogoevici,他學習了用於訊息驅動消費者的 Simple Consumer API 的複雜性。
總覽
這個專案完全基於 Apache Kafka(0.8.1.1 版本)和 Spring Integration 基礎(4.0.5.RELEASE 版本)並不令人意外。 我們提供了幾個抽象概念,例如 Configuration
、ConnectionFactory
、KafkaMessageListenerContainer
、KafkaConsumerContext
/KafkaProducerContext
、KafkaMessage
等,以遵循 Spring 的解耦和易於使用的原則。 基於這些抽象概念,我們提供了高階 API,例如 KafkaMessageDrivenChannelAdapter
、KafkaHighLevelConsumerMessageSource
和 KafkaProducerMessageHandler
,它們是 Spring Integration 中的适配器。 還提供了 XML 配置支持。
KafkaHighLevelConsumerMessageSource
Kafka High Level Consumer 使用 <int-kafka:inbound-channel-adapter>
和 <int-kafka:consumer-context>
呈現,以使用 KafkaStream
從 Kafka 主題 poll
訊息。 它的主要優點是易於使用,並且如果訊息來源的多個實例並行運行,則能夠平衡消費者之間的分割區。
典型的配置可能如下所示
<int-kafka:inbound-channel-adapter kafka-consumer-context-ref="consumerContext"
channel="inputFromKafka">
<int:poller fixed-delay="10"/>
</int-kafka:inbound-channel-adapter>
<int-kafka:consumer-context id="consumerContext"
consumer-timeout="4000"
zookeeper-connect="zookeeperConnect">
<int-kafka:consumer-configurations>
<int-kafka:consumer-configuration group-id="default"
value-decoder="valueDecoder"
key-decoder="valueDecoder"
max-messages="5000">
<int-kafka:topic id="test1" streams="4"/>
<int-kafka:topic id="test2" streams="4"/>
</int-kafka:consumer-configuration>
</int-kafka:consumer-configurations>
</int-kafka:consumer-context>
如您所見,除了 <int-kafka:consumer-context>
之外,還需要引用 zookeeperConnect
。 這是一個簡單的 bean,它表示與 Zookeeper 集群的連線
<int-kafka:zookeeper-connect id="zookeeperConnect"
zk-connect="host1,host2,host3:2182"
zk-connection-timeout="6000"
zk-session-timeout="6000"
zk-sync-time="2000"/>
KafkaHighLevelConsumerMessageSource
產生具有 Map<String, Map<Integer, List<Object>>>
payload
的 Message
,它類似於「每個主題的按分割區劃分的 Kafka 訊息」。
KafkaMessageDrivenChannelAdapter
Kafka Simple Consumer 使用 <int-kafka:message-driven-adapter>
呈現,並且基於眾所周知的 ListenerContainer
抽象 - KafkaMessageListenerContainer
(類似於 Spring AMQP SimpleMessageListenerContainer
或 Spring JMS DefaultMessageListenerContainer
)
@Bean
public Configuration zkConfiguration() {
return new ZookeeperConfiguration(new ZookeeperConnect());
}
@Bean
public ConnectionFactory kafkaConnectionFactory() {
return new DefaultConnectionFactory(zkConfiguration());
}
@Bean
public MessageProducer kafkaMessageDrivenChannelAdapter() {
KafkaMessageDrivenChannelAdapter adapter =
new KafkaMessageDrivenChannelAdapter(
new KafkaMessageListenerContainer(kafkaConnectionFactory(),
"topic1", "topic2"));
adapter.setOutputChannel(inputChannel());
return adapter;
}
此元件的主要優點是可以更好地控制一個元件監聽的分割區(這些分割區是可配置的),以及起始偏移量(例如,在需要重播主題時)。 此外,還提供了更豐富的偏移量管理和錯誤處理策略。
監聽任務的結果是單個 Message
,其 payload
基於 Kafka 訊息和具有來自 KafkaHeaders
的鍵的其他 headers
。 在分割區內保留排序。
這兩個适配器都可以使用 kafka.serializer.Decoder
配置 Kafka 訊息和 Kafka 訊息鍵。 Spring Integration Kafka 提供了開箱即用的 Avro Encoder/Decoder
實現。
此外,Spring Integration Kafka 引入了 OffsetManager
抽象,用於處理 Kafka 主題偏移量,這在 High Level Consumer
中不可用。 提供 MetadataStoreOffsetManager
和 KafkaTopicOffsetManager
。 OffsetManager
必須注入到 KafkaMessageListenerContainer
。 預設情況下,使用 MetadataStoreOffsetManager
,它由 Spring Integration Core 中的 SimpleMetadataStore
提供支持。
KafkaProducerMessageHandler
Kafka Producer 使用 <int-kafka:outbound-channel-adapter>
和 <int-kafka:producer-context>
對呈現。 最後一個使用目標 Kafka Producer
的配置,該配置由來自 MessageHeaders
的 KafkaHeaders.TOPIC
或 <int-kafka:outbound-channel-adapter>
上的 topic-expression
選擇,並與 <int-kafka:producer-configuration>
子元素上配置的 topic
選項匹配
<int-kafka:producer-context id="kafkaProducerContext">
<int-kafka:producer-configurations>
<int-kafka:producer-configuration broker-list="localhost:9092"
key-class-type="java.lang.String"
value-class-type="java.lang.String"
topic="test1"
value-encoder="kafkaEncoder"
key-encoder="kafkaEncoder"
compression-codec="default"/>
<int-kafka:producer-configuration broker-list="localhost:9092"
topic="test2"
compression-codec="default"/>
</int-kafka:producer-configurations>
</int-kafka:producer-context>
如您所見,有足夠的選項來調整目標 Producer
,並且每個 Producer
都可以由特定的 broker-list
提供支持。 如果僅存在單個 <int-kafka:producer-configuration>
,則您可以將訊息傳送到任何 topic
,例如,基於 Message
上下文標頭。
Spring XD 使用這些适配器作為 Kafka Source 和 Sink。 此外,它還提供了 KafkaMessageBus
。 這些功能將很快在 Spring XD 1.1 RELEASE 中提供。
此外,在我們完成此發佈時,Apache Kafka 0.8.2 也收到了最終發佈。 我們很高興祝賀該團隊,我們將在不久的將來整合可用的新功能 - 這僅僅是在 Spring 應用程式中提供對 Kafka 的首要支持的開始!
有關更多資訊,請參閱專案主頁。
與往常一樣,我們期待您的評論和反饋(StackOverflow(spring-integration
標籤)、Spring JIRA、GitHub),我們非常歡迎貢獻!