使用 Spring Cloud Stream 和 Apache Kafka Streams 進行串流處理。第 4 部分 - 錯誤處理

工程 | Soby Chacko | 2019年12月05日 | ...

第 1 部分 - 程式設計模型 第 2 部分 - 程式設計模型(續) 第 3 部分 - 資料反序列化和序列化

繼續探討 Spring Cloud Stream binder for Kafka Streams 系列,在本篇部落格文章中,我們將探討 Kafka Streams binder 中可用的各種錯誤處理策略。

Kafka Streams 中的錯誤處理主要集中在 inbound(輸入)反序列化和 outbound(輸出)生產期間發生的錯誤。

處理反序列化例外

Kafka Streams 允許您註冊反序列化例外處理常式。預設行為是,當發生反序列化例外時,它會記錄該錯誤並使應用程式失敗 (LogAndFailExceptionHandler)。 它也允許您記錄並跳過記錄,並繼續執行應用程式 (LogAndContinueExceptionHandler)。 通常,您會將相應的類別作為組態的一部分提供。 透過使用 binder,您可以設定這些例外處理常式,可以在 binder 層級設定,這將適用於整個應用程式,也可以在綁定層級設定,這可讓您更精細地控制。

以下是如何在 binder 層級設定反序列化例外處理常式

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler=logAndContinue

如果您只有一個具有單一輸入的處理器,那麼在 binder 上設定反序列化例外處理常式是一種簡單的方法,如上所示。如果您有多個處理器或輸入,並且想要分別控制它們的錯誤處理,則需要在每個輸入綁定上進行設定。以下是一個範例:

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler=logAndContinue

請注意,處理常式實際上是在輸入綁定 process-in-0 上設定的。如果您有更多此類輸入綁定,則必須明確設定。

Kafka Streams 和 DLQ (Dead Letter Queue)

除了 Kafka Streams 提供的兩個例外處理常式之外,binder 還提供了第三個選項:一個自訂處理常式,可讓您將反序列化錯誤中的記錄傳送到特殊的 DLQ。為了啟用此功能,您必須選擇在 binder 或綁定層級啟用此功能,如上所述。

以下是如何操作:

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler=sendToDlq

請記住,在 binder 上使用此設定時,這會在全域層級啟動 DLQ,並且這將應用於所有輸入主題及其綁定。如果這不是您想要發生的情況,您必須在每個輸入綁定上啟用它。

預設情況下,DLQ 名稱命名為 error.<input-topic-name>.<application-id for kafka streams>

您可以用實際的主題名稱替換 <input-topic-name>。請注意,這不是綁定名稱,而是實際的主題名稱。

如果輸入主題是 topic-1 且 Kafka Streams 應用程式 ID 是 my-application,則預設 DLQ 名稱將為 error.topic-1.my-application

變更 binder 產生的預設 DLQ 名稱

您可以重置預設 DLQ 名稱,如下所示

spring.cloud.stream.bindings.process-in-0.consumer.dlqName=input-1-dlq(將 process-in-0 替換為實際的綁定名稱)

如果它在 broker 上具有所需的權限,則 binder provisioner 將建立所有必要的 DLQ 主題。如果不是這種情況,則必須在應用程式啟動之前手動建立這些主題。

DLQ 主題和分割區

預設情況下,binder 假設 DLQ 主題的佈建與輸入主題的分割區數量相同。如果不是這樣(即,如果 DLQ 主題的佈建具有不同數量的分割區),則必須使用 DlqPartitionFunction 實作來告知 binder 將記錄傳送到哪個分割區,如下所示:

@Bean
public DlqPartitionFunction partitionFunction() {
    return (group, record, ex) -> 0;
}

應用程式中只能存在一個這樣的 bean。因此,如果有多個具有獨立 DLQ 主題的處理器或輸入,則必須使用群組(在使用 binder 時與應用程式 ID 相同)來過濾記錄。

處理生產者錯誤

到目前為止,我們討論的所有例外處理常式都僅處理與資料反序列化相關的錯誤。 Kafka Streams 還提供處理 outbound(輸出) 生產者錯誤的功能。 截至 3.0 版本,binder 並未提供一流的機制來支援此功能。 然而,這並不意味著您不能使用生產者例外處理常式。 您可以使用 binder 依賴的來自 Spring for Apache Kafka project 的各種自訂工具來執行此操作。 這些自訂工具將成為本系列下一篇部落格文章的主題。

Kafka Streams Binder 健康指標和指標

Kafka Streams binder 允許監控底層串流執行緒的健康狀況,並透過 Spring Boot actuator 端點公開健康指標度量。 您可以在此處找到更多詳細資訊。 除了健康指標之外,binder 還透過 Micrometer meter-registry 公開 Kafka Streams 度量。 透過 KafkaStreams 物件可用的所有基本度量都可以在此登錄中找到。 此處是您可以找到更多相關資訊的地方。

摘要

在本篇部落格文章中,我們看到了 Kafka Streams 使用的各種策略來啟用處理反序列化例外。 除了這些策略之外,Kafka Streams binder 還提供了一個處理常式,可讓您將容易出錯的 payload 傳送到 DLQ 主題。 我們看到 binder 提供了對使用這些 DLQ 主題的精細控制。

感謝您閱讀到這裡! 在下一篇部落格文章中,我們將了解 binder 如何實現進一步的自訂。

取得 Spring 電子報

隨時掌握 Spring 最新資訊

訂閱

領先一步

VMware 提供培訓和認證,加速您的進展。

了解更多

取得支援

Tanzu Spring 在單一訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位檔案。

了解更多

即將舉行的活動

查看 Spring 社群中所有即將舉行的活動。

查看全部