領先一步
VMware 提供培訓和認證,加速您的進展。
瞭解更多我們很高興宣布 Reactor Kafka 1.0.0 的第一個里程碑版本發布。
Reactor Kafka 是一個基於 Project Reactor 的 Apache Kafka 反應式 API。Reactor Kafka API 讓訊息能夠以非阻斷式背壓和極低的管理負擔,透過函數式 API 發布到 Kafka 主題和從 Kafka 主題消費。這讓使用 Reactor 的應用程式能夠使用 Kafka 作為訊息匯流排或串流平台,並與其他系統整合以提供端對端的反應式管線。
Reactor Kafka 的價值主張在於有效利用具有多個外部互動(其中 Kafka 是外部系統之一)的應用程式中的資源。端對端反應式管線受益於非阻斷式背壓和有效率的執行緒使用,從而能夠有效率地處理大量並行請求。Project Reactor 提供的最佳化功能,讓反應式應用程式的開發能夠以極低的管理負擔和可預測的容量規劃,來交付低延遲、高吞吐量的管線。
若要開始使用並執行範例反應式 Kafka 生產者和消費者,請依照參考指南的「開始使用」章節中的指示操作。
Reactor Kafka API 是基於 Apache Kafka Producer/Consumer API,並包含兩個主要類別
Sender
用於發布訊息到 Kafka 主題
Receiver
用於從 Kafka 主題消費訊息
底層 Kafka Producer
和 Consumer
的完整功能由這些反應式介面提供。
Sender<Integer, String> sender =
Sender.create(SenderOptions.create(producerProps)); (1)
Flux<SenderRecord<Integer, String, Integer>> outboundFlux = (2)
Flux.range(1, 10)
.map(i -> SenderRecord.create(producerRecord(topic, i), i));
sender.send(outboundFlux, false) (3)
.doOnNext(r -> log.debug("Message #{} result: {}",
r.correlationMetadata(), r.recordMetadata())) (4)
.subscribe(); (5)
建立 Sender
要發送到 Kafka 的出站訊息 Flux
反應式發送
記錄每次發送的結果
訂閱以開始將訊息流向 Kafka
ReceiverOptions<Integer, String> receiverOptions = (1)
ReceiverOptions.<Integer, String>create(consumerProps)
.subscription(Collections.singleton(topic));
Receiver.create(receiverOptions) (2)
.receive() (3)
.subscribe(r -> {
log.info("Received message {} ", r.record()); (4)
r.offset().acknowledge(); (5)
});
建立 ReceiverOptions
並設定訂閱 Kafka 主題
建立 Receiver
反應式接收
記錄每個傳入的訊息
在處理訊息後確認,以便可以提交偏移量
Reactor Kafka 原始碼和範例可在 github 上取得。
如需更多資訊和其他資源,請參閱 Reactor Kafka 參考指南 和 Javadocs。