領先一步
VMware 提供培訓和認證,以加速您的進度。
了解更多歡迎來到另一篇系列文章,展示了 Spring Cloud Stream (SCSt) 的新功能。在之前的文章 (可在此處取得 這裡, 這裡 和 這裡) 中,我們試圖證明我們在 Spring Cloud Stream (SCSt) 中轉向函數式程式設計模型的合理性。它減少了程式碼和配置,並且您的程式碼與 SCSt 的內部完全分離。
今天,我們將討論使用函數進行路由。在 SCSt 的上下文中,路由是指 a) 將事件路由到特定的事件訂閱者 或 b) 將事件訂閱者產生的事件路由到特定的目的地 的能力。為了更好地理解上下文,讓我們快速了解一下基於註解的程式設計模型是如何運作的。在這篇文章中,我們將其稱為路由「到 (TO)」和路由「從 (FROM)」。
對於路由TO事件訂閱者,我們使用了StreamListener
註解的condition
屬性,如下所示
@StreamListener(target = Sink.INPUT, condition = "headers['type']=='order'")
public void receiveOrders(Order order) {...}
這裡有更多關於此方法的詳細資訊。
並且,對於路由FROM事件訂閱者,我們使用了動態綁定目的地 - 這種方法允許框架基於單個事件中提供的某些指令綁定到目的地。
使用函數式方法,我們可以通過一些額外的功能,以更乾淨和簡潔的方式完成上述所有操作。
藉由仰賴 Spring Cloud Function (SCF) 中提供的路由函數功能,即可實現路由「TO」函數。您可以通過設定 spring.cloud.stream.function.routing.enabled
屬性顯式啟用路由,或者通過設定 spring.cloud.function.routing-expression
屬性並使用 Spring Expression Language (SpEL) 提供路由指令來隱式啟用。路由指令應導致路由到的函數的定義「TO」。為了綁定的目的,路由目的地的名稱是 functionRouter-in-0
(請參閱 RoutingFunction.FUNCTION_NAME
和 此處描述的綁定命名約定)。
當消息被發送到這個目的地時,路由函數會嘗試確定哪個實際函數需要處理這樣的事件。它首先嘗試訪問 spring.cloud.function.routing-expression
消息頭,如果提供了,則確定要調用的實際函數的名稱。這是最動態的方法。第二個最動態的方法是提供一個 spring.cloud.function.definition
頭,它應該包含路由到的函數的定義「TO」。這兩種方法都需要通過設定 spring.cloud.stream.function.routing.enabled
屬性來顯式啟用路由函數。
至於以前版本中沒有的額外功能,spring.cloud.function.routing-expression
也可以用作應用程式屬性。例如,考慮這樣一種情況,即無論傳入的事件如何,表達式都是相同的,就像本文前面顯示的基於註解的示例一樣 (例如,spring.cloud.function.routing-expression=headers['type']=='order'
)。對於這種方法,您無需顯式啟用路由函數,因為 spring.cloud.function.routing-expression
作為應用程式屬性具有相同的效果。
儘管微不足道,但以下是上述方法之一的完整示例
@SpringBootApplication
public class RoutingStreamApplication {
public static void main(String[] args) {
SpringApplication.run(RoutingStreamApplication.class,
"--spring.cloud.function.routing-expression="
+ "T(java.lang.System).nanoTime() % 2 == 0 ? 'even' : 'odd'");
}
@Bean
public Consumer<Integer> even() {
return value -> System.out.println("EVEN: " + value);
}
@Bean
public Consumer<Integer> odd() {
return value -> System.out.println("ODD: " + value);
}
}
通過將消息發送到由綁定器 (即 rabbit 或 kafka) 公開的 functionRouter-in-0
目的地,基於消息處理時的 nanoTime()
值,這樣的消息將被路由到適當的 ('even' 或 'odd') Consumer
bean。
與以前一樣,路由「FROM」依賴於 SCSt 的動態綁定目的地功能。但是,與路由「TO」一樣,還有許多額外功能。
以下示例顯示了基本原理
@Autowired
private BinderAwareChannelResolver resolver;
public Consumer<String> send(Message message) {
MessageChannel destination = resolver
.resolveDestination(message.getHeaders().get("type"))
Message outgoingMessage = . . . // your code
destination.send(outgoingMessage);
}
您所需要的只是對 BinderAwareChannelResolver
的引用 (在前面的示例中自動裝配)。然後,您可以使用一些邏輯來確定目的地名稱 (在我們的例子中,我們使用 'type' 標頭的值)。一旦確定了目的地名稱,您就可以通過使用 BinderAwareChannelResolver.resolveDestination(..)
操作並向其發送消息來獲取對它的引用。這就是全部。
上述方法的缺點是,一些特定於框架的抽象洩漏到您的程式碼中。看看您需要了解 BinderAwareChannelResolver
和 MessageChannel
的事實,以及其他一些事情。事實上,前面示例中的大多數程式碼都是樣板。
更動態且洩漏更少的方法是依賴 spring.cloud.stream.sendto.destination
屬性,它有效地完成了上述所有操作 - 但在幕後。以下示例顯示了如何使用此方法
@SpringBootApplication
public class RoutingStreamApplication {
@Bean
public Function<Message<String>, Message<String>> process() {
return message -> {
// some logic to process incoming message
Message<String> outgoingMessage = MessageBuilder
.withPayload("Hello")
.setHeader("spring.cloud.stream.sendto.destination", "even")
.build();
return outgoingMessage;
};
}
}
我們不再需要注入 BinderAwareChannelResolver
、執行 MessageChannel
的解析等等。我們只需創建一個新的 Message
,其中指定了一個標頭,該標頭由框架用於動態解析目的地。
最後但同樣重要的是,讓我們看看另一個流行的路由「FROM」用例,其中資料來源起源於 SCSt 上下文之外,但需要路由到適當的目的地
@Controller
public class SourceWithDynamicDestination {
@Autowired
private ObjectMapper jsonMapper;
private final EmitterProcessor<?> processor = EmitterProcessor.create();
@RequestMapping(path = "/", method = POST, consumes = "*/*")
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest(@RequestBody String body,
@RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType)
throws Exception {
Map<String, String> payload = jsonMapper.readValue(body, Map.class);
String destination = payload.get("id");
Message<?> message =
MessageBuilder.withPayload(payload)
.setHeader("spring.cloud.stream.sendto.destination", destination)
.build();
processor.onNext(message);
}
@Bean
public Supplier<Flux<?>> source() {
return () -> processor;
}
}
然後,我們可以通過運行以下 curl
命令來查看結果
curl -H "Content-Type: application/json" -X POST -d '{"id":"customerId-1","bill-pay":"100"}' https://127.0.0.1:8080
在這裡,由於 Supplier<Flux<?>>
bean,我們同時使用了函數式方法和反應式範例的精華。我們有一個簡單的 MVC 控制器,我們希望根據內容的 'id' 屬性的值將請求向下游路由。雖然 EmitterProcessor
及其在此處的使用的細節是另一篇文章的主題,但重要的是,它演示了一個功能齊全的應用程式,其中 HTTP 請求被動態路由到由目標綁定器管理的目的地。
注意:在撰寫本文時,參考文檔正在積極更新以支援即將推出的 SCSt 3.0.0.RELEASE,但您始終可以使用 參考文檔 的來源以獲取最新資訊。
在 GitHub 上查看 Spring Cloud Stream。
此外,該系列之前的部落格