解密 Spring Cloud Stream 搭配 Apache Kafka 分割區的生產者

工程 | Soby Chacko | 2021 年 2 月 3 日 | ...

在本篇部落格中,我們將深入探討如何使用 Apache Kafka 編寫 Spring Cloud Stream 生產者,以及它如何處理 Kafka 中的原生分割區。

Spring Cloud Stream 具有與中介軟體無關的分割區概念。如果中介軟體具有類似 Apache Kafka 的功能,Spring Cloud Stream 會盡可能利用中介軟體的原生分割區功能。這篇部落格著重於 Spring Cloud Stream 開發人員在編寫將資料發布到 Kafka 的生產者應用程式時,如何處理分割區。在後續的文章中,我們將探討消費者如何在基於 Kafka 的 Spring Cloud Stream 應用程式中處理分割區。

分割區是 Apache Kafka 中擴展和並行化的基本單位。使用正確的分割區策略可讓您的應用程式以最小的延遲處理 TB 級別的資料。Kafka 生產者可以並行寫入不同的分割區,這通常意味著它可以實現更高的吞吐量。雖然分割區具有這些明顯的優點,但還需要仔細考慮其他各種因素。在分割區本身內,吞吐量可能會受到批量大小、使用的壓縮演算法、確認類型、複製因子等因素的進一步限制。此外,擁有更多分割區意味著更多的開啟檔案控制代碼(因為分割區對應到 Broker 上的目錄,並且分割區中的每個日誌區段都需要一個索引檔案和一個資料檔案)。網路上有很多關於如何為 Kafka 應用程式設計正確分割區數量的資源,在部署基於 Kafka 的企業級生產者應用程式之前,您可能需要熟悉這些資源。

Spring Cloud Stream Kafka 繫結器的配置器

Spring Cloud Stream Kafka 繫結器具有一個主題配置器,用於處理各種應用程式層級的主題需求。其中,建立和修改分割區的數量是配置器能夠做到的事情。配置器本身不做這些操作,而是從 Kafka 叢集呼叫正確的管理 API。

通常,在編寫 Spring Cloud Stream Kafka 應用程式時,會出現兩種與主題建立相關的情況。大多數企業鎖定對 Kafka 叢集的存取,只有管理員才能進行諸如建立主題、新增分割區等操作變更。在這種情況下,應用程式無法直接建立或修改主題。另一種情況是,企業在授予對 Kafka 叢集的存取權限方面非常寬鬆,應用程式可以自由建立或修改主題。讓我們進一步考慮其中一些事項。

情境 1:應用程式對 Kafka 叢集具有完整的管理員權限

在這種情況下,應用程式對 Kafka 叢集具有完整的管理員存取權限。您正在編寫一個 Spring Cloud Stream 生產者,將訊息發布到 Kafka 主題。為了方便我們的討論,假設此主題不存在,您的應用程式將建立它。您還想確保主題已配置一定數量的分割區。

有幾種方法可以告訴 Spring Cloud Stream 您希望為主題配置多少個分割區。每種方法都有優缺點。讓我們看看它們。

  • 使用繫結器範圍的屬性來指定分割區計數。使用此方法,您建立的任何主題都將具有相同的分割區計數。如果您的應用程式正在建立多個主題,並且它們都想要相同數量的分割區,那麼這是建立分割區的理想方式。這種方法的缺點是,除非覆寫,否則此設定無法針對每個繫結進行配置。您在繫結器層級使用的屬性如下所示。

spring.cloud.stream.kafka.binder.min-partition-count

  • 另一種選擇是在繫結層級指定分割區計數。使用這種方法,您可以在同一個應用程式中配置具有不同分割區計數的多個主題。以下是屬性

spring.cloud.stream.bindings.<binding-name>.producer.partition-count

由於之前的全域屬性強制執行最小值(可能會更大),因此兩者中較大的值將對特定繫結生效。

  • 如果未使用上述任何選項,則將根據 Broker 的 num.partitions 屬性(預設值:1)建立具有一定數量分割區的主題。

情境 2:Kafka 叢集已鎖定,且不允許應用程式執行任何管理操作。

在這種情況下,您作為應用程式開發人員的選擇非常有限。由於 Kafka 叢集已鎖定,應用程式將無法建立或變更現有主題。如果未事先建立主題,您的應用程式將在啟動期間擲回例外狀況並失敗。為了避免這種情況,您必須確保使用正確數量的分割區建立主題,並使用繫結器屬性停用自動主題配置(將 spring.cloud.stream.kafka.binder.auto-create-topics 設定為 false)。

情境 3:應用程式對 Kafka 叢集具有完整的管理員權限,且主題已存在,但您希望在下次啟動應用程式時增加分割區。

這是可能的。假設您的主題已配置 64 個分割區,現在由於更高的容量需求,您想將其加倍到 128 個。您可以使用情境 1 中討論的任何一個屬性來通知繫結器。(spring.cloud.stream.kafka.binder.min-partition-countspring.cloud.stream.bindings.<binding-name>.producer.partition-count)

