搶先一步
VMware 提供培訓和認證,以加速您的進展。
瞭解更多這是系列部落格文章的第一篇,我們將研究如何使用 Spring Cloud Stream 和 Kafka Streams 撰寫串流處理應用程式。
Spring Cloud Stream Horsham 版本 (3.0.0) 針對應用程式如何利用 Apache Kafka,使用 Kafka 和 Kafka Streams 的繫結器,引入了一些變更。此版本帶來的主要增強功能之一是,一流地支援使用完全函數式程式設計範例來撰寫應用程式。這篇部落格文章簡要介紹了如何使用此函數式程式設計模型,使用 Spring Cloud Stream 和 Kafka Streams 開發串流處理應用程式。在本系列的後續部落格文章中,我們將深入探討更多細節。
這通常是一個令人困惑的問題:如果我想編寫基於 Apache Kafka 的應用程式,我應該使用哪個繫結器?Spring Cloud Stream 為 Kafka 提供了兩個單獨的繫結器 - spring-cloud-stream-binder-kafka 和 spring-cloud-stream-binder-kafka-streams。顧名思義,如果您想編寫標準的事件驅動應用程式,並且想使用普通的 Kafka 生產者和消費者,那麼您想使用第一個繫結器。另一方面,如果您想使用 Kafka Streams 程式庫開發串流處理應用程式,請使用第二個繫結器。再次強調,在這篇部落格文章中,我們將重點關注 Kafka Streams 的第二個繫結器。
關於這個部落格系列的一般說明。這主要著眼於 Spring Cloud Stream 和 Kafka Streams 之間的接觸點,並未深入探討 Kafka Streams 本身。為了編寫使用 Kafka Streams 的非平凡串流處理應用程式,強烈建議深入了解 Kafka Streams 程式庫。本系列僅停留在實際 Kafka Streams 程式庫的周邊,主要關注如何從 Spring Cloud Stream 的有利位置與其進行互動。
Spring Cloud Stream 應用程式的核心都是 Spring Boot 應用程式。為了引導一個新專案,請前往 Spring Initializr,然後建立一個新專案。選取 “Cloud Stream” 和 “Spring for Apache Kafka Streams” 作為依賴項。這將產生一個專案,其中包含開始開發應用程式所需的所有元件。以下是從 initializr 截取的螢幕截圖,其中選取了基本依賴項。
這是一個非常基本但功能齊全的 Kafka Streams 應用程式,它使用 Spring Cloud Stream 的函數式程式設計支援編寫
@SpringBootApplication
public class SimpleConsumerApplication {
@Bean
public java.util.function.Consumer<KStream<String, String>> process() {
return input ->
input.foreach((key, value) -> {
System.out.println("Key: " + key + " Value: " + value);
});
}
}
如您所見,這是一個非常簡單的應用程式,僅列印到標準輸出,但無論如何,它都是一個功能齊全的 Kafka Streams 應用程式。在外層,我們使用 @SpringBootApplication
註解來表示這是一個 boot 應用程式。然後,我們提供一個 java.util.function.Consumer
bean,我們透過 lambda 運算式封裝應用程式的邏輯。消費者將 KStream 作為其輸入,其中鍵和值都表示為 String 類型。
就是這樣。您可以針對 Kafka Broker 執行此應用程式,並查看其運作情況。在幕後,Spring Cloud Stream 的 Kafka Streams 繫結器會將其轉換為具有 StreamsBuilder
、Kafka Streams 拓撲等的正確 Kafka Streams 應用程式。Spring Cloud Stream 的主要宗旨之一是將複雜性和樣板程式碼從使用者那裡隱藏起來,以便應用程式開發人員可以專注於手頭的業務問題。繫結器將負責建立 Kafka Streams 拓撲、連接到 Kafka 叢集、繫結到主題以及從該 Kafka 主題取用資料,在這種情況下,該主題繫結為 KStream。通常,如果他們不使用 Spring Cloud Stream 等架構,則應用程式開發人員有責任完成所有這些事情。
如果您了解 Kafka Streams 的內部結構,您可能想知道上面介紹的內容是否有效。我們沒有提供 Kafka Streams 需要的一些基本內容(例如叢集資訊、應用程式 ID、要使用的主題、要使用的 Serdes 等)。簡短的答案是,這將在不提供單個配置屬性的情況下工作。這是因為繫結器將使用許多合理的預設值,並對從哪些主題取用等做出判斷。儘管如此,對於生產用途,我們建議提供所有適用的屬性(如果繫結器使用的預設值沒有意義)。
讓我們看一下 Kafka Streams 需要的一些基本內容,以及繫結器如何為它們提供預設值。
預設情況下,繫結器將嘗試連接到在 localhost:9092 上運行的叢集。如果不是這種情況,您可以使用 Spring Cloud Stream 提供的配置屬性來覆蓋它。請參閱 Spring Cloud Stream 參考指南。
在 Kafka Streams 應用程式中,application.id 是一個必填欄位。沒有它,您無法啟動 Kafka Streams 應用程式。預設情況下,繫結器將產生一個應用程式 ID 並將其分配給處理器。它使用函數 bean 名稱作為前綴。例如,如果您有一個如上的消費者,繫結器將產生應用程式 ID 作為 process-applicationId。您可以使用此處概述的策略覆蓋它。請參閱 Spring Cloud Stream 參考指南。
對於上述處理器,您可以提供要取用的主題,如下所示
spring.cloud.stream.bindings.process-in-0.destination: my-input-topic
在這個例子中,我們是說,對於函數 bean (process) 及其第一個輸入 (in-0),它應該綁定到一個名為 my-input-topic 的 Kafka topic。 如果您沒有提供像這樣明確的目的地,binder 會假設您使用的是與綁定名稱相同的 topic (在這個例子中是 process-in-0)。
Kafka Streams 使用一個名為 Serde 的特殊類別來處理資料序列化。 它本質上是一個 inbound 的反序列化器和 outbound 的序列化器的包裝器。 通常,您必須告訴 Kafka Streams 要為每個 consumer 使用哪個 Serde。 然而,Binder 會透過使用 Kafka Streams 中提供的參數類型來推斷此資訊。 例如,對於 KStream<String, String>,binder 假設它需要使用 String 反序列化器。 與往常一樣,您可以透過多種方式覆寫這些設定。
是的,可以。 用於 Kafka Streams 的 Spring Cloud Stream binder 可以輕鬆地在單個應用程式中提供多個表示為 java.util.function.Function 或 java.util.function.Consumer beans 的 processors。 binder 會將每個 processor 隔離到它自己的應用程式 ID 和 StreamsBuilder。 它確保彼此之間不會有任何干擾。 從 Kafka Streams 的角度來看,它們是具有自己專用拓撲的多個 processors。 雖然在測試和快速嘗試某些東西時,這是一個合法的用例,但在單個應用程式中擁有幾個 processors 可能會使其成為更難維護的巨石。
在這篇部落格文章中,我們快速介紹了如何使用 Spring Cloud Stream 的函數式程式設計支援來編寫使用 Kafka Streams 的流處理應用程式。 我們看到 binder 處理了許多基礎架構和配置細節,這讓您可以專注於手邊的業務邏輯。 在下一篇部落格文章中,我們將進一步探索此程式設計模型,以了解如何使用 Spring Cloud Stream 和 Kafka Streams 開發更複雜的流處理應用程式。