Spring Cloud Stream - 事件路由

工程 | Oleg Zhurakousky | 2019年10月31日 | ...

歡迎來到另一篇系列文章,展示了 Spring Cloud Stream (SCSt) 的新功能。在之前的文章 (可在此處取得 這裡, 這裡這裡) 中,我們試圖證明我們在 Spring Cloud Stream (SCSt) 中轉向函數式程式設計模型的合理性。它減少了程式碼和配置,並且您的程式碼與 SCSt 的內部完全分離。

今天,我們將討論使用函數進行路由。在 SCSt 的上下文中,路由是指 a) 將事件路由到特定的事件訂閱者b) 將事件訂閱者產生的事件路由到特定的目的地 的能力。為了更好地理解上下文,讓我們快速了解一下基於註解的程式設計模型是如何運作的。在這篇文章中,我們將其稱為路由「到 (TO)」和路由「從 (FROM)」。

對於路由TO事件訂閱者,我們使用了StreamListener註解的condition屬性,如下所示

@StreamListener(target = Sink.INPUT, condition = "headers['type']=='order'")
public void receiveOrders(Order order) {...}

這裡有更多關於此方法的詳細資訊。

並且,對於路由FROM事件訂閱者,我們使用了動態綁定目的地 - 這種方法允許框架基於單個事件中提供的某些指令綁定到目的地。

使用函數進行事件路由

使用函數式方法,我們可以通過一些額外的功能,以更乾淨和簡潔的方式完成上述所有操作。

路由 TO

藉由仰賴 Spring Cloud Function (SCF) 中提供的路由函數功能,即可實現路由「TO」函數。您可以通過設定 spring.cloud.stream.function.routing.enabled 屬性顯式啟用路由,或者通過設定 spring.cloud.function.routing-expression 屬性並使用 Spring Expression Language (SpEL) 提供路由指令來隱式啟用。路由指令應導致路由到的函數的定義「TO」。為了綁定的目的,路由目的地的名稱是 functionRouter-in-0 (請參閱 RoutingFunction.FUNCTION_NAME此處描述的綁定命名約定)。

當消息被發送到這個目的地時,路由函數會嘗試確定哪個實際函數需要處理這樣的事件。它首先嘗試訪問 spring.cloud.function.routing-expression 消息頭,如果提供了,則確定要調用的實際函數的名稱。這是最動態的方法。第二個最動態的方法是提供一個 spring.cloud.function.definition 頭,它應該包含路由到的函數的定義「TO」。這兩種方法都需要通過設定 spring.cloud.stream.function.routing.enabled 屬性來顯式啟用路由函數。

至於以前版本中沒有的額外功能,spring.cloud.function.routing-expression 也可以用作應用程式屬性。例如,考慮這樣一種情況,即無論傳入的事件如何,表達式都是相同的,就像本文前面顯示的基於註解的示例一樣 (例如,spring.cloud.function.routing-expression=headers['type']=='order')。對於這種方法,您無需顯式啟用路由函數,因為 spring.cloud.function.routing-expression 作為應用程式屬性具有相同的效果。

儘管微不足道,但以下是上述方法之一的完整示例

@SpringBootApplication
public class RoutingStreamApplication {

  public static void main(String[] args) {
      SpringApplication.run(RoutingStreamApplication.class,
	  "--spring.cloud.function.routing-expression="
	  + "T(java.lang.System).nanoTime() % 2 == 0 ? 'even' : 'odd'");
  }
  @Bean
  public Consumer<Integer> even() {
    return value -> System.out.println("EVEN: " + value);
  }

  @Bean
  public Consumer<Integer> odd() {
    return value -> System.out.println("ODD: " + value);
  }
}

通過將消息發送到由綁定器 (即 rabbit 或 kafka) 公開的 functionRouter-in-0 目的地,基於消息處理時的 nanoTime() 值,這樣的消息將被路由到適當的 ('even' 或 'odd') Consumer bean。

路由 FROM

與以前一樣,路由「FROM」依賴於 SCSt 的動態綁定目的地功能。但是,與路由「TO」一樣,還有許多額外功能。

