領先一步
VMware 提供培訓和認證,以加速您的進展。
瞭解更多我謹代表 Spring Integration 和 Spring Cloud Stream 團隊宣布 spring-kafka
(Spring for Apache Kafka) 版本 1.0
的候選版本現已推出。
構件 org.springframework.kafka:spring-kafka:1.0.0.RC1
和 org.springframework.kafka:spring-kafka-test:1.0.0.RC1
可在 Milestone 儲存庫中取得。
首先,非常感謝所有參與者,感謝提供功能要求和貢獻的活躍社群成員。 特別感謝 Martin Dam,他花費了大量時間協助我們使用 pause/resume
演算法來處理緩慢的監聽器。
自 第二個里程碑以來,進行了一些重做和內務處理,包括
ConsumerRebalanceListener
可以注入到 MessageListenerContainer
中;
可以使用 ConsumerFactory
和 ProducerFactory
中的 key
和 value
的 (De)Serializer
自訂 KafkaConsumer
和 KafkaProducer
;這是一種替代使用屬性進行設定的方法。
根據 Jackson 函式庫,提供了 JsonSerializer
和 JsonDeserializer
;
提供了 RecordFilterStrategy
和 FilteringMessageListenerAdapter
,以便允許跳過記錄或處理重複傳遞;
提供了一個不言自明的 RetryingMessageListenerAdapter
;
在可配置的時間之後,當 MessageListenerContainer
移至閒置狀態時,會發出 ListenerContainerIdleEvent
;
提供 TopicPartitionInitialOffset
以允許設定 MessageListenerContainer
以分配給主題中的特定分割區,並可選擇在啟動時尋求所需的偏移量;
@KafkaListener
的 @TopicPartition
屬性已使用 @PartitionOffset
屬性進行增強,以進行初始偏移量設定;
改進了消費者重新平衡事件。
為此候選版本實作的主要功能是在目標監聽器處理記錄速度緩慢時暫停/恢復 KafkaConsumer
。 如果我們沒有在 session.timeout.ms
內輪詢 KafkaConsumer
,Kafka 會假定我們的消費者有缺陷並啟動分割區重新平衡程序(當使用群組管理進行分割區分配時)。 為了保持活動狀態,我們必須輪詢消費者。 考慮這樣一種情況:Kafka 在輪詢時傳回了大量記錄,並且處理這些記錄所花費的時間超過了會話逾時。 為此,可以使用 enablePause
、pauseAfter
和 queueDepth
設定 MessageListenerContainer
,並且 KafkaConsumer
將被暫停(輪詢繼續但不會收到任何記錄),直到監聽器完成其工作並且消費者恢復。 再次感謝 Martin Dam 在此功能上的幫助和他的測試。
另請參閱 參考手冊 以獲取更多資訊。
不要錯過 Spring Integration Kafka 2.0 的 候選版本,該版本經過重新設計,以基於此 Spring for Apache Kafka 基礎。
我們預計在大約一周內發布 GA,因此請嘗試候選版本並盡快報告任何問題。
Gary Russell 將在 Spring One Platform 上談論 Spring for Apache Kafka,該平台將於今年 8 月 1 日至 4 日在拉斯維加斯舉行。 還有許多其他精彩的講座,因此請 查看議程 並 取得門票(如果您尚未這樣做)。