使用 Spring Cloud Stream 和 Apache Kafka Streams 進行流處理。 第 6 部分 - 狀態儲存區和互動式查詢

工程 | Soby Chacko | 2019 年 12 月 09 日 | ...

第 1 部分 - 程式設計模型 第 2 部分 - 程式設計模型(續) 第 3 部分 - 資料反序列化和序列化 第 4 部分 - 錯誤處理 第 5 部分 - 應用程式自訂

在本部分(本系列的第六部分也是最後一部分),我們將探討 Spring Cloud Stream Binder for Kafka Streams 如何支援 Kafka Streams 中的狀態儲存區和互動式查詢。

具名狀態儲存區

當您需要在應用程式中維護狀態時,Kafka Streams 可讓您將該狀態資訊具體化到具名狀態儲存區中。 Kafka Streams 中有多個操作需要追蹤狀態,例如 countaggregatereduce、各種 windowing 操作等等。 在大多數情況下,Kafka Streams 使用一個名為 RocksDB 的特殊資料庫來維護此狀態儲存區(除非您明確變更儲存類型)。 預設情況下,出於容錯原因,狀態儲存區中的相同資訊也會備份到變更日誌主題以及 Kafka 中。

當您像這樣將狀態明確地具體化到具名狀態儲存區中時,應用程式就能夠在稍後的階段查詢該狀態儲存區。 這是一個非常強大的功能,因為它讓您能夠從 Kafka Streams 應用程式中查詢類似資料庫的結構。

將資料作為 KTable 或 GlobalKTable 取用

基於 Kafka Streams binder 的應用程式可以綁定到作為 KTableGlobalKTable 的目的地。 GlobalKTable 是一種特殊的表類型,您可以在其中從輸入主題的所有分割區取得資料,而與執行個體無關。 相比之下,KTable 僅提供執行個體所使用的主題各自分割區中的資料。

以下是我們在本系列部落格文章中先前看到的功能簽章

@Bean
public Function<KStream<Long, Order>,
     Function<KTable<Long, Customer>,
           Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> process() {

如您所見,此函數具有三個輸入綁定,一個 KStream、一個 KTable 和另一個 GlobalKTable。 Kafka Streams 可讓您將像這樣取用的表格具體化到具名狀態儲存區中,前提是這些表格是基於主鍵的。 您可以使用繫結層級屬性將它們具體化到具名狀態儲存區中,以及同時進行取用。 以下範例說明如何執行此操作

spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store-1
spring.cloud.stream.kafka.streams.bindings.process-in-2.consumer.materializedAs: incoming-store-2

Kafka Streams DSL 操作具體化到狀態儲存區中

Kafka Streams 高階 DSL 中有多種方法傳回表格類型,例如 countaggregatereduce。 還有其他操作使用狀態儲存區來追蹤資訊。 例如,KStream 中的各種聯結方法呼叫雖然傳回 KStream 類型,但在內部使用狀態儲存區來保留聯結的資料。 總之,當 Kafka Streams 讓您可以將資料具體化為表格或串流時,它會被具體化到狀態儲存區中,就像儲存在資料庫表格中的資料一樣。

要在低階處理器中使用的明確狀態儲存區

當使用 Kafka Streams 的處理器 API 時,它可以讓您更靈活地處理串流,您必須事先宣告狀態儲存區,並將其提供給 StreamsBuilder。 Kafka Streams binder 可以掃描應用程式以偵測 StoreBuilder 類型的 bean,然後使用它來建立狀態儲存區,並透過 StreamsBuilderFactoryBean 將它們傳遞給底層的 StreamsBuilder。 以下是一些這類的 bean

@Bean
public StoreBuilder myStore() {
  return Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
        Serdes.Long());
}

@Bean
public StoreBuilder otherStore() {
  return Stores.windowStoreBuilder(
        Stores.persistentWindowStore("other-store",
              Duration.ofSeconds(3), Duration.ofSeconds(3),  false), Serdes.Long(),
        Serdes.Long());
}

這兩個 StoreBuilder bean 由 binder 偵測到,然後它會自動將它們附加到串流建置器。 之後,您可以在基於處理器 API 的應用程式中存取它們,如下所示

…
KeyValueStore<Long, Long> state1;
WindowStore<Long, Long> state2;
...
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
  return input ->
        input.process((ProcessorSupplier<Object, String>) () -> new Processor<Object, String>() {
           @Override
            public void init(ProcessorContext context) {
              state1 = (KeyValueStore<Long, Long>) context.getStateStore("my-store");
              state2 = (WindowStore<Long, Long>) context.getStateStore("other-store");
           }

           @Override
           public void process(Object key, String value) {
              // processing code
           }

           @Override
           public void close() {
              if (state1 != null) {
                 state1.close();
              }
              if (state2 != null) {
                 state2.close();
              }
           }
        }, "my-store", "other-store");
}

關於在基於 Kafka Streams binder 的應用程式中使用處理器 API 的一個快速注意事項。 當您使用 binder 時,使用低階處理器 API 的唯一方法是使用較高階 DSL 的使用模式,然後將其與 transform 或 process 呼叫結合,如上例所示。 如需有關如何在基於 binder 的應用程式中使用處理器 API 的更多詳細資訊,請參閱此處