以下示例顯示了基本原理

@Autowired
private BinderAwareChannelResolver resolver;

public Consumer<String> send(Message message) {   
     MessageChannel destination = resolver
        .resolveDestination(message.getHeaders().get("type"))
     Message outgoingMessage = . . . // your code
     destination.send(outgoingMessage);
}

您所需要的只是對 BinderAwareChannelResolver 的引用 (在前面的示例中自動裝配)。然後,您可以使用一些邏輯來確定目的地名稱 (在我們的例子中,我們使用 'type' 標頭的值)。一旦確定了目的地名稱,您就可以通過使用 BinderAwareChannelResolver.resolveDestination(..) 操作並向其發送消息來獲取對它的引用。這就是全部。

上述方法的缺點是,一些特定於框架的抽象洩漏到您的程式碼中。看看您需要了解 BinderAwareChannelResolverMessageChannel 的事實,以及其他一些事情。事實上,前面示例中的大多數程式碼都是樣板。

更動態且洩漏更少的方法是依賴 spring.cloud.stream.sendto.destination 屬性,它有效地完成了上述所有操作 - 但在幕後。以下示例顯示了如何使用此方法

@SpringBootApplication
public class RoutingStreamApplication {

  @Bean
  public Function<Message<String>, Message<String>> process() {
    return message -> {
      // some logic to process incoming message
      Message<String> outgoingMessage = MessageBuilder
		.withPayload("Hello")
		.setHeader("spring.cloud.stream.sendto.destination", "even")
		.build();
       return outgoingMessage;
     };
  }
}

我們不再需要注入 BinderAwareChannelResolver、執行 MessageChannel 的解析等等。我們只需創建一個新的 Message,其中指定了一個標頭,該標頭由框架用於動態解析目的地。

路由來源

最後但同樣重要的是,讓我們看看另一個流行的路由「FROM」用例,其中資料來源起源於 SCSt 上下文之外,但需要路由到適當的目的地

@Controller
public class SourceWithDynamicDestination {
    @Autowired
    private ObjectMapper jsonMapper;

    private final EmitterProcessor<?> processor = EmitterProcessor.create();

    @RequestMapping(path = "/", method = POST, consumes = "*/*")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest(@RequestBody String body, 
      @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) 
      throws Exception {
        Map<String, String> payload = jsonMapper.readValue(body, Map.class);
        String destination = payload.get("id");
        Message<?> message =
          MessageBuilder.withPayload(payload)
           .setHeader("spring.cloud.stream.sendto.destination", destination)
           .build();
        processor.onNext(message);
    }

    @Bean
    public Supplier<Flux<?>> source() {
        return () -> processor;
    }
}

然後,我們可以通過運行以下 curl 命令來查看結果

curl -H "Content-Type: application/json" -X POST -d '{"id":"customerId-1","bill-pay":"100"}' https://127.0.0.1:8080

在這裡,由於 Supplier<Flux<?>> bean,我們同時使用了函數式方法和反應式範例的精華。我們有一個簡單的 MVC 控制器,我們希望根據內容的 'id' 屬性的值將請求向下游路由。雖然 EmitterProcessor 及其在此處的使用的細節是另一篇文章的主題,但重要的是,它演示了一個功能齊全的應用程式,其中 HTTP 請求被動態路由到由目標綁定器管理的目的地。

注意:在撰寫本文時,參考文檔正在積極更新以支援即將推出的 SCSt 3.0.0.RELEASE,但您始終可以使用 參考文檔 的來源以獲取最新資訊。

在 GitHub 上查看 Spring Cloud Stream

此外,該系列之前的部落格

- Spring Cloud Stream - 函數式和反應式

獲取 Spring 電子報

與 Spring 電子報保持聯繫

訂閱

領先一步

VMware 提供培訓和認證,以加速您的進度。

了解更多

獲得支援

Tanzu Spring 在一個簡單的訂閱中提供對 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位檔案。

了解更多

即將舉行的活動

查看 Spring 社群中所有即將舉行的活動。

查看全部