領先一步
VMware 提供培訓和認證,以加速您的進度。
瞭解更多第 1 部分 - 程式設計模型 第 2 部分 - 程式設計模型 (續) 第 3 部分 - 資料反序列化和序列化 第 4 部分 - 錯誤處理
在這篇部落格文章中,我們將繼續討論 Spring Cloud Stream 中對 Kafka Streams 的支援。我們將詳細說明如何自訂 Kafka Streams 應用程式。
Kafka Streams binder 使用 StreamsBuilderFactoryBean,此元件由 Spring for Apache Kafka 專案提供,用以建置 StreamsBuilder 物件,它是 Kafka Streams 應用程式的基礎。此 factory bean 是一個 Spring 生命周期 bean。通常,此 factory bean 必須在啟動前進行自訂,原因有很多。如 先前關於錯誤處理的部落格文章中所述,如果您想要註冊生產異常處理常式,則需要自訂 StreamsBuilderFactoryBean
。假設您有這個生產者異常處理常式
class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
if (exception instanceof RecordTooLargeException) {
return ProductionExceptionHandlerResponse.CONTINUE;
} else {
return ProductionExceptionHandlerResponse.FAIL;
}
}
}
如果您選擇 (使用 default.production.exception.handler
),可以直接使用組態進行註冊。
但是,當使用 binder 時,更優雅的方法是將其註冊為 StreamsBuilderFactoryBean
自訂器的一部分,如下所示
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
return fb -> {
fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
IgnoreRecordTooLargeHandler.class);
};
}
請注意,如果應用程式中有多个處理器,您可以根據應用程式 ID 控制哪個處理器獲得自訂。例如,您可以這樣檢查它
return factoryBean -> {
if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
.equals("processor1-application-id")) {
這是設定狀態監聽器的另一個範例
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return sfb -> sfb.setStateListener((newState, oldState) -> {
//Do some action here!
});
}
KafkaStreams 物件是任何 Kafka Streams 應用程式的核心。StreamsBuilderFactoryBean
負責建立拓撲,然後建立 KafkaStreams
物件。在啟動 KafkaStreams
物件之前,StreamsBuilderFactoryBean 提供了自訂此 KafkaStreams
物件的機會。例如,如果您想為未捕獲的異常設定一個應用程式範圍的處理常式,您可以執行以下操作
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return factoryBean -> {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
};
}
請注意,我們從 StreamsBuilderFactoryBean 的自訂器開始。但是,在其中,我們使用一個單獨的 KafkaStreamsCustomizer
。
在這篇部落格文章中,我們看到了 Spring Cloud Stream 中的 Kafka Streams binder 如何讓您自訂底層的 StreamsBuilderFactoryBean
和 KafkaStreams
物件。
感謝您閱讀到這裡!接下來,在本系列的最後一篇部落格文章中,我們將研究 binder 如何讓您處理狀態儲存並啟用對它們的互動式查詢。