使用 Spring Cloud Stream 和 Apache Kafka Streams 進行串流處理。第 5 部分 - 應用程式自訂

工程 | Soby Chacko | 2019 年 12 月 06 日 | ...

第 1 部分 - 程式設計模型 第 2 部分 - 程式設計模型 (續) 第 3 部分 - 資料反序列化和序列化 第 4 部分 - 錯誤處理

在這篇部落格文章中,我們將繼續討論 Spring Cloud Stream 中對 Kafka Streams 的支援。我們將詳細說明如何自訂 Kafka Streams 應用程式。

自訂 StreamsBuilderFactoryBean

Kafka Streams binder 使用 StreamsBuilderFactoryBean,此元件由 Spring for Apache Kafka 專案提供,用以建置 StreamsBuilder 物件,它是 Kafka Streams 應用程式的基礎。此 factory bean 是一個 Spring 生命周期 bean。通常,此 factory bean 必須在啟動前進行自訂,原因有很多。如 先前關於錯誤處理的部落格文章中所述,如果您想要註冊生產異常處理常式,則需要自訂 StreamsBuilderFactoryBean。假設您有這個生產者異常處理常式

class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
    
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE;
        } else {
            return ProductionExceptionHandlerResponse.FAIL;
        }
    }
}

如果您選擇 (使用 default.production.exception.handler),可以直接使用組態進行註冊。

但是,當使用 binder 時,更優雅的方法是將其註冊為 StreamsBuilderFactoryBean 自訂器的一部分,如下所示

@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
    return fb -> {
        fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                            IgnoreRecordTooLargeHandler.class);
    };
}

請注意,如果應用程式中有多个處理器,您可以根據應用程式 ID 控制哪個處理器獲得自訂。例如,您可以這樣檢查它

return factoryBean -> {
        if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
                .equals("processor1-application-id")) {

這是設定狀態監聽器的另一個範例

@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
    return sfb -> sfb.setStateListener((newState, oldState) -> {
         //Do some action here!
    });
}

自訂 KafkaStreams 物件。

KafkaStreams 物件是任何 Kafka Streams 應用程式的核心。StreamsBuilderFactoryBean 負責建立拓撲,然後建立 KafkaStreams 物件。在啟動 KafkaStreams 物件之前,StreamsBuilderFactoryBean 提供了自訂此 KafkaStreams 物件的機會。例如,如果您想為未捕獲的異常設定一個應用程式範圍的處理常式,您可以執行以下操作

@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
    return factoryBean -> {
        factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
            @Override
            public void customize(KafkaStreams kafkaStreams) {
                kafkaStreams.setUncaughtExceptionHandler((t, e) -> {

                });
            }
        });
    };
}

請注意,我們從 StreamsBuilderFactoryBean 的自訂器開始。但是,在其中,我們使用一個單獨的 KafkaStreamsCustomizer

總結

在這篇部落格文章中,我們看到了 Spring Cloud Stream 中的 Kafka Streams binder 如何讓您自訂底層的 StreamsBuilderFactoryBeanKafkaStreams 物件。

感謝您閱讀到這裡!接下來,在本系列的最後一篇部落格文章中,我們將研究 binder 如何讓您處理狀態儲存並啟用對它們的互動式查詢。

獲取 Spring 新聞

隨時關注 Spring 新聞

訂閱

領先一步

VMware 提供培訓和認證,以加速您的進度。

瞭解更多

獲取支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位檔。

瞭解更多

即將舉行的活動

查看 Spring 社群中所有即將舉行的活動。

查看所有