領先一步
VMware 提供培訓與認證,加速您的進展。
瞭解更多繼先前的部落格介紹了使用 Spring Cloud Stream 和 Kafka Streams 撰寫串流應用程式的基本函數式程式設計模型之後,在本部分中,我們將進一步探索該程式設計模型。
讓我們看看幾個情境。
如果您的應用程式從單一輸入綁定消耗資料並將資料產生到輸出綁定中,您可以使用 Java 的 Function 介面來執行此操作。請記住,此處的綁定不一定對應到單一輸入 Kafka 主題,因為主題可以多路複用並附加到單一輸入綁定(在單一綁定上配置以逗號分隔的多個主題 - 請參閱下面的範例)。在輸出端,綁定在此處對應到單一主題。
這是一個處理器範例
請注意,實際的業務邏輯實現在此處理器中以 Lambda 運算式給出。
@Bean
public Function<KStream<Object, String>, KStream<String, WordCount>> wordcount() {
return input -> input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("wordcount-store"))
.toStream()
.map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))));
}
看看處理器的返回簽名。它是一個Function<KStream<Object, String>, KStream<String, WordCount>>
。處理器消耗一個 KStream
並產生另一個 KStream
。在底層,綁定器使用傳入的 Kafka 主題來消耗資料,然後將其提供給這個輸入 KStream
。同樣地,在輸出端,綁定器產生資料作為 KStream
,這將被發送到一個外出的 Kafka 主題。
以下是如何為此處理器提供輸入主題
spring.cloud.stream.bindings.wordcount-in-0.destination=words
在多路複用主題的情況下,您可以使用這個
spring.cloud.stream.bindings.wordcount-in-0.destination=words1,words2,word3
輸出主題可以配置如下
spring.cloud.stream.bindings.wordcount-out-0.destination=counts
Kafka Streams 讓您可以使用一個稱為分支的功能,將資料傳送到多個輸出主題。基本上,它使用一個謂詞作為分支到多個主題的依據。這與上面的範例大致相同,但主要區別在於輸出端以 KStream[] 的形式提供。
這是一個處理器範例
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> wordcount() {
Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");
return input -> input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("WordCounts-branch"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))))
.branch(isEnglish, isFrench, isSpanish);
}
}
請注意函數的第二個參數類型。它以 KStream[] 的形式提供。
您可以為這些綁定提供個別的輸出主題
spring.cloud.stream.bindings.wordcount-out-0.destination=output1
spring.cloud.stream.bindings.wordcount-out-1.destination=output2
spring.cloud.stream.bindings.wordcount-out-2.destination=output3
當您有兩個輸入綁定和一個輸出綁定時,您可以將您的處理器表示為類型為 java.util.function.BiFunction
的 bean。這是一個範例
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
return (userClicksStream, userRegionsTable) -> (userClicksStream
.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
"UNKNOWN" : region, clicks),
Joined.with(Serdes.String(), Serdes.Long(), null))
.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
regionWithClicks.getClicks()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum)
.toStream());
}
BiFunction
有兩個輸入和一個輸出。第一個輸入是 KStream
,第二個輸入是 KTable
,而輸出是另一個 KStream
。如果您想在輸出端有多個 KStream
,您可以將類型簽名更改為 KStream[]
,然後進行必要的實作變更。
情境 4:兩個輸入綁定且沒有輸出綁定
如果您只有兩個輸入綁定但沒有輸出,您可以使用 Java 的 BiConsumer
支援。可能的用例是不想產生輸出,而是更新一些狀態儲存。這是一個範例
@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
return (userClicksStream, userRegionsTable) -> {
userClicksStream.foreach((key, value) -> latch.countDown());
userRegionsTable.toStream().foreach((key, value) -> latch.countDown());
};
}
如果您有三個或四個或 n 個輸入綁定怎麼辦?在這種情況下,您不能依賴 Function 或 BiFunction 方法。您需要依賴部分應用的函數。基本上,您從一個 Function 開始,但是,在這個第一個函數的輸出端,您提供另一個 Function 或 Consumer,直到您用盡您的輸入。這種以這種方式部分應用函數的技術通常在函數式程式設計術語中稱為函數柯里化。這是一個使用三個輸入和一個輸出的範例
@Bean
public Function<KStream<Long, Order>,
Function<GlobalKTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> process() {
return orderStream -> (
customers -> (
products -> (
orderStream.join(customers,
(orderId, order) -> order.getCustomerId(),
(order, customer) -> new CustomerOrder(customer, order))
.join(products,
(orderId, customerOrder) -> customerOrder
.productId(),
(customerOrder, product) -> {
EnrichedOrder enrichedOrder = new EnrichedOrder();
enrichedOrder.setProduct(product);
enrichedOrder.setCustomer(customerOrder.customer);
enrichedOrder.setOrder(customerOrder.order);
return enrichedOrder;
})
)
)
);
}
仔細檢查處理器的類型簽名。
Function<KStream<Long, Order>,
Function<GlobalKTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>>
我們從一個函數開始,該函數接受一個 KStream
作為輸入,但是第二個參數(這個函數的輸出)是另一個 Function
,該函數接受一個 GlobalKTable
作為輸入。這個第二個 Function 的輸出是另一個函數,該函數有一個另一個 GlobalKTable
的輸入。這個第三個函數正在用盡我們的輸入,這個函數有一個 KStream
作為其輸出,這將用於輸出綁定。
讓我們從數學的角度來看這個模型。
讓我們將這三個函數稱為 f(x)
、f(y)
和 f(z)
。
如果我們展開這些函數,它會看起來像這樣
f(x) -> f(y) -> f(z) -> KStream<Long, EnrichedOrder>
.
變數 x 代表 KStream<Long, Order>
,變數 y 代表 GlobalKTable<Long, Customer>
,變數 z 代表 GlobalKTable<Long, Product>
。
第一個函數 f(x)
具有應用程式的第一個輸入綁定 (KStream<Long, Order>
),其輸出是函數 f(y)
。
函數 f(y)
具有應用程式的第二個輸入綁定 (GlobalKTable<Long, Customer>
),其輸出是另一個函數 f(z)
。
函數 f(z)
的輸入是應用程式的第三個輸入 (GlobalKTable<Long, Product>
),其輸出是 KStream<Long, EnrichedOrder>
,這是應用程式的最終輸出綁定。
來自三個部分函數的輸入 (分別為 KStream
、GlobalKTable
、GlobalKTable
) 可在方法主體中使用,以作為 Lambda 運算式的一部分來實作業務邏輯。
請記住,如上所述,在 Java 中使用函數柯里化處理超過合理數量的輸入(例如上面範例中的三個)可能會導致程式碼可讀性問題。因此,您必須仔細評估和分解您的應用程式,以了解在單一處理器中擁有大量輸入綁定的適當性。
在這篇部落格文章中,我們快速瀏覽了您可以在基於 Spring Cloud Stream 的 Kafka Streams 應用程式中使用的各種函數式程式設計模型。我們看到了可以使用 java.util.function.Function
(或如我們在先前的部落格中看到的 Consumer
)、java.util.function.BiFunction
和 BiConsumer
的方式。我們也看到了如何使用 Kafka Stream 的分支功能在輸出端支援多個綁定,該功能提供了一個 KStream
陣列作為輸出。最後,我們看到了如何透過部分應用的 (柯里化的) 函數來支援超過兩個輸入綁定。在下一篇部落格文章中,我們將看到 Kafka Streams 綁定器如何執行資料反序列化和序列化。