在這種情況下,繫結器會偵測到主題已存在。如果主題的目前分割區大小小於請求的大小,則繫結器會檢查屬性 spring.cloud.stream.kafka.binder.autoAddPartitions。預設情況下,此屬性設定為 false。因此,如果應用程式需要增加分割區,則必須明確將其設定為 true。如果設定為 true,配置器將請求 Kafka 管理 API 以增加分割區的數量。如果未設定為 true 且新請求的分割區數量高於現有的分割區數量,則在生產者的情況下,繫結器會抱怨它無法容忍 Broker 上較低的分割區數量,並擲回配置例外狀況。如果發生這種情況,您必須手動增加分割區,或將 autoAddPartitions 屬性設定為 true

這裡尤其要注意的一件事是,繫結器不允許您透過 Spring Cloud Stream 減少 Kafka 主題分割區的數量。

請記住,增加或減少分割區(使用任何機制)可能會破壞分割區內的嚴格排序(如果這是一個考量因素),具體取決於您的分割區策略(請參閱下文)。

選擇分割區

現在我們了解了主題是如何分割的,我們需要討論如何為特定記錄選擇分割區。

有三種機制來選擇分割區

原生 Kafka 分割區選擇

若要使用原生分割區,請設定自訂的 Partitioner,可以使用綁定器層級的 spring.cloud.stream.kafka.binder.producer-properties.partitioner.class 屬性,或是綁定層級的 spring.cloud.stream.kafka.bindings.<binding>.producer.configuration.partitioner.class 屬性。

直接設定分割區標頭

當使用預設的 Kafka Partitioner 時,應用程式可以直接將 KafkaHeaders.PARTITION_ID 標頭設定為所需的分割區。

Spring Cloud Stream 分割區選擇

當使用 Spring Cloud Stream 分割區時,讓 Kafka 分割器使用其預設分割器,它只會使用綁定器在 producer 記錄中設定的分割區。在以下章節中,我們將會看到 Spring Cloud Stream 提供的此支援的詳細資訊。

Spring Cloud Stream 生產者如何決定要分配哪個分割區?

生產者如何使用 Spring Cloud Stream 將記錄分配到正確的分割區? Spring Cloud Stream 中有哪些可用於這樣做的控制項? 此部落格的其餘部分將側重於這些問題。

決定分割區金鑰

Spring Cloud Stream 提供了兩種機制,讓應用程式決定分割區金鑰。

1. 分割區金鑰表達式

一個簡單的方法是將分割區金鑰作為 SpEL 表達式屬性提供。以下是一個範例。

spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression: headers['partitionKey']

然後您的應用程式在發布訊息時,可以新增一個名為 partitonKey 的標頭。 Spring Cloud Stream 將在評估上述表達式以分配分割區金鑰時,使用此標頭的值。 以下是一個生產者程式碼範例

@Bean
public Supplier<Message<?>> generate() {
  return () -> {
     String value = “random payload”;
    	return MessageBuilder.withPayload(value)
           .setHeader("partitionKey", value.length() % 4)
           .build();
  };
}
2. 分割區金鑰提取器策略

Spring Cloud Stream 提供一個名為 PartitionKeyExtractorStrategy 的 API,它有一個單一方法需要實作 - Object extractKey(Message<?> message)

您可以實作此介面並將其設定為 bean。 然後提供一個屬性 spring.cloud.stream.bindings.<binding-name>.producer.parition-key-extractor-name

然後提供 bean 名稱。

如果您只有一個這樣的 bean,則可以忽略將其作為屬性提供。 Spring Cloud Stream 將簡單地選擇此 bean 作為分割區提取器策略。

使用分割區金鑰提取器策略設定金鑰是預設機制。 如果未提供提取器策略,Spring Cloud Stream 只會尋找分割區金鑰表達式。

請記住,我們在此討論的分割區金鑰可能與記錄最終將落在其上的分割區不同。 為此,我們需要使用使用此金鑰的分割區選擇器。

選擇實際的分割區

我們選擇了一個分割區金鑰,現在它如何選擇 Kafka topic 上的實際分割區?

好的,現在我們讓 Spring Cloud Stream 決定要使用的分割區金鑰。 但是,如何根據此金鑰實際選擇分割區? 與分割區金鑰選擇選項類似,Spring Cloud Stream 提供了兩種不同的機制,用於使用給定的金鑰選擇分割區。

1. 使用分割區選擇器策略

同樣地,這是一個具有單一方法的功能介面 - int selectPartition(Object key, int partitionCount)

您可以實作此方法並將其作為 bean 提供。 如果您只有一個這樣的 bean,則不需要任何其他屬性。 如果有多個,則可以使用屬性 spring.cloud.stream.bindings.<binding-name>.producer.parition-selector-name 針對每個綁定定義它

2. 分割區選擇器表達式

如果您不想實作分割區選擇器策略,您也可以提供針對金鑰進行評估的 SpEL 表達式。