如果您願意,您也可以使用我們在先前的部落格中看到的 StreamsBuilderFactoryBean 自訂程式,以程式設計方式新增狀態儲存區,而不是在應用程式中建立 StoreBuilder bean。

使用互動式查詢來查詢狀態儲存區中的資料

Kafka Streams 可讓您在進行即時串流處理時以互動方式查詢狀態儲存區中的資料。 binder 圍繞此功能提供抽象,以使其更易於使用互動式查詢。 InteractiveQueryService 是 binder 提供用來處理狀態儲存區查詢的基本 API。 您通常可以將其作為 bean 注入到您的應用程式中,然後從中調用各種 API 方法。 這是一個範例

@Autowired
private InteractiveQueryService interactiveQueryService;
 …
 ...
ReadOnlyKeyValueStore<Object, Object> keyValueStore =
                                                interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());

然後,您可以從儲存區調用各種檢索方法,並迭代遍歷結果。 您可以根據您的使用案例和您使用的狀態儲存區類型,從這些狀態儲存區調用各種方法。 有關可用的各種迭代方法,請參閱 Kafka Streams 文件中的互動式查詢

透過 RPC 機制進行互動式查詢

通常,您希望透過 RPC 機制從狀態儲存區公開系統的狀態。 您可以使用 Spring Web 支援以這種方式編寫功能強大的基於 REST 的應用程式。 這是一個藍圖

@RestController
public class Controller {

		@RequestMapping("/song/id")
		public SongBean song(@RequestParam(value="id") Long id) {
			final ReadOnlyKeyValueStore<Long, Song> songStore =
					interactiveQueryService.getQueryableStore(“song-store”, QueryableStoreTypes.<Long, Song>keyValueStore());

			final Song song = songStore.get(id);
			if (song == null) {
				throw new IllegalArgumentException("...");
}

可以從前端 Web 應用程式(例如)存取此 REST 控制器。

這種使用模式顯然會引起一些疑慮。如果有多個 Kafka Streams 應用程式實例正在執行會發生什麼情況?例如,如果有 3 個實例,每個實例都從單個來源分割區提取資料呢?哪個控制器實例將負責提供鍵 X 的資訊?如果鍵 X 只託管在分割區 3 中,而剛好是實例 3,但請求卻落在了實例 1 上呢?這顯然是一個問題,但 Kafka Streams 提供了一個解決方案。

從正確的實例檢索鍵

當您有多個實例正在執行,並且想要使用互動式查詢時,您必須在 binder 層級設定此屬性

spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>

然後,在控制器方法中,您必須編寫類似於以下的邏輯

@RequestMapping("/charts/top-five")
@SuppressWarnings("unchecked")
public List<SongPlayCountBean> topFive(@RequestParam(value="genre") String genre) {
{

org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
                                                key, keySerializer);

if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {

    //query from the store that is locally available
}
else {
    //query from the remote host
RestTemplate restTemplate = new RestTemplate();
	return restTemplate.postForObject(
						String.format("http://%s:%d/%s", hostInfo.host(),
								hostInfo.port(), "charts/top-five?genre=Punk"), …);

}

總結

在這篇部落格中,我們看到了 Kafka Streams 讓您將狀態資訊具體化到狀態儲存中的各種方式。binder 讓您可以將資料作為 KTableGlobalKTable 使用,同時允許您將其具體化到具名的狀態儲存中。Kafka Streams 有幾個操作可以將狀態儲存具體化為具名的儲存。我們看到,當在 Kafka Streams 中使用處理器 API 時,應用程式需要建立狀態儲存建構器 bean,binder 會偵測到這些 bean,然後將它們傳遞給 Kafka Streams。最後,我們看到了如何使用互動式查詢來查詢這些狀態儲存。我們還看到了涉及應用程式的多個實例以及針對它們的互動式查詢的細微差別。

本系列的結論以及接下來的方向...

感謝您閱讀本部落格系列!

在這個由六部分組成的系列中,我們看到了 Spring Cloud Stream 中 Kafka Streams binder 的許多功能,例如它的程式設計模型資料序列化錯誤處理自訂以及互動式查詢狀態儲存。作為本系列的一部分,我們沒有涵蓋更多功能,因為我們想專注於介紹在 3.0.0 版本中新增或增強的這個 binder 的主要功能的總體主題。對於那些額外功能或與 Spring Cloud Stream 背後的工程團隊互動,請查看下面資源部分中提供的各種連結。

資源

Kafka Streams Binder 文件

Spring Cloud Stream

核心 Spring Cloud Stream GitHub Spring Cloud Stream Kafka Binder GitHub Spring Cloud Stream 範例

Stack Overflow Gitter

取得 Spring 電子報

隨時關注 Spring 電子報

訂閱

搶先一步

VMware 提供培訓和認證,以加速您的進度。

了解更多

取得支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位檔案。

了解更多

即將到來的活動

查看 Spring 社群中所有即將到來的活動。

查看所有