使用 Spring Cloud Stream 和 Apache Kafka Streams 進行串流處理。第一部分 - 程式設計模型

工程 | Soby Chacko | 2019 年 12 月 02 日 | ...

這是系列部落格文章的第一篇,我們將研究如何使用 Spring Cloud Stream 和 Kafka Streams 撰寫串流處理應用程式。

Spring Cloud Stream Horsham 版本 (3.0.0) 針對應用程式如何利用 Apache Kafka,使用 Kafka 和 Kafka Streams 的繫結器,引入了一些變更。此版本帶來的主要增強功能之一是,一流地支援使用完全函數式程式設計範例來撰寫應用程式。這篇部落格文章簡要介紹了如何使用此函數式程式設計模型,使用 Spring Cloud Stream 和 Kafka Streams 開發串流處理應用程式。在本系列的後續部落格文章中,我們將深入探討更多細節。

Spring Cloud Stream 下有多少種類型的 Kafka 繫結器?

這通常是一個令人困惑的問題:如果我想編寫基於 Apache Kafka 的應用程式,我應該使用哪個繫結器?Spring Cloud Stream 為 Kafka 提供了兩個單獨的繫結器 - spring-cloud-stream-binder-kafkaspring-cloud-stream-binder-kafka-streams。顧名思義,如果您想編寫標準的事件驅動應用程式,並且想使用普通的 Kafka 生產者和消費者,那麼您想使用第一個繫結器。另一方面,如果您想使用 Kafka Streams 程式庫開發串流處理應用程式,請使用第二個繫結器。再次強調,在這篇部落格文章中,我們將重點關注 Kafka Streams 的第二個繫結器。

關於這個部落格系列的一般說明。這主要著眼於 Spring Cloud Stream 和 Kafka Streams 之間的接觸點,並未深入探討 Kafka Streams 本身。為了編寫使用 Kafka Streams 的非平凡串流處理應用程式,強烈建議深入了解 Kafka Streams 程式庫。本系列僅停留在實際 Kafka Streams 程式庫的周邊,主要關注如何從 Spring Cloud Stream 的有利位置與其進行互動。

引導 Spring Cloud Stream Kafka Streams 應用程式

Spring Cloud Stream 應用程式的核心都是 Spring Boot 應用程式。為了引導一個新專案,請前往 Spring Initializr,然後建立一個新專案。選取 “Cloud Stream”“Spring for Apache Kafka Streams” 作為依賴項。這將產生一個專案,其中包含開始開發應用程式所需的所有元件。以下是從 initializr 截取的螢幕截圖,其中選取了基本依賴項。

spring-initializr-kafka-streams

請給我一個簡單的範例,說明如何使用 Spring Cloud Stream 撰寫快速 Kafka Streams 應用程式

這是一個非常基本但功能齊全的 Kafka Streams 應用程式,它使用 Spring Cloud Stream 的函數式程式設計支援編寫

@SpringBootApplication
public class SimpleConsumerApplication {

   @Bean
   public java.util.function.Consumer<KStream<String, String>> process() {

       return input ->
               input.foreach((key, value) -> {
                   System.out.println("Key: " + key + " Value: " + value);
               });
   }
}

如您所見,這是一個非常簡單的應用程式,僅列印到標準輸出,但無論如何,它都是一個功能齊全的 Kafka Streams 應用程式。在外層,我們使用 @SpringBootApplication 註解來表示這是一個 boot 應用程式。然後,我們提供一個 java.util.function.Consumer bean,我們透過 lambda 運算式封裝應用程式的邏輯。消費者將 KStream 作為其輸入,其中鍵和值都表示為 String 類型。

就是這樣。您可以針對 Kafka Broker 執行此應用程式,並查看其運作情況。在幕後,Spring Cloud Stream 的 Kafka Streams 繫結器會將其轉換為具有 StreamsBuilder、Kafka Streams 拓撲等的正確 Kafka Streams 應用程式。Spring Cloud Stream 的主要宗旨之一是將複雜性和樣板程式碼從使用者那裡隱藏起來,以便應用程式開發人員可以專注於手頭的業務問題。繫結器將負責建立 Kafka Streams 拓撲、連接到 Kafka 叢集、繫結到主題以及從該 Kafka 主題取用資料,在這種情況下,該主題繫結為 KStream。通常,如果他們不使用 Spring Cloud Stream 等架構,則應用程式開發人員有責任完成所有這些事情。