如果未提供這些選項中的任何一個,則 Spring Cloud Stream 將使用預設的分割區選擇器策略,該策略基於取得金鑰的 hashCode,然後對 topic 上的總分割區計數執行模數運算。 除非您有複雜的需求,否則此預設策略在大多數情況下都有效。

為什麼綁定器提供兩種不同的抽象?

您可能想知道為什麼我們有這兩種不同的抽象。 首先是分割區金鑰,然後是分割區選擇器。 分割區金鑰可以是任何東西 - 例如,它可以是整數、字串(可能是具有任意長度的文字)或其他類型。 分割區選擇器將根據分割區金鑰表達式選擇一個金鑰。 選擇器還確保選擇的分割區在可用分割區數量內。 預設實作方式是將分割區金鑰的雜湊碼與分割區總數進行模數除法。 因此,當您有像這樣的分割用例時,您必須在 producer 上指定 partittionCount 屬性。 總之,PartitionKey 是 PartitionSelector 用於選擇實際分割區的一段資料。

讓我們舉一個具體的例子。 假設您正在編寫一個處理信用卡交易的應用程式。 此應用程式使用信用卡號碼作為分割區金鑰 - 一個具有 x 位數的長隨機數。 想像一下,根據信用卡的前 4 位數字,您想將該交易傳送到 topic 中的特定分割區。 你該怎麼做? 首先,您可以解析卡號以提取前 4 位數字來設定 partitionKeyExpression(或提供分割區金鑰提取器策略)。 然後,您需要提供一個分割區選擇器策略實作,其中根據金鑰和分割區數量,選擇金鑰。 如果您不提供此策略或針對金鑰的分割區金鑰選擇器表達式,則預設的分割區選擇器策略將為您選擇一個。 假設您的前 4 位數字是 1234,並且您在 topic 上有 10 個分割區。 假設雜湊計算為 1234 也是。 然後,這將落在分割區 1234 % 10 = 4 中。 如果您希望此交易因任何原因來到分割區 8,那麼您必須在分割區選擇器策略類別或表達式中明確實作它。

以下是這兩個不同層如何組合在一起的流程圖表示。

kafka producer partitions blog

分割區鍵和訊息鍵之間的混淆

有時候,要理解分割區鍵和實際的訊息鍵,在透過實際的 Kafka 主題在網路上傳輸,以便在 Kafka 紀錄上作為鍵使用,可能會感到困惑。這是透過不同的機制完成的。上述的分割區鍵和選擇器僅確保選擇了一個分割區鍵,並根據該分割區鍵,在 Kafka 主題上選擇一個實際的分割區。但是,當生產時,您如何將鍵與紀錄一起傳送?在這裡,您可以從兩個選項中選擇。一種方法是簡單地將標頭附加到傳出的訊息中。以下是一個範例。

@Bean Supplier<Message<String>> process() {
   return () -> MessageBuilder.withPayload("foo")
     .setHeader(KafkaHeaders.MESSAGE_KEY, "bar".getBytes()) .build(); }

您也可以在 Kafka binder 上使用訊息鍵 SpEL 表達式,如下所示。

spring.cloud.stream.kafka.binder.messageKeyExpression: headers['messageKey']

然後將此標頭附加到傳出的訊息。

需要記住的一些注意事項

如果您沒有提供分割區鍵表達式或分割區鍵提取器 Bean,那麼 Spring Cloud Stream 將完全不參與為您做出任何分割區決策。在這種情況下,如果主題有多個分割區,則會觸發 Kafka 的預設分割機制。預設情況下,Kafka 使用 DefaultPartitioner,如果訊息具有鍵(請參閱上文),則使用此鍵的雜湊來計算分割區。如果訊息沒有鍵,則將使用循環配置策略進行分配。從 Kafka client 2.4 開始,還有一些需要注意的額外複雜性。如果紀錄未攜帶分割區資訊(本部落格的主要討論),或者紀錄缺少鍵,那麼從 Kafka 2.4 開始,它將使用 sticky partitions 而不是循環配置策略。簡而言之,sticky partitions 用於透過將紀錄粘附到一個分割區或一組分割區來最小化延遲。有關 sticky partitions 的更多資訊,請參閱 KIP-480 https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner

結論

在本部落格中,我們討論了 Spring Cloud Stream 如何在編寫基於生產者的應用程式時,幫助處理 Kafka 分割區。我們看到了 Spring Cloud Stream 向應用程式開發人員提供控制權以配置分割區的各種細微差別的多種方式。我們看到了分割區鍵、分割區選擇器和訊息鍵之間的差異。我們討論了如何將訊息鍵新增到 Kafka 紀錄。最後,我們了解了 Spring Cloud Stream 生產者如何完全不參與分割業務,而讓 Kafka 直接處理它。

取得 Spring 電子報

隨時掌握 Spring 電子報

訂閱

取得領先

VMware 提供培訓和認證,以加速您的進度。

瞭解更多

取得支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位檔案。

瞭解更多

即將舉行的活動

查看 Spring 社群中所有即將舉行的活動。

查看全部