等一下,您確定這會有效嗎?

如果您了解 Kafka Streams 的內部結構,您可能想知道上面介紹的內容是否有效。我們沒有提供 Kafka Streams 需要的一些基本內容(例如叢集資訊、應用程式 ID、要使用的主題、要使用的 Serdes 等)。簡短的答案是,這將在不提供單個配置屬性的情況下工作。這是因為繫結器將使用許多合理的預設值,並對從哪些主題取用等做出判斷。儘管如此,對於生產用途,我們建議提供所有適用的屬性(如果繫結器使用的預設值沒有意義)。

讓我們看一下 Kafka Streams 需要的一些基本內容,以及繫結器如何為它們提供預設值。

叢集資訊

預設情況下,繫結器將嘗試連接到在 localhost:9092 上運行的叢集。如果不是這種情況,您可以使用 Spring Cloud Stream 提供的配置屬性來覆蓋它。請參閱 Spring Cloud Stream 參考指南

應用程式 ID

在 Kafka Streams 應用程式中,application.id 是一個必填欄位。沒有它,您無法啟動 Kafka Streams 應用程式。預設情況下,繫結器將產生一個應用程式 ID 並將其分配給處理器。它使用函數 bean 名稱作為前綴。例如,如果您有一個如上的消費者,繫結器將產生應用程式 ID 作為 process-applicationId。您可以使用此處概述的策略覆蓋它。請參閱 Spring Cloud Stream 參考指南

要取用的主題

對於上述處理器,您可以提供要取用的主題,如下所示

spring.cloud.stream.bindings.process-in-0.destination: my-input-topic

在這個例子中,我們是說,對於函數 bean (process) 及其第一個輸入 (in-0),它應該綁定到一個名為 my-input-topic 的 Kafka topic。 如果您沒有提供像這樣明確的目的地,binder 會假設您使用的是與綁定名稱相同的 topic (在這個例子中是 process-in-0)。

序列化與反序列化 (Serdes)

Kafka Streams 使用一個名為 Serde 的特殊類別來處理資料序列化。 它本質上是一個 inbound 的反序列化器和 outbound 的序列化器的包裝器。 通常,您必須告訴 Kafka Streams 要為每個 consumer 使用哪個 Serde。 然而,Binder 會透過使用 Kafka Streams 中提供的參數類型來推斷此資訊。 例如,對於 KStream<String, String>,binder 假設它需要使用 String 反序列化器。 與往常一樣,您可以透過多種方式覆寫這些設定。在這個系列中,我們即將推出一篇專門討論這個主題的部落格文章。

我可以在單個 Boot 應用程式中有多個 processors 嗎?

是的,可以。 用於 Kafka Streams 的 Spring Cloud Stream binder 可以輕鬆地在單個應用程式中提供多個表示為 java.util.function.Functionjava.util.function.Consumer beans 的 processors。 binder 會將每個 processor 隔離到它自己的應用程式 ID 和 StreamsBuilder。 它確保彼此之間不會有任何干擾。 從 Kafka Streams 的角度來看,它們是具有自己專用拓撲的多個 processors。 雖然在測試和快速嘗試某些東西時,這是一個合法的用例,但在單個應用程式中擁有幾個 processors 可能會使其成為更難維護的巨石。

總結

在這篇部落格文章中,我們快速介紹了如何使用 Spring Cloud Stream 的函數式程式設計支援來編寫使用 Kafka Streams 的流處理應用程式。 我們看到 binder 處理了許多基礎架構和配置細節,這讓您可以專注於手邊的業務邏輯。 在下一篇部落格文章中,我們將進一步探索此程式設計模型,以了解如何使用 Spring Cloud Stream 和 Kafka Streams 開發更複雜的流處理應用程式。

獲取 Spring 電子報

與 Spring 電子報保持聯繫

訂閱

搶先一步

VMware 提供培訓和認證,以加速您的進展。

瞭解更多

獲得支援

Tanzu Spring 在一個簡單的訂閱中提供對 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位檔案。

瞭解更多

即將舉行的活動

查看 Spring 社群中所有即將舉行的活動。

查看全部